最近一個項目要用到ActiveMq,并且需要最大程度的保證消息不丟失。以前對activeMq不是很熟悉,完全是摸著石頭過河,目前基本配置都搞定了。只是對于它的自動重連一直找不到好的解決辦法,我希望的效果是當一個broker(假設只有這一個,沒有備用的)如果異常down掉的話,那么監(jiān)聽程序能夠等待broker重啟后再自動重新連接??戳怂奈臋n似乎 設置一下failover:(tcp://localhost:61616) 就可以了
Failover Transport
Failover Transport是一種重新連接的機制,它工作于其它transport的上層,用于建立可靠的傳輸。它的配置語法允許制定任意多個復合的URI。 Failover transport會自動選擇其中的一個URI來嘗試建立連接。如果沒有成功,那么會選擇一個其它的URI來建立一個新的連接。以下是配置語法:
failover:(uri1,...,uriN)?transportOptions
failover:uri1,...,uriN
Transport Options的可選值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
backup false initialize and hold a second transport connection - to enable fast failover
例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
2.2.4 Discovery transport
Discovery transport是可靠的tranport。它使用Discovery transport來定位用來連接的URI列表。以下是配置語法:
discovery:(discoveryAgentURI)?transportOptions
discovery:discoveryAgentURI
Transport Options的可選值如下:
Option Name Default Value Description
initialReconnectDelay 10 How long to wait before the first reconnect attempt
maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
backOffMultiplier 2 The exponent used in the exponential backoff attempts
maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
例如:discovery:(multicast://default)?initialReconnectDelay=100
為了使用Discovery來發(fā)現(xiàn)broker,需要為broker啟用discovery agent。 以下是XML配置文件中的一個例子:
Xml代碼
1. <broker name="foo">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
4. </transportConnectors>
5. ...
6. </broker>
在使用Failover Transport或Discovery transport等能夠自動重連的transport的時候,需要注意的是:設想有兩個broker,它們都啟用AMQ Message Store作為持久化存儲,有一個producer和一個consumer連接到某個queue。當因其中一個broker失效時而切換到另一個 broker的時候,如果失效的broker的queue中還有未被consumer消費的消息,那么這個queue里的消息仍然滯留在失效broker 的中,直到失效的broker被修復并重新切換回這個被修復的broker后,之前被保留的消息才會被consumer消費掉。如果被處理的消息有時序限 制,那么應用程序就需要處理這個問題。另外也可以通過ActiveMQ集群來解決這個問題。
在transport重連的時候,可以在connection上注冊TransportListener來獲得回調(diào),例如:
Java代碼
1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {
2. public void onCommand(Object cmd) {
3. }
4.
5. public void onException(IOException exp) {
6. }
7.
8. public void transportInterupted() {
9. // The transport has suffered an interruption from which it hopes to recover.
10. }
11.
12. public void transportResumed() {
13. // The transport has resumed after an interruption.
14. }
15. });
---------------------
我的做法是 不要在服務器端設置,而在本地設置:failover:(tcp://192.168.0.245:61616?wireFormat.maxInactivityDuration=0)