为什么我的卡夫卡消费者比我的卡夫卡生产者慢得多?
问题描述:
我得到了一个能够完全抓取的数据流。数据全部投入Kafka,然后发送给Cassandra。现在卡夫卡消费者非常缓慢,比制片人慢得多。我希望他们完全一样。我能做些什么来实现这个结果或者我的代码出了什么问题?为什么我的卡夫卡消费者比我的卡夫卡生产者慢得多?
这里是蟒蛇我卡夫卡消费者代码:
import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
def __init__(self, *whitelist):
self.whitelist = [logging.Filter(name) for name in whitelist]
def filter(self, record):
return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
cluster = Cluster(['localhost']); session = cluster.connect(keyspace)
kafka = KafkaClient('localhost')
consumer = MultiProcessConsumer(kafka, b'default',kafkatopic,num_procs=16, max_buffer_size=None)
article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
while True:
messages = consumer.get_messages(count=16)
if len(messages) == 0:
print 'IDLE'
continue
for message in messages:
try:
response = json.loads(message.value)
data = json.loads(response['body'])
print response['body']
articles = data['articles']
idlist = [r['id'] for r in articles]
if len(idlist)>0:
article_rows = session.execute(article_lookup_stmt,[idlist])
rows = [r.id for r in article_rows]
for article in articles:
try:
if not article['id'] in rows:
article['created_at'] = parse(article['created_at'])
scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
log.debug('%s %s' % (article['id'],article['created_at']))
session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
except Exception as e:
print 'error==============:',e
continue
except Exception as e:
print 'error is:',e
log.exception(e.message)
except Exception as e:
log.exception(e.message)
编辑:
我还添加了我的个人资料结果与上面的代码慢行似乎是
article_rows = session.execute(article_lookup_stmt,[idlist])
Sun Feb 14 16:01:01 2016 consumer.out
395793 function calls (394232 primitive calls) in 23.074 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
141 10.695 0.076 10.695 0.076 {select.select}
7564 10.144 0.001 10.144 0.001 {method 'acquire' of 'thread.lock' objects}
1 0.542 0.542 23.097 23.097 consumer.py:5(<module>)
1510 0.281 0.000 0.281 0.000 {method 'recv' of '_socket.socket' objects}
38 0.195 0.005 0.195 0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
13 0.078 0.006 0.078 0.006 {time.sleep}
2423 0.073 0.000 0.137 0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
22112 0.063 0.000 0.095 0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
3 0.052 0.017 0.162 0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005 0.047 0.000 0.055 0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
1270 0.032 0.000 0.034 0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
3 0.024 0.008 0.226 0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
33 0.024 0.001 0.031 0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
15374 0.024 0.000 0.024 0.000 {built-in method new of type object at 0x788ee0}
141 0.023 0.000 11.394 0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
288 0.020 0.000 0.522 0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
2423 0.018 0.000 0.029 0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
115 0.018 0.000 11.372 0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
2423 0.018 0.000 0.059 0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
24548 0.017 0.000 0.017 0.000 {_struct.unpack}
44228/43959 0.016 0.000 0.016 0.000 {len}
谢谢期待你的答复。
答
您可以尝试运行消费者而不保存到C *,因此您可以观察它产生的差异。
如果事实证明保存到C *是一个阻塞点(我假设它是这样),那么您可以拥有一个线程池(大于16个线程用户生成线程),其唯一责任是写入C *。
这样,您就可以卸载代码的慢速部分,这只会在消费者代码中留下微不足道的部分。
您可以使用from multiprocessing import Pool
。
更多here。
正如目前所述,您的问题缺乏正确答案所需的详细信息。使用分析器查明脚本的哪些部分很慢,然后尝试重写这些部分以使其更快。有关更多详细信息,请参见https://docs.python.org/2/library/profile.html。 – liori
我的脚本缓慢的部分是消息后的消息。 – peter
你的消费者问题5 cassandra查询 - 没有迹象表明你的用户做了什么,但似乎5个同步CQL查询可能需要比平凡的生产者花费更长的时间。 –