EMQ X 主题消息发布与订阅

Java常用方法   2025-01-12 15:18   241   0  

一、依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.5</version>
</dependency>

二、读取 mqtt 配置

userName=wxhntmy
password=123456
clientIDPrefix=wxhntmy
qos=1
topicName=/myTopic
host=tcp://192.168.100.115:1883
package com.chenwc.util;

import java.io.InputStream;
import java.util.Properties;

/**
 * 读取EMQX配置
 *
 * @author chenwc
 */
public class EmqxConfig {

    private String userName;
    private String password;
    private String clientIDPrefix;
    private Integer qos;
    private String topicName;
    private String host;

    @Override
    public String toString() {
        return "MQTT配置,消息服务器为:" + host + ",连接用户名为:" + userName + ",连接密码为:" + password + ",客户端ID为:" + clientIDPrefix + ",订阅的主题为:" + topicName + ",QOS为:" + qos + "";
    }

    public EmqxConfig() {
        Properties properties = new Properties();
        //对于maven项目,ActiveMQConfig 类的包目录是 com/chenwc/util,要返回三级目录才能读取到 src/main/resource 目录下的文件
        InputStream is = EmqxConfig.class.getResourceAsStream("../../../emqx.properties");
        try {
            properties.load(is);
            this.userName = properties.getProperty("userName");
            this.password = properties.getProperty("password");
            this.clientIDPrefix = properties.getProperty("clientIDPrefix");
            this.qos = Integer.valueOf(properties.getProperty("qos"));
            this.topicName = properties.getProperty("topicName");
            this.host = properties.getProperty("host");
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        } finally {
            try {
                if (null != is) {
                    is.close();
                }
            } catch (Exception e2) {
                // TODO: handle exception
                e2.printStackTrace();
            }
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getClientIDPrefix() {
        return clientIDPrefix;
    }

    public void setClientIDPrefix(String clientIDPrefix) {
        this.clientIDPrefix = clientIDPrefix;
    }

    public Integer getQos() {
        return qos;
    }

    public void setQos(Integer qos) {
        this.qos = qos;
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }
}

三、生产者

package com.chenwc.topic.publish;

import com.chenwc.util.EmqxConfig;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * 主题消息生产者
 * @author chenwc
 */
public class PublishSample {
    private static final Logger log = LoggerFactory.getLogger(PublishSample.class);

    /**
     * mqtt 配置
     */
    private static final EmqxConfig emqxConfig = new EmqxConfig();

    /**
     * 要发布消息的主题
     */
    private static final String PUBLISH_TOPIC = emqxConfig.getTopicName() + "/" + UUID.randomUUID().toString().substring(0, 8).toUpperCase();

    /**
     * 获取 mqtt 客户端
     *
     * @return MqttClient
     * @exception MqttException 异常信息
     */
    protected static MqttClient getMqttClient() throws MqttException {
        // 创建客户端id
        String id = String.valueOf(System.currentTimeMillis());
        String clientId = emqxConfig.getClientIDPrefix() + "_" + id;
        log.info("客户端ID为:{}", clientId);
        MqttClient client = new MqttClient(emqxConfig.getHost(), clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        options.setUserName(emqxConfig.getUserName());
        options.setPassword(emqxConfig.getPassword().toCharArray());
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        client.connect(options);
        return client;
    }

    /**
     * 发布主题消息
     * @param client MqttClient
     * @param topic 主题名
     * @param content 消息内容
     * @throws MqttException 异常信息
     */
    public static void sendMessage(MqttClient client, String topic, String content) throws MqttException {
        MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
        message.setQos(emqxConfig.getQos());
        log.info("在主题:{} 发布消息:{}", PUBLISH_TOPIC, content);
        client.publish(PUBLISH_TOPIC, message);
    }

    public static void main(String[] args) {
        MqttClient client = null;
        try {
            client = getMqttClient();
            //设置发布消息的主题
            emqxConfig.setTopicName(PUBLISH_TOPIC);
            log.info(emqxConfig.toString());

            //发布消息
            for (int i = 0; i < 1000; i++){
                String content = "测试" + System.currentTimeMillis();
                sendMessage(client, PUBLISH_TOPIC, content);
                Thread.sleep(10);
            }
            //防止程序退出,需要在控制台输入任意字符
            int a = System.in.read();
        } catch (Exception e) {
            log.error("异常了,异常信息为:" + e.getMessage());
            e.printStackTrace();
        } finally {
            try {
                if (null != client) {
                    client.disconnect();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

四、消费者

package com.chenwc.topic.subscribe;

import com.chenwc.util.EmqxConfig;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 主题消息消费者
 * @author chenwc
 */
public class SubscribeSample {

    private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class);

    /**
     * mqtt 配置
     */
    private static final EmqxConfig emqxConfig = new EmqxConfig();
    /**
     * 待订阅的主题名
     */
    private static final String SUBSCRIBE_TOPIC = emqxConfig.getTopicName() + "/#";

    /**
     * 获取 mqtt 客户端
     *
     * @return MqttClient
     * @throws MqttException 异常信息
     */
    protected static MqttClient getMqttClient() throws MqttException {
        // 创建客户端id
        String id = String.valueOf(System.currentTimeMillis());
        String clientId = emqxConfig.getClientIDPrefix() + "_" + id;
        log.info("客户端ID为:{}", clientId);
        MqttClient client = new MqttClient(emqxConfig.getHost(), clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
        options.setUserName(emqxConfig.getUserName());
        options.setPassword(emqxConfig.getPassword().toCharArray());
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置回调方法
        client.setCallback(new MyMqttCallback());
        client.connect(options);
        log.info("成功连接 EMQ X 服务器..........");
        return client;
    }

    public static void main(String[] args) {
        MqttClient client = null;
        try {
            //订阅主题
            emqxConfig.setTopicName(SUBSCRIBE_TOPIC);
            log.info(emqxConfig.toString());
            log.info("设置客户端连接参数........");
            client = getMqttClient();
            log.info("开始订阅主题: {} 消息..........", SUBSCRIBE_TOPIC);
            // 订阅消息
            client.subscribe(SUBSCRIBE_TOPIC, emqxConfig.getQos());
            int a = System.in.read();
        } catch (Exception e) {
            log.error("异常了,异常信息为:" + e.getMessage());
            e.printStackTrace();
        } finally {
            try {
                if (null != client) {
                    client.disconnect();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
package com.chenwc.topic.subscribe;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 消息回调
 * @author chenwc 
 */
public class MyMqttCallback implements MqttCallback {
    private static final Logger log = LoggerFactory.getLogger(MyMqttCallback.class);
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("连接丢失!");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        log.info("订阅到主题 Topic: " + s + " 的消息, Qos 为: " + mqttMessage.getQos() + ", 消息内容为: " + mqttMessage.toString());

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息交付完成");
    }
}

五、测试

1、订阅

image.png
image.png
image.png
image.png
image.png
image.png

2、发布
image.png

image.png
image.png
image.png

3、开启订阅和发布

image.png
image.png


博客评论
还没有人评论,赶紧抢个沙发~
发表评论
说明:请文明发言,共建和谐网络,您的个人信息不会被公开显示。