参见:WSO2 ESB 5.0.0 + ActiveMQ 消息队列 -- 配置 JMS 传输
在此示例场景中,在代理服务器(如 ActiveMQ 或 WSO2 消息代理)中创建 JMS 主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。
可以使用管理控制台将代理服务添加到 ESB:在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。或者,也可以将名为 PublishSubscribe.xml 的 XML 文件添加到 <ESB_HOME>/repository/deployment/server/synapse-configs/default/proxy-services 目录。下面给出了定义该代理服务的示例 XML 代码段。请注意,地址 URI 中指定了用于配置 JMS 传输的属性。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Publisher proxy: exposed over the JMS transport and forwards every message
  it receives to the JMS topic "SimplePublishSubscribeService" on ActiveMQ.
-->
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <!--
              The JMS connection settings are passed as query parameters of the
              endpoint URI. Inside an XML attribute every '&' separator must be
              written as the entity &amp; or the configuration is not well-formed.
            -->
            <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.102.155:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <!-- One-way messaging: no response is expected from the topic. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>
发布 PublishSubscribe 代理服务后,ActiveMQ 控制台可以看到名为 PublishSubscribe 的队列
接下来,配置两个订阅JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。以下是这些代理服务的示例配置。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Subscriber proxy #1: listens on the JMS topic "SimplePublishSubscribeService",
  writes a custom log entry for every message and then drops the message.
-->
<proxy name="SimplePublishSubscribeService1"
       startOnLoad="true"
       transports="jms"
       xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- One-way messaging: no response is sent back. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber1" value="I am Subscriber1"/>
            </log>
            <!-- Stop mediation here; the message is not forwarded anywhere. -->
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <!-- Subscribe to a topic (not a queue) with the given name. -->
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <!-- Content type: read from the JMS property "contentType", else application/json. -->
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <!-- Name of the connection factory defined in the JMS transport listener. -->
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
特别注意:transport.jms.ConnectionFactory 的值必须是 JMS 传输侦听器配置中定义的连接工厂 parameter 的名字(本例中为 myTopicConnectionFactory),而不是 JNDI 连接工厂的名字。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Subscriber proxy #2: listens on the JMS topic "SimplePublishSubscribeService",
  writes a custom log entry for every message and then drops the message.
-->
<proxy name="SimplePublishSubscribeService2"
       startOnLoad="true"
       transports="jms"
       xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- One-way messaging: no response is sent back. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber2" value="I am Subscriber2"/>
            </log>
            <!-- Stop mediation here; the message is not forwarded anywhere. -->
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <!-- Subscribe to a topic (not a queue) with the given name. -->
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <!-- Content type: read from the JMS property "contentType", else application/json. -->
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <!-- Name of the connection factory defined in the JMS transport listener. -->
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
特别注意:transport.jms.ConnectionFactory 的值必须是 JMS 传输侦听器配置中定义的连接工厂 parameter 的名字(本例中为 myTopicConnectionFactory),而不是 JNDI 连接工厂的名字。
发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。
订阅者发布成功!
在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务的作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题。代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ActiveMQ 的 PublishSubscribe 队列生产消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息,SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 都会收到所订阅主题的消息。
向 ActiveMQ 的 PublishSubscribe 队列生产消息
WSO2 ESB收到订阅消息
package topic.producer; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicMsgProducer { private static final String DEFAULT_BROKER_HOST = "192.168.102.155"; private static final String DEFAULT_BROKER_PORT = "61616"; private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; //private static final String USER_NAME = "user"; //private static final String PASSWORD = "user"; private static final String TOPIC_NAME = "SimplePublishSubscribeService"; public static void main(String[] args) { new TopicMsgProducer().send(); } public void send() { // 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection conn = null; try { factory.setBrokerURL(defaultURL); //factory.setUserName(USER_NAME); //factory.setPassword(PASSWORD); // 创建连接 conn = factory.createConnection(); conn.start(); // 创建会话 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建地点 Topic topic = session.createTopic(TOPIC_NAME); // 创建生产者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); for (int i = 0; i < 10; i++) { TextMessage tmsg = session.createTextMessage(); tmsg.setText("早上你好 " + i); producer.send(tmsg); System.out.println("发送的消息:" + tmsg.getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (conn != null) conn.close(); } catch (Throwable ignore) { } } } }
package topic.consumer;

import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Stand-alone test client that registers a durable subscriber on the
 * {@code SimplePublishSubscribeService} topic and hands every incoming message
 * to a {@link MessageListenerCallBack}.
 */
public class TopicMsgConsumer {

    private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
    private static final String DEFAULT_BROKER_PORT = "61616";
    private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

    // Optional credentials; enable together with the setUserName/setPassword
    // calls in receive() if the broker requires authentication.
    // private static final String USER_NAME = "user";
    // private static final String PASSWORD = "user";

    private static final String TOPIC_NAME = "SimplePublishSubscribeService";

    /** Used both as the connection's client id and as the durable subscription name. */
    private static final String CLIENT_ID = "calvinchan";

    public static void main(String[] args) {
        new TopicMsgConsumer().receive();
    }

    /**
     * Connects to the broker, creates the durable subscriber, installs the
     * message listener and then blocks the calling thread until it is
     * interrupted. The connection is closed when the method returns.
     *
     * <p>JMS failures are printed to standard error; they are not rethrown.
     */
    public void receive() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        try {
            factory.setBrokerURL(defaultURL);
            // factory.setUserName(USER_NAME);
            // factory.setPassword(PASSWORD);

            conn = factory.createConnection();
            conn.setClientID(CLIENT_ID);
            conn.start();

            // Messages are acknowledged explicitly by the listener.
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            // Publish/subscribe mode works on a Topic object, not a generic Destination.
            Topic topic = session.createTopic(TOPIC_NAME);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, CLIENT_ID);
            subscriber.setMessageListener(new MessageListenerCallBack(session));

            // Keep the process alive without burning CPU: the latch is never
            // counted down, so await() blocks until this thread is interrupted.
            new CountDownLatch(1).await();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to the cleanup below.
            Thread.currentThread().interrupt();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package topic.consumer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; public class MessageListenerCallBack implements MessageListener { private Session session; public MessageListenerCallBack(Session session) { this.session = session; } @Override public void onMessage(Message message) { // TODO 自动生成的方法存根 TextMessage textMessage = (TextMessage) message; try { if (textMessage != null) { // 接收到消息 System.out.println("接收到消息:" + textMessage.getText()); message.acknowledge(); } else { //没有接收到消息,通知producer重新发送 this.session.recover(); } } catch (JMSException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Publisher proxy: exposed over the JMS transport and forwards every message
  it receives to the JMS topic "SimplePublishSubscribeService" on ActiveMQ.
-->
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <!--
              The JMS connection settings are passed as query parameters of the
              endpoint URI. Inside an XML attribute every '&' separator must be
              written as the entity &amp; or the configuration is not well-formed.
            -->
            <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.131.128:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <!-- One-way messaging: no response is expected from the topic. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Subscriber proxy #1: listens on the JMS topic "SimplePublishSubscribeService",
  writes a custom log entry for every message and then drops the message.
-->
<proxy name="SimplePublishSubscribeService1"
       startOnLoad="true"
       transports="jms"
       xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- One-way messaging: no response is sent back. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber1" value="I am Subscriber1"/>
            </log>
            <!-- Stop mediation here; the message is not forwarded anywhere. -->
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <!-- Subscribe to a topic (not a queue) with the given name. -->
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <!-- Content type: read from the JMS property "contentType", else application/json. -->
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <!-- Name of the connection factory defined in the JMS transport listener. -->
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Subscriber proxy #2: listens on the JMS topic "SimplePublishSubscribeService",
  writes a custom log entry for every message and then drops the message.
-->
<proxy name="SimplePublishSubscribeService2"
       startOnLoad="true"
       transports="jms"
       xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <!-- One-way messaging: no response is sent back. -->
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber2" value="I am Subscriber2"/>
            </log>
            <!-- Stop mediation here; the message is not forwarded anywhere. -->
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <!-- Subscribe to a topic (not a queue) with the given name. -->
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <!-- Content type: read from the JMS property "contentType", else application/json. -->
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <!-- Name of the connection factory defined in the JMS transport listener. -->
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>