一、netty概述
「1、NIO存在的問題:」
- NIO的API比較復雜,需要熟練掌握3個核心組件,channel、buffer和selector;
- 開發(fā)工作量大,難度也比較大,需要解決斷連、重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁堵等各種情況;
- NIO存在bug,一個叫Epoll的bug,會導致選擇器空輪詢,形成死循環(huán),最后CPU飆到100%
正是因為NIO存在這些問題,netty就應運而生了。
「2、Netty簡介:」
netty是一個異步的,基于事件驅動的網絡應用框架??梢钥焖俚亻_發(fā)高性能的服務器端和客戶端,像dubbo和elasticsearch底層都用了netty。它具有以下優(yōu)點:
- 安全,完整的SSL/TLS和StartTLS支持;
官方下載地址:https:///netty/downloads
我本次下載的版本是:4.1.51.Final
二、netty的架構設計
「1、線程模型:」
目前存在的線程模式:
根據(jù)Reactor的數(shù)量和1處理資源的線程數(shù)不同,又分3種:
Netty的線程模型是基于主從Reactor多線程做了改進。
「2、傳統(tǒng)阻塞IO的線程模型:」
采用阻塞IO獲取輸入的數(shù)據(jù),每個連接都需要獨立的線程來處理邏輯。存在的問題就是,當并發(fā)數(shù)很大時,就需要創(chuàng)建很多的線程,占用大量的資源。連接創(chuàng)建后,如果當前線程沒有數(shù)據(jù)可讀,該線程將會阻塞在讀數(shù)據(jù)的方法上,造成線程資源浪費。
「3、Reactor模式(分發(fā)者模式/反應器模式/通知者模式):」
針對傳統(tǒng)阻塞IO的模型,做了以下兩點改進:
- 基于IO復用模型:多個客戶端共用一個阻塞對象,而不是每個客戶端都對應一個阻塞對象
- 基于線程池復用線程資源:使用了線程池,而不是每來一個客戶端就創(chuàng)建一個線程
Reactor模式的核心組成:
- Reactor:Reactor就是多個客戶端共用的那一個阻塞對象,它單獨起一個線程運行,負責監(jiān)聽和分發(fā)事件,將請求分發(fā)給適當?shù)奶幚沓绦騺磉M行處理
- Handler:處理程序要完成的實際事件,也就是真正執(zhí)行業(yè)務邏輯的程序,它是非阻塞的
「4、單Reactor單線程:」
模型圖這個圖其實就跟之前的NIO群聊系統(tǒng)對應。多個客戶端請求連接,然后Reactor通過selector輪詢判斷哪些通道是有事件發(fā)生的,如果是連接事件,就到了Acceptor中建立連接;如果是其他讀寫事件,就有dispatch分發(fā)到對應的handler中進行處理。這種模式的缺點就是Reactor和Handler是在一個線程中的,如果Handler阻塞了,那么程序就阻塞了。
「5、單Reactor多線程:」
單reactor多線程處理流程如下:
- Reactor對象通過Selector監(jiān)聽客戶端請求事件,通過dispatch進行分發(fā);
- 如果是連接事件,則由Acceptor通過accept方法處理連接請求,然后創(chuàng)建一個Handler對象響應事件;
- 如果不是連接請求,則由Reactor對象調用對應handler對象進行處理;handler只響應事件,不做具體的業(yè)務處理,它通過read方法讀取數(shù)據(jù)后,會分發(fā)給線程池的某個線程進行業(yè)務處理,并將處理結果返回給handler;
- handler收到響應后,通過send方法將結果返回給client。
相比單Reactor單線程,這里將業(yè)務處理的事情交給了不同的線程去做,發(fā)揮了多核CPU的性能。但是Reactor只有一個,所有事件的監(jiān)聽和響應,都由一個Reactor去完成,并發(fā)性還是不好。
「6、主從Reactor多線程:」
主從reactor多線程這個模型相比單reactor多線程的區(qū)別就是:專門搞了一個MainReactor來處理連接事件,如果不是連接事件,就分發(fā)給SubReactor進行處理。圖中這個SubReactor只有一個,其實是可以有多個的,所以性能就上去了。
- 優(yōu)點:父線程與子線程的交互簡單、職責明確,父線程負責接收連接,子線程負責完成后續(xù)的業(yè)務處理;
「7、netty的模型:」
netty模型是基于主從Reactor多線程模型設計的,其工作流程如下:
- Netty有兩組線程池,一個Boss Group,它專門負責客戶端連接,另一個Work Group,專門負責網絡讀寫;
- Boss Group和Work Group的類型都是NIOEventLoopGroup;
- NIOEventLoopGroup相當于一個事件循環(huán)組,這個組包含了多個事件循環(huán),每一個循環(huán)都是NIOEventLoop;
- NIOEventLoop表示一個不斷循環(huán)執(zhí)行處理任務的線程,每個NIOEventLoop都有一個Selector,用于監(jiān)聽綁定在其上的socket;
- Boss Group下的每個NIOEventLoop的執(zhí)行步驟有3步:(1). 輪詢accept連接事件;(2). 處理accept事件,與client建立連接,生成一個NioSocketChannel,并將其注冊到某個work group下的NioEventLoop的selector上;(3). 處理任務隊列的任務,即runAllTasks;
- 每個Work Group下的NioEventLoop循環(huán)執(zhí)行以下步驟:(1). 輪詢read、write事件;(2). 處理read、write事件,在對應的NioSocketChannel處理;(3). 處理任務隊列的任務,即runAllTasks;
- 每個Work Group下的NioEventLoop在處理業(yè)務時,會使用pipeline(管道),pipeline中包含了channel,即通過pipeline可以獲取到對應的channel,pipeline中維護了很多的處理器。
netty模型圖如下,對應了上面那段流程:
三、netty入門實例
使用netty創(chuàng)建一個服務端與客戶端,監(jiān)聽6666端口。
「1、服務端:」
public class NettyServer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認是cpu數(shù) * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 創(chuàng)建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 創(chuàng)建服務端啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置啟動參數(shù)
bootstrap.group(bossGroup, workGroup) // 設置兩個線程組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務器的通道
.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列等待連接個數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true) // 設置保持活動連接狀態(tài)
.childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對象
// 給pipeline設置處理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 傳入自定義的handler
sc.pipeline().addLast(new NettyServerHandler());
}
});
// 5. 啟動服務器并綁定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 6. 對關閉通道進行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
}
// 數(shù)據(jù)讀取完畢后
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是服務端", CharsetUtil.UTF_8));
}
// 處理異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
「2、客戶端:」
public class NettyClient {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建事件循環(huán)組
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 2. 創(chuàng)建啟動對象
Bootstrap bootstrap = new Bootstrap();
// 3. 設置相關參數(shù)
bootstrap.group(eventLoopGroup) // 設置線程組
.channel(NioSocketChannel.class) // 設置通道
.handler(new ChannelInitializer<SocketChannel>() {
// 給pipeline設置處理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new NettyClientHandler());
}
});
// 4. 連接服務端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
// 5. 監(jiān)聽通道關閉
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter{
// 通道就緒就被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,我是客戶端", CharsetUtil.UTF_8));
}
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到服務端消息:" + buf.toString(CharsetUtil.UTF_8));
}
// 處理異常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
先啟動服務端,然后啟動客戶端,就能看到服務端和客戶端在控制臺打印出來的消息了。
「3、TaskQueue自定義任務:」
上面服務端的NettyServerHandler的channelRead方法中,假如有一個非常耗時的業(yè)務,那么就會阻塞在那里,直到業(yè)務執(zhí)行完。比如將NettyServerHandler的channelRead方法改成下面這樣:
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 線程休眠10秒,模擬耗時業(yè)務
TimeUnit.SECONDS.sleep(10);
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
}
啟動后會發(fā)現(xiàn),10秒鐘后,服務端才會收到客戶端發(fā)送的消息,客戶端也要10秒后,才會接收到服務端的消息,即服務端的channelReadComplete方法是要在channelRead方法執(zhí)行完后才會執(zhí)行的。
一直阻塞在那里也不是辦法,希望可以異步執(zhí)行,那我們就可以把該任務提交到該channel對應的NioEventLoop的TaskQueue中。有以下解決方案:
- 用戶程序自定義的普通任務:將channelRead方法改成下面這樣:
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
}
});
}
這里仍舊休眠10秒。啟動服務端,再啟動客戶端,發(fā)現(xiàn)服務端要10s后才會打印出客戶端發(fā)送的消息,但是客戶端立刻就可以收到服務端在channelReadComplete方法里發(fā)送的消息,說明這次是異步的。
- 用戶自定義定時任務:與上面的區(qū)別不大,代碼如下:
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) { e.printStackTrace();}
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客戶端消息:" + buf.toString(CharsetUtil.UTF_8));
}
}, 5, TimeUnit.SECONDS);
}
啟動服務端,然后啟動客戶端,發(fā)現(xiàn)客戶端還是會立即收到服務端發(fā)出的消息,而服務端,首先要等待5秒才去執(zhí)行run方法,run方法里面線程又休眠了10秒,所以總共要15秒后才會打印出客戶端發(fā)送的消息。
- 非當前Reactor線程調用channel的各種方法:這個的意思就是,如果我別的業(yè)務代碼,比如消息推送系統(tǒng),也想給客戶端發(fā)送消息,該咋整?其實很簡單,在NettyServerHandler的channelRead方法里,我們是通過ChannelHandlerContext 拿到Channel然后進行各種操作的,所以拿到了Channel就可以進行操作。那么可以在NettyServer中將Channel保存到集合中去,然后遍歷集合,拿到Channel就可以進行操作了。
// 給pipeline設置處理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 傳入自定義的handler
sc.pipeline().addLast(new NettyServerHandler());
// 在這里,可以將SocketChannel sc保存到集合中,別的線程拿到集合就可以調用channel的方法了
}
四、Netty的異步模型
「1、基本介紹:」
- Netty中的I/O操作都是異步的,包括bind、write和connect。這些操作會返回一個ChannelFuture對象,而不會立即返回操作結果。
- 調用者不能立即得到返回結果,而是通過Futrue-Listener機制,用戶可以主動獲取或者通過通知機制獲得IO操作的結果。
- Netty的異步是建立在future和callback之上的。callback是回調,future表示異步執(zhí)行的結果,它的核心思想是:假設有個方法fun(),計算過程可能非常耗時,等待fun()返回要很久,那么可以在調用fun()的時候,立馬返回一個future,后續(xù)通過future去監(jiān)控fun()方法的處理過程,這就是future-listener機制。
- 用戶可以通過注冊監(jiān)聽函數(shù),來獲取操作真正的結果,ChannelFuture常用的函數(shù)如下:
// 判斷當前操作是否完成
isDone
// 判斷當前操作是否成功
isSuccess
// 獲取操作失敗的原因
getCause
// 判斷當前操作是否被取消
isCancelled
// 注冊監(jiān)聽器
addListener
「2、使用監(jiān)聽器:」
在NettyServer中的“啟動并綁定端口”下面加上如下代碼:
// 5. 啟動服務器并綁定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 注冊監(jiān)聽器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
if (cf.isSuccess()) {
System.out.println("綁定端口成功");
} else {
System.out.println("綁定端口失敗");
}
}
});
這樣就注冊了監(jiān)聽器,會監(jiān)聽綁定端口的結果,如果成功了,就會打印出綁定成功這段話。
五、使用Netty開發(fā)Http服務
開發(fā)一個Netty服務端,監(jiān)聽80端口,瀏覽器訪問localhost,可以返回信息給瀏覽器。代碼如下:
public class NettyHttpServer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認是cpu數(shù) * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 創(chuàng)建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 創(chuàng)建服務端啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置啟動參數(shù)
bootstrap.group(bossGroup, workGroup) // 設置兩個線程組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務器的通道
.childHandler(new NettyHttpServerInitializer());
// 5. 啟動服務器并綁定端口
ChannelFuture cf = bootstrap.bind(80).sync();
// 6. 對關閉通道進行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
- NettyHttpServerInitializer:
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {
// 向管道加入處理器
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 1. 得到管道
ChannelPipeline pipeline = sc.pipeline();
// 2. 加入一個編碼器解碼器
pipeline.addLast("codec", new HttpServerCodec());
// 3. 增加一個自定義的handler處理器
pipeline.addLast("handler", new NettyHttpServerHandler());
}
}
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject>{
// 讀取數(shù)據(jù)
@Override
protected void channelRead0(ChannelHandlerContext chc, HttpObject msg) throws Exception {
// 1. 判斷msg是不是httpRequest請求
if (msg instanceof HttpRequest) {
System.out.println("msg類型:" + msg.getClass());
System.out.println("客戶端地址:" + chc.channel().remoteAddress());
// 對特定資源不做響應
HttpRequest httpRequest = (HttpRequest) msg;
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("請求了圖標,不做響應");
return;
}
// 2. 創(chuàng)建回復給瀏覽器的信息
ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務器,are you ok?", CharsetUtil.UTF_8);
// 3. 構造http響應
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
// 4. 將response返回
chc.writeAndFlush(response);
}
}
}
啟動server后,在瀏覽器就訪問localhost就可以返回相關內容了。