Android RxJava異步數(shù)據(jù)處理庫使用詳解
觀察者模式
四大要素:Observable(被觀察者),Observer (觀察者),subscribe (訂閱),事件。
觀察者訂閱被觀察者,一旦被觀察者發(fā)出事件,觀察者就可以接收到。

擴展的觀察者模式

當事件完成時會回調(diào)onComplete(),在完成過程中發(fā)生了異常會回調(diào)onError(),onError()和onComplete()只會回調(diào)一個。
引入依賴
implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello Uncle Xing");
emitter.onComplete();
}
});
//創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(tag, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(tag, "onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i(tag, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i(tag, "onComplete");
}
};
//訂閱事件
observable.subscribe(observer);操作符
創(chuàng)建Observable
create:用于創(chuàng)建Observable
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello Uncle Xing");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(tag, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(tag, "onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i(tag, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i(tag, "onComplete");
}
});just:創(chuàng)建一個Observable并自動調(diào)用onNext發(fā)射數(shù)據(jù),just中傳遞的參數(shù)將直接在Observer的onNext方法中接收到
Observable.just("Uncle Xing").subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i(tag, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i(tag, "onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i(tag, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i(tag, "onComplete");
}
});interval:創(chuàng)建一個按固定時間間隔發(fā)射整數(shù)序列的Observable,可用作定時器。
Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Long aLong) {
Log.i(tag, "count:" + aLong); //這里是非主線程,會隔1s打印出0,1,2,3....
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});timer:創(chuàng)建一個Observable,它在一個特定延遲后發(fā)射一個值
Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.i(tag, "count:" + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});轉(zhuǎn)換Observable
map:對數(shù)據(jù)進行變換后,可以返回任意值,對數(shù)據(jù)的變換是1對1進行的。
Observable.just(666).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Throwable {
return integer.toString();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.i(tag, "map:" + s);
}
});
flatMap:對數(shù)據(jù)變換后,返回ObservableSource對象,可以對數(shù)據(jù)進行一對多,多對多的變換。
Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Throwable {
return Observable.just(integer.toString());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
Log.i(tag, "accept:" + s);
}
});
buffer:把Observable的數(shù)據(jù)放進一個數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個值
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Throwable {
Log.i(tag, integers.toString());
}
});
Log會分兩次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]
過濾Observable
distinct:去掉重復數(shù)據(jù)
Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.i(tag, "distinct:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});elementAt:取出指定位置的數(shù)據(jù)
Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull Integer integer) {
Log.i(tag, "onSuccess:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});filter:對數(shù)據(jù)進行指定規(guī)則的過濾
Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Throwable {
return integer > 1;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.i(tag, "filter:" + integer);
}
});
組合Observable
zip:通過一個函數(shù)將多個Observable的發(fā)射物結(jié)合到一起,基于這個函數(shù)的結(jié)果為每個結(jié)合體發(fā)射單個數(shù)據(jù)項
Observable<Integer> observable = Observable.just(10, 20, 30, 40);
Observable<Integer> observable2 = Observable.just(1, 2, 3);
Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Throwable {
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.i(tag, "zip:" + integer);
}
});
注意:當其中一個Observable發(fā)送數(shù)據(jù)結(jié)束或異常,另外一個也停止發(fā)送,所以這里只會打印出11,22,33
merge:合并多個Observable的發(fā)射物
Observable<Integer> observable = Observable.just(10, 20, 30, 40);
Observable<Integer> observable2 = Observable.just(1, 2, 3);
Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
Log.i(tag, "merge:" + integer);//會打印出10,20,30,1,2,3
}
});
錯誤處理
- onErrorReturn:讓Observable遇到錯誤時發(fā)射一個特殊的項并且正常終止
- onErrorResumeNext:讓Observable在遇到錯誤時開始發(fā)射第二個Observable的數(shù)據(jù)序列
Schedulers調(diào)度器-解決多線程問題
- io():用于I/O操作;
- computation():計算工作默認的調(diào)度器;
- immediate():立即執(zhí)行,允許立即在當前線程執(zhí)行你指定的工作;
- newThread():創(chuàng)建新線程;
- trampoline():順序處理,按需處理隊列,并運行隊列的每一個任務。
AndroidSchedulers:RxAndroid提供在Android平臺的調(diào)度器,指定觀察者在主線程。
SubscribeOn用于每個Observable對象,ObserveOn用于每個Observer對象
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(100);
emitter.onComplete();
Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1
}
}).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});管理RxJava的生命周期
在使用RxJava的時候,如果沒有及時解除訂閱,在退出Activity的時候,異步線程還在執(zhí)行,對Activity的引用還在,此時就會產(chǎn)生內(nèi)存泄露問題。
可使用RxLifecycle,傳送門
引入依賴
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
讓你的Activity繼承RxAppCompatActivity,F(xiàn)ragment繼承RxFragment,其余類似,然后使用bindUntilEvent或者bindToLifecycle
Observable.interval(1000, TimeUnit.MILLISECONDS)
.compose(bindUntilEvent(ActivityEvent.DESTROY)) //當前Activity執(zhí)行到onDestroy時,Observable取消訂閱
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
Log.i(tag, "accept:" + aLong);
}
}); Observable.interval(1000, TimeUnit.MILLISECONDS)
.compose(bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Throwable {
Log.i(tag, "accept:" + aLong);
}
});使用bindToLifecycle:
如果Observable在onCreate執(zhí)行,那么當執(zhí)行到onDestroy時取消訂閱。
如果Observable在onStart執(zhí)行,那么當執(zhí)行到onStop時取消訂閱。
如果Observable在onResume執(zhí)行,那么當執(zhí)行到onPause時取消訂閱。
RxJava與Retrofit完成網(wǎng)絡請求
public interface MyService {
@GET("gallery/{imageType}/response")
Observable<List<String>> getImages(@Path("imageType") String imageType);
}
Retrofit retrofit = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(BASE_URL)
.build();
MyService service = retrofit.create(MyService.class);
service.getImages("banner")
.compose(bindToLifecycle())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Throwable {
//todo
}
});
到此這篇關于Android RxJava異步數(shù)據(jù)處理庫使用詳解的文章就介紹到這了,更多相關Android RxJava異步數(shù)據(jù)處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Android中實現(xiàn)詞組高亮TextView方法示例
高亮顯示大家應該都不陌生,在開發(fā)中經(jīng)常會遇到這個需求,所以下面這篇文章主要給大家介紹了關于Android中實現(xiàn)詞組高亮TextView的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10
Android view更改背景資源與padding消失的問題解決辦法
這篇文章主要介紹了Android view更改背景資源與padding消失的問題解決辦法的相關資料,需要的朋友可以參考下2017-04-04
Android 實現(xiàn)ListView的點擊變色的實例
這篇文章主要介紹了Android 實現(xiàn)ListView的點擊變色的實例的相關資料,主要實現(xiàn)Android listveiw ItemClickListener寫入變色的功能,需要的朋友可以參考下2017-07-07
怎樣實現(xiàn)android http-post方法實例說明
android http-post方法在開發(fā)中如何實現(xiàn),具體代碼如下,感興趣的朋友可以參考下哈,希望對大家有所幫助2013-06-06
Android實現(xiàn)手勢滑動多點觸摸縮放平移圖片效果
這篇文章主要介紹了Android實現(xiàn)手勢滑動多點觸摸縮放平移圖片效果,實現(xiàn)圖片支持多點觸控,自由的進行縮放、平移的注意事項,感興趣的小伙伴們可以參考一下2016-02-02
Input系統(tǒng)之InputReader概要性實例分析
這篇文章主要介紹為大家了Input系統(tǒng)之InputReader概要性實例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-11-11
Android Activity切換(跳轉(zhuǎn))時出現(xiàn)黑屏的解決方法 分享
Android Activity切換(跳轉(zhuǎn))時出現(xiàn)黑屏的解決方法 分享,需要的朋友可以參考一下2013-06-06
Android開發(fā)實現(xiàn)查詢遠程服務器的工具類QueryUtils完整實例
這篇文章主要介紹了Android開發(fā)實現(xiàn)查詢遠程服務器的工具類QueryUtils,涉及Android服務器請求發(fā)送、接收、數(shù)據(jù)交互等相關操作技巧,需要的朋友可以參考下2017-11-11

