spring提供了spring-integration-mqtt.jar方便我们进行mqtt的快速整合,本文以springboot2.2.8为例使用spring-integration-mqtt进行mqtt的整合实现收发消息.

一、添加pom依赖

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
</dependency>

二、自定义@TopicListener注解来接收消息

一般我们是通过@ServiceActivator注解及MqttPahoMessageDrivenChannelAdapter来实现接收消息的

1. 首先创建MqttPahoMessageDrivenChannelAdapter,通过MqttPahoMessageDrivenChannelAdapter订阅一个或多个主题,并设置MqttPahoMessageDrivenChannelAdapter的outputChannelName,
2. 再使用@ServiceActivator(inputChannel = `刚刚指定的outputChannelName`)作用在方法上来接收已订阅的主题消息

这样处理起来有些麻烦,下面准备定义@TopicListener来简化接收消息

package com.phy.springframework.boot.autoconfigure.mqtt.annotations;

import org.springframework.integration.annotation.Poller;

import java.lang.annotation.*;

/**
 * TopicListener
 * @desc: Mqtt主题监听器注解
 * 需要发布mqtt监听器需要在类上添加该注解,并提供监听的主题,数据获取方式
 * 类必须实现MqttListenerCallback接口,实现callback进行数据回调
 * 主题格式为a.b.c,其中可以包括*、#,*代表一个标识符,#代表多个标识符
 *
 * @version:
 * @createTime: 2021/2/7 16:14
 * @author: liuhr
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TopicListener {

    /**
     * 监听的主题
     * @return
     */
    String[] topic();

    /**
     * qos
     * @return
     */
    int[] qos() default {};

    String inputChannel() default "";

    String outputChannel() default "";

    String requiresReply() default "";

    String[] adviceChain() default {};

    String sendTimeout() default "";

    String autoStartup() default "";

    String phase() default "";

    String async() default "";

    Poller[] poller() default {};

}

看过@ServiceActivator注解源码的很容易就能发现@TopicListener只是在@ServiceActivator基础上加了参数topic、qos;同样的我们基于ServiceActivatorAnnotationPostProcessor.java源码稍加修改得到下面的TopicListenerAnnotationPostProcessor.java

package com.phy.springframework.boot.autoconfigure.mqtt.annotations;

import com.phy.springframework.boot.autoconfigure.mqtt.config.MqttAutoConfiguration;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

/**
 * TopicListenerAnnotationPostProcessor
 *
 * @desc: TODO 类的设计目的、功能及注意事项
 * @version:
 * @createTime: 2021/2/8 12:55
 * @author: liuhr
 */
public class TopicListenerAnnotationPostProcessor extends AbstractMethodAnnotationPostProcessor<TopicListener> {

    public TopicListenerAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
        super(beanFactory);
        this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply", "adviceChain"));
    }

    @Override
    protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) {
        logger.info("TopicListenerAnnotationPostProcessor createHandler");

        String inputChannelName = resolveInputChannelName(annotations);
        AbstractMqttMessageDrivenChannelAdapter drivenChannelAdapter = registerDrivenChannelAdapter(inputChannelName);
        subscribe(drivenChannelAdapter, annotations);

        AbstractReplyProducingMessageHandler topicListener;
        if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) {
            Object target = this.resolveTargetBeanFromMethodWithBeanAnnotation(method);
            topicListener = this.extractTypeIfPossible(target, AbstractReplyProducingMessageHandler.class);
            if (topicListener != null) {
                this.checkMessageHandlerAttributes(this.resolveTargetBeanName(method), annotations);
                return (MessageHandler)target;
            }

            if (target instanceof MessageHandler) {
                return new ReplyProducingMessageHandlerWrapper((MessageHandler)target);
            }

            MessageProcessor<?> messageProcessor = this.buildLambdaMessageProcessorForBeanMethod(method, target);
            if (messageProcessor != null) {
                topicListener = new ServiceActivatingHandler(messageProcessor);
            } else {
                topicListener = new ServiceActivatingHandler(target);
            }
        } else {
            topicListener = new ServiceActivatingHandler(bean, method);
        }

        String requiresReply = MessagingAnnotationUtils.resolveAttribute(annotations, "requiresReply", String.class);
        if (StringUtils.hasText(requiresReply)) {
            topicListener.setRequiresReply(this.resolveAttributeToBoolean(requiresReply));
        }

        String isAsync = MessagingAnnotationUtils.resolveAttribute(annotations, "async", String.class);
        if (StringUtils.hasText(isAsync)) {
            topicListener.setAsync(this.resolveAttributeToBoolean(isAsync));
        }

        this.setOutputChannelIfPresent(annotations, topicListener);
        return topicListener;
    }

    private void subscribe(AbstractMqttMessageDrivenChannelAdapter drivenChannelAdapter, List<Annotation> annotations) {
        String[] topic = MessagingAnnotationUtils.resolveAttribute(annotations, "topic", String[].class);
        Integer[] _qos = MessagingAnnotationUtils.resolveAttribute(annotations, "qos", Integer[].class);
        int[] qos;
        if(Objects.isNull(_qos)) {
            qos = IntStream.generate(() -> 1).limit(topic.length).toArray();
        } else {
            qos = Arrays.stream(_qos).mapToInt(Integer::valueOf).toArray();
        }
        if(logger.isInfoEnabled()) {
            logger.info("subscribe topic [" + String.join(",", topic) + "]......");
        }
        drivenChannelAdapter.addTopics(topic, qos);
    }

    private String resolveInputChannelName(List<Annotation> annotations) {
        String inputChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, "inputChannel", String.class);
        if(StringUtils.hasText(inputChannelName)) {
            return inputChannelName;
        }
        //如果没有配置,则默认采用hash生成inputChannelName
        return "INPUT_CHANNEL_" + Integer.toHexString(Arrays.hashCode(annotations.toArray())).toUpperCase();
    }

    private String resolveDrivenChannelAdapterBeanName(String inputChannelName) {
        return "DRIVEN_CHANNEL_ADAPTER_OF_" + inputChannelName;
    }


    private AbstractMqttMessageDrivenChannelAdapter registerDrivenChannelAdapter(String inputChannelName) {
        String beanName = resolveDrivenChannelAdapterBeanName(inputChannelName);
        Assert.isTrue(!this.beanFactory.containsBean(beanName), "inputChannel '" + inputChannelName + "' is already in use, please specify the inputChannel manually and make sure it is unique");
        MqttPahoClientFactory mqttClientFactory = this.beanFactory.getBean(MqttPahoClientFactory.class);
        MqttAutoConfiguration.PhyMqttConnectOptions mqttConnectOptions = (MqttAutoConfiguration.PhyMqttConnectOptions) mqttClientFactory.getConnectionOptions();
        BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(MqttPahoMessageDrivenChannelAdapter.class)
                .addConstructorArgValue(mqttConnectOptions.getClientId() + "_" + inputChannelName)
                .addConstructorArgValue(mqttClientFactory)
                .addConstructorArgValue(new String[]{})
                .addPropertyValue("recoveryInterval", mqttConnectOptions.getRecoveryInterval())
                .addPropertyValue("completionTimeout", mqttConnectOptions.getCompletionTimeout())
                .addPropertyValue("disconnectCompletionTimeout", mqttConnectOptions.getDisconnectCompletionTimeout())
                .addPropertyValue("outputChannelName", inputChannelName);
        ((DefaultListableBeanFactory)this.beanFactory).registerBeanDefinition(beanName, beanDefinitionBuilder.getRawBeanDefinition());
        return this.beanFactory.getBean(beanName, MqttPahoMessageDrivenChannelAdapter.class);
    }

    @Override
    public boolean shouldCreateEndpoint(Method method, List<Annotation> annotations) {
        return true;
    }

    @Override
    protected AbstractEndpoint createEndpoint(MessageHandler handler, Method method, List<Annotation> annotations) {
        AbstractEndpoint endpoint = null;

        String inputChannelName = resolveInputChannelName(annotations);
        if (StringUtils.hasText(inputChannelName)) {
            MessageChannel inputChannel;
            try {
                inputChannel = this.channelResolver.resolveDestination(inputChannelName);
            } catch (DestinationResolutionException var8) {
                if (!(var8.getCause() instanceof NoSuchBeanDefinitionException)) {
                    throw var8;
                }

                inputChannel = new DirectChannel();
                this.beanFactory.registerSingleton(inputChannelName, inputChannel);
                inputChannel = (MessageChannel)this.beanFactory.initializeBean(inputChannel, inputChannelName);
                if (this.disposables != null) {
                    final String DISPOSABLES_CLASS_NAME = "org.springframework.integration.config.annotation.Disposables";
                    try {
                        Class clazz = Class.forName(DISPOSABLES_CLASS_NAME);
                        Method addDisposablesMethod = clazz.getMethod("add", DisposableBean[].class);
                        addDisposablesMethod.setAccessible(true);
                        //注意数组参数需要再用Object数组包裹一下
                        addDisposablesMethod.invoke(this.disposables, new Object[]{ new DisposableBean[]{(DisposableBean)inputChannel} });
                    } catch (ReflectiveOperationException e) {
                        throw new RuntimeException("execute disposables add method error", e);
                    }
                }
            }

            Assert.notNull(inputChannel, "failed to resolve inputChannel '" + inputChannelName + "'");
            endpoint = this.doCreateEndpoint(handler, inputChannel, annotations);
        }

        return endpoint;
    }


}

要使TopicListenerAnnotationPostProcessor.java发挥作用我们还需自定义CustomMessagingAnnotationPostProcessor.java覆盖掉原始MessagingAnnotationPostProcessor.java

package com.phy.springframework.boot.autoconfigure.mqtt.annotations;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.config.annotation.MethodAnnotationPostProcessor;

import java.lang.annotation.Annotation;
import java.util.HashMap;
import java.util.Map;

/**
 * CustomMessagingAnnotationPostProcessor
 *
 * @desc: 加入TopicListenerAnnotationPostProcessor
 * @version:
 * @createTime: 2021/2/8 13:13
 * @author: liuhr
 */
@Slf4j
public class CustomMessagingAnnotationPostProcessor extends MessagingAnnotationPostProcessor {

    public CustomMessagingAnnotationPostProcessor() {
        log.info("CustomMessagingAnnotationPostProcessor init");
    }

    @Override
    protected Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> setupCustomPostProcessors() {
        Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> customPostProcessors = new HashMap<>();
        customPostProcessors.put(TopicListener.class, new TopicListenerAnnotationPostProcessor(getBeanFactory()));
        return customPostProcessors;
    }

}

当然IntegrationRegistrar.java里面使用的是MessagingAnnotationPostProcessor.java,因此我们还需再自定义CustomIntegrationRegistrar.java,且该类应在@EnableIntegration之前生效

package com.phy.springframework.boot.autoconfigure.mqtt.config;

import com.phy.springframework.boot.autoconfigure.mqtt.annotations.CustomMessagingAnnotationPostProcessor;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.lang.Nullable;

public class CustomIntegrationRegistrar implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (importingClassMetadata != null) {
            this.registerMessagingAnnotationPostProcessors(importingClassMetadata, registry);
        }
    }

    private void registerMessagingAnnotationPostProcessors(AnnotationMetadata meta, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition("org.springframework.integration.internalMessagingAnnotationPostProcessor")) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CustomMessagingAnnotationPostProcessor.class).setRole(2);
            registry.registerBeanDefinition("org.springframework.integration.internalMessagingAnnotationPostProcessor", builder.getBeanDefinition());
        }
    }

}

三、Mqtt配置类

  1. MqttAutoConfiguration
package com.phy.springframework.boot.autoconfigure.mqtt.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageHandler;

import java.util.UUID;

/**
 * MqttAutoConfiguration
 *
 * @desc:
 *
 * 这里使用@Import(CustomIntegrationRegistrar.class)重写了MessagingAnnotationPostProcessor的注入
 * @version:
 * @createTime: 2021/2/7 9:46
 * @author: liuhr
 */
@Slf4j
@Configuration
@Import(CustomIntegrationRegistrar.class)
public class MqttAutoConfiguration {

    /**
     * 出站管道
     */
    public final static String DEFAULT_OUTPUT_CHANNEL = "output_channel";

    @Bean
    @ConfigurationProperties(prefix="phy.mqtt")
    public MqttConnectOptions mqttConnectOptions() {
        return new PhyMqttConnectOptions();
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions mqttConnectOptions) {
        DefaultMqttPahoClientFactory mqttClientFactory = new DefaultMqttPahoClientFactory();
        mqttClientFactory.setConnectionOptions(mqttConnectOptions);
        return mqttClientFactory;
    }

    /*************************************** 发送消息 ***************************************/
    @Bean
    @ServiceActivator(inputChannel = DEFAULT_OUTPUT_CHANNEL)
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
        String clientId = UUID.randomUUID().toString();
        return new MqttPahoMessageHandler(clientId, mqttClientFactory);
    }

    /**
     * MqttAutoConfiguration
     * @desc: 加上willDestination、willMessage的set方法
     * @version:
     * @createTime: 2021/2/7 10:48
     * @author: liuhr
     */
    public static class PhyMqttConnectOptions extends MqttConnectOptions {

        private final static int DEFAULT_RECOVERY_INTERVAL = 10000;

        private final static long DEFAULT_COMPLETION_TIMEOUT = 30000L;

        private final static long DEFAULT_DISCONNECT_COMPLETION_TIMEOUT = 5000L;

        private String clientId;

        private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

        private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

        private long disconnectCompletionTimeout = DEFAULT_DISCONNECT_COMPLETION_TIMEOUT;


        public String getClientId() {
            return clientId;
        }

        public void setClientId(String clientId) {
            this.clientId = clientId;
        }

        public int getRecoveryInterval() {
            return recoveryInterval;
        }

        public void setRecoveryInterval(int recoveryInterval) {
            this.recoveryInterval = recoveryInterval;
        }

        public long getCompletionTimeout() {
            return completionTimeout;
        }

        public void setCompletionTimeout(long completionTimeout) {
            this.completionTimeout = completionTimeout;
        }

        public long getDisconnectCompletionTimeout() {
            return disconnectCompletionTimeout;
        }

        public void setDisconnectCompletionTimeout(long disconnectCompletionTimeout) {
            this.disconnectCompletionTimeout = disconnectCompletionTimeout;
        }

        public void setWillDestination(String willDestination) {
            MqttMessage willMessage = super.getWillMessage();
            super.setWill(willDestination, willMessage, willMessage.getQos(), willMessage.isRetained());
        }

        public void setWillMessage(MqttMessage willMessage) {
            String willDestination = super.getWillDestination();
            super.setWill(willDestination, willMessage, willMessage.getQos(), willMessage.isRetained());
        }

    }

}
  1. IntegrationAutoConfiguration
package com.phy.springframework.boot.autoconfigure.mqtt.config;

import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.config.EnableIntegration;

/**
 * IntegrationAutoConfiguration
 *
 * @desc: TODO 类的设计目的、功能及注意事项
 * @version:
 * @createTime: 2021/2/18 17:47
 * @author: liuhr
 */
@AutoConfigureAfter(MqttAutoConfiguration.class)
@EnableIntegration
@IntegrationComponentScan("com.phy.springframework.boot.autoconfigure.mqtt.producer")
public class IntegrationAutoConfiguration {

}
  1. spring.factories配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.phy.springframework.boot.autoconfigure.mqtt.config.MqttAutoConfiguration,\
    com.phy.springframework.boot.autoconfigure.mqtt.config.IntegrationAutoConfiguration

四、使用@MessagingGateway发送消息

package com.phy.springframework.boot.autoconfigure.mqtt.producer;

import com.phy.springframework.boot.autoconfigure.mqtt.config.MqttAutoConfiguration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;

/**
 * MqttService
 * @desc: TODO 类的设计目的、功能及注意事项
 * @version:
 * @createTime: 2021/2/7 14:41
 * @author: liuhr
 */
@MessagingGateway(defaultRequestChannel = MqttAutoConfiguration.DEFAULT_OUTPUT_CHANNEL)
public interface MqttMessageProducer  {

	/**
	 * send
	 * @desc: TODO 描述这个方法的功能、适用条件及注意事项
	 * @author: liuhr
	 * @createTime: 2021/2/7 14:40
	 * @param topic
	 * @param message
	 * @return: void
	 */
	void send(@Header(MqttHeaders.TOPIC) String topic, Message<byte[]> message);

}

五、测试

  1. 发送消息
package com.phy.springframework.boot.test.mqtt.web;

import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.phy.springframework.boot.autoconfigure.mqtt.producer.MqttMessageProducer;
import com.phy.springframework.boot.test.mqtt.vo.MqttData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * MailSenderController
 *
 * @desc: TODO 类的设计目的、功能及注意事项
 * @version:
 * @createTime: 2020/9/16 14:19
 * @author: liuhr
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttMessageProducer mqttMessageProducer;

    @GetMapping("/test")
    public boolean index(MqttData mqttData){
        Message<byte[]> message = MessageBuilder.withPayload(JSONObject.toJSONString(mqttData.getData()).getBytes(Charsets.UTF_8)).build();
        mqttMessageProducer.send(mqttData.getTopic(), message);
	return true;
    }

}
  1. 接收消息
package com.phy.springframework.boot.test.mqtt.service;

import com.alibaba.fastjson.JSONObject;
import com.phy.springframework.boot.autoconfigure.mqtt.annotations.TopicListener;
import com.phy.springframework.boot.test.mqtt.vo.Company;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

/**
 * TestConsumer
 *
 * @desc: TODO 类的设计目的、功能及注意事项
 * @version:
 * @createTime: 2021/2/7 16:30
 * @author: liuhr
 */
@Slf4j
@Component
public class TestConsumer {

    @TopicListener(topic = "test/a")
    public void handleMessage1(Message<?> message) throws MessagingException {
        final String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
        final String payload = String.valueOf(message.getPayload());
        Company company = JSONObject.parseObject(payload, Company.class);
        log.info("test/a ---> 接收到 mqtt消息,主题:{} 消息:{}", topic, company);
    }

    @TopicListener(topic = "test/b")
    public void handleMessage2(Message<?> message) throws MessagingException {
        final String topic = String.valueOf(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
        final String payload = String.valueOf(message.getPayload());
        Company company = JSONObject.parseObject(payload, Company.class);
        log.info("test/b ---> 接收到 mqtt消息,主题:{} 消息:{}", topic, company);
    }

}
  1. application.yml配置文件
server:
  port: 8989

phy:
  mqtt:
    serverURIs: tcp://127.0.0.1:1883
    userName: admin
    password: admin
    clientId: 54321
    cleanSession: false

MqttData.java、Company.java

package com.phy.springframework.boot.test.mqtt.vo;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class MqttData {
    private String topic;
    private Company data;
}
package com.phy.springframework.boot.test.mqtt.vo;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@ToString
public class Company {
    private String code;
    private String name;
}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议