Java多線程編程實戰(zhàn)之模擬大量數(shù)據(jù)同步
背景
最近對于 Java 多線程做了一段時間的學習,筆者一直認為,學習東西就是要應用到實際的業(yè)務需求中的。否則要么無法深入理解,要么硬生生地套用技術只是達到炫技的效果。
不過筆者仍舊認為自己對于多線程掌握不夠熟練,不敢輕易應用到生產(chǎn)代碼中。這就按照平時工作中遇到的實際問題,腦補了一個很可能存在的業(yè)務場景:
已知某公司管理著 1000 個微信服務號,每個服務號有 1w ~ 50w 粉絲不等。假設該公司每天都需要將所有微信服務號的粉絲數(shù)據(jù)通過調(diào)用微信 API 的方式更新到本地數(shù)據(jù)庫。
需求分析
對此需求進行分析,主要存在以下問題:
- 單個服務號獲取粉絲 id,只能每次 1w 按順序拉取
- 微信的 API 對于服務商的并發(fā)請求數(shù)量有限制
單個服務號獲取粉絲 id,只能每次 1w 按順序拉取。這個問題決定了單個公眾號在拉取粉絲 id 上,無法分配給多個線程執(zhí)行。
微信的 API 對于服務商的并發(fā)請求數(shù)量有限制。這點最容易被忽略,如果我們同時有過多的請求,則會導致接口被封禁。這里可以通過信號量來控制同時執(zhí)行的線程數(shù)量。
為了盡快完成數(shù)據(jù)同步,根據(jù)實際情況:整個數(shù)據(jù)同步可分為讀數(shù)據(jù)和寫數(shù)據(jù)兩個部分。讀數(shù)據(jù)是通過 API 獲取,走網(wǎng)絡 IO,速度較慢;寫數(shù)據(jù)是寫到數(shù)據(jù)庫,速度較快。所以得出結(jié)論:需要分配較多的線程進行讀數(shù)據(jù),較少的線程進行寫數(shù)據(jù)。
設計要點
首先,我們需要確定開啟多少個線程(在生產(chǎn)中往往是使用線程池),線程數(shù)量需要根據(jù)服務器性能來決定,這里我們定為 40 個讀取數(shù)據(jù)線程(將 1000 個公眾號分為 40 份,分別在 40 個線程中執(zhí)行),1個寫入數(shù)據(jù)線程。(具體開多少個線程,取決于線程池的容量,以及可以分配給此業(yè)務的數(shù)量。具體的數(shù)字需要根據(jù)實際情況測試得出,比服務器閾值低一些較好。當然,配置允許范圍內(nèi)越大越好)
其次,考慮到微信對于 API 并發(fā)請求的限制,需要限制同時執(zhí)行的線程數(shù),使用java.util.concurrent.Semaphore進行控制,這里我們限制為 20 個(具體的信號量憑證數(shù),取決于同一時間能夠執(zhí)行的線程,跟 API 限制,服務器性能有關)。
然后,我們需要知道數(shù)據(jù)何時讀取、寫入完畢,以控制程序邏輯以及終止程序,這里我們使用java.util.concurrent.CountDownLatch進行控制。
最后,我們需要一個數(shù)據(jù)結(jié)構,用來在多個線程共享處理的數(shù)據(jù),此處同步數(shù)據(jù)的場景非常適合使用隊列,這里我們使用線程安全的java.util.concurrent.ConcurrentLinkedQueue來進行處理。(需要注意的是,在實際開發(fā)中,隊列不能夠無限制地增長,這將會很快消耗掉內(nèi)存,我們需要根據(jù)實際情況對隊列長度做控制。例如,可以通過控制讀取線程數(shù)和寫入線程數(shù)的比例來控制隊列的長度)
模擬代碼
由于本文重點關注多線程的使用,模擬代碼只體現(xiàn)多線程操作的方法。代碼里添加了大量的注釋,方便各位讀者閱讀理解。
JDK:1.8
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* N個線程向隊列添加數(shù)據(jù)
* 一個線程消費隊列數(shù)據(jù)
*/
public class QueueTest {
private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");
private static final int OFFER_COUNT = 40; // 開啟的線程數(shù)量
private static Semaphore semaphore = new Semaphore(20); // 同一時間執(zhí)行的線程數(shù)量(大多用于控制API調(diào)用次數(shù)或數(shù)據(jù)庫查詢連接數(shù))
public static void main(String[] args) throws InterruptedException {
Queue<String> queue = new ConcurrentLinkedQueue<>(); // 處理隊列,需要處理的數(shù)據(jù),放置到此隊列中
CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer線程latch,每完成一個,latch減一,lacth的count為0時表示offer處理完畢
CountDownLatch pollLatch = new CountDownLatch(1); // poll線程latch,latch的count為0時,表示poll處理完畢
Runnable offerRunnable = () -> {
try {
semaphore.acquire(); // 信號量控制
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
for (String datum : data) {
queue.offer(datum);
TimeUnit.SECONDS.sleep(2); // 模擬取數(shù)據(jù)很慢的情況
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 在finally中執(zhí)行l(wèi)atch.countDown()以及信號量釋放,避免因異常導致沒有正常釋放
offerLatch.countDown();
semaphore.release();
}
};
Runnable pollRunnable = () -> {
int count = 0;
try {
while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未執(zhí)行完,或queue仍舊有數(shù)據(jù),則繼續(xù)循環(huán)
String poll = queue.poll();
if (poll != null) {
System.out.println(poll);
count++;
}
// 無論是否poll到數(shù)據(jù),均暫停一小段時間,可降低CPU消耗
TimeUnit.MILLISECONDS.sleep(100);
}
System.out.println("total count:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 在finally中執(zhí)行l(wèi)atch.countDown(),避免因異常導致沒有正常釋放
pollLatch.countDown();
}
};
// 啟動線程(生產(chǎn)環(huán)境中建議使用線程池)
new Thread(pollRunnable).start(); // 啟動一個poll線程
for (int i = 0; i < OFFER_COUNT; i++) {
new Thread(offerRunnable).start();
} // 模擬取數(shù)據(jù)很慢,需要開啟40個線程處理
// latch等待,會block主線程直到latch的count為0
offerLatch.await();
pollLatch.await();
System.out.println("===the end===");
}
}到這里,本文結(jié)束。以上是筆者腦補的一個常見需求的解決方案。
注意:多線程編程對實際環(huán)境和需求有很大的依賴,需要根據(jù)實際的需求情況對各個參數(shù)做調(diào)整。實際在使用中,需要盡量模擬生產(chǎn)環(huán)境的數(shù)據(jù)情況來進行測試,對服務器執(zhí)行期間的并發(fā)數(shù),CPU、內(nèi)存、網(wǎng)絡 IO、磁盤 IO 做好觀察。并適當?shù)卣{(diào)低并發(fā)數(shù),以給服務器留有處理其他請求的余量。
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
windows下zookeeper配置java環(huán)境變量的方法
今天小編就為大家分享一篇關于windows下zookeeper配置java環(huán)境變量的方法,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03
Java多線程編程中synchronized關鍵字的基礎用法講解
Java的synchronized關鍵字用于修飾線程同步,用以線程資源共享的目的等,下面就帶來簡單的Java多線程編程中synchronized關鍵字的基礎用法講解2016-06-06

