ActiveMQ(三十二)--Message高级特性2
一、Blob Messages
测试:
该关闭的配置都先关掉
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import javax.jms.*;
import java.io.File;
/**
 * Demo producer: sends the local file {@code pom.xml} to the queue {@code my-queue}
 * as an ActiveMQ {@link BlobMessage}.
 *
 * <p>The connection URL carries a {@code jms.blobTransferPolicy.uploadUrl} option that
 * points at the broker's {@code /fileserver/} web application; presumably the file body
 * is uploaded there and only a reference travels through the queue (confirm against the
 * ActiveMQ blob message documentation).
 */
public class BlogMsgSend {

    /**
     * Connects to the broker, sends one blob message inside a transaction and commits it.
     *
     * @param args unused
     * @throws Exception if connecting, creating the blob message, sending or committing fails
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): the original author also noted
        // "jms.blobTransferPolicy.defaultUploadUrl" as an alternative option name for the
        // same URL; only "uploadUrl" is actually used here.
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.25.128:61616?jms.blobTransferPolicy.uploadUrl=http://192.168.25.128:8161/fileserver/");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Transacted session: the acknowledge mode argument is irrelevant, so say so explicitly.
            ActiveMQSession session =
                    (ActiveMQSession) connection.createSession(true, Session.SESSION_TRANSACTED);
            try {
                Destination destination = session.createQueue("my-queue");
                MessageProducer producer = session.createProducer(destination);
                BlobMessage blobMsg = session.createBlobMessage(new File("pom.xml"));
                producer.send(blobMsg);
                // Nothing is delivered to the queue until the transaction is committed.
                session.commit();
            } finally {
                session.close();
            }
        } finally {
            // Always release the connection, even when sending fails.
            connection.close();
        }
    }
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
/**
 * Demo consumer: listens on the queue {@code my-queue} and prints the content of every
 * {@link BlobMessage} it receives.
 *
 * <p>The connection is intentionally left open when {@code main} returns so that the
 * asynchronous listener keeps receiving messages.
 */
public class BlogMsgReceiver {

    /**
     * Connects to the broker and registers an asynchronous listener for blob messages.
     *
     * @param args unused
     * @throws Exception if the connection, session or consumer cannot be created
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // final so the anonymous listener can commit the transaction.
        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue("my-queue");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message msg) {
                if (!(msg instanceof BlobMessage)) {
                    // Other message types are ignored. They are still part of the current
                    // transaction and get acknowledged by the next successful commit.
                    return;
                }
                try {
                    byte[] content = readFully((BlobMessage) msg);
                    // Assumes the blob is UTF-8 text (the sender demo ships pom.xml) — confirm
                    // for other payloads.
                    System.out.println("content==" + new String(content, StandardCharsets.UTF_8));
                    // The session is transacted: without a commit the message is never
                    // acknowledged and would be redelivered on the next run.
                    session.commit();
                } catch (Exception e) {
                    // Leave the transaction uncommitted so the message is not lost.
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Reads the complete blob content into memory.
     *
     * <p>{@code InputStream.available()} is only an estimate and a single {@code read}
     * may return fewer bytes than requested, so the stream is drained in a loop.
     *
     * @param message the blob message whose content should be read
     * @return all bytes of the blob
     * @throws IOException if the blob has no content stream or reading fails
     * @throws JMSException if the message cannot provide its stream
     */
    private static byte[] readFully(BlobMessage message) throws IOException, JMSException {
        InputStream in = message.getInputStream();
        if (in == null) {
            throw new IOException("blob message has no readable content stream");
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int read;
            while ((read = in.read(chunk)) != -1) {
                out.write(chunk, 0, read);
            }
            return out.toByteArray();
        } finally {
            in.close();
        }
    }
}
先运行生产者,后运行消费者。
二、Message Transformation
测试:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.MessageTransformer;
import javax.jms.*;
/**
 * Demo producer for message transformation: sends three text messages to the queue
 * {@code my-queue}, with a {@link MessageTransformer} installed on the producer that
 * converts each {@link TextMessage} into a {@link MapMessage} before it is sent.
 */
public class QueueSender {

    /**
     * Connects to the broker, installs the transformer, sends three messages in one
     * transaction and commits.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails
     * @throws InterruptedException declared for signature compatibility; nothing in the
     *         visible code throws it
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            try {
                Destination destination = session.createQueue("my-queue");
                ActiveMQMessageProducer producer =
                        (ActiveMQMessageProducer) session.createProducer(destination);

                // The transformer is the same for every message, so install it once
                // instead of re-creating it on each loop iteration.
                producer.setTransformer(new MessageTransformer() {
                    @Override
                    public Message producerTransform(Session txSession, MessageProducer txProducer, Message msg)
                            throws JMSException {
                        // Convert the TextMessage into a MapMessage.
                        MapMessage mapMessage = txSession.createMapMessage();
                        String value = ((TextMessage) msg).getText();
                        // The original text doubles as the map key the receiver looks up.
                        mapMessage.setString(value, "my map message AAA==" + value);
                        mapMessage.setStringProperty("extra", "okok");
                        return mapMessage;
                    }

                    @Override
                    public Message consumerTransform(Session txSession, MessageConsumer consumer, Message message)
                            throws JMSException {
                        // This transformer is only installed on a producer here, so no
                        // consumer-side transformation is provided.
                        return null;
                    }
                });

                for (int i = 0; i < 3; i++) {
                    TextMessage message = session.createTextMessage("message---" + i);
                    producer.send(message);
                }
                // Messages become visible to consumers only after the commit.
                session.commit();
            } finally {
                session.close();
            }
        } finally {
            // Always release the connection, even when sending fails.
            connection.close();
        }
    }
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Demo consumer for message transformation: synchronously receives three
 * {@link MapMessage}s from the queue {@code my-queue} and prints the entry stored under
 * the key {@code "message---" + i} together with the {@code extra} string property.
 */
public class QueueReceiver {

    /**
     * Connects to the broker, receives three messages (committing after each one) and
     * closes the connection.
     *
     * @param args unused
     * @throws JMSException if connecting, receiving, reading a message or committing fails
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            try {
                Destination destination = session.createQueue("my-queue");
                MessageConsumer consumer = session.createConsumer(destination);
                for (int i = 0; i < 3; i++) {
                    // Blocks until a message is available.
                    MapMessage message = (MapMessage) consumer.receive();
                    System.out.println("收到的消息:" + message.getString("message---" + i) +
                            ", property==" + message.getStringProperty("extra"));
                    // Acknowledge each message individually by committing the transaction.
                    session.commit();
                }
            } finally {
                session.close();
            }
        } finally {
            // Always release the connection, even when receiving or reading a message fails.
            connection.close();
        }
    }
}