利用SpringBoot實(shí)現(xiàn)一個(gè)基于本地代理模式的RPC調(diào)用框架
在微服務(wù)架構(gòu)中,服務(wù)間通信是一個(gè)核心問題。
雖然Dubbo、gRPC等成熟框架已經(jīng)為我們提供了完整的RPC解決方案,但理解其底層原理并動(dòng)手實(shí)現(xiàn)一個(gè)簡化版本,對(duì)提升我們的技術(shù)理解深度很有幫助。
本文將帶你從零開始,使用SpringBoot實(shí)現(xiàn)一個(gè)基于本地代理模式的RPC調(diào)用框架。
整體設(shè)計(jì)思路
我們的RPC框架采用經(jīng)典的代理模式設(shè)計(jì):
接口定義:定義服務(wù)接口,客戶端和服務(wù)端共享
動(dòng)態(tài)代理:客戶端通過JDK動(dòng)態(tài)代理生成接口實(shí)現(xiàn)類
序列化:使用JSON進(jìn)行數(shù)據(jù)序列化傳輸
網(wǎng)絡(luò)通信:基于HTTP協(xié)議進(jìn)行服務(wù)間通信
服務(wù)注冊(cè):服務(wù)端暴露接口實(shí)現(xiàn),客戶端動(dòng)態(tài)發(fā)現(xiàn)
核心代碼實(shí)現(xiàn)
1. 定義RPC注解
首先創(chuàng)建用于標(biāo)識(shí)RPC服務(wù)的注解:
// RpcService.java - 服務(wù)提供者注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class<?> value() default void.class;
String version() default "1.0";
}
// RpcReference.java - 服務(wù)消費(fèi)者注解
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcReference {
String version() default "1.0";
long timeout() default 5000;
}
2. RPC請(qǐng)求響應(yīng)模型
定義網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)結(jié)構(gòu):
// RpcRequest.java
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] parameters;
private String version;
// 構(gòu)造函數(shù)
public RpcRequest() {
this.requestId = UUID.randomUUID().toString();
}
// getter/setter方法
public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public String getClassName() { return className; }
public void setClassName(String className) { this.className = className; }
public String getMethodName() { return methodName; }
public void setMethodName(String methodName) { this.methodName = methodName; }
public Class<?>[] getParameterTypes() { return parameterTypes; }
public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; }
public Object[] getParameters() { return parameters; }
public void setParameters(Object[] parameters) { this.parameters = parameters; }
public String getVersion() { return version; }
public void setVersion(String version) { this.version = version; }
}
// RpcResponse.java
public class RpcResponse {
private String requestId;
private Object result;
private String error;
private boolean success;
public RpcResponse() {}
public RpcResponse(String requestId) {
this.requestId = requestId;
}
// getter/setter方法
public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public Object getResult() { return result; }
public void setResult(Object result) {
this.result = result;
this.success = true;
}
public String getError() { return error; }
public void setError(String error) {
this.error = error;
this.success = false;
}
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
}
3. 服務(wù)注冊(cè)中心
實(shí)現(xiàn)簡單的本地服務(wù)注冊(cè)機(jī)制:
// ServiceRegistry.java
@Component
public class ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
// 服務(wù)實(shí)例注冊(cè)表:接口名 -> 服務(wù)實(shí)現(xiàn)實(shí)例
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
/**
* 注冊(cè)服務(wù)實(shí)例
*/
public void registerService(Class<?> serviceInterface, String version, Object serviceImpl) {
String serviceName = generateServiceName(serviceInterface, version);
serviceMap.put(serviceName, serviceImpl);
logger.info("注冊(cè)服務(wù)成功: {} -> {}", serviceName, serviceImpl.getClass().getName());
}
/**
* 獲取服務(wù)實(shí)例
*/
public Object getService(String className, String version) {
String serviceName = generateServiceName(className, version);
Object service = serviceMap.get(serviceName);
if (service == null) {
logger.warn("未找到服務(wù): {}", serviceName);
}
return service;
}
/**
* 生成服務(wù)名稱
*/
private String generateServiceName(Class<?> serviceInterface, String version) {
return generateServiceName(serviceInterface.getName(), version);
}
private String generateServiceName(String className, String version) {
return className + ":" + version;
}
/**
* 獲取所有已注冊(cè)的服務(wù)
*/
public Set<String> getAllServices() {
return new HashSet<>(serviceMap.keySet());
}
}
4. RPC客戶端代理工廠
這是框架的核心,通過動(dòng)態(tài)代理實(shí)現(xiàn)透明的遠(yuǎn)程調(diào)用:
// RpcClientProxy.java
@Component
public class RpcClientProxy {
private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);
@Autowired
private RpcClient rpcClient;
/**
* 為指定接口創(chuàng)建代理實(shí)例
*/
@SuppressWarnings("unchecked")
public <T> T createProxy(Class<T> interfaceClass, String version, long timeout) {
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class[]{interfaceClass},
new RpcInvocationHandler(interfaceClass, version, timeout, rpcClient)
);
}
/**
* 動(dòng)態(tài)代理調(diào)用處理器
*/
private static class RpcInvocationHandler implements InvocationHandler {
private final Class<?> interfaceClass;
private final String version;
private final long timeout;
private final RpcClient rpcClient;
public RpcInvocationHandler(Class<?> interfaceClass, String version, long timeout, RpcClient rpcClient) {
this.interfaceClass = interfaceClass;
this.version = version;
this.timeout = timeout;
this.rpcClient = rpcClient;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 跳過Object類的基礎(chǔ)方法
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
// 構(gòu)建RPC請(qǐng)求
RpcRequest request = buildRpcRequest(method, args);
try {
// 發(fā)送遠(yuǎn)程調(diào)用請(qǐng)求
RpcResponse response = rpcClient.sendRequest(request, timeout);
if (response.isSuccess()) {
return response.getResult();
} else {
throw new RuntimeException("RPC調(diào)用失敗: " + response.getError());
}
} catch (Exception e) {
logger.error("RPC調(diào)用異常: {}.{}", interfaceClass.getName(), method.getName(), e);
throw new RuntimeException("RPC調(diào)用異常", e);
}
}
/**
* 構(gòu)建RPC請(qǐng)求對(duì)象
*/
private RpcRequest buildRpcRequest(Method method, Object[] args) {
RpcRequest request = new RpcRequest();
request.setClassName(interfaceClass.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
request.setVersion(version);
return request;
}
}
}
5. RPC網(wǎng)絡(luò)客戶端
負(fù)責(zé)實(shí)際的網(wǎng)絡(luò)通信:
// RpcClient.java
@Component
public class RpcClient {
private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);
@Autowired
private RestTemplate restTemplate;
@Value("${rpc.server.url:http://localhost:8080}")
private String serverUrl;
/**
* 發(fā)送RPC請(qǐng)求
*/
public RpcResponse sendRequest(RpcRequest request, long timeout) {
try {
logger.debug("發(fā)送RPC請(qǐng)求: {}.{}", request.getClassName(), request.getMethodName());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<RpcRequest> entity = new HttpEntity<>(request, headers);
// 發(fā)送HTTP POST請(qǐng)求
ResponseEntity<RpcResponse> responseEntity = restTemplate.postForEntity(
serverUrl + "/rpc/invoke",
entity,
RpcResponse.class
);
RpcResponse response = responseEntity.getBody();
logger.debug("收到RPC響應(yīng): requestId={}, success={}",
response.getRequestId(), response.isSuccess());
return response;
} catch (Exception e) {
logger.error("RPC請(qǐng)求發(fā)送失敗", e);
RpcResponse errorResponse = new RpcResponse(request.getRequestId());
errorResponse.setError("網(wǎng)絡(luò)請(qǐng)求失敗: " + e.getMessage());
return errorResponse;
}
}
}
6. RPC服務(wù)端處理器
處理客戶端發(fā)來的RPC請(qǐng)求:
@RestController
@RequestMapping("/rpc")
public class RpcRequestHandler {
private static final Logger logger = LoggerFactory.getLogger(RpcRequestHandler.class);
@Autowired
private ServiceRegistry serviceRegistry;
/**
* 處理RPC調(diào)用請(qǐng)求
*/
@PostMapping("/invoke")
public RpcResponse handleRpcRequest(@RequestBody RpcRequest request) {
RpcResponse response = new RpcResponse(request.getRequestId());
try {
logger.debug("處理RPC請(qǐng)求: {}.{}", request.getClassName(), request.getMethodName());
// 查找服務(wù)實(shí)例
Object serviceInstance = serviceRegistry.getService(request.getClassName(), request.getVersion());
if (serviceInstance == null) {
response.setError("服務(wù)未找到: " + request.getClassName());
return response;
}
// 通過反射調(diào)用方法
Class<?> serviceClass = serviceInstance.getClass();
Method method = serviceClass.getMethod(request.getMethodName(), request.getParameterTypes());
Object[] parameters = request.getParameters();
Class<?>[] paramTypes = method.getParameterTypes();
for (int i = 0; i < parameters.length; i++) {
if (parameters[i] != null && ClassUtil.isBasicType(paramTypes[i])) {
// 處理基本類型轉(zhuǎn)換(如客戶端傳的是包裝類型,服務(wù)端是基本類型)
parameters[i] = convertType(paramTypes[i], parameters[i]);
}
}
Object result = method.invoke(serviceInstance, parameters);
response.setResult(result);
logger.debug("RPC調(diào)用成功: {}.{}", request.getClassName(), request.getMethodName());
} catch (Exception e) {
logger.error("RPC調(diào)用處理異常", e);
response.setError("方法調(diào)用異常: " + e.getMessage());
}
return response;
}
/**
* 類型轉(zhuǎn)換處理(支持基本類型和包裝類互轉(zhuǎn))
*/
private Object convertType(Class<?> targetType, Object value) {
// 處理null值
if (value == null) return null;
// 類型匹配時(shí)直接返回
if (targetType.isInstance(value)) {
return value;
}
// 處理數(shù)字類型轉(zhuǎn)換
if (value instanceof Number) {
Number number = (Number) value;
if (targetType == int.class || targetType == Integer.class) return number.intValue();
if (targetType == long.class || targetType == Long.class) return number.longValue();
if (targetType == double.class || targetType == Double.class) return number.doubleValue();
if (targetType == float.class || targetType == Float.class) return number.floatValue();
if (targetType == byte.class || targetType == Byte.class) return number.byteValue();
if (targetType == short.class || targetType == Short.class) return number.shortValue();
}
// 處理布爾類型轉(zhuǎn)換
if (targetType == boolean.class || targetType == Boolean.class) {
if (value instanceof Boolean) return value;
return Boolean.parseBoolean(value.toString());
}
// 處理字符類型轉(zhuǎn)換
if (targetType == char.class || targetType == Character.class) {
String str = value.toString();
if (!str.isEmpty()) return str.charAt(0);
}
throw new IllegalArgumentException(String.format(
"類型轉(zhuǎn)換失敗: %s -> %s",
value.getClass().getSimpleName(),
targetType.getSimpleName()
));
}
/**
* 查詢已注冊(cè)的服務(wù)列表
*/
@GetMapping("/services")
public Set<String> getRegisteredServices() {
return serviceRegistry.getAllServices();
}
}
7. 自動(dòng)配置和Bean后處理器
實(shí)現(xiàn)Spring Boot的自動(dòng)裝配:
// RpcAutoConfiguration.java
@Configuration
@ComponentScan(basePackages = "com.example.rpc")
public class RpcAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
// 設(shè)置連接超時(shí)
HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
factory.setConnectTimeout(3000);
factory.setReadTimeout(10000);
restTemplate.setRequestFactory(factory);
return restTemplate;
}
}
// RpcServiceProcessor.java - 處理@RpcService注解
@Component
public class RpcServiceProcessor implements BeanPostProcessor, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(RpcServiceProcessor.class);
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
// 檢查是否有@RpcService注解
RpcService rpcService = beanClass.getAnnotation(RpcService.class);
if (rpcService != null) {
registerRpcService(bean, rpcService);
}
return bean;
}
/**
* 注冊(cè)RPC服務(wù)
*/
private void registerRpcService(Object serviceBean, RpcService rpcService) {
ServiceRegistry serviceRegistry = applicationContext.getBean(ServiceRegistry.class);
Class<?> interfaceClass = rpcService.value();
if (interfaceClass == void.class) {
// 如果沒有指定接口,自動(dòng)查找第一個(gè)接口
Class<?>[] interfaces = serviceBean.getClass().getInterfaces();
if (interfaces.length > 0) {
interfaceClass = interfaces[0];
} else {
logger.warn("無法確定服務(wù)接口: {}", serviceBean.getClass().getName());
return;
}
}
serviceRegistry.registerService(interfaceClass, rpcService.version(), serviceBean);
}
}
// RpcReferenceProcessor.java - 處理@RpcReference注解
@Component
public class RpcReferenceProcessor implements BeanPostProcessor, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(RpcReferenceProcessor.class);
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
// 處理所有標(biāo)注了@RpcReference的字段
Field[] fields = beanClass.getDeclaredFields();
for (Field field : fields) {
RpcReference rpcReference = field.getAnnotation(RpcReference.class);
if (rpcReference != null) {
injectRpcReference(bean, field, rpcReference);
}
}
return bean;
}
/**
* 注入RPC服務(wù)代理
*/
private void injectRpcReference(Object bean, Field field, RpcReference rpcReference) {
try {
RpcClientProxy rpcClientProxy = applicationContext.getBean(RpcClientProxy.class);
Class<?> interfaceClass = field.getType();
Object proxyInstance = rpcClientProxy.createProxy(
interfaceClass,
rpcReference.version(),
rpcReference.timeout()
);
field.setAccessible(true);
field.set(bean, proxyInstance);
logger.info("注入RPC服務(wù)代理: {}", interfaceClass.getName());
} catch (Exception e) {
logger.error("RPC服務(wù)代理注入失敗: {}", field.getName(), e);
throw new RuntimeException("RPC服務(wù)代理注入失敗", e);
}
}
}
使用示例
1. 定義服務(wù)接口
// UserService.java
public interface UserService {
String getUserName(Long userId);
boolean updateUser(Long userId, String name);
List<String> getUserList();
}
2. 實(shí)現(xiàn)服務(wù)提供者
// UserServiceImpl.java
@RpcService(UserService.class)
public class UserServiceImpl implements UserService {
private static final Map<Long, String> userDatabase = new ConcurrentHashMap<>();
static {
userDatabase.put(1L, "張三");
userDatabase.put(2L, "李四");
userDatabase.put(3L, "王五");
}
@Override
public String getUserName(Long userId) {
String userName = userDatabase.get(userId);
return userName != null ? userName : "用戶不存在";
}
@Override
public boolean updateUser(Long userId, String name) {
if (userDatabase.containsKey(userId)) {
userDatabase.put(userId, name);
return true;
}
return false;
}
@Override
public List<String> getUserList() {
return new ArrayList<>(userDatabase.values());
}
}
3. 創(chuàng)建服務(wù)消費(fèi)者
// UserController.java
@RestController
@RequestMapping("/user")
public class UserController {
@RpcReference
private UserService userService;
@GetMapping("/{userId}")
public String getUser(@PathVariable Long userId) {
return userService.getUserName(userId);
}
@PostMapping("/{userId}")
public boolean updateUser(@PathVariable Long userId, @RequestParam String name) {
return userService.updateUser(userId, name);
}
@GetMapping("/list")
public List<String> getUserList() {
return userService.getUserList();
}
}
4. 啟動(dòng)類配置
// Application.java
@SpringBootApplication
@EnableAutoConfiguration
public class RpcDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RpcDemoApplication.class, args);
}
}
5. 配置文件
# application.yml
server:
port: 8080
rpc:
server:
url: http://localhost:8080
logging:
level:
com.example.rpc: DEBUG
測(cè)試驗(yàn)證
啟動(dòng)應(yīng)用后,可以通過以下方式測(cè)試:
# 查詢用戶信息 curl http://localhost:8080/user/1 # 更新用戶信息 curl -X POST "http://localhost:8080/user/1?name=新名字" # 獲取用戶列表 curl http://localhost:8080/user/list # 查看已注冊(cè)的服務(wù) curl http://localhost:8080/rpc/services
總結(jié)
這個(gè)實(shí)現(xiàn)雖然相對(duì)簡單,但完整展現(xiàn)了RPC框架的核心思想。
在實(shí)際項(xiàng)目中,建議使用成熟的RPC框架如Dubbo或Spring Cloud,但理解底層原理對(duì)我們選擇和優(yōu)化技術(shù)方案很有價(jià)值。
到此這篇關(guān)于利用SpringBoot實(shí)現(xiàn)一個(gè)基于本地代理模式的RPC調(diào)用框架的文章就介紹到這了,更多相關(guān)SpringBoot實(shí)現(xiàn)RPC調(diào)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 基于SpringBoot實(shí)現(xiàn)REST?API與RPC調(diào)用的統(tǒng)一封裝
- SpringBoot微服務(wù)中集成gRPC的實(shí)踐指南
- Springboot整合Dubbo+Nacos實(shí)現(xiàn)RPC調(diào)用的示例代碼
- 手把手教你寫一個(gè)SpringBoot+gRPC服務(wù)
- springboot+HttpInvoke?實(shí)現(xiàn)RPC調(diào)用的方法
- 詳解SpringBoot中使用RabbitMQ的RPC功能
- SpringBoot整合Dubbo框架,實(shí)現(xiàn)RPC服務(wù)遠(yuǎn)程調(diào)用
相關(guān)文章
Java使用JavaMail API發(fā)送和接收郵件的代碼示例
JavaMail是Oracle甲骨文開發(fā)的Java郵件類API,支持多種郵件協(xié)議,這里我們就來看一下Java使用JavaMail API發(fā)送和接收郵件的代碼示例2016-06-06
從零開始學(xué)SpringBoot如何開始使用圖文詳解
這篇文章主要介紹了從零開始學(xué)SpringBoot如何開始使用,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09
Springmvc nginx實(shí)現(xiàn)動(dòng)靜分離過程詳解
這篇文章主要介紹了Springmvc nginx實(shí)現(xiàn)動(dòng)靜分離過程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
一文詳解Java中的反射與new創(chuàng)建對(duì)象
Java中的反射(Reflection)和使用new關(guān)鍵字創(chuàng)建對(duì)象是兩種不同的對(duì)象創(chuàng)建方式,各有優(yōu)缺點(diǎn)和適用場景,本文小編給大家詳細(xì)介紹了Java中的反射與new創(chuàng)建對(duì)象,感興趣的小伙伴跟著小編一起來看看吧2024-07-07
Java lambda表達(dá)式實(shí)現(xiàn)Flink WordCount過程解析
這篇文章主要介紹了Java lambda表達(dá)式實(shí)現(xiàn)Flink WordCount過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02

