Java 根據(jù)某個 key 加鎖的實現(xiàn)方式
一、背景
日常開發(fā)中,有時候需要根據(jù)某個 key 加鎖,確保多線程情況下,對該 key 的加鎖和解鎖之間的代碼串行執(zhí)行。
大家可以借助每個 key 對應(yīng)一個 ReentrantLock ,讓同一個 key 的線程使用該 lock 加鎖;每個 key 對應(yīng)一個 Semaphore ,讓同一個 key 的線程使用 Semaphore 控制同時執(zhí)行的線程數(shù)。
二、參考代碼
接口定義
public interface LockByKey<T> {
/**
* 加鎖
*/
void lock(T key);
/**
* 解鎖
*/
void unlock(T key);
}2.1 同一個 key 只能一個線程執(zhí)行
2.1.1 代碼實現(xiàn)
每個 key 對應(yīng)一個 ReentrantLock ,讓同一個 key 的線程使用該 lock 加鎖。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
public class DefaultLockByKeyImpl<T> implements LockByKey<T> {
private final Map<T, ReentrantLock> lockMap = new ConcurrentHashMap<>();
/**
* 加鎖
*/
@Override
public void lock(T key) {
// 如果key為空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能為空");
}
// 獲取或創(chuàng)建一個ReentrantLock對象
ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
// 獲取鎖
lock.lock();
}
/**
* 解鎖
*/
@Override
public void unlock(T key) {
// 如果key為空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能為空");
}
// 從Map中獲取鎖對象
ReentrantLock lock = lockMap.get(key);
// 獲取不到報錯
if (lock == null) {
throw new IllegalArgumentException("key " + key + "尚未加鎖");
}
// 其他線程非法持有不允許釋放
if (!lock.isHeldByCurrentThread()) {
throw new IllegalStateException("當(dāng)前線程尚未持有,key:" + key + "的鎖,不允許釋放");
}
lock.unlock();
}
}
注意事項:
(1)參數(shù)合法性校驗
(2)解鎖時需要判斷該鎖是否為當(dāng)前線程持有
2.1.2 編寫單測
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class DefaultLockByKeyImplTest {
private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>();
private final CountDownLatch countDownLatch = new CountDownLatch(7);
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Test
public void test() throws InterruptedException {
List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
Set<String> executingKeySet = new HashSet<>();
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
int finalI = i;
executorService.submit(() -> {
lockByKey.lock(key);
if (executingKeySet.contains(key)) {
throw new RuntimeException("存在正在執(zhí)行的 key:" + key);
}
executingKeySet.add(key);
try {
System.out.println("index:" + finalI + "對 [" + key + "] 加鎖 ->" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("index:" + finalI + "釋放 [" + key + "] ->" + Thread.currentThread().getName());
lockByKey.unlock(key);
executingKeySet.remove(key);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
}
如果同一個 key 沒釋放能夠再次進(jìn)入,會拋出異常。
也可以通過日志來觀察執(zhí)行情況:
index:0對 [a] 加鎖 ->pool-1-thread-1 index:6對 [d] 加鎖 ->pool-1-thread-7 index:4對 [c] 加鎖 ->pool-1-thread-5 index:3對 [b] 加鎖 ->pool-1-thread-4 index:6釋放 [d] ->pool-1-thread-7 index:4釋放 [c] ->pool-1-thread-5 index:0釋放 [a] ->pool-1-thread-1 index:3釋放 [b] ->pool-1-thread-4 index:1對 [a] 加鎖 ->pool-1-thread-2 index:5對 [b] 加鎖 ->pool-1-thread-6 index:1釋放 [a] ->pool-1-thread-2 index:5釋放 [b] ->pool-1-thread-6 index:2對 [a] 加鎖 ->pool-1-thread-3 index:2釋放 [a] ->pool-1-thread-3
2.2、同一個 key 可以有 n個線程執(zhí)行
2.2.1 代碼實現(xiàn)
每個 key 對應(yīng)一個 Semaphore ,讓同一個 key 的線程使用 Semaphore 控制同時執(zhí)行的線程數(shù)。
import lombok.SneakyThrows;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
public class SimultaneousEntriesLockByKey<T> implements LockByKey<T> {
private final Map<T, Semaphore> semaphores = new ConcurrentHashMap<>();
/**
* 最大線程
*/
private int allowed_threads;
public SimultaneousEntriesLockByKey(int allowed_threads) {
this.allowed_threads = allowed_threads;
}
/**
* 加鎖
*/
@Override
public void lock(T key) {
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v);
semaphore.acquireUninterruptibly();
}
/**
* 解鎖
*/
@Override
public void unlock(T key) {
// 如果key為空,直接返回
if (key == null) {
throw new IllegalArgumentException("key 不能為空");
}
// 從Map中獲取鎖對象
Semaphore semaphore = semaphores.get(key);
if (semaphore == null) {
throw new IllegalArgumentException("key " + key + "尚未加鎖");
}
semaphore.release();
if (semaphore.availablePermits() >= allowed_threads) {
semaphores.remove(key, semaphore);
}
}
2.2.2 測試代碼
import com.google.common.collect.Lists;
import org.junit.Test;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SimultaneousEntriesLockByKeyTest {
private final int maxThreadEachKey = 2;
private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);
private final CountDownLatch countDownLatch = new CountDownLatch(7);
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
@Test
public void test() throws InterruptedException {
List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>());
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
int finalI = i;
executorService.submit(() -> {
lockByKey.lock(key);
executingKeyCount.compute(key, (k, v) -> {
if (v != null && v + 1 > maxThreadEachKey) {
throw new RuntimeException("超過限制了");
}
return v == null ? 1 : v + 1;
});
try {
System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "對 [" + key + "] 加鎖 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "釋放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1));
lockByKey.unlock(key);
executingKeyCount.compute(key, (k, v) -> v - 1);
countDownLatch.countDown();
}
});
}
countDownLatch.await();
}
}
輸出:
time:2023-03-15T20:49:57.044195 ,index:6對 [d] 加鎖 ->pool-1-thread-7count:1
time:2023-03-15T20:49:57.058942 ,index:5對 [b] 加鎖 ->pool-1-thread-6count:2
time:2023-03-15T20:49:57.069789 ,index:1對 [a] 加鎖 ->pool-1-thread-2count:2
time:2023-03-15T20:49:57.042402 ,index:4對 [c] 加鎖 ->pool-1-thread-5count:1
time:2023-03-15T20:49:57.046866 ,index:0對 [a] 加鎖 ->pool-1-thread-1count:2
time:2023-03-15T20:49:57.042991 ,index:3對 [b] 加鎖 ->pool-1-thread-4count:2
time:2023-03-15T20:49:58.089557 ,index:0釋放 [a] ->pool-1-thread-1count:1
time:2023-03-15T20:49:58.082679 ,index:6釋放 [d] ->pool-1-thread-7count:0
time:2023-03-15T20:49:58.084579 ,index:4釋放 [c] ->pool-1-thread-5count:0
time:2023-03-15T20:49:58.083462 ,index:5釋放 [b] ->pool-1-thread-6count:1
time:2023-03-15T20:49:58.089576 ,index:3釋放 [b] ->pool-1-thread-4count:1
time:2023-03-15T20:49:58.085359 ,index:1釋放 [a] ->pool-1-thread-2count:1
time:2023-03-15T20:49:58.096912 ,index:2對 [a] 加鎖 ->pool-1-thread-3count:1
time:2023-03-15T20:49:59.099935 ,index:2釋放 [a] ->pool-1-thread-3count:0
三、總結(jié)
本文結(jié)合自己的理解和一些參考代碼,給出自己的示例,希望對大家有幫助。
到此這篇關(guān)于Java 根據(jù)某個 key 加鎖的實現(xiàn)方式的文章就介紹到這了,更多相關(guān)Java根據(jù)某個 key 加鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解SpringBoot中@PostMapping注解的用法
在SpringBoot中,我們經(jīng)常需要編寫RESTful Web服務(wù),以便于客戶端與服務(wù)器之間的通信,@PostMapping注解可以讓我們更方便地編寫POST請求處理方法,在本文中,我們將介紹@PostMapping注解的作用、原理,以及如何在SpringBoot應(yīng)用程序中使用它2023-06-06
Mybatis-plus中IService接口的基本使用步驟
Mybatis-plus是一個Mybatis的增強(qiáng)工具,它提供了很多便捷的方法來簡化開發(fā),IService是Mybatis-plus提供的通用service接口,封裝了常用的數(shù)據(jù)庫操作方法,包括增刪改查等,下面這篇文章主要給大家介紹了關(guān)于Mybatis-plus中IService接口的基本使用步驟,需要的朋友可以參考下2023-06-06
Spring BeanFactory和FactoryBean區(qū)別解析
這篇文章主要介紹了Spring BeanFactory和FactoryBean區(qū)別解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-03-03
Springboot實現(xiàn)Excel批量導(dǎo)入數(shù)據(jù)并保存到本地
這篇文章主要為大家詳細(xì)介紹了Springboot實現(xiàn)Excel批量導(dǎo)入數(shù)據(jù)并將文件保存到本地效果的方法,文中的示例代講解詳細(xì),需要的可以參考一下2022-09-09
解決Spring運行時報錯:Consider defining a bean o
該文章主要講述了在使用Spring框架時,如果遇到某個bean未找到的錯誤,應(yīng)該在配置文件中定義該bean,解決方法是在對應(yīng)的類上添加@Component注解2025-01-01
springboot2啟動時執(zhí)行,初始化(或定時任務(wù))servletContext問題
這篇文章主要介紹了springboot2啟動時執(zhí)行,初始化(或定時任務(wù))servletContext問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01

