4.5 選擇過程的可擴展性 我多次提到選擇器可以簡化用單線程同時管理多個可選擇通道的實現(xiàn)。使用一個線程來為多個通道提供服務,通過消除管理各個線程的額外開銷,可能會降低復雜性并可能大幅提升性能。但只使用一個線程來服務所有可選擇的通道是否是一個好主意呢?這要看情況。 對單CPU的系統(tǒng)而言這可能是一個好主意,因為在任何情況下都只有一個線程能夠運行。通過消除在線程之間進行上下文切換帶來的額外開銷,總吞吐量可以得到提高。但對于一個多CPU的系統(tǒng)呢?在一個有n個CPU的系統(tǒng)上,當一個單一的線程線性地輪流處理每一個線程時,可能有n-1個cpu處于空閑狀態(tài)。 那么讓不同道請求不同的服務類的辦法如何?想象一下,如果一個應用程序為大量的分布式的傳感器記錄信息。每個傳感器在服務線程遍歷每個就緒的通道時需要等待數(shù)秒鐘。這在響應時間不重要時是可以的。但對于高優(yōu)先級的連接(如操作命令),如果只用一個線程為所有通道提供服務,將不得不在隊列中等待。不同的應用程序的要求也是不同的。您采用的策略會受到您嘗試解決的問題的影響。 在第一個場景中,如果您想要將更多的線程來為通道提供服務,請抵抗住使用多個選擇器的欲望。在大量通道上執(zhí)行就緒選擇并不會有很大的開銷,大多數(shù)工作是由底層操作系統(tǒng)完成的。管理多個選擇器并隨機地將通道分派給它們當中的一個并不是這個問題的合理的解決方案。這只會形成這個場景的一個更小的版本。 一個更好的策略是對所有的可選擇通道使用一個選擇器,并將對就緒通道的服務委托給其他線程。您只用一個線程監(jiān)控通道的就緒狀態(tài)并使用一個協(xié)調好的工作線程池來處理共接收到的數(shù)據(jù)。根據(jù)部署的條件,線程池的大小是可以調整的(或者它自己進行動態(tài)的調整)。對可選擇通道的管理仍然是簡單的,而簡單的就是好的。 第二個場景中,某些通道要求比其他通道更高的響應速度,可以通過使用兩個選擇器來解決:一個為命令連接服務,另一個為普通連接服務。但這種場景也可以使用與第一個場景十分相似的辦法來解決。與將所有準備好的通道放到同一個線程池的做法不同,通道可以根據(jù)功能由不同的工作線程來處理。它們可能可以是日志線程池,命令/控制線程池,狀態(tài)請求線程池,等等。 例4-2的代碼是例4-1的一般性的選擇循環(huán)的擴展。它覆寫了readDataFromSocket()方法,并使用線程池來為準備好數(shù)據(jù)用于讀取的通道提供服務。與在主線程中同步地讀取數(shù)據(jù)不同,這個版本的實現(xiàn)將SelectionKey對象傳遞給為其服務的工作線程。 /* *例4-2.使用線程池來為通道提供服務 */ package com.ronsoft.books.nio.channels; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.channels.SelectionKey; import java.util.List; import java.util.LinkedList; import java.io.IOException; /** * Specialization of the SelectSockets class which uses a thread pool to service * channels. The thread pool is an ad-hoc implementation quicky lashed togther * in a few hours for demonstration purposes. It's definitely not production * quality. * * @author Ron Hitchens (ron@ronsoft.com) */ public class SelectSocketsThreadPool extends SelectSockets { private static final int MAX_THREADS = 5; private ThreadPool pool = new ThreadPool(MAX_THREADS); public static void main(String[] argv) throws Exception { new SelectSocketsThreadPool().go(argv); } // ------------------------------------------------------------- /** * Sample data handler method for a channel with data ready to read. This * method is invoked from the go() method in the parent class. This handler * delegates to a worker thread in a thread pool to service the channel, * then returns immediately. * * @param key * A SelectionKey object representing a channel determined by the * selector to be ready for reading. If the channel returns an * EOF condition, it is closed here, which automatically * invalidates the associated key. The selector will then * de-register the channel on the next select call. */ protected void readDataFromSocket(SelectionKey key) throws Exception { WorkerThread worker = pool.getWorker(); if (worker == null) { // No threads available. Do nothing. The selection // loop will keep calling this method until a // thread becomes available. This design could // be improved. return; } // Invoking this wakes up the worker thread, then returns worker.serviceChannel(key); } // --------------------------------------------------------------- /** * A very simple thread pool class. The pool size is set at construction * time and remains fixed. Threads are cycled through a FIFO idle queue. */ private class ThreadPool { List idle = new LinkedList(); ThreadPool(int poolSize) { // Fill up the pool with worker threads for (int i = 0; i < poolSize; i ) { WorkerThread thread = new WorkerThread(this); // Set thread name for debugging. Start it. thread.setName("Worker" (i 1)); thread.start(); idle.add(thread); } } /** * Find an idle worker thread, if any. Could return null. */ WorkerThread getWorker() { WorkerThread worker = null; synchronized (idle) { if (idle.size() > 0) { worker = (WorkerThread) idle.remove(0); } } return worker; } /** * Called by the worker thread to return itself to the idle pool. */ void returnWorker(WorkerThread worker) { synchronized (idle) { idle.add(worker); } } } /** * A worker thread class which can drain channels and echo-back the input. * Each instance is constructed with a reference to the owning thread pool * object. When started, the thread loops forever waiting to be awakened to * service the channel associated with a SelectionKey object. The worker is * tasked by calling its serviceChannel() method with a SelectionKey * object. The serviceChannel() method stores the key reference in the * thread object then calls notify() to wake it up. When the channel has 147 * been drained, the worker thread returns itself to its parent pool. */ private class WorkerThread extends Thread { private ByteBuffer buffer = ByteBuffer.allocate(1024); private ThreadPool pool; private SelectionKey key; WorkerThread(ThreadPool pool) { this.pool = pool; } // Loop forever waiting for work to do public synchronized void run() { System.out.println(this.getName() " is ready"); while (true) { try { // Sleep and release object lock this.wait(); } catch (InterruptedException e) { e.printStackTrace(); // Clear interrupt status this.interrupted(); } if (key == null) { continue; // just in case } System.out.println(this.getName() " has been awakened"); try { drainChannel(key); } catch (Exception e) { System.out.println("Caught '" e "' closing channel"); // Close channel and nudge selector try { key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } key.selector().wakeup(); } key = null; // Done. Ready for more. Return to pool this.pool.returnWorker(this); } } /** * Called to initiate a unit of work by this worker thread on the * provided SelectionKey object. This method is synchronized, as is the * run() method, so only one key can be serviced at a given time. * Before waking the worker thread, and before returning to the main * selection loop, this key's interest set is updated to remove OP_READ. * This will cause the selector to ignore read-readiness for this * channel while the worker thread is servicing it. */ synchronized void serviceChannel(SelectionKey key) { this.key = key; key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); this.notify(); // Awaken the thread } /** * The actual code which drains the channel associated with the given * key. This method assumes the key has been modified prior to * invocation to turn off selection interest in OP_READ. When this * method completes it re-enables OP_READ and calls wakeup( ) on the * selector so the selector will resume watching this channel. */ void drainChannel(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // Empty buffer // Loop while data is available; channel is nonblocking while ((count = channel.read(buffer)) > 0) { buffer.flip(); // make buffer readable // Send the data; may not go all at once while (buffer.hasRemaining()) { channel.write(buffer); } // WARNING: the above loop is evil. // See comments in superclass. buffer.clear(); // Empty buffer } if (count < 0) { // Close channel on EOF; invalidates the key channel.close(); return; } // Resume interest in OP_READ key.interestOps(key.interestOps() | SelectionKey.OP_READ); // Cycle the selector so this key is active again key.selector().wakeup(); } } } 由于執(zhí)行選擇過程的線程將重新循環(huán)并幾乎立即再次調用select(),鍵的interest集合將被修改,并將interest(感興趣的操作)從讀取就緒(read-rreadiness)狀態(tài)中移除。這將防止選擇器重復地調用readDataFromSocket()(因為通道仍然會準備好讀取數(shù)據(jù),直到工作線程從它那里讀取數(shù)據(jù))。當工作線程結束為通道提供的服務時,它將再次更新鍵的ready集合,來將interest重新放到讀取就緒集合中。它也會在選擇器上顯式地嗲用wakeup()。如果主線程在select()中被阻塞,這將使它繼續(xù)執(zhí)行。這個選擇循環(huán)會再次執(zhí)行一個輪回(可能什么也沒做)并帶著被更新的鍵重新進入select()。 00 我們認為:用戶的主要目的,是為了獲取有用的信息,而不是來點擊廣告的。因此本站將竭力做好內容,并將廣告和內容進行分離,確保所有廣告不會影響到用戶的正常閱讀體驗。用戶僅憑個人意愿和興趣愛好點擊廣告。 我們堅信:只有給用戶帶來價值,用戶才會給我們以回報。 |
|