Topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。默认是不存在于 MQ 服务器中的,一旦发送之后,如果没有订阅,消息则丢失。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency>
defaultUrl=tcp://192.168.100.129:61616 userName=user password=user queueName=PublishSubscribe clientIDPrefix=wxhntmy topicName=PublishTopicSubscribe topicClientId=wxhntmy
package com.chenwc.util;
import java.io.InputStream;
import java.util.Properties;
/**
* 读取ActiveMQ配置
* @author chenwc
*/
public class ActiveMQConfig {
private String defaultUrl;
private String userName;
private String password;
private String queueName;
private String clientIDPrefix;
private String topicName;
private String topicClientId;
public ActiveMQConfig() {
Properties properties = new Properties();
//对于maven项目,ActiveMQConfig 类的包目录是 com/chenwc/util,要返回三级目录才能读取到 src/main/resource 目录下的文件
InputStream is = ActiveMQConfig.class.getResourceAsStream("../../../activemq.properties");
try {
properties.load(is);
this.defaultUrl = properties.getProperty("defaultUrl");
this.userName = properties.getProperty("userName");
this.password = properties.getProperty("password");
this.queueName = properties.getProperty("queueName");
this.clientIDPrefix = properties.getProperty("clientIDPrefix");
this.topicName = properties.getProperty("topicName");
this.topicClientId = properties.getProperty("topicClientId");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
} finally {
try {
if (null != is) {
is.close();
}
} catch (Exception e2) {
// TODO: handle exception
e2.printStackTrace();
}
}
}
public String getTopicClientId() {
return topicClientId;
}
public void setTopicClientId(String topicClientId) {
this.topicClientId = topicClientId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getClientIDPrefix() {
return clientIDPrefix;
}
public void setClientIDPrefix(String clientIDPrefix) {
this.clientIDPrefix = clientIDPrefix;
}
public String getDefaultUrl() {
return defaultUrl;
}
public void setDefaultUrl(String defaultUrl) {
this.defaultUrl = defaultUrl;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
package com.chenwc.topic.producer;
import javax.jms.*;
import com.chenwc.util.ActiveMQConfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 主题消息生产者
*
* @author chenwc
*/
public class TopicMsgProducer {
private static final Logger log = LoggerFactory.getLogger(TopicMsgProducer.class);
public static void main(String[] args) {
ActiveMQConfig activeMQConfig = new ActiveMQConfig();
// 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
MessageProducer producer = null;
try {
factory.setBrokerURL(activeMQConfig.getDefaultUrl());
factory.setUserName(activeMQConfig.getUserName());
factory.setPassword(activeMQConfig.getPassword());
factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix());
factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix());
// 创建连接
conn = factory.createConnection();
// 开启连接
conn.start();
// 创建会话。第一个参数:是否开启分布式事务;第二个参数:消息是否自动确认(第一个参数为false时,Session.AUTO_ACKNOWLEDGE自动应答才生效)
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic(activeMQConfig.getTopicName());
// 创建生产者
producer = session.createProducer(topic);
// 使用持久模式,默认开启
// DeliveryMode 中的是否持久化,指的是当重启 activeMQ 之后,原来队列或者主题中未被消费的消息是否仍然保留
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 对消息设置存活过期时间,消息超过时间仍然未被消费,则会自动移动到死信队列中。
// timeToLive 默认值为 0,即表示消息永不过期。
producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
//创建 TextMessage 对象
TextMessage textMessage = session.createTextMessage();
for (int i = 0; i < 10; i++) {
String content = "{\"TEST\": \"发布主题消息\", \"Date\":" + getDateFormat().format(new Date()) + "}";
sendMessage(producer, textMessage, content);
log.info("向主题: {} 发送消息: {}", activeMQConfig.getTopicName(), content);
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != producer)
producer.close();
if (null != conn)
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 向主题发送消息
*
* @param producer MessageProducer对象
* @param textMessage TextMessage对象
* @param content 消息内容
*/
public static void sendMessage(MessageProducer producer, TextMessage textMessage, String content) throws JMSException {
//设置消息内容
textMessage.setText(content);
//发布消息
producer.send(textMessage);
}
/**
* 线程安全的 SimpleDateFormat
*/
private static final ThreadLocal<DateFormat> threadLocal = new ThreadLocal<>();
/**
* 获取 DateFormat
*
* @return DateFormat
*/
public static DateFormat getDateFormat() {
DateFormat df = threadLocal.get();
if (df == null) {
df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
threadLocal.set(df);
}
return df;
}
}
package com.chenwc.topic.consumer;
import com.chenwc.util.ActiveMQConfig;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 主题消息消费者
*
* @author chenwc
*/
public class TopicMsgConsumer {
private static final Logger log = LoggerFactory.getLogger(TopicMsgConsumer.class);
/**
* 是否使用持久化模式
*/
private static final boolean IS_PERSISTENT = false;
public static void main(String[] args) {
ActiveMQConfig activeMQConfig = new ActiveMQConfig();
// 创建连接工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
Connection conn = null;
try {
factory.setBrokerURL(activeMQConfig.getDefaultUrl());
factory.setUserName(activeMQConfig.getUserName());
factory.setPassword(activeMQConfig.getPassword());
factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix());
factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix());
conn = factory.createConnection();
if (IS_PERSISTENT){
// 如果要进行持久化订阅,必须对连接设置clientID,设置ClientID后
// 使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
conn.setClientID(activeMQConfig.getTopicClientId());
}
conn.start();
//获取会话
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 订阅发布模式的 Topic 对象 不是 Destination
Topic topic = session.createTopic(activeMQConfig.getTopicName());
log.info("消费者启动完毕,开始接收消息...");
if (IS_PERSISTENT){
// 创建持久化订阅者
TopicSubscriber subsriber = session.createDurableSubscriber(topic, activeMQConfig.getTopicClientId());
// 异步消息接收
subsriber.setMessageListener(new MessageListenerCallBack(session));
}
else {
// 普通订阅者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListenerCallBack(session));
}
/*
特别提醒:因为 consumer.setMessageListener 是线程非阻塞的,所以如果不加处理,则 mian 会直接运行结束
这里故意使用 System.in.read() 让 mian 线程阻塞停下来,实际开发中如果是 web 应用,或者是其它的不会马上结束
的 Java SE 应用,则是不需要这样处理的
*/
//控制台随便输入字符,回车后,程序运行结束
int a = System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
log.info("消费结束!");
try {
if (null != conn)
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消息监听器
package com.chenwc.topic.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息监听器
*
* @author chenwc
*/
public class MessageListenerCallBack implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(MessageListenerCallBack.class);
private final Session session;
public MessageListenerCallBack(Session session) {
this.session = session;
}
@Override
public void onMessage(Message message) {
// TODO 自动生成的方法存根
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
// 接收到消息
log.info("接收到消息,消息ID:{}, 消息内容:{}", textMessage.getJMSMessageID(), textMessage.getText());
// 消息确认
message.acknowledge();
} else {
//没有接收到消息,通知producer重新发送
this.session.recover();
}
} catch (JMSException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
}

开启了两个消费者,生产者发布了10条消息,因为有两个消费者所以被消费了20次。



