springboot 集成kafka系列 二、springboot集成kafka生产者
1、新建springboot脚手架工程,pom文件如下,其中引入了kafka需要的依赖,注意这里的kafka版本号需要和之前安装的kafka版本一致,要不然会有问题
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zeshan</groupId>
<artifactId>kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-producer</name>
<description>kafka集成</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、在application.properties中配置producer基本信息
kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
3.编写ProducerController
package com.zeshan.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@RestController
@RequestMapping("kafka-producer")
public class ProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;
@RequestMapping("send")
@ResponseBody
public String sengMessage(HttpServletRequest request, HttpServletResponse response){
String message = request.getParameter("message");
try {
kafkaTemplate.send("demo",message);
return "success";
}catch (Exception e){
e.printStackTrace();
return "error";
}
}
}
4.启动项目,访问 http://127.0.0.1:6060/kafka-producer/send?message=hello kafka,访问结果如下
通过观察我们发送的消息已经被consumer消费,至此springboot集成kafka producer成功。
项目源码地址 https://gitee.com/yanfaze/kafka
下一篇文件会介绍springboot 集成kafka consumer,比克https://blog.****.net/yfz792178428/article/details/83415004