RabbitMQ的安装及在Springboot项目中的简单使用
首先安装erlang,然后安装rabbitmq。
RabbitMQ下载地址:http://www.rabbitmq.com/install-windows.html
对应版本(必须是与mq版本适应)的erlang下载地址:http://www.erlang.org/downloads/20.2
怎么安装,网上有很多教程,这里不再赘述。
注意安装完Rabbit MQ以后,服务会自动运行,这时环境变量里的ERLANG_HOME会自动生成,在”环境变量”中检查是否存在,如果不存在,请在”环境变量”中手动添加。
这是我使用的版本:
安装完成后可以配置可视化RabbitMQ管理界面
运行命令行窗口cmd
输入命令rabbitmq-plugins enable rabbitmq_management,就可以添加可视化插件了。
查看可视化插件是否成功:
在浏览器中输入地址:http://127.0.0.1:15672/
输入默认账号: guest 密码: guest
就可以登录查看rabbitmq里的资源信息。
备注:可以运行 rabbitmqctl stop 命令来停止服务
可以运行 rabbitmq-server -detached命令来重启服务并后台运行。
RabbitMQ的日志信息,可以在 C:/Users/Administrator/AppData/Roaming/RabbitMQ/log/文件夹下进行查看,如:C:/Users/Administrator/AppData/Roaming/RabbitMQ/log/[email protected]
下来我们来看代码:
先看application.properties的配置, name随便取,其余默认
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
启动类加上@EnableAutoConfiguration注解
@SpringBootApplication
@EnableAutoConfiguration
@EnableScheduling
@MapperScan("com.example.test.mapper")
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
具体代码部分:
package com.example.test.common;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import lombok.Data;
@ConfigurationProperties(prefix = "spring.rabbitmq")
@Configuration
@Data
public class RabbitConstants {
public static final String EXCHANGE = "bootExchange";
public static final String ROUTINGKEY = "routingkey";
public static final String QUEUE = "bootQueue";
// spring.rabbitmq.host=127.0.0.1
// spring.rabbitmq.port=5672
// spring.rabbitmq.username=guest
// spring.rabbitmq.password=guest
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("#{true}")
private boolean publisherConfirms;
}
消费者:
package com.example.test.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.test.common.RabbitConstants;
@Component
public class FirstConsumer {
@RabbitListener(queues = RabbitConstants.QUEUE)
public Object handleMessage(String message) throws Exception {
Thread.sleep(5000);
// 处理消息
System.out.println("处理消息-------->:"+message);
Thread.sleep(5000);
return "处理消息成功-------->:"+message;
}
}
监听器,交换机设置:
package com.example.test.rabbitmq.receive;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Payload;
import com.example.test.common.RabbitConstants;
@Configuration
@RabbitListener(queues = RabbitConstants.QUEUE)
public class Listener {
/** 设置交换机类型 */
@Bean
public DirectExchange defaultExchange() {
/**
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
*/
return new DirectExchange(RabbitConstants.EXCHANGE);
}
@Bean
public Queue fooQueue() {
return new Queue(RabbitConstants.QUEUE);
}
@Bean
public Binding binding() {
/** 将队列绑定到交换机 */
//将队列 fooQueue() 使用 routingkey 绑定到 交换机 bootExchange
return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(RabbitConstants.ROUTINGKEY);
}
@RabbitHandler
//@Payload有效载荷
public void process(@Payload String foo) {
System.out.println("监听器-------->: " + foo);
}
}
测试回调:
package com.example.test.rabbitmq.send;
import org.springframework.stereotype.Component;
import com.example.test.common.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
@Component
@Slf4j
public class DemoSender implements ConfirmCallback , ReturnCallback{
private RabbitTemplate rabbitTemplate;
@Autowired //也可以放在任意方法上表示,自动执行当前方法
public DemoSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
this.rabbitTemplate.setReceiveTimeout(1000000);
this.rabbitTemplate.setReplyTimeout(10000000);
}
public Object sendX(String msg) {
Object obj = this.rabbitTemplate.convertSendAndReceive(RabbitConstants.EXCHANGE, RabbitConstants.ROUTINGKEY, msg);
return obj;
}
public void send(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend(RabbitConstants.EXCHANGE, RabbitConstants.ROUTINGKEY, msg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
{
System.out.println("correlationData------->" + correlationData +";;;cause------>" + cause);
if (ack) {
System.out.println("消息消费成功");
} else {
System.out.println("消息消费失败:" + cause+"\n重新发送");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
{
System.out.println("准备返回信息------->" + message.toString() + ";" + replyCode + ";" + replyText);
}
}
主要配置类
package com.example.test.rabbitmq;
import javax.annotation.Resource;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import com.example.test.common.RabbitConstants;
@Configuration
public class RabbitMqConfig {
@Bean
public Queue Queue1() {
return new Queue("lyhTest1");
}
@Resource
private RabbitConstants rabbitConstants;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitConstants.getHost());
connectionFactory.setUsername(rabbitConstants.getUsername());
//connectionFactory.setVirtualHost(rabbitConstants.getVirtualHost());默认是“/”
connectionFactory.setPassword(rabbitConstants.getPassword());
// * 如果要进行消息回调,则这里必须要设置为true
connectionFactory.setPublisherConfirms(rabbitConstants.isPublisherConfirms());
connectionFactory.setPublisherReturns(rabbitConstants.isPublisherConfirms());
connectionFactory.setConnectionTimeout(10000000);
return connectionFactory;
}
/**
* 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
}
两个类接收消息
package com.example.test.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "lyhTest1")
public class Receiver1 {
@RabbitHandler
public void receiver(String msg){
System.out.println("Test1 receiver1:"+msg);
}
}
@Component
@RabbitListener(queues = "lyhTest1")
public class Receiver2 {
@RabbitHandler
public void receiver(String msg){
System.out.println("Test1 receiver2:"+msg);
}
}
这里就全部完成,可以启动测试一下,打开MQ的管理界面,就可以看到队列信息