操屁眼的视频在线免费看,日本在线综合一区二区,久久在线观看免费视频,欧美日韩精品久久综

新聞資訊

    一、window 概念

    窗口(window)是處理無(wú)限流的核心。窗口將流分割成有限大小的“桶”,我們可以在桶上應(yīng)用計(jì)算。本文檔重點(diǎn)介紹如何在Flink中執(zhí)行窗口操作,以及程序員如何從其提供的功能中獲得最大的好處。

    一個(gè)有窗口的Flink程序的一般結(jié)構(gòu)如下所示。第一個(gè)片段指的是鍵控流,而第二個(gè)片段指的是非鍵控流。可以看到,唯一的區(qū)別是keyBy(…)調(diào)用鍵流而window(…)調(diào)用非鍵流的windowwall(…)。這也將作為頁(yè)面其余部分的路標(biāo)。

    Keyed Windows

    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    Non-Keyed Windows

    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    一般真實(shí)的流都是無(wú)界的,怎樣處理無(wú)界的數(shù)據(jù)?

    在自然環(huán)境中,數(shù)據(jù)的產(chǎn)生原本就是流式的。無(wú)論是來(lái)自 Web 服務(wù)器的事件數(shù)據(jù),證券交易所的交易數(shù)據(jù),還是來(lái)自工廠車間機(jī)器上的傳感器數(shù)據(jù),其數(shù)據(jù)都是流式的。但是當(dāng)你 分析數(shù)據(jù)時(shí),可以圍繞 有界流(bounded)或 無(wú)界流(unbounded)兩種模型來(lái)組織處理數(shù)據(jù),當(dāng)然,選擇不同的模型,程序的執(zhí)行和處理方式也都會(huì)不同。

    上面圖片來(lái)源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/learn-flink/overview/

    • 可以把無(wú)限的數(shù)據(jù)流進(jìn)行切分,得到有限的數(shù)據(jù)集進(jìn)行處理 —— 也
      就是得到有界流
    • 窗口(window)就是將無(wú)限流切割為有限流的一種方式,它會(huì)將流
      數(shù)據(jù)分發(fā)到有限大小的桶(bucket)中進(jìn)行分析

    二、 時(shí)間窗口(Time Window)

    官方文檔

    1)滾動(dòng)窗口(Tumbling Windows)

    翻轉(zhuǎn)窗口賦值器將每個(gè)元素賦值給一個(gè)指定窗口大小的窗口。滾動(dòng)的窗口有固定的尺寸,而且不重疊。例如,如果您指定一個(gè)大小為5分鐘的滾動(dòng)窗口,則當(dāng)前窗口將被評(píng)估,并每5分鐘啟動(dòng)一個(gè)新窗口,如下圖所示:

    【特點(diǎn)】

    • 將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切分
    • 時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊

    【示例代碼】

    TumblingEventTimeWindows:滾動(dòng)事件時(shí)間窗口
    TumblingProcessingTimeWindows:滾動(dòng)處理時(shí)間窗口

    val input: DataStream[T]=...
    
    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    2)滑動(dòng)窗口(Sliding Windows)

    滑動(dòng)窗口賦值器將元素賦值給固定長(zhǎng)度的窗口。類似于滾動(dòng)窗口賦值器,窗口的大小由窗口大小參數(shù)配置。另外一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口啟動(dòng)的頻率。因此,如果滑動(dòng)窗口小于窗口大小,則滑動(dòng)窗口可以重疊。在這種情況下,元素被分配給多個(gè)窗口。

    例如,您可以將大小為10分鐘的窗口滑動(dòng)5分鐘。這樣,每隔5分鐘就會(huì)出現(xiàn)一個(gè)窗口,其中包含在最后10分鐘內(nèi)到達(dá)的事件,如下圖所示:

    【特點(diǎn)】

    • 滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口由固定的窗口
      長(zhǎng)度和滑動(dòng)間隔組成
    • 窗口長(zhǎng)度固定,可以有重疊

    【示例代碼】

    SlidingEventTimeWindows:滑動(dòng)事件時(shí)間窗口
    SlidingProcessingTimeWindows:滑動(dòng)處理時(shí)間窗口

    val input: DataStream[T]=...
    
    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    3)會(huì)話窗口(Session Windows)

    會(huì)話窗口分配器根據(jù)活動(dòng)的會(huì)話對(duì)元素進(jìn)行分組。與滑動(dòng)窗口不同,會(huì)話窗口沒(méi)有重疊,也沒(méi)有固定的開(kāi)始和結(jié)束時(shí)間。相反,當(dāng)會(huì)話窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí),即當(dāng)一個(gè)不活動(dòng)間隙發(fā)生時(shí),會(huì)話窗口將關(guān)閉。會(huì)話窗口分配器可以配置一個(gè)靜態(tài)會(huì)話間隙,也可以配置一個(gè)會(huì)話間隙提取器函數(shù),該函數(shù)定義了不活動(dòng)的時(shí)間長(zhǎng)度。當(dāng)這段時(shí)間到期時(shí),當(dāng)前會(huì)話關(guān)閉,隨后的元素被分配到一個(gè)新的會(huì)話窗口。

    【特點(diǎn)】

    • 由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的 timeout 間隙組成,也就是
      一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口
    • 時(shí)間無(wú)對(duì)齊
    • 窗口長(zhǎng)度不固定,也不會(huì)重疊

    【示例代碼】

    EventTimeSessionWindows:會(huì)話事件時(shí)間窗口
    SlidingProcessingTimeWindows:會(huì)話處理時(shí)間窗口

    val input: DataStream[T]=...
    
    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long={
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    
    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    
    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long={
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    

    三、window API

    窗口分配器 —— window() 方法

    • 我們可以用 .window() 來(lái)定義一個(gè)窗口,然后基于這個(gè) window 去做一些聚
      合或者其它處理操作。注意 window () 方法必須在 keyBy 之后才能用。
    • Flink 提供了更加簡(jiǎn)單的三種類型時(shí)間窗口用于定義時(shí)
      間窗口,也提供了
      countWindowAll來(lái)定義計(jì)數(shù)窗口

    TumblingEventTimeWindows:滾動(dòng)事件時(shí)間窗口
    TumblingProcessingTimeWindows:滾動(dòng)處理時(shí)間窗口
    SlidingEventTimeWindows:滑動(dòng)事件時(shí)間窗口
    SlidingProcessingTimeWindows:滑動(dòng)處理時(shí)間窗口
    EventTimeSessionWindows:會(huì)話事件時(shí)間窗口
    SlidingProcessingTimeWindows:會(huì)話處理時(shí)間窗口

    四、窗口分配器(window assigner)

    window function 定義了要對(duì)窗口中收集的數(shù)據(jù)做的計(jì)算操作。可以分為兩類。

    1)增量聚合函數(shù)(incremental aggregation functions)

    • 每條數(shù)據(jù)到來(lái)就進(jìn)行計(jì)算,保持一個(gè)簡(jiǎn)單的狀態(tài)
    • ReduceFunction
    val input: DataStream[(String, Long)]=...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce { (v1, v2)=> (v1._1, v1._2 + v2._2) }
    
    • AggregateFunction
    val input: DataStream[(String, Long)]=...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate)
    

    2)全窗口函數(shù)(full window functions)

    • 先把窗口所有數(shù)據(jù)收集起來(lái),等到計(jì)算的時(shí)候會(huì)遍歷所有數(shù)據(jù)
    • ProcessWindowFunction

    一個(gè)ProcessWindowFunction可以這樣定義和使用:

    val input: DataStream[(String, Long)]=...
    
    input
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(5)))
      .process(new MyProcessWindowFunction())
    
    /* ... */
    
    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    
      def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String])={
        var count=0L
        for (in <- input) {
          count=count + 1
        }
        out.collect(s"Window ${context.window} count: $count")
      }
    }
    

    3)其它可選window API

    • .trigger() —— 觸發(fā)器,定義 window 什么時(shí)候關(guān)閉,觸發(fā)計(jì)算并輸出結(jié)果
    • .evictor() —— 移除器,定義移除某些數(shù)據(jù)的邏輯
    • .allowedLateness() —— 允許處理遲到的數(shù)據(jù)
    • .sideOutputLateData() —— 將遲到的數(shù)據(jù)放入側(cè)輸出流
    • .getSideOutput() —— 獲取側(cè)輸出流

    五、Flink 中的時(shí)間語(yǔ)義

    官方文檔
    Flink 明確支持以下三種時(shí)間語(yǔ)義:

    • 事件時(shí)間(event time): 事件產(chǎn)生的時(shí)間,記錄的是設(shè)備生產(chǎn)(或者存儲(chǔ))事件的時(shí)間
    • 攝取時(shí)間(ingestion time): 數(shù)據(jù)進(jìn)入Flink的時(shí)間,F(xiàn)link 讀取事件時(shí)記錄的時(shí)間
    • 處理時(shí)間(processing time):執(zhí)行操作算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān)

    上面圖片來(lái)源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

    六、設(shè)置 Event Time

    我們可以直接在代碼中,對(duì)執(zhí)行環(huán)境調(diào)用 setStreamTimeCharacteristic
    方法,設(shè)置流的時(shí)間特性,具體的時(shí)間,還需要從數(shù)據(jù)中提取時(shí)間戳(timestamp)

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    var env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    

    七、水位線(Watermark)

    官方文檔

    1)為什么需要水位線(Watermark)

    當(dāng) Flink 以 Event Time 模式處理數(shù)據(jù)流時(shí),它會(huì)根據(jù)數(shù)據(jù)里的時(shí)間戳來(lái)
    處理基于時(shí)間的算子,
    由于網(wǎng)絡(luò)、分布式等原因,會(huì)導(dǎo)致亂序數(shù)據(jù)的產(chǎn)生,亂序數(shù)據(jù)會(huì)讓窗口計(jì)算不準(zhǔn)確。Watermark正是處理亂序數(shù)據(jù)而來(lái)的。

    2)如何利用Watermark處理亂序數(shù)據(jù)問(wèn)題?

    遇到一個(gè)時(shí)間戳達(dá)到了窗口關(guān)閉時(shí)間,不應(yīng)該立刻觸發(fā)窗口計(jì)算,而是等
    待一段時(shí)間,等遲到的數(shù)據(jù)來(lái)了再關(guān)閉窗口。

    • Watermark 是一種衡量 Event Time 進(jìn)展的機(jī)制,可以設(shè)定延遲觸發(fā)
    • Watermark 是用于處理亂序事件的,而正確的處理亂序事件,通常
      Watermark 機(jī)制結(jié)合 window 來(lái)實(shí)現(xiàn)
    • 數(shù)據(jù)流中的 Watermark 用于表示 timestamp 小于 Watermark 的數(shù)據(jù),
      都已經(jīng)到達(dá)了,因此,window 的執(zhí)行也是由 Watermark 觸發(fā)的;
    • watermark 用來(lái)讓程序自己平衡延遲和結(jié)果正確性。

    3)watermark 的特點(diǎn)

    • watermark 是一條特殊的數(shù)據(jù)記錄
    • watermark 必須單調(diào)遞增,以確保任務(wù)的事件時(shí)間時(shí)鐘在向前推進(jìn),而不
      是在后退
    • watermark 與數(shù)據(jù)的時(shí)間戳相關(guān)

    4)watermark 的傳遞

    5)watermark 策略與應(yīng)用

    1)Watermark 策略簡(jiǎn)介

    時(shí)間戳的分配與 watermark 的生成是齊頭并進(jìn)的,其可以告訴 Flink 應(yīng)用程序事件時(shí)間的進(jìn)度。其可以通過(guò)指定 WatermarkGenerator 來(lái)配置 watermark 的生成方式。

    使用 Flink API 時(shí)需要設(shè)置一個(gè)同時(shí)包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場(chǎng)景下構(gòu)建自己的 watermark 策略。WatermarkStrategy 接口如下:

    public interface WatermarkStrategy<T> 
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
    
        /**
         * 根據(jù)策略實(shí)例化一個(gè)可分配時(shí)間戳的 {@link TimestampAssigner}。
         */
        @Override
        TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
    
        /**
         * 根據(jù)策略實(shí)例化一個(gè) watermark 生成器。
         */
        @Override
        WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
    }

    通常情況下,你不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssignerWatermarkGenerator 進(jìn)行綁定。

    【例如】你想要要使用有界無(wú)序(bounded-out-of-orderness)watermark 生成器和一個(gè) lambda 表達(dá)式作為時(shí)間戳分配器,那么可以按照如下方式實(shí)現(xiàn):

    WatermarkStrategy
      .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
      .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
        override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long=element._1
      })

    【溫馨提示】其中 TimestampAssigner 的設(shè)置與否是可選的,大多數(shù)情況下,可以不用去特別指定。

    2)使用 Watermark 策略應(yīng)用

    WatermarkStrategy 可以在 Flink 應(yīng)用程序中的兩處使用:

    • 第一種是直接在數(shù)據(jù)源上使用
    • 第二種是直接在非數(shù)據(jù)源的操作之后使用。

    【溫馨提示】第一種方式相比會(huì)更好,因?yàn)閿?shù)據(jù)源可以利用 watermark 生成邏輯中有關(guān)分片/分區(qū)(shards/partitions/splits)的信息。使用這種方式,數(shù)據(jù)源通常可以更精準(zhǔn)地跟蹤 watermark,整體 watermark 生成將更精確。

    【示例】?jī)H當(dāng)無(wú)法直接在數(shù)據(jù)源上設(shè)置策略時(shí),才應(yīng)該使用第二種方式(在任意轉(zhuǎn)換操作之后設(shè)置 WatermarkStrategy):

    val env=StreamExecutionEnvironment.getExecutionEnvironment
    
    val stream: DataStream[MyEvent]=env.readFile(
             myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
             FilePathFilter.createDefaultFilter())
    
    val withTimestampsAndWatermarks: DataStream[MyEvent]=stream
            .filter( _.severity==WARNING )
            .assignTimestampsAndWatermarks(<watermark strategy>)
    
    withTimestampsAndWatermarks
            .keyBy( _.getGroup )
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce( (a, b)=> a.add(b) )
            .addSink(...)

    【示例】處理空閑數(shù)據(jù)源

    如果數(shù)據(jù)源中的某一個(gè)分區(qū)/分片在一段時(shí)間內(nèi)未發(fā)送事件數(shù)據(jù),則意味著 WatermarkGenerator 也不會(huì)獲得任何新數(shù)據(jù)去生成 watermark。我們稱這類數(shù)據(jù)源為空閑輸入或空閑源。在這種情況下,當(dāng)某些其他分區(qū)仍然發(fā)送事件數(shù)據(jù)的時(shí)候就會(huì)出現(xiàn)問(wèn)題。由于下游算子 watermark 的計(jì)算方式是取所有不同的上游并行數(shù)據(jù)源 watermark 的最小值,則其 watermark 將不會(huì)發(fā)生變化。

    WatermarkStrategy
      .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
      .withIdleness(Duration.ofMinutes(1))

    3)使用場(chǎng)景

    • 對(duì)于排好序的數(shù)據(jù),不需要延遲觸發(fā),可以只指定時(shí)間戳就行了。
    // 注意時(shí)間是毫秒,所以根據(jù)時(shí)間戳不同,可能需要乘以1000
    dataStream.assignAscendingTimestamps(_.timestamp * 1000)
    • Flink 暴露了 TimestampAssigner 接口供我們實(shí)現(xiàn),使我們可以自定義如
      何從事件數(shù)據(jù)中抽取時(shí)間戳和生成watermark。
    // MyAssigner 可以有兩種類型,都繼承自 TimestampAssigner
    dataStream.assignAscendingTimestamps(new MyAssigner())

    4)TimestampAssigner

    定義了抽取時(shí)間戳,以及生成 watermark 的方法,有兩種類型

    1、AssignerWithPeriodicWatermarks

    • 周期性的生成 watermark:系統(tǒng)會(huì)周期性的將 watermark 插入到流中
    • 默認(rèn)周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
      方法進(jìn)行設(shè)置
    • 升序和前面亂序的處理 BoundedOutOfOrderness ,都是基于周期性
      watermark 的。

    2、AssignerWithPunctuatedWatermarks

    • 沒(méi)有時(shí)間周期規(guī)律,可打斷的生成 watermark

    可以棄用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了

    在 Flink 新的 WatermarkStrategyTimestampAssignerWatermarkGenerator 的抽象接口之前,F(xiàn)link 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它們,但建議使用新接口,因?yàn)槠鋵?duì)時(shí)間戳和 watermark 等重點(diǎn)的抽象和分離很清晰,并且還統(tǒng)一了周期性和標(biāo)記形式的 watermark 生成方式。

    5)WatermarkStrategy(重點(diǎn))

    flink1.11版本后 建議用WatermarkStrategy(Watermark生成策略)生成Watermark,當(dāng)創(chuàng)建DataStream對(duì)象后,使用如下方法指定策略:assignTimestampsAndWatermarks(WatermarkStrategy<T>)

    通常情況下,你不用實(shí)現(xiàn)此接口,而是可以使用 WatermarkStrategy 工具類中通用的 watermark 策略,或者可以使用這個(gè)工具類將自定義的 TimestampAssigner 與 WatermarkGenerator 進(jìn)行綁定。

    1、固定亂序長(zhǎng)度策略(forBoundedOutOfOrderness)

    通過(guò)調(diào)用WatermarkStrategy對(duì)象上的forBoundedOutOfOrderness方法來(lái)實(shí)現(xiàn),接收一個(gè)Duration類型的參數(shù)作為最大亂序(out of order)長(zhǎng)度。WatermarkStrategy對(duì)象上的withTimestampAssigner方法為從事件數(shù)據(jù)中提取時(shí)間戳提供了接口。

    【示例】

    • ForBoundedOutOfOrderness.java
    package com.com.streaming.watermarkstrategy;
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    import java.time.LocalDateTime;
    
    //在assignTimestampsAndWatermarks中用WatermarkStrategy.forBoundedOutOfOrderness方法抽取Timestamp和生成周期性水位線示例
    public class ForBoundedOutOfOrderness {
    
        public static void main(String[] args) throws  Exception{
            //創(chuàng)建流處理環(huán)境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
            //設(shè)置EventTime語(yǔ)義
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //設(shè)置周期生成Watermark間隔(10毫秒)
            env.getConfig().setAutoWatermarkInterval(10L);
            //并行度1
            env.setParallelism(1);
            //演示數(shù)據(jù)
            DataStreamSource<ClickEvent> mySource=env.fromElements(
                    new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                    new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                    new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                    new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                    new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                    new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                    new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                    new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
            );
            //WatermarkStrategy.forBoundedOutOfOrderness周期性生成水位線
            //可更好處理延遲數(shù)據(jù)
            //BoundedOutOfOrdernessWatermarks<T>實(shí)現(xiàn)WatermarkGenerator<T>
            SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
                    //指定Watermark生成策略,最大延遲長(zhǎng)度5毫秒
                    WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMillis(5))
                            .withTimestampAssigner(
                                    //SerializableTimestampAssigner接口中實(shí)現(xiàn)了extractTimestamp方法來(lái)指定如何從事件數(shù)據(jù)中抽取時(shí)間戳
                                    new SerializableTimestampAssigner<ClickEvent>() {
                                        @Override
                                        public long extractTimestamp(ClickEvent event, long recordTimestamp) {
                                            return event.getDateTime(event.getEventTime());
                                        }
                                    })
            );
            //結(jié)果打印
            streamTS.print();
            env.execute();
        }
    }
    
    • ClickEvent.java
    package com.com.streaming.watermarkstrategy;
    
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    
    public class ClickEvent {
        private String user;
        private long l;
        private int i;
        private LocalDateTime eventTime;
    
        public ClickEvent(LocalDateTime eventTime, String user, long l, int i) {
            this.eventTime=eventTime;
            this.user=user;
            this.l=l;
            this.i=i;
        }
    
        public LocalDateTime getEventTime() {
            return eventTime;
        }
    
        public void setEventTime(LocalDateTime eventTime) {
            this.eventTime=eventTime;
        }
    
        public String getUser() {
            return user;
        }
    
        public void setUser(String user) {
            this.user=user;
        }
    
        public long getL() {
            return l;
        }
    
        public void setL(long l) {
            this.l=l;
        }
    
        public int getI() {
            return i;
        }
    
        public void setI(int i) {
            this.i=i;
        }
    
        public long getDateTime(LocalDateTime dt) {
            ZoneOffset zoneOffset8=ZoneOffset.of("+8");
            return dt.toInstant(zoneOffset8).toEpochMilli();
        }
    }
    

    2、單調(diào)遞增策略(forMonotonousTimestamps)

    通過(guò)調(diào)用WatermarkStrategy對(duì)象上的forMonotonousTimestamps方法來(lái)實(shí)現(xiàn),無(wú)需任何參數(shù),相當(dāng)于將forBoundedOutOfOrderness策略的最大亂序長(zhǎng)度outOfOrdernessMillis設(shè)置為0

    • ForMonotonousTimestamps.java
    package com.com.streaming.watermarkstrategy;
    
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.time.Duration;
    import java.time.LocalDateTime;
    
    public class ForMonotonousTimestamps {
        public static void main(String[] args) throws  Exception{
            //創(chuàng)建流處理環(huán)境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
            //設(shè)置EventTime語(yǔ)義
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //設(shè)置周期生成Watermark間隔(10毫秒)
            env.getConfig().setAutoWatermarkInterval(10L);
            //并行度1
            env.setParallelism(1);
            //演示數(shù)據(jù)
            DataStreamSource<ClickEvent> mySource=env.fromElements(
                    new ClickEvent(LocalDateTime.now(), "user1", 1L, 1),
                    new ClickEvent(LocalDateTime.now(), "user1", 2L, 2),
                    new ClickEvent(LocalDateTime.now(), "user1", 3L, 3),
                    new ClickEvent(LocalDateTime.now(), "user1", 4L, 4),
                    new ClickEvent(LocalDateTime.now(), "user1", 5L, 5),
                    new ClickEvent(LocalDateTime.now(), "user1", 6L, 6),
                    new ClickEvent(LocalDateTime.now(), "user1", 7L, 7),
                    new ClickEvent(LocalDateTime.now(), "user1", 8L, 8)
            );
            //WatermarkStrategy.forMonotonousTimestamps周期性生成水位線
            //相當(dāng)于延遲outOfOrdernessMillis=0
            //繼承自BoundedOutOfOrdernessWatermarks<T>
    
            SingleOutputStreamOperator<ClickEvent> streamTS=mySource.assignTimestampsAndWatermarks(
                    WatermarkStrategy.<ClickEvent>forMonotonousTimestamps()
                            .withTimestampAssigner((event, recordTimestamp) -> event.getDateTime(event.getEventTime()))
            );
            //結(jié)果打印
            streamTS.print();
            env.execute();
        }
    }
    

    3、不生成策略(noWatermarks)

    WatermarkStrategy.noWatermarks()

    • 當(dāng)一個(gè)算子從多個(gè)上游算子中獲取數(shù)據(jù)時(shí),會(huì)取上游最小的Watermark作為自身的Watermark,并檢測(cè)是否滿足窗口觸發(fā)條件。當(dāng)達(dá)不到觸發(fā)條件,窗口會(huì)在內(nèi)存中緩存大量窗口數(shù)據(jù),導(dǎo)致內(nèi)存不足等問(wèn)題
    • flink提供了設(shè)置流狀態(tài)為空閑的withIdleness方法。在設(shè)置的超時(shí)時(shí)間內(nèi),當(dāng)某個(gè)數(shù)據(jù)流一直沒(méi)有事件數(shù)據(jù)到達(dá),就標(biāo)記這個(gè)流為空閑。下游算子不需要等待這條數(shù)據(jù)流產(chǎn)生的Watermark,而取其他上游激活狀態(tài)的Watermark,來(lái)決定是否需要觸發(fā)窗口計(jì)算。

    上面代碼設(shè)置超時(shí)時(shí)間5毫秒,超過(guò)這個(gè)時(shí)間,沒(méi)有生成Watermark,將流狀態(tài)設(shè)置空閑,當(dāng)下次有新的Watermark生成并發(fā)送到下游時(shí),重新設(shè)置為活躍。
    WatermarkStrategy.withIdleness(Duration.ofMillis(5))

    未完待續(xù)~

    架構(gòu)簡(jiǎn)介

    Flink 是一個(gè)分布式流處理和批處理計(jì)算框架,具有高性能、容錯(cuò)性和靈活性。下面是 Flink 的架構(gòu)概述:

    1. JobManager:JobManager 是 Flink 集群的主節(jié)點(diǎn),負(fù)責(zé)接收和處理用戶提交的作業(yè)。JobManager 的主要職責(zé)包括:

      • 解析和驗(yàn)證用戶提交的作業(yè)。

      • 生成執(zhí)行計(jì)劃,并將作業(yè)圖分發(fā)給 TaskManager。

      • 協(xié)調(diào)任務(wù)的調(diào)度和執(zhí)行。

      • 管理作業(yè)的狀態(tài)和元數(shù)據(jù)信息。

    2. TaskManager:TaskManager 是 Flink 集群的工作節(jié)點(diǎn),負(fù)責(zé)執(zhí)行具體的任務(wù)。每個(gè) TaskManager 可以運(yùn)行多個(gè)任務(wù)(子任務(wù)),每個(gè)子任務(wù)運(yùn)行在一個(gè)單獨(dú)的線程中,共享 TaskManager 的資源。TaskManager 的主要職責(zé)包括:

      • 接收并執(zhí)行 JobManager 分配的任務(wù)。

      • 負(fù)責(zé)任務(wù)的數(shù)據(jù)處理、狀態(tài)管理、故障恢復(fù)等操作。

      • 將處理結(jié)果返回給 JobManager。

    3. StateBackend:StateBackend 是 Flink 的狀態(tài)管理機(jī)制,用于保存和恢復(fù)任務(wù)的狀態(tài)信息,確保任務(wù)在失敗后可以進(jìn)行故障恢復(fù)。Flink 提供了多種 StateBackend 實(shí)現(xiàn),包括內(nèi)存、文件系統(tǒng)、RocksDB 等。

    4. DataStream API / DataSet API:Flink 提供了兩種不同的編程接口,用于流處理和批處理:

      • DataStream API:面向流式計(jì)算,支持實(shí)時(shí)數(shù)據(jù)流的處理和分析。它提供了豐富的操作符(例如 map、filter、window、join 等)和窗口函數(shù),以便進(jìn)行數(shù)據(jù)轉(zhuǎn)換和聚合操作。

      • DataSet API:面向批處理,適用于有界數(shù)據(jù)集的處理。它提供了類似于 Hadoop MapReduce 的操作符(例如 map、reduce、join 等),用于對(duì)數(shù)據(jù)集進(jìn)行轉(zhuǎn)換和計(jì)算。

    5. Connectors:Flink 提供了多種連接器,用于與外部系統(tǒng)進(jìn)行數(shù)據(jù)交互。常見(jiàn)的連接器包括 Kafka、Hadoop、Elasticsearch、JDBC 等,可以用于讀取和寫入外部數(shù)據(jù)源。

    6. 資源管理器:Flink 可以與各種集群管理工具(如 YARN、Mesos、Kubernetes)集成,以實(shí)現(xiàn)資源的動(dòng)態(tài)分配和任務(wù)調(diào)度。

    Flink 的架構(gòu)使得它能夠?qū)崿F(xiàn)高性能的流處理和批處理,同時(shí)具備良好的容錯(cuò)性和可伸縮性。它廣泛應(yīng)用于實(shí)時(shí)數(shù)據(jù)處理、數(shù)據(jù)湖分析、事件驅(qū)動(dòng)應(yīng)用等場(chǎng)景。

    組件模塊

    大數(shù)據(jù)流處理框架 Flink 和 Aflink 的技術(shù)架構(gòu)主要包括以下組件:

    1. JobManager:負(fù)責(zé)接收 Job 圖,并將其分發(fā)給 TaskManager。

    2. TaskManager:負(fù)責(zé)執(zhí)行任務(wù),包括數(shù)據(jù)源、數(shù)據(jù)計(jì)算、數(shù)據(jù)匯總等操作。

    3. StateBackend:用于保存狀態(tài)信息,支持容錯(cuò)和恢復(fù)。

    4. DataStream API:用于定義數(shù)據(jù)流處理邏輯,包括窗口函數(shù)、聚合操作等。

    5. Connector:用于連接外部數(shù)據(jù)源,如 Kafka。

    編輯

    JobManager 和 TaskManager 之間的通信方式主要有兩種:心跳機(jī)制和RPC(遠(yuǎn)程過(guò)程調(diào)用)。

    1. 心跳機(jī)制:JobManager 和 TaskManager 通過(guò)心跳機(jī)制保持連接和通信。具體流程如下:

      • JobManager 定期向所有的 TaskManager 發(fā)送心跳信號(hào),確認(rèn) TaskManager 是否存活。

      • TaskManager 接收到心跳信號(hào)后,回復(fù)確認(rèn)信號(hào)給 JobManager,表示自己還活著。

      • 如果 JobManager 在一段時(shí)間內(nèi)沒(méi)有收到 TaskManager 的心跳信號(hào),就會(huì)認(rèn)為該 TaskManager 失效,并進(jìn)行相應(yīng)的處理。

    2. RPC:JobManager 和 TaskManager 使用 RPC 機(jī)制進(jìn)行通信,以傳遞任務(wù)和數(shù)據(jù)等信息。具體流程如下:


      • JobManager 將任務(wù)調(diào)度圖發(fā)送給 TaskManager。這包括任務(wù)的執(zhí)行計(jì)劃、數(shù)據(jù)源、算子操作等。

      • TaskManager 接收到任務(wù)調(diào)度圖后,根據(jù)指令執(zhí)行任務(wù),處理數(shù)據(jù)流。

      • TaskManager 在處理過(guò)程中將結(jié)果返回給 JobManager,以便進(jìn)行狀態(tài)更新和后續(xù)處理。

    編輯

    需要注意的是,JobManager 和 TaskManager 的通信是基于網(wǎng)絡(luò)的,它們可以部署在不同的機(jī)器上。在一個(gè) Flink 集群中,通常會(huì)有一個(gè) JobManager 和多個(gè) TaskManager,它們通過(guò)上述的通信方式協(xié)同工作,實(shí)現(xiàn)數(shù)據(jù)流的處理和任務(wù)調(diào)度。

    與其他大數(shù)據(jù)組件集成

    流式計(jì)算和窗口函數(shù)原理

    • 流式計(jì)算:Flink 和 Aflink 是流式計(jì)算框架,能夠?qū)崟r(shí)處理無(wú)界數(shù)據(jù)流。流式計(jì)算基于事件驅(qū)動(dòng)的模型,能夠處理實(shí)時(shí)數(shù)據(jù)并支持低延遲計(jì)算。

    • 窗口函數(shù):窗口函數(shù)用于對(duì)數(shù)據(jù)流進(jìn)行分組聚合操作,常見(jiàn)的窗口類型包括滾動(dòng)窗口、滑動(dòng)窗口和會(huì)話窗口。窗口函數(shù)允許用戶在有限的數(shù)據(jù)集上執(zhí)行計(jì)算操作。

    窗口類型

    Flink 框架提供了多種窗口函數(shù),用于對(duì)數(shù)據(jù)流進(jìn)行分組聚合操作。以下是一些常見(jiàn)的窗口函數(shù):

    1. 滾動(dòng)窗口(Tumbling Window):將數(shù)據(jù)流劃分為固定大小的、不重疊的窗口。每個(gè)窗口包含相同數(shù)量的元素,并且窗口之間沒(méi)有重疊。可以通過(guò) window(Tumble.over()) 方法來(lái)定義滾動(dòng)窗口。

    2. 滑動(dòng)窗口(Sliding Window):將數(shù)據(jù)流劃分為固定大小的、可能重疊的窗口。每個(gè)窗口包含指定數(shù)量的元素,并且窗口之間可以有重疊。可以通過(guò) window(Slide.over()) 方法來(lái)定義滑動(dòng)窗口。

    3. 會(huì)話窗口(Session Window):根據(jù)事件之間的時(shí)間間隔將數(shù)據(jù)流劃分為不固定長(zhǎng)度的會(huì)話窗口。如果在指定時(shí)間間隔內(nèi)沒(méi)有新事件到達(dá),則會(huì)話窗口關(guān)閉。可以通過(guò) window(Session.withGap()) 方法來(lái)定義會(huì)話窗口。

    4. 全局窗口(Global Window):將整個(gè)數(shù)據(jù)流視為一個(gè)窗口,不進(jìn)行數(shù)據(jù)切分。適用于需要計(jì)算整個(gè)數(shù)據(jù)流的聚合結(jié)果的場(chǎng)景。可以通過(guò) window(Global()) 方法來(lái)定義全局窗口。

    5. 自定義窗口函數(shù):Flink 還支持自定義窗口函數(shù),以便滿足特定需求。您可以實(shí)現(xiàn) WindowFunction 接口來(lái)定義自己的窗口函數(shù),并通過(guò) apply() 方法來(lái)處理窗口中的元素。

    這些窗口函數(shù)可以和其他操作符(例如 groupBy()reduce()aggregate() 等)一起使用,以實(shí)現(xiàn)各種數(shù)據(jù)流處理和聚合操作。

    不同類型的窗口函數(shù)適用于不同的業(yè)務(wù)場(chǎng)景,具體選擇哪種窗口函數(shù)取決于您的需求和數(shù)據(jù)流的特點(diǎn)。

    窗口函數(shù)都有其特定的使用場(chǎng)景,下面我會(huì)簡(jiǎn)要介紹每種窗口函數(shù)的典型應(yīng)用場(chǎng)景,并提供 Java 和 Python 代碼示例。

    滾動(dòng)窗口(Tumbling Window)

    • 使用場(chǎng)景:適用于需要對(duì)固定大小的數(shù)據(jù)范圍進(jìn)行聚合計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)每5分鐘內(nèi)的數(shù)據(jù)總和。

    • Java 代碼示例

    DataStream<Tuple2<String, Integer>> input = ... ; 
    // 輸入數(shù)據(jù)流
     DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .sum(1);
    • Python 代碼示例

    from pyflink.datastream import StreamExecutionEnvironment 
    from pyflink.datastream.window import TumblingEventTimeWindows 
    from pyflink.common import WatermarkStrategy env = StreamExecutionEnvironment.get_execution_environment() 
    input_stream = ... 
    # 輸入數(shù)據(jù)流result_stream = input_stream.key_by(lambda x: x[0]).window(TumblingEventTimeWindows.of('5 minutes')).sum(1)

    滑動(dòng)窗口(Sliding Window)

    • 使用場(chǎng)景:適用于需要對(duì)數(shù)據(jù)流進(jìn)行連續(xù)且重疊的窗口計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)每5分鐘計(jì)算一次數(shù)據(jù)總和,并且每次計(jì)算時(shí)包含前一個(gè)窗口的部分?jǐn)?shù)據(jù)。

    • Java 代碼示例

    DataStream<Tuple2<String, Integer>> input = ... ; 
    // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5))) .sum(1);
    • Python 代碼示例

    from pyflink.datastream.window import SlidingEventTimeWindows
    result_stream = input_stream .key_by(lambda x: x[0]).window(SlidingEventTimeWindows.of('10 minutes', '5 minutes')).sum(1)

    會(huì)話窗口(Session Window)

    • 使用場(chǎng)景:適用于需要基于活動(dòng)之間的間隔時(shí)間來(lái)劃分窗口的場(chǎng)景,例如用戶在網(wǎng)站上的一系列操作之間的時(shí)間間隔作為窗口的劃分條件。

    • Java 代碼示例

    DataStream<Tuple2<String, Integer>> input = ... ; 
    // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10))) .sum(1);
    • Python 代碼示例

    from pyflink.datastream.window import EventTimeSessionWindows 
    result_stream = input_stream .key_by(lambda x: x[0]).window(EventTimeSessionWindows.with_gap('10 minutes')).sum(1)

    全局窗口(Global Window)

    • 使用場(chǎng)景:適用于對(duì)整個(gè)數(shù)據(jù)流進(jìn)行聚合計(jì)算的場(chǎng)景,例如統(tǒng)計(jì)全天的數(shù)據(jù)總和。

    • Java 代碼示例

    DataStream<Tuple2<String, Integer>> input = ... ; 
    // 輸入數(shù)據(jù)流 DataStream<Tuple2<String, Integer>> result = input .keyBy(0) .window(GlobalWindows.create()) .trigger(CountTrigger.of(1)) .sum(1);
    • Python 代碼示例

    from pyflink.datastream.window import GlobalWindows 
    from pyflink.datastream.trigger import CountTrigger 
    result_stream = input_stream .key_by(lambda x: x[0]).window(GlobalWindows.create()) .trigger(CountTrigger(1)).sum(1)

    讀取 Kafka 數(shù)據(jù)并計(jì)算指標(biāo)

    以下是一個(gè)簡(jiǎn)單的示例代碼,使用 Java 和 Python 分別演示讀取 Kafka 數(shù)據(jù)并計(jì)算指標(biāo)的過(guò)程:

    Java 代碼示例:

    // 創(chuàng)建 Flink 程序入口 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    // 從 Kafka 中讀取數(shù)據(jù)
     FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); 
    DataStream<String> stream = env.addSource(kafkaConsumer); 
    // 對(duì)數(shù)據(jù)流進(jìn)行處理,計(jì)算指標(biāo) DataStream<Result> resultStream = stream .flatMap(new UserAccessFlatMapFunction()) .keyBy("userId") .timeWindow(Time.minutes(5)) .apply(new UserAccessWindowFunction()); // 執(zhí)行任務(wù) env.execute("User Access Analysis");

    Python 代碼示例:

    from pyflink.datastream import StreamExecutionEnvironment 
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings 
    from pyflink.table.descriptors import Schema, Kafka 
    # 創(chuàng)建 Flink 環(huán)境 env = StreamExecutionEnvironment.get_execution_environment() 
    env.set_parallelism(1) t_env = StreamTableEnvironment.create( env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()) 
    # 從 Kafka 讀取數(shù)據(jù) t_env.connect( Kafka() .version("universal") .topic("topic") .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") .start_from_earliest() .finish() ).with_format( Json() ).with_schema( Schema() .field("user_id", DataTypes.STRING()) .field("timestamp", DataTypes.TIMESTAMP(3)) ).create_temporary_table("MySource") 
    # 計(jì)算指標(biāo) t_env.from_path("MySource") \ .window(Tumble.over("5.minutes").on("timestamp").alias("w")) \ .group_by("user_id, w") \ .select("user_id, w.end as window_end, count(user_id) as pv, count_distinct(user_id) as uv") \ .execute_insert("MySink")

    加載數(shù)據(jù)湖進(jìn)行 AI 模型訓(xùn)練

    Flink 和 Aflink 可以用于加載數(shù)據(jù)湖中的大規(guī)模數(shù)據(jù)集,進(jìn)行 AI 模型訓(xùn)練。通過(guò)流式處理和批處理相結(jié)合,可以有效處理圖片、音頻、文本等多媒體數(shù)據(jù),用于風(fēng)控等場(chǎng)景。

    當(dāng)使用 Flink 進(jìn)行機(jī)器學(xué)習(xí)時(shí),通常會(huì)使用 Flink 的批處理和流處理 API 結(jié)合機(jī)器學(xué)習(xí)庫(kù)(如 Apache Flink ML、Apache Mahout 等)來(lái)實(shí)現(xiàn)各種機(jī)器學(xué)習(xí)任務(wù)。這里我將為您提供一個(gè)簡(jiǎn)單的示例,演示如何在 Flink 中使用批處理 API 來(lái)進(jìn)行線性回歸的訓(xùn)練。

    首先,讓我們看一下 Java 代碼示例:

    import org.apache.flink.api.java.ExecutionEnvironment; 
    import org.apache.flink.api.java.DataSet; 
    import org.apache.flink.ml.common.LabeledVector; import org.apache.flink.ml.regression.MultipleLinearRegression; 
    public class LinearRegressionExample { 
        public static void main(String[] args) throws Exception {    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
            // 創(chuàng)建帶標(biāo)簽的向量數(shù)據(jù)集 
            DataSet<LabeledVector> trainingData = ... ; 
            // 從數(shù)據(jù)源加載帶標(biāo)簽的向量數(shù)據(jù)集 
            // 初始化線性回歸模型 
            MultipleLinearRegression mlr = new MultipleLinearRegression();                 
            mlr.setStepsize(0.5); 
            // 設(shè)置步長(zhǎng) 
            mlr.setIterations(100); 
            // 設(shè)置迭代次數(shù) 
            // 訓(xùn)練線性回歸模型 
            mlr.fit(trainingData); 
            // 獲取訓(xùn)練后的模型參數(shù) 
            double[] weights = mlr.weights(); 
            double intercept = mlr.intercept(); 
            // 打印模型參數(shù) 
            System.out.println("Weights: " + Arrays.toString(weights));         
            System.out.println("Intercept: " + intercept);
         }
     }

    Python 代碼示例:

    from pyflink.dataset import ExecutionEnvironment 
    from pyflink.datastream import LabeledVector 
    from pyflink.ml.preprocessing import Splitter 
    from pyflink.ml.regression import MultipleLinearRegression env = ExecutionEnvironment.get_execution_environment() 
    # 創(chuàng)建帶標(biāo)簽的向量數(shù)據(jù)集 training_data = ... 
    # 從數(shù)據(jù)源加載帶標(biāo)簽的向量數(shù)據(jù)集 # 初始化線性回歸模型 mlr = MultipleLinearRegression() mlr.set_step_size(0.5) # 設(shè)置步長(zhǎng) mlr.set_max_iterations(100) 
    # 設(shè)置最大迭代次數(shù) # 訓(xùn)練線性回歸模型mlr.fit(training_data) 
    # 獲取訓(xùn)練后的模型參數(shù) weights = mlr.weights_ intercept = mlr.intercept_ 
    # 打印模型參數(shù) print("Weights: ", weights) print("Intercept: ", intercept)

    在 Flink 中使用批處理 API 進(jìn)行線性回歸模型的訓(xùn)練。實(shí)際上,在 Flink 中進(jìn)行更復(fù)雜的機(jī)器學(xué)習(xí)任務(wù)時(shí),可能需要結(jié)合更多的預(yù)處理、特征工程、模型評(píng)估等步驟,以及更豐富的機(jī)器學(xué)習(xí)算法和模型庫(kù)。

    Flink Table API

    Flink Table API 是 Apache Flink 提供的一種用于處理結(jié)構(gòu)化數(shù)據(jù)的高級(jí) API,它提供了一種類 SQL 的聲明性編程方式,使用戶可以通過(guò)類 SQL 的語(yǔ)法來(lái)操作流式和批處理數(shù)據(jù)。使用 Table API,用戶可以方便地進(jìn)行數(shù)據(jù)查詢、轉(zhuǎn)換、聚合等操作,而無(wú)需編寫復(fù)雜的低級(jí)別代碼。

    下面是一個(gè)簡(jiǎn)單的示例,演示如何在 Flink 中使用 Table API 來(lái)實(shí)現(xiàn)對(duì)輸入數(shù)據(jù)流的簡(jiǎn)單轉(zhuǎn)換和聚合:

    Java 示例:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.table.api.Table; 
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
    public class TableAPIExample {    public static void main(String[] args) throws Exception { 
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); 
            // 創(chuàng)建輸入數(shù)據(jù)流 
            Table inputTable = tableEnv.fromDataStream(inputDataStream, "name, age"); 
            // 查詢和轉(zhuǎn)換操作 
            Table resultTable = inputTable .filter("age > 18") .groupBy("name") .select("name, count(1) as count"); 
            // 將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出 tableEnv.toRetractStream(resultTable, Row.class).print(); env.execute("Table API Example"); 
        } 
    }

    Python 示例:

    from pyflink.datastream import StreamExecutionEnvironment 
    from pyflink.table import StreamTableEnvironment env = StreamExecutionEnvironment.get_execution_environment() 
    table_env = StreamTableEnvironment.create(env) 
    # 創(chuàng)建輸入數(shù)據(jù)流 input_table = table_env.from_data_stream(input_data_stream, ['name', 'age']) # 查詢和轉(zhuǎn)換操作 result_table = input_table \ .filter("age > 18") \ .group_by("name") \ .select("name, count(1) as count") 
    # 將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出 table_env.to_retract_stream(result_table, Row).print() env.execute("Table API Example")

    創(chuàng)建了一個(gè)輸入數(shù)據(jù)流,然后使用 Table API 對(duì)數(shù)據(jù)流進(jìn)行過(guò)濾、分組和聚合操作,最后將結(jié)果表轉(zhuǎn)換為數(shù)據(jù)流并打印輸出。這展示了 Table API 的簡(jiǎn)單用法,更復(fù)雜的操作和功能可以根據(jù)具體需求進(jìn)行擴(kuò)展。

    發(fā)展歷史和市場(chǎng)優(yōu)勢(shì)

    • 發(fā)展歷史:Flink 于 2015 年正式發(fā)布,是一個(gè)快速發(fā)展的流處理引擎,Aflink 是 Flink 在國(guó)內(nèi)的一個(gè)分支,也得到了廣泛應(yīng)用。

    • 市場(chǎng)優(yōu)勢(shì):Flink 和 Aflink 具有低延遲、高吞吐量等優(yōu)勢(shì),適用于實(shí)時(shí)數(shù)據(jù)處理場(chǎng)景。在大數(shù)據(jù)領(lǐng)域,它們已成為重要的流式計(jì)算框架,廣泛應(yīng)用于金融、電商、物聯(lián)網(wǎng)等行業(yè)。

網(wǎng)站首頁(yè)   |    關(guān)于我們   |    公司新聞   |    產(chǎn)品方案   |    用戶案例   |    售后服務(wù)   |    合作伙伴   |    人才招聘   |   

友情鏈接: 餐飲加盟

地址:北京市海淀區(qū)    電話:010-     郵箱:@126.com

備案號(hào):冀ICP備2024067069號(hào)-3 北京科技有限公司版權(quán)所有