《2019/04/26》Docker构建kafka伪分布式集群及java代码测试
《2019/04/26》Docker构建kafka伪分布式集群及java代码测试
博客以实用为主。
文章目录
一、安装docker
参看我的这篇博客:Ubuntu18.10上Docker的安装及简单使用 。
二、安装docker-compose
1、下载docker-compose
安装教程地址: https://docs.docker.com/compose/install/ 。
by date 2019/04/26最新版本是1.24.0。
sudo curl -L "https://github.com/docker/compose/releases/download/1.24.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
2、授权
sudo chmod +x /usr/local/bin/docker-compose
3、查看版本
$ docker-compose --version
docker-compose version 1.24.0, build 1110ad01
三、集群构建
使用 wurstmeister/kafka 镜像来构建集群。进去这个链接,有详细的教程说明:
1、编写docker-compose.yml文件
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.255.131
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
你只需要将 KAFKA_ADVERTISED_HOST_NAME
替换为你的ip地址即可。
2、 启动服务
- 在docker-compose.yml所在的文件夹下,执行命令docker-compose up -d,会先下载zookeeper和kafka的镜像,然后创建容器;
- 执行命令docker ps,可见启动了一个zookeeper和一个kafka容器。
048a03158e0b wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About an hour ago Up About an hour 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp lhcz_zookeeper_1
b660addb9134 wurstmeister/kafka "start-kafka.sh" About an hour ago Up About an hour 0.0.0.0:32775->9092/tcp lhcz_kafka_1
3、查看kafka版本及zookeeper版本
查看kafka版本:
docker exec lhcz_kafka_1 find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
查看zookeeper版本:
docker exec lhcz_zookeeper_1 pwd
4、扩展broker
docker-compose scale kafka=3
执行 docker ps
查看:
19d356b69e1a wurstmeister/kafka "start-kafka.sh" About an hour ago Up About an hour 0.0.0.0:32777->9092/tcp lhcz_kafka_3
03b6c74f9233 wurstmeister/kafka "start-kafka.sh" About an hour ago Up About an hour 0.0.0.0:32776->9092/tcp lhcz_kafka_2
048a03158e0b wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About an hour ago Up About an hour 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp lhcz_zookeeper_1
b660addb9134 wurstmeister/kafka "start-kafka.sh" About an hour ago Up About an hour 0.0.0.0:32775->9092/tcp lhcz_kafka_1
5、生产和消费信息
创建topic
1、创建一个topic,名为topic001,4个partition,副本因子2,执行以下命令即可:
docker exec lhcz_kafka_1 \
kafka-topics.sh \
--create --topic topic001 \
--partitions 4 \
--zookeeper zookeeper:2181 \
--replication-factor 2
2 、执行以下命令查看刚刚创建的topic,这次在容器lhcz_kafka_3上执行命令试试:
[[email protected] kafka-docker]# docker exec lhcz_kafka_3 \
kafka-topics.sh --list \
--zookeeper zookeeper:2181 \
topic001
3、查看刚刚创建的topic的情况,broker和副本情况一目了然,如下:
[[email protected] kafka-docker]# docker exec lhcz_kafka_3 \
> kafka-topics.sh \
> --describe \
> --topic topic001 \
> --zookeeper zookeeper:2181
Topic:topic001 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: topic001 Partition: 0 Leader: 1002 Replicas: 1002,1003 Isr: 1002,1003
Topic: topic001 Partition: 1 Leader: 1003 Replicas: 1003,1004 Isr: 1003,1004
Topic: topic001 Partition: 2 Leader: 1004 Replicas: 1004,1001 Isr: 1004,1001
Topic: topic001 Partition: 3 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
消费消息
执行如下命令,即可进入等待topic为topic001消息的状态:
docker exec lhcz_kafka_2 \
kafka-console-consumer.sh \
--topic topic001 \
--bootstrap-server lhcz_kafka_1:9092,lhcz_kafka_2:9092,lhcz_kafka_3:9092
目前还没有生产消息,因此控制台不会有内容输出,接下来尝试生产消息。
生产消息
1、打开一个新的窗口,执行如下命令,进入生产消息的命令行模式,注意不要漏掉参数"-it",我之前就是因为漏掉了参数"-it",导致生产的消息时虽然不提示异常,但是始终无法消费到消息:
docker exec -it lhcz_kafka_1 \
kafka-console-producer.sh \
--topic topic001 \
--broker-list lhcz_kafka_1:9092,lhcz_kafka_2:9092,lhcz_kafka_3:9092
2、 现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。
四、代码测试
使用的是IDEA构建springboot工程。
1、maven依赖如下
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、在application.yml添加配置信息
kafka:
consumer:
enable-auto-commit: true
auto-offset-reset: latest
bootstrap-servers: 192.168.255.131:32775,192.168.255.131:32776,192.168.255.131:32777
group-id: gxz
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: 192.168.255.131:32775,192.168.255.131:32776,192.168.255.131:32777
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
根据个人情况,自行修改 bootstrap-servers
的配置。
3、 代码编写及测试
1、在test模块下编写生产消息代码:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ViewsApplicationTests {
@Autowired
KafkaTemplate kafkaTemplate;
@Test
public void contextLoads() {
for (int i = 0; i < 10; i++) {
String message = UUID.randomUUID().toString();
ListenableFuture future = kafkaTemplate.send("topic01", message);
future.addCallback(o -> System.out.println("send-消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
}
try {
// 为了查看到消费数据情况
Thread.sleep(20 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、编写消费信息代码:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"topic01"})
public void receive(String message) {
System.out.println("topic01 -- 消费信息: " + message);
}
}
3、执行 contextLoads()
即可看到效果:
— end