這篇文章起了一個很牛b的名字,實際要講的內(nèi)容很簡單。但是還是發(fā)現(xiàn)很多人把這個功能寫復(fù)雜了。netty的服務(wù)端網(wǎng)絡(luò)編程,按照官方提供的demo,稍加修改即可,但是一些參數(shù)選項,需要自己去完善設(shè)置。而數(shù)據(jù)分發(fā)功能,就是面向所有連接分發(fā)數(shù)據(jù),很多人的做法是使用java concurrent包下的相關(guān)容器保存連接,然后需要分發(fā)數(shù)據(jù)時,遍歷集合中的元素,一個一個的調(diào)用writeAndFlush()將數(shù)據(jù)發(fā)出去。其實可以更簡單一些,使用線程安全的ChannelGroup保存連接并分發(fā)數(shù)據(jù)。性能提高多少,我沒有對比過,至少代碼看著簡化了。下面是相關(guān)實現(xiàn)代碼,實際生產(chǎn)環(huán)境中,下面代碼應(yīng)該還有很多很多需要優(yōu)化的地方,目前我還沒有在超過10k并發(fā)連接情況下測試過下面的代碼。 public class NetServer implements Runnable { private static Logger LOGGER = LoggerFactory.getLogger(NetServer.class); private final static int PORT = 9500; private final static int MAX_MESSAGE_LENGTH = 8192; private final ChannelGroup channels_ = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public NetServer() { } // 分發(fā)數(shù)據(jù) public void dispatcher(String message) { channels_.writeAndFlush(message); } @Override public void run() { startup(); } private void startup() { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.option(ChannelOption.SO_BACKLOG, 128); b.option(ChannelOption.TCP_NODELAY, true); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LengthFieldBasedFrameDecoder(MAX_MESSAGE_LENGTH, 0, 4, 0 ,4)); p.addLast(new LengthFieldPrepender(4)); p.addLast(new StringDecoder(CharsetUtil.UTF_8)); p.addLast(new StringEncoder(CharsetUtil.UTF_8)); p.addLast(new NetServerHandler(channels_)); } }); ChannelFuture future = b.bind(PORT).sync(); future.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if (future.isSuccess()) { LOGGER.info("服務(wù)器啟動成功..."); } else { LOGGER.info("服務(wù)器啟動失敗..."); if (future.cause() != null) { LOGGER.error("異常信息: " + future.cause().getMessage()); } } } }); future.channel().closeFuture().sync(); } catch (InterruptedException e) { LOGGER.error("服務(wù)器啟動出現(xiàn)異常..." + e.getMessage()); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } public class NetServerHandler extends SimpleChannelInboundHandler<String> { private ChannelGroup channels_; private static Logger LOGGER = LoggerFactory.getLogger(NetServerHandler.class); public NetServerHandler(ChannelGroup channels) { channels_ = channels; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channels_.add(ctx.channel()); LOGGER.info("{} is up...當(dāng)前連接數(shù)量: {}", ctx.channel().remoteAddress(), channels_.size()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.close(); LOGGER.info("{} is down...當(dāng)前連接數(shù)量: {}", ctx.channel().remoteAddress(), channels_.size()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); LOGGER.error("與客戶端 {} 連接出現(xiàn)異常, 異常信息: " + cause.getMessage(), ctx.channel().remoteAddress()); } } --------------------- 作者:grafx 來源:CSDN 原文:https://blog.csdn.net/grafx/article/details/56677667 版權(quán)聲明:本文為博主原創(chuàng)文章,轉(zhuǎn)載請附上博文鏈接! |
|