Spring Boot中使用RSocket的示例代碼
1. 概述
RSocket 應(yīng)用層協(xié)議支持 Reactive Streams 語(yǔ)義, 例如:用RSocket作為HTTP的一種替代方案。在本教程中, 我們將看到 RSocket 用在spring boot中,特別是spring boot 如何幫助抽象出更低級(jí)別的RSocket API。
2. 依賴
讓我們從添加 spring-boot-starter-rsocket 依賴開始:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-rsocket</artifactId> </dependency>
這個(gè)依賴會(huì)傳遞性的拉取 RSocket 相關(guān)的依賴,比如: rsocket-core 和 rsocket-transport-netty
3.示例的應(yīng)用程序
現(xiàn)在繼續(xù)我們的簡(jiǎn)單應(yīng)用程序。為了突出 RSocket 提供的交互模式,我打算創(chuàng)建一個(gè)交易應(yīng)用程序, 交易應(yīng)用程序包括客戶端和服務(wù)器。
3.1. 服務(wù)器設(shè)置
首先,我們?cè)O(shè)置由springboot應(yīng)用程序引導(dǎo)的 RSocket server 服務(wù)器。 因?yàn)橛?nbsp;spring-boot-starter-rsocket dependency 依賴,所以springboot會(huì)自動(dòng)配置 RSocket server 。 跟平常一樣, 可以用屬性驅(qū)動(dòng)的方式修改 RSocket server 默認(rèn)配置值。例如:通過(guò)增加如下配置在 application.properties 中,來(lái)修改 RSocket 端口
spring.rsocket.server.port=7000
也可以根據(jù)需要進(jìn)一步修改服務(wù)器的其他屬性
3.2.設(shè)置客戶端
接下來(lái),我們來(lái)設(shè)置客戶端,也是一個(gè)springboot應(yīng)用程序。雖然springboot自動(dòng)配置大部分RSocket相關(guān)的組件,但還要自定義一些對(duì)象來(lái)完成設(shè)置。
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
return RSocketFactory
.connect()
.mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(TcpClientTransport.create(7000))
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
這兒我們正在創(chuàng)建 RSocket 客戶端并且配置TCP端口為:7000。注意: 該服務(wù)端口我們?cè)谇懊嬉呀?jīng)配置過(guò)。 接下來(lái)我們定義了一個(gè)RSocket的裝飾器對(duì)象 RSocketRequester 。 這個(gè)對(duì)象在我們跟 RSocket server 交互時(shí)會(huì)為我們提供幫助。 定義這些對(duì)象配置后,我們還只是有了一個(gè)骨架。在接下來(lái),我們將暴露不同的交互模式, 并看看springboot在這個(gè)地方提供幫助的。
4. springboot RSocket 中的 Request/Response
我們從 Request/Response 開始, HTTP 也使用這種通信方式,這也是最常見的、最相似的交互模式。 在這種交互模式里, 由客戶端初始化通信并發(fā)送一個(gè)請(qǐng)求。之后,服務(wù)器端執(zhí)行操作并返回一個(gè)響應(yīng)給客戶端--這時(shí)通信完成。 在我們的交易應(yīng)用程序里, 一個(gè)客戶端詢問(wèn)一個(gè)給定的股票的當(dāng)前的市場(chǎng)數(shù)據(jù)。 作為回復(fù),服務(wù)器會(huì)傳遞請(qǐng)求的數(shù)據(jù)。
4.1.服務(wù)器
在服務(wù)器這邊,我們首先應(yīng)該創(chuàng)建一個(gè) controller 來(lái)持有我們的處理器方法。 我們會(huì)使用 @MessageMapping 注解來(lái)代替像SpringMVC中的 @RequestMapping 或者 @GetMapping 注解
@Controller
public class MarketDataRSocketController {
private final MarketDataRepository marketDataRepository;
public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
this.marketDataRepository = marketDataRepository;
}
@MessageMapping("currentMarketData")
public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getOne(marketDataRequest.getStock());
}
}
來(lái)研究下我們的控制器。 我們將使用 @Controller 注解來(lái)定義一個(gè)控制器來(lái)處理進(jìn)入RSocket的請(qǐng)求。 另外,注解 @MessageMapping 讓我們定義我們感興趣的路由和如何響應(yīng)一個(gè)請(qǐng)求。 在這個(gè)示例中, 服務(wù)器監(jiān)聽路由 currentMarketData , 并響應(yīng)一個(gè)單一的結(jié)果 Mono<MarketData> 給客戶端。
4.2. 客戶端
接下來(lái), 我們的RSocket客戶端應(yīng)該詢問(wèn)一只股票的價(jià)格并得到一個(gè)單一的響應(yīng)。 為了初始化請(qǐng)求, 我們?cè)撌褂?nbsp;RSocketRequester 類,如下:
@RestController
public class MarketDataRestController {
private final RSocketRequester rSocketRequester;
public MarketDataRestController(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@GetMapping(value = "/current/{stock}")
public Publisher<MarketData> current(@PathVariable("stock") String stock) {
return rSocketRequester
.route("currentMarketData")
.data(new MarketDataRequest(stock))
.retrieveMono(MarketData.class);
}
}
注意:在示例中, RSocket 客戶端也是一個(gè) REST 風(fēng)格的 controller ,以此來(lái)訪問(wèn)我們的 RSocket 服務(wù)器。因此,我們使用 @RestController 和 @GetMapping 注解來(lái)定義我們的請(qǐng)求/響應(yīng)端點(diǎn)。 在端點(diǎn)方法中, 我們使用的是類 RSocketRequester 并指定了路由。 事實(shí)上,這個(gè)是服務(wù)器端 RSocket 所期望的路由,然后我們傳遞請(qǐng)求數(shù)據(jù)。最后,當(dāng)調(diào)用 retrieveMono() 方法時(shí),springboot會(huì)幫我們初始化一個(gè)請(qǐng)求/響應(yīng)交互。
5. Spring Boot RSocket 中的 Fire And Forget 模式
接下來(lái)我們將查看 Fire And Forget 交互模式。正如名字提示的一樣,客戶端發(fā)送一個(gè)請(qǐng)求給服務(wù)器,但是不期望服務(wù)器的返回響應(yīng)回來(lái)。 在我們的交易程序中, 一些客戶端會(huì)作為數(shù)據(jù)資源服務(wù),并且推送市場(chǎng)數(shù)據(jù)給服務(wù)器端。
5.1.服務(wù)器端
我們來(lái)創(chuàng)建另外一個(gè)端點(diǎn)在我們的服務(wù)器應(yīng)用程序中,如下:
@MessageMapping("collectMarketData")
public Mono<Void> collectMarketData(MarketData marketData) {
marketDataRepository.add(marketData);
return Mono.empty();
}
我們又一次定義了一個(gè)新的 @MessageMapping 路由為 collectMarketData 。此外, Spring Boot自動(dòng)轉(zhuǎn)換傳入的負(fù)載為一個(gè) MarketData 實(shí)例。 但是,這兒最大的不同是我們返回一個(gè) Mono<Void> ,因?yàn)榭蛻舳瞬恍枰?wù)器的返回。
5.2. 客戶端
來(lái)看看我們?nèi)绾纬跏蓟覀兊?nbsp;fire-and-forget 模式的請(qǐng)求。 我們將創(chuàng)建另外一個(gè)REST風(fēng)格的端點(diǎn),如下:
@GetMapping(value = "/collect")
public Publisher<Void> collect() {
return rSocketRequester
.route("collectMarketData")
.data(getMarketData())
.send();
}
這兒我們指定路由和負(fù)載將是一個(gè) MarketData 實(shí)例。 由于我們使用 send() 方法來(lái)代替 retrieveMono() ,所有交互模式變成了 fire-and-forget 模式。
6. Spring Boot RSocket 中的 Request Stream
請(qǐng)求流是一種更復(fù)雜的交互模式, 這個(gè)模式中客戶端發(fā)送一個(gè)請(qǐng)求,但是在一段時(shí)間內(nèi)從服務(wù)器端獲取到多個(gè)響應(yīng)。 為了模擬這種交互模式, 客戶端會(huì)詢問(wèn)給定股票的所有市場(chǎng)數(shù)據(jù)。
6.1.服務(wù)器端
我們從服務(wù)器端開始。 我們將添加另外一個(gè)消息映射方法,如下:
@MessageMapping("feedMarketData")
public Flux<MarketData> feedMarketData(MarketDataRequest marketDataRequest) {
return marketDataRepository.getAll(marketDataRequest.getStock());
}
正如所見, 這個(gè)處理器方法跟其他的處理器方法非常類似。 不同的部分是我們返回一個(gè) Flux<MarketData> 來(lái)代替 Mono<MarketData> 。 最后我們的RSocket服務(wù)器會(huì)返回多個(gè)響應(yīng)給客戶端。
6.2.客戶端
在客戶端這邊, 我們?cè)搫?chuàng)建一個(gè)端點(diǎn)來(lái)初始化請(qǐng)求/響應(yīng)通信,如下:
@GetMapping(value = "/feed/{stock}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<MarketData> feed(@PathVariable("stock") String stock) {
return rSocketRequester
.route("feedMarketData")
.data(new MarketDataRequest(stock))
.retrieveFlux(MarketData.class);
}
我們來(lái)研究下RSocket請(qǐng)求。 首先我們定義了路由和請(qǐng)求負(fù)載。 然后,我們定義了使用 retrieveFlux() 調(diào)用的響應(yīng)期望。這部分決定了交互模式。 另外注意:由于我們的客戶端也是 REST 風(fēng)格的服務(wù)器,客戶端也定義了響應(yīng)媒介類型 MediaType.TEXT_EVENT_STREAM_VALUE 。
7.異常的處理
現(xiàn)在讓我們看看在服務(wù)器程序中,如何以聲明式的方式處理異常。 當(dāng)處理請(qǐng)求/響應(yīng)式, 我可以簡(jiǎn)單的使用 @MessageExceptionHandler 注解,如下:
@MessageExceptionHandler
public Mono<MarketData> handleException(Exception e) {
return Mono.just(MarketData.fromException(e));
}
這里我們給異常處理方法標(biāo)記注解為 @MessageExceptionHandler 。作為結(jié)果, 這個(gè)方法將處理所有類型的異常, 因?yàn)?nbsp;Exception 是所有其他類型的異常的超類。 我們也可以明確地創(chuàng)建更多的不同類型的,不同的異常處理方法。 這當(dāng)然是請(qǐng)求/響應(yīng)模式,并且我們返回的是 Mono<MarketData> 。我們期望這里的響應(yīng)類型跟我們的交互模式的返回類型相匹配。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot Starter依賴原理與實(shí)例詳解
SpringBoot中的starter是一種非常重要的機(jī)制,能夠拋棄以前繁雜的配置,將其統(tǒng)一集成進(jìn)starter,應(yīng)用者只需要在maven中引入starter依賴,SpringBoot就能自動(dòng)掃描到要加載的信息并啟動(dòng)相應(yīng)的默認(rèn)配置。starter讓我們擺脫了各種依賴庫(kù)的處理,需要配置各種信息的困擾2022-09-09
mybatis中<choose>標(biāo)簽的用法說(shuō)明
這篇文章主要介紹了mybatis中<choose>標(biāo)簽的用法說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06
springboot訪問(wèn)不存在的URL時(shí)的處理方法
在前后端分離的模式下,當(dāng)Spring Boot應(yīng)用接收到一個(gè)不存在的URL請(qǐng)求時(shí),通常希望返回一個(gè)固定的JSON字符串作為響應(yīng),以便前端能夠據(jù)此進(jìn)行相應(yīng)的處理,本文給大家介紹了springboot訪問(wèn)不存在的URL時(shí)的處理方法,需要的朋友可以參考下2024-12-12
Java輸入流Scanner/BufferedReader使用方法示例
這篇文章主要介紹了Java輸入流Scanner/BufferedReader使用方法,大家看示例吧2013-11-11
解決BigDecimal轉(zhuǎn)long丟失精度的問(wèn)題
這篇文章主要介紹了解決BigDecimal轉(zhuǎn)long丟失精度的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
微服務(wù)間調(diào)用Retrofit在Spring?Cloud?Alibaba中的使用
這篇文章主要為大家介紹了微服務(wù)間調(diào)用Retrofit在Spring?Cloud?Alibaba中的使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06

