初识Storm
Storm,分布式的实时流处理框架,处理对象是流Stream,就是连续的元组Tuple,源头Spout,如水龙头的管口,流出的Tuple转换为Bolt(水处理器,或者叫处理容器),然后可以导向另外一个容器Bolt(水处理器,或者叫处理容器)。为了提升效率,可以多接一些Bolt,这样形成一张有向无环图,Topology(拓扑),实际上就是一个流转换图。
- 角色和组件对比:
- 核心组件:主节点Nimbus负责分配任务和监测故障,工作节点Supervisor,Worker负责处理事务的进程,里面的每一个线程就是一个TASK,0.7版本引入了事务,保证每一个消息至少被处理一次。
Storm使用Zookeeper协调集群,由于Zookeeper并不用于消息传递,所以Storm给Zookeeper带来的压力相当低。大多数情况下,单个节点的Zookeeper集群足够胜任,不过为了确保故障恢复或者部署大规模Storm集群,可能需要更大规模节点的Zookeeper集群(对于Zookeeper集群的话,官方推荐的最小节点数为3个)。在Zookeeper集群的每台机器上完成以下安装部署步骤:
1)下载安装Java JDK,官方下载链接为http://java.sun.com/javase/downloads/index.jsp,JDK版本为JDK 6或以上。
2)根据Zookeeper集群的负载情况,合理设置Java堆大小,尽可能避免发生swap,导致Zookeeper性能下降。保守期间,4GB内存的机器可以为Zookeeper分配3GB最大堆空间。
3)下载后解压安装Zookeeper包,官方下载链接为http://hadoop.apache.org/zookeeper/releases.html。
4)根据Zookeeper集群节点情况,创建如下格式的Zookeeper配置文件zoo.cfg:
其中,dataDir指定Zookeeper的数据文件目录;其中server.id=host:port:port,id是为每个Zookeeper节点的编号,保存在dataDir目录下的myid文件中,zoo1~zoo3表示各个Zookeeper节点的hostname,第一个port是用于连接leader的端口,第二个port是用于leader选举的端口。
5)在dataDir目录下创建myid文件,文件中只包含一行,且内容为该节点对应的server.id中的id编号。
6)启动Zookeeper服务:
java -cp zookeeper.jar:lib/log4j-1.2.15.jar:conf \ org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.cfg
也可以通过bin/zkServer.sh脚本启动Zookeeper服务。
7)通过Zookeeper客户端测试服务是否可用:
java -cp zookeeper.jar:src/java/lib/log4j-1.2.15.jar:conf:src/java/lib/jline-0.9.94.jar \ org.apache.zookeeper.ZooKeeperMain -server 127.0.0.1:2181
也可以通过bin/zkCli.sh脚本启动Zookeeper Java客户端。
./configure make cli_st make cli_mt
运行进入C客户端:
cli_st 127.0.0.1:2181 cli_mt 127.0.0.1:2181
至此,完成了Zookeeper集群的部署与启动。
注意事项:
接下来,需要在Nimbus和Supervisor机器上安装Storm的依赖库,具体如下:
以上依赖库的版本是经过Storm测试的,Storm并不能保证在其他版本的Java或Python库下可运行。
下载后编译安装ZMQ:
注意事项:
sudo yum install e2fsprogsl -b current sudo yum install e2fsprogs-devel -b current
下载后编译安装JZMQ:
为了保证JZMQ正常工作,可能需要完成以下配置:
注意事项:
1. 如果运行./configure命令出现问题,参考这里。
1. 下载并安装JDK 6,参考这里;
2. 配置JAVA_HOME环境变量;
3. 运行java、javac命令,测试java正常安装。
1. 下载Python2.6.6:
wget http://www.python.org/ftp/python/2.6.6/Python-2.6.6.tar.bz2
2. 编译安装Python2.6.6:
tar –jxvf Python-2.6.6.tar.bz2 cd Python-2.6.6 ./configure make make install
3. 测试Python2.6.6:
$ python -V Python 2.6.6
1. 如果使用RedHat系列Linux系统,执行以下命令安装unzip:
apt-get install unzip
2. 如果使用Debian系列Linux系统,执行以下命令安装unzip:
yum install unzip
下一步,需要在Nimbus和Supervisor机器上安装Storm发行版本。
1. 下载Storm发行版本,推荐使用Storm0.8.1:
wget https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
2. 解压到安装目录下:
unzip storm-0.8.1.zip
Storm发行版本解压目录下有一个conf/storm.yaml文件,用于配置Storm。默认配置在这里可以查看。conf/storm.yaml中的配置选项将覆盖defaults.yaml中的默认配置。以下配置选项是必须在conf/storm.yaml中进行配置的:
1) storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址,其格式如下:
storm.zookeeper.servers: - "111.222.333.444" - "555.666.777.888"
如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。
2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:
storm.local.dir: "/home/admin/storm/workdir"
3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为"/usr/local/lib:/opt/local/lib:/usr/lib",一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。
4) nimbus.host: Storm集群Nimbus机器地址,各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件,如:
nimbus.host: "111.222.333.444"
5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:
supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
最后一步,启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Storm才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Supervisors被重启,运行中的Topologies不会受到影响。
以下是启动Storm各个后台进程的方式:
注意事项:
至此,Storm集群已经部署、配置完毕,可以向集群提交拓扑运行了。
1)启动Storm Topology:
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3
其中,allmycode.jar是包含Topology实现代码的jar包,org.me.MyTopology的main方法是Topology的入口,arg1、arg2和arg3为org.me.MyTopology执行时需要传入的参数。
2)停止Storm Topology:
storm kill {toponame}
其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称。
1. https://github.com/nathanmarz/storm/wiki/Tutorial
2. https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster