Topic 是发布订阅模式,一个生产者发布一条消息,可以被多个消费者消费。Topic 消息默认不会保存在 MQ 服务器中,消息发送之后,如果当时没有订阅者,该消息就会丢失。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency>
defaultUrl=tcp://192.168.100.129:61616
userName=user
password=user
queueName=PublishSubscribe
clientIDPrefix=wxhntmy
topicName=PublishTopicSubscribe
topicClientId=wxhntmy
package com.chenwc.util; import java.io.InputStream; import java.util.Properties; /** * 读取ActiveMQ配置 * @author chenwc */ public class ActiveMQConfig { private String defaultUrl; private String userName; private String password; private String queueName; private String clientIDPrefix; private String topicName; private String topicClientId; public ActiveMQConfig() { Properties properties = new Properties(); //对于maven项目,ActiveMQConfig 类的包目录是 com/chenwc/util,要返回三级目录才能读取到 src/main/resource 目录下的文件 InputStream is = ActiveMQConfig.class.getResourceAsStream("../../../activemq.properties"); try { properties.load(is); this.defaultUrl = properties.getProperty("defaultUrl"); this.userName = properties.getProperty("userName"); this.password = properties.getProperty("password"); this.queueName = properties.getProperty("queueName"); this.clientIDPrefix = properties.getProperty("clientIDPrefix"); this.topicName = properties.getProperty("topicName"); this.topicClientId = properties.getProperty("topicClientId"); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } finally { try { if (null != is) { is.close(); } } catch (Exception e2) { // TODO: handle exception e2.printStackTrace(); } } } public String getTopicClientId() { return topicClientId; } public void setTopicClientId(String topicClientId) { this.topicClientId = topicClientId; } public String getTopicName() { return topicName; } public void setTopicName(String topicName) { this.topicName = topicName; } public String getClientIDPrefix() { return clientIDPrefix; } public void setClientIDPrefix(String clientIDPrefix) { this.clientIDPrefix = clientIDPrefix; } public String getDefaultUrl() { return defaultUrl; } public void setDefaultUrl(String defaultUrl) { this.defaultUrl = defaultUrl; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password 
= password; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } }
package com.chenwc.topic.producer;

import javax.jms.*;

import com.chenwc.util.ActiveMQConfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Topic (publish/subscribe) message producer: publishes ten JSON text messages to the
 * configured topic, pausing two seconds after each one.
 *
 * @author chenwc
 */
public class TopicMsgProducer {

    private static final Logger log = LoggerFactory.getLogger(TopicMsgProducer.class);

    /**
     * Per-thread {@link DateFormat} cache. {@link SimpleDateFormat} is not thread-safe, so
     * every thread gets its own instance.
     */
    private static final ThreadLocal<DateFormat> threadLocal = new ThreadLocal<>();

    /**
     * Connects to the broker described by {@link ActiveMQConfig} and publishes the messages.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActiveMQConfig activeMQConfig = new ActiveMQConfig();
        // Create the connection factory.
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        MessageProducer producer = null;
        try {
            factory.setBrokerURL(activeMQConfig.getDefaultUrl());
            factory.setUserName(activeMQConfig.getUserName());
            factory.setPassword(activeMQConfig.getPassword());
            factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix());
            factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix());

            // Create and start the connection.
            conn = factory.createConnection();
            conn.start();

            // First argument: whether the session is transacted (a local JMS transaction).
            // Second argument: acknowledge mode; it only applies when the session is not transacted.
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(activeMQConfig.getTopicName());
            producer = session.createProducer(topic);

            // PERSISTENT is the JMS default; it is set explicitly for clarity.
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // A time-to-live of 0 (the default) means the message never expires.
            producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

            // A single TextMessage instance is reused for every send.
            TextMessage textMessage = session.createTextMessage();
            for (int i = 0; i < 10; i++) {
                // The date is wrapped in quotes so that the payload is valid JSON.
                String content = "{\"TEST\": \"发布主题消息\", \"Date\":\""
                        + getDateFormat().format(new Date()) + "\"}";
                sendMessage(producer, textMessage, content);
                log.info("向主题: {} 发送消息: {}", activeMQConfig.getTopicName(), content);
                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that the caller can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Interrupted while publishing topic messages", e);
        } catch (Exception e) {
            log.error("Failed to publish topic messages", e);
        } finally {
            closeQuietly(producer, conn);
        }
    }

    /**
     * Closes the producer and the connection independently, so that a failure while closing
     * the producer does not leave the connection open.
     */
    private static void closeQuietly(MessageProducer producer, Connection conn) {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException e) {
                log.warn("Failed to close the message producer", e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (JMSException e) {
                log.warn("Failed to close the connection", e);
            }
        }
    }

    /**
     * Publishes one text message.
     *
     * @param producer    producer used to send the message
     * @param textMessage message object whose text is replaced by {@code content}
     * @param content     message body
     * @throws JMSException if setting the text or sending the message fails
     */
    public static void sendMessage(MessageProducer producer, TextMessage textMessage, String content)
            throws JMSException {
        textMessage.setText(content);
        producer.send(textMessage);
    }

    /**
     * Returns the calling thread's {@link DateFormat}, creating it on first use.
     *
     * @return a formatter using the pattern {@code yyyy-MM-dd HH:mm:ss.SSS}
     */
    public static DateFormat getDateFormat() {
        DateFormat df = threadLocal.get();
        if (df == null) {
            df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            threadLocal.set(df);
        }
        return df;
    }
}
package com.chenwc.topic.consumer; import com.chenwc.util.ActiveMQConfig; import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 主题消息消费者 * * @author chenwc */ public class TopicMsgConsumer { private static final Logger log = LoggerFactory.getLogger(TopicMsgConsumer.class); /** * 是否使用持久化模式 */ private static final boolean IS_PERSISTENT = false; public static void main(String[] args) { ActiveMQConfig activeMQConfig = new ActiveMQConfig(); // 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection conn = null; try { factory.setBrokerURL(activeMQConfig.getDefaultUrl()); factory.setUserName(activeMQConfig.getUserName()); factory.setPassword(activeMQConfig.getPassword()); factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix()); factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix()); conn = factory.createConnection(); if (IS_PERSISTENT){ // 如果要进行持久化订阅,必须对连接设置clientID,设置ClientID后 // 使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。 conn.setClientID(activeMQConfig.getTopicClientId()); } conn.start(); //获取会话 Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 订阅发布模式的 Topic 对象 不是 Destination Topic topic = session.createTopic(activeMQConfig.getTopicName()); log.info("消费者启动完毕,开始接收消息..."); if (IS_PERSISTENT){ // 创建持久化订阅者 TopicSubscriber subsriber = session.createDurableSubscriber(topic, activeMQConfig.getTopicClientId()); // 异步消息接收 subsriber.setMessageListener(new MessageListenerCallBack(session)); } else { // 普通订阅者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListenerCallBack(session)); } /* 特别提醒:因为 consumer.setMessageListener 是线程非阻塞的,所以如果不加处理,则 mian 会直接运行结束 这里故意使用 System.in.read() 让 mian 线程阻塞停下来,实际开发中如果是 web 应用,或者是其它的不会马上结束 的 Java SE 应用,则是不需要这样处理的 */ //控制台随便输入字符,回车后,程序运行结束 int a = System.in.read(); } catch (Exception e) { e.printStackTrace(); } 
finally { log.info("消费结束!"); try { if (null != conn) conn.close(); } catch (Exception e) { e.printStackTrace(); } } } }
消息监听器
package com.chenwc.topic.consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; /** * 消息监听器 * * @author chenwc */ public class MessageListenerCallBack implements MessageListener { private static final Logger log = LoggerFactory.getLogger(MessageListenerCallBack.class); private final Session session; public MessageListenerCallBack(Session session) { this.session = session; } @Override public void onMessage(Message message) { // TODO 自动生成的方法存根 try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; // 接收到消息 log.info("接收到消息,消息ID:{}, 消息内容:{}", textMessage.getJMSMessageID(), textMessage.getText()); // 消息确认 message.acknowledge(); } else { //没有接收到消息,通知producer重新发送 this.session.recover(); } } catch (JMSException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }
开启了两个消费者,生产者发布了10条消息,由于 Topic 的每个订阅者都会各自收到全部消息,所以一共被消费了20次。