窗口(window)是處理無(wú)限流的核心。窗口將流分割成有限大小的“桶”,我們可以在桶上應(yīng)用計(jì)算。本文檔重點(diǎn)介紹如何在Flink中執(zhí)行窗口操作,以及程序員如何從其提供的功能中獲得最大的好處。
一個(gè)有窗口的Flink程序的一般結(jié)構(gòu)如下所示。第一個(gè)片段指的是鍵控流,而第二個(gè)片段指的是非鍵控流。可以看到,唯一的區(qū)別是keyBy(…)調(diào)用鍵流,而window(…)調(diào)用非鍵流的windowwall(…)。這也將作為頁(yè)面其余部分的路標(biāo)。
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
一般真實(shí)的流都是無(wú)界的,怎樣處理無(wú)界的數(shù)據(jù)?
在自然環(huán)境中,數(shù)據(jù)的產(chǎn)生原本就是流式的。無(wú)論是來(lái)自 Web 服務(wù)器的事件數(shù)據(jù),證券交易所的交易數(shù)據(jù),還是來(lái)自工廠車間機(jī)器上的傳感器數(shù)據(jù),其數(shù)據(jù)都是流式的。但是當(dāng)你 分析數(shù)據(jù)時(shí),可以圍繞 有界流(bounded)或 無(wú)界流(unbounded)兩種模型來(lái)組織處理數(shù)據(jù),當(dāng)然,選擇不同的模型,程序的執(zhí)行和處理方式也都會(huì)不同。
上面圖片來(lái)源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/overview/
官方文檔
翻轉(zhuǎn)窗口賦值器將每個(gè)元素賦值給一個(gè)指定窗口大小的窗口。滾動(dòng)的窗口有固定的尺寸,而且不重疊。例如,如果您指定一個(gè)大小為5分鐘的滾動(dòng)窗口,則當(dāng)前窗口將被評(píng)估,并每5分鐘啟動(dòng)一個(gè)新窗口,如下圖所示:
【特點(diǎn)】
【示例代碼】
TumblingEventTimeWindows:滾動(dòng)事件時(shí)間窗口
TumblingProcessingTimeWindows:滾動(dòng)處理時(shí)間窗口
val input: DataStream[T]=...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
滑動(dòng)窗口賦值器將元素賦值給固定長(zhǎng)度的窗口。類似于滾動(dòng)窗口賦值器,窗口的大小由窗口大小參數(shù)配置。另外一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口啟動(dòng)的頻率。因此,如果滑動(dòng)窗口小于窗口大小,則滑動(dòng)窗口可以重疊。在這種情況下,元素被分配給多個(gè)窗口。
例如,您可以將大小為10分鐘的窗口滑動(dòng)5分鐘。這樣,每隔5分鐘就會(huì)出現(xiàn)一個(gè)窗口,其中包含在最后10分鐘內(nèi)到達(dá)的事件,如下圖所示:
【特點(diǎn)】
【示例代碼】
SlidingEventTimeWindows:滑動(dòng)事件時(shí)間窗口
SlidingProcessingTimeWindows:滑動(dòng)處理時(shí)間窗口
val input: DataStream[T]=...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
會(huì)話窗口分配器根據(jù)活動(dòng)的會(huì)話對(duì)元素進(jìn)行分組。與滑動(dòng)窗口不同,會(huì)話窗口沒(méi)有重疊,也沒(méi)有固定的開(kāi)始和結(jié)束時(shí)間。相反,當(dāng)會(huì)話窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí),即當(dāng)一個(gè)不活動(dòng)間隙發(fā)生時(shí),會(huì)話窗口將關(guān)閉。會(huì)話窗口分配器可以配置一個(gè)靜態(tài)會(huì)話間隙,也可以配置一個(gè)會(huì)話間隙提取器函數(shù),該函數(shù)定義了不活動(dòng)的時(shí)間長(zhǎng)度。當(dāng)這段時(shí)間到期時(shí),當(dāng)前會(huì)話關(guān)閉,隨后的元素被分配到一個(gè)新的會(huì)話窗口。
【特點(diǎn)】
【示例代碼】
EventTimeSessionWindows:會(huì)話事件時(shí)間窗口
SlidingProcessingTimeWindows:會(huì)話處理時(shí)間窗口
val input: DataStream[T]=...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long={
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long={
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
窗口分配器 —— window() 方法
TumblingEventTimeWindows:滾動(dòng)事件時(shí)間窗口
TumblingProcessingTimeWindows:滾動(dòng)處理時(shí)間窗口
SlidingEventTimeWindows:滑動(dòng)事件時(shí)間窗口
SlidingProcessingTimeWindows:滑動(dòng)處理時(shí)間窗口
EventTimeSessionWindows:會(huì)話事件時(shí)間窗口
SlidingProcessingTimeWindows:會(huì)話處理時(shí)間窗口
window function 定義了要對(duì)窗口中收集的數(shù)據(jù)做的計(jì)算操作。可以分為兩類。
val input: DataStream[(String, Long)]=...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2)=> (v1._1, v1._2 + v2._2) }
val input: DataStream[(String, Long)]=...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
一個(gè)ProcessWindowFunction可以這樣定義和使用:
val input: DataStream[(String, Long)]=...
input
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String])={
var count=0L
for (in <- input) {
count=count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
官方文檔
Flink 明確支持以下三種時(shí)間語(yǔ)義:
上面圖片來(lái)源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/
我們可以直接在代碼中,對(duì)執(zhí)行環(huán)境調(diào)用 setStreamTimeCharacteristic
方法,設(shè)置流的時(shí)間特性,具體的時(shí)間,還需要從數(shù)據(jù)中提取時(shí)間戳(timestamp)
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
官方文檔
當(dāng) Flink 以 Event Time 模式處理數(shù)據(jù)流時(shí),它會(huì)根據(jù)數(shù)據(jù)里的時(shí)間戳來(lái)
處理基于時(shí)間的算子,由于網(wǎng)絡(luò)、分布式等原因,會(huì)導(dǎo)致亂序數(shù)據(jù)的產(chǎn)生,亂序數(shù)據(jù)會(huì)讓窗口計(jì)算不準(zhǔn)確。Watermark正是處理亂序數(shù)據(jù)而來(lái)的。
遇到一個(gè)時(shí)間戳達(dá)到了窗口關(guān)閉時(shí)間,不應(yīng)該立刻觸發(fā)窗口計(jì)算,而是等
待一段時(shí)間,等遲到的數(shù)據(jù)來(lái)了再關(guān)閉窗口。
時(shí)間戳的分配與 watermark 的生成是齊頭并進(jìn)的,其可以告訴 Flink 應(yīng)用程序事件時(shí)間的進(jìn)度。其可以通過(guò)指定 WatermarkGenerator 來(lái)配置 watermark 的生成方式。
使用 Flink API 時(shí)需要設(shè)置一個(gè)同時(shí)包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場(chǎng)景下構(gòu)建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根據(jù)策略實(shí)例化一個(gè)可分配時(shí)間戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根據(jù)策略實(shí)例化一個(gè) watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
通常情況下,你不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。
【例如】你想要要使用有界無(wú)序(bounded-out-of-orderness)watermark 生成器和一個(gè) lambda 表達(dá)式作為時(shí)間戳分配器,那么可以按照如下方式實(shí)現(xiàn):
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long=element._1
})
【溫馨提示】其中 TimestampAssigner 的設(shè)置與否是可選的,大多數(shù)情況下,可以不用去特別指定。
WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用:
【溫馨提示】第一種方式相比會(huì)更好,因?yàn)閿?shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通常可以更精準(zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。
【示例】?jī)H當(dāng)無(wú)法直接在數(shù)據(jù)源上設(shè)置策略時(shí),才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy):
val env=StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[MyEvent]=env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent]=stream
.filter( _.severity==WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>)
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b)=> a.add(b) )
.addSink(...)
【示例】處理空閑數(shù)據(jù)源
如果數(shù)據(jù)源中的某一個(gè)分區(qū)/分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問(wèn)題。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化。
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1))
// 注意時(shí)間是毫秒,所以根據(jù)時(shí)間戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
// MyAssigner 可以有兩種類型,都繼承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())
定義了抽取時(shí)間戳,以及生成 watermark 的方法,有兩種類型
可以棄用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了
在 Flink 新的 WatermarkStrategy,TimestampAssigner 和 WatermarkGenerator 的抽象接口之前,F(xiàn)link 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它們,但建議使用新接口,因?yàn)槠鋵?duì)時(shí)間戳和 watermark 等重點(diǎn)的抽象和分離很清晰,并且還統(tǒng)一了周期性和標(biāo)記形式的 watermark 生成方式。
flink1.11版本后 建議用WatermarkStrategy(Watermark生成策略)生成Watermark,當(dāng)創(chuàng)建DataStream對(duì)象后,使用如下方法指定策略:assignTimestampsAndWatermarks(WatermarkStrategy<T>)
通常情況下,你不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。
通過(guò)調(diào)用WatermarkStrategy對(duì)象上的forBoundedOutOfOrderness方法來(lái)實(shí)現(xiàn),接收一個(gè)Duration類型的參數(shù)作為最大亂序(out of order)長(zhǎng)度。WatermarkStrategy對(duì)象上的withTimestampAssigner方法為從事件數(shù)據(jù)中提取時(shí)間戳提供了接口。
【示例】
package com.com.streaming.watermarkstrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.time.LocalDateTime;
//在assignTimestampsAndWatermarks中用WatermarkStrategy.forBoundedOutOfOrderness方法抽取Timestamp和生成周期性水位線示例
public class ForBoundedOutOfOrderness {
public static void main(String[] args) throws Exception{
//創(chuàng)建流處理環(huán)境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置EventTime語(yǔ)義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設(shè)置周期生成Watermark間隔(10毫秒)
env.getConfig().setAutoWatermarkInterval(10L);
//并行度1
env.setParallelism(1);
//演示數(shù)據(jù)
DataStreamSource<ClickEvent> mySource=env.fromElements(
new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
);
//WatermarkStrategy.forBoundedOutOfOrderness周期性生成水位線
//可更好處理延遲數(shù)據(jù)
//BoundedOutOfOrdernessWatermarks<T>實(shí)現(xiàn)WatermarkGenerator<T>
SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
//指定Watermark生成策略,最大延遲長(zhǎng)度5毫秒
WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
.withTimestampAssigner(
//SerializableTimestampAssigner接口中實(shí)現(xiàn)了extractTimestamp方法來(lái)指定如何從事件數(shù)據(jù)中抽取時(shí)間戳
new SerializableTimestampAssigner<ClickEvent>() {
@Override
public long extractTimestamp(ClickEvent event, long recordTimestamp) {
return event.getDateTime(event.getEventTime());
}
})
);
//結(jié)果打印
streamTS.print();
env.execute();
}
}
package com.com.streaming.watermarkstrategy;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class ClickEvent {
private String user;
private long l;
private int i;
private LocalDateTime eventTime;
public ClickEvent(LocalDateTime eventTime, String user, long l, int i) {
this.eventTime=eventTime;
this.user=user;
this.l=l;
this.i=i;
}
public LocalDateTime getEventTime() {
return eventTime;
}
public void setEventTime(LocalDateTime eventTime) {
this.eventTime=eventTime;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user=user;
}
public long getL() {
return l;
}
public void setL(long l) {
this.l=l;
}
public int getI() {
return i;
}
public void setI(int i) {
this.i=i;
}
public long getDateTime(LocalDateTime dt) {
ZoneOffset zoneOffset8=ZoneOffset.of("+8");
return dt.toInstant(zoneOffset8).toEpochMilli();
}
}
通過(guò)調(diào)用WatermarkStrategy對(duì)象上的forMonotonousTimestamps方法來(lái)實(shí)現(xiàn),無(wú)需任何參數(shù),相當(dāng)于將forBoundedOutOfOrderness策略的最大亂序長(zhǎng)度outOfOrdernessMillis設(shè)置為0。
package com.com.streaming.watermarkstrategy;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.time.LocalDateTime;
public class ForMonotonousTimestamps {
public static void main(String[] args) throws Exception{
//創(chuàng)建流處理環(huán)境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置EventTime語(yǔ)義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設(shè)置周期生成Watermark間隔(10毫秒)
env.getConfig().setAutoWatermarkInterval(10L);
//并行度1
env.setParallelism(1);
//演示數(shù)據(jù)
DataStreamSource<ClickEvent> mySource=env.fromElements(
new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
);
//WatermarkStrategy.forMonotonousTimestamps周期性生成水位線
//相當(dāng)于延遲outOfOrdernessMillis=0
//繼承自BoundedOutOfOrdernessWatermarks<T>
SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, recordTimestamp) -> event.getDateTime(event.getEventTime()))
);
//結(jié)果打印
streamTS.print();
env.execute();
}
}
WatermarkStrategy.noWatermarks()
上面代碼設(shè)置超時(shí)時(shí)間5毫秒,超過(guò)這個(gè)時(shí)間,沒(méi)有生成Watermark,將流狀態(tài)設(shè)置空閑,當(dāng)下次有新的Watermark生成并發(fā)送到下游時(shí),重新設(shè)置為活躍。
WatermarkStrategy.withIdleness(Duration.ofMillis(5))
未完待續(xù)~
Flink 是一個(gè)分布式流處理和批處理計(jì)算框架,具有高性能、容錯(cuò)性和靈活性。下面是 Flink 的架構(gòu)概述:
JobManager:JobManager 是 Flink 集群的主節(jié)點(diǎn),負(fù)責(zé)接收和處理用戶提交的作業(yè)。JobManager 的主要職責(zé)包括:
解析和驗(yàn)證用戶提交的作業(yè)。
生成執(zhí)行計(jì)劃,并將作業(yè)圖分發(fā)給 TaskManager。
協(xié)調(diào)任務(wù)的調(diào)度和執(zhí)行。
管理作業(yè)的狀態(tài)和元數(shù)據(jù)信息。
TaskManager:TaskManager 是 Flink 集群的工作節(jié)點(diǎn),負(fù)責(zé)執(zhí)行具體的任務(wù)。每個(gè) TaskManager 可以運(yùn)行多個(gè)任務(wù)(子任務(wù)),每個(gè)子任務(wù)運(yùn)行在一個(gè)單獨(dú)的線程中,共享 TaskManager 的資源。TaskManager 的主要職責(zé)包括:
接收并執(zhí)行 JobManager 分配的任務(wù)。
負(fù)責(zé)任務(wù)的數(shù)據(jù)處理、狀態(tài)管理、故障恢復(fù)等操作。
將處理結(jié)果返回給 JobManager。
StateBackend:StateBackend 是 Flink 的狀態(tài)管理機(jī)制,用于保存和恢復(fù)任務(wù)的狀態(tài)信息,確保任務(wù)在失敗后可以進(jìn)行故障恢復(fù)。Flink 提供了多種 StateBackend 實(shí)現(xiàn),包括內(nèi)存、文件系統(tǒng)、RocksDB 等。
DataStream API / DataSet API:Flink 提供了兩種不同的編程接口,用于流處理和批處理:
DataStream API:面向流式計(jì)算,支持實(shí)時(shí)數(shù)據(jù)流的處理和分析。它提供了豐富的操作符(例如 map、filter、window、join 等)和窗口函數(shù),以便進(jìn)行數(shù)據(jù)轉(zhuǎn)換和聚合操作。
DataSet API:面向批處理,適用于有界數(shù)據(jù)集的處理。它提供了類似于 Hadoop MapReduce 的操作符(例如 map、reduce、join 等),用于對(duì)數(shù)據(jù)集進(jìn)行轉(zhuǎn)換和計(jì)算。
Connectors:Flink 提供了多種連接器,用于與外部系統(tǒng)進(jìn)行數(shù)據(jù)交互。常見(jiàn)的連接器包括 Kafka、Hadoop、Elasticsearch、JDBC 等,可以用于讀取和寫入外部數(shù)據(jù)源。
資源管理器:Flink 可以與各種集群管理工具(如 YARN、Mesos、Kubernetes)集成,以實(shí)現(xiàn)資源的動(dòng)態(tài)分配和任務(wù)調(diào)度。
Flink 的架構(gòu)使得它能夠?qū)崿F(xiàn)高性能的流處理和批處理,同時(shí)具備良好的容錯(cuò)性和可伸縮性。它廣泛應(yīng)用于實(shí)時(shí)數(shù)據(jù)處理、數(shù)據(jù)湖分析、事件驅(qū)動(dòng)應(yīng)用等場(chǎng)景。
大數(shù)據(jù)流處理框架 Flink 和 Aflink 的技術(shù)架構(gòu)主要包括以下組件:
JobManager:負(fù)責(zé)接收 Job 圖,并將其分發(fā)給 TaskManager。
TaskManager:負(fù)責(zé)執(zhí)行任務(wù),包括數(shù)據(jù)源、數(shù)據(jù)計(jì)算、數(shù)據(jù)匯總等操作。
StateBackend:用于保存狀態(tài)信息,支持容錯(cuò)和恢復(fù)。
DataStream API:用于定義數(shù)據(jù)流處理邏輯,包括窗口函數(shù)、聚合操作等。
Connector:用于連接外部數(shù)據(jù)源,如 Kafka。
編輯
JobManager 和 TaskManager 之間的通信方式主要有兩種:心跳機(jī)制和RPC(遠(yuǎn)程過(guò)程調(diào)用)。
心跳機(jī)制:JobManager 和 TaskManager 通過(guò)心跳機(jī)制保持連接和通信。具體流程如下:
JobManager 定期向所有的 TaskManager 發(fā)送心跳信號(hào),確認(rèn) TaskManager 是否存活。
TaskManager 接收到心跳信號(hào)后,回復(fù)確認(rèn)信號(hào)給 JobManager,表示自己還活著。
如果 JobManager 在一段時(shí)間內(nèi)沒(méi)有收到 TaskManager 的心跳信號(hào),就會(huì)認(rèn)為該 TaskManager 失效,并進(jìn)行相應(yīng)的處理。
RPC:JobManager 和 TaskManager 使用 RPC 機(jī)制進(jìn)行通信,以傳遞任務(wù)和數(shù)據(jù)等信息。具體流程如下:
JobManager 將任務(wù)調(diào)度圖發(fā)送給 TaskManager。這包括任務(wù)的執(zhí)行計(jì)劃、數(shù)據(jù)源、算子操作等。
TaskManager 接收到任務(wù)調(diào)度圖后,根據(jù)指令執(zhí)行任務(wù),處理數(shù)據(jù)流。
TaskManager 在處理過(guò)程中將結(jié)果返回給 JobManager,以便進(jìn)行狀態(tài)更新和后續(xù)處理。
編輯
需要注意的是,JobManager 和 TaskManager 的通信是基于網(wǎng)絡(luò)的,它們可以部署在不同的機(jī)器上。在一個(gè) Flink 集群中,通常會(huì)有一個(gè) JobManager 和多個(gè) TaskManager,它們通過(guò)上述的通信方式協(xié)同工作,實(shí)現(xiàn)數(shù)據(jù)流的處理和任務(wù)調(diào)度。
流式計(jì)算:Flink 和 Aflink 是流式計(jì)算框架,能夠?qū)崟r(shí)處理無(wú)界數(shù)據(jù)流。流式計(jì)算基于事件驅(qū)動(dòng)的模型,能夠處理實(shí)時(shí)數(shù)據(jù)并支持低延遲計(jì)算。
窗口函數(shù):窗口函數(shù)用于對(duì)數(shù)據(jù)流進(jìn)行分組聚合操作,常見(jiàn)的窗口類型包括滾動(dòng)窗口、滑動(dòng)窗口和會(huì)話窗口。窗口函數(shù)允許用戶在有限的數(shù)據(jù)集上執(zhí)行計(jì)算操作。
窗口類型
Flink 框架提供了多種窗口函數(shù),用于對(duì)數(shù)據(jù)流進(jìn)行分組聚合操作。以下是一些常見(jiàn)的窗口函數(shù):
滾動(dòng)窗口(Tumbling Window):將數(shù)據(jù)流劃分為固定大小的、不重疊的窗口。每個(gè)窗口包含相同數(shù)量的元素,并且窗口之間沒(méi)有重疊。可以通過(guò) window(Tumble.over()) 方法來(lái)定義滾動(dòng)窗口。
滑動(dòng)窗口(Sliding Window):將數(shù)據(jù)流劃分為固定大小的、可能重疊的窗口。每個(gè)窗口包含指定數(shù)量的元素,并且窗口之間可以有重疊。可以通過(guò) window(Slide.over()) 方法來(lái)定義滑動(dòng)窗口。
會(huì)話窗口(Session Window):根據(jù)事件之間的時(shí)間間隔將數(shù)據(jù)流劃分為不固定長(zhǎng)度的會(huì)話窗口。如果在指定時(shí)間間隔內(nèi)沒(méi)有新事件到達(dá),則會(huì)話窗口關(guān)閉。可以通過(guò) window(Session.withGap()) 方法來(lái)定義會(huì)話窗口。
全局窗口(Global Window):將整個(gè)數(shù)據(jù)流視為一個(gè)窗口,不進(jìn)行數(shù)據(jù)切分。適用于需要計(jì)算整個(gè)數(shù)據(jù)流的聚合結(jié)果的場(chǎng)景。可以通過(guò) window(Global()) 方法來(lái)定義全局窗口。
自定義窗口函數(shù):Flink 還支持自定義窗口函數(shù),以便滿足特定需求。您可以實(shí)現(xiàn) WindowFunction 接口來(lái)定義自己的窗口函數(shù),并通過(guò) apply() 方法來(lái)處理窗口中的元素。
這些窗口函數(shù)可以和其他操作符(例如 groupBy()、reduce()、aggregate() 等)一起使用,以實(shí)現(xiàn)各種數(shù)據(jù)流處理和聚合操作。
不同類型的窗口函數(shù)適用于不同的業(yè)務(wù)場(chǎng)景,具體選擇哪種窗口函數(shù)取決于您的需求和數(shù)據(jù)流的特點(diǎn)。
窗口函數(shù)都有其特定的使用場(chǎng)景,下面我會(huì)簡(jiǎn)要介紹每種窗口函數(shù)的典型應(yīng)用場(chǎng)景,并提供 Java 和 Python 代碼示例。
滾動(dòng)窗口(Tumbling Window)
使用場(chǎng)景:適用于需要對(duì)固定大小的數(shù)據(jù)范圍進(jìn)行聚合計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)每5分鐘內(nèi)的數(shù)據(jù)總和。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .sum(1);
Python 代碼示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.window import TumblingEventTimeWindows from pyflink.common import WatermarkStrategy env = StreamExecutionEnvironment.get_execution_environment() input_stream = ... # 輸入數(shù)據(jù)流result_stream = input_stream.key_by(lambda x: x[0]).window(TumblingEventTimeWindows.of('5 minutes')).sum(1)
滑動(dòng)窗口(Sliding Window)
使用場(chǎng)景:適用于需要對(duì)數(shù)據(jù)流進(jìn)行連續(xù)且重疊的窗口計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)每5分鐘計(jì)算一次數(shù)據(jù)總和,并且每次計(jì)算時(shí)包含前一個(gè)窗口的部分?jǐn)?shù)據(jù)。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import SlidingEventTimeWindows result_stream = input_stream .key_by(lambda x: x[0]).window(SlidingEventTimeWindows.of('10 minutes', '5 minutes')).sum(1)
會(huì)話窗口(Session Window)
使用場(chǎng)景:適用于需要基于活動(dòng)之間的間隔時(shí)間來(lái)劃分窗口的場(chǎng)景,例如用戶在網(wǎng)站上的一系列操作之間的時(shí)間間隔作為窗口的劃分條件。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10))) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import EventTimeSessionWindows result_stream = input_stream .key_by(lambda x: x[0]).window(EventTimeSessionWindows.with_gap('10 minutes')).sum(1)
全局窗口(Global Window)
使用場(chǎng)景:適用于對(duì)整個(gè)數(shù)據(jù)流進(jìn)行聚合計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)全天的數(shù)據(jù)總和。
Java 代碼示例:
DataStream<Tuple2<String, Integer>> input = ... ; // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(GlobalWindows.create()) .trigger(CountTrigger.of(1)) .sum(1);
Python 代碼示例:
from pyflink.datastream.window import GlobalWindows from pyflink.datastream.trigger import CountTrigger result_stream = input_stream .key_by(lambda x: x[0]).window(GlobalWindows.create()) .trigger(CountTrigger(1)).sum(1)
以下是一個(gè)簡(jiǎn)單的示例代碼,使用 Java 和 Python 分別演示讀取 Kafka 數(shù)據(jù)并計(jì)算指標(biāo)的過(guò)程:
Java 代碼示例:
// 創(chuàng)建 Flink 程序入口 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 從 Kafka 中讀取數(shù)據(jù) FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(kafkaConsumer); // 對(duì)數(shù)據(jù)流進(jìn)行處理,計(jì)算指標(biāo) DataStream<Result> resultStream = stream .flatMap(new UserAccessFlatMapFunction()) .keyBy("userId") .timeWindow(Time.minutes(5)) .apply(new UserAccessWindowFunction()); // 執(zhí)行任務(wù) env.execute("User Access Analysis");
Python 代碼示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, EnvironmentSettings from pyflink.table.descriptors import Schema, Kafka # 創(chuàng)建 Flink 環(huán)境 env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) # 從 Kafka 讀取數(shù)據(jù) t_env.connect( Kafka() .version("universal") .topic("topic") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .start_from_earliest() .finish() ).with_format( Json() ).with_schema( Schema() .field("user_id", DataTypes.STRING()) .field("timestamp", DataTypes.TIMESTAMP(3)) ).create_temporary_table("MySource") # 計(jì)算指標(biāo) t_env.from_path("MySource") \ .window(Tumble.over("5.minutes").on("timestamp").alias("w")) \ .group_by("user_id, w") \ .select("user_id, w.end as window_end, count(user_id) as pv, count_distinct(user_id) as uv") \ .execute_insert("MySink")
Flink 和 Aflink 可以用于加載數(shù)據(jù)湖中的大規(guī)模數(shù)據(jù)集,進(jìn)行 AI 模型訓(xùn)練。通過(guò)流式處理和批處理相結(jié)合,可以有效處理圖片、音頻、文本等多媒體數(shù)據(jù),用于風(fēng)控等場(chǎng)景。
當(dāng)使用 Flink 進(jìn)行機(jī)器學(xué)習(xí)時(shí),通常會(huì)使用 Flink 的批處理和流處理 API 結(jié)合機(jī)器學(xué)習(xí)庫(kù)(如 Apache Flink ML、Apache Mahout 等)來(lái)實(shí)現(xiàn)各種機(jī)器學(xué)習(xí)任務(wù)。這里我將為您提供一個(gè)簡(jiǎn)單的示例,演示如何在 Flink 中使用批處理 API 來(lái)進(jìn)行線性回歸的訓(xùn)練。
首先,讓我們看一下 Java 代碼示例:
import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.DataSet; import org.apache.flink.ml.common.LabeledVector; import org.apache.flink.ml.regression.MultipleLinearRegression; public class LinearRegressionExample { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 創(chuàng)建帶標(biāo)簽的向量數(shù)據(jù)集 DataSet<LabeledVector> trainingData = ... ; // 從數(shù)據(jù)源加載帶標(biāo)簽的向量數(shù)據(jù)集 // 初始化線性回歸模型 MultipleLinearRegression mlr = new MultipleLinearRegression(); mlr.setStepsize(0.5); // 設(shè)置步長(zhǎng) mlr.setIterations(100); // 設(shè)置迭代次數(shù) // 訓(xùn)練線性回歸模型 mlr.fit(trainingData); // 獲取訓(xùn)練后的模型參數(shù) double[] weights = mlr.weights(); double intercept = mlr.intercept(); // 打印模型參數(shù) System.out.println("Weights: " + Arrays.toString(weights)); System.out.println("Intercept: " + intercept); } }
Python 代碼示例:
from pyflink.dataset import ExecutionEnvironment from pyflink.datastream import LabeledVector from pyflink.ml.preprocessing import Splitter from pyflink.ml.regression import MultipleLinearRegression env = ExecutionEnvironment.get_execution_environment() # 創(chuàng)建帶標(biāo)簽的向量數(shù)據(jù)集 training_data = ... # 從數(shù)據(jù)源加載帶標(biāo)簽的向量數(shù)據(jù)集 # 初始化線性回歸模型 mlr = MultipleLinearRegression() mlr.set_step_size(0.5) # 設(shè)置步長(zhǎng) mlr.set_max_iterations(100) # 設(shè)置最大迭代次數(shù) # 訓(xùn)練線性回歸模型mlr.fit(training_data) # 獲取訓(xùn)練后的模型參數(shù) weights = mlr.weights_ intercept = mlr.intercept_ # 打印模型參數(shù) print("Weights: ", weights) print("Intercept: ", intercept)
在 Flink 中使用批處理 API 進(jìn)行線性回歸模型的訓(xùn)練。實(shí)際上,在 Flink 中進(jìn)行更復(fù)雜的機(jī)器學(xué)習(xí)任務(wù)時(shí),可能需要結(jié)合更多的預(yù)處理、特征工程、模型評(píng)估等步驟,以及更豐富的機(jī)器學(xué)習(xí)算法和模型庫(kù)。
Flink Table API 是 Apache Flink 提供的一種用于處理結(jié)構(gòu)化數(shù)據(jù)的高級(jí) API,它提供了一種類 SQL 的聲明性編程方式,使用戶可以通過(guò)類 SQL 的語(yǔ)法來(lái)操作流式和批處理數(shù)據(jù)。使用 Table API,用戶可以方便地進(jìn)行數(shù)據(jù)查詢、轉(zhuǎn)換、聚合等操作,而無(wú)需編寫復(fù)雜的低級(jí)別代碼。
下面是一個(gè)簡(jiǎn)單的示例,演示如何在 Flink 中使用 Table API 來(lái)實(shí)現(xiàn)對(duì)輸入數(shù)據(jù)流的簡(jiǎn)單轉(zhuǎn)換和聚合:
Java 示例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TableAPIExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 創(chuàng)建輸入數(shù)據(jù)流 Table inputTable = tableEnv.fromDataStream(inputDataStream, "name, age"); // 查詢和轉(zhuǎn)換操作 Table resultTable = inputTable .filter("age > 18") .groupBy("name") .select("name, count(1) as count"); // 將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出 tableEnv.toRetractStream(resultTable, Row.class).print(); env.execute("Table API Example"); } }
Python 示例:
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) # 創(chuàng)建輸入數(shù)據(jù)流 input_table = table_env.from_data_stream(input_data_stream, ['name', 'age']) # 查詢和轉(zhuǎn)換操作 result_table = input_table \ .filter("age > 18") \ .group_by("name") \ .select("name, count(1) as count") # 將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出 table_env.to_retract_stream(result_table, Row).print() env.execute("Table API Example")
創(chuàng)建了一個(gè)輸入數(shù)據(jù)流,然后使用 Table API 對(duì)數(shù)據(jù)流進(jìn)行過(guò)濾、分組和聚合操作,最后將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出。這展示了 Table API 的簡(jiǎn)單用法,更復(fù)雜的操作和功能可以根據(jù)具體需求進(jìn)行擴(kuò)展。
發(fā)展歷史:Flink 于 2015 年正式發(fā)布,是一個(gè)快速發(fā)展的流處理引擎,Aflink 是 Flink 在國(guó)內(nèi)的一個(gè)分支,也得到了廣泛應(yīng)用。
市場(chǎng)優(yōu)勢(shì):Flink 和 Aflink 具有低延遲、高吞吐量等優(yōu)勢(shì),適用于實(shí)時(shí)數(shù)據(jù)處理場(chǎng)景。在大數(shù)據(jù)領(lǐng)域,它們已成為重要的流式計(jì)算框架,廣泛應(yīng)用于金融、電商、物聯(lián)網(wǎng)等行業(yè)。