RabbitMQ简介与应用
1、RabbitMQ简介
(1)RabbitMQ特点:
RabbitMQ是一个由ERlang语言开发的基于AMQP(高级消息队列协议)标准的消息队列开源实现。RabbitMQ最初起源于金融系统,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面有很好的表现。主要特点包括:
- 可靠性:RabbitMQ支持持久化、传输确认、发布确认等
- 灵活的路由功能:在消息进入队列之前,通过Exchange交换机来实现灵活路由。
- 支持消息集群:多态RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 高可用性:队列可以在集群中镜像,实现高可用。
- 支持多种协议:如AMQP、STOMP、MQTT等。
- 支持多语言客户端:如java、.NET、Ruby等
- 提供管理界面:便于监控和管理连接、交换机、队列等信息
- 提供插件机制:方便扩展。
(2)RabbitMQ基本概念
RabbitMQ整体结构如图:
- Message(消息):消息由消息头和消息体组成,消息包括routing-key(路由键)、priority(优先级)、delivery-mode(持久化存储)等属性。
- Publisher(消息发布者):向交换机发布消息的客户端程序。
- Exchange(交换器):用来接收消息发布者发送到的消息,并将消息路由给服务器中的队列
- Binding(绑定):一个绑定就是基于路由键将交换器与消息队列连接起来的路由规则。
- Queue(消息队列):是消息的容器,又来保存消息直到消息被发送到消费者。一个消息可以被投入到一个或多个队列中。
- Connection(网络连接):比如一个TCP连接。
- Channel(信道):多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接里面的虚拟连接,AMQP命令通过信道发送。
- Consumer(消息消费者):从消息队列取消息的客户端程序。
- Virtual Host(虚拟主机):表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。RabbitMQ默认的vhost是"/"。
- Broker(消息队列服务器实体)
(3)AMQP消息路由
相比于JMS,AMQP的路由加入了Exchange和Binding两个角色。消息由消息生产者发送到交换器,Binding再决定交换器中的消息应发送到哪个队列。
交换器类型
不同的交换器分发消息的策略也不同。交换器有4中类型:Direct、Fanout、Topic、Headers。
- Headers交换器:不处理路由键,匹配消息的Header。几乎不用。
- Fanout交换器:不处理路由键,只是简单地将队列绑定到交换器,发送交换器中的消息时会发送到所有绑定的队列中。通过该类型交换器转发消息是最快的。
- Direct交换器:如果消息中的路由键和Binding中的绑定键完全一致,则发送到对应的队列中。比如dog只匹配dog,不匹配dog.black
- Topic交换器:最灵活的交换器。将路由键和绑定的字符串切分成单词,单词间用".“隔开,识别两个通配符:”*":匹配一个单词,“#”:匹配0个或多个单词。
2、RabbitMQ应用实例
下面是一个在java中使用RabbitMQ的例子:
下载安装Erlang
下载安装RabbitMQ服务
启动RabbitMQ服务
创建java工程
在Maven工程中添加依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
消息生产者:
package com.youzi.rabbitMQ.amqp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置rabbitmq地址
factory.setHost("localhost");
factory.setVirtualHost("/");
//建立到代理服务器的连接
Connection conn = factory.newConnection();
//创建信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "helloexchange";
channel.exchangeDeclare(exchangeName,"direct",true);
String routingKey = "testRoutingKey";
//发布消息
byte[] messageBytes = "hello rabbitmq!!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBytes);
//关闭信道和连接
channel.close();
conn.close();
}
}
消息消费者:
package com.youzi.rabbitMQ.amqp;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置rabbitmq地址
factory.setHost("localhost");
factory.setVirtualHost("/");
//建立到代理服务器的连接
Connection conn = factory.newConnection();
//创建信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "helloexchange";
channel.exchangeDeclare(exchangeName,"direct",true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String RoutingKey = "testRoutingKey";
//绑定路由键
channel.queueBind(queueName,exchangeName,RoutingKey);
while (true){
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费路由键:"+routingKey);
System.out.println("消息内容的类型:"+contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag,false);
String bodyStr = new String(body,"UTF-8");
System.out.println("消费的消息体内容是:"+bodyStr);
}
});
}
}
}
先运行Consumer、在运行Producer,Consumer控制台打印消息:
3、Sring整合RabbitMQ应用实例
一个消息生产者生产50条消息,每条间隔时间一秒,两个消息消费者监听一个队列。
添加依赖:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!-- spring版本号 -->
<spring.version>4.1.3.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
Spring配置文件spring-context:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--这里配置了两个消息消费者-->
<bean id="myMessageListener1" class="com.youzi.rabbitMQ.spring.MyMessageListener1"/>
<bean id="myMessageListener2" class="com.youzi.rabbitMQ.spring.MyMessageListener2"/>
<!--配置连接-->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="guest" password="guest"
virtual-host="/" requested-heartbeat="60"/>
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 定义Rabbit模板,指定连接工厂以及定义exchange以及消息路由键 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="message.test"
message-converter="jsonMessageConverter" />
<!--配置RabbitAdmin-->
<rabbit:admin connection-factory="connectionFactory" />
<!--配置队列名称-->
<rabbit:queue name="myQueue"/>
<!--配置交换器-->
<rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="message.*"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--配置监听器-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="myMessageListener1" queue-names="myQueue"/>
<rabbit:listener ref="myMessageListener2" queue-names="myQueue"/>
</rabbit:listener-container>
</beans>
消息pojo对象:
package com.youzi.rabbitMQ.spring;
public class Message {
private int id;
private String content;
//getter、setter、toString
}
消息生产者:
package com.youzi.rabbitMQ.spring;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Producer {
public static void main(String[] args) throws InterruptedException {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
Message message = new Message();
for(int i=1;i<=50;i++){
message.setId(i);
message.setContent("这是消息"+i);
template.convertAndSend(message);
Thread.sleep(1000);
}
ctx.close();
}
}
两个消息消费者:
package com.youzi.rabbitMQ.spring;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MyMessageListener1 implements MessageListener {
@Override
public void onMessage(Message message) {
String body = new String(message.getBody());
System.out.println("接收者1接收到消息:"+body);
}
}
package com.youzi.rabbitMQ.spring;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MyMessageListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
String body = new String(message.getBody());
System.out.println("接受者2接收到消息:"+body);
}
}
启动Producer,两个消费者监听器监听到队列中的消息,50条消息被均匀地分发到两个消费者:
总结:
RabbitMQ最大的优势就是灵活的路由策略,高可用,可靠性,适用于金融等高可用、可靠性需求大的领域,不过由于AMQP协议,导致RabbitMQ比较重量,吞吐量不及Kafka优秀。