一、Netty 簡介Netty 是基于 Java NIO 的異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,使用 Netty 可以快速開發(fā)網(wǎng)絡(luò)應(yīng)用,Netty 提供了高層次的抽象來簡化 TCP 和 UDP 服務(wù)器的編程,但是你仍然可以使用底層的 API。 Netty 的內(nèi)部實現(xiàn)是很復(fù)雜的,但是 Netty 提供了簡單易用的API從網(wǎng)絡(luò)處理代碼中解耦業(yè)務(wù)邏輯。Netty 是完全基于 NIO 實現(xiàn)的,所以整個 Netty 都是異步的。 Netty 是最流行的 NIO 框架,它已經(jīng)得到成百上千的商業(yè)、商用項目驗證,許多框架和開源組件的底層 rpc 都是使用的 Netty,如 Dubbo、Elasticsearch 等等。下面是官網(wǎng)給出的一些 Netty 的特性: 設(shè)計方面
易用性
性能
安全性
對于初學(xué)者,上面的特性我們在腦中有個簡單了解和印象即可, 下面開始我們的實戰(zhàn)部分。 二、一個簡單 Http 服務(wù)器開始前說明下我這里使用的開發(fā)環(huán)境是 IDEA+Gradle+Netty4,當(dāng)然你使用 Eclipse 和 Maven 都是可以的,然后在 Gradle 的 build 文件中添加依賴 compile 'io.netty:netty-all:4.1.26.Final',這樣就可以編寫我們的 Netty 程序了,正如在前面介紹 Netty 特性中提到的,Netty 不需要額外的依賴。 第一個示例我們使用 Netty 編寫一個 Http 服務(wù)器的程序,啟動服務(wù)我們在瀏覽器輸入網(wǎng)址來訪問我們的服務(wù),便會得到服務(wù)端的響應(yīng)。功能很簡單,下面我們看看具體怎么做? 首先編寫服務(wù)啟動類 public class HttpServer { public static void main(String[] args) { //構(gòu)造兩個線程組 EventLoopGroup bossrGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //服務(wù)端啟動輔助類 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new HttpServerInitializer()); ChannelFuture future = bootstrap.bind(8080).sync(); //等待服務(wù)端口關(guān)閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 優(yōu)雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在編寫 Netty 程序時,一開始都會生成 NioEventLoopGroup 的兩個實例,分別是 bossGroup 和 workerGroup,也可以稱為 parentGroup 和 childGroup,為什么創(chuàng)建這兩個實例,作用是什么?可以這么理解,bossGroup 和 workerGroup 是兩個線程池, 它們默認線程數(shù)為 CPU 核心數(shù)乘以 2,bossGroup 用于接收客戶端傳過來的請求,接收到請求后將后續(xù)操作交由 workerGroup 處理。 在這里我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:747981058 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。 接下來我們生成了一個服務(wù)啟動輔助類的實例 bootstrap,boostrap 用來為 Netty 程序的啟動組裝配置一些必須要組件,例如上面的創(chuàng)建的兩個線程組。channel 方法用于指定服務(wù)器端監(jiān)聽套接字通道 NioServerSocketChannel,其內(nèi)部管理了一個 Java NIO 中的ServerSocketChannel實例。 channelHandler 方法用于設(shè)置業(yè)務(wù)職責(zé)鏈,責(zé)任鏈?zhǔn)俏覀兿旅嬉帉懙?,?zé)任鏈具體是什么,它其實就是由一個個的 ChannelHandler 串聯(lián)而成,形成的鏈?zhǔn)浇Y(jié)構(gòu)。正是這一個個的 ChannelHandler 幫我們完成了要處理的事情。 接著我們調(diào)用了 bootstrap 的 bind 方法將服務(wù)綁定到 8080 端口上,bind 方法內(nèi)部會執(zhí)行端口綁定等一系列操,使得前面的配置都各就各位各司其職,sync 方法用于阻塞當(dāng)前 Thread,一直到端口綁定操作完成。接下來一句是應(yīng)用程序?qū)枞却钡椒?wù)器的 Channel 關(guān)閉。 啟動類的編寫大體就是這樣了,下面要編寫的就是上面提到的責(zé)任鏈了。如何構(gòu)建一個鏈,在 Netty 中很簡單,不需要我們做太多,代碼如下: public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel sc) throws Exception { ChannelPipeline pipeline = sc.pipeline(); //處理http消息的編解碼 pipeline.addLast("httpServerCodec", new HttpServerCodec()); //添加自定義的ChannelHandler pipeline.addLast("httpServerHandler", new HttpServerHandler()); } }
我們自定義一個類 HttpServerInitializer 繼承 ChannelInitializer 并實現(xiàn)其中的 initChannel方法。 ChannelInitializer 繼承 ChannelInboundHandlerAdapter,用于初始化 Channel 的 ChannelPipeline。通過 initChannel 方法參數(shù) sc 得到 ChannelPipeline 的一個實例。 當(dāng)一個新的連接被接受時, 一個新的 Channel 將被創(chuàng)建,同時它會被自動地分配到它專屬的 ChannelPipeline。 ChannelPipeline 提供了 ChannelHandler 鏈的容器,推薦讀者仔細自己看看 ChannelPipeline 的 Javadoc,文章后面也會繼續(xù)說明 ChannelPipeline 的內(nèi)容。 Netty 是一個高性能網(wǎng)絡(luò)通信框架,同時它也是比較底層的框架,想要 Netty 支持 Http(超文本傳輸協(xié)議),必須要給它提供相應(yīng)的編解碼器。 所以我們這里使用 Netty 自帶的 Http 編解碼組件 HttpServerCodec 對通信數(shù)據(jù)進行編解碼,HttpServerCodec 是 HttpRequestDecoder 和 HttpResponseEncoder 的組合,因為在處理 Http 請求時這兩個類是經(jīng)常使用的,所以 Netty 直接將他們合并在一起更加方便使用。所以對于上面的代碼: pipeline.addLast("httpServerCodec", new HttpServerCodec())
我們替換成如下兩行也是可以的。 pipeline.addLast("httpResponseEndcoder", new HttpResponseEncoder()); pipeline.addLast("HttpRequestDecoder", new HttpRequestDecoder());
通過 addLast 方法將一個一個的 ChannelHandler 添加到責(zé)任鏈上并給它們?nèi)€名稱(不取也可以,Netty 會給它個默認名稱),這樣就形成了鏈?zhǔn)浇Y(jié)構(gòu)。在請求進來或者響應(yīng)出去時都會經(jīng)過鏈上這些 ChannelHandler 的處理。 最后再向鏈上加入我們自定義的 ChannelHandler 組件,處理自定義的業(yè)務(wù)邏輯。下面就是我們自定義的 ChannelHandler。 public class HttpServerChannelHandler0 extends SimpleChannelInboundHandler<HttpObject> { private HttpRequest request; @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { request = (HttpRequest) msg; request.method(); String uri = request.uri(); System.out.println("Uri:" + uri); } if (msg instanceof HttpContent) { HttpContent content = (HttpContent) msg; ByteBuf buf = content.content(); System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); ctx.writeAndFlush(response); } } }
至此一個簡單的 Http 服務(wù)器就完成了。首先我們來看看效果怎樣,我們運行 HttpServer 中的 main 方法。讓后使用 Postman 這個工具來測試下,使用 post 請求方式(也可以 get,但沒有請求體),并一個 json 格式數(shù)據(jù)作為請求體發(fā)送給服務(wù)端,服務(wù)端返回給我們一個hello world字符串。 服務(wù)端控制臺打印如下: 對于自定義的 ChannelHandler, 一般會繼承 Netty 提供的SimpleChannelInboundHandler類,并且對于 Http 請求我們可以給它設(shè)置泛型參數(shù)為 HttpOjbect 類,然后覆寫 channelRead0 方法,在 channelRead0 方法中編寫我們的業(yè)務(wù)邏輯代碼,此方法會在接收到服務(wù)器數(shù)據(jù)后被系統(tǒng)調(diào)用。 Netty 的設(shè)計中把 Http 請求分為了 HttpRequest 和 HttpContent 兩個部分,HttpRequest 主要包含請求頭、請求方法等信息,HttpContent 主要包含請求體的信息。 所以上面的代碼我們分兩塊來處理。在 HttpContent 部分,首先輸出客戶端傳過來的字符,然后通過 Unpooled 提供的靜態(tài)輔助方法來創(chuàng)建未池化的 ByteBuf 實例, Java NIO 提供了 ByteBuffer 作為它的字節(jié)容器,Netty 的 ByteBuffer 替代品是 ByteBuf。 接著構(gòu)建一個 FullHttpResponse 的實例,并為它設(shè)置一些響應(yīng)參數(shù),最后通過 writeAndFlush 方法將它寫回給客戶端。 上面這樣獲取請求和消息體則相當(dāng)不方便,Netty 又提供了另一個類 FullHttpRequest,F(xiàn)ullHttpRequest 包含請求的所有信息,它是一個接口,直接或者間接繼承了 HttpRequest 和 HttpContent,它的實現(xiàn)類是 DefalutFullHttpRequest。 因此我們可以修改自定義的 ChannelHandler 如下: public class HttpServerChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> { protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { ctx.channel().remoteAddress(); FullHttpRequest request = msg; System.out.println("請求方法名稱:" + request.method().name()); System.out.println("uri:" + request.uri()); ByteBuf buf = request.content(); System.out.print(buf.toString(CharsetUtil.UTF_8)); ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); ctx.writeAndFlush(response); } }
這樣修改就可以了嗎,如果你去啟動程序運行看看,是會拋異常的。前面說過 Netty 是一個很底層的框架,對于將請求合并為一個 FullRequest 是需要代碼實現(xiàn)的,然而這里我們并不需要我們自己動手去實現(xiàn),Netty 為我們提供了一個 HttpObjectAggregator 類,這個 ChannelHandler作用就是將請求轉(zhuǎn)換為單一的 FullHttpReques。 所以在我們的 ChannelPipeline 中添加一個 HttpObjectAggregator 的實例即可。 public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel sc) { ChannelPipeline pipeline = sc.pipeline(); //處理http消息的編解碼 pipeline.addLast("httpServerCodec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); //添加自定義的ChannelHandler pipeline.addLast("httpServerHandler", new HttpServerChannelHandler0()); } }
啟動程序運行,一切都順暢了,好了,這個簡單 Http 的例子就 OK 了。 在這里我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:747981058 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。 三、編寫 Netty 客戶端上面的兩個示例中我們都是以 Netty 做為服務(wù)端,接下來看看如何編寫 Netty 客戶端,以第一個 Http 服務(wù)的例子為基礎(chǔ),編寫一個訪問 Http 服務(wù)的客戶端。 public class HttpClient { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port = 8080; EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new HttpClientHandler()); } }); // 啟動客戶端. ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
客戶端啟動類編寫基本和服務(wù)端類似,在客戶端我們只用到了一個線程池,服務(wù)端使用了兩個,因為服務(wù)端要處理 n 條連接,而客戶端相對來說只處理一條,因此一個線程池足以。 然后服務(wù)端啟動輔助類使用的是 ServerBootstrap,而客戶端換成了 Bootstrap。通過 Bootstrap 組織一些必要的組件,為了方便,在 handler 方法中我們使用匿名內(nèi)部類的方式來構(gòu)建 ChannelPipeline 鏈容器。最后通過 connect 方法連接服務(wù)端。 接著編寫 HttpClientHandler 類。 public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { URI uri = new URI("http://127.0.0.1:8080"); String msg = "Are you ok?"; FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 構(gòu)建http請求 // request.headers().set(HttpHeaderNames.HOST, "127.0.0.1"); // request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 發(fā)送http請求 ctx.channel().writeAndFlush(request); } @Override public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { FullHttpResponse response = msg; response.headers().get(HttpHeaderNames.CONTENT_TYPE); ByteBuf buf = response.content(); System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); } }
在 HttpClientHandler 類中,我們覆寫了 channelActive 方法,當(dāng)連接建立時,此方法會被調(diào)用,我們在方法中構(gòu)建了一個 FullHttpRequest 對象,并且通過 writeAndFlush 方法將請求發(fā)送出去。 channelRead0 方法用于處理服務(wù)端返回給我們的響應(yīng),打印服務(wù)端返回給客戶端的信息。至此,Netty 客戶端的編寫就完成了,我們先開啟服務(wù)端,然后開啟客戶端就可以看到效果了。 希望通過前面介紹的幾個例子能讓大家基本知道如何編寫 Netty 客戶端和服務(wù)端,下面我們來說說 Netty 程序為什么是這樣編寫的,這也是 Netty 中最為重要的一部分知識,可以讓你在編寫 netty 程序時做到心中有數(shù)。 在這里我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:747981058 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。 四、Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext 之間的關(guān)系在編寫 Netty 程序時,經(jīng)常跟我們打交道的是上面這幾個對象,這也是 Netty 中幾個重要的對象,下面我們來看看它們之間有什么樣的關(guān)系。 Netty 中的 Channel 是框架自己定義的一個通道接口,Netty 實現(xiàn)的客戶端 NIO 套接字通道是 NioSocketChannel,提供的服務(wù)器端 NIO 套接字通道是 NioServerSocketChannel。 當(dāng)服務(wù)端和客戶端建立一個新的連接時, 一個新的 Channel 將被創(chuàng)建,同時它會被自動地分配到它專屬的 ChannelPipeline。 ChannelPipeline 是一個攔截流經(jīng) Channel 的入站和出站事件的 ChannelHandler 實例鏈,并定義了用于在該鏈上傳播入站和出站事件流的 API。那么就很容易看出這些 ChannelHandler 之間的交互是組成一個應(yīng)用程序數(shù)據(jù)和事件處理邏輯的核心。 上圖描述了 IO 事件如何被一個 ChannelPipeline 的 ChannelHandler 處理的。 ChannelHandler分為 ChannelInBoundHandler 和 ChannelOutboundHandler 兩種,如果一個入站 IO 事件被觸發(fā),這個事件會從第一個開始依次通過 ChannelPipeline中的 ChannelInBoundHandler,先添加的先執(zhí)行。 若是一個出站 I/O 事件,則會從最后一個開始依次通過 ChannelPipeline 中的 ChannelOutboundHandler,后添加的先執(zhí)行,然后通過調(diào)用在 ChannelHandlerContext 中定義的事件傳播方法傳遞給最近的 ChannelHandler。 在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。 如果某個ChannelHandler不能處理則會跳過,并將事件傳遞到下一個ChannelHandler,直到它找到和該事件所期望的方向相匹配的為止。 假設(shè)我們創(chuàng)建下面這樣一個 pipeline: ChannelPipeline p = ...; p.addLast("1", new InboundHandlerA()); p.addLast("2", new InboundHandlerB()); p.addLast("3", new OutboundHandlerA()); p.addLast("4", new OutboundHandlerB()); p.addLast("5", new InboundOutboundHandlerX());
在上面示例代碼中,inbound 開頭的 handler 意味著它是一個ChannelInBoundHandler。outbound 開頭的 handler 意味著它是一個 ChannelOutboundHandler。 當(dāng)一個事件進入 inbound 時 handler 的順序是 1,2,3,4,5;當(dāng)一個事件進入 outbound 時,handler 的順序是 5,4,3,2,1。在這個最高準(zhǔn)則下,ChannelPipeline 跳過特定 ChannelHandler 的處理:
ChannelHandler 可以通過添加、刪除或者替換其他的 ChannelHandler 來實時地修改 ChannelPipeline 的布局。 (它也可以將它自己從 ChannelPipeline 中移除。)這是 ChannelHandler 最重要的能力之一。 ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關(guān)聯(lián),每當(dāng)有 ChannelHandler 添加到 ChannelPipeline 中時,都會創(chuàng)建 ChannelHandlerContext。 ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler 之間的交互。事件從一個 ChannelHandler 到下一個 ChannelHandler 的移動是由 ChannelHandlerContext 上的調(diào)用完成的。 但是有些時候不希望總是從 ChannelPipeline 的第一個 ChannelHandler 開始事件,我們希望從一個特定的 ChannelHandler 開始處理。 你必須引用于此 ChannelHandler 的前一個 ChannelHandler 關(guān)聯(lián)的 ChannelHandlerContext,利用它調(diào)用與自身關(guān)聯(lián)的 ChannelHandler 的下一個 ChannelHandler。 如下: ChannelHandlerContext ctx = ...; // 獲得 ChannelHandlerContext引用 // write()將會把緩沖區(qū)發(fā)送到下一個ChannelHandler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); //流經(jīng)整個pipeline ctx.channel().write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
如果我們想有一些事件流全部通過 ChannelPipeline,有兩個不同的方法可以做到:
那為什么你可能會需要在 ChannelPipeline 某個特定的位置開始傳遞事件呢?
五、Netty 線程模型在這里我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:747981058 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。 在前面的示例中我們程序一開始都會生成兩個 NioEventLoopGroup 的實例,為什么需要這兩個實例呢?這兩個實例可以說是 Netty 程序的源頭,其背后是由 Netty 線程模型決定的。 Netty 線程模型是典型的 Reactor 模型結(jié)構(gòu),其中常用的 Reactor 線程模型有三種,分別為:Reactor 單線程模型、Reactor 多線程模型和主從 Reactor 多線程模型。 而在 Netty 的線程模型并非固定不變,通過在啟動輔助類中創(chuàng)建不同的 EventLoopGroup 實例并通過適當(dāng)?shù)膮?shù)配置,就可以支持上述三種 Reactor 線程模型。 Reactor 線程模型 Reactor 單線程模型Reactor 單線程模型指的是所有的 IO 操作都在同一個 NIO 線程上面完成。作為 NIO 服務(wù)端接收客戶端的 TCP 連接,作為 NIO 客戶端向服務(wù)端發(fā)起 TCP 連接,讀取通信對端的請求或向通信對端發(fā)送消息請求或者應(yīng)答消息。 由于 Reactor 模式使用的是異步非阻塞 IO,所有的 IO 操作都不會導(dǎo)致阻塞,理論上一個線程可以獨立處理所有 IO 相關(guān)的操作。 Netty 使用單線程模型的的方式如下: EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ...
在實例化 NioEventLoopGroup 時,構(gòu)造器參數(shù)是 1,表示 NioEventLoopGroup 的線程池大小是 1。然后接著我們調(diào)用 b.group(bossGroup) 設(shè)置了服務(wù)器端的 EventLoopGroup,因此 bossGroup和 workerGroup 就是同一個 NioEventLoopGroup 了。 Reactor 多線程模型對于一些小容量應(yīng)用場景,可以使用單線程模型,但是對于高負載、大并發(fā)的應(yīng)用卻不合適,需要對該模型進行改進,演進為 Reactor 多線程模型。 Rector 多線程模型與單線程模型最大的區(qū)別就是有一組 NIO 線程處理 IO 操作。 在該模型中有專門一個 NIO 線程 -Acceptor 線程用于監(jiān)聽服務(wù)端,接收客戶端的 TCP 連接請求;而 1 個 NIO 線程可以同時處理N條鏈路,但是 1 個鏈路只對應(yīng) 1 個 NIO 線程,防止發(fā)生并發(fā)操作問題。 網(wǎng)絡(luò) IO 操作-讀、寫等由一個 NIO 線程池負責(zé),線程池可以采用標(biāo)準(zhǔn)的 JDK 線程池實現(xiàn),它包含一個任務(wù)隊列和 N 個可用的線程,由這些 NIO 線程負責(zé)消息的讀取、解碼、編碼和發(fā)送。
Netty 中實現(xiàn)多線程模型的方式如下: EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
bossGroup 中只有一個線程,而 workerGroup 中的線程是 CPU 核心數(shù)乘以 2,那么就對應(yīng) Recator 的多線程模型。 主從 Reactor 多線程模型在并發(fā)極高的情況單獨一個 Acceptor 線程可能會存在性能不足問題,為了解決性能問題,產(chǎn)生主從 Reactor 多線程模型。 主從 Reactor 線程模型的特點是:服務(wù)端用于接收客戶端連接的不再是 1 個單獨的 NIO 線程,而是一個獨立的 NIO 線程池。 Acceptor 接收到客戶端 TCP 連接請求處理完成后,將新創(chuàng)建的 SocketChannel 注冊到 IO 線程池(sub reactor 線程池)的某個 IO 線程上,由它負責(zé) SocketChannel 的讀寫和編解碼工作。 Acceptor 線程池僅僅只用于客戶端的登陸、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端 subReactor 線程池的 IO 線程上,由 IO 線程負責(zé)后續(xù)的 IO 操作。 根據(jù)前面所講的兩個線程模型,很容想到 Netty 實現(xiàn)多線程的方式如下: EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
但是,在 Netty 的服務(wù)器端的 acceptor 階段,沒有使用到多線程, 因此上面的主從多線程模型在 Netty 的實現(xiàn)是有誤的。 服務(wù)器端的 ServerSocketChannel 只綁定到了 bossGroup 中的一個線程,因此在調(diào)用 Java NIO 的 Selector.select 處理客戶端的連接請求時,實際上是在一個線程中的,所以對只有一個服務(wù)的應(yīng)用來說,bossGroup 設(shè)置多個線程是沒有什么作用的,反而還會造成資源浪費。 至于 Netty 中的 bossGroup 為什么使用線程池,我在 stackoverflow 找到一個對于此問題的討論 。 the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps EventLoopGroup 和 EventLoop 當(dāng)系統(tǒng)在運行過程中,如果頻繁的進行線程上下文切換,會帶來額外的性能損耗。多線程并發(fā)執(zhí)行某個業(yè)務(wù)流程,業(yè)務(wù)開發(fā)者還需要時刻對線程安全保持警惕,哪些數(shù)據(jù)可能會被并發(fā)修改,如何保護?這不僅降低了開發(fā)效率,也會帶來額外的性能損耗。 為了解決上述問題,Netty采用了串行化設(shè)計理念,從消息的讀取、編碼以及后續(xù) ChannelHandler 的執(zhí)行,始終都由 IO 線程 EventLoop 負責(zé),這就意外著整個流程不會進行線程上下文的切換,數(shù)據(jù)也不會面臨被并發(fā)修改的風(fēng)險。 EventLoopGroup 是一組 EventLoop 的抽象,一個 EventLoopGroup 當(dāng)中會包含一個或多個 EventLoop,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規(guī)則獲取其中一個 EventLoop 來處理任務(wù)。 在 Netty 服務(wù)器端編程中我們需要 BossEventLoopGroup 和 WorkerEventLoopGroup 兩個 EventLoopGroup 來進行工作。 BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護著一個注冊了 ServerSocketChannel 的 Selector 實例,EventLoop 的實現(xiàn)涵蓋 IO 事件的分離,和分發(fā)(Dispatcher),EventLoop 的實現(xiàn)充當(dāng) Reactor 模式中的分發(fā)(Dispatcher)的角色。 所以通常可以將 BossEventLoopGroup 的線程數(shù)參數(shù)為 1。 BossEventLoop 只負責(zé)處理連接,故開銷非常小,連接到來,馬上按照策略將 SocketChannel 轉(zhuǎn)發(fā)給 WorkerEventLoopGroup,WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop 來將這 個SocketChannel 注冊到其維護的 Selector 并對其后續(xù)的 IO 事件進行處理。 ChannelPipeline 中的每一個 ChannelHandler 都是通過它的 EventLoop(I/O 線程)來處理傳遞給它的事件的。所以至關(guān)重要的是不要阻塞這個線程,因為這會對整體的 I/O 處理產(chǎn)生嚴重的負面影響。但有時可能需要與那些使用阻塞 API 的遺留代碼進行交互。 對于這種情況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add() 方法。如果一個事件被傳遞給一個自定義的 EventExecutorGroup, DefaultEventExecutorGroup 的默認實現(xiàn)。 就是在把 ChannelHanders 添加到 ChannelPipeline 的時候,指定一個 EventExecutorGroup,ChannelHandler 中所有的方法都將會在這個指定的 EventExecutorGroup 中運行。 static final EventExecutor group = new DefaultEventExecutorGroup(16); ... ChannelPipeline p = ch.pipeline(); pipeline.addLast(group, "handler", new MyChannelHandler());
最后小結(jié)一下:(如果你還沒明白,可以看一下群里面的視頻解析)
|
|