Spark學(xué)習(xí)筆記之Spark SQL的具體使用
1. Spark SQL是什么?
- 處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)spark的模塊
- 它提供了一個(gè)編程抽象叫做DataFrame并且作為分布式SQL查詢(xún)引擎的作用
2. Spark SQL的特點(diǎn)
- 多語(yǔ)言的接口支持(java python scala)
- 統(tǒng)一的數(shù)據(jù)訪問(wèn)
- 完全兼容hive
- 支持標(biāo)準(zhǔn)的連接
3. 為什么學(xué)習(xí)SparkSQL?
我們已經(jīng)學(xué)習(xí)了Hive,它是將Hive SQL轉(zhuǎn)換成MapReduce然后提交到集群上執(zhí)行,大大簡(jiǎn)化了編寫(xiě)MapReduce的程序的復(fù)雜性,由于MapReduce這種計(jì)算模型執(zhí)行效率比較慢。所有Spark SQL的應(yīng)運(yùn)而生,它是將Spark SQL轉(zhuǎn)換成RDD,然后提交到集群執(zhí)行,執(zhí)行效率非??欤?/p>
4. DataFrame(數(shù)據(jù)框)
- 與RDD類(lèi)似,DataFrame也是一個(gè)分布式數(shù)據(jù)容器
- 然而DataFrame更像傳統(tǒng)數(shù)據(jù)庫(kù)的二維表格,除了數(shù)據(jù)以外,還記錄數(shù)據(jù)的結(jié)構(gòu)信息,即schema
- DataFrame其實(shí)就是帶有schema信息的RDD
5. SparkSQL1.x的API編程
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
5.1 使用sqlContext創(chuàng)建DataFrame(測(cè)試用)
object Ops3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"),Person("admin2", 16, "man"),Person("admin3", 18, "man")))
val df1: DataFrame = sqlContext.createDataFrame(rdd1)
df1.show(1)
}
}
case class Person(name: String, age: Int, sex: String);
5.2 使用sqlContxet中提供的隱式轉(zhuǎn)換函數(shù)(測(cè)試用)
import org.apache.spark
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
5.3 使用SqlContext創(chuàng)建DataFrame(常用)
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
val lineSplit: Array[String] = line.split(",")
Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
6. 使用新版本的2.x的API
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//數(shù)據(jù)清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
7. 操作SparkSQL的方式
7.1 使用SQL語(yǔ)句的方式對(duì)DataFrame進(jìn)行操作
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//Spark2.x新的API相當(dāng)于Spark1.x的SQLContext
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//數(shù)據(jù)清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")//這是Sprk2.x新的API 相當(dāng)于Spark1.x的registTempTable()
val df2 = sparkSession.sql("select * from p1")
df2.show()
7.2 使用DSL語(yǔ)句的方式對(duì)DataFrame進(jìn)行操作
DSL(domain specific language ) 特定領(lǐng)域語(yǔ)言
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//數(shù)據(jù)清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()
8. SparkSQL的輸出
8.1 寫(xiě)出到JSON文件
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//數(shù)據(jù)清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")
8.2 寫(xiě)出到關(guān)系型數(shù)據(jù)庫(kù)(mysql)
val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//數(shù)據(jù)清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
val splits: Array[String] = line.split(",")
Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
//表會(huì)自動(dòng)創(chuàng)建
val tbName = "person1";
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
//SaveMode 默認(rèn)為ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
使用IntelliJ?IDEA創(chuàng)建簡(jiǎn)單的Java?Web項(xiàng)目完整步驟
這篇文章主要介紹了如何使用IntelliJ?IDEA創(chuàng)建一個(gè)簡(jiǎn)單的JavaWeb項(xiàng)目,實(shí)現(xiàn)登錄、注冊(cè)和查看用戶(hù)列表功能,使用Servlet和JSP技術(shù),文中通過(guò)代碼介紹的非常詳細(xì),需要的朋友可以參考下2025-01-01
解決SpringBoot連接SqlServer出現(xiàn)的問(wèn)題
在嘗試通過(guò)SSL與SQL?Server建立安全連接時(shí),如果遇到“PKIX?path?building?failed”錯(cuò)誤,可能是因?yàn)槲茨苷_配置或信任服務(wù)器證書(shū),當(dāng)"Encrypt"屬性設(shè)置為"true"且"trustServerCertificate"屬性設(shè)置為"false"時(shí),要求驅(qū)動(dòng)程序使用安全套接字層(SSL)加密與SQL?Server建立連接2024-10-10
java開(kāi)發(fā)BeanUtils類(lèi)解決實(shí)體對(duì)象間賦值
這篇文章主要為大家介紹了java開(kāi)發(fā)中使用BeanUtils類(lèi)實(shí)現(xiàn)實(shí)體對(duì)象之間的賦值有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步學(xué)有所得2021-10-10
Jedis零基礎(chǔ)入門(mén)及操作Redis中的數(shù)據(jù)結(jié)構(gòu)詳解
Jedis 的 API 方法跟 Redis 的命令基本上完全一致,熟悉 Redis 的操作命令,自然就很容易使用 Jedis,因此官方也推薦 Java 使用 Jedis 來(lái)連接和操作 Redis2022-09-09
解決Mybatis報(bào)錯(cuò):org.apache.ibatis.reflection.ReflectionException
文章主要討論了在使用MyBatis進(jìn)行數(shù)據(jù)庫(kù)操作時(shí)遇到的幾個(gè)常見(jiàn)問(wèn)題及其解決方法,首先,文章指出如果DTO類(lèi)中沒(méi)有定義getter和setter方法,會(huì)導(dǎo)致反射異常,解決方法是使用Lombok的@Data注解自動(dòng)生成這些方法2025-01-01
MyBatis?Mapper.XML?標(biāo)簽使用小結(jié)
在MyBatis中,通過(guò)resultMap可以解決字段名和屬性名不一致的問(wèn)題,對(duì)于復(fù)雜的查詢(xún),引用實(shí)體或使用<sql>標(biāo)簽可以定義復(fù)用的SQL片段,提高代碼的可讀性和編碼效率,使用這些高級(jí)映射和動(dòng)態(tài)SQL技巧,可以有效地處理復(fù)雜的數(shù)據(jù)庫(kù)交互場(chǎng)景2024-10-10
Java簡(jiǎn)單實(shí)現(xiàn)調(diào)用命令行并獲取執(zhí)行結(jié)果示例
這篇文章主要介紹了Java簡(jiǎn)單實(shí)現(xiàn)調(diào)用命令行并獲取執(zhí)行結(jié)果,結(jié)合實(shí)例形式分析了Java調(diào)用ping命令并獲取執(zhí)行結(jié)果相關(guān)操作技巧,需要的朋友可以參考下2018-08-08
SpringMVC實(shí)戰(zhàn)案例RESTFul實(shí)現(xiàn)添加功能
這篇文章主要為大家介紹了SpringMVC實(shí)戰(zhàn)案例RESTFul實(shí)現(xiàn)添加功能詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05

