WSO2 ESB 5.0.0 + ActiveMQ 主题

WSO2   2025-01-09 00:47   159   0  

一、配置 JMS 传输

参见:WSO2 ESB 5.0.0 + ActiveMQ 消息队列 -- 配置 JMS 传输

二、配置发布者

image.png
在此示例场景中,在代理服务器(如 ActiveMQ 或 WSO2 消息代理)中创建 JMS 主题,然后在 WSO2 ESB 中添加充当发布者和订阅者的代理服务。

1、在 ActiveMQ 创建为 SimplePublishSubscribeService 的主题

image.png
image.png

2、添加一个名为 PublishSubscribe 的代理服务并将其配置为发布到主题 SimplePublishSubscribeService

可以使用管理控制台将代理服务添加到 ESB,方法是在设计视图中构建代理服务,或者将 XML 配置复制到源视图中。 或者,可以将名为 PublishSubscribe.xml 的 XML 文件添加到/repository/deployment/server/synapse-configs/default/proxy-services。 下面给出了定义代理服务的示例 XML 代码段。 请注意,地址 URI 指定了用于配置 JMS 传输的属性。
image.png
image.png

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.102.155:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

发布 PublishSubscribe 代理服务后,ActiveMQ 控制台可以看到名为 PublishSubscribe 的队列
image.png
image.png

三、配置订阅者

接下来,配置两个订阅JMS 主题 SimplePublishSubscribeService 的代理服务,以便每当该主题收到消息时,它就会发送到这些订阅代理服务。以下是这些代理服务的示例配置。

1、订阅者1配置

image.png

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
      <log level="custom">
          <property name="Subscriber1" value="I am Subscriber1"/>
      </log>
      <drop/>
    </inSequence>
    <outSequence>
    <send/>
    </outSequence>
    <faultSequence/>
  </target>
  <parameter name="transport.jms.DestinationType">topic</parameter>
  <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
  <parameter name="transport.jms.ContentType">
  <rules>
    <jmsProperty>contentType</jmsProperty>
    <default>application/json</default>
  </rules>
  </parameter>
  <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>

image.png
特别特别注意!!!!!!transport.jms.ConnectionFactory配置的是 JMS 传输侦听器中的parameter名字
image.png

2、订阅者2配置

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
  <target>
    <inSequence>
      <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
          <log level="custom">
      <property name="Subscriber2" value="I am Subscriber2"/>
      </log>
      <drop/>
    </inSequence>
    <outSequence>
    <send/>
    </outSequence>
    <faultSequence/>
  </target>
  <parameter name="transport.jms.DestinationType">topic</parameter>
  <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
  <parameter name="transport.jms.ContentType">
  <rules>
    <jmsProperty>contentType</jmsProperty>
    <default>application/json</default>
  </rules>
  </parameter>
  <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>

image.png
特别特别注意!!!!!!transport.jms.ConnectionFactory配置的是 JMS 传输侦听器中的parameter名字
image.png

四、发布到主题

发布 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 代理服务。
image.png
image.png
订阅者发布成功!

五、测试消息发布与订阅

在本实例中,ESB 中创建了名为 PublishSubscribe 的代理服务,该服务是作用是监听 ActiveMQ 的 PublishSubscribe 队列,如果该队列有消息,则把消息发布到 ActiveMQ 中的 SimplePublishSubscribeService 主题,代理服务 SimplePublishSubscribeService1 和 SimplePublishSubscribeService2 订阅了主题 SimplePublishSubscribeService,如果收到主题发布的消息,则会打印出日志。
测试方法:向 ActiveMQ 的 PublishSubscribe 队列生产消息,或者向 ActiveMQ 的 SimplePublishSubscribeService 主题发布消息,SimplePublishSubscribeService1 和SimplePublishSubscribeService2 都会收到所订阅主题的消息。
WSO2 + ActiveMQ 主题消息.jpg
向 ActiveMQ 的 PublishSubscribe 队列生产消息
image.png
image.png
image.png
WSO2 ESB收到订阅消息
image.png
image.png

六、示例工程

1、ActiveMQ

①、主题消息生产者

package topic.producer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicMsgProducer {

    private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
    private static final String DEFAULT_BROKER_PORT = "61616";
    private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

    //private static final String USER_NAME = "user";
    //private static final String PASSWORD = "user";

    private static final String TOPIC_NAME = "SimplePublishSubscribeService";

    public static void main(String[] args) {
        new TopicMsgProducer().send();
    }

    public void send() {
        // 创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        try {
            factory.setBrokerURL(defaultURL);
            //factory.setUserName(USER_NAME);
            //factory.setPassword(PASSWORD);

            // 创建连接
            conn = factory.createConnection();
            conn.start();
            // 创建会话
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建地点
            Topic topic = session.createTopic(TOPIC_NAME);
            // 创建生产者
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
            for (int i = 0; i < 10; i++) {
                TextMessage tmsg = session.createTextMessage();
                tmsg.setText("早上你好   " + i);
                producer.send(tmsg);
                System.out.println("发送的消息:" + tmsg.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                if (conn != null)
                    conn.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

②、主题消息消费者

package topic.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicMsgConsumer {

    private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
    private static final String DEFAULT_BROKER_PORT = "61616";
    private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

    //private static final String USER_NAME = "user";
    //private static final String PASSWORD = "user";

    private static final String TOPIC_NAME = "SimplePublishSubscribeService";

    private static final String CLIENT_ID = "calvinchan";


    public static void main(String[] args) {
        // TODO 自动生成的方法存根
        new TopicMsgConsumer().receive();
    }

    public void receive() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        try {
            factory.setBrokerURL(defaultURL);
            //factory.setUserName(USER_NAME);
            //factory.setPassword(PASSWORD);

            conn = factory.createConnection();
            conn.setClientID(CLIENT_ID);
            conn.start();
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // 订阅发布模式的 Topic对象 不是Destination
            Topic topic = session.createTopic(TOPIC_NAME);
            TopicSubscriber subsriber = session.createDurableSubscriber(topic, CLIENT_ID);
            subsriber.setMessageListener(new MessageListenerCallBack(session));
            while (true);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package topic.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

public class MessageListenerCallBack implements MessageListener {

    private Session session;

    public MessageListenerCallBack(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // TODO 自动生成的方法存根
        TextMessage textMessage = (TextMessage) message;

        try {
            if (textMessage != null) { 
                // 接收到消息
                System.out.println("接收到消息:" + textMessage.getText());
                message.acknowledge();
            } else {
                //没有接收到消息,通知producer重新发送
                this.session.recover();
            }
        } catch (JMSException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
    }

}

2、PublishSubscribe

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="PublishSubscribe" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="jms:/SimplePublishSubscribeService?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.131.128:61616&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>

3、SubscribeService

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService1" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber1" value="I am Subscriber1"/>
            </log>
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>
<?xml version="1.0" encoding="UTF-8"?>
<proxy name="SimplePublishSubscribeService2" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <log level="custom">
                <property name="Subscriber2" value="I am Subscriber2"/>
            </log>
            <drop/>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </target>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">SimplePublishSubscribeService</parameter>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/json</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
</proxy>


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。