Java擴(kuò)展庫RxJava的基本結(jié)構(gòu)與適用場景小結(jié)
基本結(jié)構(gòu)
我們先來看一段最基本的代碼,分析這段代碼在RxJava中是如何實(shí)現(xiàn)的。
Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
Observable.create(onSubscriber1)
.subscribe(subscriber1);
首先我們來看一下Observable.create的代碼
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
直接就是調(diào)用了Observable的構(gòu)造函數(shù)來創(chuàng)建一個新的Observable對象,這個對象我們暫時標(biāo)記為observable1,以便后面追溯。
同時,會將我們傳入的OnSubscribe對象onSubscribe1保存在observable1的onSubscribe屬性中,這個屬性在后面的上下文中很重要,大家留心一下。
接下來我們來看看subscribe方法。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
}
可以看到,subscribe之后,就直接調(diào)用了observable1.onSubscribe.call方法,也就是我們代碼中的onSubscribe1對象的call方法
,傳入的參數(shù)就是我們代碼中定義的subscriber1對象。call方法中所做的事情就是調(diào)用傳入的subscriber1對象的onNext和onComplete方法。
這樣就實(shí)現(xiàn)了觀察者和被觀察者之間的通訊,是不是很簡單?
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onCompleted();
}
RxJava使用場景小結(jié)
1.取數(shù)據(jù)先檢查緩存的場景
取數(shù)據(jù),首先檢查內(nèi)存是否有緩存
然后檢查文件緩存中是否有
最后才從網(wǎng)絡(luò)中取
前面任何一個條件滿足,就不會執(zhí)行后面的
final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable<String> network = Observable.just("network");
//主要就是靠concat operator來實(shí)現(xiàn)
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
2.界面需要等到多個接口并發(fā)取完數(shù)據(jù),再更新
//拼接兩個Observable的輸出,不保證順序,按照事件產(chǎn)生的順序發(fā)送給訂閱者
private void testMerge() {
Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println);
}
3.一個接口的請求依賴另一個API請求返回的數(shù)據(jù)
舉個例子,我們經(jīng)常在需要登陸之后,根據(jù)拿到的token去獲取消息列表。
這里用RxJava主要解決嵌套回調(diào)的問題,有一個專有名詞叫Callback hell
NetworkService.getToken("username", "password")
.flatMap(s -> NetworkService.getMessage(s))
.subscribe(s -> {
System.out.println("message: " + s);
});
4.界面按鈕需要防止連續(xù)點(diǎn)擊的情況
RxView.clicks(findViewById(R.id.btn_throttle))
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(aVoid -> {
System.out.println("click");
});
5.響應(yīng)式的界面
比如勾選了某個checkbox,自動更新對應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);
CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
.subscribe(checked.asAction());
6.復(fù)雜的數(shù)據(jù)變換
Observable.just("1", "2", "2", "3", "4", "5")
.map(Integer::parseInt)
.filter(s -> s > 1)
.distinct()
.take(3)
.reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
.subscribe(System.out::println);//9
相關(guān)文章
Springboot自定義mvc組件如何實(shí)現(xiàn)
這篇文章主要介紹了Springboot自定義mvc組件如何實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11
Java Synchronize下的volatile關(guān)鍵字詳解
這篇文章主要介紹了Java Synchronize下的volatile關(guān)鍵字詳解,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03

