日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

netty案例,netty4.1源碼分析篇六《Netty異步架構(gòu)監(jiān)聽(tīng)類Promise源碼分析》

 小傅哥 2021-12-13

前言介紹

分析Promise之前我們先來(lái)看兩個(gè)單詞;Promise、Future

Promise v. 許諾;承諾;答應(yīng);保證;使很可能;預(yù)示
Future n. 將來(lái);未來(lái);未來(lái)的事;將來(lái)發(fā)生的事;前景;前途;前程

他們的含義都是對(duì)未來(lái)即將要發(fā)生的事情做相應(yīng)的處理,這也是在異步編程中非常常見(jiàn)的類名。

Netty是一個(gè)異步網(wǎng)絡(luò)處理框架,在實(shí)現(xiàn)中大量使用了Future機(jī)制,并在Java自帶Future的基礎(chǔ)上,增加了Promise機(jī)制。這兩個(gè)實(shí)現(xiàn)類的目的都是為了使異步編程更加方便使用。

源碼分析

1、了解Java并發(fā)包中的Future

java的并發(fā)包中提供java.util.concurrent.Future類,用于處理異步操作。在Java中Future是一個(gè)未來(lái)完成的異步操作,可以獲得未來(lái)返回的值。如下案例,調(diào)用一個(gè)獲取用戶信息的方法,該方法會(huì)立刻返回Future對(duì)象,調(diào)用Future.get()可以同步等待耗時(shí)方法的返回,也可以通過(guò)調(diào)用future的cancel()取消Future任務(wù)。

1class TestFuture {
2
3    public static void main(String[] args) throws ExecutionException, InterruptedException {
4        TestFuture testFuture = new TestFuture();
5        Future<String> future = testFuture.queryUserInfo("10001"); //返回future
6        String userInfo = future.get();
7        System.out.println("查詢用戶信息:" + userInfo);
8    }
9
10    private Future<String> queryUserInfo(String userId) {
11        FutureTask<String> future = new FutureTask<>(() -> {
12            try {
13                Thread.sleep(1000);
14                return "微信公眾號(hào):bugstack蟲(chóng)洞棧 | 用戶ID:" + userId;
15            } catch (InterruptedException ignored) {}
16            return "error";
17        });
18        new Thread(future).start();
19        return future;
20    }
21
22}

2、Netty實(shí)現(xiàn)了自己的Future

Netty通過(guò)繼承java并發(fā)包的Future來(lái)定義自己的Future接口,為Future加入的功能主要有添加、刪除監(jiān)聽(tīng)事件接口,最后由Promise實(shí)現(xiàn)。

io.netty.util.concurrent.Future.java中定義了一些列的異步編程方法 | 經(jīng)常會(huì)使用的>b.bind(port).sync();

1// 只有IO操作完成時(shí)才返回true
2boolean isSuccess();
3// 只有當(dāng)cancel(boolean)成功取消時(shí)才返回true
4boolean isCancellable();
5// IO操作發(fā)生異常時(shí),返回導(dǎo)致IO操作以此的原因,如果沒(méi)有異常,返回null
6Throwable cause();
7// 向Future添加事件,future完成時(shí),會(huì)執(zhí)行這些事件,如果add時(shí)future已經(jīng)完成,會(huì)立即執(zhí)行監(jiān)聽(tīng)事件
8Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
9Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
10// 移除監(jiān)聽(tīng)事件,future完成時(shí),不會(huì)觸發(fā)
11Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
12Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
13// 等待future done
14Future<V> sync() throws InterruptedException;
15// 等待future done,不可打斷
16Future<V> syncUninterruptibly();
17// 等待future完成
18Future<V> await() throws InterruptedException;
19// 等待future 完成,不可打斷
20Future<V> awaitUninterruptibly();
21boolean await(long timeout, TimeUnit unit) throws InterruptedException;
22boolean await(long timeoutMillis) throws InterruptedException;
23boolean awaitUninterruptibly(long timeout, TimeUnit unit);
24boolean awaitUninterruptibly(long timeoutMillis);
25// 立刻獲得結(jié)果,如果沒(méi)有完成,返回null
26getNow();
27// 如果成功取消,future會(huì)失敗,導(dǎo)致CancellationException
28@Override
29boolean cancel(boolean mayInterruptIfRunning);

3、Promise機(jī)制

Netty的Future與Java的Future雖然類名相同,但功能上略有不同,Netty中引入了Promise機(jī)制。在Java的Future中,業(yè)務(wù)邏輯為一個(gè)Callable或Runnable實(shí)現(xiàn)類,該類的call()或run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié);而在Promise機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。

io.netty.util.concurrent.Promise.java |

1public interface Promise<Vextends Future<V{
2
3    // 設(shè)置future執(zhí)行結(jié)果為成功
4    Promise<V> setSuccess(V result);
5
6    // 嘗試設(shè)置future執(zhí)行結(jié)果為成功,返回是否設(shè)置成功
7    boolean trySuccess(V result);
8
9    // 設(shè)置失敗
10    Promise<V> setFailure(Throwable cause);
11
12    // 嘗試設(shè)置future執(zhí)行結(jié)果為失敗,返回是否設(shè)置成功 
13    boolean tryFailure(Throwable cause);
14
15    // 設(shè)置為不能取消
16    boolean setUncancellable();
17
18    // 源碼中,以下為覆蓋了Future的方法,例如;
19
20    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
21
22    @Override
23    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
24
25}

TestPromise.java | 一個(gè)查詢用戶信息的Promise列子,加入監(jiān)聽(tīng)再operationComplete完成后,獲取查詢信息

1class TestPromise {
2
3    public static void main(String[] args) throws ExecutionException, InterruptedException {
4        TestPromise testPromise = new TestPromise();
5        Promise<String> promise = testPromise.queryUserInfo("10001");
6        promise.addListener(new GenericFutureListener<Future<? super String>>() {
7            @Override
8            public void operationComplete(Future<? super String> future) throws Exception {
9                System.out.println("addListener.operationComplete > 查詢用戶信息完成: " + future.get());
10            }
11        });
12    }
13
14    private Promise<String> queryUserInfo(String userId) {
15        NioEventLoopGroup loop = new NioEventLoopGroup();
16        // 創(chuàng)建一個(gè)DefaultPromise并返回,將業(yè)務(wù)邏輯放入線程池中執(zhí)行
17        DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
18        loop.schedule(() -> {
19            try {
20                Thread.sleep(1000);
21                promise.setSuccess("微信公眾號(hào):bugstack蟲(chóng)洞棧 | 用戶ID:" + userId);
22                return promise;
23            } catch (InterruptedException ignored) {
24            }
25            return promise;
26        }, 0, TimeUnit.SECONDS);
27        return promise;
28    }
29
30}

通過(guò)這個(gè)例子可以看到,Promise能夠在業(yè)務(wù)邏輯線程中通知Future成功或失敗,由于Promise繼承了Netty的Future,因此可以加入監(jiān)聽(tīng)事件。而Future和Promise的好處在于,獲取到Promise對(duì)象后可以為其設(shè)置異步調(diào)用完成后的操作,然后立即繼續(xù)去做其他任務(wù)。

4、Promise類組織結(jié)構(gòu)&常用方法

DefaultChannelPromise類組織結(jié)構(gòu)圖 | 承接Java并發(fā)包Future并增強(qiáng)實(shí)現(xiàn)

微信公眾號(hào):bugstack蟲(chóng)洞棧 | DefaultChannelPromise類組織結(jié)構(gòu)圖

Netty中DefalutPromise是一個(gè)非常常用的類,這是Promise實(shí)現(xiàn)的基礎(chǔ)。DefaultChannelPromise是DefalutPromise的子類,加入了channel這個(gè)屬性。

DefaultPromise | 使用
在Netty中使用到Promise的地方會(huì)非常多,例如在前面一節(jié)《一行簡(jiǎn)單的writeAndFlush都做了哪些事》分析HeadContext.write中unsafe.write(msg, promise);結(jié)合這一章節(jié)可以繼續(xù)深入了解Netty的異步框架原理。另外,服務(wù)器/客戶端啟動(dòng)時(shí)的注冊(cè)任務(wù),最終會(huì)調(diào)用unsafe的register,調(diào)用過(guò)程中會(huì)傳入一個(gè)promise,unsafe進(jìn)行事件的注冊(cè)時(shí)調(diào)用promise可以設(shè)置成功/失敗。

SingleThreadEventLoop.java | 注冊(cè)服務(wù)事件循環(huán)組

1@Override
2public ChannelFuture register(Channel channel) {
3    return register(new DefaultChannelPromise(channel, this));
4}
5
6@Override
7public ChannelFuture register(final ChannelPromise promise) {
8    ObjectUtil.checkNotNull(promise, "promise");
9    promise.channel().unsafe().register(this, promise);
10    return promise;
11}

DefaultPromise | 實(shí)現(xiàn)
DefaultChannelPromise提供的功能可以分為兩個(gè)部分;

  • 為調(diào)用者提供get()和addListener()用于獲取Future任務(wù)執(zhí)行結(jié)果和添加監(jiān)聽(tīng)事件。

  • 為業(yè)務(wù)處理任務(wù)提供setSuccess()等方法設(shè)置任務(wù)的成功或失敗。

AbstractFuture.java | get()方法

1public abstract class AbstractFuture<Vimplements Future<V{
2
3    @Override
4    public V get() throws InterruptedException, ExecutionException {
5        await();
6
7        Throwable cause = cause();
8        if (cause == null) {
9            return getNow();
10        }
11        if (cause instanceof CancellationException) {
12            throw (CancellationException) cause;
13        }
14        throw new ExecutionException(cause);
15    }
16
17    @Override
18    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
19        if (await(timeout, unit)) {
20            Throwable cause = cause();
21            if (cause == null) {
22                return getNow();
23            }
24            if (cause instanceof CancellationException) {
25                throw (CancellationException) cause;
26            }
27            throw new ExecutionException(cause);
28        }
29        throw new TimeoutException();
30    }
31}

DefaultPromise父類AbstractFuture提供了兩個(gè)get方法;1、無(wú)參數(shù)的get會(huì)阻塞等待;2、有參數(shù)的get會(huì)等待指定事件,若未結(jié)束拋出超時(shí)異常。


DefaultPromise.java | DefaultPromise.await()方法

1@Override
2public Promise<V> await() throws Interrupt
3    // 判斷Future任務(wù)是否結(jié)束,內(nèi)部根據(jù)result是否為null判斷,setSuccess或setFailure時(shí)會(huì)通過(guò)CAS修改result
4    if (isDone()
{
5        return this;
6    }
7    // 線程是否被中斷
8    if (Thread.interrupted()) {
9        throw new InterruptedException(toS
10    }
11    // 檢查當(dāng)前線程是否與線程池運(yùn)行的線程是一個(gè)
12    checkDeadLock();
13    synchronized (this) {
14        while (!isDone()) {
15           /* waiters計(jì)數(shù)加1
16            * private void incWaiters() {
17            *   if (waiters == Short.MAX_VALUE) {
18            *       throw new IllegalStateException("too many waiters: " + this);
19            *   }
20            *   ++waiters;
21            * }
22            */

23            incWaiters();
24            try {
25                // Object的方法,讓出CPU,加入等待隊(duì)列
26                wait();
27            } finally {
28                // waiters計(jì)數(shù)減1
29                decWaiters();
30            }
31        }
32    }
33    return this;
34}

await(long timeout, TimeUnit unit)與awite類似,只是調(diào)用了Object對(duì)象的wait(long timeout, int nanos)方法awaitUninterruptibly()方法在內(nèi)部catch住了等待線程的中斷異常,因此不會(huì)拋出中斷異常。


DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()

1private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
2    if (listeners == null) {
3        listeners = listener;
4    } else if (listeners instanceof DefaultFutureListeners) {
5        ((DefaultFutureListeners) listeners).add(listener);
6    } else {
7        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
8    }
9}
10private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
11    if (listeners instanceof DefaultFutureListeners) {
12        ((DefaultFutureListeners) listeners).remove(listener);
13    } else if (listeners == listener) {
14        listeners = null;
15    }
16}
  • addListener0方法被調(diào)用時(shí),將傳入的回調(diào)類傳入到listeners對(duì)象中,如果監(jiān)聽(tīng)多于1個(gè),會(huì)創(chuàng)建DefaultFutureListeners對(duì)象將回調(diào)方法保存在一個(gè)數(shù)組中。

  • removeListener0會(huì)將listeners設(shè)置為null(只有一個(gè)時(shí))或從數(shù)組中移除(多個(gè)回調(diào)時(shí))。


DefaultPromise.java | DefaultPromise.notifyListener0() 通知偵聽(tīng)器

1@SuppressWarnings({ "unchecked""rawtypes" })
2private static void notifyListener0(Future future, GenericFutureListener l) {
3    try {
4        l.operationComplete(future);
5    } catch (Throwable t) {
6        if (logger.isWarnEnabled()) {
7            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
8        }
9    }
10}
  • 在添加監(jiān)聽(tīng)器時(shí),如果任務(wù)剛好執(zhí)行完畢,則會(huì)立即觸發(fā)監(jiān)聽(tīng)事件,觸發(fā)監(jiān)聽(tīng)通過(guò)notifyListeners()實(shí)現(xiàn)。

  • addListener和setSuccess都會(huì)調(diào)用notifyListeners()和Promise內(nèi)的線程池當(dāng)前執(zhí)行的線程是同一個(gè)線程,則放在線程池中執(zhí)行,否則提交到線程池去執(zhí)行;例如,main線程中調(diào)用addListener時(shí)任務(wù)完成,notifyListeners()執(zhí)行回調(diào),會(huì)提交到線程池中執(zhí)行;而如果是執(zhí)行Future任務(wù)的線程池中setSuccess()時(shí)調(diào)用notifyListeners(),會(huì)放在當(dāng)前線程中執(zhí)行。

  • 內(nèi)部維護(hù)了notifyingListeners用來(lái)記錄是否已經(jīng)觸發(fā)過(guò)監(jiān)聽(tīng)事件,只有未觸發(fā)過(guò)且監(jiān)聽(tīng)列表不為空,才會(huì)依次便利并調(diào)用operationComplete


DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 喚起等待線程通知成功/失敗

1// 設(shè)置成功后喚醒等待線程
2private boolean setSuccess0(V result) {
3    return setValue0(result == null ? SUCCESS : result);
4}
5
6// 設(shè)置成功后喚醒等待線程
7private boolean setFailure0(Throwable cause) {
8    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
9}
10
11// 通知成功時(shí)將結(jié)果保存在變量result,通知失敗時(shí),使用CauseHolder包裝Throwable賦值給result
12// RESULT_UPDATER 是一個(gè)使用CAS更新內(nèi)部屬性result的類,
13// 如果result為null或UNCANCELLABLE,更新為成功/失敗結(jié)果;UNCANCELLABLE是不可取消狀態(tài)
14private boolean setValue0(Object objResult) {
15    if (RESULT_UPDATER.compareAndSet(thisnull, objResult) ||
16        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
17        // 檢查是否有服務(wù),如果有,通知他們。
18        if (checkNotifyWaiters()) {
19            notifyListeners();  // 通知
20        }
21        return true;
22    }
23    return false;
24}

Future任務(wù)在執(zhí)行完成后調(diào)用setSuccess()或setFailure()通知Future執(zhí)行結(jié)果;主要邏輯是:修改result的值,若有等待線程則喚醒,通知監(jiān)聽(tīng)事件。


DefaultChannelPromise實(shí)現(xiàn)

1/**
2 * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create
3 * a new {@link ChannelPromise} rather than calling the constructor explicitly.
4 */

5public class DefaultChannelPromise extends DefaultPromise<Voidimplements ChannelPromiseFlushCheckpoint {
6
7    private final Channel channel;
8    private long checkpoint;
9
10    ...
11}
  • 從繼承關(guān)系可以看到DefaultChannelPromise是DefaultPromise的實(shí)現(xiàn)類,內(nèi)部維護(hù)了一個(gè)通道變量Channel。

  • 另外還實(shí)現(xiàn)了FlushCheckpoint接口,給ChannelFlushPromiseNotifier使用,我們可以將ChannelFuture注冊(cè)到ChannelFlushPromiseNotifier類,當(dāng)有數(shù)據(jù)寫(xiě)入或到達(dá)checkpoint時(shí)使用。

1interface FlushCheckpoint {
2    long flushCheckpoint();
3    void flushCheckpoint(long checkpoint)
4    ChannelPromise promise()
;
5}

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多