提交 4e6492f0 authored 作者: linzhenjie's avatar linzhenjie

增加mqtt转socketio

上级 3b6c5fb6
......@@ -4,8 +4,10 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import tech.glinfo.enbao.common.contants.Constants;
import tech.glinfo.enbao.common.utils.Constant;
import tech.glinfo.enbao.common.utils.MapUtils;
import tech.glinfo.enbao.common.utils.PageUtils;
......@@ -34,6 +36,9 @@ public class OtherShDeviceServiceImpl extends ServiceImpl<OtherShDeviceDao, ShDe
@Autowired
private JmsMessagingTemplate jmsTemplate;
@Autowired
private RedisTemplate redisTemplate;
@Override
public PageUtils queryPage(Map<String, Object> params) {
int current = Integer.parseInt((String)params.get(Constant.PAGE));
......@@ -73,11 +78,12 @@ public class OtherShDeviceServiceImpl extends ServiceImpl<OtherShDeviceDao, ShDe
map1.put("status", "3");
mapList.add(map1);
//发送到单个设备
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", String.valueOf(id)).put("status", "3")));
// jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", String.valueOf(id)).put("status", "3")));
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, JSON.toJSONString(new MapUtils().put("deviceId", String.valueOf(id)).put("status", "3")));
}
//发送到主页所有设备
if (mapList.size() > 0) {
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
}
//智能锁密码过期
this.baseMapper.updateDevicePassword();
......
......@@ -135,11 +135,16 @@ public class MqttConfig {
String hexString = ByteUtils.bytes2HexString(payLoad);
mqttService.msgHandle(topic, hexString);
// 这里可以处理接收的数据
log.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + hexString +
"\n-----------------------------END----------------------------");
// 这里可以处理接收的数据
try {
mqttService.msgHandle(topic, hexString);
} catch (Exception e) {
log.error("处理数据出错了:{}", e.getMessage());
}
}
};
}
......
package tech.glinfo.enbao.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import tech.glinfo.enbao.common.contants.Constants;
import tech.glinfo.enbao.modules.redis.RedisReceiver;
/**
* Redis配置
*
* @author Mark sunlightcs@gmail.com
*/
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory factory;
@Bean
RedisMessageListenerContainer container (RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 可以添加多个 messageListener,配置不同的交换机
container.addMessageListener(listenerAdapter,new PatternTopic(Constants.REIDS_TOPIC));
// container.addMessageListener(listenerAdapter,new ChannelTopic("__keyevent@0__:expired"));
return container;
}
//消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(factory);
return redisTemplate;
}
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}
@Bean
public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
return redisTemplate.opsForValue();
}
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}
......@@ -9,16 +9,15 @@
package tech.glinfo.enbao.modules.appuser.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.*;
import tech.glinfo.enbao.common.annotation.Login;
import tech.glinfo.enbao.common.contants.Constants;
import tech.glinfo.enbao.common.utils.R;
import tech.glinfo.enbao.common.utils.StringUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
......@@ -42,6 +41,9 @@ public class AppTestController {
// return R.ok().put("user", user);
// }
@Autowired
private RedisTemplate redisTemplate;
@Login
@GetMapping("userId")
@ApiOperation("获取用户ID")
......@@ -68,4 +70,15 @@ public class AppTestController {
return R.error("缺少参数");
}
@PostMapping("redisTest")
public R redisTest(@RequestBody Map<String, Object> params){
String msg = (String) params.get("msg");
if(!StringUtils.isBlank(msg)) {
// mqProducer.sendPush(title, msg, cid);
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, msg);
return R.ok();
}
return R.error();
}
}
package tech.glinfo.enbao.modules.mqtt.controller;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import tech.glinfo.enbao.common.annotation.Login;
import tech.glinfo.enbao.common.contants.Constants;
import tech.glinfo.enbao.common.utils.EncryptUtil;
import tech.glinfo.enbao.common.utils.R;
import tech.glinfo.enbao.common.utils.StringUtils;
import tech.glinfo.enbao.config.MqttGateway;
import tech.glinfo.enbao.modules.mqtt.entity.AuthEntity;
import javax.annotation.Resource;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("mqtt")
public class MqttController {
@Resource
private MqttGateway mqttGateway;
@PostMapping("auth")
public ResponseEntity<Object> auth(AuthEntity auth){
log.info("请求参数:{}", auth);
......@@ -24,4 +37,16 @@ public class MqttController {
return new ResponseEntity<Object>("auth deny!", HttpStatus.UNAUTHORIZED);
}
}
@Login
@PostMapping("send")
public R sendToMqtt(@RequestBody Map<String, String> params) {
String data = params.get("data");
String topic = params.get("topic");
if(!StringUtils.isBlank(data, topic)) {
log.info("\nsend message >>>> topic : {}, data : {}", topic, data);
mqttGateway.sendToMqtt(data, topic);
}
return R.ok();
}
}
......@@ -4,20 +4,20 @@ import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import tech.glinfo.enbao.common.contants.Constants;
import tech.glinfo.enbao.common.utils.ByteUtils;
import tech.glinfo.enbao.common.utils.EncryptUtil;
import tech.glinfo.enbao.common.utils.MapUtils;
import tech.glinfo.enbao.common.utils.RedisUtils;
import tech.glinfo.enbao.config.MqttGateway;
import tech.glinfo.enbao.modules.sh.entity.ShDeviceEntity;
import tech.glinfo.enbao.modules.sh.entity.ShInstructionParsingEntity;
import tech.glinfo.enbao.modules.sh.entity.ShProductEntity;
import tech.glinfo.enbao.modules.sh.service.OtherShDeviceService;
import tech.glinfo.enbao.modules.sh.service.ShProductService;
import javax.annotation.Resource;
import javax.jms.Topic;
import java.util.*;
......@@ -34,14 +34,16 @@ public class MqttService {
private JmsMessagingTemplate jmsTemplate;
@Autowired
private Topic socketioTopic;
@Resource
private MqttGateway mqttGateway;
@Autowired
private RedisTemplate redisTemplate;
public void msgHandle(String topic, String msg) {
String[] topics = topic.split("/");
ShDeviceEntity device = otherShDeviceService.deviceInfo(new MapUtils().put("numbering", topics[1]));
if(topic.endsWith("data/up")) {
if(msg.startsWith("0A01")) {//实时数据上报
Map<String, Object> ws = new HashMap<>();//推送给socketio
if(topic.endsWith("data/up")) { //数据上报
String type = msg.substring(2,4);
if("01".startsWith(type)) {//实时数据上报
if(device != null) {
//查询解析指令集
List<ShInstructionParsingEntity> cmds = otherShDeviceService.getDeviceInstructionList(device.getProductId(), "01");
......@@ -62,10 +64,11 @@ public class MqttService {
map.put(cmd.getName(), hex1);
length += cmd.getLength();
}
ws.putAll(map);
otherShDeviceService.updateDeviceData(JSON.toJSONString(map), device.getId());
}
}
} else {//设备信息上报
} else if("02".startsWith(type)) {//设备信息上报
String mac = ByteUtils.hexToString(msg.substring(8, 32));
log.info("mac:{}", mac);
if (redisUtils.hasKey("device:save:GL" + mac)) {
......@@ -88,22 +91,35 @@ public class MqttService {
shDevice.setSpareOne(EncryptUtil.encrypt(topics[1]));//保存设备连接密码
otherShDeviceService.save(shDevice);
redisUtils.delete("device:save:GL" + mac);
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "GL"+mac)));
// jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "GL"+mac)));
//采用redis的订阅发布
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, JSON.toJSONString(new MapUtils().put("deviceId", "GL"+mac)));
}
return;
}
if (device != null && (device.getOnlineStatus() == 1 || device.getOnlineStatus() == 3)) {
}
if(device != null) {//设备存在
ws.put("deviceId", String.valueOf(device.getId()));
ws.put("_msg", msg);
ws.put("_topic", topic);
//推送到socketio
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, JSON.toJSONString(ws));
if(device.getOnlineStatus() != 2) {//设备上线推送
List<Map<String, String>> mapList = new ArrayList<>();
Map<String, String> map1 = new HashMap<>();
map1.put("deviceId", String.valueOf(device.getId()));
map1.put("status", "2");
mapList.add(map1);
jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
//采用redis的订阅发布
redisTemplate.convertAndSend(Constants.REIDS_TOPIC, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
// jmsTemplate.convertAndSend(socketioTopic, JSON.toJSONString(new MapUtils().put("deviceId", "onlineStatus").put("list", mapList)));
}
//更新设备状态为在线
new Thread(() -> {
ShDeviceEntity deviceEntity = new ShDeviceEntity();
deviceEntity.setId(device.getId());
deviceEntity.setOnlineStatus(2);
deviceEntity.setOnlineStatus(topic.endsWith("/will/up")?3:2);//遗嘱消息就是下线,其他上线
deviceEntity.setOnlineTime(new Date());
otherShDeviceService.updateById(deviceEntity);
}).start();
......
package tech.glinfo.enbao.modules.redis;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.glinfo.enbao.modules.socketio.SocketIOService;
import java.util.Map;
@Slf4j
@Component
public class RedisReceiver {
@Autowired
private SocketIOService socketIOService;
public void receiveMessage(String message, String channel) {
log.info("redis subscribe receive message:channel:{},msg:{}", channel, message);
try {
Map<String, String> datas = (Map<String, String>) JSON.parse(message);
socketIOService.pushMessageToDevice(datas.get("deviceId"), message);
// logger.info("send message to websocket success : {}", message);
} catch (Exception e) {
log.error("parse message to websocket error : {}", e.getMessage());
}
}
}
\ No newline at end of file
import io.swagger.models.auth.In;
import tech.glinfo.enbao.common.utils.ByteUtils;
import java.util.Arrays;
import java.util.Collections;
......@@ -22,6 +23,13 @@ public class MqttTest {
String[] strings = topic.split("/");
System.out.println(strings[1]);
String aa = "2B021C643430393135313461353434620E30314345423232383830303030310C54502D4C494E4B5F39333244";
String mac = ByteUtils.hexToString(aa.substring(8, 32));
Integer length = Integer.valueOf(aa.substring(32, 34), 16);
String wifi = ByteUtils.hexToString(aa.substring(34, 34+length*2));
System.out.println(mac);
System.out.println(length);
System.out.println(wifi);
}
public static String encrypt(String sn) {
......
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.binary.Base64;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import tech.glinfo.enbao.common.utils.StringUtils;
import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.security.*;
import java.security.spec.AlgorithmParameterSpec;
public class StringTest {
public static void main(String[] args) {
// String sessionKey = "JocQqerjzSbHxUAAH5sgeA==";
// String encryptedData = "LNHylEIMCHE+6YpbaxyC69fmb7zWCcM8PSUE+V25JQn5QEdcyyji6/4EcnkXnNiwYeOgGacBZwQrYApGvhCUgh5quSMCXBpUavPCWdFosU89j1jWAWh8UC7wHUBqO83hipEDuRfKgcvHoQPj7dhmrHZ+9AguPvrs5NlGIR4eFuHOR81Y1o0WgK6PV2VV0OmYnAqqETls3SH1pg+PNfqml2bGPKHM5eM8w/6qPoVrmLIKPo6uAFTfAUX+Ca1kBzpZabJqqEloHaa4I6rs5gVxG+tt2HtwTo7rPpjLNIzjfnGiVE7KzLv/EJ+aVx5WTQLQvqEWqoVYnUFMqvu8qb8zOmOsnyHdKzaSAT/wWqI3cpww8ZA3JWBob+nQztBMU5AUwDxsNivD/1ePx2f6MZYnWZKethUdrZla6wiaBRzm1aEuWwHXHdsfhWTu3pHTK5vpcQYXIyfhG2YycmrALxHoquFKajacwxD9oGk0+bBDZASNmqte9GTB4htkE4zjjcv4B/mC1mLLQo7mUTDaayRAqg==";
// String iv = "2FMgfMUw/GeHKp1VjYdwZQ==";
// JSONObject jsonObject = decode(sessionKey, encryptedData, iv);
// System.out.println(jsonObject);
String ssid = "TP-LINK_932D中";
String _pwd = "gl123456#";
//03AB78550C54502D4C494E4B5F3933324409676C3132333435362306FB
//03AB78550C54502D4C494E4B5F3933324409676C3132333435362306FB
//03AB78550C54502D4C494E4B5F3933324409676C3132333435362306FB
//03AB78550F54502D4C494E4B5F39333244
// E4B8AD
// 09676C313233343536230947
//03AB78550E54502D4C494E4B5F39333244
// 4E2D
// 09676C313233343536230778
//03AB78550F54502D4C494E4B5F39333244E4B8AD09676C313233343536230947
//03AB78550F54502D4C494E4B5F39333244E4B8AD09676C313233343536230947
//03AB78550F54502D4C494E4B5F39333244E4B8AD09676C313233343536230947
StringBuffer info = new StringBuffer("03AB7855");
// String wifi = strToHex(ssid);
// String pwd = strToHex(_pwd);
String wifi = bytes2HexString(getBytesByString(ssid));
System.out.println(wifi);
System.out.println(bytes2HexString(getBytesByString("TP-LINK_932D")));
String pwd = bytes2HexString(getBytesByString(_pwd));
info.append(int2Hex(wifi.length()/2));
info.append(wifi);
info.append(int2Hex(pwd.length()/2));
info.append(pwd);
info.append(makeCheckNum(info.toString()));
// Log.e("INFO",info.toString());
//THE INTERNET OF EVERYTHING BEGINS HERE
System.out.println(info);
byte[] bytes = hexStrToBinaryStr(info.toString());
for (byte b : bytes) {
System.out.print(Integer.valueOf(b));
}
}
public static byte[] hexStrToBinaryStr(String hexString) {
hexString = hexString.replaceAll(" ", "");
int len = hexString.length();
int index = 0;
byte[] bytes = new byte[len / 2];
while (index < len) {
String sub = hexString.substring(index, index + 2);
bytes[index / 2] = (byte) Integer.parseInt(sub, 16);
index += 2;
}
return bytes;
}
public static String strToHex(String s) {
StringBuilder str = new StringBuilder();
for (int i = 0; i < s.length(); i++) {
int ch = (int) s.charAt(i);
String s4 = Integer.toHexString(ch);
str.append(s4);
}
return str.toString().toUpperCase();
}
private static String intToHex(int n) {
//StringBuffer s = new StringBuffer();
StringBuilder sb = new StringBuilder(8);
String a;
char[] b = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
while (n != 0) {
sb = sb.append(b[n % 16]);
n = n / 16;
}
a = sb.reverse().toString();
return a;
}
public static String int2Hex(int n) {
if(n == 0) {
return "00";
}
String s = intToHex(n);
int i = s.length() % 2;
StringBuilder sb = new StringBuilder();
while (i > 0) {
sb.append("0");
i--;
}
return sb.append(s).toString();
}
public static String makeCheckNum(String data) {
if (data == null || data.equals("")) {
return "";
}
int total = 0;
int len = data.length();
int num = 0;
while (num < len) {
String s = data.substring(num, num + 2);
total += Integer.parseInt(s, 16);
num = num + 2;
}
return int2Hex(total);
}
public static String bytes2HexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
public static byte[] getBytesByString(String string) {
try {
return string.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("the charset is invalid");
}
}
private static JSONObject decode(String sessionKey, String encryptedData, String iv) {
byte[] encrypData = Base64.decodeBase64(encryptedData);
byte[] ivData = Base64.decodeBase64(iv);
byte[] sessionKeyB = Base64.decodeBase64(sessionKey);
Security.addProvider(new BouncyCastleProvider());
AlgorithmParameterSpec ivSpec = new IvParameterSpec(ivData);
byte[] doFinal = new byte[0];
try {
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS7Padding","BC");
SecretKeySpec keySpec = new SecretKeySpec(sessionKeyB, "AES");
cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
doFinal = cipher.doFinal(encrypData);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (NoSuchProviderException e) {
e.printStackTrace();
} catch (NoSuchPaddingException e) {
e.printStackTrace();
} catch (InvalidKeyException e) {
e.printStackTrace();
} catch (InvalidAlgorithmParameterException e) {
e.printStackTrace();
} catch (IllegalBlockSizeException e) {
e.printStackTrace();
} catch (BadPaddingException e) {
e.printStackTrace();
}
if(doFinal != null && doFinal.length > 0) {
String result = new String(doFinal);
System.out.println(result);
return JSONObject.parseObject(result);
}
return new JSONObject();
}
}
......@@ -9,4 +9,7 @@ public class Constants {
//设备使用时间
public static final String FILTER_UES_TIME = "filter:use:time";
//redis订阅发布主题==主要用于socketio信息传递
public static final String REIDS_TOPIC = "redis.msg";
}
......@@ -408,5 +408,10 @@ public class ByteUtils {
System.out.println(Integer.toHexString(-256));
System.out.println(signInt2HexString("20", 2));
System.out.println(hexToString("343431373933336638663330"));
System.out.println(hexToString("54502D4C494E4B5F393332"));
System.out.println(hexToString("3031384542323132353132303031"));
System.out.println(hexToString("3031414542323132323030323030"));
//2B0218643434313739333366386633300E30313845423231323531323030310C54502D4C494E4B5F39333244
}
}
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论