RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over a network, without needing to understand the underlying network technology. Put plainly: the client invokes an object running on a remote machine, without knowing any details of the call, as naturally as if it were calling a local object. Typical RPC frameworks include Thrift (open-sourced by Facebook) and Dubbo (open-sourced by Alibaba). An RPC framework encapsulates the network protocol and the network I/O model transparently: as far as the calling client is concerned, it is invoking a local object. Whether the transport layer uses TCP, UDP, or HTTP is of no concern to the caller; neither is whether the network I/O underneath is implemented with select, poll, epoll, or IOCP (I/O Completion Ports).

Mainstream RPC frameworks today support cross-language calls through a so-called IDL (Interface Definition Language). Strictly speaking, RPC itself does not require one: if your framework has no cross-language requirement, the IDL can be left out.

Finally, it is worth noting that when judging the performance of an RPC framework, the choice of network I/O model is critical. An RPC server built on that foundation may support blocking synchronous I/O, non-blocking synchronous I/O, multiplexed I/O, or asynchronous I/O; under high concurrency, these models differ greatly in processing performance. Another yardstick is the transport protocol: whether it is TCP, HTTP, or UDP also has some effect on performance. From what I have seen so far, most open-source RPC frameworks are built on TCP or HTTP, and apparently none use UDP as the primary transport.

Having understood how RPC works and what its performance demands are, can we set the open-source frameworks aside and build a high-performance RPC server ourselves? I believe we can, and in this article I will do exactly that in Java, on top of Netty. How is it implemented, on what principles, and how does it perform under concurrency? Read on.

When we want to raise a single node's communication throughput and performance, a Java back end generally reaches first for an NIO (non-blocking I/O) framework. The problem is that Java NIO takes considerable technical depth and accumulated experience to use with confidence. For an ordinary developer, writing a back-end TCP/HTTP server directly on NIO, while also handling TCP packet coalescing ("sticky packets"), communication failures, connection management, and the rest of the networking details, sets the bar too high. The wiser choice is to build the server back end on a mainstream NIO framework: chiefly Netty or Mina. Both are designed around TCP transport, non-blocking I/O, and flexible I/O thread pools, and they handle high-concurrency loads with room to spare. As excellent NIO frameworks like Netty and Mina have matured, they have given Java strong technical backing for high-performance back-end server development, breaking the dominance C++ once enjoyed there. Before that, Java NIO had long been criticized and kept at arm's length.

Since this RPC server is based on Netty, a few words about Netty itself. Netty is a further encapsulation of the Java NIO framework; its project site is netty.io. This article uses Netty 4.0, specifically 4.0.37.Final, which can be downloaded from the project's site. So how do you use Netty to develop an RPC server? It is not hard; the basic technique is:

1. Define the RPC request and response message structures, including the RPC interface definition: the class name of the remote call, the method name, the parameter types, the parameter values, and so on.
2. At initialization, the server loads, through a container, the mapping from RPC interface definitions to RPC implementation objects, then waits for clients to issue calls.
3. The RPC message a client issues carries the remote call's class name, method name, parameter types, and parameter values. It travels over the network as a byte stream to the RPC server, which, on receiving the byte-stream request, looks up in its container the concrete implementation object mapped to the client's interface.
4. The RPC server resolves the implementation object and its parameter information, invokes it through the reflection mechanism, returns the processing result, and finally wraps it into an RPC response message to notify the client.
5. The client receives the byte-stream RPC response over the network, unpacks and parses it, and presents the result of the remote call.

Stated like that it sounds simple, but the implementation must also handle the following:

1. The RPC server's transport layer is TCP; what about packet coalescing? Won't the server fail to parse the client's requests? Fortunately Netty already ships a decoder that solves the TCP sticky-packet problem: LengthFieldBasedFrameDecoder handles it with ease (see the framing sketch right after this list).
2. Which Netty server threading model should we use: single-threaded, multi-threaded (one thread accepts client connections and, once established, hands them to a back-end I/O thread pool), or master-worker (both connection acceptance and back-end I/O are pooled)? Here, for performance, I use Netty's master-worker (boss/worker) thread-pool model.
3. What if Netty's I/O thread pool runs into very time-consuming business logic and blocks? Wouldn't that easily hang or stall the back-end NIO threads? The approach in this article is to dispatch complex back-end business to a dedicated business thread pool for asynchronous callback processing.
4. RPC messages travel as byte streams between NIO channels; concretely, how? This article uses the codec for Java's native object serialization (ObjectEncoder, ObjectDecoder). For performance this is probably not the best option; a better design makes the message codec configurable, for example protobuf or JBoss Marshalling, to raise the efficiency of message transport over the network.
5. The RPC server must cope with multi-threaded, high-concurrency usage, so thread safety is a must. In addition, try not to lock with synchronized; prefer lightweight ReentrantLock-based conditional locking of code blocks. The RPC message-handling callback in this article is one place this is applied.
6. The server's service interface objects and service implementation objects should be easy to configure, load, and unload. In this article they are managed uniformly through a Spring container.
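To make point 1 above concrete, here is a minimal plain-Java sketch of the idea behind length-field framing (it is independent of this project's code; the class and method names are mine). Every message is prefixed with a 4-byte length, so the receiver always knows where one message ends and the next begins, no matter how TCP fragments or coalesces the bytes. This is exactly what LengthFieldBasedFrameDecoder and LengthFieldPrepender do inside the Netty pipelines shown later in this article.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LengthPrefixFraming {
    // Sender: write a 4-byte big-endian length header, then the payload.
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // Receiver: the header says exactly how many bytes belong to this message,
    // so coalesced or fragmented TCP segments cannot confuse the parser.
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }
}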
To summarize, the call flow of the RPC server designed in this article is shown in the diagram below. Clients issue concurrent RPC call requests, and the RPC server's Netty acceptor dispatches N NIO connection threads, at which point the acceptor's task is done. The NIO connection threads are then managed collectively in the Netty NIO worker thread pool, which carries out message encoding, message decoding, message handling, and so on for each RPC connection. In the final message-handling (Handler) stage, for performance, the design hands the complex message-processing work straight to a dedicated RPC business thread pool for centralized handling, so the NIO thread behind the Handler returns immediately and never blocks. At that point the RPC dispatch is complete, and the client waits asynchronously for the server's processing result, delivered in this article through a message callback mechanism (MessageCallBack).

Next, the modules and flow with which Netty decodes, encodes, and handles RPC messages, as shown in the figure below:
The figure above shows the modules with which client and server encode, decode, and handle RPC messages, and the order in which they are invoked. Netty strings these handlers together one after another into a chain of responsibility (the pipeline) and invokes them uniformly.

With all that said, here is the directory layout of my NettyRPC implementation. The newlandframework.netty.rpc.core package is the core of NettyRPC. The newlandframework.netty.rpc.model package wraps the RPC request/response message structures, plus the definition of the container that binds RPC service interfaces to their implementations. newlandframework.netty.rpc.config defines NettyRPC's server-side configuration properties.

First, the contents of the newlandframework.netty.rpc.model package: the RPC request and response message structures.

RPC request message structure:
/**
* @filename:MessageRequest.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC service request structure
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.model;
import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public class MessageRequest implements Serializable {
private String messageId;
private String className;
private String methodName;
private Class<?>[] typeParameters;
private Object[] parametersVal;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getTypeParameters() {
return typeParameters;
}
public void setTypeParameters(Class<?>[] typeParameters) {
this.typeParameters = typeParameters;
}
public Object[] getParameters() {
return parametersVal;
}
public void setParameters(Object[] parametersVal) {
this.parametersVal = parametersVal;
}
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("messageId", messageId).append("className", className)
.append("methodName", methodName).toString();
}
}
RPC response message structure:
/**
* @filename:MessageResponse.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC service response structure
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.model;
import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public class MessageResponse implements Serializable {
private String messageId;
private String error;
private Object resultDesc;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
public Object getResult() {
return resultDesc;
}
public void setResult(Object resultDesc) {
this.resultDesc = resultDesc;
}
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("messageId", messageId).append("error", error).toString();
}
}
Next, the definition of the container that binds RPC service interface definitions to their service implementations, handed to Spring to use as a managed container:
/**
* @filename:MessageKeyVal.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC service mapping container
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.model;
import java.util.Map;
public class MessageKeyVal {
private Map<String, Object> messageKeyVal;
public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
this.messageKeyVal = messageKeyVal;
}
public Map<String, Object> getMessageKeyVal() {
return messageKeyVal;
}
}
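The container is nothing more than a map from fully qualified service interface names to implementation beans. The Spring XML at the end of this article populates it declaratively; here is a hypothetical programmatic sketch of the same binding (BindingSketch is a demo class of mine; Calculate/CalculateImpl are the service shown later in this article):

import java.util.HashMap;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageKeyVal;
import newlandframework.netty.rpc.servicebean.CalculateImpl;

public class BindingSketch {
    public static void main(String[] args) {
        Map<String, Object> bindings = new HashMap<String, Object>();
        // key: fully qualified service interface name; value: the implementation bean
        bindings.put("newlandframework.netty.rpc.servicebean.Calculate", new CalculateImpl());
        MessageKeyVal keyVal = new MessageKeyVal();
        keyVal.setMessageKeyVal(bindings);
    }
}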
Good. With the core model structures defined, let me now show the key parts of the NettyRPC core package, newlandframework.netty.rpc.core, starting with the classes around the business thread pool.

Thread factory implementation:
/**
* @filename:NamedThreadFactory.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: Thread factory
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger mThreadNum = new AtomicInteger(1);
private final String prefix;
private final boolean daemoThread;
private final ThreadGroup threadGroup;
public NamedThreadFactory() {
this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
}
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
public NamedThreadFactory(String prefix, boolean daemo) {
this.prefix = prefix + "-thread-";
daemoThread = daemo;
SecurityManager s = System.getSecurityManager();
threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}
public Thread newThread(Runnable runnable) {
String name = prefix + mThreadNum.getAndIncrement();
Thread ret = new Thread(threadGroup, runnable, name, 0);
ret.setDaemon(daemoThread);
return ret;
}
public ThreadGroup getThreadGroup() {
return threadGroup;
}
}
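A quick usage sketch (NamedThreadFactoryDemo is a hypothetical demo class): each factory numbers its threads independently, so a factory created with the prefix "RpcThreadPool" produces threads named RpcThreadPool-thread-1, RpcThreadPool-thread-2, and so on, which makes thread dumps of a busy server much easier to read.

import java.util.concurrent.ThreadFactory;

public class NamedThreadFactoryDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = new NamedThreadFactory("RpcThreadPool", true);
        Thread t = factory.newThread(new Runnable() {
            public void run() {
                // prints "RpcThreadPool-thread-1"
                System.out.println(Thread.currentThread().getName());
            }
        });
        t.start();
        t.join();
    }
}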
Business thread pool implementation:
/**
* @filename:RpcThreadPool.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC thread pool wrapper
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RpcThreadPool {
//This standalone pool exists to absorb complex, I/O-heavy business work without blocking Netty's handler threads.
//If the business logic is simple enough, running it directly in the Netty handler (ChannelInboundHandlerAdapter) is also acceptable.
public static Executor getExecutor(int threads, int queues) {
String name = "RpcThreadPool";
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>()
: (queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
}
}
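The queues parameter deserves a note, since it selects the work-queue strategy: 0 gives a SynchronousQueue (direct hand-off; once all threads are busy, new tasks are rejected through the AbortPolicyWithReport shown next), a negative value gives an unbounded LinkedBlockingQueue (never rejects, but can grow without limit under sustained overload), and a positive value gives a bounded queue of that capacity. A hypothetical usage sketch:

import java.util.concurrent.Executor;

public class RpcThreadPoolDemo {
    public static void main(String[] args) {
        // direct hand-off: rejects once all 16 threads are busy
        Executor handOff = RpcThreadPool.getExecutor(16, 0);
        // unbounded queue: buffers everything, never rejects
        Executor unbounded = RpcThreadPool.getExecutor(16, -1);
        // bounded queue: buffers up to 1000 tasks, then rejects
        Executor bounded = RpcThreadPool.getExecutor(16, 1000);
        unbounded.execute(new Runnable() {
            public void run() {
                System.out.println("task on " + Thread.currentThread().getName());
            }
        });
    }
}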
/**
* @filename:AbortPolicyWithReport.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: Thread pool rejection policy
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
private final String threadName;
public AbortPolicyWithReport(String threadName) {
this.threadName = threadName;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("RpcServer["
+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
System.out.println(msg);
throw new RejectedExecutionException(msg);
}
}
RPC client implementation:
/**
* @filename:MessageSendExecutor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client execution module
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.lang.reflect.Proxy;
public class MessageSendExecutor {
private RpcServerLoader loader = RpcServerLoader.getInstance();
public MessageSendExecutor(String serverAddress) {
loader.load(serverAddress);
}
public void stop() {
loader.unLoad();
}
public static <T> T execute(Class<T> rpcInterface) {
return (T) Proxy.newProxyInstance(
rpcInterface.getClassLoader(),
new Class<?>[]{rpcInterface},
new MessageSendProxy<T>(rpcInterface)
);
}
}
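Client-side usage then collapses to a few lines (ClientSketch is a demo class of mine; the address matches the rpc-server.properties file shown later). The proxy makes the remote call look exactly like a local one:

import newlandframework.netty.rpc.core.MessageSendExecutor;
import newlandframework.netty.rpc.servicebean.Calculate;

public class ClientSketch {
    public static void main(String[] args) {
        MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
        // obtain a JDK dynamic proxy for the remote service interface
        Calculate calc = MessageSendExecutor.execute(Calculate.class);
        System.out.println(calc.add(1, 2)); // a full RPC round trip under the hood
        executor.stop();
    }
}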
The RPC client here actually works through a dynamic proxy over MessageSendProxy, using the JDK's native dynamic proxy; you could also switch to CGLIB (Code Generation Library). In my tests, though, the CGLIB version threw NullPointerExceptions under high concurrency, while under identical conditions the JDK native proxy had no problems; at modest concurrency both ran fine. Something to dig into later. Without further ado, here is the MessageSendProxy implementation (a CGLIB sketch follows the class):
/**
* @filename:MessageSendProxy.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client message proxy
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;
public class MessageSendProxy<T> implements InvocationHandler {
private Class<T> cls;
public MessageSendProxy(Class<T> cls) {
this.cls = cls;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MessageRequest request = new MessageRequest();
request.setMessageId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setTypeParameters(method.getParameterTypes());
request.setParameters(args);
MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
MessageCallBack callBack = handler.sendRequest(request);
return callBack.start();
}
}
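As mentioned above, the JDK dynamic proxy could in principle be swapped for CGLIB. Purely as an illustration of that substitution (and bearing in mind the NullPointerExceptions I observed with CGLIB under high concurrency), here is a minimal sketch, assuming the cglib library is on the classpath; CglibSendProxy is a hypothetical class name:

package newlandframework.netty.rpc.core;
import java.lang.reflect.Method;
import java.util.UUID;
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import newlandframework.netty.rpc.model.MessageRequest;

public class CglibSendProxy implements MethodInterceptor {
    public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
        // build the same MessageRequest as the JDK-proxy version above
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);
        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        return handler.sendRequest(request).start();
    }

    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> rpcInterface) {
        Enhancer enhancer = new Enhancer();
        enhancer.setInterfaces(new Class<?>[]{rpcInterface});
        enhancer.setCallback(new CglibSendProxy());
        return (T) enhancer.create();
    }
}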
Looking a step further, MessageSendProxy in fact hands the message to the RpcServerLoader module, whose code is as follows:
/**
* @filename:RpcServerLoader.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server configuration loader
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RpcServerLoader {
private volatile static RpcServerLoader rpcServerLoader;
private final static String DELIMITER = ":";
//twice the number of processors available to the Java virtual machine
private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
//Netty NIO thread pool
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
private MessageSendHandler messageSendHandler = null;
//signal used to wait until the channel to the Netty server is established
private Lock lock = new ReentrantLock();
private Condition signal = lock.newCondition();
private RpcServerLoader() {
}
//double-checked locking
public static RpcServerLoader getInstance() {
if (rpcServerLoader == null) {
synchronized (RpcServerLoader.class) {
if (rpcServerLoader == null) {
rpcServerLoader = new RpcServerLoader();
}
}
}
return rpcServerLoader;
}
public void load(String serverAddress) {
String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this));
}
}
public void setMessageSendHandler(MessageSendHandler messageInHandler) {
try {
lock.lock();
this.messageSendHandler = messageInHandler;
//wake up all client RPC threads waiting for the handler
signal.signalAll();
} finally {
lock.unlock();
}
}
public MessageSendHandler getMessageSendHandler() throws InterruptedException {
try {
lock.lock();
//before the channel to the Netty server is fully established, park and wait
while (messageSendHandler == null) {
signal.await();
}
return messageSendHandler;
} finally {
lock.unlock();
}
}
public void unLoad() {
messageSendHandler.close();
threadPoolExecutor.shutdown();
eventLoopGroup.shutdownGracefully();
}
}
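One robustness tweak worth considering (my suggestion, not part of the original design): getMessageSendHandler blocks indefinitely if the connection to the server never comes up. A timed variant fails fast instead; this is a sketch of a method that could be added to RpcServerLoader:

public MessageSendHandler getMessageSendHandler(long timeoutMillis) throws InterruptedException {
    try {
        lock.lock();
        long nanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (messageSendHandler == null) {
            if (nanos <= 0) {
                throw new IllegalStateException("no RPC server connection after " + timeoutMillis + " ms");
            }
            // awaitNanos returns the remaining wait time, so spurious wakeups are handled
            nanos = signal.awaitNanos(nanos);
        }
        return messageSendHandler;
    } finally {
        lock.unlock();
    }
}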
Now, all at once, the client-side modules for RPC message encoding, decoding, and handling:
/**
* @filename:MessageSendInitializeTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client connection task
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class MessageSendInitializeTask implements Runnable {
private EventLoopGroup eventLoopGroup = null;
private InetSocketAddress serverAddress = null;
private RpcServerLoader loader = null;
MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
this.eventLoopGroup = eventLoopGroup;
this.serverAddress = serverAddress;
this.loader = loader;
}
public void run() {
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new MessageSendChannelInitializer());
ChannelFuture channelFuture = b.connect(serverAddress);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
}
}
});
}
}
/**
* @filename:MessageSendChannelInitializer.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client pipeline initializer
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {
//ObjectDecoder's base class is the length-field frame decoder LengthFieldBasedFrameDecoder, which handles TCP packet
//coalescing; the message begins with a length field occupying 4 bytes. This constant keeps us compatible with that format.
final public static int MESSAGE_LENGTH = 4;
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//Stay compatible with the wire format of ObjectDecoder's base frame decoder LengthFieldBasedFrameDecoder,
//whose underlying initialization is super(maxObjectSize, 0, 4, 0, 4);
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));
//use LengthFieldPrepender to fill in the length header that ObjectDecoder expects
pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
//for concurrent performance, use the weakCachingConcurrentResolver cache strategy; cacheDisabled is fine for ordinary cases
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageSendHandler());
}
}
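Consideration 4 at the start of this article suggested making the codec pluggable. As one illustration, the ObjectEncoder/ObjectDecoder pair above could be replaced with Netty's JBoss Marshalling codec. A sketch, assuming the jboss-marshalling and jboss-marshalling-serial dependencies are on the classpath (MarshallingCodecFactory is a hypothetical helper class):

package newlandframework.netty.rpc.core;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public final class MarshallingCodecFactory {
    public static MarshallingDecoder buildDecoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        // 1 MiB maximum object size; tune to your message sizes
        return new MarshallingDecoder(provider, 1024 * 1024);
    }

    public static MarshallingEncoder buildEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

Since MarshallingDecoder itself extends LengthFieldBasedFrameDecoder, the explicit frame-length handlers in the pipeline above would be dropped when switching to it.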
/**
* @filename:MessageSendHandler.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC client handler
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
public class MessageSendHandler extends ChannelInboundHandlerAdapter {
private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();
private volatile Channel channel;
private SocketAddress remoteAddr;
public Channel getChannel() {
return channel;
}
public SocketAddress getRemoteAddr() {
return remoteAddr;
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.remoteAddr = this.channel.remoteAddress();
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
this.channel = ctx.channel();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageResponse response = (MessageResponse) msg;
String messageId = response.getMessageId();
MessageCallBack callBack = mapCallBack.get(messageId);
if (callBack != null) {
mapCallBack.remove(messageId);
callBack.over(response);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void close() {
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
public MessageCallBack sendRequest(MessageRequest request) {
MessageCallBack callBack = new MessageCallBack(request);
mapCallBack.put(request.getMessageId(), callBack);
channel.writeAndFlush(request);
return callBack;
}
}
Finally, the server side. First, Spring automatically loads the RPC service interfaces and binds them to their implementation container, and the Netty boss/worker thread pools are initialized; all of this is done by the MessageRecvExecutor module, whose implementation follows:
/**
* @filename:MessageRecvExecutor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server execution module
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {
private String serverAddress;
private final static String DELIMITER = ":";
private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();
private static ThreadPoolExecutor threadPoolExecutor;
public MessageRecvExecutor(String serverAddress) {
this.serverAddress = serverAddress;
}
public static void submit(Runnable task) {
if (threadPoolExecutor == null) {
synchronized (MessageRecvExecutor.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
}
}
}
threadPoolExecutor.submit(task);
}
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
try {
MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();
Set s = rpcServiceObject.entrySet();
Iterator<Map.Entry<String, Object>> it = s.iterator();
Map.Entry<String, Object> entry;
while (it.hasNext()) {
entry = it.next();
handlerMap.put(entry.getKey(), entry.getValue());
}
} catch (ClassNotFoundException ex) {
java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void afterPropertiesSet() throws Exception {
//Netty's threading model is set to the master-worker (boss/worker) pool mode, which copes well with high-concurrency requests.
//Netty also supports single-threaded and multi-threaded network I/O models, configurable to fit the business need.
ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");
//twice the number of processors available to the Java virtual machine
int parallel = Runtime.getRuntime().availableProcessors() * 2;
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new MessageRecvChannelInitializer(handlerMap))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);
if (ipAddr.length == 2) {
String host = ipAddr[0];
int port = Integer.parseInt(ipAddr[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
future.channel().closeFuture().sync();
} else {
System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
}
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
Finally, as usual, the core server-side modules for RPC message encoding, decoding, and handling:
/**
* @filename:MessageRecvChannelInitializer.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server pipeline initializer
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;
public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {
//ObjectDecoder's base class is the length-field frame decoder LengthFieldBasedFrameDecoder, which handles TCP packet
//coalescing; the message begins with a length field occupying 4 bytes. This constant keeps us compatible with that format.
final public static int MESSAGE_LENGTH = 4;
private Map<String, Object> handlerMap = null;
MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//Stay compatible with the wire format of ObjectDecoder's base frame decoder LengthFieldBasedFrameDecoder,
//whose underlying initialization is super(maxObjectSize, 0, 4, 0, 4);
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));
//use LengthFieldPrepender to fill in the length header that ObjectDecoder expects
pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
pipeline.addLast(new ObjectEncoder());
//for concurrent performance, use the weakCachingConcurrentResolver cache strategy; cacheDisabled is fine for ordinary cases
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new MessageRecvHandler(handlerMap));
}
}
/**
* @filename:MessageRecvHandler.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server message handler
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
public class MessageRecvHandler extends ChannelInboundHandlerAdapter {
private final Map<String, Object> handlerMap;
public MessageRecvHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRequest request = (MessageRequest) msg;
MessageResponse response = new MessageResponse();
MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);
//don't block the NIO thread: complex business logic is handed off to the dedicated thread pool
MessageRecvExecutor.submit(recvTask);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//close the channel on a network exception
ctx.close();
}
}
/**
* @filename:MessageRecvInitializeTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC server message task handler
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.beanutils.MethodUtils;
public class MessageRecvInitializeTask implements Runnable {
private MessageRequest request = null;
private MessageResponse response = null;
private Map<String, Object> handlerMap = null;
private ChannelHandlerContext ctx = null;
public MessageResponse getResponse() {
return response;
}
public MessageRequest getRequest() {
return request;
}
public void setRequest(MessageRequest request) {
this.request = request;
}
MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap, ChannelHandlerContext ctx) {
this.request = request;
this.response = response;
this.handlerMap = handlerMap;
this.ctx = ctx;
}
public void run() {
response.setMessageId(request.getMessageId());
try {
Object result = reflect(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t.toString());
t.printStackTrace();
System.err.printf("RPC Server invoke error!\n");
}
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("RPC Server Send message-id response:" + request.getMessageId());
}
});
}
private Object reflect(MessageRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
String methodName = request.getMethodName();
Object[] parameters = request.getParameters();
return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
}
}
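One detail worth noting (my observation): reflect() resolves the target method from the runtime argument values via MethodUtils, even though the request already carries the declared parameter types. Resolving by the declared types is more robust against overloaded methods and null arguments; a sketch of the alternative, as a drop-in method for this class:

private Object reflectByDeclaredTypes(MessageRequest request) throws Throwable {
    Object serviceBean = handlerMap.get(request.getClassName());
    java.lang.reflect.Method method = serviceBean.getClass()
            .getMethod(request.getMethodName(), request.getTypeParameters());
    try {
        return method.invoke(serviceBean, request.getParameters());
    } catch (java.lang.reflect.InvocationTargetException e) {
        // surface the service's own exception rather than the reflection wrapper
        throw e.getTargetException();
    }
}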
Next, the callback module for RPC message handling:
/**
* @filename:MessageCallBack.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC message callback
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.core;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
public class MessageCallBack {
private MessageRequest request;
private MessageResponse response;
private Lock lock = new ReentrantLock();
private Condition finish = lock.newCondition();
public MessageCallBack(MessageRequest request) {
this.request = request;
}
public Object start() throws InterruptedException {
try {
lock.lock();
//set a timeout: if the RPC server takes too long to respond, just return null by default
finish.await(10*1000, TimeUnit.MILLISECONDS);
if (this.response != null) {
return this.response.getResult();
} else {
return null;
}
} finally {
lock.unlock();
}
}
public void over(MessageResponse response) {
try {
lock.lock();
this.response = response;
finish.signal();
} finally {
lock.unlock();
}
}
}
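As the comment in start() notes, a timeout currently yields null, which the caller cannot distinguish from a genuine null result; the server-side error field is silently dropped as well. A variant that surfaces both explicitly (a sketch; the timeout parameter and exception messages are my own choices):

public Object start(long timeoutMillis) throws InterruptedException {
    try {
        lock.lock();
        boolean signalled = finish.await(timeoutMillis, TimeUnit.MILLISECONDS);
        if (!signalled || this.response == null) {
            throw new RuntimeException("RPC timeout after " + timeoutMillis
                    + " ms, message-id: " + request.getMessageId());
        }
        if (this.response.getError() != null) {
            // propagate the server-side error instead of swallowing it
            throw new RuntimeException(this.response.getError());
        }
        return this.response.getResult();
    } finally {
        lock.unlock();
    }
}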
At this point the key parts of NettyRPC, the server and client modules, have been fully implemented on top of Netty. Here is the Spring configuration file, rpc-invoke-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="newlandframework.netty.rpc.core"/>
<context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/>
<bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
<property name="messageKeyVal">
<map>
<entry key="newlandframework.netty.rpc.servicebean.Calculate">
<ref bean="calc"/>
</entry>
</map>
</property>
</bean>
<bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
<bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
<constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
</bean>
</beans>
And here is the configuration file that binds the RPC server's IP address, rpc-server.properties:
#rpc server's ip address config
rpc.server.addr=127.0.0.1:18888
Finally, the NettyRPC server can be started like this:
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
If all goes well, the console shows output like the screenshot below; and if it does, the NettyRPC server has started successfully!

So how does this Netty-based RPC server handle concurrency? Practice is the sole criterion of truth, so let's put it to the test.

The test case below makes an RPC remote call to an add-two-numbers function and returns the computed result. The client opens 10,000 threads that all fire their calculation requests at the same instant, and we then watch whether the Netty RPC server responds to every request normally and whether every client thread gets a normal computed result back. Note that the test is designed as 10,000 threads issuing one instantaneous burst of concurrent requests, not 10,000 threads issuing requests in a loop. For measuring an RPC server's concurrent processing performance the two are very different, the former being far more demanding.

First, the RPC calculation interface and its implementation class:
/**
* @filename:Calculate.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: Calculator interface
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.servicebean;
public interface Calculate {
//add two numbers
int add(int a, int b);
}
/**
* @filename:CalculateImpl.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: Calculator interface implementation
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.servicebean;
public class CalculateImpl implements Calculate {
//add two numbers
public int add(int a, int b) {
return a + b;
}
}
Here is the test sample for the instantaneous burst of concurrent RPC requests:
/**
* @filename:CalcParallelRequestThread.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: Concurrent request thread simulation
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.servicebean;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CalcParallelRequestThread implements Runnable {
private CountDownLatch signal;
private CountDownLatch finish;
private MessageSendExecutor executor;
private int taskNumber = 0;
public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
this.signal = signal;
this.finish = finish;
this.taskNumber = taskNumber;
this.executor = executor;
}
public void run() {
try {
signal.await();
Calculate calc = executor.execute(Calculate.class);
int add = calc.add(taskNumber, taskNumber);
System.out.println("calc add result:[" + add + "]");
finish.countDown();
} catch (InterruptedException ex) {
Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
/**
* @filename:RpcParallelTest.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description: RPC concurrency test
* @author tangjie
* @version 1.0
*
*/
package newlandframework.netty.rpc.servicebean;
import java.util.concurrent.CountDownLatch;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import org.apache.commons.lang.time.StopWatch;
public class RpcParallelTest {
public static void main(String[] args) throws Exception {
final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
//degree of parallelism: 10000
int parallel = 10000;
//start timing
StopWatch sw = new StopWatch();
sw.start();
CountDownLatch signal = new CountDownLatch(1);
CountDownLatch finish = new CountDownLatch(parallel);
for (int index = 0; index < parallel; index++) {
CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
new Thread(client).start();
}
//let all 10000 concurrent threads fire their requests at the same instant
signal.countDown();
finish.await();
sw.stop();
String tip = String.format("Total RPC call time: [%s] ms", sw.getTime());
System.out.println(tip);
executor.stop();
}
}
Now start the NettyRPC server. Once it is confirmed to be running, run the concurrent RPC request client and look at the computed results the client prints, along with the processing time. From the output above, 10,000 instantaneous RPC calculation requests took close to 11 seconds in total. Looking at the NettyRPC server side, shown below, we can see clearly that the RPC server received every calculation request issued by the clients and returned a message response for each.

Finally, let's verify each point separately: did the RPC server drop packets, suffer coalescing errors, or block on I/O? Were all 10,000 concurrent calculation requests successfully received, processed, and answered? The facts speak for themselves, as the screenshot below shows: impressively, the RPC server did successfully receive the client's burst of 10,000 instantaneous concurrent calculation requests and answered every one of them, with no packet loss, no coalescing errors, and no I/O blocking. And did the RPC client get the computed results back in reply? It did: the client received the server's computed results for all 10,000 addition requests, taking roughly 11 seconds. Evidently a NettyRPC server built on Netty plus a business thread pool handles concurrent multi-threaded RPC requests with ease.

In closing, this article used the Netty NIO framework to implement a simple "high-performance" RPC server. The code is written, but there are still places worth improving, for example:

1. Serialized object transport could support today's mainstream serialization frameworks: protobuf, JBoss Marshalling, Avro, and so on.
2. Netty's threading model could be tailored to the business need; not every workload requires this much concurrent processing power.
3. At present each RPC service interface maps to exactly one bound implementation; one-to-many bindings should be supported later.
4. The business thread pool's startup parameters, its concurrent blocking-queue model, and the like could be managed through configuration.
5. In Netty's Handler stage, complex business logic is currently dispatched wholesale to a dedicated thread pool for asynchronous background processing. You could also consider decoupling via JMS (message queuing), dispatching work uniformly to the message queue's subscribers for centralized handling. There are many open-source JMS implementations to consider, such as ActiveMQ and RocketMQ.

For you, the reader, this NettyRPC surely still has many places that can be polished and improved; I leave that optimization work to your own creativity. My technical ability and understanding are limited, and where this article says something wrong, I sincerely invite criticism and correction! Finally, thank you for patiently reading to the end; I trust you now have a deeper understanding of developing high-performance server-side applications in Java. This article is a milestone summary of my Netty studies; when time allows, I will publish further articles on industrial-grade Netty development, so stay tuned!

PS: Interested readers may also want to read my other article, the follow-up on optimizing NettyRPC's message serialization (Netty實現(xiàn)高性能RPC服務(wù)器優(yōu)化篇之消息序列化). Also, since publishing these two articles on building high-performance RPC servers with Netty on cnblogs, I have received many requests from fellow readers for the source code. For everyone's convenience I have open-sourced NettyRPC on GitHub; anyone interested is welcome to study and explore it together. The NettyRPC project: https://github.com/tang-jie/NettyRPC
An advanced installment in the industrial-grade Netty development series: a design guide for building a distributed message queue (AvatarMQ) with Netty, the architecture chapter. The two articles so far, on developing a high-performance RPC server with Netty and on optimizing its message serialization, built a high-performance Netty-based RPC server as the groundwork for designing and implementing a Netty-based distributed message queue system, which I have named AvatarMQ. As the advanced installment of the industrial-grade Netty series, interested readers are welcome to follow the AvatarMQ architecture design guide; it will not disappoint! The AvatarMQ project is open source at: https://github.com/tang-jie/AvatarMQ.