Serializing objects and storing them to HDFS in Spark (Java/Scala)
Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS so that it can be reused later, especially models computed with MLlib. The example below reads data from HBase inside a Spark job, trains a word2vec model, and persists it to HDFS.
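The core of the technique is independent of MLlib: any object that implements java.io.Serializable can be written to HDFS by wrapping the stream returned by FileSystem.create in an ObjectOutputStream, and read back with an ObjectInputStream. Below is a minimal sketch of just that idea; the helper object name, method names, and paths are placeholders for illustration, not part of the original example.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsObjectIO {
  // Write any java.io.Serializable object to the given HDFS path.
  def writeObject(obj: Serializable, path: String, conf: Configuration): Unit = {
    val fs = FileSystem.get(conf)
    val oos = new ObjectOutputStream(fs.create(new Path(path)))
    try oos.writeObject(obj) finally oos.close()
  }

  // Read the object back and cast it to the expected type.
  def readObject[T](path: String, conf: Configuration): T = {
    val fs = FileSystem.get(conf)
    val ois = new ObjectInputStream(fs.open(new Path(path)))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}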
Without further ado, here is the full example. Environment: Spark 1.4 + HBase 0.98.
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source
object Word2VecDemo {
  // Serialize the HBase Scan into a Base64-encoded string so it can be
  // handed to TableInputFormat through the job configuration.
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))
    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )
    // Keep only rows whose data:article column actually has a value.
    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Segment each article into words with ansj's ToAnalysis.
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)
    // ----------------------------- The key part -----------------------------
    // Persist the model trained above to HDFS via plain Java serialization.
    // Word2VecModel is Serializable, so it can go straight through an ObjectOutputStream.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns an FSDataOutputStream, so it can be wrapped directly.
    val oos = new ObjectOutputStream(fileSystem.create(path))
    oos.writeObject(model)
    oos.close()

    // Another program can read the serialized object back from HDFS and use the model directly:
    val ois = new ObjectInputStream(fileSystem.open(path))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close()
    /*
     * // You can also copy the serialized file from HDFS to the local filesystem
     * // and load the model from a plain Scala program:
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
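    // For example, the file can be copied down first with the HDFS shell
    // (same paths as used above):
    //   hdfs dfs -get /user/hadoop/data/mllib/word2vec-object /home/cherokee/tmp/word2vec-object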
    // -------------------------------------------------------------------------
  }
}
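As a side note, MLlib also provides its own persistence API for this model type: if your Spark build exposes Word2VecModel.save and Word2VecModel.load (available from around Spark 1.4 onward), you can avoid Java serialization entirely and let MLlib write the model in its own on-disk format. A hedged sketch, reusing the sc and model from the example above with an illustrative HDFS path:

// Assumes org.apache.spark.mllib.feature.Word2VecModel exposes save(sc, path) / load(sc, path).
model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")
val reloaded = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-model")

The trade-off is that save/load ties you to MLlib's format and Spark APIs, whereas the ObjectOutputStream approach shown in the example works for any Serializable object, not just models.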
Thanks for reading. I hope this helps, and thank you for supporting this site!