Flink自定義Sink端實現過程講解
Sink介紹
在Fink官網中sink端只是給出了常規(guī)的write api.在我們實際開發(fā)場景中需要將flink處理的數據寫入kafka,hbase kudu等外部系統(tǒng)。
UML關系
自定義Sink需要實現父類的接口和繼承抽象類。

上面是Sink的繼承關系
Flink addSink
// 方法需要SinkFunction的對象
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// configure the type if needed
if (sinkFunction instanceof InputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
getExecutionEnvironment().addOperator(sink.getTransformation());
return sink;
}
SinkFunction
// SinkFunction是一個接口
public interface SinkFunction<IN> extends Function, Serializable {
//公共方法
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
}
RichSinkFunction
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
}
其他繼承接口SinkFunction的類:
案例
自定義HbaseSink
public class HbaseSink extends RichSinkFunction<Tuple2<Integer, String>> {
Logger logger = LoggerFactory.getLogger(HbaseSink.class);
org.apache.hadoop.conf.Configuration configuration;
Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//獲取hbase 的鏈接信息
configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
//創(chuàng)建conn
connection = ConnectionFactory.createConnection(configuration);
logger.info("創(chuàng)建鏈接成功");
}
@Override
public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
//往habse 里面插入數據
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Table table = connection.getTable(TableName.valueOf("torder_count"));
Put put = new Put(value.f1.getBytes(StandardCharsets.UTF_8));
put.addColumn("info".getBytes(), // 列族
"order_total".getBytes(StandardCharsets.UTF_8), //特征字段
value.f0.toString().getBytes()); //屬性值
put.addColumn("info".getBytes(), "insert_time".getBytes(), format.format(new Date(System.currentTimeMillis())).getBytes());
table.put(put);
table.close();
logger.info("=====一條數據寫入成功======,時間:"+value.f1+", 值:"+value.f0);
}
@Override
public void close() throws Exception {
super.close();
connection.close();
}
通過以上案例我們熟悉了addSink函數的操作。
到此這篇關于Flink自定義Sink端實現過程講解的文章就介紹到這了,更多相關Flink自定義Sink內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringMVC框架和SpringBoot項目中控制器的響應結果深入分析
這篇文章主要介紹了SpringMVC框架和SpringBoot項目中控制器的響應結果,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2022-12-12
SpringBoot3整合EasyExcel動態(tài)實現表頭重命名
這篇文章主要為大家詳細介紹了SpringBoot3整合EasyExcel如何通過WriteHandler動態(tài)實現表頭重命名,文中的示例代碼講解詳細,有需要的可以了解下2025-03-03
spring?cloud中Feign導入jar失敗的問題及解決方案
這篇文章主要介紹了spring?cloud中Feign導入jar失敗的問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03

