首页 > 开发 > Java > 正文

springboot集成mqtt的实践开发

2024-07-13 10:11:07
字体:
来源:转载
供稿:网友

MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。

maven

    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-integration</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.integration</groupId>      <artifactId>spring-integration-stream</artifactId>    </dependency>    <dependency>      <groupId>org.springframework.integration</groupId>      <artifactId>spring-integration-mqtt</artifactId>    </dependency>

配置client factory

  @Bean  public MqttPahoClientFactory mqttClientFactory() {    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();    factory.setServerURIs("tcp://demo:1883");//    factory.setUserName("guest");//    factory.setPassword("guest");    return factory;  }

配置consumer

  @Bean  public IntegrationFlow mqttInFlow() {    return IntegrationFlows.from(mqttInbound())        .transform(p -> p + ", received from MQTT")        .handle(logger())        .get();  }  private LoggingHandler logger() {    LoggingHandler loggingHandler = new LoggingHandler("INFO");    loggingHandler.setLoggerName("siSample");    return loggingHandler;  }  @Bean  public MessageProducerSupport mqttInbound() {    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",        mqttClientFactory(), "siSampleTopic");    adapter.setCompletionTimeout(5000);    adapter.setConverter(new DefaultPahoMessageConverter());    adapter.setQos(1);    return adapter;  }

配置producer

@Bean  public IntegrationFlow mqttOutFlow() {    //console input//    return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),//        e -> e.poller(Pollers.fixedDelay(1000)))//        .transform(p -> p + " sent to MQTT")//        .handle(mqttOutbound())//        .get();    return IntegrationFlows.from(outChannel())        .handle(mqttOutbound())        .get();  }    @Bean  public MessageChannel outChannel() {    return new DirectChannel();  }  @Bean  public MessageHandler mqttOutbound() {    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());    messageHandler.setAsync(true);    messageHandler.setDefaultTopic("siSampleTopic");    return messageHandler;  }

配置MessagingGateway

@MessagingGateway(defaultRequestChannel = "outChannel")public interface MsgWriter {  void write(String note);}

这样就大功告成了

doc

spring-integration-mqtt 

spring-integration-samples-mqtt

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VeVb武林网。


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表