Spark調(diào)優(yōu)多線程并行處理任務(wù)實(shí)現(xiàn)方式
方式1:
1. 明確 Spark中Job 與 Streaming中 Job 的區(qū)別
1.1 Spark Core
一個(gè) RDD DAG Graph 可以生成一個(gè)或多個(gè) Job(Action操作)
一個(gè)Job可以認(rèn)為就是會(huì)最終輸出一個(gè)結(jié)果RDD的一條由RDD組織而成的計(jì)算
Job在spark里應(yīng)用里是一個(gè)被調(diào)度的單位
1.2 Streaming
一個(gè) batch 的數(shù)據(jù)對(duì)應(yīng)一個(gè) DStreamGraph
而一個(gè) DStreamGraph 包含一或多個(gè)關(guān)于 DStream 的輸出操作
每一個(gè)輸出對(duì)應(yīng)于一個(gè)Job,一個(gè) DStreamGraph 對(duì)應(yīng)一個(gè)JobSet,里面包含一個(gè)或多個(gè)Job
2. Streaming Job的并行度
Job的并行度由兩個(gè)配置決定:
spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs
一個(gè) Batch 可能會(huì)有多個(gè) Action 執(zhí)行,比如注冊(cè)了多個(gè) Kafka 數(shù)據(jù)流,每個(gè)Action都會(huì)產(chǎn)生一個(gè)Job
所以一個(gè) Batch 有可能是一批 Job,也就是 JobSet 的概念
這些 Job 由 jobExecutor 依次提交執(zhí)行
而 JobExecutor 是一個(gè)默認(rèn)池子大小為1的線程池,所以只能執(zhí)行完一個(gè)Job再執(zhí)行另外一個(gè)Job
這里說(shuō)的池子,大小就是由spark.streaming.concurrentJobs 控制的
concurrentJobs 決定了向 Spark Core 提交Job的并行度
提交一個(gè)Job,必須等這個(gè)執(zhí)行完了,才會(huì)提交第二個(gè)
假設(shè)我們把它設(shè)置為2,則會(huì)并發(fā)的把 Job 提交給 Spark Core
Spark 有自己的機(jī)制決定如何運(yùn)行這兩個(gè)Job,這個(gè)機(jī)制其實(shí)就是FIFO或者FAIR(決定了資源的分配規(guī)則)
默認(rèn)是 FIFO,也就是先進(jìn)先出,把 concurrentJobs 設(shè)置為2,但是如果底層是FIFO,那么會(huì)優(yōu)先執(zhí)行先提交的Job
雖然如此,如果資源夠兩個(gè)job運(yùn)行,還是會(huì)并行運(yùn)行兩個(gè)Job
Spark Streaming 不同Batch任務(wù)可以并行計(jì)算么 https://developer.aliyun.com/article/73004
conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs", "3") //job 并行對(duì)
conf.set("spark.scheduler.mode", "FIFO")
val sc = new StreamingContext(conf, Seconds(5))
你會(huì)發(fā)現(xiàn),不同batch的job其實(shí)也可以并行運(yùn)行的,這里需要有幾個(gè)條件:
有延時(shí)發(fā)生了,batch無(wú)法在本batch完成
concurrentJobs > 1
如果scheduler mode 是FIFO則需要某個(gè)Job無(wú)法一直消耗掉所有資源
Mode是FAIR則盡力保證你的Job是并行運(yùn)行的,毫無(wú)疑問(wèn)是可以并行的。
方式2:
場(chǎng)景1:
程序每次處理的數(shù)據(jù)量是波動(dòng)的,比如周末比工作日多很多,晚八點(diǎn)比凌晨四點(diǎn)多很多。
一個(gè)spark程序處理的時(shí)間在1-2小時(shí)波動(dòng)是OK的。而spark streaming程序不可以,如果每次處理的時(shí)間是1-10分鐘,就很蛋疼。
設(shè)置10分鐘吧,實(shí)際上10分鐘的也就那一段高峰時(shí)間,如果設(shè)置每次是1分鐘,很多時(shí)候會(huì)出現(xiàn)程序處理不過(guò)來(lái),排隊(duì)過(guò)多的任務(wù)延遲更久,還可能出現(xiàn)程序崩潰的可能。
場(chǎng)景2:
- 程序需要處理的相似job數(shù)隨著業(yè)務(wù)的增長(zhǎng)越來(lái)越多
- 我們知道spark的api里無(wú)相互依賴的stage是并行處理的,但是job之間是串行處理的。
- spark程序通常是離線處理,比如T+1之類的延遲,時(shí)間變長(zhǎng)是可以容忍的。而spark streaming是準(zhǔn)實(shí)時(shí)的,如果業(yè)務(wù)增長(zhǎng)導(dǎo)致延遲增加就很不合理。
spark雖然是串行執(zhí)行job,但是是可以把job放到線程池里多線程執(zhí)行的。如何在一個(gè)SparkContext中提交多個(gè)任務(wù)
DStream.foreachRDD{
rdd =>
//創(chuàng)建線程池
val executors=Executors.newFixedThreadPool(rules.length)
//將規(guī)則放入線程池
for( ru <- rules){
val task= executors.submit(new Callable[String] {
override def call(): String ={
//執(zhí)行規(guī)則
runRule(ru,spark)
}
})
}
//每次創(chuàng)建的線程池執(zhí)行完所有規(guī)則后shutdown
executors.shutdown()
}
注意點(diǎn)
1.最后需要executors.shutdown()。
- 如果是executors.shutdownNow()會(huì)發(fā)生未執(zhí)行完的task強(qiáng)制關(guān)閉線程。
- 如果使用executors.awaitTermination()則會(huì)發(fā)生阻塞,不是我們想要的結(jié)果。
- 如果沒(méi)有這個(gè)shutdowm操作,程序會(huì)正常執(zhí)行,但是長(zhǎng)時(shí)間會(huì)產(chǎn)生大量無(wú)用的線程池,因?yàn)槊看蝔oreachRDD都會(huì)創(chuàng)建一個(gè)線程池。
2.可不可以將創(chuàng)建線程池放到foreachRDD外面?
不可以,這個(gè)關(guān)系到對(duì)于scala閉包到理解,經(jīng)測(cè)試,第一次或者前幾次batch是正常的,后面的batch無(wú)線程可用。
3.線程池executor崩潰了就會(huì)導(dǎo)致數(shù)據(jù)丟失
原則上是這樣的,但是正常的代碼一般不會(huì)發(fā)生executor崩潰。至少我在使用的時(shí)候沒(méi)遇到過(guò)。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Spring boot @RequestBody數(shù)據(jù)傳遞過(guò)程詳解
這篇文章主要介紹了Spring boot @RequestBody數(shù)據(jù)傳遞過(guò)程詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
JsonFormat與@DateTimeFormat注解實(shí)例解析
這篇文章主要介紹了JsonFormat與@DateTimeFormat注解實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
詳解阿里云maven鏡像庫(kù)配置(gradle,maven)
這篇文章主要介紹了詳解阿里云maven鏡像庫(kù)配置(gradle,maven),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-02-02
IntelliJ IDEA2021.1 配置大全(超詳細(xì)教程)
這篇文章主要介紹了IntelliJ IDEA2021.1 配置大全(超詳細(xì)教程),需要的朋友可以參考下2021-04-04
springmvc接口接收參數(shù)與請(qǐng)求參數(shù)格式的整理
這篇文章主要介紹了springmvc接口接收參數(shù)與請(qǐng)求參數(shù)格式的整理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
利用EasyPOI實(shí)現(xiàn)多sheet和列數(shù)的動(dòng)態(tài)生成
EasyPoi功能如同名字,主打的功能就是容易,讓一個(gè)沒(méi)見(jiàn)接觸過(guò)poi的人員就可以方便的寫出Excel導(dǎo)出,Excel導(dǎo)入等功能,本文主要來(lái)講講如何利用EasyPOI實(shí)現(xiàn)多sheet和列數(shù)的動(dòng)態(tài)生成,需要的可以了解下2025-03-03
Java開(kāi)發(fā)神器Lombok安裝與使用詳解
Lombok的安裝分兩部分:Idea插件的安裝和maven中pom文件的導(dǎo)入,本文重點(diǎn)給大家介紹Java開(kāi)發(fā)神器Lombok安裝與使用詳解,感興趣的朋友跟隨小編一起看看吧2022-02-02
Spring?Boot實(shí)現(xiàn)微信掃碼登錄功能流程分析
這篇文章主要介紹了Spring?Boot?實(shí)現(xiàn)微信掃碼登錄功能,介紹了授權(quán)流程代碼和用戶登錄和登出的操作代碼,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-04-04
Java 中 getClass() 方法的使用與原理深入分析(對(duì)象類型信息)
在 Java 編程中,getClass() 是一個(gè)非常重要的方法,它用于獲取對(duì)象的運(yùn)行時(shí)類信息,無(wú)論是調(diào)試代碼、反射操作,還是類型檢查,getClass() 都扮演著關(guān)鍵角色,本文將深入探討 getClass() 的使用方法、底層原理以及實(shí)際應(yīng)用場(chǎng)景,感興趣的朋友一起看看吧2024-12-12

