基本的Queue消息发送和消费
配置Maven所需的依赖,示例如下
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.7</version> </dependency>
Queue消息发送的示例代码如下:
public class JmsProducer {
public static void main(String[] args) {
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://39.96.192.171:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 3; i++) {
TextMessage message = session.createTextMessage("message--" + i);
Thread.sleep(1000);
//通过消息生产者发出消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
然后运行这个类
可以看到队列中我们自定义的my-queue,中有3个还没有被消费的消息!
接下来我们编写消费者来消费消息!
public class JmsReceiver {
public static void main(String[] args) {
try {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://39.96.192.171:61616");
Connection connection = cf.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (i < 3) {
i++;
TextMessage message = (TextMessage) consumer.receive();
session.commit();
System.out.println("收到消息:" + message.getText());
}
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行这个类!可以看到控制台收到3条消息!
再看activemq控制台
可以看到有3条消息显示被消费!
这就是简单的消息生产和消费!消息消费分为两种方式,同步消费和异步消费,上面的例子就是同步消费,通过调用消费者的receive方法从目的地中显式提取消息,receive 方法可以一直阻塞到消息到达。
然后再写一个异步消费的例子!
public class JmsReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
if (null != message) {
System.out.println("收到的消息:" + textMessage.getText());
}
// 如果session设置为Session.CLIENT_ACKNOWLEDGE,要加上这一步
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
}