WSO2 ESB 5.0.0 + ActiveMQ 消息队列

WSO2   2025-01-09 00:47   143   0  

一、JMS 传输

WSO2 ESB 的 Java 消息服务 (JMS) 传输可以轻松地向任何实现 JMS 规范的 JMS 服务的队列和主题发送和接收消息。
JMS 传输实现来自 WS-Commons Transports 项目,它利用 JNDI 连接到各种 JMS 代理。 因此,WSO2 ESB 可以与任何提供 JNDI 支持的 JMS 代理一起工作。 所有相关类都打包到axis2-transport-jms-.jar 中,org.apache.axis2.transport.jms.JMSListener 和org.apache.axis2.transport.jms.JMSSender 类分别充当传输接收器和发件人。
JMS 传输实现需要一个活动的 JMS 服务器实例才能接收和发送消息。 建议使用 WSO2 Message Broker 或 Apache ActiveMQ,但也支持其他实现,例如 Apache Qpid 和 Tibco。

二、配置 JMS 传输

1、使用 ActiveMQ 配置

①、设置 WSO2 ESB 和 ActiveMQ

  1. 通过导航到

    文件夹并执行 ./activemq 控制台(在 Linux/OSX 上)或 activemq start(在 Windows 上)来下载、设置和启动 Apache ActiveMQ。
  2. 按照安装指南设置 WSO2 ESB。

  • ActiveMQ 应该在启动 ESB 之前启动并运行。

  1. 将以下客户端库从

    /lib 目录复制到/repository/components/lib 目录
  2. ActiveMQ 5.8.0 及以上

  • activemq-broker-5.8.0.jar

  • activemq-client-5.8.0.jar

  • activemq-kahadb-store-5.8.0.jar

  • geronimo-jms1.1spec-1.1.1.jar

  • geronimo-j2ee-management1.1spec-1.0.1.jar

  • geronimo-jta1.0.1Bspec-1.0.1.jar

  • hawtbuf-1.9.jar

  • Slf4j-api-1.6.6.jar

  • activeio-core-3.1.4.jar(在

    /lib/optional 文件夹中可)

早期版本的

  • ActiveMQactivemq-core-5.5.1.jar

  • geronimo-j2ee-management1.0spec-1.0.jar

  • geronimo-jms1.1spec-1.1.1.jar

根据要求在 ESB 中配置 JMS 传输侦听器和发送器。当需要侦听 JMS 队列时,需要配置 JMS 传输侦听器;当需要向 JMS 队列发送消息时,需要配置 JMS 传输发送器。

通过导航到

文件夹并执行 ./wso2server.sh(在 Linux/OSX 上)或 wso2server.bat(在 Windows 上)来启动 ESB。

如果在处理消息存储时使用 ActiveMQ 5.12.2 及更高版本,则需要在服务器启动时设置以下系统属性,以使 WSO2 ESB 的 JMS 消息存储按预期工作。

  • -Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*"

对于Windows,编辑/bin/wso2server.bat,加到命令行最后面
image.png
对于Linux,编辑/bin/wso2server.sh,加到 JAVA_OPTS 后面
image.png

  1. 设置上述属性是必需的,因为强制用户将可以使用带有 ActiveMQ 5.12.2 及更高版本的 ObjectMessages 交换的包显式列入白名单。因此,如果未设置上述属性,消息处理器将无法从 ActiveMQ 读取消息,并出现错误。

  2. 现在已经配置、启动并运行了 ActiveMQ 和 WSO2 ESB 的实例。

②、设置 JMS 侦听器

要启用 JMS 传输侦听器,请取消注释/repository/conf/axis2/axis2.xml 文件中与 ActiveMQ 相关的以下侦听器配置。

<!--Uncomment this and configure as appropriate for JMS transport support,after setting up your JMS environment (e.g. ActiveMQ)-->
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>
    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://192.168.102.155:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>

image.png

③、设置JMS发件人

要启用 JMS 传输发送器,请取消注释/repository/conf/axis2/axis2.xml 文件中的以下配置。

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>

image.png
上述配置并没有解决 ActiveMQ 消息代理的瞬时故障问题。假设由于某种原因 ActiveMQ 宕机了一段时间后又恢复了。 ESB 不会重新连接到 ActiveMQ,而是在向 ESB 发送请求时抛出一些错误,直到重新启动。 为了解决这个问题,需要在 java.naming.provider.url 的位置添加以下配置,

failover:tcp://localhost:61616

这只会确保重新连接发生。 故障转移前缀与 ActiveMQ 的故障转移传输相关联。

三、向 JMS 队列发送消息

image.png

1、创建消息中介构件

image.png
image.png

2、REST API 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<api context="/JMS_Queue" name="JMS_Queue" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST GET">
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
            <send>
                <endpoint>
                    <address uri="jms:/ordersQueue?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://192.168.102.155:61616&amp;transport.jms.DestinationType=queue"/>
                </endpoint>
            </send>
        </inSequence>
        <outSequence>
            <send/>
        </outSequence>
        <faultSequence/>
    </resource>
</api>

在 inSequence 中,该OUT_ONLY 属性设置为 true 以指示消息交换是单向的。

3、AddressEndpoint 配置

image.png
jms:/队列名称?transport.jms.ConnectionFactoryJNDIName=JMS侦听器配置的jdni名称&java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&java.naming.provider.url=tcp://ActiveMQ服务器地址:61616&transport.jms.DestinationType=queue

4、发布碳应用

image.png
image.png
image.png
image.png

5、测试配置

使用 postman 向 api 发送消息,注意,这个是没有响应消息的!!!
image.png
在activemq管理控制台可以看到待消费的消息
image.png
消费消息
image.png

四、监听 JMS 队列中的消息

image.png
一旦消息在代理中排队,JMS 使用者就可以连接到它并异步使用消息,而无需知道原始发送者。

1、创建代理服务

image.png
image.png

2、代理服务配置文件

image.png

<?xml version="1.0" encoding="UTF-8"?>
<proxy name="JMSConsumerProxy" startOnLoad="true" transports="jms" xmlns="http://ws.apache.org/ns/synapse">
    <target>
        <endpoint>
            <address uri="http://192.168.102.1:8280/testQueque"/>
        </endpoint>
        <inSequence>
            <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </target>
</proxy>
  • OUT_ONLY 属性设置为 true 以指示消息交换是单向的。

  • 将传输设置为 jms 来使代理服务成为 JMS 侦听器。 一旦为代理服务启用了 JMS 传输,ESB 就会开始侦听与代理服务同名的 JMS 队列。

  • 查看上面的示例配置,ESB 会侦听名为 JMSConsumerProxy 的 JMS 队列。 要使代理服务侦听不同的 JMS 队列,请使用目标队列的名称定义 transport.jms.Destination 参数。

  • 在此示例配置, ActiveMQ 生产的消息将被发送到后端服务,ESB 不会等待服务的响应。

3、创建后端 API

image.png

<?xml version="1.0" encoding="UTF-8"?>
<api context="/testQueque" name="testQueque" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST GET">
        <inSequence>
            <log level="full">
                <property name="log1" value="Messages from the message queue JMSConsumerProxy"/>
            </log>
            <log level="full"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>

image.png
发布碳应用
image.png
image.png

4、测试配置

image.png
ActiveMQ控制台可以看到被监听的队列
image.png
ESB 监听到队列消息并转发到后端服务
image.png

五、示例工程

1、ActiveMQ

①、消息生产者

package queue.producer;

import java.util.Date;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
    private static final String DEFAULT_BROKER_PORT = "61616";
    private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

    //private static final String USER_NAME = "user";
    //private static final String PASSWORD = "user";

    private static final String QUEUE_NAME = "JMSConsumerProxy";

    public static void main(String[] args) throws Exception {

        // 1 创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

        connectionFactory.setBrokerURL(defaultURL);
        //connectionFactory.setUserName(USER_NAME);
        //connectionFactory.setPassword(PASSWORD);
        // 2 创建连接
        Connection connection = connectionFactory.createConnection();
        // 3 打开连接
        connection.start();
        // 4 创建会话,第一个参数:是否开启事务,第二个参数:消息是否自动确认
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建生产者
        MessageProducer producer = session.createProducer(queue);

        Message message = session.createTextMessage("Publish subscription messages, " + new Date());
        producer.send(message);

        // 8 关闭消息
        //session.commit();
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息生产成功");
    }
}

②、消息消费者

package queue.consumer;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    private static final String DEFAULT_BROKER_HOST = "192.168.102.155";
    private static final String DEFAULT_BROKER_PORT = "61616";
    private static final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;

    //private static final String USER_NAME = "user";
    //private static final String PASSWORD = "user";

    private static final String QUEUE_NAME = "ordersQueue";

    public static void main(String[] args) throws Exception {

        // 创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

        connectionFactory.setBrokerURL(defaultURL);
        //connectionFactory.setUserName(USER_NAME);
        //connectionFactory.setPassword(PASSWORD);

        // 创建连接
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 创建会话
        /**
         * 第一个参数,是否使用事务 如果设置true,操作消息队列后,必须使用 session.commit();
         * 如果设置false,操作消息队列后,不使用session.commit();
         */
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        try {
            consumer.setMessageListener(new MessageListenerCallBack(session));
        } catch (Exception e) {
            // TODO: handle exception
            // 关闭连接
            session.close();
            connection.close();
            System.out.println("消费结束0");
            e.printStackTrace();
        }
    }
}
package queue.consumer;

import javax.jms.*;

public class MessageListenerCallBack implements MessageListener {

    private Session session;

    public MessageListenerCallBack(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // TODO 自动生成的方法存根
        TextMessage textMessage = (TextMessage) message;
        try {
            if (textMessage != null) { 
                // 接收到消息
                System.out.println("接收到消息:" + textMessage.getText());
                message.acknowledge();
            } else {
                //没有接收到消息,通知producer重新发送
                this.session.recover();
            }
        } catch (JMSException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
    }

}


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。