Spring響應(yīng)式編程之Reactor操作符詳解
更新時(shí)間:2025年09月05日 08:44:53 作者:minh_coo
文章介紹了Reactor庫中常用的響應(yīng)式流操作符,分為創(chuàng)建、轉(zhuǎn)換、組合、條件和錯(cuò)誤處理五類,詳細(xì)列舉了每類操作符的功能和用途,這些操作符旨在提高響應(yīng)式流的可讀性和開發(fā)效率,幫助開發(fā)者更高效地處理數(shù)據(jù)流
操作符Processo<T,R>
操作符并不是響應(yīng)式流規(guī)范的一部分,但為了改進(jìn)響應(yīng)式代碼的可讀性并降低開發(fā)成本,Reactor 庫中的 API 提供了一組豐富的操作符,這些操作符為響應(yīng)式流規(guī)范提供了最大的附加值。
下面介紹一些常用的操作符。
(1)創(chuàng)建操作符
just:創(chuàng)建一個(gè)包含單個(gè)元素的Mono或多個(gè)元素的Flux;empty:創(chuàng)建一個(gè)空的Flux或Mono;defer:在訂閱時(shí)動(dòng)態(tài)創(chuàng)建一個(gè)新的Flux或Mono;fromArray:從數(shù)組創(chuàng)建Flux;fromIterable:從Iterable對象創(chuàng)建Flux;range:創(chuàng)建一個(gè)從start到end的整數(shù)序列Flux;interval:創(chuàng)建一個(gè)按時(shí)間間隔發(fā)布數(shù)據(jù)的Flux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class CreationExample {
public static void main(String[] args) {
// 示例 1: 使用 Mono 創(chuàng)建操作符
Mono<String> monoJust = Mono.just("Hello, Mono");
Mono<String> monoEmpty = Mono.empty();
Mono<String> monoDefer = Mono.defer(() -> Mono.just("Deferred Mono"));
// 訂閱 Mono 并打印結(jié)果
monoJust.subscribe(System.out::println);
monoEmpty.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed"));
monoDefer.subscribe(System.out::println);
// 示例 2: 使用 Flux 創(chuàng)建操作符
Flux<String> fluxJust = Flux.just("A", "B", "C");
Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"});
List<String> list = Arrays.asList("A", "B", "C");
Flux<String> fluxFromIterable = Flux.fromIterable(list);
Flux<String> fluxFromStream = Flux.fromStream(Stream.of("A", "B", "C"));
Flux<Integer> fluxRange = Flux.range(1, 5);
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));
Flux<String> fluxDefer = Flux.defer(() -> Flux.just("Deferred Flux"));
// 訂閱 Flux 并打印結(jié)果
fluxJust.subscribe(System.out::println);
fluxFromArray.subscribe(System.out::println);
fluxFromIterable.subscribe(System.out::println);
fluxFromStream.subscribe(System.out::println);
fluxRange.subscribe(System.out::println);
fluxInterval.take(5).subscribe(System.out::println);
fluxDefer.subscribe(System.out::println);
}
}
(2)轉(zhuǎn)換操作符
map:將Mono中的值或Flux中的每個(gè)元素轉(zhuǎn)換為另一種類型;flatmap:將Mono中的值或Flux中的每個(gè)元素轉(zhuǎn)換為另一個(gè)Mono或另一個(gè)Publisher,并展平結(jié)果;flatMapSequential:類似于flatMap,但保持順序并并行處理;flatMapMany:將Mono中的值轉(zhuǎn)換為Flux;collectList: 將Flux中的所有元素收集到一個(gè)List中,返回Mono<List<T>>;collectMap:將Flux中的元素收集到一個(gè)Map中,返回Mono<Map<K,V>>;reduce:聚合Flux中的元素,返回Mono;buffer:將Flux中的元素收集到List中,按指定大小進(jìn)行分組;window:將Flux中的元素分組到Flux中,每組包含指定數(shù)量的元素;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public class ConversionExample {
public static void main(String[] args) {
// 示例 1: 使用 Mono 轉(zhuǎn)換操作符
Mono<Integer> mono = Mono.just("123")
.map(Integer::parseInt)
.flatMap(i -> Mono.just(i * 2))
.doOnNext(System.out::println);
mono.subscribe();
// 示例 2: 使用 Flux 轉(zhuǎn)換操作符
Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.filter(i -> i % 2 == 0)
.flatMap(i -> Flux.just(i * 2))
.concatMap(i -> Flux.just(i + 1))
.buffer(2)
.doOnNext(System.out::println);
flux.subscribe();
}
}
(3)組合操作符
zipWith:將兩個(gè)Mono的值組合成一個(gè)新的Mono;zip:將多個(gè)Flux的元素組合成一個(gè)Flux;then:在當(dāng)前Mono或Flux完成后執(zhí)行另一個(gè)Mono或Flux;thenReturn:在當(dāng)前Mono或Flux完成后返回一個(gè)指定的值;thenMany:在當(dāng)前Mono完成后返回一個(gè)Flux;when:等待多個(gè)Publisher完成
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class CombinationExample {
public static void main(String[] args) {
// 示例 1: 使用 Mono 組合操作符
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("World");
Mono<String> combined = mono1.zipWith(mono2, (a, b) -> a + " " + b);
combined.subscribe(System.out::println); // 輸出: Hello World
Mono<Void> when = Mono.when(mono1, mono2);
when.subscribe(null, Throwable::printStackTrace, () -> System.out.println("Completed")); // 輸出: Completed
// 示例 2: 使用 Flux 組合操作符
Flux<String> flux1 = Flux.just("A", "B", "C");
Flux<String> flux2 = Flux.just("1", "2", "3");
Flux<String> merged = Flux.merge(flux1, flux2);
merged.subscribe(System.out::println); // 輸出: A 1 B 2 C 3
Flux<String> concatenated = Flux.concat(flux1, flux2);
concatenated.subscribe(System.out::println); // 輸出: A B C 1 2 3
Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b);
zipped.subscribe(System.out::println); // 輸出: A1 B2 C3
Flux<String> combinedLatest = Flux.combineLatest(flux1, flux2, (a, b) -> a + b);
combinedLatest.subscribe(System.out::println); // 輸出: C3
Flux<String> started = flux1.startWith("Start");
started.subscribe(System.out::println); // 輸出: Start A B C
}
}
(4)條件操作符
hasElement:判斷Mono是否包含元素;hasElements:判斷Flux是否包含元素;hasElementWith:判斷Mono是否包含與給定Predicate匹配的元素;all:判斷Flux中的所有元素是否都滿足給定的條件;any:判斷Flux中是否有任意一個(gè)元素滿足給定的條件;isEmpty:判斷Flux是否為空;switchIfEmpty:如果Mono或Flux為空,則切換到另一個(gè)Mono或Flux;defaultIfEmpty:如果Mono或Flux為空,則返回默認(rèn)值;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ConditionalExample {
public static void main(String[] args) {
// 示例 1: 使用 Mono 條件操作符
Mono<String> mono = Mono.just("Hello");
Mono<Boolean> hasElement = mono.hasElement();
hasElement.subscribe(System.out::println); // 輸出: true
Mono<String> emptyMono = Mono.<String>empty();
Mono<String> switchIfEmptyMono = emptyMono.switchIfEmpty(Mono.just("Default"));
switchIfEmptyMono.subscribe(System.out::println); // 輸出: Default
Mono<String> defaultIfEmptyMono = emptyMono.defaultIfEmpty("Default");
defaultIfEmptyMono.subscribe(System.out::println); // 輸出: Default
// 示例 2: 使用 Flux 條件操作符
Flux<Integer> flux = Flux.just(1, 2, 3, 4);
Mono<Boolean> allMatch = flux.all(i -> i > 0);
allMatch.subscribe(System.out::println); // 輸出: true
Mono<Boolean> anyMatch = flux.any(i -> i > 3);
anyMatch.subscribe(System.out::println); // 輸出: true
Mono<Boolean> hasElements = flux.hasElements();
hasElements.subscribe(System.out::println); // 輸出: true
Mono<Boolean> isEmpty = flux.isEmpty();
isEmpty.subscribe(System.out::println); // 輸出: false
Flux<Integer> emptyFlux = Flux.<Integer>empty();
Flux<Integer> switchIfEmptyFlux = emptyFlux.switchIfEmpty(Flux.just(10, 20, 30));
switchIfEmptyFlux.subscribe(System.out::println); // 輸出: 10 20 30
Flux<Integer> defaultIfEmptyFlux = emptyFlux.defaultIfEmpty(999);
defaultIfEmptyFlux.subscribe(System.out::println); // 輸出: 999
}
}
(5)錯(cuò)誤處理操作符
onErrorResume:當(dāng)發(fā)生錯(cuò)誤時(shí),切換到另一個(gè)數(shù)據(jù)流;onErrorReturn:當(dāng)發(fā)生錯(cuò)誤時(shí),返回一個(gè)默認(rèn)值;onErrorMap:將錯(cuò)誤映射為另一個(gè)錯(cuò)誤;retry重試操作一定次數(shù);retryWhen:當(dāng)錯(cuò)誤發(fā)生時(shí),根據(jù)提供的Publisher邏輯重試;doOnError:當(dāng)發(fā)生錯(cuò)誤時(shí)執(zhí)行一些額外的邏輯;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
public class ErrorHandlingExample {
public static void main(String[] args) {
// 示例 1: 使用 Mono 錯(cuò)誤處理操作符
Mono<String> mono1 = Mono.error(new RuntimeException("Error"))
.onErrorResume(e -> Mono.just("Recovered"));
mono1.subscribe(System.out::println); // 輸出: Recovered
Mono<String> mono2 = Mono.error(new RuntimeException("Error"))
.onErrorReturn("Default");
mono2.subscribe(System.out::println); // 輸出: Default
Mono<String> mono3 = Mono.error(new RuntimeException("Error"))
.onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));
mono3.subscribe(System.out::println, Throwable::printStackTrace); // 輸出: Mapped Error
Mono<String> mono4 = Mono.error(new RuntimeException("Error"))
.retry(3);
mono4.subscribe(System.out::println, Throwable::printStackTrace);
Mono<String> mono5 = Mono.error(new RuntimeException("Error"))
.retryWhen(companion -> companion.take(3));
mono5.subscribe(System.out::println, Throwable::printStackTrace);
Mono<String> mono6 = Mono.error(new RuntimeException("Error"))
.doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));
mono6.subscribe(System.out::println, Throwable::printStackTrace);
// 示例 2: 使用 Flux 錯(cuò)誤處理操作符
Flux<String> flux1 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.onErrorResume(e -> Flux.just("Recovered"));
flux1.subscribe(System.out::println); // 輸出: A B Recovered
Flux<String> flux2 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.onErrorReturn("Default");
flux2.subscribe(System.out::println); // 輸出: A B Default
Flux<String> flux3 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.onErrorMap(e -> new IllegalArgumentException("Mapped Error", e));
flux3.subscribe(System.out::println, Throwable::printStackTrace); // 輸出: Mapped Error
Flux<String> flux4 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.retry(3);
flux4.subscribe(System.out::println, Throwable::printStackTrace);
Flux<String> flux5 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.retryWhen(companion -> companion.take(3));
flux5.subscribe(System.out::println, Throwable::printStackTrace);
Flux<String> flux6 = Flux.just("A", "B")
.concatWith(Mono.error(new RuntimeException("Error")))
.doOnError(e -> System.out.println("Error occurred: " + e.getMessage()));
flux6.subscribe(System.out::println, Throwable::printStackTrace);
}
}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Spring?Boot詳解五種實(shí)現(xiàn)跨域的方式
跨域指的是瀏覽器不能執(zhí)?其他?站的腳本。它是由瀏覽器的同源策略造成的,是瀏覽器對javascript施加的安全限制,這篇文章主要介紹了springboot實(shí)現(xiàn)跨域的5種方式,需要的朋友可以參考下2022-06-06
SpringBoot打War包上傳到阿里云的LINUX服務(wù)器的操作方法
這篇文章主要介紹了SpringBoot打War包上傳到阿里云的LINUX服務(wù)器,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02
Spring Boot 整合 SSE的高級實(shí)踐(Server-Sent Even
SSE(Server-Sent Events)是一種基于HTTP協(xié)議的單向通信機(jī)制,允許服務(wù)器向?yàn)g覽器持續(xù)發(fā)送實(shí)時(shí)更新,這篇文章主要介紹了Spring Boot 整合 SSE的高級實(shí)踐(Server-Sent Events),需要的朋友可以參考下2025-04-04

