java mq 使用
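1、mq地址配置:在 mqinformation.properties 中配置 mqaddress(例如 tcp://10.10.10.45:61616)、username、userpassword 三项,供下文的 MSProduct.send 读取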
2、pom.xml mq配置
<!-- Version property; it is referenced below as ${activemq-pool.version}, so it belongs in the pom's <properties> section. -->
<activemq-pool.version>5.15.0</activemq-pool.version>
<!-- ActiveMQ dependency (activemq-pool artifact); its version comes from the property above. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq-pool.version}</version>
</dependency>
3、底层调取mq
package unistctest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.jeecgframework.core.util.PropertiesUtil;
import org.jeecgframework.web.system.pojo.base.TSUser;
import com.thoughtworks.xstream.XStream;
import com.unistc.mq.SendMQ;
import com.unistc.mq.exchangeentity.ExchangeMessage;
import com.unistc.mq.exchangeentity.ExchangeObjectUser;
/**
 * Minimal ActiveMQ producer that pushes a text message onto a named queue.
 *
 * <p>Broker address and credentials are read from {@code mqinformation.properties}
 * (keys {@code mqaddress}, {@code username}, {@code userpassword}) on every call.
 */
public class MSProduct {

    /** Returned by {@link #send(String, String)} when the message was committed. */
    private static final int SEND_OK = 1;

    /** Returned by {@link #send(String, String)} when a JMS error prevented delivery. */
    private static final int SEND_FAILED = 0;

    /**
     * Sends {@code data} as a {@code TextMessage} to the queue {@code queueName}.
     *
     * <p>A new connection is opened for each call and closed before returning.
     * The message is sent inside a transacted session and committed explicitly.
     *
     * @param queueName name of the destination queue
     * @param data      text payload of the message
     * @return {@code 1} if the message was sent and committed, {@code 0} if a
     *         {@link JMSException} occurred (the stack trace is printed)
     */
    public static int send(String queueName, String data) {
        PropertiesUtil util = new PropertiesUtil("mqinformation.properties");
        String mqAddress = util.readProperty("mqaddress");
        String mqUsername = util.readProperty("username");
        String mqUserPassword = util.readProperty("userpassword");

        Connection connection = null;
        try {
            ConnectionFactory factory =
                    new ActiveMQConnectionFactory(mqUsername, mqUserPassword, mqAddress);
            connection = factory.createConnection();
            connection.start();

            // Transacted session: the acknowledge-mode argument is ignored when
            // transacted is true, so SESSION_TRANSACTED states the intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(queueName);
            MessageProducer messageProducer = session.createProducer(destination);
            TextMessage message = session.createTextMessage(data);

            messageProducer.send(message);
            // Nothing reaches the broker's queue until the transaction is committed.
            session.commit();
            return SEND_OK;
        } catch (JMSException e) {
            e.printStackTrace();
            // Report the failure instead of pretending the message was delivered.
            return SEND_FAILED;
        } finally {
            if (connection != null) {
                try {
                    // Closing the connection also closes its sessions and producers.
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Manual smoke test: builds a dummy user, wraps it in an exchange message
     * and sends it through {@code SendMQ}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String queueName = "sbsminbanQueue";
        for (int i = 0; i < 1; i++) {
            TSUser changeUser = new TSUser();
            changeUser.setUserName("user.getUserName()" + i);
            changeUser.setRealName("user.getRealName()" + i);
            changeUser.setId("user.getId()" + i);
            changeUser.setMobilePhone("user.getMobilePhone()" + i);
            changeUser.setEmail("user.getEmail()" + i);
            try {
                ExchangeMessage exchangeMessage =
                        SendMQ.assembleExchangeMessage("associationRole", changeUser);
                int result = SendMQ.send(queueName, exchangeMessage);
                // Report the queue actually used, and only claim success when send succeeded.
                if (result == SEND_OK) {
                    System.out.println("这个消息发送给" + queueName + ",用户是" + changeUser.getUserName());
                } else {
                    System.err.println("消息发送失败,队列是" + queueName + ",用户是" + changeUser.getUserName());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
4、调取封装
package com.unistc.mq;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.jeecgframework.web.system.pojo.base.TSUser;
import com.thoughtworks.xstream.XStream;
import com.unistc.mq.exchangeentity.ExchangeHeader;
import com.unistc.mq.exchangeentity.ExchangeMessage;
import com.unistc.mq.exchangeentity.ExchangeObject;
import com.unistc.mq.exchangeentity.ExchangeObjectUser;
import unistctest.MSProduct;
/**
 * Helper that builds {@code ExchangeMessage} objects from users and hands
 * them to {@code MSProduct} for delivery.
 */
public class SendMQ {

    /**
     * Builds an exchange message describing an action on a user.
     *
     * <p>The message gets version {@code "1.0"}, object name {@code "user"}, the
     * given action type, and a header whose operation date is the current time
     * formatted as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @param actionType action type stored on the exchange object
     * @param tsUser     user whose fields are copied into the message
     * @return the assembled message
     */
    public static ExchangeMessage assembleExchangeMessage(String actionType, TSUser tsUser) {
        ExchangeObjectUser changeUser = new ExchangeObjectUser();
        changeUser.setUserName(tsUser.getUserName());
        changeUser.setRealName(tsUser.getRealName());
        changeUser.setId(tsUser.getId());
        changeUser.setMobilePhone(tsUser.getMobilePhone());
        changeUser.setEmail(tsUser.getEmail());
        // The user key carried in the message is the user's password field.
        changeUser.setUserKey(tsUser.getPassword());

        // A fresh formatter per call: SimpleDateFormat must not be shared across threads.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        ExchangeHeader exchangeHeader = new ExchangeHeader();
        exchangeHeader.setOperationDate(sdf.format(new Date()));

        ExchangeObject exchangeObject = new ExchangeObject();
        exchangeObject.setExchangeObjectUser(changeUser);
        exchangeObject.setActionType(actionType);
        exchangeObject.setObjectName("user");

        ExchangeMessage exchangeMessage = new ExchangeMessage();
        exchangeMessage.setVersion("1.0");
        exchangeMessage.setExchangeHeader(exchangeHeader);
        exchangeMessage.setExchangeObject(exchangeObject);
        return exchangeMessage;
    }

    /**
     * Sends a raw text payload to the given queue.
     *
     * <p>The original note here listed two modes, "1 direct send, 2 subscription
     * send"; this method simply delegates to {@code MSProduct.send}.
     *
     * @param queueName name of the destination queue
     * @param data      text payload
     * @return the value returned by {@code MSProduct.send}
     * @throws Exception declared for callers; see {@code MSProduct.send}
     */
    public static int send(String queueName, String data) throws Exception {
        return MSProduct.send(queueName, data);
    }

    /**
     * Serializes the message to XML with XStream (annotation auto-detection
     * enabled), prints the XML to standard output, and sends it to the given queue.
     *
     * @param queueName       name of the destination queue
     * @param exchangeMessage message to serialize and send
     * @return the value returned by {@code MSProduct.send}
     * @throws Exception declared for callers; see {@code MSProduct.send}
     */
    public static int send(String queueName, ExchangeMessage exchangeMessage) throws Exception {
        XStream xStream = new XStream();
        xStream.autodetectAnnotations(true);
        // Serialize once and reuse the result for both the debug output and the send.
        String xml = xStream.toXML(exchangeMessage);
        // NOTE(review): this debug output contains the user key (set from the
        // user's password in assembleExchangeMessage) - consider removing or masking it.
        System.out.println(xml);
        return MSProduct.send(queueName, xml);
    }
}
5、调取方法
/**
 * Sends an exchange message for {@code user} to the queue {@code "sbsjbxxglQueue"}
 * when the role's code equals {@code ExchangeConstants.SBS_JBXXGL_ROLENAME}.
 * Roles with any other code (or no code) are ignored.
 *
 * <p>Failures while building or sending the message are not propagated; the
 * stack trace is printed instead.
 *
 * @param user       user the message is about
 * @param tsRole     role whose code decides whether a message is sent
 * @param actionType action type passed on to the exchange message
 */
private void cleanUpPassAndSend(TSUser user, TSRole tsRole, String actionType) {
    try {
        // Constant-first comparison: a role without a code simply does not match,
        // instead of raising a NullPointerException that the catch below would swallow.
        if (ExchangeConstants.SBS_JBXXGL_ROLENAME.equals(tsRole.getRoleCode())) {
            ExchangeMessage exchangeMessage = SendMQ.assembleExchangeMessage(actionType, user);
            SendMQ.send("sbsjbxxglQueue", exchangeMessage);
        }
    } catch (Exception e) {
        // Best-effort notification: a messaging failure must not break the caller.
        e.printStackTrace();
    }
}