解決線程并發(fā)redisson使用遇到的坑
線程并發(fā)redisson的坑
背景
因為業(yè)務(wù)上的一個購買需求,需要對庫存進行行程保護,防止超賣的出現(xiàn)(我們不是電商公司),經(jīng)過調(diào)研,最終選擇使用Redission來進行控制。
主要因為Redission豐富的API,開源框架,已經(jīng)被廣泛應(yīng)用于實際生產(chǎn)環(huán)境。
問題描述
當我們使用Ression中Lock.lock()方法之后,如果存在線程并發(fā)常見情況下,會出現(xiàn)如下異常:
java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: 9f178836-f7e1-44fe-a89d-2db52f399c0d thread-id: 22
問題分析
在thread-1還沒有結(jié)束的時候,也就是在thread-1在獲得鎖但是還沒有釋放鎖的時候, `thread-2由于被別的線程中斷停止了等待從lock.tryLock的阻塞狀態(tài)中返回繼續(xù)執(zhí)行接下來的邏輯,并且由于嘗試去釋放一個屬于線程thread-1的鎖而拋出了一個運行時異常導(dǎo)致該線程thread-2結(jié)束了, 然而thread-2完成了一系列操作后,線程thread-1才釋放了自己的鎖.
所以thread-2并沒有獲得鎖,卻執(zhí)行了需要同步的內(nèi)容,還嘗試去釋放鎖。
那解決方式我們就知道了,當前線程加的鎖由當前線程去解鎖,也就是說當我們使用lock.unlock的時候加上線程的判斷即可。
問題解決
RLock lock = redissonClient.getLock(key);
if(lock.isLocked()){ // 是否還是鎖定狀態(tài)
if(lock.isHeldByCurrentThread()){ // 時候是當前執(zhí)行線程的鎖
lock.unlock(); // 釋放鎖
}
}
redisson使用注意事項
Redisson 是一個在 Redis 的基礎(chǔ)上實現(xiàn)的 Java 駐內(nèi)存數(shù)據(jù)網(wǎng)格,相較于暴露底層操作的Jedis,Redisson提供了一系列的分布式的 Java 常用對象,還提供了許多分布式服務(wù)。
特性 & 功能:
- 支持 Redis 單節(jié)點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式
- 程序接口調(diào)用方式采用異步執(zhí)行和異步流執(zhí)行兩種方式
- 數(shù)據(jù)序列化,Redisson 的對象編碼類是用于將對象進行序列化和反序列化,以實現(xiàn)對該對象在 Redis 里的讀取和存儲
- 單個集合數(shù)據(jù)分片,在集群模式下,Redisson 為單個 Redis 集合類型提供了自動分片的功能
- 提供多種分布式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
- 提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
- 分布式鎖和同步器的實現(xiàn),可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯(lián)鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等
- 提供先進的分布式服務(wù),如分布式遠程服務(wù)(Remote Service),分布式實時對象(Live Object)服務(wù),分布式執(zhí)行服務(wù)(Executor Service),分布式調(diào)度任務(wù)服務(wù)(Schedule Service)和分布式映射歸納服務(wù)(MapReduce)
- 更多特性和功能,請關(guān)注官網(wǎng):http://redisson.org
實現(xiàn)原理
redis本身是不支持上述的分布式對象和集合,Redisson是通過利用redis的特性在客戶端實現(xiàn)了高級數(shù)據(jù)結(jié)構(gòu)和特性,例如優(yōu)先隊列的實現(xiàn),是通過客戶端排序整理后再存入redis。
客戶端實現(xiàn),意味著當沒有任何客戶端在線時,這些所有的數(shù)據(jù)結(jié)構(gòu)和特性都不會保留,也不會自動生效,例如過期事件的觸發(fā)或原來優(yōu)先隊列的元素增加。
注意事項
實時性
RMap中有一個功能是可以設(shè)置鍵值對的過期時間的,并可以注冊鍵值對的事件監(jiān)聽器
元素淘汰功能(Eviction)
Redisson的分布式的RMapCache Java對象在基于RMap的前提下實現(xiàn)了針對單個元素的淘汰機制。同時仍然保留了元素的插入順序。由于RMapCache是基于RMap實現(xiàn)的,使它同時繼承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于這樣的功能來實現(xiàn)的。
目前的Redis自身并不支持散列(Hash)當中的元素淘汰,因此所有過期元素都是通過org.redisson.EvictionScheduler實例來實現(xiàn)定期清理的。為了保證資源的有效利用,每次運行最多清理300個過期元素。任務(wù)的啟動時間將根據(jù)上次實際清理數(shù)量自動調(diào)整,間隔時間趨于1秒到1小時之間。比如該次清理時刪除了300條元素,那么下次執(zhí)行清理的時間將在1秒以后(最小間隔時間)。一旦該次清理數(shù)量少于上次清理數(shù)量,時間間隔將增加1.5倍。
正如官方wiki所述,這個功能是通過后臺線程定時去清理的, 所以這個是非實時的(issue-1234:on expired event is not executed in real-time.),延遲在5秒到2小時之間,因此對實時性要求比較高的場景就得自己衡量了。
由于過期時間的非實時性,所以導(dǎo)致過期事件的發(fā)生也是非實時的,相應(yīng)的監(jiān)聽器可能會延遲了一會兒才收到通知,在我的測試中,ttl設(shè)置在秒級誤差是比較大的,分鐘級別的ttl倒還好(左側(cè)設(shè)置值,右側(cè)實際耗時):
1s _ 5s
3s _ 5s
4s _ 5s
5s _ 9s
6s _ 10s
10s _ 15s
1m _ 1m11s
序列化
由Redisson默認的編碼器為JsonJacksonCodec,JsonJackson在序列化有雙向引用的對象時,會出現(xiàn)無限循環(huán)異常。而fastjson在檢查出雙向引用后會自動用引用符$ref替換,終止循環(huán)。
在我的情況中,我序列化了一個service,這個service已被spring托管,而且和另一個service之間也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環(huán)異常。
為了序列化后的內(nèi)容可見,所以不用redission其他自帶的二進制編碼器,自行實現(xiàn)編碼器:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
public class FastjsonCodec extends BaseCodec {
private final Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
JSON.writeJSONString(os, in,SerializerFeature.WriteClassName);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
};
private final Decoder<Object> decoder = (buf, state) ->
JSON.parseObject(new ByteBufInputStream(buf), Object.class);
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}
訂閱發(fā)布
Redisson對訂閱發(fā)布的封裝是RTopic,這也是Redisson中很多事件監(jiān)聽的實現(xiàn)原理(例如鍵值對的事件監(jiān)聽)。
使用單元測試時發(fā)現(xiàn),在事件發(fā)布后,訂閱方需要延時一下才能收到事件。具體原因待查。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
基于@AllArgsConstructor與@Value共用的問題解決
這篇文章主要介紹了基于@AllArgsConstructor與@Value共用的問題解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
單點登錄的概念及SpringBoot實現(xiàn)單點登錄的操作方法
在本文中,我們將使用Spring Boot構(gòu)建一個基本的單點登錄系統(tǒng),我們將介紹如何使用Spring Security和JSON Web Tokens(JWTs)來實現(xiàn)單點登錄功能,本文假設(shè)您已經(jīng)熟悉Spring Boot和Spring Security,感興趣的朋友一起看看吧2024-10-10

