RxJava中map和flatMap的用法區(qū)別源碼解析
前言:
RxJava中提供了大量的操作符,這大大提高了了我們的開發(fā)效率。其中最基本的兩個變換操作符就是map和flatMap。而其他變換操作符的原理基本與map類似。
- map和flatMap都是接受一個函數(shù)作為參數(shù)(Func1)并返回一個被觀察者
Observable - Func1的< I,O >I,O模版分別為輸入和輸出值的類型,實現(xiàn)Func1的call方法對I類型進行處理后返回O類型數(shù)據(jù),只是flatMap中執(zhí)行的方法的返回類型為Observable類型
作用
map對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),執(zhí)行變換操作。對原始的Observable發(fā)射的每一項數(shù)據(jù)應用一個你選擇的函數(shù),然后返回一個發(fā)射這些結(jié)果的Observable。

flatMap將一個發(fā)射數(shù)據(jù)的Observable變換為多個Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進一個單獨的Observable。操作符使用一個指定的函數(shù)對原始Observable發(fā)射的每一項數(shù)據(jù)執(zhí)行變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當做它自己的數(shù)據(jù)序列發(fā)射

使用方法:
通過代碼來看一下兩者的使用用方法:
map
Observable.just(new User("白瑞德"))
.map(new Function<User, String>() {
@Override
public String apply(User user) throws Throwable {
return user.getName();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println(s);
}
});
<<<白瑞德
這段代碼接受一個User對象,最后打印出User中的name。
flatMap
假設存在一個需求,圖書館要打印每個User借走每一本書的名字: User結(jié)構(gòu)如下:
class User {
private String name;
private List<String> book;
}
我們來看一下map的實現(xiàn)方法:
Observable.fromIterable(userList)
.map(new Function<User, List<String>>() {
@Override
public List<String> apply(User user) throws Throwable {
return user.getBook();
}
})
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Throwable {
for (String s : strings) {
System.out.println(s);
}
}
});
可以看到,map的轉(zhuǎn)換總是一對一,只能單一轉(zhuǎn)換。我們不得不借助循環(huán)進行打印。 下面我們來看一下flatMap的實現(xiàn)方式:
Observable.fromIterable(userList)
.flatMap(new Function<User, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(User user) throws Throwable {
return Observable.fromIterable(user.getBook());
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Throwable {
System.out.println(o);
}
});
flatmap既可以單一轉(zhuǎn)換也可以一對多/多對多轉(zhuǎn)換。flatMap使用一個指定的函數(shù)對原始Observable發(fā)射的每一項數(shù)據(jù)之行相應的變換操作,這個函數(shù)返回一個本身也發(fā)射數(shù)據(jù)的Observable,因此可以再內(nèi)部再次進行事件的分發(fā)。然后flatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當做它自己的數(shù)據(jù)序列發(fā)射。
源碼分析
下面我們就結(jié)合源碼來分析一下這兩個操作符。為了降低代碼閱讀難道,這里只保留核心代碼:
map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//接受一個Function實例,并返回一個ObservableMap
return new ObservableMap<T, R>(this, mapper);
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//調(diào)用用父類構(gòu)造方法,初始化父類中的downstream
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
v = mapper.apply(t);
downstream.onNext(v);
}
}
}
這段代碼是去掉map源碼中一些校驗和其它相關(guān)回調(diào)后的精簡代碼。接下來分析一下代碼流程:
- 當在調(diào)用
map時,map接受一個匿名內(nèi)部類Function的實例,并返回一個ObservableMap對象。 ObservableMap本質(zhì)上是一個Observable,也是一個被觀察者,其構(gòu)造方法接受最外層的那個被Observable實例,和Function實例。ObservableMap重寫了subscribeActual方法,在subscribeActual中使用新建了一個MapObserver實現(xiàn)了對原始Observable的觀察。- 原始的
Observable中的數(shù)據(jù)變會被發(fā)送到MapObserver的實例中。 MapObserver構(gòu)造方法接收原始Observable的觀察者actual,和Function的實例mapperMapObserver在其onNext方法中調(diào)用mapper的apply方法,獲得該方法的返回值v apply方法就是map實例中:public String apply(User user) throws Throwable { return user.getName(); }- 調(diào)用
downstream的onNext方法,并傳入實參v。其中downstream是MapObserver父類中定義的變量,在MapObserver構(gòu)造方法中super(actual);時初始化,其本身就是傳入的actual,本質(zhì)上就是最原始的Observable
整個流程可以概括如下: 存在一個原始的ObservableA和一個觀察者ObserverA,當原始的被觀察者ObservableA調(diào)用map,并傳入一個匿名內(nèi)部類實例化的’function‘,map新建并返回了一個被觀察者ObservableB,通過subscribe讓觀察者ObserverA對其進行訂閱。并重寫subscribeActual方法,在其被訂閱時創(chuàng)建一個新的觀察者ObserverB其接受的,并用ObserverB對原始的ObservableA進行訂閱觀察。當原始的ObservableA發(fā)出事件,調(diào)用ObserverB的onNext方法,subscribeActual接受的觀察者便是最原始的觀察者ObserverA。ObserverB變執(zhí)行通過匿名內(nèi)部類實例化的’function‘的apply方法得到數(shù)據(jù)v,緊接著調(diào)用原始的ObservableA的onNext方法,并傳入實參v,ObserverA觀察到事件。 一句話概括:一個原始的被觀察者和觀察者,但是讓原始的觀察者去訂閱一個新的觀察者,當新的被觀察者被訂閱的時候,創(chuàng)建一個新的觀察者去訂閱原始的被觀察者,并在監(jiān)聽的事件之后執(zhí)行指定的操作后再通知原始觀察者。所以這里面一共涉及到兩對觀察者和被觀察者,map方法會創(chuàng)建一對新的觀察者和被觀察者作為原始觀察者和被觀察者通訊的紐帶,并在其中做一些數(shù)據(jù)變換。
用圖片顯示流程如下:

藍色框內(nèi)就是map創(chuàng)建的觀察者和被觀察者。實際上我們的原始ObserverA并沒有對ObservableA進行訂閱。
flatMap
faltMap和map的基本原理類似,其代碼如下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}
@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
p = mapper.apply(t);
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
p.subscribe(inner);
}
void drain() {
drainLoop();
}
void drainLoop() {
final Observer<? super U> child = this.downstream;
child.onNext(o);
}
}
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
@Override
public void onNext(U t) {
parent.drain();
}
}
}
上述代碼即是faltMap精簡后的源碼,其中大部分代碼的運作流程和前文中的map源碼一致,我們繼續(xù)延續(xù)上文中講解中的觀察者和被觀察者。重點關(guān)注其不同的地方: faltMap返回一個新的被觀察者ObservableB,重寫ObservableB的subscribeActual方法在原始的觀察者ObserverA對其進行訂閱時新建一個觀察者ObserverB對原始的ObservableA進行訂閱。新的觀察者ObserverB持有原始的ObserverA和faltMap接收的匿名對象實例function。當ObserverB監(jiān)聽到原始的被觀察者ObservableA的事件時,ObserverB調(diào)用function的apply方法獲得新新的被觀察者ObservableC,再創(chuàng)建一個新的觀察者ObserverC對ObservableC進行訂閱,ObserverC持有原始的觀察者ObserverA,在ObserverC觀察到被觀察者ObservableC的時間時,調(diào)用原始的觀察者ObserverA的方法。
概括就是:faltMap方法要求調(diào)用者提供一個Observable,最原始的Observable在調(diào)用faltMap后,faltMap會創(chuàng)建一個新的Observable,并對原始的進行訂閱。當拿到訂閱后,會通過flatMap接收的函數(shù)拿到調(diào)用者傳入的Observable,并用最原始的觀察者對它進行訂閱。這期間涉及三對觀察者和被觀察者,flatMap會創(chuàng)建一對,同時也接收一對用戶創(chuàng)建的。flatMap創(chuàng)建的和Map中的作用一樣,不過flatMap連接的是原始的和用戶通過flatMap提供的兩對觀察者和被觀察者。而原始的觀察者最終是對用戶通過flatMap提供的那個觀察者進行訂閱。
用圖片顯示流程如下:

和Map的流程很相似,只不過是需要用戶再提供一對觀察者和被觀察者。最終實現(xiàn)對用戶提供的被觀察者進行訂閱。
結(jié)語
至此,map和flatMap已基本分析完畢,其中map的代碼比較簡單易懂,flatMap中還涉及到大量輔助操作,文中并未涉及到其中的合并等操作,閱讀起來有些困難。如果僅僅是為了了解二者的原理,可以閱讀Single<T>中的代碼。其中的代碼量遠遠少于Observable中的代碼量。如果對RxJava基本的模式還不了解,可以閱讀 手寫極簡版的Rxjava
以上就是RxJava中map和flatMap的用法區(qū)別源碼解析的詳細內(nèi)容,更多關(guān)于RxJava map flatMap區(qū)別的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
關(guān)于Assert.assertEquals報錯的問題及解決
這篇文章主要介紹了關(guān)于Assert.assertEquals報錯的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05
Java的JDBC中Statement與CallableStatement對象實例
這篇文章主要介紹了Java的JDBC中Statement與CallableStatement對象實例,JDBC是Java編程中用于操作數(shù)據(jù)庫的API,需要的朋友可以參考下2015-12-12
SpringBoot實現(xiàn)動態(tài)增刪啟停定時任務的方式
在spring?boot中,可以通過@EnableScheduling注解和@Scheduled注解實現(xiàn)定時任務,也可以通過SchedulingConfigurer接口來實現(xiàn)定時任務,但是這兩種方式不能動態(tài)添加、刪除、啟動、停止任務,本文給大家介紹SpringBoot實現(xiàn)動態(tài)增刪啟停定時任務的方式,感興趣的朋友一起看看吧2024-03-03
關(guān)于postman傳參的幾種格式 list,map 等
這篇文章主要介紹了postman傳參的幾種格式 list,map等,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08

