PySpark中RDD的數(shù)據(jù)輸出問題詳解

RDD概念
RDD(resilient distributed dataset ,彈性分布式數(shù)據(jù)集),是 Spark 中最基礎(chǔ)的抽象。它表示了一個(gè)可以并行操作的、不可變得、被分區(qū)了的元素集合。用戶不需要關(guān)心底層復(fù)雜的抽象處理,直接使用方便的算子處理和計(jì)算就可以了。
RDD的特點(diǎn)
1) . 分布式 RDD是一個(gè)抽象的概念,RDD在spark driver中,通過RDD來引用數(shù)據(jù),數(shù)據(jù)真正存儲(chǔ)在節(jié)點(diǎn)機(jī)的partition上。
2). 只讀 在Spark中RDD一旦生成了,就不能修改。 那么為什么要設(shè)置為只讀,設(shè)置為只讀的話,因?yàn)椴淮嬖谛薷?,并發(fā)的吞吐量就上來了。
3). 血緣關(guān)系 我們需要對(duì)RDD進(jìn)行一系列的操作,因?yàn)镽DD是只讀的,我們只能不斷的生產(chǎn)新的RDD,這樣,新的RDD與原來的RDD就會(huì)存在一些血緣關(guān)系。
Spark會(huì)記錄這些血緣關(guān)系,在后期的容錯(cuò)上會(huì)有很大的益處。
4). 緩存 當(dāng)一個(gè) RDD 需要被重復(fù)使用時(shí),或者當(dāng)任務(wù)失敗重新計(jì)算的時(shí)候,這時(shí)如果將 RDD 緩存起來,就可以避免重新計(jì)算,保證程序運(yùn)行的性能。
一. 回顧
數(shù)據(jù)輸入:
- sc.parallelize
- sc.textFile
數(shù)據(jù)計(jì)算:
- rdd.map
- rdd.flatMap
- rdd.reduceByKey
- .…

二.輸出為python對(duì)象
數(shù)據(jù)輸出可用的方法是很多的,這里簡(jiǎn)單介紹常會(huì)用到的4個(gè)
- collect:將RDD內(nèi)容轉(zhuǎn)換為list
- reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
- take:取出RDD的前N個(gè)元素組成list
- count:統(tǒng)計(jì)RDD元素個(gè)數(shù)
collect算子
功能:將RDD各個(gè)分區(qū)內(nèi)的數(shù)據(jù),統(tǒng)一收集到Driver中,形成一個(gè)List對(duì)象
用法:
rdd.collect()
返回值是一個(gè)list
演示
from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())結(jié)果是

單獨(dú)輸出rdd,輸出的是rdd的類名而非內(nèi)容
reduce算子
功能:對(duì)RDD數(shù)據(jù)集按照你傳入的邏輯進(jìn)行聚合
語法:

代碼

返回值等于計(jì)算函數(shù)的返回值

演示
from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)結(jié)果是

take算子
功能:取RDD的前N個(gè)元素,組合成list返回給你
用法:

演示
from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個(gè)元素,組成list返回
take_list=rdd.take(3)
print(take_list)結(jié)果是

count算子
功能:計(jì)算RDD有多少條數(shù)據(jù),返回值是一個(gè)數(shù)字
用法:

演示
from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備一個(gè)RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,輸出RDD為list對(duì)象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的類型是:",type(rdd.collect()))
#reduce算子,對(duì)RDD進(jìn)行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n個(gè)元素,組成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,統(tǒng)計(jì)rdd中有多少條數(shù)據(jù),返回值為數(shù)字
num_count=rdd.count()
print(num_count)
#關(guān)閉鏈接
sc.stop()結(jié)果是

小結(jié)
1.Spark的編程流程就是:
- 將數(shù)據(jù)加載為RDD(數(shù)據(jù)輸入)對(duì)RDD進(jìn)行計(jì)算(數(shù)據(jù)計(jì)算)
- 將RDD轉(zhuǎn)換為Python對(duì)象(數(shù)據(jù)輸出)
2.數(shù)據(jù)輸出的方法
- collect:將RDD內(nèi)容轉(zhuǎn)換為list
- reduce:對(duì)RDD內(nèi)容進(jìn)行自定義聚合
- take:取出RDD的前N個(gè)元素組成list
- count:統(tǒng)計(jì)RDD元素個(gè)數(shù)
數(shù)據(jù)輸出可用的方法是很多的,這里只是簡(jiǎn)單介紹4個(gè)
三.輸出到文件中
savaAsTextFile算子
功能:將RDD的數(shù)據(jù)寫入文本文件中支持本地寫出, hdfs等文件系統(tǒng).
代碼:

演示

這是因?yàn)檫@個(gè)方法本質(zhì)上依賴大數(shù)據(jù)的Hadoop框架,需要配置Hadoop 依賴.
配置Hadoop依賴
調(diào)用保存文件的算子,需要配置Hadoop依賴。
- 下載Hadoop安裝包解壓到電腦任意位置
- 在Python代碼中使用os模塊配置: os.environ['HADOOP_HOME']='HADOOP解壓文件夾路徑′。
- 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內(nèi)
- 下載hadoop.dll,并放入:C:/Windows/System32文件夾內(nèi)
配置完成之后,執(zhí)行下面的代碼
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
#準(zhǔn)備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")結(jié)果是

輸出的文件夾中有這么8文件,是因?yàn)镽DD被默認(rèn)為分成8個(gè)分區(qū)
SaveAsTextFile算子輸出文件的個(gè)數(shù)是根據(jù)RDD的分區(qū)來決定的,有多少分區(qū)就會(huì)輸出多少個(gè)文件,RDD在本電腦中默認(rèn)是8(該電腦CPU核心數(shù)是8核)

打開設(shè)備管理器就可以查看處理器個(gè)數(shù),這里是有8個(gè)邏輯CPU
或者打開任務(wù)管理器就可以看到是4核8個(gè)邏輯CPU

修改rdd分區(qū)為1個(gè)
方式1, SparkConf對(duì)象設(shè)置屬性全局并行度為1:

方式2,創(chuàng)建RDD的時(shí)候設(shè)置( parallelize方法傳入numSlices參數(shù)為1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設(shè)置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
#準(zhǔn)備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")結(jié)果是

小結(jié)
1.RDD輸出到文件的方法
- rdd.saveAsTextFile(路徑)
- 輸出的結(jié)果是一個(gè)文件夾
- 有幾個(gè)分區(qū)就輸出多少個(gè)結(jié)果文件
2.如何修改RDD分區(qū)
- SparkConf對(duì)象設(shè)置conf.set("spark.default.parallelism", "7")
- 創(chuàng)建RDD的時(shí)候,sc.parallelize方法傳入numSlices參數(shù)為1
四.練習(xí)案例

需求:
讀取文件轉(zhuǎn)換成RDD,并完成:
- 打印輸出:熱門搜索時(shí)間段(小時(shí)精度)Top3
- 打印輸出:熱門搜索詞Top3
- 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多
- 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫出為文件
代碼
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分區(qū)設(shè)置為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印輸出:熱門搜索時(shí)間段(小時(shí)精度)Top3
# 取出全部的時(shí)間并轉(zhuǎn)換為小時(shí)
# 轉(zhuǎn)換為(小時(shí),1)的二元元組
# Key分組聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
map(lambda x:x[0][:2]).\
map(lambda x:(x,1)).\
reduceByKey(lambda x,y:x+y).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)#上面用的‘/'是換行的意思,當(dāng)一行代碼太長(zhǎng)時(shí)就可以這樣用
print(result1)
#需求2 打印輸出:熱門搜索詞Top3
# 取出全部的搜索詞
# (詞,1)二元元組
# 分組聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
map(lambda x:x[2])\
.map(lambda x:(x,1)).\
reduceByKey(lambda x,y:x+y).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)
print(result2)
#需求3 打印輸出:統(tǒng)計(jì)黑馬程序員關(guān)鍵字在哪個(gè)時(shí)段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
filter((lambda x:x[2]=="黑馬程序員")).\
map(lambda x:(x[0][:2],1)).\
reduceByKey(lambda x,y:x+y).\
sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
take(3)
print(result3)
#需求4 將數(shù)據(jù)轉(zhuǎn)換為JSON格式,寫出為文件
rdd.map(lambda x:x.split("\t")).\
map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
.saveAsTextFile("D:/out_json")
結(jié)果是


到此這篇關(guān)于PySpark中RDD的數(shù)據(jù)輸出詳解的文章就介紹到這了,更多相關(guān)PySpark RDD數(shù)據(jù)輸出內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python中精確輸出JSON浮點(diǎn)數(shù)的方法
這篇文章主要介紹了python中精確輸出JSON浮點(diǎn)數(shù)的方法,需要的朋友可以參考下2014-04-04
詳解Python中import模塊導(dǎo)入的實(shí)現(xiàn)原理
這篇文章主要給大家介紹了Python中import模塊導(dǎo)入的實(shí)現(xiàn)原理,主要從什么是模塊,import搜索路徑以及導(dǎo)入原理這三個(gè)方面給大家介紹,感興趣的小伙伴跟著小編一起來看看吧2023-08-08
Python光學(xué)仿真實(shí)現(xiàn)波長(zhǎng)與顏色之間對(duì)應(yīng)關(guān)系示例解析
這篇文章主要為大家介紹了Python光學(xué)仿真實(shí)現(xiàn)波長(zhǎng)與顏色之間對(duì)應(yīng)關(guān)系的示例解析,有需要的我朋友可以借鑒參考下,希望能夠有所幫助2021-10-10
Python frozenset集合的實(shí)現(xiàn)
frozenset是Python中的不可變集合類型,本文主要介紹了Python frozenset集合的實(shí)現(xiàn), 文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-04-04
Pytorch學(xué)習(xí)之torch用法----比較操作(Comparison Ops)
這篇文章主要介紹了Pytorch學(xué)習(xí)之torch用法----比較操作(Comparison Ops),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-06-06

