请教!rabbitMQ怎么确认同一个队列所有消费者收到消息?
求大神指导~
查看了很多能查到的文档都没有找到明确说明的,rabbit的发布订阅和ActiveMq 的发布订阅有所区别,我之前一直用 ActiveMq 的 后来公司要求改为rabbitMQ,请问同一个队列如果有多个消费者,那么如何确保所有消费者都恩那个收到同一条消息?而不是 消费者1消费掉 如图:
我写的消费者代码如下:
public
void
receiveMessage(String topicName)
{
try
{
Consumer consumer =
new
DefaultConsumer(channel)
{
public
void
handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,
byte
[] body)
throws
IOException {
//delivery.getEnvelope().getDeliveryTag()消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(envelope.getDeliveryTag(),
true
);
HashMap<Integer, LinkedList<Long>> message = (HashMap<Integer, LinkedList<Long>>) toObject(body);
for
(Integer str : message.keySet())
{
System.out.println(
" [x] Received '"
+ str);
}
}
};
//指定接收者,false-不自动应答
channel.basicConsume(topicName,
false
, consumer);
}
catch
(IOException e) {
e.printStackTrace();
}
}
private
Object toObject (
byte
[] bytes) {
Object obj =
null
;
try
{
ByteArrayInputStream bis =
new
ByteArrayInputStream (bytes);
ObjectInputStream ois =
new
ObjectInputStream (bis);
obj = ois.readObject();
ois.close();
bis.close();
}
catch
(IOException ex) {
ex.printStackTrace();
}
catch
(ClassNotFoundException ex) {
ex.printStackTrace();
}
return
obj;
}
现在这个问题解决了吗,我也遇到这个问题
只要是topic就可以,但要求消费时刻所有消费者必须都在线,离线的消费者收不到
可以采用topic模式,自已本地可以搭建rabbitmq,mq启动后,进入mq管理页面,自己可以建exchange,队列,通过routingkey绑定二者关系。可以在下面测试结果。
可以了解下RabbitMQ的ACK机制
难道不是用fanout模式吗?
我认为 这是对于消息队列的基本的一个 概念的错误理解
不同的消费者不应该等待同一个队列
首先 要理解 rabbitMQ 有一个 交换器 和队列 两层中间件
交换器是对应消息发送方的 一个类型的消息应该发送给特定的一个交换器
而队列是对应消费者的 一个类型的消费者应该对应于一个特定队列
至于消息同时给到多少消费者,给哪几个等分配逻辑 是由加换机 和队列直接绑定关系来实现的
而且和网上大多数教程不同的是 我认为这个绑定关系不应该有代码写死,而是配置的,这样当情况发生改变后 不用改代码 重启系统就可以修改
这样上面的问题 就是一个 交换机绑定多个队列就好了
另外 多个消费者从同一队列中获取消息,这是派其他用处的.类似负载均衡,比如消费者处理消息的速度要比信息发出慢的多,这样就可以启动多个同样的服务同时从一个队列中获取消息分布式处理.
exchange有两种方式一种direct,一种topic.topic就能满足你的需求,百度一搜一大把.
楼主不能以ActiveMq 的Topic来理解 RabbitMQ 。
我的理解是:RabbitMQ只有Queue,如果多个消费者绑定同一个queue,那么一条消息,只能被其中一个消费者取走。
RabbitMQ没有直接可用的传统方式的Topic,只不过是以exchange+queue的方式,变通地实现了Topic。
这才是正解啊!!
确实,应该使用主题模式,让每个消费者都得到消息
通过 fanout + 匿名queue 可以实现多服务器同时接收同一条MQ,因为匿名queue 在每台服务器都是随机不一样的,所以只要消费端 把该匿名 queue 绑定到同一个 exchange 就可以了。
fanout也不行,实测过了,注意题主所说的是同一个队列,我测试得出来的结论是只能设置两个不同的队列绑定到同一个交换器
是想要多个消费者同时获取到消息?还是说都有可能获取到?后者很好办,楼上说的fanout,tpoic模式都可以做到,前者应该是办不到的,一个队列可以有多个消费者实例,只有一个消费者实例会消费消息
除非多个队列使用相同的bindkey绑定到同一个exchange,然后这些队列下的消费者才能同时获取同一个消息。(具体请看topic路由模式)
A点发到 B点 消息。
A点 发到MQserver 能收到通知,B点接收是 没有通知功能,只能是lrc让 B收到后再发个消息回A来确认消息在B点是否接收到!
这个问题也困扰我好久,我的做法是:
生产者声明一个FANOUT交换机
消费者声明一个系统产生的随机队列绑定到这个交换机上,然后往交换机发消息,只要绑定到这个交换机上都能收到消息,关键代码如下:
生产者:
@Slf4j
@Component
public
class
RabbitHelper {
@Autowired
private
RabbitTemplate rabbitTemplate;
/**
* 广播消息
* @param exchange
* @param message
*/
public
void
broadcast(String exchange,String message){
rabbitTemplate.convertAndSend(exchange,
""
,message);
}
}
消费者:
@Slf4j
@Component
public
class
DownCmdConsumer {
@RabbitListener
(bindings =
@QueueBinding
(
value =
@Queue
(),
//注意这里不要定义队列名称,系统会随机产生
exchange =
@Exchange
(value =
"填写交换机名称"
,type = ExchangeTypes.FANOUT)
)
)
public
void
process(String payload) {
log.info(
"receive:{}"
,payload)
}
}
我也遇到这个问题了,ActiveMQ的订阅操作起来很简单, RabbitMQ确实需要像楼上那样搞!