6.RabbitMQ Topics
# 匹配零个或多个单词(单词之间以 . 分隔),例:绑定键 a.# 可匹配 a.abc123、a.2、a.22、a.b.c 等以 a. 开头的路由键
* 恰好匹配一个单词(单词之间以 . 分隔),例:绑定键 a.* 可匹配 a.abc、a.1、a.a1,但不能匹配 a.b.c
package com.study.soufang.rabbit.a001.topic;
/**
 * String constants shared by the topic-exchange example: the exchange name,
 * the three demo queue names, and the exchange type identifiers.
 */
public class ConstantOfTopic {

    // ---- exchange ----

    /** Name of the exchange the example publishes to and binds queues against. */
    public static final String EXCHANGE_NAME = "logs_topics";

    // ---- exchange types ----

    /** Exchange type identifier for a fanout exchange. */
    public static final String EXCHANGE_TYPE_FANOUT = "fanout";

    /** Exchange type identifier for a direct (routing) exchange. */
    public static final String EXCHANGE_TYPE_ROUTING = "direct";

    /** Exchange type identifier for a topic exchange. */
    public static final String EXCHANGE_TYPE_TOPICS = "topic";

    // ---- queues ----

    /** Name of demo queue A. */
    public static final String QUEUE_NAME_A = "logs_topic_queue_a";

    /** Name of demo queue B. */
    public static final String QUEUE_NAME_B = "logs_topic_queue_b";

    /** Name of demo queue C. */
    public static final String QUEUE_NAME_C = "logs_topic_queue_c";
}
package com.study.soufang.rabbit.a001.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.study.soufang.rabbit.a001.RabbitChannelUtil;
import lombok.Data;
import lombok.experimental.Accessors;
/**
 * Consumer that prints each delivered message to standard output and then
 * acknowledges that single delivery on its channel.
 */
@Data
@Accessors(chain = true)
public class MyConsumer extends DefaultConsumer {

    /**
     * Creates a consumer attached to the given channel.
     *
     * @param channel channel handed to {@link DefaultConsumer}; it is also the
     *                channel on which acknowledgements are sent
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Decodes the body as UTF-8, prints it together with the consumer tag and
     * the current thread name, and acknowledges the delivery.
     *
     * @param consumerTag tag of the consumer receiving the delivery
     * @param envelope    delivery metadata; supplies the delivery tag used for the ack
     * @param properties  message properties (not used here)
     * @param body        raw message bytes
     * @throws IOException if decoding or acknowledging fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
            throws IOException {
        try {
            final String payload = new String(body, "UTF-8");
            final String threadName = Thread.currentThread().getName();
            System.out.println(consumerTag + "--" + threadName + " [x] Received '" + payload + "'");
        } finally {
            // The ack sits in finally, so it is sent even when the code above throws.
            // deliveryTag: index of this message on the channel.
            // multiple = false: acknowledge only this delivery; true would acknowledge
            // the earlier outstanding deliveries on the channel in one call as well.
            final Channel ackChannel = getChannel();
            ackChannel.basicAck(envelope.getDeliveryTag(), false);
            // The channel is intentionally left open here so the consumer keeps listening.
        }
    }
}
package com.study.soufang.rabbit.a001.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.study.soufang.rabbit.a001.RabbitChannelUtil;
public class Recv {
public static void doRecv(String queue,String routingKey) throws IOException, TimeoutException{
Channel channel = null;
try {
channel = RabbitChannelUtil.createChannel();
//声明交换机
channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_TOPICS);
//声明队列
channel.queueDeclare(queue, false, false, false, null);
//绑定队列到交换机
channel.queueBind(queue, ConstantOfTopic.EXCHANGE_NAME, routingKey);
Consumer consumer = new MyConsumer(channel);
boolean autoAck = false;
channel.basicConsume(queue, autoAck, consumer);
channel.basicQos(1);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, TimeoutException {
new Thread(new Runnable() {
@Override
public void run() {
try {
doRecv(ConstantOfTopic.QUEUE_NAME_A,"error.*.*");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
doRecv(ConstantOfTopic.QUEUE_NAME_B,"error.#");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
doRecv(ConstantOfTopic.QUEUE_NAME_C,"error.*");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
}
package com.study.soufang.rabbit.a001.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.study.soufang.rabbit.a001.RabbitChannelUtil;
import com.study.soufang.rabbit.a001.PublishAndSubscribe.ConstantOfPublish;
/**
 * Publisher side of the topic-exchange example: sends 100 numbered log
 * messages to the topic exchange using three different routing keys.
 */
public class Send {

    /**
     * Declares the topic exchange, publishes messages {@code log---0} to
     * {@code log---99}, and closes the channel and its connection.
     *
     * <p>Note: the consumers must be started first so that their queues are
     * declared and bound to the exchange; otherwise the messages are lost.
     *
     * @param args unused
     * @throws IOException      if closing the channel or connection fails
     * @throws TimeoutException if closing the channel or connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = null;
        try {
            channel = RabbitChannelUtil.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare(ConstantOfTopic.EXCHANGE_NAME, ConstantOfTopic.EXCHANGE_TYPE_TOPICS);
            for (int i = 0; i < 100; i++) {
                String message = "log---" + i;
                // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8,
                // so the platform default charset must not be used here.
                channel.basicPublish(ConstantOfTopic.EXCHANGE_NAME, routingKeyFor(i), null,
                        message.getBytes("UTF-8"));
                Thread.sleep(1);
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // channel is still null when createChannel() itself failed.
            if (channel != null) {
                RabbitChannelUtil.closeChannel(channel);
            }
        }
    }

    /**
     * Picks the routing key for message number {@code i}: multiples of 9 get
     * {@code error.a}, other multiples of 7 get {@code error.b.c}, everything
     * else gets {@code error.abc}.
     */
    private static String routingKeyFor(int i) {
        if (i % 9 == 0) {
            return "error.a";
        } else if (i % 7 == 0) {
            // Was a separate "if" before, which always overwrote "error.a".
            return "error.b.c";
        } else {
            return "error.abc";
        }
    }
}
package com.study.soufang.rabbit.a001;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Helper for opening and closing a RabbitMQ channel together with the
 * connection it lives on. Each {@link #createChannel()} call opens a new
 * connection.
 */
public class RabbitChannelUtil {
    private final static String RABBIT_HOST = "192.168.10.22";
    private final static int RABBIT_PORT = 5672;

    /**
     * Opens a new connection to the configured broker and creates a channel on it.
     *
     * @return the channel returned by the new connection
     * @throws IOException      if the connection or the channel cannot be created
     * @throws TimeoutException if connecting times out
     */
    public static Channel createChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RABBIT_HOST);
        factory.setPort(RABBIT_PORT);
        // NOTE(review): credentials and virtual host are hard-coded for this demo;
        // move them to configuration before using this outside of a study project.
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("helloworld");

        Connection connection = factory.newConnection();
        try {
            return connection.createChannel();
        } catch (IOException | RuntimeException e) {
            // Do not leak the freshly opened connection when the channel cannot be created.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the channel and the connection it belongs to. After the
     * connection is closed, consumers on it no longer receive messages.
     * A {@code null} channel is ignored.
     *
     * @param channel channel to close; may be {@code null}
     * @throws IOException      if closing the channel or the connection fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void closeChannel(Channel channel) throws IOException, TimeoutException {
        if (null == channel) {
            // Nothing was opened, so there is nothing to close.
            return;
        }
        Connection connection = channel.getConnection();
        try {
            channel.close();
        } finally {
            // Close the connection even if closing the channel failed.
            if (null != connection) {
                connection.close();
            }
        }
    }
}