weblogic中使用jms发送和接受消息
一. 开篇语
今天很坑爹啊, 为了搞一下JMS这个玩意, 整了一天了, 光是weblogic的环境就换了两, 最后总算是搞定了, 写下这篇日志, 记录下学习的心得.
二. JMS的思想
所谓的JMS其实就是异步通信, 我们可以打个简单的比方: 现在的手机基本上普及了, 手机它有两个基本功能, 一个是打电话, 一个是发短信.
打电话是同步的, 必须要保证双方都开机才能进行通话; 而发短信则是异步的, 接收方不需要保持开机状态;
SUN公司给我们提供了一组标准被Java API用于企业级的消息处理, 通过JMS可以在Java程序之间发送和接受消息以达到交换数据的目的,
异步通信实现了程序之间的松耦合的关系.
三. 名词解释
1. 连接工厂(ConnectionFactory): 用来创建消息服务器的connection对象.
2. 连接(Connection): 代表一个与JMS提供者的活动连接.
3. 目的(Destination): 标识消息的发送和接收方式, 分为队列(Queue)和主题(Topic)两种.
4. 会话(Session): 接收和发送消息的会话线程.
5. 为了实现JMS独立于不同供应商MS的专有技术, weblogic JMS采用了受管对象(administratored object)的机制. 受管对象就是由消息服务器通过管理界面创建, 程序通过JNDI接口取得这些对象.weblogic 中的两种受管对象: connection factory, distination.
四. 消息类型
1. StreamMessage: 消息由串行化的Java对象组成, 必须按照设置时的顺序读取对象.
2. MapMessage: 消息由key/value对组成, 其中名称为string类型, 值为Java数据类型. 可以使用列举顺序读取该消息的值, 也可以通过名称无序地获取值。
3. TextMessage: 消息的主体为字符串, 这是最常用的消息类型.
4. ObjectMessage: 消息的主体为串行化的Java对象, 可以是自己定义的串行化的Java对象.
5. BytesMessage: 消息的主体是二进制数据.
五. 环境准备
1. myeclipse环境
① jre1.4
② j2ee 1.4 libraries
③ 导入weblogic.jar
2. weblogic环境
① 安装weblogic8.1并集成到myeclipse
http://blog.****.net/zdp072/article/details/26831739
② 配置weblogic服务器
1)、新建jms连接工厂,工厂名称为“myJMSConnectionFactory”, JNDI name为"myJMSConnectionFactoryJNDIName" .
2)、定义后备存储, 并填写存储目录.
3)、新建jms服务器,服务器名称为:“myJMSServer”.
4)、在“myJMSServer”服务下新建目标为“myJMSQueue”队列, JNDI name为"myJMSQueueJNDIName".
5)、在“myJMSServer”服务下新建目标为“myJMSTopic”主题, JNDI name为"myJMSTopicJNDIName".
六. 代码实现
1. PTP(point to point) - 点对点模式.
生产者发送一条消息到消息服务器, 消息服务器发送给一个消费者, 这条消息不能再发送给其他消费者, 相当于队列, 先到先得.
(一个消息生产者对应一个消息消费者)
①. 实体类User
/** * 实体类 * @author zhangjim */ public class User implements Serializable { private static final long serialVersionUID = 1L; private String name; private int age; public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
②. 生产者QueueMsgSender
public class QueueMsgSender { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JNDI provider url. public final static String PROVIDER_URL = "t3://localhost:7001/"; // Defines the JMS connection factory for the queue. public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName"; // Defines the queue, use the queue JNDI name public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName"; private QueueConnectionFactory qconFactory; private QueueConnection queueConnection; private QueueSession queueSession; private QueueSender queueSender; private Queue queue; private TextMessage textMessage; private StreamMessage streamMessage; private BytesMessage bytesMessage; private MapMessage mapMessage; private ObjectMessage objectMessage; /** * get the context object. * * @return context object * @throws NamingException if operation cannot be performed */ private static InitialContext getInitialContext() throws NamingException { Hashtable table = new Hashtable(); table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); table.put(Context.PROVIDER_URL, PROVIDER_URL); InitialContext context = new InitialContext(table); return context; } /** * Creates all the necessary objects for sending messages to a JMS queue. * * @param ctx JNDI initial context * @param queueName name of queue * @exception NamingException if operation cannot be performed * @exception JMSException if JMS fails to initialize due to internal error */ public void init(Context ctx, String queueName) throws NamingException, JMSException { qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME); queueConnection = qconFactory.createQueueConnection(); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queue = (Queue) ctx.lookup(queueName); queueSender = queueSession.createSender(queue); textMessage = queueSession.createTextMessage(); streamMessage = queueSession.createStreamMessage(); bytesMessage = queueSession.createBytesMessage(); mapMessage = queueSession.createMapMessage(); objectMessage = queueSession.createObjectMessage(); queueConnection.start(); } /** * Sends a message to a JMS queue. * * @param message message to be sent * @exception JMSException if JMS fails to send message due to internal error */ public void send(String message) throws JMSException { // type1: set TextMessage textMessage.setText(message); // type2: set StreamMessage streamMessage.writeString(message); streamMessage.writeInt(20); // type3: set BytesMessage byte[] block = message.getBytes(); bytesMessage.writeBytes(block); // type4: set MapMessage mapMessage.setString("name", message); // type5: set ObjectMessage User user = new User(); user.setName(message); user.setAge(30); objectMessage.setObject(user); queueSender.send(objectMessage); } /** * read the msg from the console, then send it. * * @param msgSender * @throws IOException if IO fails to send message due to internal error * @throws JMSException if JMS fails to send message due to internal error */ private static void readAndSend(QueueMsgSender msgSender) throws IOException, JMSException { BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Enter message(input quit to quit):"); String line = null; boolean quit = false; do { line = msgStream.readLine(); if (line != null && line.trim().length() != 0) { msgSender.send(line); System.out.println("JMS Message Sent: " + line + "\n"); quit = line.equalsIgnoreCase("quit"); } } while (!quit); } /** * release resources. * * @exception JMSException if JMS fails to close objects due to internal error */ public void close() throws JMSException { queueSender.close(); queueSession.close(); queueConnection.close(); } /** * test client. * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { InitialContext ctx = getInitialContext(); QueueMsgSender sender = new QueueMsgSender(); sender.init(ctx, QUEUE_JNDI_NAME); readAndSend(sender); sender.close(); } }
③. 消费者QueueMsgReceiver
public class QueueMsgReceiver implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JNDI provider url. public final static String PROVIDER_URL = "t3://localhost:7001"; // Defines the JMS connection factory for the queue. public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName"; // Defines the queue, use the queue JNDI name public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName"; private QueueConnectionFactory qconFactory; private QueueConnection queueConnection; private QueueSession queueSession; private QueueReceiver queueReceiver; private Queue queue; private boolean quit = false; /** * get the context object. * * @return context object * @throws NamingException if operation cannot be performed */ private static InitialContext getInitialContext() throws NamingException { Hashtable table = new Hashtable(); table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); table.put(Context.PROVIDER_URL, PROVIDER_URL); InitialContext context = new InitialContext(table); return context; } /** * Creates all the necessary objects for receiving messages from a JMS queue. * * @param ctx JNDI initial context * @param queueName name of queue * @exception NamingException if operation cannot be performed * @exception JMSException if JMS fails to initialize due to internal error */ public void init(Context ctx, String queueName) throws NamingException, JMSException { qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME); queueConnection = qconFactory.createQueueConnection(); queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); queue = (Queue) ctx.lookup(queueName); queueReceiver = queueSession.createReceiver(queue); queueReceiver.setMessageListener(this); // second thread: message reveive thread. queueConnection.start(); } /** * implement from MessageListener. * when a message arrived, it will be invoked. * * @param message message */ public void onMessage(Message message) { try { String msgStr = ""; int age = 0; if (message instanceof TextMessage) { msgStr = ((TextMessage) message).getText(); } else if (message instanceof StreamMessage) { msgStr = ((StreamMessage) message).readString(); age = ((StreamMessage) message).readInt(); } else if (message instanceof BytesMessage) { byte[] block = new byte[1024]; ((BytesMessage) message).readBytes(block); msgStr = String.valueOf(block); } else if (message instanceof MapMessage) { msgStr = ((MapMessage) message).getString("name"); } else if (message instanceof ObjectMessage) { User user = (User) ((ObjectMessage) message).getObject(); msgStr = user.getName(); age = user.getAge(); } System.out.println("Message Received: " + msgStr + ", " + age); if (msgStr.equalsIgnoreCase("quit")) { synchronized (this) { quit = true; this.notifyAll(); // Notify main thread to quit } } } catch (JMSException e) { throw new RuntimeException("error happens", e); } } /** * release resources. * * @exception JMSException if JMS fails to close objects due to internal error */ public void close() throws JMSException { queueReceiver.close(); queueSession.close(); queueConnection.close(); } /** * test client. * first thread(main thread) * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { InitialContext ctx = getInitialContext(); QueueMsgReceiver receiver = new QueueMsgReceiver(); receiver.init(ctx, QUEUE_JNDI_NAME); // Wait until a "quit" message has been received. synchronized (receiver) { while (!receiver.quit) { try { receiver.wait(); } catch (InterruptedException e) { throw new RuntimeException("error happens", e); } } } receiver.close(); } }
2. publisher and subscriber - 发布订阅模式.
生产者发送一条消息到消息服务器, 消息服务器发送给正在监听的所有消费者, 类似广播.
(一个消息生产者对应多个消息消费者)
①. 生产者TopicMsgPublisher
public class TopicMsgPublisher { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JNDI provider url. public final static String PROVIDER_URL = "t3://localhost:7001/"; // Defines the JMS connection factory for the topic. public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName"; // Defines the topic, use the topic JNDI name public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName"; private TopicConnectionFactory tconFactory; private TopicConnection topicConnection; private TopicSession topicSession; private TopicPublisher topicPublisher; private Topic topic; private TextMessage textMessage; private StreamMessage streamMessage; private BytesMessage bytesMessage; private MapMessage mapMessage; private ObjectMessage objectMessage; /** * get the context object. * * @return context object * @throws NamingException if operation cannot be performed */ private static InitialContext getInitialContext() throws NamingException { Hashtable table = new Hashtable(); table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); table.put(Context.PROVIDER_URL, PROVIDER_URL); InitialContext context = new InitialContext(table); return context; } /** * Creates all the necessary objects for sending messages to a JMS topic. * * @param ctx JNDI initial context * @param topicName name of topic * @exception NamingException if operation cannot be performed * @exception JMSException if JMS fails to initialize due to internal error */ public void init(Context ctx, String topicName) throws NamingException, JMSException { tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME); topicConnection = tconFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) ctx.lookup(topicName); topicPublisher = topicSession.createPublisher(topic); textMessage = topicSession.createTextMessage(); streamMessage = topicSession.createStreamMessage(); bytesMessage = topicSession.createBytesMessage(); mapMessage = topicSession.createMapMessage(); objectMessage = topicSession.createObjectMessage(); topicConnection.start(); } /** * Sends a message to a JMS topic. * * @param message message to be sent * @exception JMSException if JMS fails to send message due to internal error */ public void send(String message) throws JMSException { // type1: set TextMessage textMessage.setText(message); // type2: set StreamMessage streamMessage.writeString(message); streamMessage.writeInt(20); // type3: set BytesMessage byte[] block = message.getBytes(); bytesMessage.writeBytes(block); // type4: set MapMessage mapMessage.setString("name", message); // type5: set ObjectMessage User user = new User(); user.setName(message); user.setAge(30); objectMessage.setObject(user); topicPublisher.publish(objectMessage); } /** * read the msg from the console, then send it. * * @param msgPublisher * @throws IOException if IO fails to send message due to internal error * @throws JMSException if JMS fails to send message due to internal error */ private static void readAndSend(TopicMsgPublisher msgPublisher) throws IOException, JMSException { BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Enter message(input quit to quit):"); String line = null; boolean quit = false; do { line = msgStream.readLine(); if (line != null && line.trim().length() != 0) { msgPublisher.send(line); System.out.println("JMS Message Sent: " + line + "\n"); quit = line.equalsIgnoreCase("quit"); } } while (!quit); } /** * release resources. * * @exception JMSException if JMS fails to close objects due to internal error */ public void close() throws JMSException { topicPublisher.close(); topicSession.close(); topicConnection.close(); } /** * test client. * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { InitialContext ctx = getInitialContext(); TopicMsgPublisher publisher = new TopicMsgPublisher(); publisher.init(ctx, TOPIC_JNDI_NAME); readAndSend(publisher); publisher.close(); } }
②. 消费者TopicMsgSubscriber
public class TopicMsgSubscriber implements MessageListener { // Defines the JNDI context factory. public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory"; // Defines the JNDI provider url. public final static String PROVIDER_URL = "t3://localhost:7001"; // Defines the JMS connection factory for the topic. public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName"; // Defines the topic, use the topic JNDI name public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName"; private TopicConnectionFactory tconFactory; private TopicConnection topicConnection; private TopicSession topicSession; private TopicSubscriber topicSubscriber; private Topic topic; private boolean quit = false; /** * get the context object. * * @return context object * @throws NamingException if operation cannot be performed */ private static InitialContext getInitialContext() throws NamingException { Hashtable table = new Hashtable(); table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); table.put(Context.PROVIDER_URL, PROVIDER_URL); InitialContext context = new InitialContext(table); return context; } /** * Creates all the necessary objects for receiving messages from a JMS topic. * * @param ctx JNDI initial context * @param topicName name of topic * @exception NamingException if operation cannot be performed * @exception JMSException if JMS fails to initialize due to internal error */ public void init(Context ctx, String topicName) throws NamingException, JMSException { tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME); topicConnection = tconFactory.createTopicConnection(); topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); topic = (Topic) ctx.lookup(topicName); topicSubscriber = topicSession.createSubscriber(topic); topicSubscriber.setMessageListener(this); // second thread: message reveive thread. topicConnection.start(); } /** * implement from MessageListener. * when a message arrived, it will be invoked. * * @param message message */ public void onMessage(Message message) { try { String msgStr = ""; int age = 0; if (message instanceof TextMessage) { msgStr = ((TextMessage) message).getText(); } else if (message instanceof StreamMessage) { msgStr = ((StreamMessage) message).readString(); age = ((StreamMessage) message).readInt(); } else if (message instanceof BytesMessage) { byte[] block = new byte[1024]; ((BytesMessage) message).readBytes(block); msgStr = String.valueOf(block); } else if (message instanceof MapMessage) { msgStr = ((MapMessage) message).getString("name"); } else if (message instanceof ObjectMessage) { User user = (User) ((ObjectMessage) message).getObject(); msgStr = user.getName(); age = user.getAge(); } System.out.println("Message subscribed: " + msgStr + ", " + age); if (msgStr.equalsIgnoreCase("quit")) { synchronized (this) { quit = true; this.notifyAll(); // Notify main thread to quit } } } catch (JMSException e) { throw new RuntimeException("error happens", e); } } /** * release resources. * * @exception JMSException if JMS fails to close objects due to internal error */ public void close() throws JMSException { topicSubscriber.close(); topicSession.close(); topicConnection.close(); } /** * test client. * first thread(main thread) * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { InitialContext ctx = getInitialContext(); TopicMsgSubscriber subscriber = new TopicMsgSubscriber(); subscriber.init(ctx, TOPIC_JNDI_NAME); // Wait until a "quit" message has been subscribed. synchronized (subscriber) { while (!subscriber.quit) { try { subscriber.wait(); } catch (InterruptedException e) { throw new RuntimeException("error happens", e); } } } subscriber.close(); } }
七. 总结
使用JMS可以把不影响用户执行结果又比较耗时的任务异步的扔给JMS服务器端, 而尽快地把屏幕还给用户, 且服务器端能够多线程排队相应高并发的请求, 在Java世界里达到最高境界的解耦. 客户端和服务器端无须直连, 甚至无需知道对方是谁、在哪里、有多少人, 只要对流过的信息作响应就行了, 在企业应用复杂时作用非常明显.
八. 源码下载
下载源码请点击此链接: http://download.****.net/detail/zdp072/7422431