(〇)QuickFix/J簡介 FIX是Financial Information eXchange的簡稱。FIX是一種專門為實時電子證券交易設(shè)計的標(biāo)準(zhǔn)消息協(xié)議。FIX協(xié)議由FIX protocol, Ltd(FPL)所有并維護。FIX協(xié)議的網(wǎng)址為http://www. QuickFix/J是實現(xiàn)了FIX協(xié)議所有版本及其功能的開源軟件,100%使用JAVA實現(xiàn)。 QuickFix/J的網(wǎng)址為http://www. QuickFix/J的源代碼可以從http:///projects/quickfixj/files/QuickFIX_J 下載,也可以去QuickFix/J的官方網(wǎng)站,進入下載頁面下載源代碼。 那么能用QuickFix/J做什么事情呢?關(guān)注股票的兄弟們一定留意過LevelII這個名詞,他是中國股票交易新行情的簡稱。簡單說,可以將QuickFix/J的代碼改造一下,就用來接受深圳證券交易的LevelII行情數(shù)據(jù)。當(dāng)然接受行情不是免費的,需要諸多的商務(wù)手續(xù),但是本文僅僅討論QuickFix/J的開源代碼的設(shè)計和實現(xiàn),并且側(cè)重于QuickFix/J的客戶端實現(xiàn)。服務(wù)器端留在以后的文章介紹。關(guān)于上海證券交易所的LevelII數(shù)據(jù)的格式和接受,跟深圳的有諸多的不同,也留在以后討論。 首先QuickFixJ代碼功能主要有兩大部分,一部分是Fix協(xié)議數(shù)據(jù)的解析,另外一部分是客戶端跟服務(wù)器端建立連接并維持回話,傳輸數(shù)據(jù)。第一部分將主要介紹QuickFix/J的傳輸部分的實現(xiàn)。 (一) QuickFix/J傳輸功能部分 QuickFix/J的連接管理和傳輸功能是基于MINA框架實現(xiàn)的。MINA是什么?MINA是Apache旗下的一個網(wǎng)絡(luò)應(yīng)用框架,能夠幫助大家輕松的開發(fā)高性能、高擴展性的網(wǎng)絡(luò)程序。它使用NIO在傳輸協(xié)議(比如TCP/IP,UDP/IP)之上提供了抽象的、事件驅(qū)動的、異步處理的API。MINA的網(wǎng)址為http://mina.。 A). QuickFix/J客戶端用到的主要類的功能說明(V1.5.0) 1. quickfix.Initiator:定義了一些從配置中獲取通信協(xié)議、主機、端口及重連接的時間間隔的KEY,僅僅是key而已,沒有其他的。 2. quickfix.mina.SessionConnector:定義了一些helper方法,為initiator和acceptor提供公用的功能,比如獲取Session,創(chuàng)建Session,動態(tài)添加/刪除Session,判斷是否已經(jīng)登陸。最重要的是他定義了SessionTimerTask這個內(nèi)部類。SessionTimerTask的功能有自動重連登陸、檢查和更新Session時間戳、發(fā)送心跳消息。 3. quickfix.mina.initiator.AbstractSocketInitiator:是SocketInitiator的基礎(chǔ)抽象基類,繼承了SessionConnector和Initiator。在QuickFix/J中提供了兩種默認(rèn)的具體實現(xiàn),分別是SocketInitiator和ThreadedSocketInitiator。這兩種具體實現(xiàn)的功能都一樣,兩者的區(qū)別僅僅是處理消息時使用線程的策略不同,具體請參考7和8。抽象類AbstractSocketInitiator提供的功能有: a) 遍歷配置文件取得所有[session]節(jié)的配置并創(chuàng)建相應(yīng)的FixSession (如果[session]中沒有指定ConnectionType或者明確指定了ConnectionType為initiator,則建立FixSession(quickfix.Session),其他類型的ConnectionType無效,如acceptor)。配置文件中可以指定多個[session]。 b) 通過已經(jīng)生成的FixSession和傳入的eventHandlingStrategy創(chuàng)建IoSessionInitiator,并保存入initiators(Set類型的緩存)中。 那么FixSession和IoSessionInitiator有什么區(qū)別呢?請參考5、6。 c) 啟動、關(guān)閉客戶端(initiator)。啟動initiator時首先啟動應(yīng)用層的SessionTimer(請參考2),然后啟動連接層的initiator(IoSessionInitiator)。關(guān)閉initiator時,先關(guān)閉連接層的initiator(IoSessionInitiator),再關(guān)閉應(yīng)用層的SessionTimer。 4. quickfix.SessionID:是Session的唯一標(biāo)識。SessionID中包含beginString(必須),senderCompID(必須),senderSubID (可選),senderLocationID(可選),targetCompID(必須),targetSubID(可選),targetSubID(可選),targetLocationID(可選),sessionQualifier(可選)。sessionQualifer用于區(qū)分具有相同的targetCompID不同的session,只能用在initiator角色中。SessionID.toString生成的可讀的Session ID字符串組成為:beginString:senderCompID/senderSubID/senderLocationID->targetCompID/targetSubID/targetLocationID/sessionQualifier。如果可選值未設(shè)置則在Session ID字符串中默認(rèn)空字符串。 5. quickfix.Session:Session是FIX消息通訊中最基本的抽象。 a) fixSession維護Session內(nèi)部消息的自增序列號、自動錯誤恢復(fù)、與通信對方(counterpart)建立通信信道(communication channel)。 b) Session是獨立于特定的傳輸層協(xié)議的。Session被新建時,消息序列號置為1,每次通信序列號自增,直到Session被重置(reset)。每個Session能夠跨越多個傳輸連接(并非同時跨越,而是說第一次網(wǎng)絡(luò)連接斷開后,隨后重連,雖然底層的網(wǎng)絡(luò)連接已經(jīng)是新建的了,但是Session還能保持跟斷網(wǎng)之前是同一個Session)。 c) fixSession中核心邏輯在next方法中。next(message)首先檢查SessionTime,如果超過1秒未刷新則刷新時間戳;如果發(fā)現(xiàn)Session不存在,則執(zhí)行reset,重置Session。然后取到消息的header,msgType進行檢查,首先beginString不正確,拋異常并退出。然后從dataDictionaryProvider取得數(shù)據(jù)字典,驗證數(shù)據(jù)字典。然后根據(jù)消息類型,分別回調(diào)用戶接口。回調(diào)用戶函數(shù)的入口在驗證完數(shù)據(jù)字典之后,請注意verify(message)函數(shù),所有的普通消息通過這個函數(shù)去回調(diào)application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。關(guān)于消息的解析,其中普通Message是通過quickfix.MessageUtils.parse將String類型的消息解析成Message。 6. quickfix.mina.initiator.IoSessionInitiator:使用MINA提供的傳輸層的API,建立、維護同服務(wù)器之間的傳輸層的網(wǎng)絡(luò)連接,而不是應(yīng)用層的網(wǎng)絡(luò)連接。這些網(wǎng)絡(luò)功能都在一個叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的一個私有的TimerTask中實現(xiàn)。具體實現(xiàn)功能有連接(包括普通連接和加密SSL連接)、重連、判斷是否應(yīng)該重連、處理連接異常、啟動和停止ConnectTask。 7. quickfix.SocketInitiator:使用單獨的線程去為所有的Session處理消息。SocketInitiator提供的功能有: a) 初始化,即用eventHandlingStrategy創(chuàng)建Initiator,然后注冊此SocketInitiator所管理的全部Session,然后啟動Initiator,最后調(diào)用eventHandlingStrategy.blockInThread()在另外的后臺線程中去處理SessionTimer收到的插入隊列的消息。啟動Initiator做的事情依次是:先啟動SessionTimer去監(jiān)聽從傳輸層過來的消息,如果沒有Logon則先Logon,然后在收到消息后回調(diào)用戶代碼處理消息;啟動reconnectTask去建立和維護傳輸層的網(wǎng)絡(luò)連接。 b) 啟動Initiator,在另外的線程中后臺(Daemon)處理消息。 c) 阻塞Initiator,在同一線程中處理消息。 d) 停止Initiator。分為強制停止和非強制停止。強制或者非強制Logout所有FixSession,停止連接層的Initiator,取消注冊所有此SocketInitiator所管理的全部Session。 e) 關(guān)于a) b)如何處理來自底層的消息的邏輯,請參考11和12。因為這里所謂的處理消息實際上是直接或者間接調(diào)用了SingleThreadedEventHandlingStrategy的block處理消息。 8. quickfix.ThreadedSocketInitiator:為每一個Session使用一個單獨的線程去處理消息。功能參考7。除了線程工作模式不一樣,功能和7完全一樣。 9. quickfix.SessionState: Session和對方通信過程中使用的helper類。主要功能就是存儲了Session的所有狀態(tài),并且提供了響應(yīng)API訪問這些狀態(tài)。狀態(tài)包括heartBeatInterval,heartBeatMillis,是否需要heartBeat,判斷Session所在應(yīng)用程序的角色(客戶端Initiator or 服務(wù)器端Acceptor),lastReceivedTime,lastSentTime,獲取logger,判斷作為客戶端的Initiator登陸消息是否已經(jīng)發(fā)出,判斷作為服務(wù)器端的Acceptor登陸消息是否收到,判斷是否需要登陸,判斷登陸是否TimeOut,messageStore,testRequestCounter,判斷是否需要TestRequest,判斷是否處于TimeOut狀態(tài),將收到的Message入隊(enqueue),出隊(dequeue),鎖定和解鎖發(fā)送/接受的Sequence Number,獲取、設(shè)置自增的下一個Sequence Number,重置(即將Sequence清空,重新從1開始計數(shù)),設(shè)置、獲取Logout的原因。 10. quickfix.mina.EventHandlingStrategy:用于不同版本FIX協(xié)議處理事件的策略的接口,是應(yīng)用級處理消息回調(diào)接口的根源。當(dāng)傳輸層消息到達時調(diào)用此接口onMessage,可以這么理解,onMessage是EventHandlingStrategy的輸入,這個輸入來自底層。getSessionConnector獲取和這個策略相關(guān)的SessionConnector,即獲取和這個Session相關(guān)的Initiator/Acceptor去處理響應(yīng)的輸入消息,一般情況下是逐層將消息向上層傳出,回調(diào)用戶的函數(shù)處理該消息。getQueueSize獲取當(dāng)前被處理消息隊列的長度。EventHandlingStrategy一般作為AbstractSocketInitiator的成員,建立IoSessionInitiator時傳給IoSessionInitiator,請參考3 b)。目前在QuickFix/J中有兩個具體實現(xiàn),分別是quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。這兩個具體實現(xiàn)類的說明請參考11,12。 11. quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J處理消息的核心類。處理消息時即便有多個Session也使用單線程模式。 a) 為了不阻塞輸入,那么就需要一個eventQueue來臨時快速的存儲收到的所有消息。 b) onMessage接到底層傳入的消息包裝成SessionMessageEvent,首先將其存入eventQueue。 c) 那么SessionMessageEvent里面有什么?SessionEvent僅僅是把fixSession和Message包裝到一起,并且提供了處理Message的方法processMessage??梢赃@樣理解,。 d) getSessionConnector獲取需要處理應(yīng)用級的connector以便處理eventQueue中的消息。 e) getMessage從eventQueue中取出SessionMessageEvent待處理。 f) block就是應(yīng)用程序級別處理消息的入口。block判斷HandlingMessage是否應(yīng)該繼續(xù)運新,如果是則從消息隊列中取出SessionMessageEvent,調(diào)用其中的processMessage去處理該Message。 g) processMessage如何處理了收到的消息呢?它會調(diào)用fixSession的next方法,將消息傳給Session,由fixSession再接力將消息回調(diào)到用戶手中。請參考5。 h) 也許你會注意到處理消息的block在run中始終被調(diào)用,而且沒有任何sleep時間,難道它在沒有消息的時候始終不停的死循環(huán)運行且絲毫不休息?CPU會保持100%?實際上效果不是這樣的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在沒有消息的時候,這個循環(huán)會每休息一秒再執(zhí)行下一次循環(huán)。如何做到這樣的效果呢?原因是如果eventQueue中如果沒有消息,而該eventQueue設(shè)置了阻塞超時1000毫秒,則取消息的操作會等待最多1000毫秒,如果沒有等到消息則超時退出不再等待,執(zhí)行完畢本次循環(huán),如果等到了則按照正常流程處理消息。這樣做最大的好處就是,如果eventQueue中有事件,那么就會連續(xù)不斷的處理,如果沒有消息,就會休息timeout毫秒再查看。 i) blockInThread,在新啟動的后臺線程中處理SessionMessageEvent 12. quickfix.mina.ThreadPerSessionEventHandlingStrategy:同樣是QuickFix/J處理消息的核心類。和單線程模式不同的是該策略會為每個session啟動一個新的線程去處理消息。 a) 由于是每個Session對應(yīng)一個線程,因此該策略內(nèi)部需要一個稱之為dispatchersMap作為緩存為每個Session保存響應(yīng)的處理線程(MessageDispatchingThread)引用。 b) 當(dāng)onMessage收到來自底層的輸入消息時,根據(jù)輸入的fixSession從dispatchers中取到相應(yīng)的處理線程,并將該消息加入(enqueue)到該線程內(nèi)部的消息隊列中待處理。 c) 每個dispatcher(MessageDispatchingThread類型)內(nèi)部均維護了自己的消息隊列,和單線程模式不同在于,消息隊列中的消息僅僅是Message,不是SessionMessageEvent。處理Message的邏輯從單線程中的SessionMessageEvent中移出到dispatcher中。 13. quickfix.DataDictionaryProvider :是一個接口,為指定的session protocol或者application version提供數(shù)據(jù)字典。getSessionDataDictionary根據(jù)提供的beginString 即協(xié)議版本獲取相應(yīng)的數(shù)據(jù)字典。getApplicationDataDictionary根據(jù)提供的application version ID和custom application ID獲取數(shù)據(jù)字典。application version ID在FIXT.1.1之前由BeginString字段確定。custom application ID是可選值,不是必須的。 14. quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默認(rèn)實現(xiàn)。在DefaultDataDictionaryProvider中,有兩種數(shù)據(jù)字典,一種是傳輸用的數(shù)據(jù)字典,一種是應(yīng)用程序用的數(shù)據(jù)字典,分別緩存在兩個Map中。這個DefaultDataDictionaryProvider是在創(chuàng)建Session時由默認(rèn)的DefaultSessionFactory根據(jù)beginString創(chuàng)建的。addApplicationDictionary和addTransportDictionary分別用于向DefaultDataDictionaryProvider添加新的數(shù)據(jù)字典。目前QuickFix/J的實現(xiàn)中,在DefaultSessionFactory中初始化Session時添加字典。對于fixt之前版本的數(shù)據(jù)字典,每個數(shù)據(jù)字典會被同時添加進入到傳輸數(shù)據(jù)字典和應(yīng)用程序數(shù)據(jù)字典中。 B). 網(wǎng)絡(luò)數(shù)據(jù)在QuickFix/J中的流向 ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服務(wù)器端的通信。收到網(wǎng)絡(luò)數(shù)據(jù),ioConnector觸發(fā)相應(yīng)事件,并把事件交給ioHandler(InitiatorIoHandler)的processMessage -> processMessage中調(diào)用eventHandlingStrategy.onMessage(quickfixSession, message) ,將消息向外回調(diào) -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 將收到的消息入隊(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中啟動單獨的后臺線程,依次從eventQueue取出消息處理,向session回調(diào) -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根據(jù)msgType判斷回調(diào) ->逐層回調(diào) (verify -> veriry -> fromCallback ->
|