【一】kafka安装及基本使用
概述
kafka是一个分布式的流处理平台。它通常用于构建实时的数据管道,以及实时流处理。能够横向扩展,有容错机制,高速运行在生产上。
特性:
发布&订阅:像消息系统一样读写流数据。
处理:数据流能够高效的被处理,接近实时。
存储:数据流能够安全的在进行分布式中多副本的存储。
kafka架构:
producer:生产者
consumer:消费者
broker:存储容器。生产者生产数据放到broker中,消费者从broker中消费。
topic:主题。
kafka集群可以运行在一个或者多个服务器上。它能存储以topic分门别类的数据流记录。每一个记录由key value timestamp组成。
安装
下载:
http://kafka.apache.org/downloads
单节点单broker的部署及使用
解压
tar -zxvf kafka_2.11-0.9.0.0.tgz
修改文件名称
mv kafka_2.11-0.9.0.0 kafka
配置环境变量
vi ~/.bashrc
export KAFKA_HOME=/app/kafka
export PATH=:$PATH:$KAFKA_HOME/bin
使配置文件生效
source ~/.bashrc
创建目录
mkdir /app/kafka/logs
修改配置文件
cd /app/kafka/config
vi server.properties
broker.id=0 每台机器的broker.id不能一样,这是唯一标识。同一台机器多个broker的ID也不能一样。
host.name=node1 指定当前机器IP
log.dirs=/app/kafka/logs 队列中消息存放的地方,多个看引用逗号隔开
zookeeper.connect=node1:2181,node2:2181,node3:2181 指明zookeeper集群所在的机器的IP和对应的端口
启动
bin/kafka-server-start.sh config/server.properties
创建topic
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看当前zookeeper上有多少个topic
bin/kafka-topics.sh --list --zookeeper node1:2181
发送消息
bin/kafka-console-producer.sh --broker-list node1:9092 --topic test_topic
9092是server.properties中配置的监听端口
消费消息
bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic test_topic --from-beginning
--from-beginning的使用表示从头开始消费,新开一个控制台启动消费程序,会消费到以前的数据。
如果不要这个参数,指挥接收这个消费程序启动以后生产者发送的数据。
查看所有topic的详细信息
bin/kafka-topics.sh --describe --zookeeper node1:2181
查看指定topic的详细信息
bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test_topic
单节点多broker的部署及使用
一个server.properties对应一个Broker。拷贝出3个server.properties
cd /app/kafka/config
cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties
mkdir /app/kafka/logs-1
mkdir /app/kafka/logs-2
mkdir /app/kafka/logs-3
vi server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/app/kafka/logs-1
vi server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/app/kafka/logs-2
vi server-3.properties
broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/app/kafka/logs-3
启动
bin/kafka-server-start.sh -daemon config/server-1.properties &
bin/kafka-server-start.sh -daemon config/server-2.properties &
bin/kafka-server-start.sh -daemon config/server-3.properties &
查看jps -m
创建topice,有3个副本
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 3 --partitions 1 --topic test_3replication_topic
查看当前zookeeper上有多少个topic
bin/kafka-topics.sh --list --zookeeper node1:2181
查看topic
leader:3表示id为3的broker是主
replicas表示副本存在哪些broker上
isr表示哪些broker是活着的
发送消息
bin/kafka-console-producer.sh --broker-list node1:9093, node1:9094,node1:9095 --topic test_3replication_topic
消费消息
bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic test_3replication_topic --from-beginning