Spark SQL 2.4.8 操作 Dataframe的兩種方式
一、測(cè)試數(shù)據(jù)
7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,JAMES,CLERK,7698,1981/12/3,9500,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10
二、創(chuàng)建DataFrame
方式一:DSL方式操作
- 實(shí)例化SparkContext和SparkSession對(duì)象
- 利用StructType類(lèi)型構(gòu)建schema,用于定義數(shù)據(jù)的結(jié)構(gòu)信息
- 通過(guò)SparkContext對(duì)象讀取文件,生成RDD
- 將RDD[String]轉(zhuǎn)換成RDD[Row]
- 通過(guò)SparkSession對(duì)象創(chuàng)建dataframe
- 完整代碼如下:
package com.scala.demo.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
object Demo01 {
def main(args: Array[String]): Unit = {
// 1.創(chuàng)建SparkContext和SparkSession對(duì)象
val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]"))
val sparkSession = SparkSession.builder().getOrCreate()
// 2. 使用StructType來(lái)定義Schema
val mySchema = StructType(List(
StructField("empno", DataTypes.IntegerType, false),
StructField("ename", DataTypes.StringType, false),
StructField("job", DataTypes.StringType, false),
StructField("mgr", DataTypes.StringType, false),
StructField("hiredate", DataTypes.StringType, false),
StructField("sal", DataTypes.IntegerType, false),
StructField("comm", DataTypes.StringType, false),
StructField("deptno", DataTypes.IntegerType, false)
))
// 3. 讀取數(shù)據(jù)
val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv")
// 4. 將其映射成ROW對(duì)象
val rowRDD = empRDD.map(line => {
val strings = line.split(",")
Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt,strings(6), strings(7).toInt)
})
// 5. 創(chuàng)建DataFrame
val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema)
// 6. 展示內(nèi)容 DSL
dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show()
}
}
結(jié)果如下:
方式二:SQL方式操作
- 實(shí)例化SparkContext和SparkSession對(duì)象
- 創(chuàng)建case class Emp樣例類(lèi),用于定義數(shù)據(jù)的結(jié)構(gòu)信息
- 通過(guò)SparkContext對(duì)象讀取文件,生成RDD[String]
- 將RDD[String]轉(zhuǎn)換成RDD[Emp]
- 引入spark隱式轉(zhuǎn)換函數(shù)(必須引入)
- 將RDD[Emp]轉(zhuǎn)換成DataFrame
- 將DataFrame注冊(cè)成一張視圖或者臨時(shí)表
- 通過(guò)調(diào)用SparkSession對(duì)象的sql函數(shù),編寫(xiě)sql語(yǔ)句
- 停止資源
- 具體代碼如下:
package com.scala.demo.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
// 0. 數(shù)據(jù)分析
// 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
// 1. 定義Emp樣例類(lèi)
case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int)
object Demo02 {
def main(args: Array[String]): Unit = {
// 2. 讀取數(shù)據(jù)將其映射成Row對(duì)象
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02"))
val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv")
.map(_.split(","))
val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt))
// 3。創(chuàng)建dataframe
val spark = SparkSession.builder().getOrCreate()
// 引入spark隱式轉(zhuǎn)換函數(shù)
import spark.implicits._
// 將RDD轉(zhuǎn)成Dataframe
val dataFrame = rowRDD.toDF
// 4.2 sql語(yǔ)句操作
// 1、將dataframe注冊(cè)成一張臨時(shí)表
dataFrame.createOrReplaceTempView("emp")
// 2. 編寫(xiě)sql語(yǔ)句進(jìn)行操作
spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show()
// 關(guān)閉資源
spark.stop()
sc.stop()
}
}
結(jié)果如下:

到此這篇關(guān)于Spark SQL 2.4.8 操作 Dataframe的兩種方式的文章就介紹到這了,更多相關(guān)Spark SQL 操作 Dataframe內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
T-SQL 查詢(xún)語(yǔ)句的執(zhí)行順序解析
查詢(xún)語(yǔ)句大家用的很多,但是知道語(yǔ)句執(zhí)行的順序和各各階段的作用的人卻很少。這里給大家簡(jiǎn)單介紹一下2011-10-10
在SQL Server中使用命令調(diào)用SSIS包的具體方法
在SQL Server中可以使用dtexec命令運(yùn)行SSIS包(2005以上版本),當(dāng)然也可以通過(guò)系統(tǒng)過(guò)程:xp_cmdshell調(diào)用dtexec運(yùn)行SSIS包2013-09-09
SQLServer2016 sa登錄失敗(錯(cuò)誤代碼18456)
18456錯(cuò)誤是因密碼或用戶(hù)名錯(cuò)誤而使身份驗(yàn)證失敗并導(dǎo)致連接嘗試被拒或者賬戶(hù)被鎖定無(wú)法sa登錄,本文就來(lái)介紹一下解決方法,感興趣的可以了解一下2023-09-09
使用SQL語(yǔ)句將相同名的多行字段內(nèi)容拼接(方法詳解)
這篇文章主要介紹了使用SQL語(yǔ)句將相同名的多行字段內(nèi)容拼接起來(lái),可以使用GROUP_CONCAT函數(shù)來(lái)實(shí)現(xiàn)相同名稱(chēng)的多行字段內(nèi)容拼接,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-05-05
SQL Server數(shù)據(jù)誤刪的恢復(fù)和備份流程
在日常的數(shù)據(jù)庫(kù)管理中,數(shù)據(jù)的誤刪操作是難以避免的,為了確保數(shù)據(jù)的安全性和完整性,我們必須采取一些措施來(lái)進(jìn)行數(shù)據(jù)的備份和恢復(fù),本文將詳細(xì)介紹如何在 SQL Server 中進(jìn)行數(shù)據(jù)的備份和恢復(fù)操作,特別是在發(fā)生數(shù)據(jù)誤刪的情況下,需要的朋友可以參考下2024-07-07
SQL?Server時(shí)間轉(zhuǎn)換3種方法總結(jié)
SQL?Server中處理日期和時(shí)間的常用方法有三種:FORMAT、CONVERT和DATEADD,這篇文章主要介紹了SQL?Server時(shí)間轉(zhuǎn)換的3種方法,文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-09-09
SQL SERVER 利用存儲(chǔ)過(guò)程查看角色和用戶(hù)信息的寫(xiě)法
SQL SERVER 利用存儲(chǔ)過(guò)程查看角色(服務(wù)器/數(shù)據(jù)庫(kù))和用戶(hù)信息,感興趣的朋友可以了解下,或許對(duì)你有所幫助2013-01-01

