kafka基础知识及操作命令

适用场景

Kafka适合什么样的场景?

它可以用于两大类别的应用:

* 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)

* 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

 

相关概念:

Kafka作为一个集群,运行在一台或者多台服务器上,Kafka通过 topic 对存储的流数据进行分类。每条记录中包含一个key,一个value和一个timestamp(时间戳)。

Kafka有四个核心的API:

The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。

The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。

The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。

The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

 

  • Topics和日志

让我们首先深入了解下Kafka的核心概念:提供一串流式的记录—topic 。

Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。

Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:

kafka基础知识及操作命令

日志中的 partition(分区)有以下几个用途。第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。第二,可以作为并行的单元集。

 

每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录(消息)都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。

 

Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题.

由此可见采用./kafka-consumer-groups.sh --bootstrap-server 10.3.76.90:9092 --group group2 --describe 命令查看的消费组情况,里面的偏移只代表当前group内那些记录(message)被读取过,并不代表它们实际在队列中已不存在。 offset也是是可以手动指定的。

kafka基础知识及操作命令

事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。

相关命令: 

查看创建的所有topic信息

./kafka-topics.sh --describe --zookeeper localhost:2181

查看指定top信息

./kafka-topics.sh --zookeeper 10.3.76.90:2181 --topic "smtp" --describe

或者

./kafka-topics.sh --describe --topic smtp --zookeeper localhost:2181

查看指定消费组

./kafka-consumer-groups.sh --bootstrap-server 10.3.76.90:9092 --group group2 --describe

其中 topic是被group消费过的topic, partition是分区号,current-offiset 是该分区被该group消费到的offsetid,log-end-offset,是该分区里有的offsetid个数

lag是该分区该group还有多少消息没读。

查看所有消费组

./kafka-consumer-groups.sh --bootstrap-server 10.3.76.90:9092 --list

用Kafka的console-producer在topic test1 生产消息

新建终端窗口,到/Applications/kafka目录,运行命令:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

    然后输入想要产生的消息内容(如 hello world),回车:

用Kafka的console-consumer 消费topic test1的消息

 新建终端窗口,到/Applications/kafka目录,

  运行命令:./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning

  (9092是kafka单机启动的端口;--bootstrap-server   新旧kafka版本不一样,这个是新版本的命令)

ps. 1)若producer 和 consumer 两个窗口同时打开,在producer输入信息,consumer会立即消费信息并打印在终端

 2)新开一个终端,去消费同一个topic,刚刚已经消费过的消息还会被新终端继续消费。也就是说,消息被消费过后不会立即被删除。