实践ActiveMQ之消费者

TaskRunner首先会循环一轮调用消费者(有配置MessageListener且从自己的MessageDispatchChannel读取消息)进行消费,然后从ActiveMQSessionExecutorMessageDispatchChannel中读取消息调用消费者进行消费。

消费者消费:如果消费者有配置MessageListener则调用MessageListener的onMessage,否则存放到消费者自己的MessageDispatchChannel中(此类消息可以通过receive消费)。

ActiveMQSession实现了接口ActiveMQDispatcher,一个ActiveMQSession持有一个ActiveMQSessionExecutor,一个Session下创建多个Consumer则会串行处理

消费者监听器以线程名ActiveMQ Session Task-N执行

预读取(jms.prefetchPolicy.queuePrefetch,默认值1000,必须大于等于0),被预读取过的消息再次消费也会被标记为重发()

同步(receive)和异步(MessageListener)二选一

原生同步(receive)

receive栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = null;
Session session = null;
try {
    conn = cf.createConnection();
	// 作为消费者,需要start
    conn.start();
	session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = new ActiveMQQueue("queue001");
	MessageConsumer consumer = session.createConsumer(destination);
	Message message = consumer.receive();
	TextMessage textMessage = (TextMessage) message;
	logger.info("GOT A MESSAGE: " + textMessage.getText());
} catch (JMSException e) {
    logger.error(e.getMessage(), e);
} finally {
	try {
		if (session != null) {
			session.close();
		}
		if (conn != null) {
			conn.close();
		}
	} catch (JMSException ex) {
		logger.error(ex.getMessage(), ex);
	}
}

原生异步(MessageListener)

MessageListener栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = null;
Session session = null;
try {
    conn = cf.createConnection();
	// 作为消费者,需要start
    conn.start();
	session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = new ActiveMQQueue("queue001");
	MessageConsumer consumer = session.createConsumer(destination);
	SimpleMessageListener simpleMessageListener= new SimpleMessageListener();
	consumer.setMessageListener(simpleMessageListener);
} catch (JMSException e) {
    logger.error(e.getMessage(), e);
} finally {
	try {
		if (session != null) {
			session.close();
		}
		if (conn != null) {
			conn.close();
		}
	} catch (JMSException ex) {
		logger.error(ex.getMessage(), ex);
	}
}

实现MessageListener接口

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SimpleMessageListener implements MessageListener {
	private static final Logger logger = LoggerFactory.getLogger(SimpleMessageListener.class);
	public void onMessage(Message message) {
		TextMessage textMessage = (TextMessage) message;
		String text = null;
		try {
			text = textMessage.getText();
			logger.info("Message received: " + text);
		} catch (Exception ex) {
			logger.error("JMS error", ex);
		}
	}
}

Spring JMS(JmsTemplate)同步消费

1
2
3
// 推荐使用CachingConnectionFactory
private JmsTemplate jmsTemplate;
TextMessage message = (TextMessage) jmsTemplate.receive();

##异步监听消费

基于Spring JMS之DefaultMessageListenerContainer

1
2
3
4
5
6
7
8
<bean id="queueListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="jmsConnectionFactory" />
	<property name="destinationName" value="queue123" />
	<property name="messageListener" ref="simpleMessageListener" />
	<property name="concurrentConsumers" value="5" />
	<property name="maxConcurrentConsumers" value="20" />
</bean>

消费者数量动态调整(concurrentConsumers和maxConcurrentConsumers之间)

消费者监听器以线程名queueListenerContainer-N执行,setReceiveTimeout(可以理解为keepalived,默认值为-1)和setMaxMessagesPerTask(最多处理消息数,默认值为java.lang.Integer#MIN_VALUE)决定释放与否,默认情况下不释放。

其实是对原生同步(receive)的包装。

DefaultMessageListenerContainer栈

自定义监听容器

1
2
3
4
5
6
7
while (i < maxConcurrentConsumers) {
	Session session = connection.createSession(transacted, acknowledgeMode);
	sessions.add(session);
	MessageConsumer messageConsumer = session.createConsumer(destination);
	messageConsumers.add(messageConsumer);
	messageConsumer.setMessageListener(messageListener);
}

重试

onMessage抛出异常

默认重发6次,可以通过maximumRedeliveries设定

关闭应用

不限重发次数,但是会记录重试次数