在Merlin之前,編寫Socket程序是比較繁瑣的工作.因?yàn)檩斎胼敵龆急仨毻?這樣,對(duì)于多客戶端客戶/服務(wù)器模式,不得不使用多線程.即為每個(gè)連接的客戶都分配一個(gè)線程來處理輸入輸出.由此而帶來的問題是可想而知的.程序員不得不為了避免死鎖,線程安全等問題,進(jìn)行大量的編碼和測(cè)試.很多人都在抱怨為什么不在Java中引入異步輸入輸出機(jī)制.比較官方的解釋是,任何一種應(yīng)用程序接口的引入,都必須兼容任何操作平臺(tái).因?yàn)镴ava是跨平臺(tái)的.而當(dāng)時(shí)支持異步輸入輸出機(jī)制的操作平臺(tái)顯然不可能是全部.自Java
2
Platform以后,分離出J2SE,J2ME,J2EE三種不同類型的應(yīng)用程序接口,以適應(yīng)不同的應(yīng)用開發(fā).Java標(biāo)準(zhǔn)的制訂者們意識(shí)到了這個(gè)問題,并且支持異步輸入輸出機(jī)制的操作平臺(tái)在當(dāng)今操作平臺(tái)中處于主流地位.于是,Jdk(J2SE)
的第五次發(fā)布中引入了異步輸入輸出機(jī)制.
以前的Socket進(jìn)程通信程序設(shè)計(jì)中,一般客戶端和服務(wù)器端程序設(shè)計(jì)如下: 服務(wù)器端: //服務(wù)器端監(jiān)聽線程 while (true) { ............. Socket clientSocket; clientSocket = socket.accept(); //取得客戶請(qǐng)求Socket,如果沒有//客戶請(qǐng)求連接,線程在此處阻塞 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //創(chuàng)建客戶會(huì)話線程,進(jìn)行輸入輸出控制,為同步機(jī)制 new ClientSession(); ....... } 客戶端: ............ clientSocket = new Socket(HOSTNAME, LISTENPORT);//連接服務(wù)器套接字 //用取得的Socket構(gòu)造輸入輸出流 PrintStream os = new PrintStream(new BufferedOutputStream(clientSocket.getOutputStream(), 1024), false); BufferedReader is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); //進(jìn)行輸入輸出控制 ....... 以上代碼段只是用同步機(jī)制編寫Socket進(jìn)程通信的一個(gè)框架,實(shí)際上要考慮的問題要復(fù)雜的多(有興趣的讀者可以參考我的一篇文章《Internet 實(shí)時(shí)通信系統(tǒng)設(shè)計(jì)與實(shí)現(xiàn)》)。將這樣一個(gè)框架列出來,只是為了與用異步機(jī)制實(shí)現(xiàn)的Socket進(jìn)程通信進(jìn)行比較。下面將介紹使用異步機(jī)制的程序設(shè)計(jì)。 回頁首 用異步輸入輸出流編寫Socket進(jìn)程通信程序 在Merlin中加入了用于實(shí)現(xiàn)異步輸入輸出機(jī)制的應(yīng)用程序接口包:java.nio(新的輸入輸出包,定義了很多基本類型緩沖(Buffer)), java.nio.channels(通道及選擇器等,用于異步輸入輸出),java.nio.charset(字符的編碼解碼)。通道 (Channel)首先在選擇器(Selector)中注冊(cè)自己感興趣的事件,當(dāng)相應(yīng)的事件發(fā)生時(shí),選擇器便通過選擇鍵(SelectionKey)通知已注冊(cè)的通道。然后通道將需要處理的信息,通過緩沖(Buffer)打包,編碼/解碼,完成輸入輸出控制。 通道介紹: 這里主要介紹ServerSocketChannel和 SocketChannel.它們都是可選擇的(selectable)通道,分別可以工作在同步和異步兩種方式下(注意,這里的可選擇不是指可以選擇兩種工作方式,而是指可以有選擇的注冊(cè)自己感興趣的事件)??梢杂胏hannel.configureBlocking(Boolean )來設(shè)置其工作方式。與以前版本的API相比較,ServerSocketChannel就相當(dāng)于ServerSocket (ServerSocketChannel封裝了ServerSocket),而SocketChannel就相當(dāng)于Socket (SocketChannel封裝了Socket)。當(dāng)通道工作在同步方式時(shí),編程方法與以前的基本相似,這里主要介紹異步工作方式。 所謂異步輸入輸出機(jī)制,是指在進(jìn)行輸入輸出處理時(shí),不必等到輸入輸出處理完畢才返回。所以異步的同義語是非阻塞(None Blocking)。在服務(wù)器端,ServerSocketChannel通過靜態(tài)函數(shù)open()返回一個(gè)實(shí)例serverChl。然后該通道調(diào)用 serverChl.socket().bind()綁定到服務(wù)器某端口,并調(diào)用register(Selector sel, SelectionKey.OP_ACCEPT)注冊(cè)O(shè)P_ACCEPT事件到一個(gè)選擇器中(ServerSocketChannel只可以注冊(cè) OP_ACCEPT事件)。當(dāng)有客戶請(qǐng)求連接時(shí),選擇器就會(huì)通知該通道有客戶連接請(qǐng)求,就可以進(jìn)行相應(yīng)的輸入輸出控制了;在客戶端,clientChl實(shí)例注冊(cè)自己感興趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的組合),調(diào)用clientChl.connect (InetSocketAddress )連接服務(wù)器然后進(jìn)行相應(yīng)處理。注意,這里的連接是異步的,即會(huì)立即返回而繼續(xù)執(zhí)行后面的代碼。 選擇器和選擇鍵介紹: 選擇器(Selector)的作用是:將通道感興趣的事件放入隊(duì)列中,而不是馬上提交給應(yīng)用程序,等已注冊(cè)的通道自己來請(qǐng)求處理這些事件。換句話說,就是選擇器將會(huì)隨時(shí)報(bào)告已經(jīng)準(zhǔn)備好了的通道,而且是按照先進(jìn)先出的順序。那么,選擇器是通過什么來報(bào)告的呢?選擇鍵(SelectionKey)。選擇鍵的作用就是表明哪個(gè)通道已經(jīng)做好了準(zhǔn)備,準(zhǔn)備干什么。你也許馬上會(huì)想到,那一定是已注冊(cè)的通道感興趣的事件。不錯(cuò),例如對(duì)于服務(wù)器端serverChl來說,可以調(diào)用key.isAcceptable()來通知serverChl有客戶端連接請(qǐng)求。相應(yīng)的函數(shù)還有: SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一個(gè)循環(huán)中輪詢感興趣的事件(具體可參照下面的代碼)。如果選擇器中尚無通道已注冊(cè)事件發(fā)生,調(diào)用Selector.select()將阻塞,直到有事件發(fā)生為止。另外,可以調(diào)用 selectNow()或者select(long timeout)。前者立即返回,沒有事件時(shí)返回0值;后者等待timeout時(shí)間后返回。一個(gè)選擇器最多可以同時(shí)被63個(gè)通道一起注冊(cè)使用。 應(yīng)用實(shí)例: 下面是用異步輸入輸出機(jī)制實(shí)現(xiàn)的客戶/服務(wù)器實(shí)例程序?D?D程序清單1(限于篇幅,只給出了服務(wù)器端實(shí)現(xiàn),讀者可以參照著實(shí)現(xiàn)客戶端代碼): 程序類圖 程序清單1 public class NBlockingServer { int port = 8000; int BUFFERSIZE = 1024; Selector selector = null; ServerSocketChannel serverChannel = null; HashMap clientChannelMap = null;//用來存放每一個(gè)客戶連接對(duì)應(yīng)的套接字和通道 public NBlockingServer( int port ) { this.clientChannelMap = new HashMap(); this.port = port; } public void initialize() throws IOException { //初始化,分別實(shí)例化一個(gè)選擇器,一個(gè)服務(wù)器端可選擇通道 this.selector = Selector.open(); this.serverChannel = ServerSocketChannel.open(); this.serverChannel.configureBlocking(false); InetAddress localhost = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(localhost, this.port ); this.serverChannel.socket().bind(isa);//將該套接字綁定到服務(wù)器某一可用端口 } //結(jié)束時(shí)釋放資源 public void finalize() throws IOException { this.serverChannel.close(); this.selector.close(); } //將讀入字節(jié)緩沖的信息解碼 public String decode( ByteBuffer byteBuffer ) throws CharacterCodingException { Charset charset = Charset.forName( "ISO-8859-1" ); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode( byteBuffer ); String result = charBuffer.toString(); return result; } //監(jiān)聽端口,當(dāng)通道準(zhǔn)備好時(shí)進(jìn)行相應(yīng)操作 public void portListening() throws IOException, InterruptedException { //服務(wù)器端通道注冊(cè)O(shè)P_ACCEPT事件 SelectionKey acceptKey =this.serverChannel.register( this.selector, SelectionKey.OP_ACCEPT ); //當(dāng)有已注冊(cè)的事件發(fā)生時(shí),select()返回值將大于0 while (acceptKey.selector().select() > 0 ) { System.out.println("event happened"); //取得所有已經(jīng)準(zhǔn)備好的所有選擇鍵 Set readyKeys = this.selector.selectedKeys(); //使用迭代器對(duì)選擇鍵進(jìn)行輪詢 Iterator i = readyKeys.iterator(); while (i.hasNext()) { SelectionKey key = (SelectionKey)i.next(); i.remove();//刪除當(dāng)前將要處理的選擇鍵 if ( key.isAcceptable() ) {//如果是有客戶端連接請(qǐng)求 System.out.println("more client connect in!"); ServerSocketChannel nextReady = (ServerSocketChannel)key.channel(); //獲取客戶端套接字 Socket s = nextReady.accept(); //設(shè)置對(duì)應(yīng)的通道為異步方式并注冊(cè)感興趣事件 s.getChannel().configureBlocking( false ); SelectionKey readWriteKey = s.getChannel().register( this.selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE ); //將注冊(cè)的事件與該套接字聯(lián)系起來 readWriteKey.attach( s ); //將當(dāng)前建立連接的客戶端套接字及對(duì)應(yīng)的通道存放在哈希表//clientChannelMap中 this.clientChannelMap.put( s, new ClientChInstance( s.getChannel() ) ); } else if ( key.isReadable() ) {//如果是通道讀準(zhǔn)備好事件 System.out.println("Readable"); //取得選擇鍵對(duì)應(yīng)的通道和套接字 SelectableChannel nextReady = (SelectableChannel) key.channel(); Socket socket = (Socket) key.attachment(); //處理該事件,處理方法已封裝在類ClientChInstance中 this.readfromChannel( socket.getChannel(), (ClientChInstance) this.clientChannelMap.get( socket ) ); } else if ( key.isWritable() ) {//如果是通道寫準(zhǔn)備好事件 System.out.println("writeable"); //取得套接字后處理,方法同上 Socket socket = (Socket) key.attachment(); SocketChannel channel = (SocketChannel) socket.getChannel(); this.writeToChannel( channel,"This is from server!"); } } } } //對(duì)通道的寫操作 public void writeToChannel( SocketChannel channel, String message ) throws IOException { ByteBuffer buf = ByteBuffer.wrap( message.getBytes() ); int nbytes = channel.write( buf ); } //對(duì)通道的讀操作 public void readfromChannel( SocketChannel channel, ClientChInstance clientInstance ) throws IOException, InterruptedException { ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE ); int nbytes = channel.read( byteBuf |
|