深入詳解java高并發(fā)熱點(diǎn)數(shù)據(jù)更新
mysql update的時(shí)候到底是鎖行還是鎖表?
InnoDb 鎖簡(jiǎn)單分類
按照數(shù)據(jù)操作的粒度
1)行級(jí)鎖:鎖住記錄行
2)表級(jí)鎖:鎖住整張表
按照對(duì)數(shù)據(jù)操作的類型
1)讀鎖(共享鎖):針對(duì)同一份數(shù)據(jù),多個(gè)讀操作可以同時(shí)進(jìn)行而不會(huì)互相影響。
2) 寫(xiě)鎖(排它鎖):當(dāng)前操作沒(méi)有完成之前,它會(huì)阻斷其他寫(xiě)鎖和讀鎖。
mysql 在update時(shí)會(huì)根據(jù)where 條件的類型決定鎖行還是鎖表 where的過(guò)濾條件列,如果用索引,鎖行,無(wú)法用索引,鎖表。按照索引規(guī)則,如果能使用索引,鎖行,不能使用索引,鎖表。 行鎖是排他鎖,當(dāng)一條記錄已經(jīng)被一條update語(yǔ)句鎖住時(shí)會(huì)阻斷其他的update操作,在高并發(fā)場(chǎng)景下,對(duì)于熱點(diǎn)數(shù)據(jù)來(lái)說(shuō)會(huì)進(jìn)行頻繁的更新操作造成其他update操作鎖等待超時(shí)請(qǐng)求失敗
背景
以旅游支付場(chǎng)景為例,伴隨著業(yè)務(wù)量的增加,系統(tǒng)的并發(fā)量會(huì)逐漸上升,例如“北京長(zhǎng)城度假區(qū)”的賬戶流水會(huì)變得十分頻繁,每次支付或者退款操作都需要去更新一下賬戶余額,并發(fā)較低時(shí)并不會(huì)有什么問(wèn)題,但當(dāng)旅游高峰期到來(lái)時(shí)并發(fā)量上升,數(shù)據(jù)庫(kù)更新的時(shí)候需要獲得數(shù)據(jù)行鎖,在未釋放這個(gè)行鎖之前,其他事務(wù)只能是等待。
解決方案
1.支付時(shí)異步入賬,退款增加一個(gè)欠款墊資戶
用戶支付入款需要給賬戶加錢(qián)時(shí)此時(shí)商戶對(duì)于資金的實(shí)時(shí)性要求不高,追求準(zhǔn)確性,因此可以將賬戶加款放到異步線程池,達(dá)到錯(cuò)峰的目的 然而當(dāng)用戶發(fā)起退款時(shí),我們必須及時(shí)并且準(zhǔn)確的從賬戶扣款,因此退款采取同步進(jìn)行,退款的訂單相對(duì)于支付來(lái)說(shuō)量就會(huì)少很多,滿足要求。但是存在一個(gè)問(wèn)題高并發(fā)狀態(tài)下某一個(gè)熱點(diǎn)賬戶余額時(shí)刻在變很有可能退款發(fā)起時(shí)賬戶余額充足但是實(shí)際扣除時(shí)由于上一筆支付未入賬,造成金額不足
1.加分布式鎖
對(duì)特定賬戶加鎖,保證某一刻只有一筆退款請(qǐng)求獲得該賬戶的操作權(quán) 弊端:多個(gè)用時(shí)同時(shí)退款時(shí)只有一筆成功,對(duì)用戶不友好,pass掉
2.新增墊資商戶
熱點(diǎn)賬戶增加一個(gè)指定透支額度的墊資戶,實(shí)際賬戶余額不足時(shí)從墊資戶借款,然后定期核對(duì)墊資戶透支額度從實(shí)際賬戶一次性扣款, 推薦
2.合并請(qǐng)求
合并多條需要更新余額的請(qǐng)求
將一段時(shí)間內(nèi)的請(qǐng)求,先進(jìn)行阻塞,合并各個(gè)賬戶需要更新的金額,一次性處理,然后將結(jié)果拆分,喚醒被阻塞的請(qǐng)求

demo實(shí)現(xiàn)
* @Author: xiaokunkun
* @CreateTime: 2023-04-23 14:37
* @Description: 合并更新,可以不捕捉異常報(bào)錯(cuò)后外層調(diào)用方直接捕獲異常事務(wù)回滾
*/
@Service
public class CommodityAmountService {
class Request {
AcctUpdateDto acctUpdateDto;
//預(yù)留字段 可不使用
String atomCode;
//暫定返回結(jié)果為true或者false
CompletableFuture<Boolean> future; // 接受結(jié)果
}
// 積攢請(qǐng)求(每隔N毫秒批量處理一次)
LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
// 定時(shí)任務(wù)的實(shí)現(xiàn),N秒鐘處理一次數(shù)據(jù)
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// 1、取出queue的請(qǐng)求,生成一次合并更新
int size = queue.size();
if (size == 0) {
return;
}
ArrayList<Request> requests = new ArrayList<>();
for (int i = 0; i < size; i++) {
//隊(duì)列出棧
Request request = queue.poll();
requests.add(request);
}
System.out.println("批量處理數(shù)據(jù)量:" + size);
// 2、組裝一個(gè)合并更新 key為賬戶value為sum(amount)
Map<String, Long> amountMap = new HashMap<>();
ArrayList<String> commodityCodes = new ArrayList<>();
for (Request request : requests) {
//todo 根據(jù)accountNo分組
}
for (String key : amountMap.keySet()) {
Long amount = amountMap.get(key);
//update mysql
}
// 3、將結(jié)果響應(yīng) 分發(fā)給每一個(gè)單獨(dú)的用戶請(qǐng)求。由定時(shí)任務(wù)處理線程 --> n個(gè)用戶的請(qǐng)求線程
for (Request request : requests) {
// 將結(jié)果返回到對(duì)應(yīng)的請(qǐng)求線程,只要不報(bào)錯(cuò)此批次全部返回true,否則false
request.future.complete(true);
}}, 0, 1000, TimeUnit.MILLISECONDS);
}
@Autowired
CommodityRemoteService commodityRemoteService;
// 合并金額并更新,多個(gè)用戶請(qǐng)求
public Boolean updateMergeAmount(String movieCode)
throws ExecutionException, InterruptedException {
// 并非立刻發(fā)起接口調(diào)用,請(qǐng)求收集起來(lái),再進(jìn)行
Request request = new Request();
request.atomCode = movieCode;
// 異步編程:獲取異步處理的結(jié)果
CompletableFuture<Boolean> future = new CompletableFuture<>();
request.future = future;
queue.add(request);
return future.get(); // 此處get方法,會(huì)阻塞線程運(yùn)行,直到future有返回
}
}測(cè)試類:
//模擬500的并發(fā)量
public void updateMerge() {
AcctCmdDriver acctCmdDriver = new AcctCmdDriver();
TradeAccntOrderDetail detail = new TradeAccntOrderDetail();
AcctNoInfo acctNoInfo = new AcctNoInfo();
OrderConsist consistForOrder = OrderConsist.newInstance("0200_202304", "trade_accnt_merchant_order");
detail.setConsistForOrder(consistForOrder);
acctCmdDriver.setDetail(detail);
acctCmdDriver.setAcctNoInfo(acctNoInfo);
detail.setAccountCategory(AccountCategoryEnum.MERCHANT);
detail.setAccountNo("02020001010000262977202304");
detail.setAmount(6l);
System.out.println("start build thread" + acctCmdDriver);
Random rand = new Random();
for (int i = 1; i <= 500; i++) {
final String index = "code_" + i;
Thread thread = new Thread(() -> {
try {
System.out.println("amount is:" + detail.getAmount());
countDownLatch.await();
Thread.sleep(rand.nextInt(150));
Boolean res = updateMergeAmountService.mergeUpdate(acctCmdDriver);
System.out.println("current i" + index + "res:" + res);
} catch (InterruptedException e) {
System.out.println("thread error is:" + e);
}
});
thread.start();
// 啟動(dòng)后,倒計(jì)時(shí)器倒計(jì)數(shù)減一,代表又有一個(gè)線程就緒了
countDownLatch.countDown();
}
}以上就是深入詳解java高并發(fā)熱點(diǎn)數(shù)據(jù)更新的詳細(xì)內(nèi)容,更多關(guān)于java高并發(fā)熱點(diǎn)數(shù)據(jù)更新的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring Cache擴(kuò)展功能實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了Spring Cache擴(kuò)展功能實(shí)現(xiàn)解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
Spring注解開(kāi)發(fā)@Bean和@ComponentScan使用案例
這篇文章主要介紹了Spring注解開(kāi)發(fā)@Bean和@ComponentScan使用案例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
java集合類遍歷的同時(shí)如何進(jìn)行刪除操作
這篇文章主要介紹了java集合類遍歷的同時(shí)如何進(jìn)行刪除操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java并發(fā)編程之Semaphore(信號(hào)量)詳解及實(shí)例
這篇文章主要介紹了Java并發(fā)編程之Semaphore(信號(hào)量)詳解及實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-06-06
SpringCloud Ribbon與OpenFeign詳解如何實(shí)現(xiàn)服務(wù)調(diào)用
這篇文章主要介紹了SpringCloud Ribbon與OpenFeign實(shí)現(xiàn)服務(wù)調(diào)用的過(guò)程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-09-09
SpringBoot去除內(nèi)嵌tomcat的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot去除內(nèi)嵌tomcat的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09

