SparkSQL快速入門教程
(一)概述
SparkSQL可以理解為在原生的RDD上做的一層封裝,通過SparkSQL可以在scala和java中寫SQL語句,并將結(jié)果作為Dataset/DataFrame返回。簡單來講,SparkSQL可以讓我們像寫SQL一樣去處理內(nèi)存中的數(shù)據(jù)。
Dataset是一個數(shù)據(jù)的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的優(yōu)點和SparkSQL優(yōu)化執(zhí)行引擎的優(yōu)點,一個Dataset相當于RDD+Schema的結(jié)合。
Dataset的底層封裝是RDD,當RDD的泛型是Row類型時,該類型就可以稱為DataFrame。DataFrame是一種表格型的數(shù)據(jù)結(jié)構(gòu),就和傳統(tǒng)的Mysql結(jié)構(gòu)一樣,通過DataFrame我們可以更加高效地去執(zhí)行Sql。
特點
- 易整合,在程序中既可以使用SQL,還可以使用API!
- 統(tǒng)一的數(shù)據(jù)訪問, 不同數(shù)據(jù)源中的數(shù)據(jù),都可以使用SQL或DataFrameAPI進行操作,還可以進行不同數(shù)據(jù)源的Join!
- 對Hive的無縫支持
- 支持標準的JDBC和ODBC
(二)SparkSQL實戰(zhàn)
使用SparkSQL首先需要引入相關(guān)的依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>該依賴需要和sparkCore保持一致。
SparkSQL的編碼主要通過四步:
- 創(chuàng)建SparkSession
- 獲取數(shù)據(jù)
- 執(zhí)行SQL
- 關(guān)閉SparkSession
public class SqlTest {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("sql")
.master("local")
.getOrCreate();
Dataset<Row> json = sparkSession.read().json("data/json");
json.printSchema();
json.show();
sparkSession.stop();
}
}在data的目錄下創(chuàng)建一個名為json的文件
{"name":"a","age":23}
{"name":"b","age":24}
{"name":"c","age":25}
{"name":"d","age":26}
{"name":"e","age":27}
{"name":"f","age":28}運行項目后輸出兩個結(jié)果,schema結(jié)果如下:

Dataset<Row>輸出結(jié)果如下:

通過SparkSQL可以執(zhí)行和SQL十分相似的查詢操作:
public class SqlTest {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("sql")
.master("local")
.getOrCreate();
Dataset<Row> json = sparkSession.read().json("data/json");
json.select("age","name").where("age > 26").show();
sparkSession.stop();
}
}在上面的語句中,通過一系列的API實現(xiàn)了SQL查詢操作,除此之外,SparkSQL還支持直接寫原始SQL語句的操作。
在寫SQL語句之前,首先需要讓Spark知道對哪個表進行查詢,因此需要建立一張臨時表,再執(zhí)行SQL查詢:
json.createOrReplaceTempView("json");
sparkSession.sql("select * from json where age > 26").show();(三)非JSON格式的Dataset創(chuàng)建
在上一節(jié)中創(chuàng)建Dataset時使用了最簡單的json,因為json自己帶有schema結(jié)構(gòu),因此不需要手動去增加,如果是一個txt文件,就需要在創(chuàng)建Dataset時手動塞入schema。
下面展示讀取txt文件的例子,首先創(chuàng)建一個user.txt
a 23 b 24 c 25 d 26
現(xiàn)在我要將上面的這幾行變成DataFrame,第一列表示姓名,第二列表示年齡,于是就可以像下面這樣操作:
public class SqlTest2 {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("sql")
.master("local")
.getOrCreate();
SparkContext sparkContext = sparkSession.sparkContext();
JavaSparkContext sc = new JavaSparkContext(sparkContext);
JavaRDD<String> lines = sc.textFile("data/user.txt");
//將String類型轉(zhuǎn)化為Row類型
JavaRDD<Row> rowJavaRDD = lines.map(new Function<String, Row>() {
@Override
public Row call(String v1) throws Exception {
String[] split = v1.split(" ");
return RowFactory.create(
split[0],
Integer.valueOf(split[1])
);
}
});
//定義schema
List<StructField> structFields = Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType structType = DataTypes.createStructType(structFields);
//生成dataFrame
Dataset<Row> dataFrame = sparkSession.createDataFrame(rowJavaRDD, structType);
dataFrame.show();
}
}(四)通過JDBC創(chuàng)建DataFrame
通過JDBC可直接將對應數(shù)據(jù)庫中的表放入Spark中進行一些處理,下面通過MySQL進行展示。
使用MySQL需要在依賴中引入MySQL的引擎:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>接著通過類似JDBC的方式讀取MySQL數(shù)據(jù):
public class SqlTest3 {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("sql")
.master("local")
.getOrCreate();
Map<String,String> options = new HashMap<>();
options.put("url","jdbc:mysql://127.0.0.1:3306/books");
options.put("driver","com.mysql.jdbc.Driver");
options.put("user","root");
options.put("password","123456");
options.put("dbtable","book");
Dataset<Row> jdbc = sparkSession.read().format("jdbc").options(options).load();
jdbc.show();
sparkSession.close();
}
}讀取到的數(shù)據(jù)是DataFrame,接下來的操作就是對DataFrame的操作了。
(五)總結(jié)
SparkSQL是對Spark原生RDD的增強,雖然很多功能通過RDD就可以實現(xiàn),但是SparkSQL可以更加靈活地實現(xiàn)一些功能。
到此這篇關(guān)于SparkSQL快速入門教程的文章就介紹到這了,更多相關(guān)SparkSQL入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)samza轉(zhuǎn)換成flink
將Apache Samza作業(yè)遷移到Apache Flink作業(yè)是一個復雜的任務,因為這兩個流處理框架有不同的API和架構(gòu),本文我們就來看看如何使用Java實現(xiàn)samza轉(zhuǎn)換成flink吧2024-11-11
SpringBoot整合Redis實現(xiàn)熱點數(shù)據(jù)緩存的示例代碼
這篇文章主要介紹了SpringBoot中整合Redis實現(xiàn)熱點數(shù)據(jù)緩存,本文以IDEA?+?SpringBoot作為?Java中整合Redis的使用?的測試環(huán)境,結(jié)合實例代碼給大家詳細講解,需要的朋友可以參考下2023-03-03
Springboot整合nacos報錯無法連接nacos的解決
這篇文章主要介紹了Springboot整合nacos報錯無法連接nacos的解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-06-06
Java 數(shù)組內(nèi)置函數(shù)toArray詳解
這篇文章主要介紹了Java 數(shù)組內(nèi)置函數(shù)toArray詳解,文本詳細的講解了toArray底層的代碼和文檔,需要的朋友可以參考下2021-06-06
SpringBoot中的application.properties無法加載問題定位技巧
這篇文章主要介紹了SpringBoot中的application.properties無法加載問題定位技巧,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-05-05

