日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

使用netty進(jìn)行服務(wù)端網(wǎng)絡(luò)編程及數(shù)據(jù)高效分發(fā)功能實現(xiàn)

 WindySky 2019-02-28

       這篇文章起了一個很牛b的名字,實際要講的內(nèi)容很簡單。但是還是發(fā)現(xiàn)很多人把這個功能寫復(fù)雜了。netty的服務(wù)端網(wǎng)絡(luò)編程,按照官方提供的demo,稍加修改即可,但是一些參數(shù)選項,需要自己去完善設(shè)置。而數(shù)據(jù)分發(fā)功能,就是面向所有連接分發(fā)數(shù)據(jù),很多人的做法是使用java concurrent包下的相關(guān)容器保存連接,然后需要分發(fā)數(shù)據(jù)時,遍歷集合中的元素,一個一個的調(diào)用writeAndFlush()將數(shù)據(jù)發(fā)出去。其實可以更簡單一些,使用線程安全的ChannelGroup保存連接并分發(fā)數(shù)據(jù)。性能提高多少,我沒有對比過,至少代碼看著簡化了。下面是相關(guān)實現(xiàn)代碼,實際生產(chǎn)環(huán)境中,下面代碼應(yīng)該還有很多很多需要優(yōu)化的地方,目前我還沒有在超過10k并發(fā)連接情況下測試過下面的代碼。

public class NetServer implements Runnable {

    private static Logger LOGGER = LoggerFactory.getLogger(NetServer.class);

    private final static int PORT = 9500;

    private final static int MAX_MESSAGE_LENGTH = 8192;

    private final ChannelGroup channels_ = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public NetServer() {

    }

    // 分發(fā)數(shù)據(jù)

    public void dispatcher(String message) {

        channels_.writeAndFlush(message);

    }

    @Override

    public void run() {

        startup();

    }

    private void startup() {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup);

            b.channel(NioServerSocketChannel.class);

            b.option(ChannelOption.SO_BACKLOG, 128);

            b.option(ChannelOption.TCP_NODELAY, true);

            b.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override

                protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();

                    p.addLast(new LengthFieldBasedFrameDecoder(MAX_MESSAGE_LENGTH, 0, 4, 0 ,4));

                    p.addLast(new LengthFieldPrepender(4));

                    p.addLast(new StringDecoder(CharsetUtil.UTF_8));

                    p.addLast(new StringEncoder(CharsetUtil.UTF_8));

                    p.addLast(new NetServerHandler(channels_));

                }

            });

            ChannelFuture future = b.bind(PORT).sync();

            future.addListener(new GenericFutureListener<Future<? super Void>>() {

                @Override

                public void operationComplete(Future<? super Void> future) throws Exception {

                    if (future.isSuccess()) {

                        LOGGER.info("服務(wù)器啟動成功...");

                    } else {

                        LOGGER.info("服務(wù)器啟動失敗...");

                        if (future.cause() != null) {

                            LOGGER.error("異常信息: " + future.cause().getMessage());

                        }

                    }

                }

            });

            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {

            LOGGER.error("服務(wù)器啟動出現(xiàn)異常..." + e.getMessage());

        } finally {

            bossGroup.shutdownGracefully();

            workerGroup.shutdownGracefully();

        }

    }

}

public class NetServerHandler extends SimpleChannelInboundHandler<String> {

    private ChannelGroup channels_;

    private static Logger LOGGER = LoggerFactory.getLogger(NetServerHandler.class);

    public NetServerHandler(ChannelGroup channels) {

        channels_ = channels;

    }

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        channels_.add(ctx.channel());

        LOGGER.info("{} is up...當(dāng)前連接數(shù)量: {}", ctx.channel().remoteAddress(), channels_.size());

    }

    @Override

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        ctx.close();

        LOGGER.info("{} is down...當(dāng)前連接數(shù)量: {}", ctx.channel().remoteAddress(), channels_.size());

    }

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    }

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();

        LOGGER.error("與客戶端 {} 連接出現(xiàn)異常, 異常信息: " + cause.getMessage(), ctx.channel().remoteAddress());

    }

}

--------------------- 

作者:grafx 

來源:CSDN 

原文:https://blog.csdn.net/grafx/article/details/56677667 

版權(quán)聲明:本文為博主原創(chuàng)文章,轉(zhuǎn)載請附上博文鏈接!

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多