MQTT Java客户端示例
Eave
2018.05.09 17:16
添加Maven依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
代码示例:
package com.qianyan.mqtt;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Minimal Eclipse Paho MQTT client example.
 *
 * <p>Connects to the broker at {@link #HOST} over TLS, registers a last-will
 * message, subscribes to {@link #SUBSCRIBE_TOPIC}, and publishes one retained
 * message to {@link #TOPIC}.
 */
public class Mqtt
{
    // Plain TCP endpoint: tcp://192.168.10.8:1883
    // TLS endpoint:       ssl://192.168.10.8:8883
    private static final String HOST = "ssl://192.168.10.8:8883";
    private static final String SUBSCRIBE_TOPIC = "qianyan/#";
    private static final String TOPIC = "qianyan/shenzhen";
    private static final String CLIENT_ID = "787B8AEAC537";
    private static final String USERNAME = "szqianyan";
    private static final String PASSWORD = "szqianyan";

    // PKCS#12 keystore holding the certificates trusted for the broker connection.
    private static final String KEYSTORE_PATH = "qianyan-keystore.p12";
    private static final String KEYSTORE_PASSWORD = "szqianyan";

    private static MqttClient client;

    /**
     * Runs the example: connect, subscribe, publish one message and wait for
     * its delivery to complete.
     *
     * @param args unused
     * @throws Exception if TLS setup, connecting, subscribing or publishing fails
     */
    public static void main(String[] args) throws Exception
    {
        // MemoryPersistence keeps the client's persistence data in memory only.
        client = new MqttClient(HOST, CLIENT_ID, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);

        // Load the SSL/TLS certificates.
        options.setSocketFactory(createSslContext().getSocketFactory());

        // Last-will message, registered with QoS 0 and not retained.
        // The charset is explicit so the bytes do not depend on the platform default.
        options.setWill(TOPIC, "I'm die".getBytes(StandardCharsets.UTF_8), 0, false);
        options.setUserName(USERNAME);
        options.setPassword(PASSWORD.toCharArray());
        // Connection timeout.
        options.setConnectionTimeout(10);
        // Session keep-alive (heartbeat) interval.
        options.setKeepAliveInterval(20);
        // Automatic reconnect is left disabled:
        // options.setAutomaticReconnect(true);

        client.setCallback(new PushCallback());
        client.connect(options);
        MqttTopic topic = client.getTopic(TOPIC);

        // Subscribe.
        // QoS 0: at most once  - fire and forget, nothing happens after sending.
        // QoS 1: at least once - the sender waits for an ack and resends if none arrives.
        // QoS 2: exactly once  - the message id goes through a simple lifecycle.
        int[] qos = {1};
        String[] topics = {SUBSCRIBE_TOPIC};
        client.subscribe(topics, qos);

        // Publish a retained message with QoS 1.
        MqttMessage message = new MqttMessage();
        message.setQos(1);
        message.setRetained(true);
        message.setPayload("szqianyan".getBytes(StandardCharsets.UTF_8));
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("是否发送完成: " + token.isComplete());
        System.out.println("Ratained状态: " + message.isRetained());
    }

    /**
     * Builds an SSLContext whose trust managers come from the keystore
     * returned by {@link #readKeyStore()}. No key managers are installed.
     *
     * @return an initialized SSLContext
     * @throws GeneralSecurityException if the context or trust managers cannot be set up
     * @throws IOException if the keystore file cannot be read
     */
    private static SSLContext createSslContext() throws GeneralSecurityException, IOException
    {
        // Request "TLS" rather than the legacy "SSL" protocol name.
        SSLContext sslContext = SSLContext.getInstance("TLS");
        TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(readKeyStore());
        sslContext.init(null, trustManagerFactory.getTrustManagers(), new SecureRandom());
        return sslContext;
    }

    /**
     * Loads the PKCS#12 keystore at {@link #KEYSTORE_PATH}.
     *
     * @return the loaded keystore
     * @throws KeyStoreException if no PKCS12 keystore implementation is available
     * @throws NoSuchAlgorithmException if the keystore integrity algorithm is unavailable
     * @throws CertificateException if a certificate in the keystore cannot be loaded
     * @throws FileNotFoundException if the keystore file does not exist
     * @throws IOException if the file cannot be read or the password is wrong
     */
    private static KeyStore readKeyStore() throws KeyStoreException, NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException
    {
        KeyStore store = KeyStore.getInstance("PKCS12");
        // try-with-resources closes the stream even when load() throws.
        try (FileInputStream in = new FileInputStream(new File(KEYSTORE_PATH)))
        {
            store.load(in, KEYSTORE_PASSWORD.toCharArray());
        }
        return store;
    }
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * MQTT callback that prints client events to standard output.
 */
public class PushCallback implements MqttCallback
{
    /**
     * Prints a notice that the connection was lost.
     *
     * @param cause the reason for the loss; not inspected here
     */
    @Override
    public void connectionLost(Throwable cause)
    {
        // A reconnect could be started from here.
        System.out.println("连接断开,可以做重连");
    }

    /**
     * Prints whether the delivery represented by the token is complete.
     *
     * @param token the delivery token passed in by the client
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token)
    {
        System.out.println("deliveryComplete: " + token.isComplete());
    }

    /**
     * Prints the topic, QoS and payload of a received message.
     *
     * @param topic the topic the message arrived on
     * @param message the received message
     * @throws Exception declared by the callback interface; not thrown explicitly here
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception
    {
        System.out.println("接收消息主题: " + topic);
        System.out.println("接收消息Qos: " + message.getQos());
        // Decode as UTF-8 explicitly; the no-charset String constructor would use
        // the platform default and could garble non-ASCII payloads.
        // NOTE(review): assumes publishers send UTF-8 text - confirm.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("接收消息内容: " + payload);
    }
}