日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

從netty

 昵稱10504424 2015-12-15

分析netty從源碼開始

準備工作:

1.下載源代碼:https://github.com/netty/netty.git

    我下載的版本為4.1

2. eclipse導入maven工程。

 

netty提供了一個netty-example工程,

我們的分析從這里開始,netty是client-server形式的,我們以最簡單的discard示例開始,其服務(wù)器端代碼如下:

復制代碼
/**
 * Discards any incoming data.
 */
public final class DiscardServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     p.addLast(new DiscardServerHandler());
                 }
             });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
復制代碼

上面的代碼中使用了下面幾個類:

1. EventLoopGroup

實現(xiàn)類為NioEventLoopGroup,其層次結(jié)構(gòu)為:

  EventExecutorGroup為所有類的父接口,它通過next()方法來提供EventExecutor供使用。除此以外,它還負責處理它們的生命周期,允許以優(yōu)雅的方式關(guān)閉。

  EventExecutor是一種特殊的EventExcutorGroup,它提供了一些便利的方法來查看一個線程是否在一個事件循環(huán)中執(zhí)行過,除此以外,它也擴展了EventExecutorGroup,從而提供了一個通用的獲取方法的方式。

  EventLoopGroup是一種特殊的EventExcutorGroup,它提供了channel的注冊功能,channel在事件循環(huán)中將被后面的selection來獲取到。

  NioEventLoopGroup繼承自MultithreadEventLoopGroup,基于channel的NIO selector會使用該類。

2.ServerBootstrap使ServerChannel容易自舉。

  group(EventLoopGroup parentGroup, EventLoopGroup childGroup)方法設(shè)置父EventLoopGroup和子EventLoopGroup。這些EventLoopGroup用來處理所有的事件和ServerChannel和Channel的IO。

   channel(Class<? extends C> channelClass)方法用來創(chuàng)建一個Channel實例。創(chuàng)建Channel實例要不使用此方法,如果channel實現(xiàn)是無參構(gòu)造要么可以使用channelFactory來創(chuàng)建。

  handler(ChannelHandler handler)方法,channelHandler用來處理請求的。

  childHandler(ChannelHandler childHandler)方法,設(shè)置用來處理請求的channelHandler。

3. ChannelInitializer一種特殊的ChannelInboundHandler

  當Channel注冊到它的eventLoop中時,ChannelInitializer提供了一個方便初始化channel的方法。該類的實現(xiàn)通常用來設(shè)置ChannelPipeline的channel,通常使用在Bootstrap#handler(ChannelHandler),ServerBootstrap#handler(ChannelHandler)和ServerBootstrap#childHandler(ChannelHandler)三個場景中。示例:

 public class MyChannelInitializer extends ChannelInitializer{
      public void initChannel({@link Channel} channel) {
          channel.pipeline().addLast("myHandler", new MyHandler());
      }
  }

然后:

ServerBootstrap bootstrap = ...;
...
bootstrap.childHandler(new MyChannelInitializer());

注意:這個類標示為可共享的,因此實現(xiàn)類重用時需要時安全的。

4. ChannelPipeline相關(guān)

  理解ChannelPipeline需要先理解ChannelHandler,

 4.1 ChannelHandler

  處理一個IO事件或者翻譯一個IO操作,并且傳遞給ChannelPineline的下一個handler。

   你可以使用ChannelHandlerAdapter來替代它

   因為這個接口有太多接口需要實現(xiàn),因此你只有實現(xiàn)ChannelHandlerAdapter就可以代替實現(xiàn)這個接口。

  Context對象

  ChannelHandlerContext封裝了ChannelHandler。ChannelHandler應(yīng)該通過context對象與它所屬的ChannelPipeLine進行交互。通過使用context對象,ChannelHandler可以傳遞上行或者下行事件,或者動態(tài)修改pipeline,或者存儲特定handler的信息(使用AttributeKey)。

  狀態(tài)管理

  一個channelHandler通常需要存儲一些狀態(tài)信息。最簡單最值得推薦的方法是使用member變量:

復制代碼
 public interface Message {
      // your methods here
  }
 
  public class DataServerHandler extends  SimpleChannelInboundHandler<Message> {
 
      private boolean loggedIn;
 
      {@code @Override}
      protected void messageReceived( ChannelHandlerContext ctx, Message message) {
           Channel ch = e.getChannel();
          if (message instanceof LoginMessage) {
              authenticate((LoginMessage) message);
              loggedIn = true;
          } else (message instanceof GetDataMessage) {
              if (loggedIn) {
                  ch.write(fetchSecret((GetDataMessage) message));
              } else {
                  fail();
              }
          }
      }
      ...
  }
復制代碼

注意:handler的狀態(tài)附在ChannePipelineContext上,因此可以增加相同的handler實例到不同的pipeline上:

復制代碼
  public class DataServerInitializer extends ChannelInitializer<Channel> {
 
      private static final DataServerHandler SHARED = new DataServerHandler();
 
      @Override
      public void initChannel(Channel channel) {
          channel.pipeline().addLast("handler", SHARED);
      }
  }
復制代碼

 @Sharable注解

  在上面的示例中,使用了一個AttributeKey,你可能注意到了@Sharable注解。

  如果一個ChannelHandler使用@sharable進行注解,那就意味著你僅僅創(chuàng)建了一個handler一次,可以添加到一個或者多個ChannelPipeline多次而不會產(chǎn)生競爭。

  如果沒有指定該注解,你必須每次都創(chuàng)建一個新的handler實例,并且增加到一個ChannelPipeline,因為它沒有像member變量一樣,它有一個非共享的狀態(tài)。

4.2  ChannelPipeline

      ChanelPipeline是一組ChanelHandler的集合,它處理或者解析Channel的Inbound事件和OutBound操作。ChannelPipeline的實現(xiàn)是Intercepting Filter的一種高級形式,這樣用戶可以控制事件如何處理,一個pipeline內(nèi)部ChannelHandler如何交互。

   pipeline事件流程

  上圖描述了IO事件如何被一個ChannelPipeline的ChannelHandler處理的。一個IO事件被一個ChannelInBoundHandler處理或者ChannelOutboundHandler,然后通過調(diào)用在ChannelHandlerContext中定義的事件傳播方法傳遞給最近的handler,傳播方法有ChannelHandlerContext#filreChannelRead(Object)和ChannelHandlerContext#write(Object)。

   一個Inbound事件通常由Inbound handler來處理,如上如左上。一個Inbound handler通常處理在上圖底部IO線程產(chǎn)生的Inbound數(shù)據(jù)。Inbound數(shù)據(jù)通過真實的輸入操作如SocketChannel#read(ByteBuffer)來獲取。如果一個inbound事件越過了最上面的inbound handler,該事件將會被拋棄到而不會通知你或者如果你需要關(guān)注,打印出日志。

  一個outbound事件由上圖的右下的outbound handler來處理。一個outbound handler通常由outbound流量如寫請求產(chǎn)生或者轉(zhuǎn)變的。如果一個outbound事件越過了底部的outbound handler,它將由channel關(guān)聯(lián)的IO線程處理。IO線程通常運行的是真實的輸出操作如SocketChannel#write(byteBuffer).

示例,假設(shè)我們創(chuàng)建下面這樣一個pipeline:

 ChannelPipeline} p = ...;
 p.addLast("1", new InboundHandlerA());
 p.addLast("2", new InboundHandlerB());
 p.addLast("3", new OutboundHandlerA());
 p.addLast("4", new OutboundHandlerB());
 p.addLast("5", new InboundOutboundHandlerX());

  在上例中,inbound開頭的handler意味著它是一個inbound handler。outbound開頭的handler意味著它是一個outbound handler。上例的配置中當一個事件進入inbound時handler的順序是1,2,3,4,5;當一個事件進入outbound時,handler的順序是5,4,3,2,1.在這個最高準則下,ChannelPipeline跳過特定handler的處理來縮短stack的深度:

  3,4沒有實現(xiàn)ChannelInboundHandler,因而一個inbound事件的處理順序是1,2,5.

  1,2沒有實現(xiàn)ChannelOutBoundhandler,因而一個outbound事件的處理順序是5,4,3

  若5同時實現(xiàn)了ChannelInboundHandler和channelOutBoundHandler,一個inbound和一個outbound事件的執(zhí)行順序分別是125和543.

  一個事件跳向下一個handler

  如上圖所示,一個handler觸發(fā)ChannelHandlerContext中的事件傳播方法,然后傳遞到下一個handler。這些方法有:

  inbound 事件傳播方法:

  

復制代碼
      ChannelHandlerContext#fireChannelRegistered()
      ChannelHandlerContext#fireChannelActive()
      ChannelHandlerContext#fireChannelRead(Object)
      ChannelHandlerContext#fireChannelReadComplete()
      ChannelHandlerContext#fireExceptionCaught(Throwable)
      ChannelHandlerContext#fireUserEventTriggered(Object)
      ChannelHandlerContext#fireChannelWritabilityChanged()
      ChannelHandlerContext#fireChannelInactive()
      ChannelHandlerContext#fireChannelUnregistered()
復制代碼

 

  outbound事件傳播方法:

復制代碼
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)
復制代碼

下面的示例展示了事件是如何傳播的:

復制代碼
  public class MyInboundHandler extends ChannelInboundHandlerAdapter {
      @Override
      public void channelActive(ChannelHandlerContext} ctx) {
          System.out.println("Connected!");
          ctx.fireChannelActive();
      }
  }
 
  public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
      @Override
      public void close(ChannelHandlerContext} ctx, ChannelPromise} promise) {
          System.out.println("Closing ..");
          ctx.close(promise);
      }
  }
復制代碼

  創(chuàng)建一個pipeline

  在pipeline中,一個用戶一般由一個或者多個ChannelHandler來接收IO事件(例如讀)和IO操作請求(如寫或者close)。例如,一個典型的服務(wù)器pipeline通常具有以下幾個handler,但最多有多少handler取決于協(xié)議和業(yè)務(wù)邏輯的復雜度:

Protocol Decoder--將二進制數(shù)據(jù)(如ByteBuffer)轉(zhuǎn)換成一個java對象

Protocol Encoder--將一個java對象轉(zhuǎn)換成二進制數(shù)據(jù)。

Business Logic Handler--處理真實的業(yè)務(wù)邏輯(如數(shù)據(jù)庫訪問)。

讓我們用下面的示例展示:

復制代碼
 static final  EventExecutorGroup group = new  DefaultEventExecutorGroup(16);
  ...
 
   ChannelPipeline} pipeline = ch.pipeline();
 
  pipeline.addLast("decoder", new MyProtocolDecoder());
  pipeline.addLast("encoder", new MyProtocolEncoder());
 
  // Tell the pipeline to run MyBusinessLogicHandler's event handler methods
  // in a different thread than an I/O thread so that the I/O thread is not blocked by
  // a time-consuming task.
  // If your business logic is fully asynchronous or finished very quickly, you don't
  // need to specify a group.
  pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
復制代碼

  線程安全

  因為ChannelPipeline是線程安全的,一個channelhandler可以在任意時間內(nèi)增加或者刪除。例如,當有敏感信息交換時,你可以插入一個加密handler,然后當信息交換結(jié)束后刪除該handler。

 4.3 Channel

 Channel是網(wǎng)絡(luò)socket的一個紐帶或者一個處理IO操作如讀、寫、連接、綁定的組件。一個Channel提供如下信息:

    當前channel的狀態(tài),如它是否開啟?是否連接?

    Channel的ChannelConfig的配置參數(shù),如接受緩存大?。?/p>

    channel支持的IO操作,如讀、寫、連接、綁定;

    channel支持的ChannelPipeline,它處理所有的IO事件和channel關(guān)聯(lián)的請求。

  所有的IO操作都是異步的。

  在Netty中所有的IO操作都是異步的。這意味著所有的IO調(diào)用將立即返回,但不保證在調(diào)用結(jié)束時請求的IO操作都已經(jīng)執(zhí)行完畢。而是在請求操作處理完成、失敗或者取消時返回一個ChannelFuture來通知。

     Channel是繼承性的。

  一個Channel可以它如何創(chuàng)建的來獲取它的父Channel(#parent()方法)。例如:一個由ServerSocketChannel接受的SocketChannel調(diào)用parent()方法時返回ServerSocketChannel。

  繼承的結(jié)構(gòu)依賴于Channel的所屬transport實現(xiàn)。例如,你可以新寫一個Channel實現(xiàn),它創(chuàng)建了一個共享同一個socket連接的子channel,如BEEPSSH

 向下去獲取特定transport操作。

  一些transport會暴露一些該transport特定的操作。Channel向下轉(zhuǎn)換到子類型可以觸發(fā)這些操作。例如:老的IO datargram transport,DatagramChannel提供了多播的join和leave操作。

  釋放資源

  當Channel處理完后,一定記得調(diào)用close()或者close(ChannelPromise)來釋放資源。

5. channelFuture

channelFuture是異步IO操作的返回值。

  在Netty中所有的IO操作都是異步的。這意味著所有的IO調(diào)用將立即返回,但不保證在調(diào)用結(jié)束時請求的IO操作都已經(jīng)執(zhí)行完畢。而是在請求操作處理完成、失敗或者取消時返回一個ChannelFuture來通知。

  當一個IO操作開始時,創(chuàng)建一個新的future。ChannelFuture要么是uncompleted,要么是completed。新的future開始時是uncompleted---既不是成功、失敗,也不是取消,因為IO操作還沒有開始呢。若IO操作結(jié)束時future要么成功,要么失敗或者取消,標記為completed的future有更多特殊的意義,例如失敗的原因。請注意:即使失敗和取消也屬于completed狀態(tài)。

  有很多方法可以查詢IO操作是否完成:等待完成,檢索IO操作的結(jié)果。同樣也允許你增加ChannelFutureListenner,這樣你可以在IO操作完成后獲得通知。

  在盡可能的情況下,推薦addListenner()方法而不是await()方法,當IO操作完成后去完成接下來的其它任務(wù)時去獲取通知。

6.ChannelHandlerContext

  對ChannelHandler相關(guān)信息的包裝。

小結(jié)

  netty處理請求的總流程是經(jīng)過ChannelPipeline中的多個ChannelHandler后,返回結(jié)果ChannelFuture。如下圖所示:

具體I/O操作調(diào)用的流程, 
應(yīng)用->Channel的I/O操作->調(diào)用Pipeline相應(yīng)的I/O操作->調(diào)用ChannelHandlerContext的相應(yīng)I/O操作->調(diào)用ChannelHandler的相應(yīng)操作->Channel.UnSafe中相關(guān)的I/O操作。 
應(yīng)用為什么不直接調(diào)用Channel.UnSafe接口中的I/O操作呢,而要繞一個大圈呢?因為它是框架,要支持擴展。 
執(zhí)行者完成操作后,是如何通知命令者的呢?一般流程是這樣的: 
Channel.UnSafe中執(zhí)行相關(guān)的I/O操作,根據(jù)操作結(jié)果->調(diào)用ChannelPipeline中相應(yīng)發(fā)fireXXXX()接口->調(diào)用ChannelHandlerContext中相應(yīng)的fireXXXX()接口->調(diào)用ChannelHandler中相應(yīng)方法->調(diào)用應(yīng)用中的相關(guān)邏輯。

 

 參考文獻:

【1】http://www./article/71493268111/

【2】http://blog.csdn.net/pingnanlee/article/details/11973769

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多