黑猴子的家:Kafka Streams 案例
Code -> GitHub
https://github.com/liufengji/kafka_api.git
1、需求分析
实时处理带有“>>>”前缀的单词。例如输入“victor>>>mayy”,最终处理成“mayy”。
2、创建主类 (创建一个工程,并添加jar包)
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
/**
 * Entry point of the stream-processing example.
 *
 * <p>Builds a three-node topology (source -> processor -> sink) that reads
 * records from the input topic, passes each one through {@link LogProcessor}
 * and writes the result to the output topic, then starts it.
 */
public class Application {

    /**
     * Configures, builds and starts the topology.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Topic the raw records are read from
        String from = "first";
        // Topic the processed records are written to
        String to = "second";

        // Streams configuration
        Properties settings = new Properties();
        // Name identifying this streams application
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        // Broker used to bootstrap the connection to the cluster
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        StreamsConfig config = new StreamsConfig(settings);

        // Wire the topology: SOURCE -> PROCESS -> SINK
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", from)
                .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                        // The actual per-record work happens in LogProcessor
                        return new LogProcessor();
                    }
                }, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        // Create the streams client
        final KafkaStreams streams = new KafkaStreams(builder, config);

        // Close the client when the JVM is asked to exit (Ctrl-C, SIGTERM),
        // otherwise its threads and connections are simply abandoned.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
            }
        }, "logFilter-shutdown-hook"));

        streams.start();
    }
}
3、具体业务处理
import java.nio.charset.StandardCharsets;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;台
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
4、运行程序,在node3上启动生产者
[victor@node3 kafka]$ bin/kafka-console-producer.sh --broker-list node1:9092 \
--topic first
> hello>>>world
> hei>>>victor
> h>>>victor
> z>>>victor
5、在node2上启动消费者
[victor@node2 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper node1:2181 \
--from-beginning \
--topic second
world
victor
victor
victor