TaskRunner
首先会循环一轮调用消费者(有配置MessageListener且从自己的MessageDispatchChannel
读取消息)进行消费,然后从ActiveMQSessionExecutor
的MessageDispatchChannel
中读取消息调用消费者进行消费。
消费者消费:如果消费者有配置MessageListener则调用MessageListener的onMessage,否则存放到消费者自己的MessageDispatchChannel
中(此类消息可以通过receive消费)。
ActiveMQSession
实现了接口ActiveMQDispatcher
,一个ActiveMQSession
持有一个ActiveMQSessionExecutor
,一个Session下创建多个Consumer则会串行处理
消费者监听器以线程名ActiveMQ Session Task-N
执行
预读取(jms.prefetchPolicy.queuePrefetch,默认值1000,必须大于等于0),被预读取过的消息再次消费也会被标记为重发()
同步(receive)和异步(MessageListener)二选一
原生同步(receive)
1 |
|
原生异步(MessageListener)
1 |
|
实现MessageListener
接口
1 |
|
Spring JMS(JmsTemplate)同步消费
1 |
|
##异步监听消费
基于Spring JMS之DefaultMessageListenerContainer
1 |
|
消费者数量动态调整(concurrentConsumers和maxConcurrentConsumers之间)
消费者监听器以线程名queueListenerContainer-N
执行,setReceiveTimeout(可以理解为keepalived,默认值为-1)和setMaxMessagesPerTask(最多处理消息数,默认值为java.lang.Integer#MIN_VALUE)决定释放与否,默认情况下不释放。
其实是对原生同步(receive)的包装。
自定义监听容器
1 |
|
重试
onMessage抛出异常
默认重发6次,可以通过maximumRedeliveries
设定
关闭应用
不限重发次数,但是会记录重试次数