3.5.4 DatagramChannel 最后一個(gè)socket通道是DatagramChannel 。正如SocketChannel 對(duì)應(yīng)Socket ,ServerSocketChannel 對(duì)應(yīng)ServerSocket ,每一個(gè)DatagramChannel 對(duì)象也有一個(gè)關(guān)聯(lián)的DatagramSocket 對(duì)象。不過原命名模式在此并未適用:「DatagramSocketChannel」顯得有點(diǎn)笨拙,因此采用了簡潔的「DatagramChannel」名稱。 正如SocketChannel 模擬連接導(dǎo)向的流協(xié)議(如 TCP/IP),DatagramChannel 則模擬包導(dǎo)向的無連接協(xié)議(如 UDP/IP): - public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
- // 這里僅列出部分API
- public static DatagramChannel open() throws IOException
- public abstract DatagramSocket socket();
- public abstract DatagramChannel connect(SocketAddress remote) throws IOException;
- public abstract boolean isConnected();
- public abstract DatagramChannel disconnect() throws IOException;
- public abstract SocketAddress receive(ByteBuffer dst) throws IOException;
- public abstract int send(ByteBuffer src, SocketAddress target)
- public abstract int read(ByteBuffer dst) throws IOException;
- public abstract long read(ByteBuffer[] dsts) throws IOException;
- public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
- public abstract int write(ByteBuffer src) throws IOException;
- public abstract long write(ByteBuffer[] srcs) throws IOException;
- public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
- }
創(chuàng)建DatagramChannel 的模式和創(chuàng)建其他socket通道是一樣的:調(diào)用靜態(tài)的open() 方法來創(chuàng)建一個(gè)新實(shí)例。新DatagramChannel 會(huì)有一個(gè)可以通過調(diào)用socket() 方法獲取的對(duì)等DatagramSocket 對(duì)象。DatagramChannel 對(duì)象既可以充當(dāng)服務(wù)器(監(jiān)聽者)也可以充當(dāng)客戶端(發(fā)送者)。如果您希望新創(chuàng)建的通道負(fù)責(zé)監(jiān)聽,那么通道必須首先被綁定到一個(gè)端口或地址/端口組合上。綁定DatagramChannel 同綁定一個(gè)常規(guī)的DatagramSocket 沒什么區(qū)別,都是委托對(duì)等socket對(duì)象上的API實(shí)現(xiàn)的: - DatagramChannel channel = DatagramChannel.open();
- DatagramSocket socket = channel.socket();
- socket.bind(new InetSocketAddress(portNumber));
DatagramChannel 是無連接的。每個(gè)數(shù)據(jù)報(bào)(datagram)都是一個(gè)自包含的實(shí)體,擁有它自己的目的地址及不依賴其他數(shù)據(jù)報(bào)的數(shù)據(jù)凈荷。與面向流的的socket不同,DatagramChannel 可以發(fā)送單獨(dú)的數(shù)據(jù)報(bào)給不同的目的地址。同樣,DatagramChannel 對(duì)象也可以接收來自任意地址的數(shù)據(jù)包。每個(gè)到達(dá)的數(shù)據(jù)報(bào)都含有關(guān)于它來自何處的信息(源地址)。
一個(gè)未綁定的DatagramChannel 仍能接收數(shù)據(jù)包。當(dāng)一個(gè)底層socket被創(chuàng)建時(shí),一個(gè)動(dòng)態(tài)生成的端口號(hào)就會(huì)分配給它。綁定行為要求通道關(guān)聯(lián)的端口被設(shè)置為一個(gè)特定的值(此過程可能涉及安全檢查或其他驗(yàn)證)。不論通道是否綁定,所有發(fā)送的包都含有DatagramChannel 的源地址(帶端口號(hào))。未綁定的DatagramChannel 可以接收發(fā)送給它的端口的包,通常是來回應(yīng)該通道之前發(fā)出的一個(gè)包。已綁定的通道接收發(fā)送給它們所綁定的熟知端口(wellknown port)的包。數(shù)據(jù)的實(shí)際發(fā)送或接收是通過send() 和receive() 方法來實(shí)現(xiàn)的: - public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
- // 這里僅列出部分API
- public abstract SocketAddress receive(ByteBuffer dst) throws IOException;
- public abstract int send(ByteBuffer src, SocketAddress target)
- }
receive() 方法將下次將傳入的數(shù)據(jù)報(bào)的數(shù)據(jù)凈荷復(fù)制到預(yù)備好的ByteBuffer 中并返回一個(gè)SocketAddress 對(duì)象以指出數(shù)據(jù)來源。如果通道處于阻塞模式,receive() 可能無限期地休眠直到有包到達(dá)。如果是非阻塞模式,當(dāng)沒有可接收的包時(shí)則會(huì)返回null 。如果包內(nèi)的數(shù)據(jù)超出緩沖區(qū)能承受的范圍,多出的數(shù)據(jù)都會(huì)被悄悄地丟棄。
假如您提供的ByteBuffer 沒有足夠的剩余空間來存放您正在接收的數(shù)據(jù)包,沒有被填充的字節(jié)都會(huì)被悄悄地丟棄。 調(diào)用send() 會(huì)發(fā)送給定ByteBuffer 對(duì)象的內(nèi)容到給定SocketAddress 對(duì)象所描述的目的地址和端口,內(nèi)容范圍為從當(dāng)前position 開始到末尾處結(jié)束。如果DatagramChannel 對(duì)象處于阻塞模式,調(diào)用線程可能會(huì)休眠直到數(shù)據(jù)報(bào)被加入傳輸隊(duì)列。如果通道是非阻塞的,返回值要么是字節(jié)緩沖區(qū)的字節(jié)數(shù),要么是「0」。發(fā)送數(shù)據(jù)報(bào)是一個(gè)全有或全無(all-or-nothing)的行為。如果傳輸隊(duì)列沒有足夠空間來承載整個(gè)數(shù)據(jù)報(bào),那么什么內(nèi)容都不會(huì)被發(fā)送。 如果安裝了安全管理器,那么每次調(diào)用send() 或receive() 時(shí)安全管理器的checkConnect() 方法都會(huì)被調(diào)用以驗(yàn)證目的地址,除非通道處于已連接的狀態(tài)(本節(jié)后面會(huì)討論到)。 請(qǐng)注意,數(shù)據(jù)報(bào)協(xié)議的不可靠性是固有的,它們不對(duì)數(shù)據(jù)傳輸做保證。send() 方法返回的非零值并不表示數(shù)據(jù)報(bào)到達(dá)了目的地,僅代表數(shù)據(jù)報(bào)被成功加到本地網(wǎng)絡(luò)層的傳輸隊(duì)列。此外,傳輸過程中的協(xié)議可能將數(shù)據(jù)報(bào)分解成碎片。例如,以太網(wǎng)不能傳輸超過1,500個(gè)字節(jié)左右的包。如果您的數(shù)據(jù)報(bào)比較大,那么就會(huì)存在被分解成碎片的風(fēng)險(xiǎn),成倍地增加了傳輸過程中包丟失的幾率。被分解的數(shù)據(jù)報(bào)在目的地會(huì)被重新組合起來,接收者將看不到碎片。但是,如果有一個(gè)碎片不能按時(shí)到達(dá),那么整個(gè)數(shù)據(jù)報(bào)將被丟棄。 DatagramChannel 有一個(gè)connect() 方法:
- public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
- // 這里僅列出部分API
- public abstract DatagramChannel connect(SocketAddress remote) throws IOException;
- public abstract boolean isConnected();
- public abstract DatagramChannel disconnect() throws IOException;
- }
DatagramChannel 對(duì)數(shù)據(jù)報(bào)socket的連接語義不同于對(duì)流socket的連接語義。有時(shí)候,將數(shù)據(jù)報(bào)對(duì)話限制為兩方是很可取的。將DatagramChannel 置于已連接的狀態(tài)可以使除了它所「連接」到的地址之外的任何其他源地址的數(shù)據(jù)報(bào)被忽略。這是很有幫助的,因?yàn)椴幌胍陌家呀?jīng)被網(wǎng)絡(luò)層丟棄了,從而避免了使用代碼來接收、檢查然后丟棄包的麻煩。
當(dāng)DatagramChannel 已連接時(shí),使用同樣的令牌,您不可以發(fā)送包到除了指定給connect() 方法的目的地址以外的任何其他地址。試圖一定要這樣做的話會(huì)導(dǎo)致一個(gè)SecurityException 異常。 我們可以通過調(diào)用帶SocketAddress對(duì)象的connect() 方法來連接一個(gè)DatagramChannel ,該SocketAddress 對(duì)象描述了DatagramChannel 遠(yuǎn)程對(duì)等體的地址。如果已經(jīng)安裝了一個(gè)安全管理器,那么它會(huì)進(jìn)行權(quán)限檢查。之后,每次send/receive時(shí)就不會(huì)再有安全檢查了,因?yàn)閬碜曰蛉サ饺魏纹渌刂返陌际遣辉试S的。 已連接通道會(huì)發(fā)揮作用的使用場(chǎng)景之一是一個(gè)客戶端/服務(wù)器模式、使用UDP通訊協(xié)議的實(shí)時(shí)游戲。每個(gè)客戶端都只和同一臺(tái)服務(wù)器進(jìn)行會(huì)話而希望忽視任何其他來源地?cái)?shù)據(jù)包。將客戶端的DatagramChannel 實(shí)例置于已連接狀態(tài)可以減少按包計(jì)算的總開銷(因?yàn)椴恍枰獙?duì)每個(gè)包進(jìn)行安全檢查)和剔除來自欺騙玩家的假包。服務(wù)器可能也想要這樣做,不過需要每個(gè)客戶端都有一個(gè)DatagramChannel 對(duì)象。 不同于流socket,數(shù)據(jù)報(bào)socket的無狀態(tài)性質(zhì)不需要同遠(yuǎn)程系統(tǒng)進(jìn)行對(duì)話來建立連接狀態(tài)。沒有實(shí)際的連接,只有用來指定允許的遠(yuǎn)程地址的本地狀態(tài)信息。由于此原因,DatagramChannel 上也就沒有單獨(dú)的finishConnect() 方法。我們可以使用isConnected() 方法來測(cè)試一個(gè)數(shù)據(jù)報(bào)通道的連接狀態(tài)。 不同于SocketChannel (必須連接了才有用并且只能連接一次),DatagramChannel 對(duì)象可以任意次數(shù)地進(jìn)行連接或斷開連接。每次連接都可以到一個(gè)不同的遠(yuǎn)程地址。調(diào)用disconnect() 方法可以配置通道,以便它能再次接收來自安全管理器(如果已安裝)所允許的任意遠(yuǎn)程地址的數(shù)據(jù)或發(fā)送數(shù)據(jù)到這些地址上。 當(dāng)一個(gè)DatagramChannel 處于已連接狀態(tài)時(shí),發(fā)送數(shù)據(jù)將不用提供目的地址而且接收時(shí)的源地址也是已知的。這意味著DatagramChannel 已連接時(shí)可以使用常規(guī)的read() 和write() 方法,包括scatter/gather形式的讀寫來組合或分拆包的數(shù)據(jù): - public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel {
- // 這里僅列出部分API
- public abstract int read(ByteBuffer dst) throws IOException;
- public abstract long read(ByteBuffer[] dsts) throws IOException;
- public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
- public abstract int write(ByteBuffer src) throws IOException;
- public abstract long write(ByteBuffer[] srcs) throws IOException;
- public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
- }
read() 方法返回讀取字節(jié)的數(shù)量,如果通道處于非阻塞模式的話這個(gè)返回值可能是「0」。write() 方法的返回值同send() 方法一致:要么返回您的緩沖區(qū)中的字節(jié)數(shù)量,要么返回「0」(如果由于通道處于非阻塞模式而導(dǎo)致數(shù)據(jù)報(bào)不能被發(fā)送)。當(dāng)通道不是已連接狀態(tài)時(shí)調(diào)用read() 或write() 方法,都將產(chǎn)生NotYetConnectedException 異常。
數(shù)據(jù)報(bào)通道不同于流socket。由于它們的有序而可靠的數(shù)據(jù)傳輸特性,流socket非常得有用。大多數(shù)網(wǎng)絡(luò)連接都是流 socket(TCP/IP 就是一個(gè)顯著的例子)。但是,像 TCP/IP 這樣面向流的的協(xié)議為了在包導(dǎo)向的互聯(lián)網(wǎng)基礎(chǔ)設(shè)施上維護(hù)流語義必然會(huì)產(chǎn)生巨大的開銷,并且流隱喻不能適用所有的情形。數(shù)據(jù)報(bào)的吞吐量要比流協(xié)議高很多,并且數(shù)據(jù)報(bào)可以做很多流無法完成的事情。 下面列出了一些選擇數(shù)據(jù)報(bào)socket而非流socket的理由: - 您的程序可以承受數(shù)據(jù)丟失或無序的數(shù)據(jù)。
- 您希望「發(fā)射后不管」(fire and forget)而不需要知道您發(fā)送的包是否已接收。
- 數(shù)據(jù)吞吐量比可靠性更重要。
- 您需要同時(shí)發(fā)送數(shù)據(jù)給多個(gè)接受者(多播或者廣播)。
- 包隱喻比流隱喻更適合手邊的任務(wù)。
如果以上特征中的一個(gè)或多個(gè)適用于您的程序,那么數(shù)據(jù)報(bào)設(shè)計(jì)對(duì)您來說就是合適的。 例 3-9 顯示了如何使用DatagramChannel 發(fā)送請(qǐng)求到多個(gè)地址上的時(shí)間服務(wù)器。DatagramChannel 接著會(huì)等待回復(fù)(reply)的到達(dá)。對(duì)于每個(gè)返回的回復(fù),遠(yuǎn)程時(shí)間會(huì)同本地時(shí)間進(jìn)行比較。由于數(shù)據(jù)報(bào)傳輸不保證一定成功,有些回復(fù)可能永遠(yuǎn)不會(huì)到達(dá)。大多數(shù)Linux和Unix系統(tǒng)都默認(rèn)提供時(shí)間服務(wù)。互聯(lián)網(wǎng)上也有一個(gè)公共時(shí)間服務(wù)器,如time.nist.gov。防火墻或者您的ISP可能會(huì)干擾數(shù)據(jù)報(bào)傳輸,這是因人而異的。 - /*
- *例 3-9 使用DatagramChannel的時(shí)間服務(wù)客戶端
- */
- package com.ronsoft.books.nio.channels;
- import java.nio.ByteBuffer;
- import java.nio.ByteOrder;
- import java.nio.channels.DatagramChannel;
- import java.net.InetSocketAddress;
- import java.util.Date;
- import java.util.List;
- import java.util.LinkedList;
- import java.util.Iterator;
- /**
- * Request time service, per RFC 868. RFC 868
- * (http://www./rfc/rfc0868.txt) is a very simple time protocol
- * whereby one system can request the current time from another system.
- * Most Linux, BSD and Solaris systems provide RFC 868 time service
- * on port 37. This simple program will inter-operate with those.
- * The National Institute of Standards and Technology (NIST) operates
- * a public time server at time.nist.gov.
- *
- * The RFC 868 protocol specifies a 32 bit unsigned value be sent,
- * representing the number of seconds since Jan 1, 1900. The Java
- * epoch begins on Jan 1, 1970 (same as unix) so an adjustment is
- * made by adding or subtracting 2,208,988,800 as appropriate. To
- * avoid shifting and masking, a four-byte slice of an
- * eight-byte buffer is used to send/recieve. But getLong()
- * is done on the full eight bytes to get a long value.
- *
- * When run, this program will issue time requests to each hostname
- * given on the command line, then enter a loop to receive packets.
- * Note that some requests or replies may be lost, which means
- * this code could block forever.
- *
- * @author Ron Hitchens (ron@ronsoft.com)
- */
- public class TimeClient {
- private static final int DEFAULT_TIME_PORT = 37;
- private static final long DIFF_1900 = 2208988800L;
- protected int port = DEFAULT_TIME_PORT;
- protected List remoteHosts;
- protected DatagramChannel channel;
-
- public TimeClient (String [] argv) throws Exception
- if (argv.length == 0) {
- throw new Exception ("Usage: [ -p port ] host ...");
- }
- parseArgs(argv);
- this.channel = DatagramChannel.open();
- }
-
- protected InetSocketAddress receivePacket (DatagramChannel channel, ByteBuffer buffer) throws Exception {
- buffer.clear();
- // Receive an unsigned 32-bit, big-endian value
- return ((InetSocketAddress) channel.receive (buffer));
- }
-
- // Send time requests to all the supplied hosts
- protected void sendRequests() throws Exception {
- ByteBuffer buffer = ByteBuffer.allocate (1);
- Iterator it = remoteHosts.iterator();
- while (it.hasNext()) {
- InetSocketAddress sa = (InetSocketAddress) it.next();
- System.out.println ("Requesting time from " + sa.getHostName() + ":" + sa.getPort());
- // Make it empty (see RFC868)
- buffer.clear().flip();
- // Fire and forget
- channel.send (buffer, sa);
- }
- }
-
- // Receive any replies that arrive
- public void getReplies() throws Exception {
- // Allocate a buffer to hold a long value
- ByteBuffer longBuffer = ByteBuffer.allocate (8);
- // Assure big-endian (network) byte order
- longBuffer.order (ByteOrder.BIG_ENDIAN);
- // Zero the whole buffer to be sure
- longBuffer.putLong (0, 0);
- // Position to first byte of the low-order 32 bits
- longBuffer.position (4);
- // Slice the buffer; gives view of the low-order 32 bits
- ByteBuffer buffer = longBuffer.slice();
- int expect = remoteHosts.size();
- int replies = 0;
- System.out.println ("");
- System.out.println ("Waiting for replies...");
- while (true) {
- InetSocketAddress sa = receivePacket(channel, buffer);
- buffer.flip();
- replies++;
- printTime(longBuffer.getLong(0), sa);
- if (replies == expect) {
- System.out.println ("All packets answered");
- break;
- }
- // Some replies haven't shown up yet
- System.out.println ("Received " + replies + " of " + expect + " replies");
- }
- }
-
- // Print info about a received time reply
- protected void printTime (long remote1900, InetSocketAddress sa) {
- // local time as seconds since Jan 1, 1970
- long local = System.currentTimeMillis() / 1000;
- // remote time as seconds since Jan 1, 1970
- long remote = remote1900 - DIFF_1900;
- Date remoteDate = new Date (remote * 1000);
- Date localDate = new Date (local * 1000);
- long skew = remote - local;
- System.out.println ("Reply from " + sa.getHostName() + ":" + sa.getPort());
- System.out.println (" there: " + remoteDate);
- System.out.println (" here: " + localDate);
- System.out.print (" skew: ");
- if (skew == 0) {
- System.out.println ("none");
- } else if (skew > 0) {
- System.out.println (skew + " seconds ahead");
- } else {
- System.out.println ((-skew) + " seconds behind");
- }
- }
-
- protected void parseArgs (String [] argv) {
- remoteHosts = new LinkedList();
- for (int i = 0; i < argv.length; i++) {
- String arg = argv [i];
- // Send client requests to the given port
- if (arg.equals("-p")) {
- i++;
- this.port = Integer.parseInt (argv [i]);
- continue;
- }
- // Create an address object for the hostname
- InetSocketAddress sa = new InetSocketAddress(arg, port);
- // Validate that it has an address
- if (sa.getAddress() == null) {
- System.out.println ("Cannot resolve address: " + arg);
- continue;
- }
- remoteHosts.add(sa);
- }
- }
-
- // --------------------------------------------------------------
- public static void main (String [] argv) throws Exception {
- TimeClient client = new TimeClient(argv);
- client.sendRequests();
- client.getReplies();
- }
- }
例 3-10 中的程序是一個(gè)RFC 868時(shí)間服務(wù)器。這段代碼回答來自例 3-9 中的客戶端的請(qǐng)求并顯示出DatagramChannel 是怎樣綁定到一個(gè)熟知端口然后開始監(jiān)聽來自客戶端的請(qǐng)求的。該時(shí)間服務(wù)器僅監(jiān)聽數(shù)據(jù)報(bào)(UDP)請(qǐng)求。大多數(shù)Unix和Linux系統(tǒng)提供的rdate命令使用TCP協(xié)議連接到一個(gè)RFC 868時(shí)間服務(wù)。 - /*
- *例 3-10 DatagramChannel 時(shí)間服務(wù)器
- */
- package com.ronsoft.books.nio.channels;
- import java.nio.ByteBuffer;
- import java.nio.ByteOrder;
- import java.nio.channels.DatagramChannel;
- import java.net.SocketAddress;
- import java.net.InetSocketAddress;
- import java.net.SocketException;
- /**
- * Provide RFC 868 time service (http://www./rfc/rfc0868.txt).
- * This code implements an RFC 868 listener to provide time
- * service. The defined port for time service is 37. On most
- * unix systems, root privilege is required to bind to ports
- * below 1024. You can either run this code as root or
- * provide another port number on the command line. Use
- * "-p port#" with TimeClient if you choose an alternate port.
- *
- * Note: The familiar rdate command on unix will probably not work
- * with this server. Most versions of rdate use TCP rather than UDP
- * to request the time.
- *
- * @author Ron Hitchens (ron@ronsoft.com)
- */
- public class TimeServer {
- private static final int DEFAULT_TIME_PORT = 37;
- private static final long DIFF_1900 = 2208988800L;
- protected DatagramChannel channel;
-
- public TimeServer (int port) throws Exception {
- this.channel = DatagramChannel.open();
- this.channel.socket().bind (new InetSocketAddress (port));
- System.out.println ("Listening on port " + port + " for time requests");
- }
-
- public void listen() throws Exception {
- // Allocate a buffer to hold a long value
- ByteBuffer longBuffer = ByteBuffer.allocate (8);
- // Assure big-endian (network) byte order
- longBuffer.order (ByteOrder.BIG_ENDIAN);
- // Zero the whole buffer to be sure
- longBuffer.putLong (0, 0);
- // Position to first byte of the low-order 32 bits
- longBuffer.position (4);
- // Slice the buffer; gives view of the low-order 32 bits
- ByteBuffer buffer = longBuffer.slice();
- while (true) {
- buffer.clear();
- SocketAddress sa = this.channel.receive (buffer);
- if (sa == null) {
- continue; // defensive programming
- }
- // Ignore content of received datagram per RFC 868
- System.out.println ("Time request from " + sa);
- buffer.clear(); // sets pos/limit correctly
- // Set 64-bit value; slice buffer sees low 32 bits
- longBuffer.putLong (0, (System.currentTimeMillis() / 1000) + DIFF_1900);
- this.channel.send (buffer, sa);
- }
- }
-
- // --------------------------------------------------------------
- public static void main (String [] argv) throws Exception {
- int port = DEFAULT_TIME_PORT;
- if (argv.length > 0) {
- port = Integer.parseInt (argv [0]);
- }
- try {
- TimeServer server = new TimeServer (port);
- server.listen();
- } catch (SocketException e) {
- System.out.println ("Can't bind to port " + port + ", try a different one");
- }
- }
- }
Java nio入門教程詳解(二十八)
2
0
我們認(rèn)為:用戶的主要目的,是為了獲取有用的信息,而不是來點(diǎn)擊廣告的。因此本站將竭力做好內(nèi)容,并將廣告和內(nèi)容進(jìn)行分離,確保所有廣告不會(huì)影響到用戶的正常閱讀體驗(yàn)。用戶僅憑個(gè)人意愿和興趣愛好點(diǎn)擊廣告。
我們堅(jiān)信:只有給用戶帶來價(jià)值,用戶才會(huì)給我們以回報(bào)。
|