Spark JDBC操作MySQL方式詳細(xì)講解
JDBC操作MySQL
在實(shí)際的企業(yè)級(jí)開(kāi)發(fā)環(huán)境中,如果數(shù)據(jù)規(guī)模特S別大,此時(shí)采用傳統(tǒng)的SQL語(yǔ)句去處理的話一般需要分成很多批次處理,而且很容易造成數(shù)據(jù)庫(kù)服務(wù)宕機(jī),且實(shí)際的處理過(guò)程可能會(huì)非常復(fù)雜,通過(guò)傳統(tǒng)的Java EE等技術(shù)可能很難或者不方便實(shí)現(xiàn)處理算法,此時(shí)采用SparkSQL進(jìn)行分布式分析處理就可以非常好的解決該問(wèn)題,在生產(chǎn)環(huán)境下,一般會(huì)在Spark SQL和具體要操作的DB之間加上一個(gè)緩沖層次,例如中間使用Redis或者Kafka。
Spark SQL可以通過(guò)JDBC從傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)中讀寫(xiě)數(shù)據(jù),讀取數(shù)據(jù)后直接生成的是DataFrame,然后再加上借助于Spark SQL豐富的API來(lái)進(jìn)行各種操作。從計(jì)算數(shù)據(jù)規(guī)模的角度去講,集群并行訪問(wèn)數(shù)據(jù)庫(kù)數(shù)據(jù),調(diào)用Data Frame Reader的Format(“JDBC”)的方式說(shuō)明Spark SQL操作的數(shù)據(jù)來(lái)源是通過(guò)JDBC獲得,JDBC后端一般都是數(shù)據(jù)庫(kù),例如MySQL、Oracle等。
JDBC讀取數(shù)據(jù)方式
單Partition(無(wú)并發(fā))
調(diào)用函數(shù)格式:def jdbc(url: String, table: String, properties: Properties): DataFrame
- url:代表數(shù)據(jù)庫(kù)的JDBC鏈接地址;
- table:具體要鏈接的數(shù)據(jù)庫(kù);
這種方法是將所有的數(shù)據(jù)放在一個(gè)Partition中進(jìn)行操作(即并發(fā)度為1),意味著無(wú)論給的資源有多少,只有一個(gè)Task會(huì)執(zhí)行任務(wù),執(zhí)行效率比較慢,并且容易出現(xiàn)OOM。使用如下,在spark-shell中執(zhí)行:
/*此為代碼格式,實(shí)際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/
val url = "jdbc:mysql://localhost:/database"
val tableName = "table"
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username") //實(shí)際使用中替換username為相應(yīng)的用戶名
prop.setProperty("password","pwd") //實(shí)際使用中替換pwd為相應(yīng)的密碼
根據(jù)Long類(lèi)型字段分區(qū)
/*此為代碼格式,實(shí)際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ def jdbc( url: String, table: String, columnName: String, // 根據(jù)該字段分區(qū),需要為整型,比如 id 等 lowerBound: Long, // 分區(qū)的下界 upperBound: Long, // 分區(qū)的上界 numPartitions: Int, //分區(qū)的個(gè)數(shù) connectionProperties: Properties): DataFrame
根據(jù)字段將數(shù)據(jù)進(jìn)行分區(qū),放進(jìn)不同的Partition中,執(zhí)行效率較快,但是只能根據(jù)數(shù)據(jù)字段作為分區(qū)關(guān)鍵字。使用如下:
/*此為代碼格式,實(shí)際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
將字段 colName 中發(fā) 1~10000000 條數(shù)據(jù)分區(qū)到 10 個(gè) Partition 中。
根據(jù)任意類(lèi)型字段分區(qū)
/*此為代碼格式,實(shí)際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/ jdbc( url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
以下使用時(shí)間字段進(jìn)行分區(qū):
/*此為代碼格式,實(shí)際中使用應(yīng)替換相應(yīng)字段中的內(nèi)容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 將 9 月 16-12 月 15 三個(gè)月的數(shù)據(jù)取出,按時(shí)間分為 6 個(gè) partition
* 為了減少事例代碼,這里的時(shí)間都是寫(xiě)死的
* modified_time 為時(shí)間字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time
as date) <= date '$end'"
}
這種方法可以使用任意字段進(jìn)行分區(qū),比較靈活,適用于各種場(chǎng)景。以MySQL 3000W數(shù)據(jù)量為例,如果單分區(qū)count,若干分鐘就會(huì)報(bào)OOM;如果分成5~20個(gè)分區(qū)后,count操作只需要2s,效率會(huì)明顯提高,這里就凸顯出JDBC高并發(fā)的優(yōu)勢(shì)。Spark高并發(fā)度可以大幅度提高讀取以及處理數(shù)據(jù)的速度,但是如果設(shè)置過(guò)高(大量的Partition同時(shí)讀取)也可能會(huì)將數(shù)據(jù)源數(shù)據(jù)庫(kù)宕掉。
JDBC讀取MySQL數(shù)據(jù)
下面來(lái)進(jìn)行實(shí)際操作,首先需要配置MySQL
- 免密登陸:
mysql -uroot - 查看數(shù)據(jù)庫(kù):
show databases; - 使用MySQL數(shù)據(jù)庫(kù):
use mysql;
修改表格的權(quán)限,目的是為了使其他主機(jī)可以遠(yuǎn)程連接 MySQL,通過(guò)此命令可以查看訪問(wèn)用戶允許的主機(jī)名。
- 查看所有用戶及其host:
select host, user from user; - 將相應(yīng)用戶數(shù)據(jù)表中的host字段改成’%':
update user set host="%" where user="root"; - 刷新修改權(quán)限
flush privileges;
通過(guò)命令修改host為%,表示任意IP地址都可以登錄。出現(xiàn)ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY',是因?yàn)?user+host 是主鍵,不能重復(fù),可以不用理會(huì)。也可通過(guò)以下命令刪除user 為空的內(nèi)容來(lái)解決:delete from user where user='';。
在MySQL創(chuàng)建數(shù)據(jù)庫(kù)和表格,插入數(shù)據(jù),查看:
create database test; //創(chuàng)建數(shù)據(jù)庫(kù)test
use test; //進(jìn)入數(shù)據(jù)庫(kù)test
create table people( name varchar(12), age int); //創(chuàng)建表格people并構(gòu)建結(jié)構(gòu)
insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12); //向people表中插入數(shù)據(jù)
select * from people; //輸出people表中全部數(shù)據(jù)
編寫(xiě)代碼讀取MySQL表中數(shù)據(jù):
//導(dǎo)入依賴環(huán)境
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import java.util.Properties
val url = "jdbc:mysql://localhost/test" //MySQL地址及數(shù)據(jù)庫(kù)
val username = "root" //用戶名
val sqlContext = new SQLContext(sc)
sc.setLogLevel("WARN")
val uri = url + "?user=" + username + "&useUnicode=true&characterEncoding=UTF-8" //設(shè)置讀取路徑及用戶名
val properties = new Properties() //創(chuàng)建JDBC連接信息
properties.put("user","root")
properties.put("driver", "com.mysql.jdbc.Driver")
val df_test: DataFrame = spark.sqlContext.read.jdbc(uri, "people", properties) //讀取數(shù)據(jù)
df_test.select("name","age").collect().foreach(row => { //輸出數(shù)據(jù)
println("name " + row(0) + ", age" + row(1))
})
df_test.write.mode("append").jdbc(uri,"people",properties) //向people表中寫(xiě)入讀出的數(shù)據(jù),相當(dāng)于people表中有兩份一樣的數(shù)據(jù)
到此這篇關(guān)于Spark JDBC操作MySQL方式詳細(xì)講解的文章就介紹到這了,更多相關(guān)Spark JDBC操作MySQL內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
熟練掌握J(rèn)ava8新特性之Stream API的全面應(yīng)用
Stream是Java8的一大亮點(diǎn),是對(duì)容器對(duì)象功能的增強(qiáng),它專(zhuān)注于對(duì)容器對(duì)象進(jìn)行各種非常便利、高效的 聚合操作(aggregate operation)或者大批量數(shù)據(jù)操作。Stream API借助于同樣新出現(xiàn)的Lambda表達(dá)式,極大的提高編程效率和程序可讀性,感興趣的朋友快來(lái)看看吧2021-11-11
Spring boot實(shí)現(xiàn)應(yīng)用打包部署的示例
本篇文章主要介紹了Spring boot實(shí)現(xiàn)應(yīng)用打包部署的示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-11-11
解決程序啟動(dòng)報(bào)錯(cuò)org.springframework.context.ApplicationContextExcept
文章描述了一個(gè)Spring Boot項(xiàng)目在不同環(huán)境下啟動(dòng)時(shí)出現(xiàn)差異的問(wèn)題,通過(guò)分析報(bào)錯(cuò)信息,發(fā)現(xiàn)是由于導(dǎo)入`spring-boot-starter-tomcat`依賴時(shí)定義的scope導(dǎo)致的配置問(wèn)題,調(diào)整依賴導(dǎo)入配置后,解決了啟動(dòng)錯(cuò)誤2024-11-11
Java集合中的fail-fast(快速失敗)機(jī)制詳解
這篇文章主要給大家介紹了關(guān)于Java集合中fail-fast(快速失敗)機(jī)制的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
idea一鍵部署SpringBoot項(xiàng)目jar包到服務(wù)器的實(shí)現(xiàn)
我們?cè)陂_(kāi)發(fā)環(huán)境部署項(xiàng)目一般通過(guò)idea將項(xiàng)目打包成jar包,然后連接linux服務(wù)器,將jar手動(dòng)上傳到服務(wù)中,本文就來(lái)詳細(xì)的介紹一下步驟,感興趣的可以了解一下2023-12-12
Java實(shí)現(xiàn)的mysql事務(wù)處理操作示例
這篇文章主要介紹了Java實(shí)現(xiàn)的mysql事務(wù)處理操作,結(jié)合實(shí)例形式較為詳細(xì)的分析了Java基于JDBC操作mysql數(shù)據(jù)庫(kù)實(shí)現(xiàn)事務(wù)處理的相關(guān)概念、操作技巧與注意事項(xiàng),需要的朋友可以參考下2018-08-08
JavaWeb監(jiān)聽(tīng)器Listener實(shí)例解析
這篇文章主要為大家詳細(xì)介紹了JavaWeb監(jiān)聽(tīng)器Listener實(shí)例,針對(duì)監(jiān)聽(tīng)器進(jìn)行進(jìn)行細(xì)致分析,感興趣的小伙伴們可以參考一下2016-08-08
解析Tomcat 6、7在EL表達(dá)式解析時(shí)存在的一個(gè)Bug
這篇文章主要是對(duì)Tomcat 6、7在EL表達(dá)式解析時(shí)存在的一個(gè)Bug進(jìn)行了詳細(xì)的分析介紹,需要的朋友可以過(guò)來(lái)參考下,希望對(duì)大家有所幫助2013-12-12
Mybatis foreach標(biāo)簽使用不當(dāng)導(dǎo)致異常的原因淺析
這篇文章主要介紹了Mybatis foreach標(biāo)簽使用不當(dāng)導(dǎo)致異常的原因探究,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-12-12

