TaskRunner首先会循环一轮调用消费者(有配置MessageListener且从自己的MessageDispatchChannel读取消息)进行消费,然后从ActiveMQSessionExecutor的MessageDispatchChannel中读取消息调用消费者进行消费。
消费者消费:如果消费者有配置MessageListener则调用MessageListener的onMessage,否则存放到消费者自己的MessageDispatchChannel中(此类消息可以通过receive消费)。
ActiveMQSession实现了接口ActiveMQDispatcher,一个ActiveMQSession持有一个ActiveMQSessionExecutor,一个Session下创建多个Consumer则会串行处理
消费者监听器以线程名ActiveMQ Session Task-N执行
预读取(jms.prefetchPolicy.queuePrefetch,默认值1000,必须大于等于0),被预读取过的消息再次消费也会被标记为重发()
同步(receive)和异步(MessageListener)二选一
原生同步(receive)

1 | |
原生异步(MessageListener)

1 | |
实现MessageListener接口
1 | |
Spring JMS(JmsTemplate)同步消费
1 | |
##异步监听消费
基于Spring JMS之DefaultMessageListenerContainer
1 | |
消费者数量动态调整(concurrentConsumers和maxConcurrentConsumers之间)
消费者监听器以线程名queueListenerContainer-N执行,setReceiveTimeout(可以理解为keepalived,默认值为-1)和setMaxMessagesPerTask(最多处理消息数,默认值为java.lang.Integer#MIN_VALUE)决定释放与否,默认情况下不释放。
其实是对原生同步(receive)的包装。

自定义监听容器
1 | |
重试
onMessage抛出异常
默认重发6次,可以通过maximumRedeliveries设定
关闭应用
不限重发次数,但是会记录重试次数