activeMQ 消息发送 服务器集群配置

1 下载activeMQ,然后解压,调用宾文件的下的activemq.bat 来启动mq 服务,然后就在localhost:61616下访问该控制台了,默认用户名和密码为admin,自启动的时候如果启动不了,有可能是jdk版本的问题,注意各个版本的对对应关系
然后写一个生产者和消费者

生产者:
public class AppProducer {
     private static final String url "tcp://127.0.0.1:61616";
    //private static final String url "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";  // 用来做集群
    private static final String queueName "myQueue";
    public static void main(String[] ars) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue(queueName);// 目标
        // 生产者
        MessageProducer messageProducer = session.createProducer(destination);
        for (int i = 0;i<100;i++){
            TextMessage textMessage = session.createTextMessage("test"+i);
            messageProducer.send(textMessage);
            System.out.println("消息已经发送"+i);
        }
    /*  User user = new User("ryz");
        ObjectMessage textMessage = session.createObjectMessage(user);
        messageProducer.send(textMessage);*/
        connection.close();

    }
}
生产者在队列为myQueue中放入100条数据,可以到服务器的控制台页面查看,确实存在这一百条数据,现在我们写一个消费者来消费数据
消费者
public class AppConsumer {
private static final String url "tcp://127.0.0.1:61616";
   // private static final String url "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";// 用来做集群
    private static final String queueName "myQueue";
    public static void main(String[] ars) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

        Connection connection = connectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue(queueName);// 目标
        // 生产者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("收到消息"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
       // connection.close();// 注意关闭连接的时机

    }
}
消费者在消费数据的是一个异步的,所以不能立马关闭连接
现在我们做一个集群的操作
三台集群的关系如下
activeMQ 消息发送 服务器集群配置
A,B,C 三台服务器,AB,AC的关系是消息同步,BC与数据库进行持久化,
activeMQ 消息发送 服务器集群配置
kahadb 文件夹是共享数据存储器
在activemq 的配置文件activemq.xml 中配置
nodea: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>
                </networkConnectors>
在jetty.xml  配置 <property name="port" value="8161"/>
nodeb: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_a" uri="static:(tcp://127.0.0.1:61616)"/>
                </networkConnectors>
<persistenceAdapter>
            <kahaDB directory="/ActiveMQ/activemq/kahadb"/>
        </persistenceAdapter>
在jetty.xml  配置 <property name="port" value="8162"/>
nodec: 在activemq.xml 中配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
                        <networkConnector name="local_b" uri="static:(tcp://127.0.0.1:61616)"/>
                </networkConnectors>
<persistenceAdapter>
            <kahaDB directory="/ActiveMQ/activemq/kahadb"/>
        </persistenceAdapter>
在jetty.xml  配置 <property name="port" value="8163"/>

好了三个服务都配置好了,现在启动三个服务。然后依次登录对应的服务控制台,可以看到a是可以访问,b也是可以访问的,但是c无法访问,因为c在等到共享资源的锁,如果得到锁,他就会立马启动。现在我们把b关闭,看看c是不是启动了?
好了,现在我们用程序来测试我们的服务集群,首先用生产者生产100数据,可以看到c中没有数据,因为c没有启动嘛,现在把b关闭了,再看c中有没有数据?当然有啦。a中自然没有,因为我们没有把a当发送服务器。现在启动消费者,看看数据是不是被消费掉了,当然是消费掉了啊。
好啦,一个简单的服务器集群做好了。