IBMMQ消息队列
环境准备:
JDK1.7
IBM WebSphere MQ 8.0
安装MQ并打开MQ管理器,并创建队列
创建队列管理器,输入队列管理器名称,点击完成
创建本地消息队列,输入队列名称点击下一步,选择缺省持久性为“持久”,点击完成
创建服务器连接通道,输入名称点击下一步,在MCA处填写属于mqm组(具有MQ管理权限)的用户(计算机用户)
客户端代码处:
1:创建MQServer类
package com.hgsoft.messageQueue;
import java.io.Serializable;
import com.hgsoft.exception.ApplicationException;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;
public class MQServer implements Serializable{
private String hostName;
private Integer port;
private String channel;
private String userID;
private String password;
private int cssid;
private String qManager ;
private MQQueueManager qMgr ;
public void login(String strServerIP, int iServerPort, String strChannel,int iServerCssid,String strQManager,
String strUserName, String strPassword) throws Exception {
userID = strUserName;
password = strPassword;
hostName = strServerIP;
port = iServerPort;
channel = strChannel;
cssid = iServerCssid;
qManager = strQManager;
admLogin();
}
@SuppressWarnings({ "deprecation", "unchecked" })
public void admLogin() throws Exception {
try {
MQEnvironment.hostname=hostName;
MQEnvironment.port=port;
MQEnvironment.channel=channel;
MQEnvironment.userID=userID;
MQEnvironment.password=password;
MQEnvironment.CCSID=cssid;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
qMgr = new MQQueueManager(qManager);
System.out.println("Connecting to queue manager: "+qMgr.getName());
} catch (ApplicationException e) {
throw new ApplicationException("6000",(new StringBuilder("login error:")).append( e.toString()).toString());
}
}
public void logOut(){
try {
if(qMgr!=null)qMgr.disconnect();
} catch (Exception e) {
}
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getChannel() {
return channel;
}
public void setChannel(String channel) {
this.channel = channel;
}
public String getUserID() {
return userID;
}
public void setUserID(String userID) {
this.userID = userID;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getCssid() {
return cssid;
}
public void setCssid(int cssid) {
this.cssid = cssid;
}
public String getqManager() {
return qManager;
}
public void setqManager(String qManager) {
this.qManager = qManager;
}
public MQQueueManager getqMgr() {
return qMgr;
}
public void setqMgr(MQQueueManager qMgr) {
this.qMgr = qMgr;
}
}
2:创建MQSession类
package com.hgsoft.messageQueue;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Date;
import com.hgsoft.exception.ApplicationException;
import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMsg2;
public class MQSession {
private MQQueueRequest m_pQTo;
private MQQueueResponse m_pQFrom;
private int m_iTimeState=0;
private int m_iTimeout=0;
private int m_iTryTimes=0;
private byte[] messageId;
public int getM_iTimeState() {
return m_iTimeState;
}
public void setM_iTimeState(int m_iTimeState) {
this.m_iTimeState = m_iTimeState;
}
public int getM_iTimeout() {
return m_iTimeout;
}
public void setM_iTimeout(int m_iTimeout) {
this.m_iTimeout = m_iTimeout;
}
public int getM_iTryTimes() {
return m_iTryTimes;
}
public void setM_iTryTimes(int m_iTryTimes) {
this.m_iTryTimes = m_iTryTimes;
}
public MQQueueRequest getM_pQTo() {
return m_pQTo;
}
public void setM_pQTo(MQQueueRequest m_pQTo) {
this.m_pQTo = m_pQTo;
}
public MQQueueResponse getM_pQFrom() {
return m_pQFrom;
}
public void setM_pQFrom(MQQueueResponse m_pQFrom) {
this.m_pQFrom = m_pQFrom;
}
public byte[] getMessageId() {
return messageId;
}
public void setMessageId(byte[] messageId) {
this.messageId = messageId;
}
public void MQCall(MQMessage pmsg, MQMessage qmsg) {
MQSendWithoutLock(pmsg, 2);
MQRecvWithoutLock(qmsg);
}
protected void MQSendWithoutLock(MQMessage msg, int iSessionStatus) {
int iLoops = m_iTryTimes == 0 ? 2147483647 : m_iTryTimes;
int k = 0;
for (k = 0; k < iLoops; k++) {
long s = (new Date()).getTime();
boolean flag = false;
while((new Date().getTime())<(s+(m_iTimeout*1000l))){
try {
MQSendOnceWithoutLock(msg, iSessionStatus);
flag=true;
break;
} catch (MQException e) {
String message = e.getMessage();
String err= message.substring(24,28);
/*System.out.println(err);*/
try {
if ("2358".equals(err) || "2085".equals(err) || "2540".equals(err) || "2058".equals(err) || "2035".equals(err))
throw e;
} catch (MQException e1) {
e1.printStackTrace();
}
continue;
} catch (Exception ee) {
ee.printStackTrace();
}
flag=true;
break;
}
if(flag){
break;
}
}
if (k == iLoops)
throw new ApplicationException("6121", (new StringBuilder("has tried ")).append(k).append("times.").toString());
else
return;
}
protected void MQSendOnceWithoutLock(MQMessage msg, int iSessionStatus) throws MQException{
if (m_pQTo == null || msg == null || iSessionStatus != 4
&& iSessionStatus != 2 && iSessionStatus != 1)
throw new ApplicationException("6120", " MQSendOnceWithoutLock");
m_pQFrom.put(msg.getMsg());
messageId = msg.getMessageId();
System.out.println(Arrays.toString(msg.getMessageId()));
}
protected void MQRecvWithoutLock(MQMessage pMsg) {
int k = 0;
int iLoops = m_iTryTimes == 0 ? 2147483647 : m_iTryTimes;
for (k = 0; k < iLoops; k++) {
long s = (new Date()).getTime();
boolean flag = false;
while((new Date().getTime())<(s+(m_iTimeout*1000l))){
try {
MQRecvOnceWithoutLock(pMsg);
flag=true;
break;
}
catch (MQException e) {
String message = e.getMessage();
String err= message.substring(24,28);
/*System.out.println(err);*/
try {
if (!"2033".equals(err))throw e;
} catch (MQException e1) {
e1.printStackTrace();
}
continue;
}
catch (Exception ee) {
ee.printStackTrace();
}
flag=true;
break;
}
if(flag){
break;
}
}
if (k == iLoops)
throw new ApplicationException("103", (new StringBuilder(
"has tried ")).append(k).append("times.").toString());
else
return;
}
protected int MQRecvOnceWithoutLock(MQMessage pMsg) throws MQException {
if (m_pQFrom == null || pMsg == null)
return -1;
pMsg.setMessageId(messageId);
try {
m_pQTo.getMsg(pMsg.getMsg(),m_iTimeState);
} catch (MQException e) {
throw e;
}
return 0;
}
}
3:创建MQQueueRequest.java
package com.hgsoft.messageQueue;
import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMsg2;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MQQueueRequest {
private MQQueueManager qMgr ;
private MQQueue resquestMsg;
public MQQueueRequest(MQServer mqserver) {
this.qMgr = mqserver.getqMgr();
}
public void open(String qName){
try {
int openOptions = MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_INPUT_AS_Q_DEF;
resquestMsg = qMgr.accessQueue(qName, openOptions);
} catch (MQException e) {
e.printStackTrace();
}
}
public void getMsg(MQMsg2 msg,int timeOut) throws MQException{
MQGetMessageOptions gmo = new MQGetMessageOptions();
/* mgo.options |= MQC.MQGMO_WAIT;*/
gmo.options = gmo.options + MQC.MQGMO_WAIT;
/*如果队列管理器停顿则失败 */
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.waitInterval = timeOut;
try {
resquestMsg.getMsg2(msg,gmo);
} catch (MQException e) {
throw e;
}
}
public void close(){
try {
resquestMsg.close();
} catch (MQException e) {
e.printStackTrace();
}
}
}
4:创建MQQueueResponse
package com.hgsoft.messageQueue;
import com.ibm.mq.MQC;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMsg2;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
public class MQQueueResponse {
private MQQueueManager qMgr ;
private MQQueue responseMsg;
public MQQueueResponse(MQServer mqserver) {
this.qMgr = mqserver.getqMgr();
}
public void open(String qName){
try {
int openOptions = MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_OUTPUT;
responseMsg = qMgr.accessQueue(qName, openOptions);
} catch (MQException e) {
e.printStackTrace();
}
}
public void put(MQMsg2 msg){
try {
responseMsg.putMsg2(msg);
} catch (MQException e) {
e.printStackTrace();
}
}
public void close(){
try {
responseMsg.close();
} catch (MQException e) {
e.printStackTrace();
}
}
}
5:创建MQMessage
package com.hgsoft.messageQueue;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMsg2;
public class MQMessage {
private MQMsg2 msg;
public MQMessage() {
msg = new MQMsg2();
}
public byte[] getMessageData() {
return msg.getMessageData();
}
public void setMessageData(byte[] messageData) {
try {
msg.setMessageData(messageData);
} catch (MQException e) {
e.printStackTrace();
}
}
public byte[] getMessageId(){
return msg.getMessageId();
}
public void setMessageId(byte[] messageId){
msg.setMessageId(messageId);
}
public MQMsg2 getMsg() {
return msg;
}
public void setMsg(MQMsg2 msg) {
this.msg = msg;
}
}
6:测试类
public static void send(){
String hostName="10.173.239.2";
String channel="LOCALSERVER";
String qManager = "QUEUE_GAO";
int port = 1414;
int CSSID = 1381;
String userID="MUSR_MQADMIN";
String password="Root1234";
MQSession ses = new MQSession();
ses.setM_iTimeout(3); // 秒
ses.setM_iTryTimes(3);
ses.setM_iTimeState(500);
String mResult = "";
String params = "测试MQ"+System.currentTimeMillis();
MQServer server = new MQServer();
MQQueueRequest reqQ = null;
MQQueueResponse ansQ = null;
try {
server.login(hostName, port, channel, CSSID, qManager, userID, password);
reqQ = new MQQueueRequest(server);
ansQ = new MQQueueResponse(server);
ansQ.open("CNN_GAO");
reqQ.open("CNN_GAO2");
ses.setM_pQTo(reqQ);
ses.setM_pQFrom(ansQ);
/*MQMsg2 reqMsg = new MQMsg2();
MQMsg2 resMsg = new MQMsg2();*/
MQMessage reqMsg = new MQMessage();
MQMessage resMsg = new MQMessage();
reqMsg.setMessageData(params.getBytes("utf-8"));
ses.MQCall(reqMsg, resMsg);
mResult = new String(resMsg.getMessageData(), "utf-8");
System.out.println(Arrays.toString(resMsg.getMessageId()));
System.out.println(mResult);
} catch (Exception e) {
e.printStackTrace();
}finally{
if(reqQ!=null)reqQ.close();
if(ansQ!=null)ansQ.close();
if(server!=null)server.logOut();
}
}
服务端代码:
MQSendService.java:
// Connection settings; send() copies them into the static MQEnvironment
// fields on every call.
// NOTE(review): no setters are visible in this excerpt — presumably the values
// are injected by configuration; confirm.
private String hostName;
private String channel;
// Name passed to the MQQueueManager constructor in send().
private String qManager;
// Name of the queue that send() opens for output.
private String qName;
private int port;
// Assigned to MQEnvironment.CCSID in send().
private int cssid;
private String userId;
private String password;
public void send(byte[] str,byte[] messageId){
MQEnvironment.hostname=hostName;
MQEnvironment.port=port;
MQEnvironment.channel=channel;
MQEnvironment.userID=userId;
MQEnvironment.password=password;
MQEnvironment.CCSID=cssid;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);
MQQueueManager qMgr = null;
MQQueue responseMsg = null;
try {
qMgr = new MQQueueManager(qManager);
int openOptions = MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_OUTPUT;
responseMsg = qMgr.accessQueue(qName, openOptions);
MQMsg2 msg = new MQMsg2();
msg.setMessageId(messageId);
msg.setMessageData(str);
responseMsg.putMsg2(msg);
} catch (MQException e) {
}catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(responseMsg!=null)responseMsg.close();
if(qMgr!=null)qMgr.disconnect();
} catch (MQException e) {
e.printStackTrace();
}
}
}
消息转换MessageConverter
public class MessageConverter implements org.springframework.jms.support.converter.MessageConverter {
/**
* 发送消息的转换
*/
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
return null;
}
/**
* 接收消息的转换
*/
public Object fromMessage(Message message) throws JMSException,MessageConversionException {
// TODO
CallRecord callRecord = new CallRecord();
callRecord.setEndTime(new Date());
callRecord.setRequestContent(message.toString());
System.out.println(callRecord);
return callRecord;
}
}
JMS设置 ProducerImpl
public class ProducerImpl {
private JmsTemplate jmsTemplate;
/**
*/
/*@Transactional*/
public void send(Message message) {
try {
jmsTemplate.convertAndSend(message);
} catch (Exception e) {
e.printStackTrace();
}
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
消息监听器:MQMessageListener
public class MQMessageListener implements MessageListener {
@Resource
private MessageConverter messageConverter;
/*@Resource
ProducerImpl producerImpl;*/
@Resource
UserCredentialsConnectionFactoryAdapter connectionFactory;
@Resource
MQSendService mqSendService;
/**
* 接收消息
*
* @param callRecord
*/
public void receviedMessage(CallRecord callRecord) {
System.out.println(callRecord.getRequestContent());
}
public static void main(String[] args) {
String msgId ="ID:414d512051554555455f47414f202020fb6d3557206d7e02";
System.out.println(msgId.indexOf("ID:")+"ID:".length());
msgId = msgId.substring(msgId.indexOf("ID:")+"ID:".length());
System.out.println(msgId);
}
@Override
public void onMessage(Message msg) {
try {
String responseXml="";
if(msg instanceof BytesMessage){
BytesMessage bm = (BytesMessage) msg;
byte[] bys = new byte[(int) bm.getBodyLength()];
bm.readBytes(bys);
try {
responseXml = new String(bys,"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("responseXml:"+responseXml);
String msgId = msg.getJMSMessageID();
msgId = msgId.substring(msgId.indexOf("ID:")+"ID:".length());
System.out.println(msgId);
byte[] messageId = StringUtil.strToByte(msgId);
try {
mqSendService.send(("测试测试测试"+System.currentTimeMillis()).getBytes("utf-8"), messageId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (JMSException e) {
}finally{
}
}
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
}