詳解Spark?Sql在UDF中如何引用外部數(shù)據(jù)
前言
Spark Sql可以通過UDF來對(duì)DataFrame的Column進(jìn)行自定義操作。在特定場(chǎng)景下定義UDF可能需要用到Spark Context以外的資源或數(shù)據(jù)。比如從List或Map中取值,或是通過連接池從外部的數(shù)據(jù)源中讀取數(shù)據(jù),然后再參與Column的運(yùn)算。
Excutor中每個(gè)task的工作線程都會(huì)對(duì)UDF的call進(jìn)行調(diào)用,外部資源的使用發(fā)生在Excutor端,而資源加載既能發(fā)生在Driver端,也可以發(fā)生在Excutor端。如果外部資源對(duì)象能序列化,我們可以在Driver端進(jìn)行初始化,然后廣播(broadcast)到Excutor端參與運(yùn)算。對(duì)于不能進(jìn)行序列化的對(duì)象,如JedisPool(redis連接池),只能在Excutor端進(jìn)行初始化。
因此,在UDF中引用外部資源有以下兩類方法:
- 能序列化:在Driver端進(jìn)行初始化,然后通過spark的broadcast方法廣播到Excutor上進(jìn)行使用;
- 不能序列化:在Excutor端進(jìn)行初始化然后使用。
下面我們將用一個(gè)實(shí)際例子對(duì)上述兩種方法進(jìn)行詳細(xì)介紹。
本文使用環(huán)境:Spark-2.3.0,Java 8。
場(chǎng)景介紹
我們以一個(gè)DataFrame(兩個(gè)字段node_1、node_2)作為原始數(shù)據(jù);一棵二叉搜索樹(BST)作為Spark外部被引用數(shù)據(jù);目標(biāo)是定義一個(gè)UDF來判斷:BST中是否剛好存在一個(gè)父節(jié)點(diǎn),它的左右子節(jié)點(diǎn)值與node_1、node_2兩個(gè)字段值相同。然后將判斷結(jié)果輸出到新列is_bro。其中DataFrame:

BST:

輸出DataFrame:

二叉樹的定義與判斷是否為父節(jié)點(diǎn)的左右子節(jié)點(diǎn)的邏輯如下:
import java.io.Serializable;
/**
* @author wangjiahui
* @create 2021-03-14-10:57
*/
public class TreeNode implements Serializable{
private Integer val;
private TreeNode left;
private TreeNode right;
public TreeNode() {
}
public TreeNode(Integer val) {
this.val = val;
}
public TreeNode(Integer val, TreeNode left, TreeNode right) {
this.val = val;
this.left = left;
this.right = right;
}
public Integer getVal() {
return val;
}
public void setVal(Integer val) {
this.val = val;
}
public TreeNode getLeft() {
return left;
}
public void setLeft(TreeNode left) {
this.left = left;
}
public TreeNode getRight() {
return right;
}
public void setRight(TreeNode right) {
this.right = right;
}
/**
* 判斷是否剛好有一個(gè)父節(jié)點(diǎn)的左、右子節(jié)點(diǎn)值與num1、num2相同
* @param num1
* @param num2
* @return
*/
public Boolean isBro( Integer num1, Integer num2) {
if (null == getLeft()||null == getRight()) {
return false;
}
if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
return true;
}
return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
}
}
生成上圖所示BST的方法createTree()如下:
public static TreeNode createTree(){
TreeNode[] treeNodes = new TreeNode[8];
for(int i=1; i<=7; i++){
treeNodes[i] = new TreeNode(i);
}
treeNodes[2].setLeft(treeNodes[1]);
treeNodes[2].setRight(treeNodes[3]);
treeNodes[6].setLeft(treeNodes[5]);
treeNodes[6].setRight(treeNodes[7]);
treeNodes[4].setLeft(treeNodes[2]);
treeNodes[4].setRight(treeNodes[6]);
return treeNodes[4];
}
方法一 Driver端加載
在Driver端完成初始化并定義UDF
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
// 初始化樹
TreeNode tree = createTree();
// broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
// lambda表達(dá)式定義udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
// 注冊(cè)u(píng)df
spark.udf().register("isBro",udf);
// 使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
方法二 Excutor端加載
如果我們直接在call中進(jìn)行初始化會(huì)存在問題:由于多個(gè)task的線程會(huì)在同一時(shí)刻對(duì)UDF中的call進(jìn)行調(diào)用,導(dǎo)致資源對(duì)象在同一時(shí)刻被初始化多次,造成Excutor內(nèi)存資源浪費(fèi)。此外,如果外部資源為連接池對(duì)象,在同一時(shí)刻初始化多次會(huì)建立多個(gè)連接,增加外部數(shù)據(jù)源的訪問壓力。
為此,我們可以借助單例模式中的懶漢式實(shí)現(xiàn),讓資源在每個(gè)Excutor中只被初始化一次。懶漢式的實(shí)現(xiàn)需要新建一個(gè)類(命名為IsBroUDF2)并實(shí)現(xiàn)UDF2<Integer, Integer, Boolean>接口,重寫UDF2的call方法:
import org.apache.spark.sql.api.java.UDF2;
/**
* @author wangjiahui
* @create 2021-03-14-14:25
*/
public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
// 定義靜態(tài)的TreeNode成員變量
private static volatile TreeNode treeNode;
public IsBroUDF2() {
}
@Override
public Boolean call(Integer num1, Integer num2) throws Exception {
// 懶漢式 二次判定
if(null==treeNode){
synchronized (IsBroUDF2.class){
if(null==treeNode){
treeNode=createTree();
}
}
}
return treeNode.isBro(num1,num2);
}
// 輔助方法
public static TreeNode createTree(){
TreeNode[] treeNodes = new TreeNode[8];
for(int i=1; i<=7; i++){
treeNodes[i] = new TreeNode(i);
}
treeNodes[2].setLeft(treeNodes[1]);
treeNodes[2].setRight(treeNodes[3]);
treeNodes[6].setLeft(treeNodes[5]);
treeNodes[6].setRight(treeNodes[7]);
treeNodes[4].setLeft(treeNodes[2]);
treeNodes[4].setRight(treeNodes[6]);
return treeNodes[4];
}
}
然后注冊(cè)和使用UDF
// 注冊(cè)u(píng)df
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
// 使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));
在call方法中通過加鎖可以實(shí)現(xiàn)TreeNode資源在同一個(gè)Excutor中只被初始化一次。除了上面介紹的這種懶漢式的寫法之外,還可以通過靜態(tài)內(nèi)部類懶加載、枚舉等方式實(shí)現(xiàn)TreeNode資源在Excutor端只被初始化一次。
小結(jié)
想要在Spark Sql的UDF中使用Spark外的資源和數(shù)據(jù)進(jìn)行運(yùn)算,我們既可以在Driver端預(yù)先進(jìn)行初始化然后廣播到各Excutor上(要求對(duì)象能序列化),也可以直接在Excutor端進(jìn)行加載;如果在Excutor端加載要保證外部資源對(duì)象只被初始化一次。
以上就是詳解Spark Sql在UDF中如何引用外部數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Spark Sql UDF引用外部數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Idea進(jìn)行pull的時(shí)候Your local changes would be
這篇文章主要介紹了Idea進(jìn)行pull的時(shí)候Your local changes would be overwritten by merge.具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
Spring中的Aware接口及應(yīng)用場(chǎng)景詳解
這篇文章主要介紹了Spring中的Aware接口及應(yīng)用場(chǎng)景,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
Java多線程并發(fā)開發(fā)之DelayQueue使用示例
這篇文章主要為大家詳細(xì)介紹了Java多線程并發(fā)開發(fā)之DelayQueue使用示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-09-09
java模擬http請(qǐng)求的錯(cuò)誤問題整理
本文是小編給大家整理的在用java模擬http請(qǐng)求的時(shí)候遇到的錯(cuò)誤問題整理,以及相關(guān)分析,有興趣的朋友參考下。2018-05-05
SpringBoot如何獲取當(dāng)前操作用戶的id/信息
在一般性的基設(shè)需求中,有需要獲取當(dāng)前用戶操作記錄的情況,也就是說我們需要記錄當(dāng)前用戶的信息,如:id、昵稱、賬號(hào)等信息,這篇文章主要介紹了SpringBoot獲取當(dāng)前操作用戶的id/信息,需要的朋友可以參考下2023-10-10
Java中字節(jié)流和字符流的區(qū)別與聯(lián)系
Java中的字節(jié)流和字符流是用于處理輸入和輸出的兩種不同的流,本文主要介紹了Java中字節(jié)流和字符流的區(qū)別與聯(lián)系,字節(jié)流以字節(jié)為單位進(jìn)行讀寫,適用于處理二進(jìn)制數(shù)據(jù),本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2024-12-12
如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類
Axis是一個(gè)基于Java的Web服務(wù)框架,可以用來調(diào)用Web服務(wù)接口,下面這篇文章主要給大家介紹了關(guān)于如何使用axis調(diào)用WebService及Java?WebService調(diào)用工具類的相關(guān)資料,需要的朋友可以參考下2023-04-04

