将kafka(kafka-python)转储到txt文件

问题描述:

我需要定期将kafka使用者的输出转储到excel文件中。我使用下面的代码:将kafka(kafka-python)转储到txt文件

from kafka import KafkaConsumer 
from kafka import KafkaProducer 
import json,time 
from xlutils.copy import copy  
from xlrd import open_workbook 
import pandas 

consumer = KafkaConsumer(bootstrap_servers='localhost:9092') 
KafkaConsumer() 
consumer.subscribe("test") 

rowx=0 
colx=0 

for msg in consumer: 
     book_ro = open_workbook("twitter.xls") 
     book = copy(book_ro) # creates a writeable copy 
     sheet1 = book.get_sheet(0) # get a first sheet 
     sheet1.write(rowx,colx, msg[6]) 
     book.save("twitter.xls") 

现在,我的问题是代码效率不高。对于我需要打开,写入并保存excel文件的每条消息。有没有办法打开一次,写入,然后关闭它(对于一批消息,而不是for循环)? tnx

+0

为什么关闭文件呢? –

是的,打开,写入,保存并关闭每封邮件效率低下,您可以批量处理。但仍然需要在消耗循环中进行。

msg_buffer = [] 
buffer_size = 100 
for msg in consumer: 
     msg_buffer.append(msg[6]) 
     if len(msg_buffer) >= buffer_size: 
      book_ro = open_workbook("twitter.xls") 
      book = copy(book_ro) # creates a writeable copy 
      for _msg in msg_buffer: 
       sheet1 = book.get_sheet(0) # get a first sheet 
       sheet1.write(rowx,colx, _msg) 
      book.save("twitter.xls") 
      msg_buffer = [] 

你可能认为这将比nobatch快100倍。

更新评论:

是的,通常我们会留在这个死循环,它在内部使用了民意调查,以获取新的消息,发送心跳和COMMIT偏移。如果你的目标是消耗这个主题的消息并保存消息,它应该是一个长时间运行的循环。

这是kafka-python设计,你应该像这样使用消息或使用consumer.poll()。

至于为什么你可以使用for msg in consumer:,因为消费者是一个迭代器对象,它的类实现__iter____next__,它潜在的使用提取器获取记录。更多实施细节你可以参考https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/group.py

+0

感谢您的注意。我想知道我们是否永远留在“消费循环中的味精”中?在这种情况下,你的代码是OK的。或者是否有任何回调函数(当客户端真的收到一条消息时,它会调用一个函数来做某事)?什么是消费者对象的类型?这是一个列表还是什么? – user2867237

+0

回答udpated。 – GuangshengZuo