簡單談?wù)凴xJava和多線程并發(fā)
前言
相信對于RxJava,大家應(yīng)該都很熟悉,他最核心的兩個(gè)字就是異步,誠然,它對異步的處理非常的出色,但是異步絕對不等于并發(fā),更不等于線程安全,如果把這幾個(gè)概念搞混了,錯(cuò)誤的使用RxJava,是會來帶非常多的問題的。
RxJava與并發(fā)
首先讓我們來看一段RxJava協(xié)議的原文:
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
如上所述,RxJava對多線程并發(fā)其實(shí)并沒有做非常的多保護(hù),這段話中說,如果多個(gè)Observables從多個(gè)線程中發(fā)射數(shù)據(jù),必須要滿足happens-before原則。
下面來看一個(gè)簡單的例子:
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
unSafeCount = unSafeCount + integer;
Log.d("TAG", "onNext: " + unSafeCount);
}
});
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
final int unit = 1;
for(int i = 0;i < 10;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
subject.onNext(unit);
}
}
}).start();
}
}
});
這是一個(gè)最典型的多線程問題,從10個(gè)線程中發(fā)射數(shù)據(jù)并相加,這樣最終得到的答案是小于10000的。雖然使用了RxJava,但是這樣的使用對于并發(fā)是沒有意義的,因?yàn)镽xJava并沒有去處理并發(fā)帶來的問題。我們可以看下subject的onNext方法的源碼,里面很簡單,就是調(diào)用了對應(yīng)observer的onNext方法而已。不止是這樣,絕大多數(shù)的Subject都是線程不安全的,所以當(dāng)你在使用這樣的類的時(shí)候(典型場景就是自制的RxBus),如果從多個(gè)線程中發(fā)射數(shù)據(jù),那你就要小心了。
對于這樣的問題,有兩種解決方案:
第一種就是簡單的使用傳統(tǒng)的解決方法,比如用AtomicInteger代替int。
第二種則是使用RxJava的解決方案,在這里就是用SerializedSubject去代替Subject:
final PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
unSafeCount = unSafeCount + integer;
count.addAndGet(integer);
Log.d("TAG", "onNext: " + count);
}
});
final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
final int unit = 1;
for(int i = 0;i < 10;i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int j = 0;j < 1000;j++){
ser.onNext(unit);
}
}
}).start();
}
}
});
可以看一下SerializedSubject的onNext方法做了什么:
@Override
public void onNext(T t) {
if (terminated) {
return;
}
synchronized (this) {
if (terminated) {
return;
}
if (emitting) {
FastList list = queue;
if (list == null) {
list = new FastList();
queue = list;
}
list.add(nl.next(t));
return;
}
emitting = true;
}
try {
actual.onNext(t);
} catch (Throwable e) {
terminated = true;
Exceptions.throwOrReport(e, actual, t);
return;
}
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
if (list == null) {
emitting = false;
return;
}
queue = null;
}
for (Object o : list.array) {
if (o == null) {
break;
}
try {
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}
處理方式很簡單,如果有其他線程在發(fā)射數(shù)據(jù),那就將數(shù)據(jù)放置到隊(duì)列中,等待下次發(fā)射。這保證了同一時(shí)間只會有一個(gè)線程調(diào)用onNext,onComplete和onError這些方法。
但是這樣操作顯然是會造成性能的影響的,所以RxJava并不會把所有的操作都打上線程安全的標(biāo)簽。
在這里就要引申出一個(gè)問題,那就是使用者對create方法的濫用,其實(shí)這個(gè)方法不應(yīng)該被使用者頻繁的調(diào)用的,因?yàn)槟惚仨氁⌒牡奶幚硭械臄?shù)據(jù)發(fā)射,接收的邏輯。相反的,使用已有的操作符能很好的解決這個(gè)問題,所以下次大家在遇到問題的時(shí)候不要簡單的使用create去自己寫,而是應(yīng)該想想有沒有現(xiàn)成的操作符可以完成相應(yīng)的需求。
RxJava中的一些操作符
RxJava中有一些操作符也和多線程并發(fā)有關(guān),下面讓我來講一講merge和concat,以及他們的一些變種操作符。
對于多線程發(fā)射數(shù)據(jù),有時(shí)候我們需要得到的結(jié)果也保持和發(fā)射時(shí)候一樣的順序,這個(gè)時(shí)候如果我們使用merge這個(gè)操作符去結(jié)合多個(gè)發(fā)射源,那么就會產(chǎn)生一定的問題了(例子中做了非常不好的示范——使用了create操作符,請大家不要學(xué)習(xí)這樣的寫法,這里單純是為了求證結(jié)果)。
Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
subscriber.onNext(1);
subscriber.onCompleted();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(2);
subscriber.onCompleted();
}
});
Observable.merge(o1,o2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer i) {
Log.d("TAG", "onNext: " + i);
}
});
對于這樣的場景,我們得到的答案將是2,1而不是先得到o1發(fā)射的數(shù)據(jù),再獲取o2的數(shù)據(jù)。
究其原因,就是因?yàn)閙erge其實(shí)就是給什么傳什么,也不會去管數(shù)據(jù)發(fā)射的順序:
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
可以看到在經(jīng)過lift操作之后,對應(yīng)的中間人MergeSubscriber的onNext,沒有什么多余的代碼,所以在多個(gè)Observable從多線程中發(fā)射數(shù)據(jù)的時(shí)候,順序當(dāng)然不能得到保證。
一個(gè)單詞說明這個(gè)問題:interleaving——交錯(cuò)。merge后的數(shù)據(jù)源可能是交錯(cuò)的。由于merge有這樣數(shù)據(jù)交錯(cuò)的問題,所以它的變種—flatMap也會有同樣的問題。
對于這樣的場景,我們可以使用concat操作符來完成:
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.
根據(jù)文檔,我們知道concat操作符是一個(gè)接一個(gè)的處理數(shù)據(jù)源的數(shù)據(jù)的。
if (wip.getAndIncrement() != 0) {
return;
}
final int delayErrorMode = this.delayErrorMode;
for (;;) {
if (actual.isUnsubscribed()) {
return;
}
if (!active) {
if (delayErrorMode == BOUNDARY) {
if (error.get() != null) {
Throwable ex = ExceptionsUtils.terminate(error);
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
}
boolean mainDone = done;
Object v = queue.poll();
boolean empty = v == null;
if (mainDone && empty) {
Throwable ex = ExceptionsUtils.terminate(error);
if (ex == null) {
actual.onCompleted();
} else
if (!ExceptionsUtils.isTerminated(ex)) {
actual.onError(ex);
}
return;
}
if (!empty) {
Observable<? extends R> source;
try {
source = mapper.call(NotificationLite.<T>instance().getValue(v));
} catch (Throwable mapperError) {
Exceptions.throwIfFatal(mapperError);
drainError(mapperError);
return;
}
if (source == null) {
drainError(new NullPointerException("The source returned by the mapper was null"));
return;
}
if (source != Observable.empty()) {
if (source instanceof ScalarSynchronousObservable) {
ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
active = true;
arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
} else {
ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
inner.set(innerSubscriber);
if (!innerSubscriber.isUnsubscribed()) {
active = true;
source.unsafeSubscribe(innerSubscriber);
} else {
return;
}
}
request(1);
} else {
request(1);
continue;
}
}
}
if (wip.decrementAndGet() == 0) {
break;
}
}
通過源碼我們可以知道,active字段就保證了如果上一個(gè)數(shù)據(jù)源還沒有發(fā)射完數(shù)據(jù),就會一直在for循環(huán)中等待,直到上一個(gè)數(shù)據(jù)源發(fā)射完了數(shù)據(jù)重置了active字段。
對于concat,其實(shí)還存在一個(gè)問題,那就是多個(gè)Observable變成了串行,會大大的增加整個(gè)RxJava事件流的處理時(shí)間,對于這個(gè)場景,我們可以使用concatEager來解決。concatEager的源碼就不帶大家分析了,有興趣的同學(xué)可以自行查看。
總結(jié)
這篇文章比較短,講的東西也比較淺顯,其實(shí)就是討論了一下RxJava中多線程并發(fā)的幾個(gè)問題。最后我想說,RxJava并不是什么高大上的東西,在你的項(xiàng)目引入之前,要考慮一下是否真的有必要這么做。就算真的有場景需要RxJava,也請不要一口氣把項(xiàng)目中所有的操作都換成RxJava,一些簡單的操作不一定需要使用RxJava的操作符的實(shí)現(xiàn),用了反而降低了代碼的可讀性,切勿為了使用Rx而使用Rx。
好了,以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。
相關(guān)文章
啟動SpringBoot報(bào)錯(cuò)Input length = 1問題及解決
這篇文章主要介紹了啟動SpringBoot報(bào)錯(cuò)Input length = 1問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05
StreamAPI多次消費(fèi)一個(gè)stream代碼實(shí)例
這篇文章主要介紹了StreamAPI多次消費(fèi)一個(gè)stream代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
Java使用poi-tl1.9.1生成Word文檔的技巧分享
本文將簡單介紹poi-tl的相關(guān)知識,通過一個(gè)實(shí)際的案例實(shí)踐,充分介紹如何利用poi-tl進(jìn)行目標(biāo)文檔的生成,同時(shí)分享幾個(gè)不同的office版本如何進(jìn)行圖表生成的解決方案,需要的朋友可以參考下2023-09-09
SpringBoot實(shí)現(xiàn)登錄攔截器超詳細(xì)教程分享
對于管理系統(tǒng)或其他需要用戶登錄的系統(tǒng),登錄驗(yàn)證都是必不可少的環(huán)節(jié),尤其在?SpringBoot?開發(fā)的項(xiàng)目中。本文為大家準(zhǔn)備了超詳細(xì)的SpringBoot實(shí)現(xiàn)登錄攔截器方法,快收藏一波吧2023-02-02

