Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作
如果說(shuō)簡(jiǎn)單聚合是對(duì)一些特定統(tǒng)計(jì)需求的實(shí)現(xiàn),那么 reduce 算子就是一個(gè)一般化的聚合統(tǒng)計(jì)操作了。從大名鼎鼎的 MapReduce 開(kāi)始,我們對(duì) reduce 操作就不陌生:它可以對(duì)已有的
數(shù)據(jù)進(jìn)行歸約處理,把每一個(gè)新輸入的數(shù)據(jù)和當(dāng)前已經(jīng)歸約出來(lái)的值,再做一個(gè)聚合計(jì)算。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStream。它不會(huì)改變流的元
素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。調(diào)用 KeyedStream 的 reduce 方法時(shí),需要傳入一個(gè)參數(shù),實(shí)現(xiàn) ReduceFunction 接口。接口在源碼中的定義如下:
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {
/**
* The core method of ReduceFunction, combining two values into one value of the same type. The
* reduce function is consecutively applied to all values of a group until only a single value
* remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction 接口里需要實(shí)現(xiàn) reduce()方法,這個(gè)方法接收兩個(gè)輸入事件,經(jīng)過(guò)轉(zhuǎn)換處理之后輸出一個(gè)相同類型的事件;所以,對(duì)于一組數(shù)據(jù),我們可以先取兩個(gè)進(jìn)行合并,然后再
將合并的結(jié)果看作一個(gè)數(shù)據(jù)、再跟后面的數(shù)據(jù)合并,最終會(huì)將它“簡(jiǎn)化”成唯一的一個(gè)數(shù)據(jù),這也就是 reduce“歸約”的含義。在流處理的底層實(shí)現(xiàn)過(guò)程中,實(shí)際上是將中間“合并的結(jié)果”
作為任務(wù)的一個(gè)狀態(tài)保存起來(lái)的;之后每來(lái)一個(gè)新的數(shù)據(jù),就和之前的聚合狀態(tài)進(jìn)一步做歸約。
其實(shí),reduce 的語(yǔ)義是針對(duì)列表進(jìn)行規(guī)約操作,運(yùn)算規(guī)則由 ReduceFunction 中的 reduce方法來(lái)定義,而在 ReduceFunction 內(nèi)部會(huì)維護(hù)一個(gè)初始值為空的累加器,注意累加器的類型
和輸入元素的類型相同,當(dāng)?shù)谝粭l元素到來(lái)時(shí),累加器的值更新為第一條元素的值,當(dāng)新的元素到來(lái)時(shí),新元素會(huì)和累加器進(jìn)行累加操作,這里的累加操作就是 reduce 函數(shù)定義的運(yùn)算規(guī)
則。然后將更新以后的累加器的值向下游輸出。
我們可以單獨(dú)定義一個(gè)函數(shù)類實(shí)現(xiàn) ReduceFunction 接口,也可以直接傳入一個(gè)匿名類。當(dāng)然,同樣也可以通過(guò)傳入 Lambda 表達(dá)式實(shí)現(xiàn)類似的功能。與簡(jiǎn)單聚合類似,reduce 操作也會(huì)將 KeyedStream 轉(zhuǎn)換為 DataStrema。它不會(huì)改變流的元素?cái)?shù)據(jù)類型,所以輸出類型和輸入類型是一樣的。下面我們來(lái)看一個(gè)稍復(fù)雜的例子。
我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)用戶訪問(wèn)的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,記錄所有用戶中訪問(wèn)頻次最高的那個(gè),也就是當(dāng)前訪問(wèn)量最大的用戶是誰(shuí)。
package com.rosh.flink.test;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* 我們將數(shù)據(jù)流按照用戶 id 進(jìn)行分區(qū),然后用一個(gè) reduce 算子實(shí)現(xiàn) sum 的功能,統(tǒng)計(jì)每個(gè)
* 用戶訪問(wèn)的頻次;進(jìn)而將所有統(tǒng)計(jì)結(jié)果分到一組,用另一個(gè) reduce 算子實(shí)現(xiàn) maxBy 的功能,
* 記錄所有用戶中訪問(wèn)頻次最高的那個(gè),也就是當(dāng)前訪問(wèn)量最大的用戶是誰(shuí)。
*/
public class TransReduceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//隨機(jī)生成數(shù)據(jù)
Random random = new Random();
List<Integer> userIds = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
userIds.add(random.nextInt(5));
}
DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);
//每個(gè)ID訪問(wèn)記錄一次
SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> map(Integer value) throws Exception {
return new Tuple2<>(value, 1L);
}
});
//統(tǒng)計(jì)每個(gè)user訪問(wèn)多少次
SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS.keyBy(tuple -> tuple.f0).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
sumDS.print("sumDS ->>>>>>>>>>>>>");
//把所有分區(qū)合并,求出最大的訪問(wèn)量
SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS.keyBy(key -> true).reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
@Override
public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
if (value1.f1 > value2.f1) {
return value1;
} else {
return value2;
}
}
});
maxDS.print("maxDS ->>>>>>>>>>>");
env.execute("TransReduceTest");
}
}

到此這篇關(guān)于Flink實(shí)現(xiàn)特定統(tǒng)計(jì)的歸約聚合reduce操作的文章就介紹到這了,更多相關(guān)Flink歸約聚合內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springmvc實(shí)現(xiàn)自定義類型轉(zhuǎn)換器示例
本篇文章主要介紹了springmvc實(shí)現(xiàn)自定義類型轉(zhuǎn)換器示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-02-02
Spring Cloud Gateway重試機(jī)制的實(shí)現(xiàn)
這篇文章主要介紹了Spring Cloud Gateway重試機(jī)制的實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-03-03
java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法
這篇文章主要介紹了java計(jì)算給定字符串中出現(xiàn)次數(shù)最多的字母和該字母出現(xiàn)次數(shù)的方法,涉及java字符串的遍歷、轉(zhuǎn)換及運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2017-02-02
Java Swing JComboBox下拉列表框的示例代碼
這篇文章主要介紹了Java Swing JComboBox下拉列表框的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
java ThreadPoolExecutor線程池拒絕策略避坑
這篇文章主要為大家介紹了java ThreadPoolExecutor拒絕策略避坑踩坑示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
SpringData JPA快速上手之關(guān)聯(lián)查詢及JPQL語(yǔ)句書(shū)寫(xiě)詳解
JPA都有SpringBoot的官方直接提供的starter,而Mybatis沒(méi)有,直到SpringBoot 3才開(kāi)始加入到官方模版中,這篇文章主要介紹了SpringData JPA快速上手,關(guān)聯(lián)查詢,JPQL語(yǔ)句書(shū)寫(xiě)的相關(guān)知識(shí),感興趣的朋友一起看看吧2023-09-09
Spring IOC和DI實(shí)現(xiàn)原理及實(shí)例解析
這篇文章主要介紹了Spring IOC和DI實(shí)現(xiàn)原理及實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
java實(shí)現(xiàn)簡(jiǎn)易局域網(wǎng)聊天功能
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)簡(jiǎn)易局域網(wǎng)聊天功能,使用UDP模式編寫(xiě)一個(gè)聊天程序,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-04-04

