参见:WSO2 ESB 5.0.0 + ActiveMQ 消息队列 -- 配置 JMS 传输

在此示例场景中,在代理服务器(如 ActiveMQ 或 WSO2 消息代理)中创建 JMS 主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。


可以使用管理控制台将代理服务添加到 ESB,方法是在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。 或者,可以将名为 PublishSubscribe.xml 的 XML 文件添加到/repository/deployment/server/synapse-configs/default/proxy-services。 下面给出了定义代理服务的示例 XML 代码段。 请注意,地址 URI 指定了用于配置 JMS 传输的属性。

<?xml version="1.0" encoding="UTF-8"?> <proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <endpoint> <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://192.168.102.155:61616&transport.jms.DestinationType=topic"/> </endpoint> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> </inSequence> <outSequence/> <faultSequence/> </target> </proxy>
发布 PublishSubscribe 代理服务后,ActiveMQ 控制台可以看到名为 PublishSubscribe 的队列

接下来,配置两个订阅JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。以下是这些代理服务的示例配置。

<?xml version="1.0" encoding="UTF-8"?> <proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> <log level="custom"> <property name="Subscriber1" value="I am Subscriber1"/> </log> <drop/> </inSequence> <outSequence> <send/> </outSequence> <faultSequence/> </target> <parameter name="transport.jms.DestinationType">topic</parameter> <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter> <parameter name="transport.jms.ContentType"> <rules> <jmsProperty>contentType</jmsProperty> <default>application/json</default> </rules> </parameter> <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter> </proxy>

特别特别注意!!!!!!transport.jms.ConnectionFactory配置的是 JMS 传输侦听器中的parameter名字
<?xml version="1.0" encoding="UTF-8"?> <proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> <log level="custom"> <property name="Subscriber2" value="I am Subscriber2"/> </log> <drop/> </inSequence> <outSequence> <send/> </outSequence> <faultSequence/> </target> <parameter name="transport.jms.DestinationType">topic</parameter> <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter> <parameter name="transport.jms.ContentType"> <rules> <jmsProperty>contentType</jmsProperty> <default>application/json</default> </rules> </parameter> <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter> </proxy>

特别特别注意!!!!!!transport.jms.ConnectionFactory配置的是 JMS 传输侦听器中的parameter名字
发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。

订阅者发布成功!
在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务是作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题,代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ActiveMQ 的 PublishSubscribe 队列生产消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息,SimplePublishSubscribeService1 和SimplePublishSubscribeService2 都会收到所订阅主题的消息。
向 ActiveMQ 的 PublishSubscribe 队列生产消息


WSO2 ESB收到订阅消息

package topic.producer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicMsgProducer {
private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
//private static final String USER_NAME = "user";
//private static final String PASSWORD = "user";
private static final String TOPIC_NAME = "SimplePublishSubscribeService";
public static void main(String[] args) {
new TopicMsgProducer().send();
}
public void send() {
// 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
try {
factory.setBrokerURL(defaultURL);
//factory.setUserName(USER_NAME);
//factory.setPassword(PASSWORD);
// 创建连接
conn = factory.createConnection();
conn.start();
// 创建会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建地点
Topic topic = session.createTopic(TOPIC_NAME);
// 创建生产者
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
for (int i = 0; i < 10; i++) {
TextMessage tmsg = session.createTextMessage();
tmsg.setText("早上你好 " + i);
producer.send(tmsg);
System.out.println("发送的消息:" + tmsg.getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (conn != null)
conn.close();
} catch (Throwable ignore) {
}
}
}
}
package topic.consumer;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TopicMsgConsumer {
private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
private static final String DEFAULT_BROKER_PORT = "61616";
private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
//private static final String USER_NAME = "user";
//private static final String PASSWORD = "user";
private static final String TOPIC_NAME = "SimplePublishSubscribeService";
private static final String CLIENT_ID = "calvinchan";
public static void main(String[] args) {
// TODO 自动生成的方法存根
new TopicMsgConsumer().receive();
}
public void receive() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
try {
factory.setBrokerURL(defaultURL);
//factory.setUserName(USER_NAME);
//factory.setPassword(PASSWORD);
conn = factory.createConnection();
conn.setClientID(CLIENT_ID);
conn.start();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 订阅发布模式的 Topic对象 不是Destination
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subsriber = session.createDurableSubscriber(topic, CLIENT_ID);
subsriber.setMessageListener(new MessageListenerCallBack(session));
while (true);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
package topic.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
public class MessageListenerCallBack implements MessageListener {
private Session session;
public MessageListenerCallBack(Session session) {
this.session = session;
}
@Override
public void onMessage(Message message) {
// TODO 自动生成的方法存根
TextMessage textMessage = (TextMessage) message;
try {
if (textMessage != null) {
// 接收到消息
System.out.println("接收到消息:" + textMessage.getText());
message.acknowledge();
} else {
//没有接收到消息,通知producer重新发送
this.session.recover();
}
} catch (JMSException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
}
<?xml version="1.0" encoding="UTF-8"?> <proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <endpoint> <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://192.168.131.128:61616&transport.jms.DestinationType=topic"/> </endpoint> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> </inSequence> <outSequence/> <faultSequence/> </target> </proxy>
<?xml version="1.0" encoding="UTF-8"?> <proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> <log level="custom"> <property name="Subscriber1" value="I am Subscriber1"/> </log> <drop/> </inSequence> <outSequence> <send/> </outSequence> <faultSequence/> </target> <parameter name="transport.jms.DestinationType">topic</parameter> <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter> <parameter name="transport.jms.ContentType"> <rules> <jmsProperty>contentType</jmsProperty> <default>application/json</default> </rules> </parameter> <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter> </proxy>
<?xml version="1.0" encoding="UTF-8"?> <proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse"> <target> <inSequence> <property name="OUT_ONLY" scope="default" type="STRING" value="true"/> <log level="custom"> <property name="Subscriber2" value="I am Subscriber2"/> </log> <drop/> </inSequence> <outSequence> <send/> </outSequence> <faultSequence/> </target> <parameter name="transport.jms.DestinationType">topic</parameter> <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter> <parameter name="transport.jms.ContentType"> <rules> <jmsProperty>contentType</jmsProperty> <default>application/json</default> </rules> </parameter> <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter> </proxy>