java mq 使用

1、mq地址配置

java mq 使用

2、pom.xml mq配置

<activemq-pool.version>5.15.0</activemq-pool.version>

<!-- activemq -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>${activemq-pool.version}</version>
        </dependency>

3、底层调取mq

package unistctest;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.jeecgframework.core.util.PropertiesUtil;
import org.jeecgframework.web.system.pojo.base.TSUser;

import com.thoughtworks.xstream.XStream;
import com.unistc.mq.SendMQ;
import com.unistc.mq.exchangeentity.ExchangeMessage;
import com.unistc.mq.exchangeentity.ExchangeObjectUser;

public class MSProduct {
    public static int send(String queueName,String data) {
        PropertiesUtil util = new PropertiesUtil("mqinformation.properties");
        String mqAddress = util.readProperty("mqaddress");
        String mqUsername = util.readProperty("username");
        String mqUserPassword = util.readProperty("userpassword");
        // 连接工厂
                ConnectionFactory factory;
                // 连接实例
                Connection connection = null;
                // 收发的线程实例
                Session session;
                // 消息发送目标地址
                Destination destination;
                // 消息创建者
                MessageProducer messageProducer;
                try {
//                    factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.10.10.45:61616");
                    factory = new ActiveMQConnectionFactory(mqUsername, mqUserPassword, mqAddress);
                    // 获取连接实例
                    connection = factory.createConnection();
                    // 启动连接
                    connection.start();
                    // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息)
                    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                    // 创建队列(返回一个消息目的地)
                    destination = session.createQueue(queueName);
                    // 创建消息生产者
                    messageProducer = session.createProducer(destination);
                    // 创建TextMessage消息实体
                    TextMessage message = session.createTextMessage(data);
//                    System.out.println("message==="+message.getText());
                    messageProducer.send(message);
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                } finally {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return 1;
    }
    
    
    
    public static void main(String[] args) {
        for(int i =0;i<1;i++) {
            TSUser changeUser = new TSUser();
            changeUser.setUserName("user.getUserName()"+i);
            changeUser.setRealName("user.getRealName()"+i);
            changeUser.setId("user.getId()"+i);
            changeUser.setMobilePhone("user.getMobilePhone()"+i);
            changeUser.setEmail("user.getEmail()"+i);
//            changeUser.setState("0"+i);
            XStream xStream = new XStream();
            //设置别名, 默认会输出全路径
            xStream.alias("ChangeUser", ExchangeObjectUser.class);
            
            try {
    
                    ExchangeMessage exchangeMessage = SendMQ.assembleExchangeMessage("associationRole", changeUser);
                    SendMQ.send("sbsminbanQueue", exchangeMessage);
                    System.out.println("这个消息发送给sbscmisQueue,用户是"+changeUser.getUserName());
                
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
 

4、调取封装

package com.unistc.mq;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.jeecgframework.web.system.pojo.base.TSUser;

import com.thoughtworks.xstream.XStream;
import com.unistc.mq.exchangeentity.ExchangeHeader;
import com.unistc.mq.exchangeentity.ExchangeMessage;
import com.unistc.mq.exchangeentity.ExchangeObject;
import com.unistc.mq.exchangeentity.ExchangeObjectUser;

import unistctest.MSProduct;

public class SendMQ {
    public static ExchangeMessage assembleExchangeMessage(String actionType,TSUser tsUser) {
        
        ExchangeObjectUser changeUser = new ExchangeObjectUser();
        changeUser.setUserName(tsUser.getUserName());
        changeUser.setRealName(tsUser.getRealName());
        changeUser.setId(tsUser.getId());
        changeUser.setMobilePhone(tsUser.getMobilePhone());
        changeUser.setEmail(tsUser.getEmail());
        changeUser.setUserKey(tsUser.getPassword());
        
        Calendar calendar = Calendar.getInstance();
//        calendar.add(Calendar.HOUR, 8);
        ExchangeMessage exchangeMessage = new ExchangeMessage();
        ExchangeHeader exchangeHeader = new ExchangeHeader();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        exchangeHeader.setOperationDate(sdf.format(new Date()));
       
        
        ExchangeObject exchangeObject = new ExchangeObject();
        exchangeObject.setExchangeObjectUser(changeUser);
        exchangeObject.setActionType(actionType);
        exchangeObject.setObjectName("user");
        
        exchangeMessage.setVersion("1.0");
        exchangeMessage.setExchangeHeader(exchangeHeader);
        exchangeMessage.setExchangeObject(exchangeObject);
        return exchangeMessage;
    }
    
    
    public static int send(String queueName ,String data) throws Exception {

        /** 
         * 1 直接发送 2 订阅发送 
         */
        return MSProduct.send(queueName, data);
    }
    public static int send(String queueName ,ExchangeMessage exchangeMessage) throws Exception {

        /** 
         * 1 直接发送 2 订阅发送 
         */
        XStream xStream = new XStream();
        xStream.autodetectAnnotations(true); 
        System.out.println(xStream.toXML(exchangeMessage));
        return MSProduct.send(queueName, xStream.toXML(exchangeMessage));
//        return 1;
    }
}
 

5、调取方法

private void cleanUpPassAndSend(TSUser user, TSRole tsRole, String actionType) {
        try {
            if (tsRole.getRoleCode().equals(ExchangeConstants.SBS_JBXXGL_ROLENAME)) {
                ExchangeMessage exchangeMessage = SendMQ.assembleExchangeMessage(actionType, user);
                SendMQ.send("sbsjbxxglQueue", exchangeMessage);
//                        System.out.println("这个消息发送给sbsminbanxiaoQueue,用户是"+user.getUserName());
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }