下载Apache ActiveMQ
下载并安装WSO2 Micro Integrator
将以下客户端库从目录 ACTIVEMQHOME/lib 复制到目录 MIHOME/lib
ActiveMQ 5.8.0及更高版本
activemq-broker-5.8.0.jar
activemq-client-5.8.0.jar
activemq-kahadb-store-5.8.0.jar
geronimo-jms1.1spec-1.1.1.jar
geronimo-j2ee-management1.1spec-1.0.1.jar
geronimo-jta1.0.1Bspec-1.0.1.jar
hawtbuf-1.9.jar
Slf4j-api-1.6.6.jar
activeio-core-3.1.4.jar (在目录:ACTIVEMQ_HOME/lib/optional)
早期版本的ActiveMQ
activemq-core-5.5.1.jar
geronimo-j2ee-management1.0spec-1.0.jar
geronimo-jms1.1spec-1.1.1.jar
如果希望 Micro Integrator 从 ActiveMQ 实例接收消息,或向 ActiveMQ 实例发送消息,则需要使用相关连接参数更新 deployment.toml 文件。 :::info 配置jms侦听器时,请确保使用已定义的连接工厂的名称将连接工厂服务级别jms参数添加到synapse配置中。 :::
<parameter name="transport.jms.ConnectionFactory">myQueueListener</parameter>
添加以下配置以使用ActiveMQ连接参数启用JMS发送器。
[transport.jms] sender_enable = true listener_enable = true
[[transport.jms.sender]] name = "myTopicSender" parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory" parameter.provider_url = "tcp://192.168.100.151:61616" parameter.connection_factory_name = "TopicConnectionFactory" parameter.connection_factory_type = "topic" parameter.cache_level = "producer" parameter.username = "admin" parameter.password = "admin" [[transport.jms.sender]] name = "myQueueSender" parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory" parameter.provider_url = "tcp://192.168.100.151:61616" parameter.connection_factory_name = "QueueConnectionFactory" parameter.connection_factory_type = "queue" parameter.cache_level = "producer" parameter.username = "system" parameter.password = "manager"
[[transport.jms.listener]] name = "myQueueConnectionFactory" parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory" parameter.provider_url = "tcp://192.168.100.151:61616" parameter.connection_factory_name = "QueueConnectionFactory" parameter.connection_factory_type = "queue" parameter.username = "admin" parameter.password = "admin" [[transport.jms.listener]] name = "myTopicConnectionFactory" parameter.initial_naming_factory = "org.apache.activemq.jndi.ActiveMQInitialContextFactory" parameter.provider_url = "tcp://192.168.100.151:61616" parameter.connection_factory_name = "TopicConnectionFactory" parameter.connection_factory_type = "topic" parameter.username = "admin" parameter.password = "admin"
使用ActiveMQ配置JMS传输时,可以将特定于ActiveMQ的属性附加到该属性的值。例如,您可以在配置JMS入站端点时设置和属性,如下所示:
parameter.provider_url = "tcp://localhost:61616?jms.redeliveryPolicy.redeliveryDelay=10000&jms.redeliveryPolicy.initialRedeliveryDelay=10000"
上述配置没有解决ActiveMQ消息代理的瞬时故障问题。例如,如果 ActiveMQ 关闭并在一段时间后再次变为活动状态,则 Micro Integrator 将不会重新连接到 ActiveMQ。相反,将抛出一个错误,直到 Micro Integrator 重新启动。
为了避免此问题,您需要添加以下值作为参数 provider_url:failover:tcp://localhost:61616。这只是为了确保重新连接的发生。故障转移前缀与 ActiveMQ 的故障转移传输相关联。
当Micro Integrator配置为使用ActiveMQ队列中的消息时,您可以选择配置消息重新传递。当Micro Integrator由于故障而无法处理消息时,这很有用。
JMS parameters
将以下JMS参数添加到代理服务配置中。
<parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter> <parameter name="transport.jms.DestinationType">queue</parameter> <parameter name="transport.jms.SessionTransacted">true</parameter> <parameter name="transport.jms.Destination">JMStoHTTPStockQuoteProxy</parameter> <parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter> <parameter name="transport.jms.CacheLevel">consumer</parameter>
redeliveryPolicy.maximumRedeliverys:传递邮件的最大重试次数。如果设置为-1,ActiveMQ将无限次重试。
transport.jms.SessionTransacted:设置为true时,将启用代理服务的jms会话事务。
reddeliveryPolicy.redeliveryDelay:重试之间的延迟时间(以毫秒为单位)。
transport.jms.CacheLevel:需要将其设置为consumer,ActiveMQ重新传递机制才能工作。
Fault sequence
在故障序列中添加以下行:
<property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
当Micro Integrator由于错误而无法将消息传递到后端服务时,它将被路由到配置中的故障序列。当在故障序列中设置SETROLLBACKONLY属性时,Micro Integrator会通知ActiveMQ重新传递消息。
示例代理服务配置:
<proxy xmlns="http://ws.apache.org/ns/synapse" name="JMStoHTTPStockQuoteProxy" transports="jms" statistics="disable" trace="disable" startOnLoad="true"> <target> <inSequence> <property name="transactionID" expression="get-property('MessageID')" scope="default"/> <property name="sourceMessageID" expression="get-property('MessageID')" scope="default"/> <property name="proxyMessageID" expression="get-property('MessageID')" scope="default"/> <log level="full"> <property name="transactionID" expression="get-property('transactionID')"/> <property name="sourceMessageID" expression="get-property('sourceMessageID')"/> <property name="MessageID" expression="get-property('proxyMessageID')"/> </log> <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/> <drop/> </inSequence> <faultSequence name="jms_fault"/> </target> <parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter> <parameter name="transport.jms.DestinationType">queue</parameter> <parameter name="transport.jms.SessionTransacted">true</parameter> <parameter name="transport.jms.Destination">JMStoHTTPStockQuoteProxy</parameter> <parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter> <parameter name="transport.jms.CacheLevel">consumer</parameter> <description/> </proxy>