日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Netty進(jìn)階

 貪挽懶月 2022-06-20 發(fā)布于廣東

一、Netty核心模塊

  • BootStrap:客戶端程序的啟動引導(dǎo)類
  • ServerBootStrap:服務(wù)端程序的啟動引導(dǎo)類

它們的常用方法有:

- group:設(shè)置線程組
- channel:指定通道的實現(xiàn)類
- option:給channel添加配置
- childOption:給接收到的channel添加配置
- handler:設(shè)置bossGroup的handler
- childHandler:設(shè)置workGroup的handler
  • Selector:用來接收注冊的channel的。當(dāng)往selector中注冊channel后,selector就會自動不斷地輪詢這些channel是否有就緒的IO事件。
  • ChannelHandler:是一個接口,處理IO事件或者攔截IO事件,也就是說你拿到channel后想干的事情都通過channelHandler來完成。
  • ChannelPipeline:是一個handler的集合,負(fù)責(zé)處理和攔截入站事件和出站操作。一個channel包含了一個ChannelPipeline,而ChannelPipeline又維護(hù)了一個由ChannelHandlerContext組成的雙向鏈表,并且每個ChannelHandlerContext中又關(guān)聯(lián)著一個ChannelHandler。入站事件就是從鏈表頭傳遞到鏈表尾的handler,出站事件就是從鏈表尾傳到鏈表頭的handler。ChannelPipeline的常用方法:
- addFirst:把handler添加到鏈表第一個位置
- addLast:把handler添加到鏈表的最后一個位置
  • ChannelHandlerContext:保存channel相關(guān)的上下文信息,同時關(guān)聯(lián)了Channelhandler對象,也綁定了對應(yīng)的pipeline和channel信息,方便對channelhandler進(jìn)行調(diào)用。

二、用Netty實現(xiàn)聊天室功能

之前說過用NIO實現(xiàn)聊天室,現(xiàn)在來看看用netty如何實現(xiàn)聊天室。這里我將新建兩個maven項目,一個服務(wù)端,一個客戶端,最后可以打成jar包,服務(wù)端jar包運行在你電腦上,客戶端jar包自己跑一份,還可以發(fā)給你的同事,然后就可以愉快的聊天了。

1、服務(wù)端:

  • pom.xml:引入netty的依賴,還要配置一下打包插件,不然你運行的jar包就會報“找不到主清單文件”或者沒把netty依賴打包上去。
<dependencies>
 <dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.50.Final</version>
 </dependency>
</dependencies>
<build>
 <plugins>
  <plugin>
   <artifactId>maven-assembly-plugin</artifactId>
   <configuration>
    <descriptorRefs>
     <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
     <manifest>
      <mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
     </manifest>
    </archive>
   </configuration>
   <executions>
    <execution>
     <id>make-assembly</id>
     <phase>package</phase>
     <goals>
      <goal>single</goal>
     </goals>
    </execution>
   </executions>
  </plugin>
 </plugins>
</build>

客戶端的pom.xml唯一的區(qū)別就是換成了客戶端啟動類。

  • NettyChatRoomServer:
public class NettyChatRoomServer {
 
 public void run () throws Exception {
  // 創(chuàng)建兩個線程組
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   // 配置參數(shù)
   ServerBootstrap bootstrap = new ServerBootstrap();
   bootstrap.group(bossGroup, workGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new NettyChatRoomServerInitializer());
   // 監(jiān)聽端口
   ChannelFuture cf = bootstrap.bind(6666).sync(); 
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
  }
 }
 
 public static void main(String[] args) throws Exception {
  NettyChatRoomServer ncrs = new NettyChatRoomServer();
  ncrs.run();
 }
}
  • NettyChatRoomServerInitializer:
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{

 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解碼器
        pipeline.addLast("encode",new StringEncoder());//編碼器
        pipeline.addLast("handler",new NettyChatRoomServerHandler());
  
 }
}
  • NettyChatRoomServerHandler:
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{

 private  static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
 /**
  * 當(dāng)有channel加入時執(zhí)行該方法(即當(dāng)有客戶端連接時)
  */
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + "進(jìn)入聊天室");
  for (Channel channel : channelGroup) {
   // 給別的客戶端提醒:xxx加入群聊
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】" + "進(jìn)入聊天室");
   }
  }
  // 將當(dāng)前channel加入到channelGroup中
  channelGroup.add(ctx.channel());
 }
 
 /**
  * 當(dāng)有channel刪除時執(zhí)行該方法(即客戶端斷開連接)
  */
 @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + "離開聊天室");
  for (Channel channel : channelGroup) {
   // 給別的客戶端提醒:xxx加入群聊
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】" + "離開聊天室");
   }
  }
  // 這里不需要channelGroup.remove,會自動remove
 }
 
 /**
  * 當(dāng)有數(shù)據(jù)讀取時執(zhí)行該方法
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + ":" + msg);
  for (Channel channel : channelGroup) {
   // 將消息轉(zhuǎn)發(fā)給別的客戶端
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】"  + ":" + msg);
   } else {
    channel.writeAndFlush("【我】:" + msg);
   }
  }
 }
 
 /**
  * 異常處理
  */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2、客戶端:

  • NettyChatRoomClient:
public class NettyChatRoomClient {

 @SuppressWarnings("resource")
 public void run() throws Exception {
  EventLoopGroup group = new NioEventLoopGroup();
  try {
   Bootstrap bootstrap = new Bootstrap();
   bootstrap.group(group)
               .channel(NioSocketChannel.class)
               .handler(new NettyChatRoomClientInitializer());
   // 連接服務(wù)器
   ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
         group.shutdownGracefully();
        }
 }
 
 public static void main(String[] args) throws Exception {
  NettyChatRoomClient ncrc = new NettyChatRoomClient();
  ncrc.run();
 }
}
  • NettyChatRoomClientInitializer:
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{

 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解碼器
        pipeline.addLast("encode",new StringEncoder());
        pipeline.addLast("handler",new NettyChatRoomClientHandler());
 }
}
  • NettyChatRoomClientHandler:
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
 
 /**
  * 異常處理
  */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  // 將從服務(wù)端接收到的消息打印出來
  System.out.println(msg);
 }
}

三、Netty心跳檢測機(jī)制

客戶端與服務(wù)端連接是否正常,需要有一個機(jī)制來檢測,Netty提供了心跳檢測機(jī)制。1、Netty心跳檢測案例:

  • 服務(wù)器超過3秒沒有讀操作,提示讀空閑
  • 服務(wù)器超過5秒沒有寫操作,提示寫空閑
  • 服務(wù)器超過7秒沒有讀和寫,提示讀寫空閑

客戶端和以前一樣,沒有變換,主要是服務(wù)端加了日志handler以及childHandler重寫了一個用于檢測心跳的方法userEventTriggered,服務(wù)端代碼如下:

  • HeartBeatServer:
public class HeartBeatServer {

 public static void main(String[] args) throws Exception {
  // 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認(rèn)是cpu數(shù) * 2)
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  // 2. 創(chuàng)建work group
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   // 3. 創(chuàng)建服務(wù)端啟動對象
   ServerBootstrap bootstrap = new ServerBootstrap();
   // 4. 配置啟動參數(shù)
   bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個線程組
     .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
     .handler(new LoggingHandler(LogLevel.INFO)) // 日志處理器
     .childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對象
      @Override
      protected void initChannel(SocketChannel sc) throws Exception {
       sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
       sc.pipeline().addLast(new HeartBeatServerHandler());
      }
     });
   // 5. 啟動服務(wù)器并綁定端口
   ChannelFuture cf = bootstrap.bind(6666).sync();
   // 6. 對關(guān)閉通道進(jìn)行監(jiān)聽
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
   workGroup.shutdownGracefully();
  }
 }
}
  • HeartBeatServerHandler:
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof IdleStateEvent) {
   IdleStateEvent event = (IdleStateEvent) evt;
   String info = null;
   switch (event.state()) {
   case READER_IDLE:
    info = "讀空閑";
    break;
            case WRITER_IDLE:
             info = "寫空閑";
             break;
            case ALL_IDLE:
             info = "讀寫空閑";
             break;
   }
   System.out.println(ctx.channel().remoteAddress() + ":" + info);
  }
 }
}

四、WebSocket長連接開發(fā)

1、http協(xié)議和websocket協(xié)議的區(qū)別:

  • http協(xié)議:無狀態(tài)、短連接、被動型。即只能由客戶端發(fā)起請求,服務(wù)端給客戶端響應(yīng),當(dāng)服務(wù)端響應(yīng)完,本次請求的生命周期就結(jié)束了。客戶端沒辦法主動感知服務(wù)端的變化,服務(wù)端也沒辦法主動推送數(shù)據(jù)給客戶端。比如你請求秒殺接口,秒殺接口給你返回排隊中,那到底什么時候排上號了呢?客戶端就得不斷地循環(huán)請求獲取秒殺結(jié)果的接口。

  • websocket:是基于http協(xié)議開發(fā)的,握手過程和htpp一樣。所謂長連接,就是服務(wù)端和客戶端可以相互感知,瀏覽器關(guān)閉了服務(wù)端可以感知,服務(wù)端關(guān)閉了瀏覽器可以感知。比如還是秒殺,如果是用websocket長連接開發(fā)的接口,你請求秒殺返回排隊中,然后你不用再循環(huán)請求獲取訂單狀態(tài)的接口,服務(wù)端和客戶端會保持一個長連接,服務(wù)端可以主動把訂單狀態(tài)推給客戶端。

2、案例代碼:

  • WebSocketServer:
public class WebSocketServer {
 public static void main(String[] args) throws Exception {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   ServerBootstrap bootstrap = new ServerBootstrap();
   bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個線程組
     .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
     .handler(new LoggingHandler(LogLevel.INFO)) // 日志處理器
     .childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對象
      @Override
      protected void initChannel(SocketChannel sc) throws Exception {
       sc.pipeline().addLast(new HttpServerCodec()); // 使用http的編碼解碼器
       sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以塊方式寫,添加ChunkedWriteHandler處理器
       sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http數(shù)據(jù)在傳輸?shù)臅r候是分段的,用這個處理器就可聚集分段
       // 請求的url就是:ws://localhost:6666/hello
       sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
       sc.pipeline().addLast(new WebSocketServerHandler());
      }
     });
   ChannelFuture cf = bootstrap.bind(80).sync();
   System.out.println("服務(wù)端準(zhǔn)備好了");
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
   workGroup.shutdownGracefully();
  }
 }
}
  • WebSocketServerHandler:
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  System.out.println("服務(wù)器收到消息:" + msg.text());
  ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時間:" + LocalDateTime.now()) + ",msg:" + msg.text());
 }

 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  System.out.println("handlerAdded被調(diào)用:" + ctx.channel().id().asLongText());
 }

 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  System.out.println("handlerRemoved被調(diào)用:" + ctx.channel().id().asLongText());
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  System.out.println("exceptionCaught被調(diào)用:" + ctx.channel().id().asLongText());
  ctx.close();
 }
}

然后編寫一個頁面,用來發(fā)送websocket請求:


<body>
  <script type="text/javascript">
     var socket;
     if(window.WebSocket) {
      socket = new WebSocket("ws://127.0.0.1/hello");
      // 接收服務(wù)器端返回的消息,顯示在responseText中
      socket.onmessage = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = rt.value + "\n" + ev.data;
      }
      // 相當(dāng)于開啟連接
      socket.onopen = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = "連接開啟了";
      }
      // 連接關(guān)閉
      socket.onclose = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = rt.value + "\n" + "連接關(guān)閉了";
      }
     } else {
      alert("瀏覽器不支持websocket");
     }
     
     function send(message){
      if (!window.socket){
       return;
      }
      if (socket.readyState == WebSocket.OPEN){
       socket.send(message);
      } else {
       alert("連接沒有開啟");
      }
     }
  </script>

  <form onsubmit="return false">
     <textarea name="message" style="height:300px;width: 300px"></textarea>
     <input type="button" value="發(fā)送消息" onclick="send(this.form.message.value)">
     <textarea id="responseText" style="height:300px;width: 300px"></textarea>
  </form>
</body>

訪問這個頁面,服務(wù)端啟動或者關(guān)閉會在框框中顯示出來,同樣,如果客戶端關(guān)閉,服務(wù)端也會在控制臺打印出來。

五、protobuf

1、編解碼問題:

數(shù)據(jù)在網(wǎng)絡(luò)中是以二進(jìn)制字節(jié)碼傳輸?shù)?,發(fā)送的數(shù)據(jù)需要編碼,服務(wù)端收到后需要解碼。Netty提供了StringDecoder、ObjectDecoder,底層采用的是java序列化技術(shù),java序列化本身效率較低,而且無法跨語言,所以就有了protobuf。

2、protobuf簡介:它是Google的開源項目,輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲格式,可用于數(shù)據(jù)的序列化,且支持跨平臺跨語言,很適合做數(shù)據(jù)存儲和RPC數(shù)據(jù)交換格式。

3、protobuf的使用:

  • 需求:客戶端發(fā)送一個Student對象到服務(wù)器,服務(wù)器接收這個對象,并顯示它的信息。

下面開始編碼:



  • 下載protoc-3.6.1-win32.zip,然后解壓。下載地址:https://github.com/protocolbuffers/protobuf/releases/tag/v3.6.1

  • 引入protobuf依賴:

<dependency>
 <groupId>com.google.protobuf</groupId>
 <artifactId>protobuf-java</artifactId>
 <version>3.6.1</version>
</dependency>
  • 編寫Student.proto:
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部類名
message Student { // 內(nèi)部類名
   int32 id = 1; // 1不是值,而是序號
   string name = 2;
}
  • 將這個類Student.proto復(fù)制到剛解壓出來的bin目錄下,也就是和protoc.exe要同一個目錄;
  • 在此目錄打開cmd,執(zhí)行如下命令:
protoc.exe --java_out=. Student.proto

執(zhí)行完后會在protoc.exe所在目錄生成一個StudentPOJO.java文件,這就是我們要的文件,復(fù)制到項目中。

  • 客戶端的修改:NettyClientHandler中的channelActive改成這樣:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 System.out.println("client:" + ctx);
 // 發(fā)送student對象到服務(wù)端
 StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("張三").build();
 ctx.writeAndFlush(student);
}

NettyClient中添加protobuf的編碼器:

@Override
protected void initChannel(SocketChannel sc) throws Exception {
 // 加入protobuf的編碼器
 sc.pipeline().addLast("encoder", new ProtobufEncoder());
 sc.pipeline().addLast(new NettyClientHandler());
}
  • 服務(wù)端的修改:NettyServer中添加解碼器:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
 // 加入解碼器,指定解碼對象
 sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
 // 傳入自定義的handler
 sc.pipeline().addLast(new NettyServerHandler());
 // 在這里,可以將SocketChannel sc保存到集合中,別的線程拿到集合就可以調(diào)用channel的方法了

NettyServerHandler中讀取student對象:

// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 // 讀取客戶端發(fā)送的student
 StudentPOJO.Student student = (Student) msg;
 System.out.println("客戶端發(fā)送的數(shù)據(jù)是:id=" + student.getId() + ", name=" + student.getName());
}

啟動服務(wù)端,再啟動客戶端,就可以看到服務(wù)端后臺打印出了如下信息:

客戶端發(fā)送的數(shù)據(jù)是:id=666, name=張三
轉(zhuǎn)一轉(zhuǎn)
贊一贊
看一看
-java開發(fā)那些事-

    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多