Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器的示例代碼
一、什么是RPC?
RPC(Remote Procedure Call)遠(yuǎn)程過(guò)程調(diào)用,是一種進(jìn)程間的通信方式,其可以做到像調(diào)用本地方法那樣調(diào)用位于遠(yuǎn)程的計(jì)算機(jī)的服務(wù)。其實(shí)現(xiàn)的原理過(guò)程如下:
- 本地的進(jìn)程通過(guò)接口進(jìn)行本地方法調(diào)用。
- RPC客戶端將調(diào)用的接口名、接口方法、方法參數(shù)等信息利用網(wǎng)絡(luò)通信發(fā)送給RPC服務(wù)器。
- RPC服務(wù)器對(duì)請(qǐng)求進(jìn)行解析,根據(jù)接口名、接口方法、方法參數(shù)等信息找到對(duì)應(yīng)的方法實(shí)現(xiàn),并進(jìn)行本地方法調(diào)用,然后將方法調(diào)用結(jié)果響應(yīng)給RPC客戶端。
二、實(shí)現(xiàn)RPC需要解決那些問(wèn)題?
1. 約定通信協(xié)議格式
RPC分為客戶端與服務(wù)端,就像HTTP一樣,我們需要定義交互的協(xié)議格式。主要包括三個(gè)方面:
- 請(qǐng)求格式
- 響應(yīng)格式
- 網(wǎng)絡(luò)通信時(shí)數(shù)據(jù)的序列化方式
RPC請(qǐng)求
@Data
public class RpcRequest {
/**
* 請(qǐng)求ID 用來(lái)標(biāo)識(shí)本次請(qǐng)求以匹配RPC服務(wù)器的響應(yīng)
*/
private String requestId;
/**
* 調(diào)用的類(接口)權(quán)限定名稱
*/
private String className;
/**
* 調(diào)用的方法名
*/
private String methodName;
/**
* 方法參類型列表
*/
private Class<?>[] parameterTypes;
/**
* 方法參數(shù)
*/
private Object[] parameters;
}
RPC響應(yīng)
@Data
public class RpcResponse {
/**
* 響應(yīng)對(duì)應(yīng)的請(qǐng)求ID
*/
private String requestId;
/**
* 調(diào)用是否成功的標(biāo)識(shí)
*/
private boolean success = true;
/**
* 調(diào)用錯(cuò)誤信息
*/
private String errorMessage;
/**
* 調(diào)用結(jié)果
*/
private Object result;
}
2. 序列化方式
序列化方式可以使用JDK自帶的序列化方式或者一些第三方的序列化方式,JDK自帶的由于性能較差所以不推薦。我們這里選擇JSON作為序列化協(xié)議,即將請(qǐng)求和響應(yīng)對(duì)象序列化為JSON字符串后發(fā)送到對(duì)端,對(duì)端接收到后反序列為相應(yīng)的對(duì)象,這里采用阿里的 fastjson 作為JSON序列化框架。
3. TCP粘包、拆包
TCP是個(gè)“流”協(xié)議,所謂流,就是沒(méi)有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的流水,是連成一片的,其間并沒(méi)有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問(wèn)題。粘包和拆包需要應(yīng)用層程序來(lái)解決。
我們采用在請(qǐng)求和響應(yīng)的頭部保存消息體的長(zhǎng)度的方式解決粘包和拆包問(wèn)題。請(qǐng)求和響應(yīng)的格式如下:
+--------+----------------+ | Length | Content | | 4字節(jié) | Length個(gè)字節(jié) | +--------+----------------+
4. 網(wǎng)絡(luò)通信框架的選擇
出于性能的考慮,RPC一般選擇異步非阻塞的網(wǎng)絡(luò)通信方式,JDK自帶的NIO網(wǎng)絡(luò)編程操作繁雜,Netty是一款基于NIO開(kāi)發(fā)的網(wǎng)絡(luò)通信框架,其對(duì)java NIO進(jìn)行封裝對(duì)外提供友好的API,并且內(nèi)置了很多開(kāi)箱即用的組件,如各種編碼解碼器。所以我們采用Netty作為RPC服務(wù)的網(wǎng)絡(luò)通信框架。
三、RPC服務(wù)端
RPC分為客戶端和服務(wù)端,它們有一個(gè)共同的服務(wù)接口API,我們首先定義一個(gè)接口 HelloService
public interface HelloService {
String sayHello(String name);
}
然后服務(wù)端需要提供該接口的實(shí)現(xiàn)類,然后使用自定義的@RpcService注解標(biāo)注,該注解擴(kuò)展自@Component,被其標(biāo)注的類可以被Spring的容器管理。
@RpcService
public class HelloServiceImp implements HelloService {
@Override
public String sayHello(String name) {
return "Hello " + name;
}
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
}
RPC服務(wù)器類
我們實(shí)現(xiàn)了ApplicationContextAware接口,以便從bean容器中取出@RpcService實(shí)現(xiàn)類,存入我們的map容器中。
@Component
@Slf4j
public class RpcServer implements ApplicationContextAware, InitializingBean {
// RPC服務(wù)實(shí)現(xiàn)容器
private Map<String, Object> rpcServices = new HashMap<>();
@Value("${rpc.server.port}")
private int port;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> services = applicationContext.getBeansWithAnnotation(RpcService.class);
for (Map.Entry<String, Object> entry : services.entrySet()) {
Object bean = entry.getValue();
Class<?>[] interfaces = bean.getClass().getInterfaces();
for (Class<?> inter : interfaces) {
rpcServices.put(inter.getName(), bean);
}
}
log.info("加載RPC服務(wù)數(shù)量:{}", rpcServices.size());
}
@Override
public void afterPropertiesSet() {
start();
}
private void start(){
new Thread(() -> {
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new JsonDecoder());
pipeline.addLast(new JsonEncoder());
pipeline.addLast(new RpcInboundHandler(rpcServices));
}
})
.channel(NioServerSocketChannel.class);
ChannelFuture future = bootstrap.bind(port).sync();
log.info("RPC 服務(wù)器啟動(dòng), 監(jiān)聽(tīng)端口:" + port);
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}).start();
}
}
RpcServerInboundHandler 負(fù)責(zé)處理RPC請(qǐng)求
@Slf4j
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {
private Map<String, Object> rpcServices;
public RpcServerInboundHandler(Map<String, Object> rpcServices){
this.rpcServices = rpcServices;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端連接成功,{}", ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
log.info("客戶端斷開(kāi)連接,{}", ctx.channel().remoteAddress());
ctx.channel().close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
RpcRequest rpcRequest = (RpcRequest) msg;
log.info("接收到客戶端請(qǐng)求, 請(qǐng)求接口:{}, 請(qǐng)求方法:{}", rpcRequest.getClassName(), rpcRequest.getMethodName());
RpcResponse response = new RpcResponse();
response.setRequestId(rpcRequest.getRequestId());
Object result = null;
try {
result = this.handleRequest(rpcRequest);
response.setResult(result);
} catch (Exception e) {
e.printStackTrace();
response.setSuccess(false);
response.setErrorMessage(e.getMessage());
}
log.info("服務(wù)器響應(yīng):{}", response);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("連接異常");
ctx.channel().close();
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
log.info("客戶端已超過(guò)60秒未讀寫(xiě)數(shù)據(jù), 關(guān)閉連接.{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
private Object handleRequest(RpcRequest rpcRequest) throws Exception{
Object bean = rpcServices.get(rpcRequest.getClassName());
if(bean == null){
throw new RuntimeException("未找到對(duì)應(yīng)的服務(wù): " + rpcRequest.getClassName());
}
Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
method.setAccessible(true);
return method.invoke(bean, rpcRequest.getParameters());
}
}
四、RPC客戶端
/**
* RPC遠(yuǎn)程調(diào)用的客戶端
*/
@Slf4j
@Component
public class RpcClient {
@Value("${rpc.remote.ip}")
private String remoteIp;
@Value("${rpc.remote.port}")
private int port;
private Bootstrap bootstrap;
// 儲(chǔ)存調(diào)用結(jié)果
private final Map<String, SynchronousQueue<RpcResponse>> results = new ConcurrentHashMap<>();
public RpcClient(){
}
@PostConstruct
public void init(){
bootstrap = new Bootstrap().remoteAddress(remoteIp, port);
NioEventLoopGroup worker = new NioEventLoopGroup(1);
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 10));
pipeline.addLast(new JsonEncoder());
pipeline.addLast(new JsonDecoder());
pipeline.addLast(new RpcClientInboundHandler(results));
}
});
}
public RpcResponse send(RpcRequest rpcRequest) {
RpcResponse rpcResponse = null;
rpcRequest.setRequestId(UUID.randomUUID().toString());
Channel channel = null;
try {
channel = bootstrap.connect().sync().channel();
log.info("連接建立, 發(fā)送請(qǐng)求:{}", rpcRequest);
channel.writeAndFlush(rpcRequest);
SynchronousQueue<RpcResponse> queue = new SynchronousQueue<>();
results.put(rpcRequest.getRequestId(), queue);
// 阻塞等待獲取響應(yīng)
rpcResponse = queue.take();
results.remove(rpcRequest.getRequestId());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel != null && channel.isActive()){
channel.close();
}
}
return rpcResponse;
}
}
RpcClientInboundHandler負(fù)責(zé)處理服務(wù)端的響應(yīng)
@Slf4j
public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter {
private Map<String, SynchronousQueue<RpcResponse>> results;
public RpcClientInboundHandler(Map<String, SynchronousQueue<RpcResponse>> results){
this.results = results;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcResponse rpcResponse = (RpcResponse) msg;
log.info("收到服務(wù)器響應(yīng):{}", rpcResponse);
if(!rpcResponse.isSuccess()){
throw new RuntimeException("調(diào)用結(jié)果異常,異常信息:" + rpcResponse.getErrorMessage());
}
// 取出結(jié)果容器,將response放進(jìn)queue中
SynchronousQueue<RpcResponse> queue = results.get(rpcResponse.getRequestId());
queue.put(rpcResponse);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state() == IdleState.ALL_IDLE){
log.info("發(fā)送心跳包");
RpcRequest request = new RpcRequest();
request.setMethodName("heartBeat");
ctx.channel().writeAndFlush(request);
}
}else{
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
log.info("異常:{}", cause.getMessage());
ctx.channel().close();
}
}
接口代理
為了使客戶端像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程服務(wù),我們需要對(duì)接口進(jìn)行動(dòng)態(tài)代理。
代理類實(shí)現(xiàn)
@Component
public class RpcProxy implements InvocationHandler {
@Autowired
private RpcClient rpcClient;
@Override
public Object invoke(Object proxy, Method method, Object[] args){
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
rpcRequest.setParameterTypes(method.getParameterTypes());
RpcResponse rpcResponse = rpcClient.send(rpcRequest);
return rpcResponse.getResult();
}
}
實(shí)現(xiàn)FactoryBean接口,將生產(chǎn)動(dòng)態(tài)代理類納入 Spring 容器管理。
public class RpcFactoryBean<T> implements FactoryBean<T> {
private Class<T> interfaceClass;
@Autowired
private RpcProxy rpcProxy;
public RpcFactoryBean(Class<T> interfaceClass){
this.interfaceClass = interfaceClass;
}
@Override
public T getObject(){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy);
}
@Override
public Class<?> getObjectType() {
return interfaceClass;
}
}
自定義類路徑掃描器,掃描包下的RPC接口,動(dòng)態(tài)生產(chǎn)代理類,納入 Spring 容器管理
public class RpcScanner extends ClassPathBeanDefinitionScanner {
public RpcScanner(BeanDefinitionRegistry registry) {
super(registry);
}
@Override
protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages);
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
GenericBeanDefinition beanDefinition = (GenericBeanDefinition)beanDefinitionHolder.getBeanDefinition();
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName());
beanDefinition.setBeanClassName(RpcFactoryBean.class.getName());
}
return beanDefinitionHolders;
}
@Override
protected boolean isCandidateComponent(MetadataReader metadataReader) throws IOException {
return true;
}
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
}
@Component
public class RpcBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor {
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
RpcScanner rpcScanner = new RpcScanner(registry);
// 傳入RPC接口所在的包名
rpcScanner.scan("com.ygd.rpc.common.service");
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
}
JSON編解碼器
/**
* 將 RpcRequest 編碼成字節(jié)序列發(fā)送
* 消息格式: Length + Content
* Length使用int存儲(chǔ),標(biāo)識(shí)消息體的長(zhǎng)度
*
* +--------+----------------+
* | Length | Content |
* | 4字節(jié) | Length個(gè)字節(jié) |
* +--------+----------------+
*/
public class JsonEncoder extends MessageToByteEncoder<RpcRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, RpcRequest rpcRequest, ByteBuf out){
byte[] bytes = JSON.toJSONBytes(rpcRequest);
// 將消息體的長(zhǎng)度寫(xiě)入消息頭部
out.writeInt(bytes.length);
// 寫(xiě)入消息體
out.writeBytes(bytes);
}
}
/**
* 將響應(yīng)消息解碼成 RpcResponse
*/
public class JsonDecoder extends LengthFieldBasedFrameDecoder {
public JsonDecoder(){
super(Integer.MAX_VALUE, 0, 4, 0, 4);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf msg = (ByteBuf) super.decode(ctx, in);
byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
RpcResponse rpcResponse = JSON.parseObject(bytes, RpcResponse.class);
return rpcResponse;
}
}
測(cè)試
我們編寫(xiě)一個(gè)Controller進(jìn)行測(cè)試
@RestController
@RequestMapping("/hello")
public class HelloController {
@Autowired
private HelloService helloService;
@GetMapping("/sayHello")
public String hello(String name){
return helloService.sayHello(name);
}
}
通過(guò) PostMan調(diào)用 controller 接口 http://localhost:9998/hello/sayHello?name=小明
響應(yīng): Hello 小明
總結(jié)
本文實(shí)現(xiàn)了一個(gè)簡(jiǎn)易的、具有基本概念的RPC,主要涉及的知識(shí)點(diǎn)如下:
- 網(wǎng)絡(luò)通信及通信協(xié)議的編碼、解碼
- Java對(duì)象的序列化及反序列化
- 通信鏈路心跳檢測(cè)
- Java反射
- JDK動(dòng)態(tài)代理
項(xiàng)目完整代碼詳見(jiàn):https://github.com/yinguodong/netty-rpc
到此這篇關(guān)于Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器的示例代碼的文章就介紹到這了,更多相關(guān)Springboot RPC服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot?3.x?集成?Feign的詳細(xì)過(guò)程
本文闡述了如何在SpringBoot3.x中集成Feign,以實(shí)現(xiàn)微服務(wù)之間的調(diào)用,主要步驟包括:搭建chain-common服務(wù),創(chuàng)建chain-starter/chain-feign-starter服務(wù),集成Feign到chain-system和chain-iot-channel服務(wù),配置Feign,感興趣的朋友一起看看吧2024-09-09
Spring Boot整合MyBatis-Flex全過(guò)程
這篇文章主要介紹了Spring Boot整合MyBatis-Flex全過(guò)程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
Spring Boot集成Druid數(shù)據(jù)庫(kù)連接池
這篇文章主要介紹了Spring Boot集成Druid數(shù)據(jù)庫(kù)連接池,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-04-04
Spring AOP的概念與實(shí)現(xiàn)過(guò)程詳解
AOP為Aspect Oriented Programming的縮寫(xiě),意為:面向切面編程,可通過(guò)運(yùn)行期動(dòng)態(tài)代理實(shí)現(xiàn)程序功能的統(tǒng)一維護(hù)的一種技術(shù)。AOP是 Spring框架中的一個(gè)重要內(nèi)容2023-02-02
關(guān)于SpringBoot大文件RestTemplate下載解決方案
這篇文章主要介紹了SpringBoot大文件RestTemplate下載解決方案,最近結(jié)合網(wǎng)上案例及自己總結(jié),寫(xiě)了一個(gè)分片下載tuling/fileServer項(xiàng)目,需要的朋友可以參考下2021-10-10
Spring Boot 2.0快速構(gòu)建服務(wù)組件全步驟
這篇文章主要給大家介紹了關(guān)于Spring Boot 2.0快速構(gòu)建服務(wù)組件的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Spring Boot 2.0具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04
利用Java異常機(jī)制實(shí)現(xiàn)模擬借書(shū)系統(tǒng)
這篇文章主要給大家介紹了利用Java異常機(jī)制實(shí)現(xiàn)模擬借書(shū)系統(tǒng)的相關(guān)資料,文中先對(duì)java異常機(jī)制進(jìn)行了簡(jiǎn)單介紹,而后通過(guò)示例代碼介紹了java語(yǔ)言是如何實(shí)現(xiàn)一個(gè)控制臺(tái)版的模擬借書(shū)系統(tǒng),需要的朋友可以參考學(xué)習(xí),一起來(lái)看看吧。2017-04-04

