STORM入门之(Trident集成Hbase)
Trident方式集成Hbase
(1)测试前需要在本机hosts文件中正确配置HBase集群各节点的主机名映射
win:C:\Windows\System32\drivers\etc\hosts
linux:/etc/hosts
(2)Trident实现
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hbase.trident.mapper.SimpleTridentHBaseMapper;
import org.apache.storm.hbase.trident.mapper.TridentHBaseMapper;
import org.apache.storm.hbase.trident.state.HBaseState;
import org.apache.storm.hbase.trident.state.HBaseStateFactory;
import org.apache.storm.hbase.trident.state.HBaseUpdater;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Demo topology that splits fixed sentences into words with Trident and
 * persists each word as a row in HBase through storm-hbase.
 */
public class HbaseTrident {

    /**
     * Key under which the HBase client settings are stored in the Storm
     * config. The same key is handed to {@code HBaseState.Options.withConfigKey}
     * so both sides always agree.
     */
    private static final String HBASE_CONFIG_KEY = "HBASE_CONFIG";

    /**
     * Builds the Trident topology and submits it to an in-process
     * {@link LocalCluster} under the name {@code "test"}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Config conf = new Config();

        // HBase client settings are passed as a plain String map stored in the
        // Storm config under HBASE_CONFIG_KEY.
        Map<String, String> hbaseConfig = new HashMap<String, String>();
        hbaseConfig.put("hbase.rpc.timeout", "20000");
        hbaseConfig.put("hbase.client.operation.timeout", "30000");
        hbaseConfig.put("hbase.client.scanner.timeout.period", "20000");
        hbaseConfig.put("hbase.zookeeper.quorum", "10.176.62.1,10.176.62.2,10.176.62.3");
        hbaseConfig.put("hbase.zookeeper.property.clientPort", "2181");
        conf.put(HBASE_CONFIG_KEY, hbaseConfig);

        // Describe how a tuple is written into HBase.
        TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
                .withColumnFamily("bas")              // column family
                .withColumnFields(new Fields("id"))   // tuple field(s) stored as columns
                .withRowKeyField("rowkey");           // row key, emitted by the upstream Split function

        HBaseState.Options options = new HBaseState.Options()
                .withConfigKey(HBASE_CONFIG_KEY)
                .withDurability(Durability.SYNC_WAL)
                .withMapper(tridentHBaseMapper)
                .withTableName("test1");
        StateFactory hBaseStateFactory = new HBaseStateFactory(options);

        // The spout constructor takes generic varargs, which triggers an
        // unchecked generic-array-creation warning at this call site only.
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the cow how many apples"));
        spout.setCycle(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout", spout)
                .each(new Fields("sentence"), new Split(), new Fields("rowkey", "bas", "id"))
                .filter(new Filter())
                .partitionPersist(hBaseStateFactory, new Fields("rowkey", "bas", "id"), new HBaseUpdater());
        StormTopology stormTopology = topology.build();

        // No shutdown is requested here, so the local cluster is left running
        // after the topology has been submitted.
        LocalCluster cluster = new LocalCluster();
        conf.setDebug(false);
        cluster.submitTopology("test", conf, stormTopology);
    }

    /**
     * Splits the sentence in field 0 on single spaces and emits one
     * {@code (word, word, "valuetest")} tuple per non-empty word.
     */
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            if (sentence == null) {
                // Nothing to split; avoid a NullPointerException on split().
                return;
            }
            for (String word : sentence.split(" ")) {
                // Leading or repeated spaces yield empty tokens. The word is used
                // as the HBase row key, and a zero-length row key is not accepted.
                if (word.isEmpty()) {
                    continue;
                }
                collector.emit(new Values(word, word, "valuetest"));
            }
        }
    }

    /**
     * Pass-through filter that prints the {@code rowkey} field of every tuple
     * to standard output and keeps all of them.
     */
    public static class Filter extends BaseFilter {
        /** Partition index reported by the operation context in {@link #prepare}. */
        public int partitionIndex = 0;

        // The raw Map parameter mirrors the signature as originally written.
        @Override
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TridentOperationContext context) {
            this.partitionIndex = context.getPartitionIndex();
            super.prepare(conf, context);
        }

        @Override
        public boolean isKeep(TridentTuple arg0) {
            System.out.println(arg0.getStringByField("rowkey"));
            return true;
        }
    }
}
(3)入库结果展示