一、Netty核心模塊
- BootStrap:客戶端程序的啟動引導(dǎo)類
- ServerBootStrap:服務(wù)端程序的啟動引導(dǎo)類
它們的常用方法有:
- group:設(shè)置線程組
- channel:指定通道的實現(xiàn)類
- option:給channel添加配置
- childOption:給接收到的channel添加配置
- handler:設(shè)置bossGroup的handler
- childHandler:設(shè)置workGroup的handler
- Selector:用來接收注冊的channel的。當(dāng)往selector中注冊channel后,selector就會自動不斷地輪詢這些channel是否有就緒的IO事件。
- ChannelHandler:是一個接口,處理IO事件或者攔截IO事件,也就是說你拿到channel后想干的事情都通過channelHandler來完成。
- ChannelPipeline:是一個handler的集合,負(fù)責(zé)處理和攔截入站事件和出站操作。一個channel包含了一個ChannelPipeline,而ChannelPipeline又維護(hù)了一個由ChannelHandlerContext組成的雙向鏈表,并且每個ChannelHandlerContext中又關(guān)聯(lián)著一個ChannelHandler。入站事件就是從鏈表頭傳遞到鏈表尾的handler,出站事件就是從鏈表尾傳到鏈表頭的handler。ChannelPipeline的常用方法:
- addFirst:把handler添加到鏈表第一個位置
- addLast:把handler添加到鏈表的最后一個位置
- ChannelHandlerContext:保存channel相關(guān)的上下文信息,同時關(guān)聯(lián)了Channelhandler對象,也綁定了對應(yīng)的pipeline和channel信息,方便對channelhandler進(jìn)行調(diào)用。
二、用Netty實現(xiàn)聊天室功能
之前說過用NIO實現(xiàn)聊天室,現(xiàn)在來看看用netty如何實現(xiàn)聊天室。這里我將新建兩個maven項目,一個服務(wù)端,一個客戶端,最后可以打成jar包,服務(wù)端jar包運行在你電腦上,客戶端jar包自己跑一份,還可以發(fā)給你的同事,然后就可以愉快的聊天了。
1、服務(wù)端:
- pom.xml:引入netty的依賴,還要配置一下打包插件,不然你運行的jar包就會報“找不到主清單文件”或者沒把netty依賴打包上去。
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
客戶端的pom.xml唯一的區(qū)別就是換成了客戶端啟動類。
public class NettyChatRoomServer {
public void run () throws Exception {
// 創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new NettyChatRoomServerInitializer());
// 監(jiān)聽端口
ChannelFuture cf = bootstrap.bind(6666).sync();
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyChatRoomServer ncrs = new NettyChatRoomServer();
ncrs.run();
}
}
- NettyChatRoomServerInitializer:
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode",new StringDecoder());//解碼器
pipeline.addLast("encode",new StringEncoder());//編碼器
pipeline.addLast("handler",new NettyChatRoomServerHandler());
}
}
- NettyChatRoomServerHandler:
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 當(dāng)有channel加入時執(zhí)行該方法(即當(dāng)有客戶端連接時)
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + "進(jìn)入聊天室");
for (Channel channel : channelGroup) {
// 給別的客戶端提醒:xxx加入群聊
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + "進(jìn)入聊天室");
}
}
// 將當(dāng)前channel加入到channelGroup中
channelGroup.add(ctx.channel());
}
/**
* 當(dāng)有channel刪除時執(zhí)行該方法(即客戶端斷開連接)
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + "離開聊天室");
for (Channel channel : channelGroup) {
// 給別的客戶端提醒:xxx加入群聊
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + "離開聊天室");
}
}
// 這里不需要channelGroup.remove,會自動remove
}
/**
* 當(dāng)有數(shù)據(jù)讀取時執(zhí)行該方法
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
System.out.println("【" + ip + "】" + ":" + msg);
for (Channel channel : channelGroup) {
// 將消息轉(zhuǎn)發(fā)給別的客戶端
if (ctx.channel() != channel) {
channel.writeAndFlush("【" + ip + "】" + ":" + msg);
} else {
channel.writeAndFlush("【我】:" + msg);
}
}
}
/**
* 異常處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2、客戶端:
public class NettyChatRoomClient {
@SuppressWarnings("resource")
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new NettyChatRoomClientInitializer());
// 連接服務(wù)器
ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
Channel channel = channelFuture.channel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg);
}
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
NettyChatRoomClient ncrc = new NettyChatRoomClient();
ncrc.run();
}
}
- NettyChatRoomClientInitializer:
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decode",new StringDecoder());//解碼器
pipeline.addLast("encode",new StringEncoder());
pipeline.addLast("handler",new NettyChatRoomClientHandler());
}
}
- NettyChatRoomClientHandler:
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
/**
* 異常處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 將從服務(wù)端接收到的消息打印出來
System.out.println(msg);
}
}
三、Netty心跳檢測機(jī)制
客戶端與服務(wù)端連接是否正常,需要有一個機(jī)制來檢測,Netty提供了心跳檢測機(jī)制。1、Netty心跳檢測案例:
客戶端和以前一樣,沒有變換,主要是服務(wù)端加了日志handler以及childHandler重寫了一個用于檢測心跳的方法userEventTriggered,服務(wù)端代碼如下:
public class HeartBeatServer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建boss group (boss group和work group含有的子線程數(shù)默認(rèn)是cpu數(shù) * 2)
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 2. 創(chuàng)建work group
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 3. 創(chuàng)建服務(wù)端啟動對象
ServerBootstrap bootstrap = new ServerBootstrap();
// 4. 配置啟動參數(shù)
bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個線程組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
.handler(new LoggingHandler(LogLevel.INFO)) // 日志處理器
.childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
sc.pipeline().addLast(new HeartBeatServerHandler());
}
});
// 5. 啟動服務(wù)器并綁定端口
ChannelFuture cf = bootstrap.bind(6666).sync();
// 6. 對關(guān)閉通道進(jìn)行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String info = null;
switch (event.state()) {
case READER_IDLE:
info = "讀空閑";
break;
case WRITER_IDLE:
info = "寫空閑";
break;
case ALL_IDLE:
info = "讀寫空閑";
break;
}
System.out.println(ctx.channel().remoteAddress() + ":" + info);
}
}
}
四、WebSocket長連接開發(fā)
1、http協(xié)議和websocket協(xié)議的區(qū)別:
http協(xié)議:無狀態(tài)、短連接、被動型。即只能由客戶端發(fā)起請求,服務(wù)端給客戶端響應(yīng),當(dāng)服務(wù)端響應(yīng)完,本次請求的生命周期就結(jié)束了。客戶端沒辦法主動感知服務(wù)端的變化,服務(wù)端也沒辦法主動推送數(shù)據(jù)給客戶端。比如你請求秒殺接口,秒殺接口給你返回排隊中,那到底什么時候排上號了呢?客戶端就得不斷地循環(huán)請求獲取秒殺結(jié)果的接口。
websocket:是基于http協(xié)議開發(fā)的,握手過程和htpp一樣。所謂長連接,就是服務(wù)端和客戶端可以相互感知,瀏覽器關(guān)閉了服務(wù)端可以感知,服務(wù)端關(guān)閉了瀏覽器可以感知。比如還是秒殺,如果是用websocket長連接開發(fā)的接口,你請求秒殺返回排隊中,然后你不用再循環(huán)請求獲取訂單狀態(tài)的接口,服務(wù)端和客戶端會保持一個長連接,服務(wù)端可以主動把訂單狀態(tài)推給客戶端。
2、案例代碼:
public class WebSocketServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup) // 設(shè)置兩個線程組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為服務(wù)器的通道
.handler(new LoggingHandler(LogLevel.INFO)) // 日志處理器
.childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建通道初始化對象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new HttpServerCodec()); // 使用http的編碼解碼器
sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以塊方式寫,添加ChunkedWriteHandler處理器
sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http數(shù)據(jù)在傳輸?shù)臅r候是分段的,用這個處理器就可聚集分段
// 請求的url就是:ws://localhost:6666/hello
sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
sc.pipeline().addLast(new WebSocketServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(80).sync();
System.out.println("服務(wù)端準(zhǔn)備好了");
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服務(wù)器收到消息:" + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時間:" + LocalDateTime.now()) + ",msg:" + msg.text());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded被調(diào)用:" + ctx.channel().id().asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved被調(diào)用:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught被調(diào)用:" + ctx.channel().id().asLongText());
ctx.close();
}
}
然后編寫一個頁面,用來發(fā)送websocket請求:
<body>
<script type="text/javascript">
var socket;
if(window.WebSocket) {
socket = new WebSocket("ws://127.0.0.1/hello");
// 接收服務(wù)器端返回的消息,顯示在responseText中
socket.onmessage = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相當(dāng)于開啟連接
socket.onopen = function(ev){
var rt = document.getElementById("responseText");
rt.value = "連接開啟了";
}
// 連接關(guān)閉
socket.onclose = function(ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "連接關(guān)閉了";
}
} else {
alert("瀏覽器不支持websocket");
}
function send(message){
if (!window.socket){
return;
}
if (socket.readyState == WebSocket.OPEN){
socket.send(message);
} else {
alert("連接沒有開啟");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height:300px;width: 300px"></textarea>
<input type="button" value="發(fā)送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height:300px;width: 300px"></textarea>
</form>
</body>
訪問這個頁面,服務(wù)端啟動或者關(guān)閉會在框框中顯示出來,同樣,如果客戶端關(guān)閉,服務(wù)端也會在控制臺打印出來。
五、protobuf
1、編解碼問題:
數(shù)據(jù)在網(wǎng)絡(luò)中是以二進(jìn)制字節(jié)碼傳輸?shù)?,發(fā)送的數(shù)據(jù)需要編碼,服務(wù)端收到后需要解碼。Netty提供了StringDecoder、ObjectDecoder,底層采用的是java序列化技術(shù),java序列化本身效率較低,而且無法跨語言,所以就有了protobuf。
2、protobuf簡介:它是Google的開源項目,輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲格式,可用于數(shù)據(jù)的序列化,且支持跨平臺跨語言,很適合做數(shù)據(jù)存儲和RPC數(shù)據(jù)交換格式。
3、protobuf的使用:
- 需求:客戶端發(fā)送一個Student對象到服務(wù)器,服務(wù)器接收這個對象,并顯示它的信息。
下面開始編碼:
下載protoc-3.6.1-win32.zip,然后解壓。下載地址:https://github.com/protocolbuffers/protobuf/releases/tag/v3.6.1
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部類名
message Student { // 內(nèi)部類名
int32 id = 1; // 1不是值,而是序號
string name = 2;
}
- 將這個類Student.proto復(fù)制到剛解壓出來的bin目錄下,也就是和protoc.exe要同一個目錄;
protoc.exe --java_out=. Student.proto
執(zhí)行完后會在protoc.exe所在目錄生成一個StudentPOJO.java文件,這就是我們要的文件,復(fù)制到項目中。
- 客戶端的修改:NettyClientHandler中的channelActive改成這樣:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
// 發(fā)送student對象到服務(wù)端
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("張三").build();
ctx.writeAndFlush(student);
}
NettyClient中添加protobuf的編碼器:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 加入protobuf的編碼器
sc.pipeline().addLast("encoder", new ProtobufEncoder());
sc.pipeline().addLast(new NettyClientHandler());
}
- 服務(wù)端的修改:NettyServer中添加解碼器:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 加入解碼器,指定解碼對象
sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
// 傳入自定義的handler
sc.pipeline().addLast(new NettyServerHandler());
// 在這里,可以將SocketChannel sc保存到集合中,別的線程拿到集合就可以調(diào)用channel的方法了
}
NettyServerHandler中讀取student對象:
// 讀取數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 讀取客戶端發(fā)送的student
StudentPOJO.Student student = (Student) msg;
System.out.println("客戶端發(fā)送的數(shù)據(jù)是:id=" + student.getId() + ", name=" + student.getName());
}
啟動服務(wù)端,再啟動客戶端,就可以看到服務(wù)端后臺打印出了如下信息:
客戶端發(fā)送的數(shù)據(jù)是:id=666, name=張三