WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。
JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-.jar 中,org.apache.axis2.transport.jms.JMSListener 和org.apache.axis2.transport.jms.JMSSender 类分别充当传输接收器和发件人。
JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他实现,例如 Apache Qpid 和 Tibco。
通过导航到
文件夹并执行 ./activemq 控制台(在 Linux/OSX 上)或 activemq start(在 Windows 上)来下载、设置和启动 Apache ActiveMQ。按照安装指南设置 WSO2 ESB。
ActiveMQ 应该在启动 ESB 之前启动并运行。
将以下客户端库从
/lib 目录复制到/repository/components/lib 目录ActiveMQ 5.8.0 及以上
activemq-broker-5.8.0.jar
activemq-client-5.8.0.jar
activemq-kahadb-store-5.8.0.jar
geronimo-jms1.1spec-1.1.1.jar
geronimo-j2ee-management1.1spec-1.0.1.jar
geronimo-jta1.0.1Bspec-1.0.1.jar
hawtbuf-1.9.jar
Slf4j-api-1.6.6.jar
activeio-core-3.1.4.jar(在
/lib/optional 文件夹中可)早期版本的
ActiveMQactivemq-core-5.5.1.jar
geronimo-j2ee-management1.0spec-1.0.jar
geronimo-jms1.1spec-1.1.1.jar
根据要求在 ESB 中配置 JMS 传输侦听器和发送器。当需要侦听 JMS 队列时,需要配置 JMS 传输侦听器;当需要向 JMS 队列发送消息时,需要配置 JMS 传输发送器。
通过导航到
文件夹并执行 ./wso2server.sh(在 Linux/OSX 上)或 wso2server.bat(在 Windows 上)来启动 ESB。如果在处理消息存储时使用 ActiveMQ 5.12.2 及更高版本,则需要在服务器启动时设置以下系统属性,以使 WSO2 ESB 的 JMS 消息存储按预期工作。
-Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*"
对于Windows,编辑/bin/wso2server.bat,加到命令行最后面
对于Linux,编辑/bin/wso2server.sh,加到 JAVA_OPTS 后面
设置上述属性是必需的,因为强制用户将可以使用带有 ActiveMQ 5.12.2 及更高版本的 ObjectMessages 交换的包显式列入白名单。因此,如果未设置上述属性,消息处理器将无法从 ActiveMQ 读取消息,并出现错误。
现在已经配置、启动并运行了 ActiveMQ 和 WSO2 ESB 的实例。
要启用 JMS 传输侦听器,请取消注释/repository/conf/axis2/axis2.xml 文件中与 ActiveMQ 相关的以下侦听器配置。
<!--Uncomment this and configure as appropriate for JMS transport support,after setting up your JMS environment (e.g. ActiveMQ)--> <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> <parameter name="myTopicConnectionFactory" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter> </parameter> <parameter name="myQueueConnectionFactory" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> <parameter name="default" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> </transportReceiver>
要启用 JMS 传输发送器,请取消注释/repository/conf/axis2/axis2.xml 文件中的以下配置。
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url 的位置添加以下配置,
failover:tcp://localhost:61616
这只会确保重新连接发生。 故障转移前缀与 ActiveMQ 的故障转移传输相关联。
<?xml version="1.0" encoding="UTF-8"?> <api context="/JMS_Queue" name="JMS_Queue" xmlns="http://ws.apache.org/ns/synapse"> <resource methods="POST GET"> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> <send> <endpoint> <address uri="jms:/ordersQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://192.168.102.155:61616&transport.jms.DestinationType=queue"/> </endpoint> </send> </inSequence> <outSequence> <send/> </outSequence> <faultSequence/> </resource> </api>
在 inSequence 中,该OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=queue
使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!
在activemq管理控制台可以看到待消费的消息
消费消息
一旦消息在代理中排队,JMS 使用者就可以连接到它并异步使用消息,而无需知道原始发送者。
<?xml version="1.0" encoding="UTF-8"?> <proxy name="JMSConsumerProxy" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <endpoint> <address uri="http://192.168.102.1:8280/testQueque"/> </endpoint> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> </inSequence> <outSequence/> <faultSequence/> </target> </proxy>
OUT_ONLY 属性设置为 true 以指示消息交换是单向的。
将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。
查看上面的示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。
在此示例配置, ActiveMQ 生产的消息将被发送到后端服务,ESB 不会等待服务的响应。
<?xml version="1.0" encoding="UTF-8"?> <api context="/testQueque" name="testQueque" xmlns="http://ws.apache.org/ns/synapse"> <resource methods="POST GET"> <inSequence> <log level="full"> <property name="log1" value="Messages from the message queue JMSConsumerProxy"/> </log> <log level="full"/> <respond/> </inSequence> <outSequence/> <faultSequence/> </resource> </api>
发布碳应用
ActiveMQ控制台可以看到被监听的队列
ESB 监听到队列消息并转发到后端服务
package queue.producer; import java.util.Date; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { private static final String DEFAULT_BROKER_HOST = "192.168.102.155"; private static final String DEFAULT_BROKER_PORT = "61616"; private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; //private static final String USER_NAME = "user"; //private static final String PASSWORD = "user"; private static final String QUEUE_NAME = "JMSConsumerProxy"; public static void main(String[] args) throws Exception { // 1 创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(defaultURL); //connectionFactory.setUserName(USER_NAME); //connectionFactory.setPassword(PASSWORD); // 2 创建连接 Connection connection = connectionFactory.createConnection(); // 3 打开连接 connection.start(); // 4 创建会话,第一个参数:是否开启事务,第二个参数:消息是否自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 Queue queue = session.createQueue(QUEUE_NAME); // 5 创建生产者 MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage("Publish subscription messages, " + new Date()); producer.send(message); // 8 关闭消息 //session.commit(); producer.close(); session.close(); connection.close(); System.out.println("消息生产成功"); } }
package queue.consumer; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { private static final String DEFAULT_BROKER_HOST = "192.168.102.155"; private static final String DEFAULT_BROKER_PORT = "61616"; private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; //private static final String USER_NAME = "user"; //private static final String PASSWORD = "user"; private static final String QUEUE_NAME = "ordersQueue"; public static void main(String[] args) throws Exception { // 创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(defaultURL); //connectionFactory.setUserName(USER_NAME); //connectionFactory.setPassword(PASSWORD); // 创建连接 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 创建会话 /** * 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit(); * 如果设置false,操作消息队列后,不使用session.commit(); */ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建队列 Queue queue = session.createQueue(QUEUE_NAME); // 创建消费者 MessageConsumer consumer = session.createConsumer(queue); try { consumer.setMessageListener(new MessageListenerCallBack(session)); } catch (Exception e) { // TODO: handle exception // 关闭连接 session.close(); connection.close(); System.out.println("消费结束0"); e.printStackTrace(); } } }
package queue.consumer; import javax.jms.*; public class MessageListenerCallBack implements MessageListener { private Session session; public MessageListenerCallBack(Session session) { this.session = session; } @Override public void onMessage(Message message) { // TODO 自动生成的方法存根 TextMessage textMessage = (TextMessage) message; try { if (textMessage != null) { // 接收到消息 System.out.println("接收到消息:" + textMessage.getText()); message.acknowledge(); } else { //没有接收到消息,通知producer重新发送 this.session.recover(); } } catch (JMSException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }