RxJava?觸發(fā)流基本原理源碼解析
正文
本節(jié),我們從Rxjava使用代碼入手,去結(jié)合自己已有的知識體系,加查閱部分源碼驗證的方式,來一起探索一下Rxjava實現(xiàn)的基本原理。
為了本文原理分析環(huán)節(jié),可以被更多的人理解、學習,所以小編從初學者的角度,從使用入手,一點點的分析了其中的源碼細節(jié)、思想,建議大家隨著本文的章節(jié)步驟,一步一步的來閱讀,才能更快、更好的理解Rxjava的真正的思想精髓,也為我們之后的實踐課程留一個好的底子。
觸發(fā)流
到目前為止,我們講了構(gòu)建流、訂閱流,但是依然沒有觸發(fā)真正的observer中的事件,例如:
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG, "onNext s = " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
各位看官,莫急莫急,且聽老衲娓娓道來。
還記得上面的訂閱流嗎?訂閱流從右往左執(zhí)行的,執(zhí)行到最后的observable,執(zhí)行了它的subscribe方法。我們從使用代碼知道,最左端的observable是啥來著,大家還記得嗎?當然是ObservableJust
private void test() {
//第一步:just調(diào)用
Observable.just("https://img-blog.csdn.net/20160903083319668")
//第二步:map調(diào)用
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
//Bitmap bitmap = downloadImage(s);
return null;
}
})
//第三步:subscribeOn、observeOn調(diào)用
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
//第四步:subscribe調(diào)用
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe() {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Bitmap s) {
Log.d(TAG, "onNext s = " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
我們就順坡下驢,看一下ObservableJust的subscribe方法做啥了
public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T get() {
return value;
}
}
仔細一看,這里面沒有subscribe方法,那么肯定就是調(diào)用父類observable的subscribe方法了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//對象封裝,暫時不是重點,我們跳過
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
大家看到這里,其實關鍵在于,最終調(diào)用了一個subscribeActual方法,所以我們繼續(xù)看子類ObservableJust的subscribeActual方法干啥了?
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
接續(xù)根據(jù)ScalarDisposable的run方法
public static final class ScalarDisposable<T>
extends AtomicInteger
implements QueueDisposable<T>, Runnable {
private static final long serialVersionUID = 3880992722410194083L;
final Observer<? super T> observer;
final T value;
//...省略很多代碼
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
//可以看到這里執(zhí)行了onNext、onComplete方法
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
小結(jié)
看到這里,我們知道了,開始一層一層的從左往右去調(diào)用observer的相關方法了。 由訂閱流可知,每層的observable實際上擁有下一層的observer的代理類,所以自然而然,從最左邊開始調(diào)用observer的相關方法開始,觸發(fā)流,就是從左往右,一層一層的剝開之前包裹的observer,然后順序調(diào)用里面的onNext、onComplete等方法。 不信,我們挑一個ObservableMap來驗證一下。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//此處調(diào)用了下游的observer的onNext方法
downstream.onNext(v);
}
}
}
可以看到里面,的確調(diào)用了下游的observer的onNext方法。
總結(jié)

整個過程,分為構(gòu)建流、訂閱流、觸發(fā)流。
構(gòu)建流:從左到右執(zhí)行不同的操作符的過程,其實很簡單,就是根據(jù)不同的操作符,對原始的 observable進行逐層包裝,這里可以看出,每層的節(jié)點 N* 就持有了上一層的observable。
訂閱流:從右到左的 subscribe 調(diào)用過程,這個過程中,每個observable內(nèi)部的subscribeActual執(zhí)行兩個關鍵操作,一個是對自己已有的observer進行一層重新包裝,另外一個就是使用前面節(jié)點的observable,訂閱包裝好的observer。
觸發(fā)流:在訂閱流執(zhí)行完成之后,執(zhí)行到最左端的observable,我們發(fā)現(xiàn)它內(nèi)部的subscribeActual實現(xiàn),實際上就是調(diào)用里面擁有的observer的相關回調(diào)方法(onNext、onComplete、onError等),那么這層回調(diào)流就簡單了,就是一層一層的調(diào)用里面的observer,最終執(zhí)行到最右端的observer。
篇幅所限,大家也發(fā)現(xiàn)了,我們本節(jié)課,我們詳細講解Rxjava線程切換的實現(xiàn)原理,這個有兩個原因,一是篇幅所限,本節(jié)內(nèi)容已經(jīng)夠多了,大家先吃透框架,另外一方面是,線程切換我相信我們后面實踐環(huán)節(jié),待框架自我搭建實現(xiàn)之后,里面的線程切換功能就是水到渠成的事情,相信憑借大家已有的知識,都可以做到的。
所以建議大家,先別看這塊Rxjava是如何實現(xiàn)線程切換的,而是想一下,它是怎么實現(xiàn)的?到時我們自己的Rxjava框架搭建起來之后,填充實現(xiàn)一下。
提個醒兒,大家還記得我們之前EventBus源碼分析、實踐環(huán)節(jié)嗎?其中也說到了線程切換。其實原理差不多,大家先想一下。
以上就是RxJava 觸發(fā)流基本原理源碼解析的詳細內(nèi)容,更多關于RxJava 觸發(fā)流原理的資料請關注腳本之家其它相關文章!
相關文章
Android中Rxjava實現(xiàn)三級緩存的兩種方式
這篇文章主要介紹了Android中Rxjava實現(xiàn)三級緩存的兩種方式,具有一定的參考價值,感興趣的小伙伴們可以參考一下。2017-04-04
android獲取當前接入點信息判斷是ctwap還是ctnet實例代碼
這篇文章主要介紹了android獲取當前接入點信息判斷是ctwap還是ctnet的方法,大家參考使用吧2013-11-11
Android實現(xiàn)跳轉(zhuǎn)第三方百度地圖導航
在眾多地圖導航產(chǎn)品中,百度地圖以其精準的導航和豐富的本地生活數(shù)據(jù)受到廣泛歡迎,本項目介紹如何在 Android 中構(gòu)造 Intent 調(diào)用百度地圖導航,希望對大家有所幫助2025-04-04
Android畫圖之抗鋸齒paint和Canvas兩種方式實例
本篇文章主要介紹了Android畫圖之抗鋸齒paint和Canvas兩種方式實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-04-04
Android中對RecyclerView Adapter封裝解析
本篇文章主要介紹了Android中對RecyclerView Adapter封裝解析。小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06

