Storm整合Redis
实现功能:
将之前的词频统计案例改编,将一个数组中的数据每隔1秒取出一个,通过Storm的Topology处理之后写入到Redis中
首先要记得导入pom依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.1.1</version>
</dependency>
代码实现:
package cn.ysjh.drpc;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.*;
public class StormRedis {
private static class DataSourceSpout extends BaseRichSpout {
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
public static final String[] words = new String[]{"apple", "ysjh", "shjkl", "ueyowir", "tiuyh"};
@Override
public void nextTuple() {
Random random = new Random();
String word = words[random.nextInt(words.length)];
this.spoutOutputCollector.emit(new Values(word));
System.out.println("数据" + word);
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("lines"));
}
}
/*
词频分割Bolt
*/
private static class SplitBolt extends BaseRichBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
/*
对lines按照逗号进行切分
*/
@Override
public void execute(Tuple tuple) {
String lines = tuple.getStringByField("lines");
this.outputCollector.emit(new Values(lines));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("words"));
}
}
/*
词频统计Bolt
*/
private static class CountBolt extends BaseRichBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector=outputCollector;
}
Map<String, Integer> map = new HashMap<>();
@Override
public void execute(Tuple tuple) {
String words = tuple.getStringByField("words");
Integer count = map.get(words);
if (count == null) {
count = 0;
}
count++;
map.put(words, count);
//输出
this.outputCollector.emit(new Values(words,map.get(words)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word","count"));
}
}
public static class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wc";
public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}
@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getIntegerByField("count")+"";
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout", new DataSourceSpout());
builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("118.89.108.116").setPort(6379).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setBolt("RedisStoreBolt",storeBolt).shuffleGrouping("CountBolt");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("StormRedis", new Config(), builder.createTopology());
}
}
运行截图:
然后在Redis的图形连接软件中不断刷线来查看db0数据库中的键值对的变化