提交 4a252c7e authored 作者: linzhenjie's avatar linzhenjie

加mqtt订阅

上级 6dcf1796
......@@ -97,6 +97,11 @@
</exclusion>
</exclusions>
</dependency>
<!--mqtt-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
......
package tech.glinfo.enbao.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.List;
@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
@Value("${spring.mqtt.completionTimeout}")
private int completionTimeout ; //连接超时
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setMaxInflight(100000000);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
// List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
// String[] topics = new String[topicList.size()];
// topicList.toArray(topics);
// MqttPahoMessageDrivenChannelAdapter adapter =
// new MqttPahoMessageDrivenChannelAdapter(clientId,mqttClientFactory(),defaultTopic);
// adapter.setCompletionTimeout(completionTimeout);
// adapter.setConverter(new DefaultPahoMessageConverter());
// adapter.setQos(1);
// adapter.setOutputChannel(mqttInputChannel());
// return adapter;
log.info("clientId:{}", clientId);
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),defaultTopic);
adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
// 这里可以处理接收的数据
log.info("\n----------------------------START---------------------------\n" +
"接收到订阅消息:\ntopic:" + topic + "\nmessage:" + msg +
"\n-----------------------------END----------------------------");
}
};
}
}
package tech.glinfo.enbao.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
\ No newline at end of file
......@@ -58,6 +58,17 @@ spring:
pool:
enabled: false #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
#mqtt配置
mqtt:
url: tcp://mqtt.gonglian.info:1883
username: 002WX212230209
password: nfefhjljt
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
completionTimeout: 3000
##多数据源的配置
#dynamic:
# datasource:
......
......@@ -57,7 +57,16 @@ spring:
trust-all: true #信任所有的包
pool:
enabled: false #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
#mqtt配置
mqtt:
url: tcp://mqtt.gonglian.info:1883
username: 002WX212230209
password: nfefhjljt
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
completionTimeout: 3000
##多数据源的配置
#dynamic:
# datasource:
......
......@@ -60,6 +60,17 @@ spring:
max-connections: 10
idle-timeout: 30000
#mqtt配置
mqtt:
url: tcp://mqtt.gonglian.info:1883
username: 002WX212230209
password: nfefhjljt
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
completionTimeout: 3000
##多数据源的配置
#dynamic:
# datasource:
......
......@@ -59,6 +59,16 @@ spring:
enabled: true #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
max-connections: 5
idle-timeout: 30000
#mqtt配置
mqtt:
url: tcp://mqtt.gonglian.info:1883
username: 002WX212230209
password: nfefhjljt
client:
id: 'iotclientid${random.int}'
default:
topic: $share/api/sys/#
completionTimeout: 3000
##多数据源的配置
#dynamic:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论