首页 > 学院 > 开发设计 > 正文

【spring-jms】DefaultMessageListenerContainer

2019-11-11 02:51:00
字体:
来源:转载
供稿:网友

DefaultMessageListenerContainer 分析

org.sPRingframework.jms.listener.DefaultMessageListenerContainer

通过分析该源码,了解下 Spring 下消费者如何进行管理。。。

maven

<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.4.RELEASE</version></dependency>

重要步骤

初始化 initialize()默认的线程执行器 SimpleAsyncTaskExecutor, createDefaultTaskExecutor()恢复策略 recoverAfterListenerSetupFailure()

调用栈

先看 Refer 1 中的一个配置

<!-- 定义消息队列(Queue),我们监听一个新的队列,queue2 --><bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>queue2</value> </constructor-arg></bean><!-- 配置消息队列监听者(Queue),代码下面给出,只有一个onMessage方法 --><bean id="queueMessageListener" class="guo.examples.mq02.queue.QueueMessageListener" /><!-- 消息监听容器(Queue),配置连接工厂,监听的队列是queue2,监听器是上面定义的监听器 --><bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination2" /> <property name="messageListener" ref="queueMessageListener" /></bean>

下面是 messageListener 的调用栈

AsyncMessageListenerInvoker.run()↓AsyncMessageListenerInvoker.invokeListener()↓AbstractPollingMessageListenerContainer.receiveAndExecute(Object invoker, session session, MessageConsumer consumer)↓AbstractPollingMessageListenerContainer.doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)↓AbstractMessageListenerContainer.doExecuteListener(Session session, Message message)# 判断 isAcceptMessagesWhileStopping & isRunning↓AbstractMessageListenerContainer.invokeListener(Session session, Message message)↓# @param listener 即上面用户定义的 MessageListener# 由于 SimpleAsyncTaskExecutor 是内部类,所以可以访问 DefaultMessageListenerContainer.messageListenerAbstractMessageListenerContainer.doInvokeListener(MessageListener listener, Message message)↓MessageListener.onMessage(Message message)

SimpleAsyncTaskExecutor 线程执行过程

# 创建新线程DefaultMessageListenerContainer.scheduleNewInvoker()↓# 判断 DefaultMessageListenerContainer 处于 active, running 哪一状态,进行后面的操作AbstractJmsListeningContainer.rescheduleTaskIfNecessary(Object task)# 如果 running↓DefaultMessageListenerContainer.doRescheduleTask(Object task)# 注意,不是 AbstractJmsListeningContainer.doRescheduleTask(Object task)↓# 然后执行上面的 `调用栈` 过程

Refer

ActiveMQ学习笔记(5)——使用Spring JMS收发消息[1]

SimpleAsyncTaskExecutor 执行器


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表