如何根据消息中的值将元组发送到不同的螺栓

如何根据消息中的值将元组发送到不同的螺栓

问题描述:

我有一个Storm集群连接到Kinesis Stream。消息看起来像这样。如何根据消息中的值将元组发送到不同的螺栓

{ 
    _c: "a" 
} 

,或者它应该是

{ 
    _c: "b" 
} 

我想与_c = “a” 到一个螺栓和_c = “b” 的发送的元组到不同的螺栓。我如何实现这一目标?

这是使用GSON

@Override 
public void execute(Tuple tuple) { 
    String partitionKey = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY); 
    String sequenceNumber = (String) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); 
    byte[] payload = (byte[]) tuple.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA); 

    ByteBuffer buffer = ByteBuffer.wrap(payload); 
    String data = null; 
    try { 
    data = decoder.decode(buffer).toString(); 

    HashMap < String, String > map = new Gson().fromJson(data, new TypeToken < HashMap < String, Object >>() {}.getType()); 

    this.outputCollector.emit(tuple, new Values(map)); 
    this.outputCollector.ack(tuple); 

    } catch (CharacterCodingException e) { 
    this.outputCollector.fail(tuple); 
    } 

} 

感谢

您可以定义您的螺栓两个流,然后声明两个outputstreams该分析从室壁运动的消息JSON对象螺栓:

@Override 
public void execute(Tuple tuple) { 
    // ... 
    // Some Code 
    // ... 
    if (_c =="a") { 
    collector.emit("stream1", tuple, new Values(_c)); 
    } else { 
    collector.emit("stream2", tuple, new Values(_c)); 
    } 

} 

@Override 
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    outputFieldsDeclarer.declareStream("stream1", new Fields("_c")); 
    outputFieldsDeclarer.declareStream("stream2", new Fields("_c")); 
} } 

在您的拓扑中,您​​可以使用ShuffleGrouping中的选项传递Stream_id。

topology.setBolt("FirstBolt",new FirstBolt(),1);  
topology.setBolt("newBolt1", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream1"); 
topology.setBolt("newBolt2", new Custombolt(),1).shuffleGrouping("FirstBolt", "stream2"); 

另一种可能性是将它发送到两个螺栓,然后检查两个螺栓中的值并执行所需的代码。