spark?dataframe全局排序id與分組后保留最大值行
正文
作為一個算法工程師,日常學(xué)習(xí)和工作中,不光要 訓(xùn)練模型關(guān)注效果 ,更多的 時間 是在 準(zhǔn)備樣本數(shù)據(jù)與分析數(shù)據(jù) 等,而這些過程 都與 大數(shù)據(jù) spark和hadoop生態(tài) 的若干工具息息相關(guān)。
今天我們就不在更新 機(jī)器學(xué)習(xí) 和 算法模型 相關(guān)的內(nèi)容,分享兩個 spark函數(shù) 吧,以前也在某種場景中使用過但沒有保存收藏,哎?。?事前不搜藏,臨時抱佛腳 的感覺 真是 痛苦,太耽誤干活了 。
so,把這 兩個函數(shù) 記在這里 以備不時 之需~
(1) 得到 spark dataframe 全局排序ID
這個函數(shù)的 應(yīng)用場景 就是:根據(jù)某一列的數(shù)值對 spark 的 dataframe 進(jìn)行排序, 得到全局多分區(qū)排序的全局有序ID,新增一列保存這個rank id ,并且保留別的列的數(shù)據(jù)無變化 。
有用戶會說,這不是很容易嗎 ,直接用 orderBy 不就可以了嗎,但是難點是:orderBy完記錄下全局ID 并且 保持原來全部列的DF數(shù)據(jù) 。
多說無益,遇到這個場景 直接copy 用起來 就知道 有多爽 了,同類問題 我們可以 用下面 這個函數(shù) 解決 ~
scala 寫的 spark 版本代碼:
def dfZipWithIndex(
df: DataFrame,
offset: Int = 1,
colName: String ="rank_id",
inFront: Boolean = true
) : DataFrame = {
df.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map(ln =>
Row.fromSeq(
(if (inFront) Seq(ln._2 + offset) else Seq())
++ ln._1.toSeq ++
(if (inFront) Seq() else Seq(ln._2 + offset))
)
),
StructType(
(if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
++ df.schema.fields ++
(if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
)
)
}
函數(shù)調(diào)用我們可以用這行代碼調(diào)用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接復(fù)制過去就可以~
python寫的 pyspark 版本代碼:
from pyspark.sql.types import LongType, StructField, StructType
def dfZipWithIndex (df, offset=1, colName="rank_id"):
new_schema = StructType(
[StructField(colName,LongType(),True)] # new added field in front
+ df.schema.fields # previous schema
)
zipped_rdd = df.rdd.zipWithIndex()
new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))
return spark.createDataFrame(new_rdd, new_schema)
調(diào)用 同理 , 這里我就不在進(jìn)行贅述了。
(2)分組后保留最大值行
這個函數(shù)的 應(yīng)用場景 就是: 當(dāng)我們使用 spark 或則 sparkSQL 查找某個 dataframe 數(shù)據(jù)的時候,在某一天里,任意一個用戶可能有多條記錄,我們需要 對每一個用戶,保留dataframe 中 某列值最大 的那行數(shù)據(jù) 。
其中的 關(guān)鍵點 在于:一次性求出對每個用戶分組后,求得每個用戶的多行記錄中,某個值最大的行進(jìn)行數(shù)據(jù)保留 。
當(dāng)然,經(jīng)過 簡單修改代碼,不一定是最大,最小也是可以的,平均都o(jì)k 。
scala 寫的 spark 版本代碼:
// 得到一天內(nèi)一個用戶多個記錄里面時間最大的那行用戶的記錄
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
val w = Window.partitionBy("user_id")
val result_df = raw_df
.withColumn("max_time",functions.max("time").over(w))
.where($"time" === $"max_time")
.drop($"max_time")
python寫的 pyspark 版本代碼:
# pyspark dataframe 某列值最大的元素所在的那一行
# GroupBy 列并過濾 Pyspark 中某列值最大的行
# 創(chuàng)建一個Window 以按A列進(jìn)行分區(qū),并使用它來計算每個組的最大值。然后過濾出行,使 B 列中的值等于最大值
from pyspark.sql import Window
w = Window.partitionBy('user_id')
result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\
.where(fun.col('time') == fun.col('time'))
.drop('max_time')
我們可以看到: 這個函數(shù)的關(guān)鍵就是運用了 spark 的 window 函數(shù) ,靈活運用 威力無窮 哦 !
到這里,spark利器2函數(shù)之dataframe全局排序id與分組后保留最大值行 的全文 就寫完了 ,更多關(guān)于spark dataframe全局排序的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談keras保存模型中的save()和save_weights()區(qū)別
這篇文章主要介紹了淺談keras保存模型中的save()和save_weights()區(qū)別,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05
Python實現(xiàn)將Excel轉(zhuǎn)換為json的方法示例
這篇文章主要介紹了Python實現(xiàn)將Excel轉(zhuǎn)換為json的方法,涉及Python文件讀寫及格式轉(zhuǎn)換相關(guān)操作技巧,需要的朋友可以參考下2017-08-08
Python3+Pygame實現(xiàn)射擊游戲完整代碼
這篇文章主要介紹了Python3+Pygame實現(xiàn)射擊游戲完整代碼,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03
Python?NumPy教程之?dāng)?shù)組的基本操作詳解
Numpy?中的數(shù)組是一個元素表(通常是數(shù)字),所有元素類型相同,由正整數(shù)元組索引。本文將通過一些示例詳細(xì)講一下NumPy中數(shù)組的一些基本操作,需要的可以參考一下2022-08-08

