提交 e7e0866d authored 作者: linzhenjie's avatar linzhenjie

mqtt数据转化

上级 4a252c7e
......@@ -18,6 +18,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import tech.glinfo.enbao.common.utils.ByteUtils;
import java.util.List;
......@@ -50,7 +51,7 @@ public class MqttConfig {
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setKeepAliveInterval(60);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setMaxInflight(100000000);
......@@ -103,7 +104,12 @@ public class MqttConfig {
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),defaultTopic);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
// 设置转换器,接收bytes
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
// adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
......@@ -119,10 +125,12 @@ public class MqttConfig {
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
byte[] payLoad = (byte[])message.getPayload();
// 这里可以处理接收的数据
log.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + ByteUtils.bytes2HexString(payLoad) +
"\nmsg:" + msg +
"\n-----------------------------END----------------------------");
}
};
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论