前言介紹 分析Promise之前我們先來(lái)看兩個(gè)單詞;Promise、Future
Promise v. 許諾;承諾;答應(yīng);保證;使很可能;預(yù)示 Future n. 將來(lái);未來(lái);未來(lái)的事;將來(lái)發(fā)生的事;前景;前途;前程
他們的含義都是對(duì)未來(lái)即將要發(fā)生的事情做相應(yīng)的處理,這也是在異步編程中非常常見(jiàn)的類名。
Netty是一個(gè)異步網(wǎng)絡(luò)處理框架,在實(shí)現(xiàn)中大量使用了Future機(jī)制,并在Java自帶Future的基礎(chǔ)上,增加了Promise機(jī)制。這兩個(gè)實(shí)現(xiàn)類的目的都是為了使異步編程更加方便使用。
源碼分析 1、了解Java并發(fā)包中的Future java的并發(fā)包中提供java.util.concurrent.Future類,用于處理異步操作。在Java中Future是一個(gè)未來(lái)完成的異步操作,可以獲得未來(lái)返回的值。如下案例,調(diào)用一個(gè)獲取用戶信息的方法,該方法會(huì)立刻返回Future對(duì)象,調(diào)用Future.get()可以同步等待耗時(shí)方法的返回,也可以通過(guò)調(diào)用future的cancel()取消Future任務(wù)。
1 class TestFuture { 2 3 public static void main (String[] args) throws ExecutionException, InterruptedException { 4 TestFuture testFuture = new TestFuture(); 5 Future<String> future = testFuture.queryUserInfo("10001" ); //返回future 6 String userInfo = future.get(); 7 System.out.println("查詢用戶信息:" + userInfo); 8 } 9 10 private Future<String> queryUserInfo (String userId) {11 FutureTask<String> future = new FutureTask<>(() -> {12 try {13 Thread.sleep(1000 );14 return "微信公眾號(hào):bugstack蟲(chóng)洞棧 | 用戶ID:" + userId;15 } catch (InterruptedException ignored) {}16 return "error" ;17 });18 new Thread(future).start();19 return future;20 }21 22 }
2、Netty實(shí)現(xiàn)了自己的Future Netty通過(guò)繼承java并發(fā)包的Future來(lái)定義自己的Future接口,為Future加入的功能主要有添加、刪除監(jiān)聽(tīng)事件接口,最后由Promise實(shí)現(xiàn)。
io.netty.util.concurrent.Future.java中定義了一些列的異步編程方法 | 經(jīng)常會(huì)使用的>b.bind(port).sync();
1 // 只有IO操作完成時(shí)才返回true 2 boolean isSuccess () ; 3 // 只有當(dāng)cancel(boolean)成功取消時(shí)才返回true 4 boolean isCancellable () ; 5 // IO操作發(fā)生異常時(shí),返回導(dǎo)致IO操作以此的原因,如果沒(méi)有異常,返回null 6 Throwable cause () ; 7 // 向Future添加事件,future完成時(shí),會(huì)執(zhí)行這些事件,如果add時(shí)future已經(jīng)完成,會(huì)立即執(zhí)行監(jiān)聽(tīng)事件 8 Future<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ; 9 Future<V> addListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ;10 // 移除監(jiān)聽(tīng)事件,future完成時(shí),不會(huì)觸發(fā) 11 Future<V> removeListener (GenericFutureListener<? extends Future<? super V>> listener) ;12 Future<V> removeListeners (GenericFutureListener<? extends Future<? super V>>... listeners) ;13 // 等待future done 14 Future<V> sync () throws InterruptedException ;15 // 等待future done,不可打斷 16 Future<V> syncUninterruptibly () ;17 // 等待future完成 18 Future<V> await () throws InterruptedException ;19 // 等待future 完成,不可打斷 20 Future<V> awaitUninterruptibly () ;21 boolean await (long timeout, TimeUnit unit) throws InterruptedException ;22 boolean await (long timeoutMillis) throws InterruptedException ;23 boolean awaitUninterruptibly (long timeout, TimeUnit unit) ;24 boolean awaitUninterruptibly (long timeoutMillis) ;25 // 立刻獲得結(jié)果,如果沒(méi)有完成,返回null 26 V getNow () ;27 // 如果成功取消,future會(huì)失敗,導(dǎo)致CancellationException 28 @Override 29 boolean cancel (boolean mayInterruptIfRunning) ;
3、Promise機(jī)制 Netty的Future與Java的Future雖然類名相同,但功能上略有不同,Netty中引入了Promise機(jī)制。在Java的Future中,業(yè)務(wù)邏輯為一個(gè)Callable或Runnable實(shí)現(xiàn)類,該類的call()或run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié);而在Promise機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。
io.netty.util.concurrent.Promise.java |
1 public interface Promise <V > extends Future <V > { 2 3 // 設(shè)置future執(zhí)行結(jié)果為成功 4 Promise<V> setSuccess (V result) ; 5 6 // 嘗試設(shè)置future執(zhí)行結(jié)果為成功,返回是否設(shè)置成功 7 boolean trySuccess (V result) ; 8 9 // 設(shè)置失敗 10 Promise<V> setFailure (Throwable cause) ;11 12 // 嘗試設(shè)置future執(zhí)行結(jié)果為失敗,返回是否設(shè)置成功 13 boolean tryFailure (Throwable cause) ;14 15 // 設(shè)置為不能取消 16 boolean setUncancellable () ;17 18 // 源碼中,以下為覆蓋了Future的方法,例如; 19 20 Future<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ;21 22 @Override 23 Promise<V> addListener (GenericFutureListener<? extends Future<? super V>> listener) ;24 25 }
TestPromise.java | 一個(gè)查詢用戶信息的Promise列子,加入監(jiān)聽(tīng)再operationComplete完成后,獲取查詢信息
1 class TestPromise { 2 3 public static void main (String[] args) throws ExecutionException, InterruptedException { 4 TestPromise testPromise = new TestPromise(); 5 Promise<String> promise = testPromise.queryUserInfo("10001" ); 6 promise.addListener(new GenericFutureListener<Future<? super String>>() { 7 @Override 8 public void operationComplete (Future<? super String> future) throws Exception { 9 System.out.println("addListener.operationComplete > 查詢用戶信息完成: " + future.get());10 }11 });12 }13 14 private Promise<String> queryUserInfo (String userId) {15 NioEventLoopGroup loop = new NioEventLoopGroup();16 // 創(chuàng)建一個(gè)DefaultPromise并返回,將業(yè)務(wù)邏輯放入線程池中執(zhí)行 17 DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());18 loop.schedule(() -> {19 try {20 Thread.sleep(1000 );21 promise.setSuccess("微信公眾號(hào):bugstack蟲(chóng)洞棧 | 用戶ID:" + userId);22 return promise;23 } catch (InterruptedException ignored) {24 }25 return promise;26 }, 0 , TimeUnit.SECONDS);27 return promise;28 }29 30 }
通過(guò)這個(gè)例子可以看到,Promise能夠在業(yè)務(wù)邏輯線程中通知Future成功或失敗,由于Promise繼承了Netty的Future,因此可以加入監(jiān)聽(tīng)事件。而Future和Promise的好處在于,獲取到Promise對(duì)象后可以為其設(shè)置異步調(diào)用完成后的操作,然后立即繼續(xù)去做其他任務(wù)。
4、Promise類組織結(jié)構(gòu)&常用方法 DefaultChannelPromise類組織結(jié)構(gòu)圖 | 承接Java并發(fā)包Future并增強(qiáng)實(shí)現(xiàn)
微信公眾號(hào):bugstack蟲(chóng)洞棧 | DefaultChannelPromise類組織結(jié)構(gòu)圖 Netty中DefalutPromise是一個(gè)非常常用的類,這是Promise實(shí)現(xiàn)的基礎(chǔ)。DefaultChannelPromise是DefalutPromise的子類,加入了channel這個(gè)屬性。
DefaultPromise | 使用 在Netty中使用到Promise的地方會(huì)非常多,例如在前面一節(jié)《一行簡(jiǎn)單的writeAndFlush都做了哪些事》分析HeadContext.write中unsafe.write(msg, promise);結(jié)合這一章節(jié)可以繼續(xù)深入了解Netty的異步框架原理。另外,服務(wù)器/客戶端啟動(dòng)時(shí)的注冊(cè)任務(wù),最終會(huì)調(diào)用unsafe的register,調(diào)用過(guò)程中會(huì)傳入一個(gè)promise,unsafe進(jìn)行事件的注冊(cè)時(shí)調(diào)用promise可以設(shè)置成功/失敗。
SingleThreadEventLoop.java | 注冊(cè)服務(wù)事件循環(huán)組
1 @Override 2 public ChannelFuture register (Channel channel) { 3 return register(new DefaultChannelPromise(channel, this )); 4 } 5 6 @Override 7 public ChannelFuture register (final ChannelPromise promise) { 8 ObjectUtil.checkNotNull(promise, "promise" ); 9 promise.channel().unsafe().register(this , promise);10 return promise;11 }
DefaultPromise | 實(shí)現(xiàn) DefaultChannelPromise提供的功能可以分為兩個(gè)部分;
AbstractFuture.java | get()方法
1 public abstract class AbstractFuture <V > implements Future <V > { 2 3 @Override 4 public V get () throws InterruptedException, ExecutionException { 5 await(); 6 7 Throwable cause = cause(); 8 if (cause == null ) { 9 return getNow();10 }11 if (cause instanceof CancellationException) {12 throw (CancellationException) cause;13 }14 throw new ExecutionException(cause);15 }16 17 @Override 18 public V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {19 if (await(timeout, unit)) {20 Throwable cause = cause();21 if (cause == null ) {22 return getNow();23 }24 if (cause instanceof CancellationException) {25 throw (CancellationException) cause;26 }27 throw new ExecutionException(cause);28 }29 throw new TimeoutException();30 }31 }
DefaultPromise父類AbstractFuture提供了兩個(gè)get方法;1、無(wú)參數(shù)的get會(huì)阻塞等待;2、有參數(shù)的get會(huì)等待指定事件,若未結(jié)束拋出超時(shí)異常。
DefaultPromise.java | DefaultPromise.await()方法
1 @Override 2 public Promise<V> await () throws Interrupt 3 // 判斷Future任務(wù)是否結(jié)束,內(nèi)部根據(jù)result是否為null判斷,setSuccess或setFailure時(shí)會(huì)通過(guò)CAS修改result 4 if (isDone() ) { 5 return this ; 6 } 7 // 線程是否被中斷 8 if (Thread.interrupted()) { 9 throw new InterruptedException(toS10 }11 // 檢查當(dāng)前線程是否與線程池運(yùn)行的線程是一個(gè) 12 checkDeadLock();13 synchronized (this ) {14 while (!isDone()) {15 /* waiters計(jì)數(shù)加116 * private void incWaiters() {17 * if (waiters == Short.MAX_VALUE) {18 * throw new IllegalStateException("too many waiters: " + this);19 * }20 * ++waiters;21 * }22 */ 23 incWaiters();24 try {25 // Object的方法,讓出CPU,加入等待隊(duì)列 26 wait();27 } finally {28 // waiters計(jì)數(shù)減1 29 decWaiters();30 }31 }32 }33 return this ;34 }
await(long timeout, TimeUnit unit)與awite類似,只是調(diào)用了Object對(duì)象的wait(long timeout, int nanos)方法awaitUninterruptibly()方法在內(nèi)部catch住了等待線程的中斷異常,因此不會(huì)拋出中斷異常。
DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()
1 private void addListener0 (GenericFutureListener<? extends Future<? super V>> listener) { 2 if (listeners == null ) { 3 listeners = listener; 4 } else if (listeners instanceof DefaultFutureListeners) { 5 ((DefaultFutureListeners) listeners).add(listener); 6 } else { 7 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); 8 } 9 }10 private void removeListener0 (GenericFutureListener<? extends Future<? super V>> listener) {11 if (listeners instanceof DefaultFutureListeners) {12 ((DefaultFutureListeners) listeners).remove(listener);13 } else if (listeners == listener) {14 listeners = null ;15 }16 }
addListener0方法被調(diào)用時(shí),將傳入的回調(diào)類傳入到listeners對(duì)象中,如果監(jiān)聽(tīng)多于1個(gè),會(huì)創(chuàng)建DefaultFutureListeners對(duì)象將回調(diào)方法保存在一個(gè)數(shù)組中。
removeListener0會(huì)將listeners設(shè)置為null(只有一個(gè)時(shí))或從數(shù)組中移除(多個(gè)回調(diào)時(shí))。
DefaultPromise.java | DefaultPromise.notifyListener0() 通知偵聽(tīng)器
1 @SuppressWarnings ({ "unchecked" , "rawtypes" }) 2 private static void notifyListener0 (Future future, GenericFutureListener l) { 3 try { 4 l.operationComplete(future); 5 } catch (Throwable t) { 6 if (logger.isWarnEnabled()) { 7 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()" , t); 8 } 9 }10 }
在添加監(jiān)聽(tīng)器時(shí),如果任務(wù)剛好執(zhí)行完畢,則會(huì)立即觸發(fā)監(jiān)聽(tīng)事件,觸發(fā)監(jiān)聽(tīng)通過(guò)notifyListeners()實(shí)現(xiàn)。
addListener和setSuccess都會(huì)調(diào)用notifyListeners()和Promise內(nèi)的線程池當(dāng)前執(zhí)行的線程是同一個(gè)線程,則放在線程池中執(zhí)行,否則提交到線程池去執(zhí)行;例如,main線程中調(diào)用addListener時(shí)任務(wù)完成,notifyListeners()執(zhí)行回調(diào),會(huì)提交到線程池中執(zhí)行;而如果是執(zhí)行Future任務(wù)的線程池中setSuccess()時(shí)調(diào)用notifyListeners(),會(huì)放在當(dāng)前線程中執(zhí)行。
內(nèi)部維護(hù)了notifyingListeners用來(lái)記錄是否已經(jīng)觸發(fā)過(guò)監(jiān)聽(tīng)事件,只有未觸發(fā)過(guò)且監(jiān)聽(tīng)列表不為空,才會(huì)依次便利并調(diào)用operationComplete
DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 喚起等待線程通知成功/失敗
1 // 設(shè)置成功后喚醒等待線程 2 private boolean setSuccess0 (V result) { 3 return setValue0(result == null ? SUCCESS : result); 4 } 5 6 // 設(shè)置成功后喚醒等待線程 7 private boolean setFailure0 (Throwable cause) { 8 return setValue0(new CauseHolder(checkNotNull(cause, "cause" ))); 9 }10 11 // 通知成功時(shí)將結(jié)果保存在變量result,通知失敗時(shí),使用CauseHolder包裝Throwable賦值給result 12 // RESULT_UPDATER 是一個(gè)使用CAS更新內(nèi)部屬性result的類, 13 // 如果result為null或UNCANCELLABLE,更新為成功/失敗結(jié)果;UNCANCELLABLE是不可取消狀態(tài) 14 private boolean setValue0 (Object objResult) {15 if (RESULT_UPDATER.compareAndSet(this , null , objResult) ||16 RESULT_UPDATER.compareAndSet(this , UNCANCELLABLE, objResult)) {17 // 檢查是否有服務(wù),如果有,通知他們。 18 if (checkNotifyWaiters()) {19 notifyListeners(); // 通知 20 }21 return true ;22 }23 return false ;24 }
Future任務(wù)在執(zhí)行完成后調(diào)用setSuccess()或setFailure()通知Future執(zhí)行結(jié)果;主要邏輯是:修改result的值,若有等待線程則喚醒,通知監(jiān)聽(tīng)事件。
DefaultChannelPromise實(shí)現(xiàn)
1 /** 2 * The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create 3 * a new {@link ChannelPromise} rather than calling the constructor explicitly. 4 */ 5 public class DefaultChannelPromise extends DefaultPromise <Void > implements ChannelPromise , FlushCheckpoint { 6 7 private final Channel channel; 8 private long checkpoint; 9 10 ...11 }
從繼承關(guān)系可以看到DefaultChannelPromise是DefaultPromise的實(shí)現(xiàn)類,內(nèi)部維護(hù)了一個(gè)通道變量Channel。
另外還實(shí)現(xiàn)了FlushCheckpoint接口,給ChannelFlushPromiseNotifier使用,我們可以將ChannelFuture注冊(cè)到ChannelFlushPromiseNotifier類,當(dāng)有數(shù)據(jù)寫(xiě)入或到達(dá)checkpoint時(shí)使用。
1 interface FlushCheckpoint {2 long flushCheckpoint () ;3 void flushCheckpoint (long checkpoint) 4 ChannelPromise promise () ;5 }