MQTT Java客户端示例

Eave 2018.05.09 17:16

添加Maven依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

代码示例:

package com.qianyan.mqtt;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Mqtt
{
    // tcp://192.168.10.8:1883
    // ssl://192.168.10.8:8883
    private static final String HOST = "ssl://192.168.10.8:8883";
    private static final String SUBSCRIBE_TOPIC = "qianyan/#";
    private static final String TOPIC = "qianyan/shenzhen";
    private static final String CLIENT_ID = "787B8AEAC537";
    private static final String USERNAME = "szqianyan";
    private static final String PASSWORD = "szqianyan";
    private static MqttClient client;

    public static void main(String[] args) throws Exception
    {
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
        client = new MqttClient(HOST, CLIENT_ID, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);

        // 加载SSL证书
        SSLContext sslContext = SSLContext.getInstance("SSL");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        KeyStore keyStore = readKeyStore();
        trustManagerFactory.init(keyStore);
        sslContext.init(null, trustManagerFactory.getTrustManagers(), new SecureRandom());
        options.setSocketFactory(sslContext.getSocketFactory());

        // 这是遗言
        options.setWill(TOPIC, "I'm die".getBytes(), 0, false);
        options.setUserName(USERNAME);
        options.setPassword(PASSWORD.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(10);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        // 自动重连
        // options.setAutomaticReconnect(true);

        client.setCallback(new PushCallback());
        client.connect(options);

        MqttTopic topic = client.getTopic(TOPIC);
        // 订阅消息
        // 0: 最多一次送达。也就是发出去就fire掉,没有后面的事情了
        // 1: 至少一次送达。发出去之后必须等待ack,没有ack,就要找时机重发
        // 2: 准确一次送达。消息id将拥有一个简单的生命周期
        int[] qos = {1};
        String[] topics = {SUBSCRIBE_TOPIC};
        client.subscribe(topics, qos);

        // 发布消息
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setRetained(true);
        message.setPayload("szqianyan".getBytes());

        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("是否发送完成: " + token.isComplete());
        System.out.println("Ratained状态: " + message.isRetained());
    }

    private static KeyStore readKeyStore() throws KeyStoreException, NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException
    {
        KeyStore store = KeyStore.getInstance("PKCS12");
        store.load(new FileInputStream(new File("qianyan-keystore.p12")), "szqianyan".toCharArray());
        return store;
    }
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PushCallback implements MqttCallback
{
    @Override
    public void connectionLost(Throwable cause)
    {
        // TODO Auto-generated method stub
        System.out.println("连接断开,可以做重连");
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token)
    {
        // TODO Auto-generated method stub
        System.out.println("deliveryComplete: " + token.isComplete());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        // TODO Auto-generated method stub
        System.out.println("接收消息主题: " + topic);
        System.out.println("接收消息Qos: " + message.getQos());
        System.out.println("接收消息内容: " + new String(message.getPayload()));
    }
}