ActiveMQ 主题消息发布与订阅

Java常用方法   2025-01-12 15:18   248   0  

Topic 是发布订阅模式,一个生产者可以一个消息,可以被多个消费者消费。默认是不存在于 MQ 服务器中的,一旦发送之后,如果没有订阅,消息则丢失。 

一、依赖

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.9.1</version>
</dependency>

二、读取 ActiveMQ 配置

defaultUrl=tcp://192.168.100.129:61616
userName=user
password=user
queueName=PublishSubscribe
clientIDPrefix=wxhntmy

topicName=PublishTopicSubscribe
topicClientId=wxhntmy
package com.chenwc.util;

import java.io.InputStream;
import java.util.Properties;

/**
 * 读取ActiveMQ配置
 * @author chenwc
 */
public class ActiveMQConfig {

    private String defaultUrl;
    private String userName;
    private String password;
    private String queueName;
    private String clientIDPrefix;
    private String topicName;
    private String topicClientId;

    public ActiveMQConfig() {
        Properties properties = new Properties();
        //对于maven项目,ActiveMQConfig 类的包目录是 com/chenwc/util,要返回三级目录才能读取到 src/main/resource 目录下的文件
        InputStream is = ActiveMQConfig.class.getResourceAsStream("../../../activemq.properties");
        try {
            properties.load(is);
            this.defaultUrl = properties.getProperty("defaultUrl");
            this.userName = properties.getProperty("userName");
            this.password = properties.getProperty("password");
            this.queueName = properties.getProperty("queueName");
            this.clientIDPrefix = properties.getProperty("clientIDPrefix");
            this.topicName = properties.getProperty("topicName");
            this.topicClientId = properties.getProperty("topicClientId");
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        } finally {
            try {
                if (null != is) {
                    is.close();
                }
            } catch (Exception e2) {
                // TODO: handle exception
                e2.printStackTrace();
            }
        }
    }

    public String getTopicClientId() {
        return topicClientId;
    }

    public void setTopicClientId(String topicClientId) {
        this.topicClientId = topicClientId;
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    public String getClientIDPrefix() {
        return clientIDPrefix;
    }

    public void setClientIDPrefix(String clientIDPrefix) {
        this.clientIDPrefix = clientIDPrefix;
    }

    public String getDefaultUrl() {
        return defaultUrl;
    }

    public void setDefaultUrl(String defaultUrl) {
        this.defaultUrl = defaultUrl;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}

三、消息生产者

package com.chenwc.topic.producer;

import javax.jms.*;

import com.chenwc.util.ActiveMQConfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 主题消息生产者
 *
 * @author chenwc
 */
public class TopicMsgProducer {

    private static final Logger log = LoggerFactory.getLogger(TopicMsgProducer.class);

    public static void main(String[] args) {

        ActiveMQConfig activeMQConfig = new ActiveMQConfig();
        // 创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        MessageProducer producer = null;
        try {
            factory.setBrokerURL(activeMQConfig.getDefaultUrl());
            factory.setUserName(activeMQConfig.getUserName());
            factory.setPassword(activeMQConfig.getPassword());
            factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix());
            factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix());
            // 创建连接
            conn = factory.createConnection();
            // 开启连接
            conn.start();
            // 创建会话。第一个参数:是否开启分布式事务;第二个参数:消息是否自动确认(第一个参数为false时,Session.AUTO_ACKNOWLEDGE自动应答才生效)
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建主题
            Topic topic = session.createTopic(activeMQConfig.getTopicName());
            // 创建生产者
            producer = session.createProducer(topic);
            // 使用持久模式,默认开启
            // DeliveryMode 中的是否持久化,指的是当重启 activeMQ 之后,原来队列或者主题中未被消费的消息是否仍然保留
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 对消息设置存活过期时间,消息超过时间仍然未被消费,则会自动移动到死信队列中。
            // timeToLive 默认值为 0,即表示消息永不过期。
            producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
            //创建 TextMessage 对象
            TextMessage textMessage = session.createTextMessage();

            for (int i = 0; i < 10; i++) {
                String content = "{\"TEST\": \"发布主题消息\", \"Date\":" + getDateFormat().format(new Date()) + "}";
                sendMessage(producer, textMessage, content);
                log.info("向主题: {} 发送消息: {}", activeMQConfig.getTopicName(), content);
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != producer)
                    producer.close();
                if (null != conn)
                    conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 向主题发送消息
     *
     * @param producer    MessageProducer对象
     * @param textMessage TextMessage对象
     * @param content     消息内容
     */
    public static void sendMessage(MessageProducer producer, TextMessage textMessage, String content) throws JMSException {
        //设置消息内容
        textMessage.setText(content);
        //发布消息
        producer.send(textMessage);
    }

    /**
     * 线程安全的 SimpleDateFormat
     */
    private static final ThreadLocal<DateFormat> threadLocal = new ThreadLocal<>();

    /**
     * 获取 DateFormat
     *
     * @return DateFormat
     */
    public static DateFormat getDateFormat() {
        DateFormat df = threadLocal.get();
        if (df == null) {
            df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            threadLocal.set(df);
        }
        return df;
    }
}

四、消息消费者

package com.chenwc.topic.consumer;

import com.chenwc.util.ActiveMQConfig;

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 主题消息消费者
 *
 * @author chenwc
 */
public class TopicMsgConsumer {
    private static final Logger log = LoggerFactory.getLogger(TopicMsgConsumer.class);

    /**
     * 是否使用持久化模式
     */
    private static final boolean IS_PERSISTENT = false;

    public static void main(String[] args) {

        ActiveMQConfig activeMQConfig = new ActiveMQConfig();
        // 创建连接工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        Connection conn = null;
        try {
            factory.setBrokerURL(activeMQConfig.getDefaultUrl());
            factory.setUserName(activeMQConfig.getUserName());
            factory.setPassword(activeMQConfig.getPassword());
            factory.setClientIDPrefix(activeMQConfig.getClientIDPrefix());
            factory.setConnectionIDPrefix(activeMQConfig.getClientIDPrefix());

            conn = factory.createConnection();
            if (IS_PERSISTENT){
                // 如果要进行持久化订阅,必须对连接设置clientID,设置ClientID后
                // 使用相同的“clientID”,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
                conn.setClientID(activeMQConfig.getTopicClientId());
            }
            conn.start();
            //获取会话
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // 订阅发布模式的 Topic 对象 不是 Destination
            Topic topic = session.createTopic(activeMQConfig.getTopicName());
            log.info("消费者启动完毕,开始接收消息...");
            if (IS_PERSISTENT){
                // 创建持久化订阅者
                TopicSubscriber subsriber = session.createDurableSubscriber(topic, activeMQConfig.getTopicClientId());
                // 异步消息接收
                subsriber.setMessageListener(new MessageListenerCallBack(session));
            }
            else {
                // 普通订阅者
                MessageConsumer consumer = session.createConsumer(topic);
                consumer.setMessageListener(new MessageListenerCallBack(session));
            }

            /*
              特别提醒:因为 consumer.setMessageListener 是线程非阻塞的,所以如果不加处理,则 mian 会直接运行结束
              这里故意使用 System.in.read() 让 mian 线程阻塞停下来,实际开发中如果是 web 应用,或者是其它的不会马上结束
              的 Java SE 应用,则是不需要这样处理的
             */
            //控制台随便输入字符,回车后,程序运行结束
            int a = System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            log.info("消费结束!");
            try {
                if (null != conn)
                    conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

消息监听器

package com.chenwc.topic.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * 消息监听器
 *
 * @author chenwc
 */
public class MessageListenerCallBack implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(MessageListenerCallBack.class);
    private final Session session;

    public MessageListenerCallBack(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // TODO 自动生成的方法存根
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                // 接收到消息
                log.info("接收到消息,消息ID:{}, 消息内容:{}", textMessage.getJMSMessageID(), textMessage.getText());
                // 消息确认
                message.acknowledge();
            } else {
                //没有接收到消息,通知producer重新发送
                this.session.recover();
            }
        } catch (JMSException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
    }
}

五、ActiveMQ 服务器 Topics 内容说明

image.png
开启了两个消费者,生产者发布了10条消息,因为有两个消费者所以被消费了20次。
image.png

image.png
image.png
image.png
image.png


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。