storm_可靠消息的例子
这是一个比较复杂的例子,涵盖的知识面较多。
需求:统计出某文件夹下各文件中,各个单词出现的次数。(同时将文件剪切到另一个目录)
知识点一:多进程/多线程读取同一个文件,需要对文件进行加锁。
知识点二:对单词做统计,所以最好使用按字段分组
知识点三:如何得知一个文件的单词统计完了,需要使用storm的可靠消息机制。
知识点四:当知道某一个文件的单词统计完后,需要将处理这些单词的bolt结果汇总到一个bolt,需要用到globalGrouping分组,而且此时是将一个map发送到bolt,而不再是字串。
知识点五:某一个文件的所有bolt中的map汇总到一个bolt结束时,输出此文件中的所有字段和计数,知道这个时机需要再次用到storm的可靠消息机制。同一个topology中使用多套消息可靠机制,需要对机制有更深的理解。
知识点六:同一个topology中的spout/bolt,需要处理计数/汇总/输出结果三类消息任务,如何把各类任务区分开,需要更好的理解streamid的内容。
- 工作流程如下:
1.监控文件夹,获取文件(spout),将文件名发送出去
2.(bolt)读取文件,拆分为一个一个的单词,将单词发送出去
3.(bolt)按字段分组统计,使用map记下各单词对应的个数
4.当前面的统计结束时(使用可靠消息机制),各个统计的bolt将自己的map发出去,发到同一个bolt
5.(bolt)汇总前面统计的结果。
6.当汇总结束时(使用可靠消息机制),将结果打印出来。
因为需要两次使用可靠消息机制,因此相当于要跑三次任务,每次是一个topology,三个topology如下图:
2.Spout
负责从监控的目录中获取文件,剪切到另一目录后,将文件路径发送出去。
/**
 * Spout that watches a source directory, claims one file at a time under an
 * exclusive file lock, moves it to a "finished" directory, and emits the new
 * path on the "working" stream.
 *
 * Reliability protocol (two chained ack rounds):
 *  - emit("working", path, id1): downstream bolts count the file's words;
 *    when the whole tuple tree is acked, ack(id1) fires.
 *  - ack(id1) -> emit("summary", path, id2): counting bolts flush their
 *    partial maps; when that tree is acked, ack(id2) fires.
 *  - ack(id2) -> emit("end", path): unanchored, tells SummaryBolt to print.
 */
public class WordSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    String _monitorDir = "E:/wordCount/";
    File _file = new File(_monitorDir + "source/");
    // Pending messages per phase: message id -> file path. Entries are removed
    // in ack()/fail() so the maps cannot grow without bound.
    Map<String, String> _message_working = new HashMap<String, String>();
    Map<String, String> _message_summary = new HashMap<String, String>();

    /** Local smoke test: tries to claim one file and prints the result. */
    static public void main(String[] args) {
        WordSpout ws = new WordSpout();
        // Do NOT call nextTuple() here: _collector is null outside Storm and
        // a successful claim would NPE on the emit.
        System.out.println("claimed: " + ws.getFilePath());
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String filePath = getFilePath(); // claims one file under an exclusive lock
        if (filePath != null) {
            String uuid = UUID.randomUUID().toString();
            _message_working.put(uuid, filePath);
            // Phase 1: start counting the words of this file.
            _collector.emit("working", new Values(filePath), uuid);
        }
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("working", new Fields("filePath"));
        declarer.declareStream("summary", new Fields("filePath"));
        declarer.declareStream("end", new Fields("filePath"));
    }

    @Override
    public void ack(Object messageId) {
        String msgId = (String) messageId;
        // Counting finished: start the summary phase. remove() (not get())
        // frees the entry so the pending map does not leak.
        String workingPath = _message_working.remove(msgId);
        if (workingPath != null) {
            String uuid = UUID.randomUUID().toString();
            _message_summary.put(uuid, workingPath);
            _collector.emit("summary", new Values(workingPath), uuid);
            return;
        }
        // Summary finished: tell SummaryBolt to print. Unanchored emit --
        // the "end" tuple itself is not tracked.
        String summaryPath = _message_summary.remove(msgId);
        if (summaryPath != null) {
            _collector.emit("end", new Values(summaryPath));
        }
    }

    @Override
    public void fail(Object messageId) {
        String msgId = (String) messageId;
        // Retry the failed phase with a fresh message id. The file has already
        // been moved to the "finished" directory, so re-reading it is safe.
        String workingPath = _message_working.remove(msgId);
        if (workingPath != null) {
            String uuid = UUID.randomUUID().toString();
            _message_working.put(uuid, workingPath);
            _collector.emit("working", new Values(workingPath), uuid);
            return;
        }
        String summaryPath = _message_summary.remove(msgId);
        if (summaryPath != null) {
            String uuid = UUID.randomUUID().toString();
            _message_summary.put(uuid, summaryPath);
            _collector.emit("summary", new Values(summaryPath), uuid);
        }
    }

    /**
     * Tries to claim exactly one file from the source directory: lock it,
     * copy it to the "finished" directory, delete the original.
     *
     * @return the path of the moved file, or null if nothing could be claimed.
     */
    private String getFilePath() {
        String[] files = _file.list();
        if (files == null) { // directory missing or unreadable
            return null;
        }
        for (String fileName : files) {
            FileChannel fc = null;
            try {
                String filePath = _monitorDir + "source/" + fileName;
                fc = getChannel(filePath);
                if (fc == null) {
                    continue;
                }
                FileLock fLock = fc.tryLock();
                if (fLock == null || !fLock.isValid()) {
                    // Another process/thread holds the lock; try the next file.
                    System.out.println("----------------lock failed----------");
                    continue;
                }
                String newPath = _monitorDir + "finished/" + fileName;
                // copyFileForChannel closes fc, which also releases fLock.
                copyFileForChannel(fc, new File(newPath));
                if (deleteFile(filePath)) {
                    return newPath;
                }
                System.out.println("delete failed : " + filePath);
            } catch (Exception e) {
                // Log instead of silently swallowing; move on to the next file.
                e.printStackTrace();
            } finally {
                try {
                    if (fc != null) {
                        fc.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }

    /**
     * Opens a read/write channel on the file, or returns null on failure.
     * "rw" mode is required so tryLock can take an exclusive lock.
     */
    public static FileChannel getChannel(String filePath) {
        try {
            return new RandomAccessFile(filePath, "rw").getChannel();
        } catch (IOException e) { // includes FileNotFoundException
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Copies the remaining bytes of inC to des in 2 MB chunks, then closes
     * both channels (closing inC also releases any lock held on it).
     * The output channel is closed even if the copy fails part-way.
     */
    public static void copyFileForChannel(FileChannel inC, File des) throws Exception {
        final int chunkSize = 2097152;
        FileOutputStream out = new FileOutputStream(des);
        FileChannel outC = out.getChannel();
        try {
            while (inC.position() < inC.size()) {
                int length = (int) Math.min(chunkSize, inC.size() - inC.position());
                ByteBuffer b = ByteBuffer.allocateDirect(length);
                inC.read(b);
                b.flip();
                outC.write(b);
                outC.force(false);
            }
        } finally {
            inC.close();
            outC.close();
        }
    }

    /** Deletes the file at path; returns false if it does not exist or delete fails. */
    public static boolean deleteFile(String path) {
        File file = new File(path);
        return file.exists() && file.delete();
    }
}
3. 第一组bolt
/**
 * First bolt: on the "working" stream it reads the file and emits one tuple
 * per word; "summary" and "end" control tuples are forwarded unchanged.
 * Every emit is anchored to the input tuple so the spout's ack()/fail()
 * fires only after the whole downstream tree is processed.
 */
public class SplitBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void execute(Tuple tuple) {
        String filePath = tuple.getStringByField("filePath");
        String streamId = tuple.getSourceStreamId();
        if (streamId.equals("working")) {
            String content = readerFile(filePath);
            StringTokenizer token = new StringTokenizer(content);
            while (token.hasMoreTokens()) {
                // NOTE(review): the delimiter set passed to nextToken only
                // takes effect from this call onward; the very first token is
                // still split on the default whitespace -- confirm intent.
                String word = token.nextToken(", ()[]{}-/?.!:\"\n())");
                _collector.emit("working", tuple, new Values(word, filePath));
            }
        } else if (streamId.equals("summary")) {
            _collector.emit("summary", tuple, new Values(filePath));
        } else if (streamId.equals("end")) {
            _collector.emit("end", tuple, new Values(filePath));
        }
        this._collector.ack(tuple);
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("working", new Fields("word", "filePath"));
        declarer.declareStream("summary", new Fields("filePath"));
        declarer.declareStream("end", new Fields("filePath"));
    }

    /**
     * Reads the whole file as text, normalising line endings to '\n'.
     * Returns whatever was read so far (possibly "") if reading fails.
     */
    private static String readerFile(String filePath) {
        StringBuilder content = new StringBuilder();
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(filePath));
            String line;
            while ((line = br.readLine()) != null) {
                content.append(line).append('\n');
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The original leaked the reader on every call; always close it.
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return content.toString();
    }
}
4. 第二组bolt
/**
 * Second bolt: keeps per-file word counts. Tasks are fieldsGrouped on
 * "word", so every occurrence of a given word in a file is counted by the
 * same task; each task therefore holds a disjoint partial map per file.
 */
public class CountBolt extends BaseRichBolt {
    OutputCollector _collector;
    // Partial counts held by this task: filePath -> (word -> count).
    Map<String, Map<String, Integer>> _wordsCount = new HashMap<String, Map<String, Integer>>();

    @Override
    public void execute(Tuple tuple) {
        String streamId = tuple.getSourceStreamId();
        String filePath = tuple.getStringByField("filePath");
        if (streamId.equals("working")) {
            Map<String, Integer> wordCount = getOrCreate(filePath);
            String word = tuple.getStringByField("word");
            Integer count = wordCount.get(word);
            wordCount.put(word, count == null ? 1 : count + 1);
        } else if (streamId.equals("summary")) {
            // allGrouping delivers "summary" to every task: each one flushes
            // its partial map downstream (an empty map if it saw no words).
            Map<String, Integer> wordCount = getOrCreate(filePath);
            this._collector.emit("summary", tuple, new Values(filePath, wordCount));
        } else if (streamId.equals("end")) {
            _collector.emit("end", tuple, new Values(filePath));
            // Free this file's state once it is fully processed.
            Map<String, Integer> finished = _wordsCount.remove(filePath);
            if (finished != null) {
                finished.clear();
            }
        }
        this._collector.ack(tuple);
    }

    /** Returns the partial count map for filePath, creating it on first use. */
    private Map<String, Integer> getOrCreate(String filePath) {
        Map<String, Integer> wordCount = _wordsCount.get(filePath);
        if (wordCount == null) {
            wordCount = new HashMap<String, Integer>();
            _wordsCount.put(filePath, wordCount);
        }
        return wordCount;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("summary", new Fields("filePath", "content"));
        declarer.declareStream("end", new Fields("filePath"));
    }
}
5. 第三组bolt
/**
 * Final bolt (parallelism 1): merges the partial word-count maps flushed by
 * all CountBolt tasks and prints the totals when the "end" tuple arrives.
 */
public class SummaryBolt extends BaseRichBolt {
    OutputCollector _collector;
    // filePath -> merged word counts for that file.
    Map<String, Map<String, Integer>> _wordsCount = new HashMap<String, Map<String, Integer>>();

    @Override
    public void execute(Tuple tuple) {
        String filePath = tuple.getStringByField("filePath");
        String streamId = tuple.getSourceStreamId();
        if (streamId.equals("summary")) {
            // Partial maps are disjoint (words are fieldsGrouped upstream),
            // so putAll never overwrites an existing count.
            Map<String, Integer> tempWordCount =
                    (Map<String, Integer>) tuple.getValueByField("content");
            Map<String, Integer> wordCount = _wordsCount.get(filePath);
            if (wordCount == null) {
                wordCount = new HashMap<String, Integer>();
                _wordsCount.put(filePath, wordCount);
            }
            wordCount.putAll(tempWordCount);
        } else if (streamId.equals("end")) {
            System.out.println("******finish**************** : " + filePath);
            // remove() both fetches and frees this file's state in one step.
            Map<String, Integer> wordCount = _wordsCount.remove(filePath);
            if (wordCount != null) {
                // Typed entrySet loop replaces the raw Iterator + double lookup.
                for (Map.Entry<String, Integer> entry : wordCount.entrySet()) {
                    System.out.println("---------------- keyword : " + entry.getKey()
                            + " count : " + entry.getValue());
                }
            }
        }
        this._collector.ack(tuple);
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this._collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream. (The original declared
        // a ("wordSummary", "key") stream that was never emitted or consumed.)
    }
}
6. Topology
/**
 * Wires the reliable word-count topology:
 *   FileSpout -> SplitBolt -> CountBolt -> SummaryBolt
 *
 * With command-line arguments the topology is submitted to a cluster under
 * the name args[0]; without arguments it runs in an in-process LocalCluster.
 */
public class ExclamationTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("FileSpout", new WordSpout(), 1);
        builder.setBolt("SplitBolt", new SplitBolt(), 4)
                .shuffleGrouping("FileSpout", "working")
                .shuffleGrouping("FileSpout", "summary")
                .shuffleGrouping("FileSpout", "end");
        builder.setBolt("CountBolt", new CountBolt(), 4)
                // Same word of a file always lands on the same counting task.
                .fieldsGrouping("SplitBolt", "working", new Fields("word"))
                // Every counting task must flush its partial map.
                .allGrouping("SplitBolt", "summary")
                // NOTE(review): shuffleGrouping delivers "end" to only ONE of
                // the four CountBolt tasks, so the other three never clear
                // their per-file maps (slow memory leak) -- confirm intent.
                .shuffleGrouping("SplitBolt", "end");
        builder.setBolt("SummaryBolt", new SummaryBolt(), 1)
                // Single task collects all partial maps and the end marker.
                .globalGrouping("CountBolt", "summary")
                .globalGrouping("CountBolt", "end");
        Config conf = new Config();
        // conf.setDebug(true);
        if ((args != null) && (args.length > 0)) {
            // Cluster mode: args[0] is the topology name.
            conf.setNumWorkers(2);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Local debug mode; left running -- kill/shutdown manually.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
        }
    }
}