使用spring boot + rabbitmq的时候,在开发过程中,可能会想要临时停用/启用监听,或修改监听消费者数量。如果每次修改都重启比较浪费时间,所以研究了一下不停机就启用停用监听或修改一些配置
一. 关于rabbitmq监听的配置
@Configuration@ConditionalOnClass({ RabbitTemplate.class, Channel.class })@EnableConfigurationProperties(RabbitProperties.class)@Import(RabbitAnnotationDrivenConfiguration.class)public class RabbitAutoConfiguration { ...}
RabbitAnnotationDrivenConfiguration中主要就是监听工厂的配置、监听工厂,但是这里也只是创建bean,并没有真正的初始化
通过配置里的bean类名,分析一下,rabbitmq的监听肯定是由监听工厂创建的,所以找到监听工厂SimpleRabbitListenerContainerFactory
@Bean@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory;}
既然自动配置里面没有初始化监听,那就应该是在其他地方调用的,进入监听工厂类中,发现有initializeContainer(SimpleMessageListenerContainer instance)方法,猜测初始化肯定与这个方法有关,所以查看有哪些地方调用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有创建监听容器和初始化的代码
/** * Create and start a new {@link MessageListenerContainer} using the specified factory. * @param endpoint the endpoint to create a {@link MessageListenerContainer}. * @param factory the {@link RabbitListenerContainerFactory} to use. * @return the {@link MessageListenerContainer}. */protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { ((InitializingBean) listenerContainer).afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); } } int containerPhase = listenerContainer.getPhase(); if (containerPhase < Integer.MAX_VALUE) { // a custom phase value if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerPhase); } this.phase = listenerContainer.getPhase(); } return listenerContainer;}
继续找调用这个方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,发现调用的地方很多了
看看afterPropertiesSet方法,是InitializingBean接口中的,猜测应该是spring容器创建bean之后都会调用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里创建的实例。原来是在RabbitListenerAnnotationBeanPostProcessor中的私有属性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration这个自动配置里面初始化的,所以这就找到rabbitmq初始化监听的源头了
二. 动态管理rabbitmq监听
回到最初的问题,想要动态的启用停用mq的监听,所以先看看初始化配置的类,既然有初始化,那可能会有相关的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有对监听容器进行操作,主要源码如下
/** * @return the managed {@link MessageListenerContainer} instance(s). */public Collection<MessageListenerContainer> getListenerContainers() { return Collections.unmodifiableCollection(this.listenerContainers.values());} @Overridepublic void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); }}/** * Start the specified {@link MessageListenerContainer} if it should be started * on startup or when start is called explicitly after startup. * @see MessageListenerContainer#isAutoStartup() */private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { listenerContainer.start(); }}@Overridepublic void stop() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { listenerContainer.stop(); }}
写个controller,注入RabbitListenerEndpointRegistry,使用start()和stop()对监听进行启用停用的操作,并且RabbitListenerEndpointRegistry实例还可以获取监听容器,对监听的一些参数也能进行修改,比如消费者数量。代码如下:
import java.util.Set;import javax.annotation.Resource;import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.itopener.framework.ResultMap;/** * Created by fuwei.deng on 2017年7月24日. */@RestController@RequestMapping("rabbitmq/listener")public class RabbitMQController { @Resource private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("stop") public ResultMap stop(){ rabbitListenerEndpointRegistry.stop(); return ResultMap.buildSuccess(); } @RequestMapping("start") public ResultMap start(){ rabbitListenerEndpointRegistry.start(); return ResultMap.buildSuccess(); } @RequestMapping("setup") public ResultMap setup(int consumer, int maxConsumer){ Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds(); SimpleMessageListenerContainer container = null; for(String id : containerIds){ container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id); if(container != null){ container.setConcurrentConsumers(consumer); container.setMaxConcurrentConsumers(maxConsumer); } } return ResultMap.buildSuccess(); }}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持VeVb武林网。
新闻热点
疑难解答
图片精选