Spark Learning Notes: Using Spark Streaming
1. Spark Streaming
- Spark Streaming is a real-time computation framework built on top of Spark Core; it can consume data from many sources and process it.
- The most basic abstraction in Spark Streaming is the DStream (discretized stream), which is essentially a continuous series of RDDs; a DStream is a wrapper around RDDs.
- A DStream can be thought of as an RDD factory: every RDD it produces carries the same business logic, only the data each RDD reads differs.
- Within one batch interval, a DStream produces exactly one RDD.
- A DStream is therefore a kind of "template": the RDD generated in each interval is processed according to this template, and the RDD DAG is built from it (see the sketch below).
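As a rough illustration of the "one RDD per batch interval" idea, the sketch below peeks at the RDD each batch produces via foreachRDD. It is only a sketch: the host, port and 3-second interval are assumptions for this example, not values from the original setup.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsTemplate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamAsTemplate").setMaster("local[2]")
    //every 3 seconds a new batch arrives -> one new RDD is generated from the DStream "template"
    val ssc = new StreamingContext(conf, Seconds(3))
    val lines = ssc.socketTextStream("localhost", 9999)
    //the same logic (the same DAG) is applied to each batch's RDD
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.foreachRDD { rdd =>
      //exactly one RDD arrives here per batch interval
      println(s"this batch produced an RDD with ${rdd.count()} records")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}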
2. Popular real-time computation engines today
                   Throughput   Language   Latency                      Ecosystem
Storm              low          Clojure    very fast (sub-second)       Alibaba (JStorm)
Flink              high         Scala      fast (sub-second)            less widely used in China
Spark Streaming    very high    Scala      second-level (micro-batch)   mature ecosystem
3. Spark Streaming processing network data
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

//Create the StreamingContext; local mode needs at least two threads: one to receive data and one to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
//Receive text lines over a socket
val receiverDS: ReceiverInputDStream[String] = ssc.socketTextStream("uplooking01", 44444)
//Word count over each 3-second batch
val pairRetDS: DStream[(String, Int)] = receiverDS.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
pairRetDS.print()
//Start the streaming computation
ssc.start()
//Block until the computation is stopped
ssc.awaitTermination()
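You can feed this job locally by running nc -lk 44444 on uplooking01 and typing comma-separated words. To actually stop the job gracefully (rather than just waiting on awaitTermination), a common pattern is the one sketched below; the stop flag and 10-second polling interval are assumptions, in practice the trigger would come from an external signal or marker file.

//A minimal sketch of a graceful shutdown, assuming `ssc` is the StreamingContext above
var stopRequested = false  //would be flipped by some external condition
//awaitTerminationOrTimeout returns false while the context is still running
while (!ssc.awaitTerminationOrTimeout(10000)) {
  if (stopRequested) {
    //finish processing the batches already received, then stop
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}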
4. The two ways Spark Streaming can receive data from Kafka
Receiver
- Offsets are maintained by ZooKeeper
- Uses Kafka's high-level consumer API
- Simple to program
- Low efficiency (a WAL is enabled to guarantee data safety)
- The Receiver approach has been dropped entirely in the Kafka 0.10 integration
- Generally not used in production
Direct
- Offsets are maintained manually by us (a sketch of this follows the Direct example in section 5)
- High efficiency (Spark Streaming reads straight from the Kafka partitions)
- More complex to program
- This is the approach generally used in production
5. Integrating Spark Streaming with Kafka
Receiver-based integration with Kafka (not recommended for production; removed in the 0.10 integration)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

//Create the StreamingContext; local mode needs at least two threads: one to receive data and one to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
//ZooKeeper quorum, consumer group id, and topic name -> number of receiver threads
val zkQuorum = "uplooking03:2181,uplooking04:2181,uplooking05:2181"
val groupId = "myid"
val topics = Map("hadoop" -> 3)
val receiverDS: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
//Each element is a (key, value) pair; the message value is the second element
receiverDS.flatMap(_._2.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
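If the Receiver approach is used anyway, note that the write-ahead log mentioned in section 4 is not on by default. Below is a minimal sketch of enabling it; the HDFS checkpoint path is only an illustrative assumption.

val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
  //persist received blocks to a write-ahead log before acknowledging them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Milliseconds(3000))
//the WAL lives in the checkpoint directory, so one must be set (illustrative path)
ssc.checkpoint("hdfs://uplooking01:9000/spark-checkpoint")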
Direct approach (used in production)
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

//Create the StreamingContext; local mode needs at least two threads: one to receive data and one to process it
val conf = new SparkConf().setAppName("Ops1").setMaster("local[2]")
val ssc = new StreamingContext(conf, Milliseconds(3000))
//Broker list and the set of topics to read from directly
val kafkaParams = Map("metadata.broker.list" -> "uplooking03:9092,uplooking04:9092,uplooking05:9092")
val topics = Set("hadoop")
val inputDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
inputDS.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()
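Since the Direct approach commits offsets nowhere automatically, we have to track them ourselves. The sketch below (registered on inputDS before ssc.start()) only prints the offset ranges; where they are persisted, e.g. ZooKeeper or a database, is left open as an assumption.

import org.apache.spark.streaming.kafka.HasOffsetRanges

//Read the offset range of every partition straight from the batch's KafkaRDD,
//before any transformation that would break the 1:1 mapping to Kafka partitions
inputDS.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  //process the batch here, then persist the offsets wherever you choose
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
  }
}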
6. Architecture of a real-time streaming pipeline

1. Generate logs (simulating the access logs of users visiting a web application)
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Date;
import java.util.Random;

public class GenerateAccessLog {
    public static void main(String[] args) throws IOException, InterruptedException {
        //Prepare the value pools the random fields are drawn from
        int[] ips = {123, 18, 123, 112, 181, 16, 172, 183, 190, 191, 196, 120};
        String[] requesTypes = {"GET", "POST"};
        String[] cursors = {"/vip/112", "/vip/113", "/vip/114", "/vip/115", "/vip/116", "/vip/117", "/vip/118", "/vip/119", "/vip/120", "/vip/121", "/free/210", "/free/211", "/free/212", "/free/213", "/free/214", "/company/312", "/company/313", "/company/314", "/company/315"};
        String[] courseNames = {"大數(shù)據(jù)", "python", "java", "c++", "c", "scala", "android", "spark", "hadoop", "redis"}; //not used in the log line below
        String[] references = {"www.baidu.com/", "www.sougou.com/", "www.sou.com/", "www.google.com"};
        FileWriter fw = new FileWriter(args[0]);
        PrintWriter printWriter = new PrintWriter(fw);
        while (true) {
            // Thread.sleep(1000);
            //Generate each field of the log line
            String date = new Date().toLocaleString();
            String method = requesTypes[getRandomNum(0, requesTypes.length)];
            String url = "/cursor" + cursors[getRandomNum(0, cursors.length)];
            String HTTPVERSION = "HTTP/1.1";
            String ip = ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)] + "." + ips[getRandomNum(0, ips.length)];
            String reference = references[getRandomNum(0, references.length)];
            String rowLog = date + " " + method + " " + url + " " + HTTPVERSION + " " + ip + " " + reference;
            printWriter.println(rowLog);
            printWriter.flush();
        }
    }

    //Returns a random int in [start, end)
    public static int getRandomNum(int start, int end) {
        int i = new Random().nextInt(end - start) + start;
        return i;
    }
}
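The generator writes one log line per loop iteration to the file passed as args[0], which should be the file Flume tails below, e.g. java GenerateAccessLog /logs/access.log. Uncommenting the Thread.sleep(1000) call throttles it to roughly one line per second.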
2. Flume collects the web application server's log data via Avro
Collect the output of an exec command into an Avro sink
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f1.sources = r1
f1.channels = c1
f1.sinks = k1
#define sources
f1.sources.r1.type = exec
f1.sources.r1.command = tail -F /logs/access.log
#define channels
f1.channels.c1.type = memory
f1.channels.c1.capacity = 1000
f1.channels.c1.transactionCapacity = 100
#define sink: ship the log events to uplooking03
f1.sinks.k1.type = avro
f1.sinks.k1.hostname = uplooking03
f1.sinks.k1.port = 44444
#bind sources and sink to channel
f1.sources.r1.channels = c1
f1.sinks.k1.channel = c1

Collect from the Avro source to the console
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2
#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444
#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100
#define sink
f2.sinks.k2.type = logger
#bind sources and sink to channel
f2.sources.r2.channels = c2
f2.sinks.k2.channel = c2

Collect from the Avro source into Kafka
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
f2.sources = r2
f2.channels = c2
f2.sinks = k2
#define sources
f2.sources.r2.type = avro
f2.sources.r2.bind = uplooking03
f2.sources.r2.port = 44444
#define channels
f2.channels.c2.type = memory
f2.channels.c2.capacity = 1000
f2.channels.c2.transactionCapacity = 100
#define sink
f2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
f2.sinks.k2.topic = hadoop
f2.sinks.k2.brokerList = uplooking03:9092,uplooking04:9092,uplooking05:9092
f2.sinks.k2.requiredAcks = 1
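Each block above is a separate agent configuration. Assuming they are saved as, say, exec-to-avro.conf and avro-to-kafka.conf (the file names are just placeholders), the agents could be started with something like flume-ng agent --name f1 --conf conf --conf-file exec-to-avro.conf on the web server and flume-ng agent --name f2 --conf conf --conf-file avro-to-kafka.conf on uplooking03; the Spark Streaming Direct job from section 5 then consumes the hadoop topic.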
That is all for this article. I hope it helps with your studies, and thank you for supporting 腳本之家.