storm_可靠消息的例子

这是一个比较复杂的例子,涵盖的知识面较多。

需求:统计出某文件夹下各文件中,各个单词出现的次数。(同时将文件剪切到另一个目录)

知识点一:多进程/多线程读取同一个文件,需要对文件进行加锁。

知识点二:对单词做统计,所以最好使用按字段分组

知识点三:如何得知一个文件的单词统计完了,需要使用storm的可靠消息机制。

知识点四:当知道某一个文件的单词统计完后,需要将处理这些单词的bolt结果汇总到一个bolt,需要用到globalGrouping分组,而且此时是将一个map发送到bolt,而不再是字串。

知识点五:某一个文件的所有bolt中的map汇总到一个bolt结束时,输出此文件中的所有字段和计数,知道这个时机需要再次用到storm的可靠消息机制。同一个topology中使用多套消息可靠机制,需要对机制有更深的理解。

知识点六:同一个topology中的spout/bolt,需要处理计数/汇总/输出结果三类消息任务,如何把各类任务区分开,需要更好的理解streamid的内容。

 

  1. 工作流程如下:

1.监控文件夹,获取文件(spout),将文件名发送出去

2.(bolt)读取文件,拆分为一个一个的单词,将单词发送出去

3.(bolt)按字段分组统计,使用map记下各单词对应的个数

4.当前面的统计结束时(使用可靠消息机制),各个统计的bolt将自己的map发出去,发到同一个bolt

5.(bolt)汇总前面统计的结果。

6.当汇总结束时(使用可靠消息机制),将结果打印出来。

 

因为需要两次使用可靠消息机制,因此相当于要跑三次任务,每次是一个topology,三个topology如下图:

storm_可靠消息的例子

storm_可靠消息的例子

storm_可靠消息的例子

 

2.Spout

负责从监控的目录中获取文件,剪切到另一目录后,将文件路径发送出去。

public class WordSpout extends BaseRichSpout{

SpoutOutputCollector _collector;

String _monitorDir = "E:/wordCount/";

File _file = new File(_monitorDir + "source/");

Map<String, String> _message_working = new HashMap<String, String>();

Map<String, String> _message_summary = new HashMap<String, String>();

 

static public void main(String[] args){

WordSpout ws = new WordSpout();

ws.nextTuple();

}

@Override

public void nextTuple() {

Utils.sleep(100);

String filePath = getFilePath();// 使用加锁的方式获取某一个文件

if (filePath != null){

String uuid = UUID.randomUUID().toString();

_message_working.put(uuid, filePath);

// System.out.println("---WordSpout---------------lockFile:"+filePath);

// 开始处理某一个文件

_collector.emit("working", new Values(filePath), uuid);

}

}

 

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

_collector = collector;

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream("working",new Fields("filePath"));

declarer.declareStream("summary", new Fields("filePath"));

declarer.declareStream("end", new Fields("filePath"));

}

 

@Override

public void ack(Object messageId) {

String msgId = (String)messageId;

 

// 统计工作结束,准备汇总

if (_message_working.containsKey(msgId)){

String filePath = _message_working.get(msgId);

String uuid = UUID.randomUUID().toString();

_message_summary.put(uuid, filePath);

// System.out.println("---------------working end-------------"+filePath);

_collector.emit("summary", new Values(filePath), uuid);

}

// 汇总结束,准备输出结果

if (_message_summary.containsKey(msgId)){

String filePath = _message_summary.get(msgId);

// System.out.println("---------------summary end-------------"+filePath);

_collector.emit("end", new Values(filePath));

}

}

 

@Override

public void fail(Object messageId) {

String msgId = (String)messageId;

// 统计工作结束,准备汇总

if (_message_working.containsKey(msgId)){

String filePath = _message_working.get(msgId);

// System.out.println("---------------working failed-------------"+filePath);

}

// 汇总结束,准备输出结果

if (_message_summary.containsKey(msgId)){

String filePath = _message_summary.get(msgId);

// System.out.println("---------------summary failed-------------"+filePath);

}

}

 

private String getFilePath(){

String[] files = _file.list();

String newPath = null;

String filePath = null;

boolean bDeleted = false;

for (String fileName : files){

FileLock fLock = null;

FileChannel fc = null;

try{

filePath = _monitorDir + "source/" + fileName;

fc = getChannel(filePath);

if (fc != null){

fLock = fc.tryLock();

if (fLock != null && fLock.isValid()) {

String tempNewPath = _monitorDir + "finished/" + fileName;

File newFile = new File(tempNewPath);

// 因为fc在copyFileForChannel里面close了,因此不需要再对fLock调用release了

copyFileForChannel(fc, newFile);

bDeleted = deleteFile(filePath);

if (bDeleted){

newPath = tempNewPath;

break;

}

else{

System.out.println("delete failed : " + filePath);

continue;

}

}

else{

System.out.println("----------------lock failed----------");

}

}

}

catch (Exception e) {

}

finally{

try {

if (fc != null){

fc.close();

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

 

return newPath;

}

 

public static FileChannel getChannel(String filePath){

FileChannel fc = null;

try {

File file = new File(filePath);

RandomAccessFile raf = new RandomAccessFile(file, "rw");

fc = raf.getChannel();

} catch (FileNotFoundException e) {

e.printStackTrace();

}catch (IOException e) {

e.printStackTrace();

}

 

return fc;

}

 

public static void copyFileForChannel(FileChannel inC, File des) throws Exception {

int length = 2097152;

FileOutputStream out = new FileOutputStream(des);

FileChannel outC = out.getChannel();

ByteBuffer b = null;

while (true) {

if (inC.position() == inC.size()) {

inC.close();

outC.close();

return;

}

if ((inC.size() - inC.position()) < length) {

length = (int) (inC.size() - inC.position());

} else

length = 2097152;

b = ByteBuffer.allocateDirect(length);

inC.read(b);

b.flip();

outC.write(b);

outC.force(false);

}

}

 

public static boolean deleteFile(String path) {

boolean flag = false;

File file = new File(path);

if (!file.exists()) {

return flag;

}

flag = file.delete();

return flag;

}  

}

 

3. 第一组bolt

public class SplitBolt extends BaseRichBolt{

OutputCollector _collector;

@Override

public void execute(Tuple tuple) {

String filePath = tuple.getStringByField("filePath");

String streamId = tuple.getSourceStreamId();

if (streamId.equals("working")){

String content = readerFile(filePath);

// System.out.println("---SplitBolt---------------readfile:"+filePath);

StringTokenizer token = new StringTokenizer(content);

while (token.hasMoreTokens()) {

String word = token.nextToken(", ()[]{}-/?.!:\"\n())");

// System.out.println("---SplitBolt--------------word:" + word);

_collector.emit("working", tuple, new Values(word, filePath));

}

}

 

if (streamId.equals("summary")){

// System.out.println("---SplitBolt--summary------------filePath:" + filePath);

_collector.emit("summary", tuple, new Values(filePath));

}

 

if (streamId.equals("end")){

// System.out.println("---SplitBolt--finish------------filePath:" + filePath);

_collector.emit("end", tuple, new Values(filePath));

}

 

this._collector.ack(tuple);

}

 

@Override

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

this._collector = collector;

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream("working",new Fields("word", "filePath"));

declarer.declareStream("summary", new Fields("filePath"));

declarer.declareStream("end", new Fields("filePath"));

}

 

private static String readerFile(String filePath){

File file = new File(filePath);

StringBuffer stringburf = new StringBuffer();

try {

BufferedReader br = new BufferedReader(new FileReader(file));

String sLineString = br.readLine();

while(sLineString != null){

stringburf.append(sLineString + "\n");

sLineString = br.readLine();

}

} catch (IOException e) {

e.printStackTrace();

}

 

    return stringburf.toString();

}

}

 

4. 第二组bolt

public class CountBolt extends BaseRichBolt{

OutputCollector _collector;

Map<String, Map<String, Integer>> _wordsCount = new HashMap<String, Map<String, Integer>>();

String m_key = null;

@Override

public void execute(Tuple tuple) {

String streamId = tuple.getSourceStreamId();

String filePath = tuple.getStringByField("filePath");

Map<String, Integer> wordCount = (Map<String, Integer>) _wordsCount.get(filePath);

if (wordCount == null){

wordCount = new HashMap<String, Integer>();

_wordsCount.put(filePath, wordCount);

}

 

if (streamId.equals("working")){

String word = tuple.getStringByField("word");

Integer count = (Integer)wordCount.get(word);

if (count == null){

count = 0;

}

count++;

wordCount.put(word, count);

// System.out.println("---CountBolt---working----------:" + word+"-" + count);

}

 

if (streamId.equals("summary")){

// System.out.println("---CountBolt---summary----------filePath:" + filePath);

this._collector.emit("summary", tuple, new Values(filePath, wordCount));

}

 

if (streamId.equals("end")){

// System.out.println("---CountBolt--finish------------filePath:" + filePath);

_collector.emit("end", tuple, new Values(filePath));

wordCount.clear();

_wordsCount.remove(filePath);

}

 

this._collector.ack(tuple);

}

 

@Override

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

this._collector = collector;

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declareStream("summary",new Fields("filePath", "content"));

declarer.declareStream("end", new Fields("filePath"));

}

}

5. 第三组bolt

public class SummaryBolt extends BaseRichBolt{

OutputCollector _collector;

Map<String, Map<String, Integer>> _wordsCount = new HashMap<String, Map<String, Integer>>();

@Override

public void execute(Tuple tuple) {

 

String filePath = tuple.getStringByField("filePath");

String streamId = tuple.getSourceStreamId();

if (streamId.equals("summary")){

Map<String, Integer> tempWordCount = (Map<String, Integer>)tuple.getValueByField("content");

Map<String, Integer> wordCount = (Map<String, Integer>) _wordsCount.get(filePath);

if (wordCount == null){

wordCount = new HashMap<String, Integer>();

_wordsCount.put(filePath, wordCount);

}

wordCount.putAll(tempWordCount);

// System.out.println("-----SummaryBolt------summary-----------"+filePath);

}

 

if (streamId.equals("end")){

System.out.println("******finish**************** : " + filePath);

Map<String, Integer> wordCount = (Map<String, Integer>) _wordsCount.get(filePath);

if (wordCount != null) {

Iterator wordIter = wordCount.keySet().iterator();

while (wordIter.hasNext()) {

String keyword = (String) wordIter.next();

Integer nCount = (Integer) wordCount.get(keyword);

System.out.println("---------------- keyword : " + keyword + "   count : " + nCount);

}

wordCount.clear();

}

_wordsCount.remove(filePath);

}

 

this._collector.ack(tuple);

}

 

@Override

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

this._collector = collector;

}

 

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("wordSummary", "key"));

}

}

 

6. Topology

public class ExclamationTopology {

public static void main(String[] args){

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("FileSpout", new WordSpout(), 1);

builder.setBolt("SplitBolt", new SplitBolt(), 4)

.shuffleGrouping("FileSpout", "working")

.shuffleGrouping("FileSpout", "summary")

.shuffleGrouping("FileSpout", "end");

builder.setBolt("CountBolt", new CountBolt(), 4)

.fieldsGrouping("SplitBolt", "working", new Fields("word"))

.allGrouping("SplitBolt", "summary")

.shuffleGrouping("SplitBolt", "end");

builder.setBolt("SummaryBolt", new SummaryBolt(), 1)

.globalGrouping("CountBolt", "summary")

.globalGrouping("CountBolt", "end");

 

Config conf = new Config();

// conf.setDebug(true);

if ((args != null) && (args.length > 0)){

conf.setNumWorkers(2);

try {

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} catch (AlreadyAliveException e) {

e.printStackTrace();

} catch (InvalidTopologyException e) {

e.printStackTrace();

}

}

else{

LocalCluster cluster = new LocalCluster();

cluster.submitTopology("test", conf, builder.createTopology());

// Utils.sleep(5000);

// cluster.killTopology("test");

// cluster.shutdown();

}

}

}