Kafka的Spring Cloud Stream

总览

该示例项目演示了如何使用事件驱动的体系结构Spring Boot ,Spring Cloud Stream, Apache KafkaLombok构建实时流应用程序。

在本教程结束时,您将运行一个简单的基于Spring Boot的Greetings微服务

  1. 从REST API获取消息
  2. 将其写入Kafka主题
  3. 从主题中读取
  4. 将其输出到控制台

让我们开始吧!

顺便说一句,您可以在此处找到源代码。

什么是Spring Cloud Streaming?

Spring Cloud Stream是基于Spring Boot构建的框架,用于构建消息驱动的微服务。

什么是卡夫卡?

Kafka是最初由LinkedIn开发的流行的高性能和水平可伸缩的消息传递平台。

安装Kafka

这里下载Kafka并将其解压缩:

> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0

启动Zookeeper和Kafka

在Windows上:

> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
> bin\windows\kafka-server-start.bat config\server.properties

在Linux或Mac上:

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties

如果计算机从休眠状态唤醒后,Kafka没有运行并且无法启动,请删除<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。

什么是Lombok?

Lombok是一个Java框架,可在代码中自动生成getter,setter,toString(),构建器,记录器等。

Maven依赖

转到https://start.spring.io创建一个Maven项目:

Kafka的Spring Cloud Stream

  1. 添加必要的依赖项: Spring Cloud StreamKafkaDevtools (用于在开发过程中进行热重新部署,可选), Actuator (用于监视应用程序,可选), Lombok (确保在IDE中也安装了Lombok插件)
  2. 单击生成项目按钮以zip文件形式下载项目
  3. 解压缩zip文件并将maven项目导入到您喜欢的IDE

注意pom.xml文件中的Maven依赖项:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
  </dependency>
  <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <!-- Also install the Lombok plugin in your IDE -->
  <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
  </dependency>

  <!-- hot reload - press Ctrl+F9 in IntelliJ after a code change while application is running -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
  </dependency>

…还有<dependencyManagement>部分:

<dependencyManagement>
  <dependencies>
    <dependency>
      <!-- Import dependency management from Spring Boot -->
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>${spring-cloud-stream.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
    </dependencyManagement>

…和<repository>部分:

<repository>
  <id>spring-milestones</id>
  <name>Spring Milestones</name>
  <url>http://repo.spring.io/libs-milestone</url>
  <snapshots>
    <enabled>false</enabled>
  </snapshots>
</repository>

定义卡夫卡流

package com.kaviddiss.streamkafka.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  

public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";

    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

为了使我们的应用程序能够与Kafka进行通信,我们需要定义一个出站流以将消息写入Kafka主题,并定义一个入站流以读取来自Kafka主题的消息。

通过简单地创建一个接口为每个流定义单独的方法,Spring Cloud提供了一种方便的方法。

inboundGreetings()方法定义要从Kafka读取的入站流,而outboundGreetings()方法定义要写入Kafka的出站流。

在运行时,Spring将为GreetingsStreams接口创建一个基于Java代理的实现,该实现可以作为Spring Bean注入到代码中的任何位置,以访问我们的两个流。

配置Spring Cloud Stream

下一步是将Spring Cloud Stream配置为绑定到GreetingsStreams接口中的流。 这可以通过使用以下代码创建@Configurationcom.kaviddiss.streamkafka.config.StreamsConfig来完成:

package com.kaviddiss.streamkafka.config;

import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

使用@EnableBinding批注(将GreatingsService接口传递到该批注)完成@EnableBindingGreatingsService

Kafka的配置属性

默认情况下,配置属性存储在src/main/resources/application.properties文件中。

但是,我更喜欢使用YAML格式,因为它不太冗长,并且允许将公共属性和特定于环境的属性保留在同一文件中。

现在,让我们将application.properties重命名为application.yaml并将config片段下方粘贴到文件中:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

上面的配置属性配置要连接的Kafka服务器的地址,以及我们用于代码中的入站和出站流的Kafka主题。 他们俩都必须使用相同的Kafka主题!

contentType属性告诉Spring Cloud Stream在流中以String的形式发送/接收我们的消息对象。

创建消息对象

使用下面的代码创建一个简单的com.kaviddiss.streamkafka.model.Greetings类,该代码将表示我们从中读取并写入的greetings Kafka主题:

package com.kaviddiss.streamkafka.model;

// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter @Setter @ToString @Builder
public class Greetings {
    private long timestamp;
    private String message;
}

注意,由于Lombok批注,该类如何没有任何getter和setter。 @ToString将使用类的字段生成toString()方法,而@Builder批注将允许我们使用流畅的生成器创建Greetings对象(请参见下文)。

创建服务层以写入Kafka

让我们创建的com.kaviddiss.streamkafka.service.GreetingsService下面的代码,将写一个类Greetings对象的greetings卡夫卡话题:

package com.kaviddiss.streamkafka.service;

import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
@Slf4j
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);

        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

@Service批注会将此类配置为Spring Bean,并通过构造函数注入GreetingsService依赖项。

@Slf4j批注将生成一个SLF4J记录器字段,可用于记录日志。

sendGreeting()方法中,我们使用注入的GreetingsStream对象发送由Greetings对象表示的消息。

创建REST API

现在,我们将创建一个REST api端点,该端点将触发使用GreetingsService Spring Bean向Kafka发送消息:

package com.kaviddiss.streamkafka.web;

import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; 

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;

    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }

    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
            .message(message)
            .timestamp(System.currentTimeMillis())
            .build();

        greetingsService.sendGreeting(greetings);
    }
}

@RestController注释告诉Spring这是一个Controller bean(MVC中的C)。 greetings()方法定义一个HTTP GET /greetings端点,该端点接受message请求参数,并将其传递给GreetingsServicesendGreeting()方法。

听问候卡夫卡主题

让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类,该类将侦听greetings Kafka主题上的消息并将其记录在控制台上:

package com.kaviddiss.streamkafka.service;

import com.kaviddiss.streamkafka.model.Greetings;
import com.kaviddiss.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

@Component批注类似于@Service @Component@Service @RestController定义了一个Spring Bean。

GreetingsListener有一个方法, handleGreetings()将通过云春流与每一个新的调用Greetings的消息对象greetings卡夫卡的话题。 这要感谢为handleGreetings()方法配置的@StreamListener批注。

运行应用程序

最后一个难题是由Spring Initializer自动生成的com.kaviddiss.streamkafka.StreamKafkaApplication类:

package com.kaviddiss.streamkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamKafkaApplication.class, args);
    }
}

无需在此处进行任何更改。 您可以在您的IDE中将此类作为Java应用程序运行,也可以使用Spring Boot maven插件从命令行运行该应用程序:

> mvn spring-boot:run

应用程序运行后,在浏览器中转到http:// localhost:8080 / greetings?message = hello并检查您的控制台。

摘要

我希望您喜欢本教程。 随时提出任何问题并留下您的反馈。

翻译自: https://www.javacodegeeks.com/2018/03/spring-cloud-stream-kafka.html