Kafka messaging system: an introduction to using the consumer model
I have been using the Kafka messaging system at work these past few days. Since I was not familiar with it before, I am summarizing here the approach I took while using it.
There is plenty of material about Kafka online, so I will not introduce it again here. This post walks through how to use a Kafka consumer; the details are as follows:
The code below is kept simple and only shows how to use the API; for details of each method, please refer to the comments in the rdkafkacpp.h header. If anything here is written poorly, corrections are welcome, thanks!
<1>read_kafka.h
#ifndef _READ_RADIUS_FROM_KAFKA_H_
#define _READ_RADIUS_FROM_KAFKA_H_
#include <iostream>
#include <string>
#include "rdkafkacpp.h"
class CReadKafka{
public:
CReadKafka(std::string brokers, std::string topics, std::string groupid, int partition);
~CReadKafka();
bool Start();
//Initialize the kafka consumer
bool InitConsumer();
//Start consuming
bool StartConsumer(int timeout_ms);
//Stop the consumer
void StopConsumer();
//Thread entry point for running the consumer on a background thread
static void* OpenConsumerThread(void* arg);
private:
//Message diagnostic handler
void MsgOpt(RdKafka::Message *msg,void* opt);
private:
std::string brokers_;
std::string topics_;
std::string groupid_;
int partition_;
int64_t offset_; //partition offset to start consuming from
int64_t last_offset_; //offset of the last consumed message
std::string strfetch_num_; //max fetch size per partition, as a config string
std::string errstr_; //error message
RdKafka::Consumer* kafka_consumer_; //consumer handle
RdKafka::Topic* kafka_topic_; //topic handle
RdKafka::Conf* con_conf_; //global configuration
RdKafka::Conf* topic_conf_; //topic configuration
bool is_run_; //running flag for the consume loop
};
#endif
<2>read_kafka.cpp
#include "read_kafka.h"
CReadKafka::CReadKafka(std::string brokers, std::string topics, std::string groupid, int partition):
brokers_(brokers),
topics_(topics),
groupid_(groupid),
partition_(partition){
offset_ = RdKafka::Topic::OFFSET_BEGINNING; //start consuming from the beginning of the partition
is_run_ = false;
strfetch_num_ = "10240000"; //max bytes fetched from a single partition per request
}
CReadKafka::~CReadKafka(){}
bool CReadKafka::Start(){
//Initialize the consumer
if(!InitConsumer()){
return false;
}
//Start the consume loop (blocks until StopConsumer() is called)
return StartConsumer(30000);
}
bool CReadKafka::InitConsumer(){
/*Create the global configuration*/
con_conf_ = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(!con_conf_){
return false;
}
/*Set the broker list*/
if(con_conf_->set("bootstrap.servers", brokers_, errstr_) != RdKafka::Conf::CONF_OK){
return false;
}
/*Set the consumer group*/
if(con_conf_->set("group.id", groupid_, errstr_) != RdKafka::Conf::CONF_OK){
return false;
}
/*Max bytes to fetch from a single partition per request*/
if(con_conf_->set("max.partition.fetch.bytes", strfetch_num_, errstr_) != RdKafka::Conf::CONF_OK){
return false;
}
//Create the consumer instance
kafka_consumer_ = RdKafka::Consumer::create(con_conf_,errstr_);
if(!kafka_consumer_){
return false;
}
delete con_conf_;
//Create the kafka topic configuration
topic_conf_ = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(!topic_conf_){
return false;
}
//Set the topic configuration
if(topic_conf_->set("auto.offset.reset", "smallest", errstr_) != RdKafka::Conf::CONF_OK){
return false;
}
//Create the topic instance
kafka_topic_ = RdKafka::Topic::create(kafka_consumer_,topics_,topic_conf_,errstr_);
if(!kafka_topic_){
return false;
}
delete topic_conf_;
//Start consuming from the given partition and offset
RdKafka::ErrorCode resp = kafka_consumer_->start(kafka_topic_,partition_,offset_);
if(resp != RdKafka::ERR_NO_ERROR){
return false;
}
return true;
}
/**
Poll and consume messages in a loop
*/
bool CReadKafka::StartConsumer(int timeout_ms){
RdKafka::Message *obj_msg = NULL;
is_run_ = true;
while(is_run_){
obj_msg = kafka_consumer_->consume(kafka_topic_, partition_,timeout_ms);
MsgOpt(obj_msg,NULL);
if(obj_msg->err() == RdKafka::ERR_NO_ERROR){
std::cout<<"Get message is "<<std::string(static_cast<char*>(obj_msg->payload()),obj_msg->len())<<std::endl;
}
kafka_consumer_->poll(0);
//The message object must be destroyed after every use
delete obj_msg;
}
//Stop consuming
kafka_consumer_->stop(kafka_topic_,partition_);
if(kafka_topic_){
delete kafka_topic_;
kafka_topic_ = NULL;
}
if(kafka_consumer_){
delete kafka_consumer_;
kafka_consumer_ = NULL;
}
//Wait for all rdkafka objects to be destroyed
RdKafka::wait_destroyed(timeout_ms);
return true;
}
/**
Stop consuming messages
*/
void CReadKafka::StopConsumer(){
is_run_ = false;
}
/**
Inspect the consumed message and handle errors
*/
void CReadKafka::MsgOpt(RdKafka::Message *msg,void* opt){
switch(msg->err()){
case RdKafka::ERR__TIMED_OUT:
std::cout<<"Consume timed out, no message received."<<std::endl;
break;
case RdKafka::ERR_NO_ERROR:
last_offset_ = msg->offset(); //offset of the last consumed message
break;
break;
case RdKafka::ERR__PARTITION_EOF:
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
StopConsumer();
break;
default:
StopConsumer();
break;
}
}
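The header also declares a static OpenConsumerThread() entry point, but its body is not shown above. Below is a minimal sketch of what it might look like, assuming the intent is to run the consume loop on a pthread and that InitConsumer() has already been called; the 30000 ms timeout is carried over from Start() and is my assumption, not part of the original code:
//Possible thread entry point (not shown in the original code)
void* CReadKafka::OpenConsumerThread(void* arg){
//The argument is assumed to be the CReadKafka instance passed to pthread_create()
CReadKafka* pcrk = static_cast<CReadKafka*>(arg);
if(pcrk != NULL){
pcrk->StartConsumer(30000); //blocks until StopConsumer() is called
}
return NULL;
}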
//Usage example:
#include <iostream>
#include "read_kafka.h"
using namespace std;
int main(int argc,char** argv){
CReadKafka* readkafk = new CReadKafka("localhost:9092",
"topic_name",
"1",0);
//Start the kafka consumer (blocks in the consume loop until StopConsumer() is called)
if(!readkafk->Start()){
delete readkafk;
return -1;
}
delete readkafk;
return 0;
}
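Note that Start() blocks inside the consume loop, so code placed after it in main() only runs once StopConsumer() has been called from somewhere else. If you want main() to keep control, one option is to run the consumer on a background thread via OpenConsumerThread(). The sketch below assumes the thread entry point sketched earlier; the 60-second run time is an arbitrary choice of mine:
//Alternative usage: run the consume loop on a background thread (sketch)
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "read_kafka.h"
int main(int argc,char** argv){
CReadKafka* readkafk = new CReadKafka("localhost:9092","topic_name","1",0);
if(!readkafk->InitConsumer()){
delete readkafk;
return -1;
}
pthread_t tid;
pthread_create(&tid,NULL,CReadKafka::OpenConsumerThread,readkafk);
//Let the consumer run for a while, then ask it to stop
sleep(60);
readkafk->StopConsumer(); //note: in real code is_run_ should be atomic or lock-protected
pthread_join(tid,NULL);
delete readkafk;
return 0;
}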
Notes for real-world use:
When consuming from Kafka, each partition has an offset that advances as you read (a message can be consumed by multiple consumer groups, but within a group it is consumed by only one consumer). To make sure the client can resume from where it left off after a restart, the developer needs to persist the partition offset externally, updating the stored offset right after each message is consumed. That way consumption continues in order after a restart and messages keep being consumed without loss.
On each startup, load the last saved offset for each partition and resume consuming from there; this guarantees no data is lost. A minimal sketch of this idea follows.
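For example, here is a rough sketch of file-based offset persistence. The helpers LoadOffset/SaveOffset and the file name kafka_offset.txt are my own inventions for illustration, not librdkafka API:
#include <fstream>
#include <stdint.h>
#include "rdkafkacpp.h"
//Read the saved offset; fall back to the beginning of the partition if no file exists yet
static int64_t LoadOffset(const std::string& path){
std::ifstream in(path.c_str());
int64_t offset = RdKafka::Topic::OFFSET_BEGINNING;
if(in.is_open()){
in >> offset;
}
return offset;
}
//Overwrite the saved offset after each successfully consumed message
static void SaveOffset(const std::string& path,int64_t offset){
std::ofstream out(path.c_str(),std::ios::trunc);
out << offset;
}
//Inside CReadKafka this could be wired in roughly like this:
// offset_ = LoadOffset("kafka_offset.txt"); //before kafka_consumer_->start(...)
// ...
// if(msg->err() == RdKafka::ERR_NO_ERROR){
//     SaveOffset("kafka_offset.txt", msg->offset() + 1); //+1: next offset to consume after a restart
// }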