提交 eda1077f authored 作者: linzhenjie's avatar linzhenjie

mqtt订阅修改

上级 e7e0866d
......@@ -2,6 +2,7 @@ package tech.glinfo.enbao.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -19,6 +20,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import tech.glinfo.enbao.common.utils.ByteUtils;
import tech.glinfo.enbao.modules.mqtt.service.MqttService;
import java.util.List;
......@@ -45,6 +47,9 @@ public class MqttConfig {
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout ; //连接超时
@Autowired
private MqttService mqttService;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
......@@ -68,7 +73,7 @@ public class MqttConfig {
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultTopic("defaultTopic");
return messageHandler;
}
@Bean
......@@ -100,8 +105,9 @@ public class MqttConfig {
// return adapter;
log.info("clientId:{}", clientId);
String[] topics = defaultTopic.split(",");
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),defaultTopic);
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),topics);
adapter.setCompletionTimeout(completionTimeout);
......@@ -124,13 +130,15 @@ public class MqttConfig {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
// String msg = message.getPayload().toString();
byte[] payLoad = (byte[])message.getPayload();
String hexString = ByteUtils.bytes2HexString(payLoad);
mqttService.msgHandle(topic, hexString);
// 这里可以处理接收的数据
log.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + ByteUtils.bytes2HexString(payLoad) +
"\nmsg:" + msg +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + hexString +
"\n-----------------------------END----------------------------");
}
};
......
......@@ -121,6 +121,7 @@ public class MqConsumer {
shDevice.setName(product.getName());
shDevice.setPic(product.getPic());
shDevice.setProductId(product.getId());
shDevice.setOnlineStatus(2);//在线
otherShDeviceService.save(shDevice);
redisUtils.delete("device:save:" + datas.get("mac"));
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", datas.get("mac"))));
......@@ -156,6 +157,7 @@ public class MqConsumer {
ParseClassEntity entity = redisUtils.get(Constants.PARSE_CLASS + datas.get("deviceType"), ParseClassEntity.class);
if (entity == null) {
entity = parseClassService.getOne(new QueryWrapper<ParseClassEntity>().eq("type", datas.get("deviceType")));
redisUtils.set(Constants.PARSE_CLASS + datas.get("deviceType"), entity);
}
if (entity == null) {
logger.info("找不到配置的解析类:deviceType={}", datas.get("deviceType"));
......
package tech.glinfo.enbao.modules.mqtt.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import tech.glinfo.enbao.common.utils.ByteUtils;
import tech.glinfo.enbao.common.utils.EncryptUtil;
import tech.glinfo.enbao.common.utils.MapUtils;
import tech.glinfo.enbao.common.utils.RedisUtils;
import tech.glinfo.enbao.config.MqttGateway;
import tech.glinfo.enbao.modules.sh.entity.ShDeviceEntity;
import tech.glinfo.enbao.modules.sh.entity.ShInstructionParsingEntity;
import tech.glinfo.enbao.modules.sh.entity.ShProductEntity;
import tech.glinfo.enbao.modules.sh.service.OtherShDeviceService;
import tech.glinfo.enbao.modules.sh.service.ShProductService;
import javax.annotation.Resource;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class MqttService {
@Autowired
private RedisUtils redisUtils;
@Autowired
private OtherShDeviceService otherShDeviceService;
@Autowired
private ShProductService shProductService;
@Autowired
private JmsMessagingTemplate jmsTemplate;
@Autowired
private Topic socketioTopic;
@Resource
private MqttGateway mqttGateway;
public void msgHandle(String topic, String msg) {
String[] topics = topic.split("/");
ShDeviceEntity device = otherShDeviceService.deviceInfo(new MapUtils().put("numbering", topics[1]));
if(topic.endsWith("data/up")) {
if(msg.startsWith("0A01")) {//实时数据上报
if(device != null) {
//查询解析指令集
List<ShInstructionParsingEntity> cmds = otherShDeviceService.getDeviceInstructionList(device.getProductId(), "01");
if (cmds != null && cmds.size() > 0) {
int length = 0;
//解析设备报文内容
Map<String, Object> map = new HashMap<>();
for (ShInstructionParsingEntity cmd : cmds) {
String hex = msg.substring(length, length + cmd.getLength());
String hex1;
if (cmd.getType() == 1 || cmd.getType() == 2) {
hex1 = String.valueOf(Integer.valueOf(hex, 16));
}else {
hex1 = ByteUtils.signHex2IntString(hex);
}
map.put(cmd.getName(), hex1);
length += cmd.getLength();
}
otherShDeviceService.updateDeviceData(JSON.toJSONString(map), device.getId());
}
}
} else {//设备信息上报
String mac = ByteUtils.hexToString(msg.substring(8, 32));
log.info("mac:{}", mac);
if (redisUtils.hasKey("device:save:GL" + mac)) {
log.debug("添加设备:{}", mac);
String[] userId = redisUtils.get("device:save:GL" + mac).split("-");
String deviceFlag = "0"+msg.substring(4,6);
// int dnleng = Integer.valueOf(msg.substring(32, 34), 16);
// String deviceNo = ByteUtils.hexToString(msg.substring(34, 34 + dnleng));
log.info("deviceNo:{}", topics[1]);
ShProductEntity product = shProductService.getOne(new QueryWrapper<ShProductEntity>().eq("device_flag", deviceFlag).last("LIMIT 1"));
ShDeviceEntity shDevice = new ShDeviceEntity();
shDevice.setUserId(Integer.valueOf(userId[0]));
shDevice.setOnlineStatus(2);//在线
shDevice.setMac(mac);
shDevice.setFamilyId(Integer.valueOf(userId[1]));
shDevice.setNumbering(topics[1]);
shDevice.setName(product.getName());
shDevice.setPic(product.getPic());
shDevice.setProductId(product.getId());
shDevice.setSpareOne(EncryptUtil.encrypt(topics[1]));//保存设备连接密码
otherShDeviceService.save(shDevice);
redisUtils.delete("device:save:GL" + mac);
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "GL"+mac)));
}
}
if (device != null && (device.getOnlineStatus() == 1 || device.getOnlineStatus() == 3)) {
List<Map<String, String>> mapList = new ArrayList<>();
Map<String, String> map1 = new HashMap<>();
map1.put("deviceId", String.valueOf(device.getId()));
map1.put("status", "2");
mapList.add(map1);
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
}
}
}
}
......@@ -66,7 +66,7 @@ spring:
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
topic: $share/api/sys/+/+/up,$share/api/sys/+/+/response
completionTimeout: 3000
##多数据源的配置
......
......@@ -65,7 +65,7 @@ spring:
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
topic: $share/api/sys/+/+/up,$share/api/sys/+/+/response
completionTimeout: 3000
##多数据源的配置
#dynamic:
......
......@@ -68,7 +68,7 @@ spring:
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
topic: $share/api/sys/+/+/up,$share/api/sys/+/+/response
completionTimeout: 3000
##多数据源的配置
......
......@@ -67,7 +67,7 @@ spring:
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
topic: $share/api/sys/+/+/up,$share/api/sys/+/+/response
completionTimeout: 3000
##多数据源的配置
......
......@@ -18,6 +18,10 @@ public class MqttTest {
encrypt(n);
}
String topic = "sys/002EB212230201/data/up";
String[] strings = topic.split("/");
System.out.println(strings[1]);
}
public static String encrypt(String sn) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论