WSO2 MI 4.2.0 配置 ActiveMQ

WSO2   2025-01-11 17:01   185   0  

一、设置 Micro Integrator 使用 ActiveMQ

  1. 下载Apache ActiveMQ

  2. 下载并安装WSO2 Micro Integrator

  3. 将以下客户端库从目录 ACTIVEMQHOME/lib 复制到目录 MIHOME/lib

  • ActiveMQ 5.8.0及更高版本

  • activemq-broker-5.8.0.jar

  • activemq-client-5.8.0.jar

  • activemq-kahadb-store-5.8.0.jar

  • geronimo-jms1.1spec-1.1.1.jar

  • geronimo-j2ee-management1.1spec-1.0.1.jar

  • geronimo-jta1.0.1Bspec-1.0.1.jar

  • hawtbuf-1.9.jar

  • Slf4j-api-1.6.6.jar

  • activeio-core-3.1.4.jar (在目录:ACTIVEMQ_HOME/lib/optional)

  • 早期版本的ActiveMQ

  • activemq-core-5.5.1.jar

  • geronimo-j2ee-management1.0spec-1.0.jar

  • geronimo-jms1.1spec-1.1.1.jar

如果希望 Micro Integrator 从 ActiveMQ 实例接收消息,或向 ActiveMQ 实例发送消息,则需要使用相关连接参数更新 deployment.toml 文件。 :::info 配置jms侦听器时,请确保使用已定义的连接工厂的名称将连接工厂服务级别jms参数添加到synapse配置中。 :::

<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>

添加以下配置以使用ActiveMQ连接参数启用JMS发送器。

[transport.jms]
sender_enable = true
listener_enable = true
[[transport.jms.sender]]
name = "myTopicSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://192.168.100.151:61616"
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
parameter.cache_level = "producer"
parameter.username = "admin"
parameter.password = "admin"

[[transport.jms.sender]]
name = "myQueueSender"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://192.168.100.151:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.cache_level = "producer"
parameter.username = "system"
parameter.password = "manager"
[[transport.jms.listener]]
name = "myQueueConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://192.168.100.151:61616"
parameter.connection_factory_name = "QueueConnectionFactory"
parameter.connection_factory_type = "queue"
parameter.username = "admin"
parameter.password = "admin"

[[transport.jms.listener]]
name = "myTopicConnectionFactory"
parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"
parameter.provider_url = "tcp://192.168.100.151:61616"
parameter.connection_factory_name = "TopicConnectionFactory"
parameter.connection_factory_type = "topic"
parameter.username = "admin"
parameter.password = "admin"

使用ActiveMQ配置JMS传输时,可以将特定于ActiveMQ的属性附加到该属性的值。例如,您可以在配置JMS入站端点时设置和属性,如下所示:

parameter.provider_url = "tcp://localhost:61616?jms.redeliveryPolicy.redeliveryDelay=10000&amp;jms.redeliveryPolicy.initialRedeliveryDelay=10000"

上述配置没有解决ActiveMQ消息代理的瞬时故障问题。例如,如果 ActiveMQ 关闭并在一段时间后再次变为活动状态,则 Micro Integrator 将不会重新连接到 ActiveMQ。相反,将抛出一个错误,直到 Micro Integrator 重新启动。
为了避免此问题,您需要添加以下值作为参数 provider_url:failover:tcp://localhost:61616。这只是为了确保重新连接的发生。故障转移前缀与 ActiveMQ 的故障转移传输相关联。

二、在 ActiveMQ 队列中配置重新传递

当Micro Integrator配置为使用ActiveMQ队列中的消息时,您可以选择配置消息重新传递。当Micro Integrator由于故障而无法处理消息时,这很有用。

  • JMS parameters

  • 将以下JMS参数添加到代理服务配置中。

<parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter>
<parameter name="transport.jms.DestinationType">queue</parameter>
<parameter name="transport.jms.SessionTransacted">true</parameter>
<parameter name="transport.jms.Destination">JMStoHTTPStockQuoteProxy</parameter>
<parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter>
<parameter name="transport.jms.CacheLevel">consumer</parameter>
  • redeliveryPolicy.maximumRedeliverys:传递邮件的最大重试次数。如果设置为-1,ActiveMQ将无限次重试。

  • transport.jms.SessionTransacted:设置为true时,将启用代理服务的jms会话事务。

  • reddeliveryPolicy.redeliveryDelay:重试之间的延迟时间(以毫秒为单位)。

  • transport.jms.CacheLevel:需要将其设置为consumer,ActiveMQ重新传递机制才能工作。

  • Fault sequence

  • 在故障序列中添加以下行:

<property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>

当Micro Integrator由于错误而无法将消息传递到后端服务时,它将被路由到配置中的故障序列。当在故障序列中设置SETROLLBACKONLY属性时,Micro Integrator会通知ActiveMQ重新传递消息。
示例代理服务配置:

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JMStoHTTPStockQuoteProxy"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="transactionID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <property name="sourceMessageID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <property name="proxyMessageID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <log level="full">
            <property name="transactionID" expression="get-property('transactionID')"/>
            <property name="sourceMessageID" expression="get-property('sourceMessageID')"/>
            <property name="MessageID" expression="get-property('proxyMessageID')"/>
         </log>
         <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
         <drop/>               
      </inSequence>
      <faultSequence name="jms_fault"/>
   </target>
   <parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter>
   <parameter name="transport.jms.DestinationType">queue</parameter>
   <parameter name="transport.jms.SessionTransacted">true</parameter>
   <parameter name="transport.jms.Destination">JMStoHTTPStockQuoteProxy</parameter>
   <parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter>
   <parameter name="transport.jms.CacheLevel">consumer</parameter>
   <description/>
</proxy>


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。