flink?RichFunction之坑及解決
flink RichFunction之坑
flink的RichMapFunction,RichSinkFunction等,并不能百分百做到每次只open一個數(shù)據(jù)庫連接。
在有些情況下他會一直創(chuàng)建然后銷毀,創(chuàng)建銷毀。
舉例: 重點(diǎn)在第三行的注釋
val value = env.socketTextStream("192.168.13.11", 9090)
val value2 = value.filter(x => {
try {
var a = 1 / 0 //此處若沒有異常處理,任務(wù)不會斷,但是會重復(fù)打開數(shù)據(jù)庫連接
} catch {
case e: Exception =>
}
isInter(x)
}).map(fun = x => {
x.toLong
})
val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
override def extractTimestamp(element: Long): Long = {
println(element + "***************")
element
}
})
try {
var a = 1 / 0
} catch {
case e: Exception =>
}
value1.map(new mymap)
env.execute("test")
}
def isInter(input: String): Boolean = {
val matcher = Pattern.compile("^[0-9]+$").matcher(input)
matcher.find()
}
}
class myRichMapfun6() extends RichMapFunction[ListBuffer[String], Unit] {
var conn: Connection = _
var pst: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
conn = DriverManager.getConnection("jdbc:mysql://xxxxxxx:3306/zzt?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true", "root", "bigdata@mysql")
println(conn)
pst = conn.prepareStatement("insert into testa (str) values (?)")
}
override def close(): Unit = {
conn.close()
pst.close()
}
override def map(in: ListBuffer[String]): Unit = {
pst.setString(1, in.head)
pst.execute()
}
}
所以你是不是覺得那就價(jià)格異常處理不就得了?
NO
再看:

這個時(shí)候,如果傳進(jìn)來line不是數(shù)字或者格式不對,就會觸發(fā)異常,然而此時(shí)就不會像上面那樣幫你解決問題,而是一遍遍創(chuàng)建對象銷毀對象,一條消息創(chuàng)建一個連接,我就問你慌不慌,
原因
據(jù)觀察是因?yàn)椋斎氲臄?shù)據(jù)有問題,直接導(dǎo)致
val value1 = value2.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Long](Time.seconds(1)) {
override def extractTimestamp(element: Long): Long = {
println(element + "***************")
element
}
})
這個崩潰了,不走這行代碼了,沒有獲得eventime,然后估計(jì)。。。 剩下的我也沒詳細(xì)測。。。
解決方案
先fiiter過濾任何可能導(dǎo)致異常的臟數(shù)據(jù)確保數(shù)據(jù)都沒問題就可以了。
flink中RichFunction的一點(diǎn)小作用
①傳遞參數(shù)
所有需要用戶定義的函數(shù)都可以轉(zhuǎn)換成richfunction,例如實(shí)現(xiàn)map operator中你需要實(shí)現(xiàn)一個內(nèi)部類,并實(shí)現(xiàn)它的map方法:
data.map (new MapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});然后我們可以將其轉(zhuǎn)換為RichMapFunction:
data.map (new RichMapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});當(dāng)然,RichFuction除了提供原來MapFuction的方法之外,還提供open, close, getRuntimeContext 和setRuntimeContext方法,這些功能可用于參數(shù)化函數(shù)(傳遞參數(shù)),創(chuàng)建和完成本地狀態(tài),訪問廣播變量以及訪問運(yùn)行時(shí)信息以及有關(guān)迭代中的信息。
下面我們來看看RichFuction中傳遞參數(shù)的例子,以下代碼是測試RichFilterFuction的例子,基于DataSet而非DataStream。

由代碼可見,可以將Configuration中的limit參數(shù)的值傳遞進(jìn)RichFuction里面,通過后面withParameters方法傳遞進(jìn)去,最后的結(jié)果是

由此可見,我從configuration中獲取了limit的值,并設(shè)定了fliter的閾值是2,從而過濾了1,2。
②傳遞廣播變量
原理和上面差不多,下面我直接把代碼貼出來:

這是目前我學(xué)習(xí)到的RichFunction的用法,和大家分享一下。
總結(jié)
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot使用@Scheduled實(shí)現(xiàn)定時(shí)任務(wù)的并行執(zhí)行
在SpringBoot中,如果使用@Scheduled注解來定義多個定時(shí)任務(wù),默認(rèn)情況下這些任務(wù)將會被安排在一個單線程的調(diào)度器中執(zhí)行,這意味著,這些任務(wù)將會串行執(zhí)行,而不是并行執(zhí)行,本文介紹了SpringBoot使用@Scheduled實(shí)現(xiàn)定時(shí)任務(wù)的并行執(zhí)行,需要的朋友可以參考下2024-06-06
Java Map 通過 key 或者 value 過濾的實(shí)例代碼
這篇文章主要介紹了Java Map 通過 key 或者 value 過濾的實(shí)例代碼,非常不錯,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-06-06
Eclipse轉(zhuǎn)Itellij IDEA導(dǎo)入Git/svn本地項(xiàng)目的詳細(xì)步驟
這篇文章主要介紹了Eclipse轉(zhuǎn)Itellij IDEA導(dǎo)入Git/svn本地項(xiàng)目,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Springboot 項(xiàng)目讀取Resources目錄下的文件(推薦)
這篇文章主要介紹了Springboot 項(xiàng)目讀取Resources目錄下的文件,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11
JavaWeb基礎(chǔ)教程之Java基礎(chǔ)加強(qiáng)版
這篇文章主要介紹了JavaWeb基礎(chǔ)教程之Java基礎(chǔ)加強(qiáng)版的相關(guān)資料,需要的朋友可以參考下2016-07-07

