实践ActiveMQ之连接

连接工厂

ActiveMQ-ConnectionFactory

ActiveMQConnectionFactory:ActiveMQ的连接工厂实现,用于创建连接

SingleConnectionFactory:Spring JMS封装的连接工厂代理,只生成同一个连接(忽略关闭连接),通过动态代理实现。

CachingConnectionFactory:Spring JMS封装的缓存连接工厂代理,继承SingleConnectionFactory,缓存SessionProducerConsumer,通过动态代理实现。可以通过sessionCacheSize指定缓存大小。缓存的Session按照modeSESSION_TRANSACTED = 0AUTO_ACKNOWLEDGE = 1CLIENT_ACKNOWLEDGE = 2DUPS_OK_ACKNOWLEDGE = 3)分别存放在单独的List中,用则取,用完归还,建议配合JmsTemplate使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
		  p:brokerURL="failover:(tcp://localhost:61616)" p:userName="system" p:password="manager"/>

<!-- 包装为SingleConnectionFactory -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
	<constructor-arg ref="amqConnectionFactory" />
</bean>

<!-- 包装为CachingConnectionFactory -->
<bean id="cachedConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
	<constructor-arg ref="amqConnectionFactory" />
	<property name="sessionCacheSize" value="100" />
</bean>

连接

线程安全,相比较于JDBC的连接(非线程安全)

1
2
3
4
5
ActiveMQ Transport tcp://localhost/127.0.0.1:61616@58000
ActiveMQ InactivityMonitor ReadCheckTimer
ActiveMQ InactivityMonitor WriteCheckTimer

ActiveMQ InactivityMonitor Worker

每个连接(ActiveMQConnection)维持一个TaskRunnerFactoryExecutor),参数如下:

1
2
3
4
5
6
name:ActiveMQ Session Task
core:0
max:1000
keepalived:30 second
workQueue:SynchronousQueue
maxIterationsPerRun:1000

TaskRunnerPooledTaskRunner)是任务(TaskActiveMQSessionExecutor))的运行者,一个ActiveMQSessionExecutor持有一个TaskRunner

Transport读取消息后先放入ActiveMQSessionExecutorMessageDispatchChannel结束,同时唤醒TaskRunner。

Transport栈