???????? 最近花了一個星期的時間給大家整理了一份<font size="3" color="red">Flink優化</font>從多方面下手。末尾有彩蛋。<font size="3" color="#FF8C00">如果對大家有幫助動動發財的小手來個三連,感謝大家~</font>
此文已經制作成帶目錄的 PDF,<font size="3" color="006600">獲取本文PDF 版本,請掃下方二維碼加我微信</font>,備注:<font size="3" color="red">Flink企業級優化</font>

???????? Flink性能調優的第一步,就是為任務分配合適的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。
???????? 提交方式主要是yarn-per-job
,資源的分配在使用腳本提交Flink任務時進行指定。標準的Flink任務提交腳本(Generic CLI 模式)從1.11開始,增加了通用客戶端模式,參數使用-D <property=value>
指定。
bin/flink run
-t yarn-per-job
-d
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-c com.atguigu.app.dwd.LogBaseApp
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
bin/flink run
-t yarn-per-job
-d
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=2048mb \ JM2~4G足夠
-Dtaskmanager.memory.process.size=6144mb \ 單個TM2~8G足夠
-Dtaskmanager.numberOfTaskSlots=2 \ 與容器核數1core:1slot或1core:2slot
-c com.atguigu.app.dwd.LogBaseApp
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
???????? Flink是實時流處理,關鍵在于資源情況能不能抗住高峰時期每秒的數據量,通常用QPS/TPS來描述數據情況。
開發完成后,先進行壓測。任務并行度給10以下,測試單個并行度的處理上限。
然后 總QPS/單并行度的處理能力 = 并行度
不能只從QPS去得出并行度,因為有些字段少、邏輯簡單的任務,單并行度一秒處理幾萬條數據。而有些數據字段多,處理邏輯復雜,單并行度一秒只能處理1000條數據。
最好根據高峰期的QPS壓測,并行度*1.2倍,富余一些資源。
**數據源端是 Kafka,Source的并行度設置為Kafka對應Topic的分區數。**如果已經等于 Kafka 的分區數,消費速度仍跟不上數據生產速度,考慮下Kafka 要擴大分區,同時調大并行度等于分區數。Flink 的一個并行度可以處理一至多個分區的數據,如果并行度多于 Kafka 的分區數,那么就會造成有的并行度空閑,浪費資源。
Keyby之前的算子
一般不會做太重的操作,都是比如map、filter、flatmap等處理較快的算子,并行度可以和source保持一致。
Keyby之后的算子
如果并發較大,建議設置并行度為 2 的整數次冪,例如:128、256、512;
小并發任務的并行度不一定需要設置成 2 的整數次冪;
大并發任務如果沒有 KeyBy,并行度也無需設置為 2 的整數次冪;
Sink 端是數據流向下游的地方,可以根據 Sink 端的數據量及下游的服務抗壓能力進行評估。如果Sink端是Kafka,可以設為Kafka對應Topic的分區數。
Sink 端的數據量小,比較常見的就是監控告警的場景,并行度可以設置的小一些。
Source 端的數據量是最小的,拿到 Source 端流過來的數據后做了細粒度的拆分,數據量不斷的增加,到 Sink 端的數據量就非常大。那么在 Sink 到下游的存儲中間件的時候就需要提高并行度。
另外 Sink 端要與下游的服務進行交互,并行度還得根據下游的服務抗壓能力來設置,如果在 Flink Sink 這端的數據量過大的話,且 Sink 處并行度也設置的很大,但下游的服務完全撐不住這么大的并發寫入,可能會造成下游服務直接被寫掛,所以最終還是要在 Sink 處的并行度做一定的權衡。
RocksDB 是基于 LSM Tree 實現的(類似HBase),寫數據都是先緩存到內存中,所以RocksDB 的寫請求效率比較高。RocksDB 使用內存結合磁盤的方式來存儲數據,每次獲取數據時,先從內存中 blockcache 中查找,如果內存中沒有再去磁盤中查詢。優化后差不多單并行度 TPS 5000 record/s,性能瓶頸主要在于 RocksDB 對磁盤的讀請求,所以當處理性能不夠時,僅需要橫向擴展并行度即可提高整個Job 的吞吐量。以下幾個調優參數:
在flink-conf.yaml
中配置:
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
注意:不要配置單塊磁盤的多個目錄,務必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來分擔壓力。當設置多個 RocksDB 本地磁盤目錄時,Flink 會隨機選擇要使用的目錄,所以就可能存在三個并行度共用同一目錄的情況。如果服務器磁盤數較多,一般不會出現該情況,但是如果任務重啟后吞吐量較低,可以檢查是否發生了多個并行度共用同一塊磁盤的情況。
當一個 TaskManager 包含 3 個 slot 時,那么單個服務器上的三個并行度都對磁盤造成頻繁讀寫,從而導致三個并行度的之間相互爭搶同一個磁盤 io,這樣務必導致三個并行度的吞吐量都會下降。設置多目錄實現三個并行度使用不同的硬盤從而減少資源競爭。
如下所示是測試過程中磁盤的 IO 使用率,可以看出三個大狀態算子的并行度分別對應了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其他磁盤的 IO 平均使用率相對低很多。由此可見使用 RocksDB 做為狀態后端且有大狀態的頻繁讀取時, 對磁盤IO性能消耗確實比較大。
如下圖所示,其中兩個并行度共用了 sdb 磁盤,一個并行度使用 sdj磁盤??梢钥吹?sdb 磁盤的 IO 使用率已經達到了 91.6%,就會導致 sdb 磁盤對應的兩個并行度吞吐量大大降低,從而使得整個 Flink 任務吞吐量降低。如果每個服務器上有一兩塊 SSD,強烈建議將 RocksDB 的本地磁盤目錄配置到 SSD 的目錄下,從 HDD 改為 SSD 對于性能的提升可能比配置 10 個優化參數更有效。
一般我們的 Checkpoint 時間間隔可以設置為分鐘級別,例如 1 分鐘、3 分鐘,對于狀態很大的任務每次 Checkpoint 訪問 HDFS 比較耗時,可以設置為 5~10 分鐘一次Checkpoint,并且調大兩次 Checkpoint 之間的暫停間隔,例如設置兩次Checkpoint 之間至少暫停 4或8 分鐘。
如果 Checkpoint 語義配置為 EXACTLY_ONCE,那么在 Checkpoint 過程中還會存在 barrier 對齊的過程,可以通過 Flink Web UI 的 Checkpoint 選項卡來查看 Checkpoint 過程中各階段的耗時情況,從而確定到底是哪個階段導致 Checkpoint 時間過長然后針對性的解決問題。
RocksDB相關參數在1.3中已說明,可以在flink-conf.yaml指定,也可以在Job的代碼中調用API單獨指定,這里不再列出。
// 使? RocksDBStateBackend 做為狀態后端,并開啟增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://node01:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 開啟Checkpoint,間隔為 3 分鐘
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小間隔 4分鐘
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超時時間 10分鐘
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
在實際開發中,有各種環境(開發
、測試
、預發
、生產
),作業也有很多的配置:算子的并行度配置、Kafka 數據源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否開啟、狀態后端存儲路徑、數據庫地址、用戶名和密碼等各種各樣的配置,可能每個環境的這些配置對應的值都是不一樣的。
如果你是直接在代碼??寫死的配置,每次換個環境去運行測試作業,都要重新去修改代碼中的配置,然后編譯打包,提交運行,這樣就要花費很多時間在這些重復的勞動力上了。在 Flink 中可以通過使用 ParameterTool 類讀取配置,它可以讀取環境變量、運行參數、配置文件。
ParameterTool 是可序列化的,所以你可以將它當作參數進行傳遞給算子的自定義函數類。
我們可以在Flink的提交腳本添加運行參數,格式:
在 Flink 程序中可以直接使用 ParameterTool.fromArgs(args)
獲取到所有的參數,也可以通過 parameterTool.get(“username”)
方法獲取某個參數對應的值。
舉例:通過運行參數指定jobname
bin/flink run
-t yarn-per-job
-d
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-c com.atguigu.app.dwd.LogBaseApp
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
–jobname dwd-LogBaseApp //參數名自己隨便起,代碼里對應上即可
在代碼里獲取參數值:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String myJobname = parameterTool.get("jobname"); //參數名對應
env.execute(myJobname);
arameterTool 還?持通過 ParameterTool.fromSystemProperties() 方法讀取系統屬性。做個打印:
ParameterTool parameterTool = ParameterTool.fromSystemProperties();
System.out.println(parameterTool.toMap().toString());
可以使用ParameterTool.fromPropertiesFile("/application.properties")
讀取 properties 配置文件??梢詫⑺幸渲玫牡胤?#xff08;比如并行度和一些 Kafka、MySQL 等配置)都寫成可配置的,然后其對應的 key 和 value 值都寫在配置文件中,最后通過 ParameterTool 去讀取配置文件獲取對應的值。
在ExecutionConfig 中可以將 ParameterTool 注冊為全作業參數的參數,這樣就可以被 JobManager 的web 端以及用戶?定義函數中以配置值的形式訪問。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
可以不用將ParameterTool當作參數傳遞給算子的自定義函數,直接在用戶?定義的Rich 函數中直接獲取到參數值了。
env.addSource(new RichSourceFunction() {
@Override
public void run(SourceContext sourceContext) throws Exception {
while (true) {
ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
}
}
@Override
public void cancel() {
}
})
壓測的方式很簡單,先在kafka中積壓數據,之后開啟Flink任務,出現反壓,就是處理瓶頸。相當于水庫先積水,一下子泄洪。數據可以是自己造的模擬數據,也可以是生產中的部分數據。
反壓(BackPressure)通常產生于這樣的場景:短時間的負載高峰導致系統接收數據的速率遠高于它處理數據的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的數據快速堆積,或遇到大促、秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統崩潰。
反壓機制是指系統能夠自己檢測到被阻塞的 Operator,然后自適應地降低源頭或上游數據的發送速率,從而維持整個系統的穩定。Flink 任務一般運行在多個節點上,數據從上游算子發送到下游算子需要網絡傳輸,若系統在反壓時想要降低數據源頭或上游算子數據的發送速率,那么肯定也需要網絡傳輸。所以下面先來了解一下 Flink 的網絡流控(Flink 對網絡數據流量的控制)機制。
Flink 的反壓太過于天然了,導致無法簡單地通過監控 BufferPool 的使用情況來判斷反壓狀態。Flink 通過對運行中的任務進行采樣來確定其反壓,如果一個 Task 因為反壓導致處理速度降低了,那么它肯定會卡在向 LocalBufferPool 申請內存塊上。那么該 Task 的 stack trace 應該是這樣:
java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) [...]
監控對正常的任務運行有一定影響,因此只有當 Web 頁面切換到 Job 的 BackPressure 頁面時,JobManager 才會對該 Job 觸發反壓監控。默認情況下,JobManager 會觸發 100 次 stack trace 采樣,每次間隔 50ms 來確定反壓。Web 界面看到的比率表示在內部方法調用中有多少 stack trace 被卡在LocalBufferPool.requestBufferBlocking(),例如: 0.01 表示在 100 個采樣中只有 1 個被卡在LocalBufferPool.requestBufferBlocking()。采樣得到的比例與反壓狀態的對應關系如下:
Task 的狀態為 OK 表示沒有反壓,HIGH 表示這個 Task 被反壓。
在 Flink Web UI 中有 BackPressure 的頁面,通過該頁面可以查看任務中 subtask 的反壓狀態,如下兩圖所示,分別展示了狀態是 OK 和 HIGH 的場景。
排查的時候,先把operator chain禁用,方便定位。
當某個 Task 吞吐量下降時,基于 Credit 的反壓機制,上游不會給該 Task 發送數據,所以該 Task 不會頻繁卡在向 Buffer Pool 去申請 Buffer。反壓監控實現原理就是監控 Task 是否卡在申請 buffer 這一步,所以遇到瓶頸的 Task 對應的反壓??必然會顯示 OK,即表示沒有受到反壓。
如果該 Task 吞吐量下降,造成該Task 上游的 Task 出現反壓時,必然會存在:該 Task 對應的 InputChannel 變滿,已經申請不到可用的Buffer 空間。如果該 Task 的 InputChannel 還能申請到可用 Buffer,那么上游就可以給該 Task 發送數據,上游 Task 也就不會被反壓了,所以說遇到瓶頸且導致上游 Task 受到反壓的 Task 對應的InputChannel 必然是滿的(這?不考慮?絡遇到瓶頸的情況)。從這個思路出發,可以對該 Task 的 InputChannel 的使用情況進行監控,如果 InputChannel 使用率 100%,那么該 Task 就是我們要找的反壓源。Flink 1.9 及以上版本inPoolUsage 表示 inputFloatingBuffersUsage 和inputExclusiveBuffersUsage 的總和。
反壓時,可以看到遇到瓶頸的該Task的inPoolUage為1。
先檢查基本原因,然后再深入研究更復雜的原因,最后找出導致瓶頸的原因。下面列出從最基本到比較復雜的一些反壓潛在原因。
注意:反壓可能是暫時的,可能是由于負載高峰、CheckPoint 或作業重啟引起的數據積壓而導致反壓。如果反壓是暫時的,應該忽略它。另外,請記住,斷斷續續的反壓會影響我們分析和解決問題。
檢查涉及服務器基本資源的使用情況,如CPU、網絡或磁盤I/O,目前 Flink 任務使用最主要的還是內存和 CPU 資源,本地磁盤、依賴的外部存儲資源以及網卡資源一般都不會是瓶頸。如果某些資源被充分利用或大量使用,可以借助分析工具,分析性能瓶頸(JVM Profiler+ FlameGraph生成火焰圖)。
長時間GC暫停會導致性能問題??梢酝ㄟ^打印調試GC日志(通過-XX:+PrintGCDetails)或使用某些內存或 GC 分析器(GCViewer工具)來驗證是否處于這種情況。
在Flink提交腳本中,設置JVM參數,打印GC日志:
bin/flink run
-t yarn-per-job
-d
-p 5 \ 指定并行度
-Dyarn.application.queue=test \ 指定yarn隊列
-Djobmanager.memory.process.size=1024mb \ 指定JM的總進程大小
-Dtaskmanager.memory.process.size=1024mb \ 指定每個TM的總進程大小
-Dtaskmanager.numberOfTaskSlots=2 \ 指定每個TM的slot數
-Denv.java.opts="-XX:+PrintGCDetails -XX:+PrintGCDateStamps"
-c com.atguigu.app.dwd.LogBaseApp
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
因為是on yarn模式,運行的節點一個一個找比較麻煩??梢源蜷_WebUI,選擇JobManager或者TaskManager,點擊Stdout,即可看到GC日志,點擊下載按鈕即可將GC日志通過HTTP的方式下載下來。
通過 GC 日志分析出單個 Flink Taskmanager 堆總大小、年輕代、老年代分配的內存空間、Full GC 后老年代剩余大小等,相關指標定義可以去 Github 具體查看。
擴展:最重要的指標是Full GC后,老年代剩余大小這個指標,按照《Java性能優化權威指南》這本Java堆大小計算法則,設Full GC后老年代剩余大小空間為M,那么堆的大小建議3 ~ 4倍M,新生代為1 ~ 1.5倍M,老年代應為2 ~ 3倍M。
與上?的 CPU/線程瓶頸問題類似,subtask 可能會因為共享資源上高負載線程的競爭而成為瓶頸。同樣,可以考慮使用2.2.1提到的分析工具,考慮在用戶代碼中查找同步開銷、鎖競爭,盡管避免在用戶代碼中添加同步。
如果瓶頸是由數據傾斜引起的,可以嘗試通過將數據分區的 key 進行加鹽或通過實現本地預聚合來減輕數據傾斜的影響。(關于數據傾斜的詳細解決方案,會在下一章節詳細討論)
如果發現我們的 Source 端數據讀取性能比較低或者 Sink 端寫入性能較差,需要檢查第三方組件是否遇到瓶頸。例如,Kafka 集群是否需要擴容,Kafka 連接器是否并行度較低,HBase 的 rowkey 是否遇到熱點問題。關于第三方組件的性能問題,需要結合具體的組件來分析。
相同 Task 的多個 Subtask 中,個別Subtask 接收到的數據量明顯大于其他 Subtask 接收到的數據量,通過 Flink Web UI 可以精確地看到每個 Subtask 處理了多少數據,即可判斷出 Flink 任務是否存在數據傾斜。通常,數據傾斜也會引起反壓。
使用LocalKeyBy的思想:在 keyBy 上游算子數據發送之前,首先在上游算子的本地對數據進行聚合后再發送到下游,使下游接收到的數據量大大減少,從而使得 keyBy 之后的聚合操作不再是任務的瓶頸。類似MapReduce 中 Combiner 的思想,但是這要求聚合操作必須是多條數據或者一批數據才能聚合,單條數據沒有辦法通過聚合來減少數據量。從Flink LocalKeyBy 實現原理來講,必然會存在一個積攢批次的過程,在上游算子中必須攢夠一定的數據量,對這些數據聚合后再發送到下游。
**注意:**Flink是實時流處理,如果keyby之后的聚合操作存在數據傾斜,且沒有開窗口的情況下,簡單的認為使用兩階段聚合,是不能解決問題的。因為這個時候Flink是來一條處理一條,且向下游發送一條結果,對于原來keyby的維度(第二階段聚合)來講,數據量并沒有減少,且結果重復計算(非FlinkSQL,未使用回撤流),如下圖所示:
class LocalKeyByFlatMap extends RichFlatMapFunction<String, Tuple2<String,
//Checkpoint 時為了保證 Exactly Once,將 buffer 中的數據保存到該 ListState 中
private ListState<Tuple2<String, Long>> localPvStatListState;
//本地 buffer,存放 local 端緩存的 app 的 pv 信息
private HashMap<String, Long> localPvStat;
//緩存的數據量大小,即:緩存多少數據再向下游發送
private int batchSize;
//計數器,獲取當前批次接收的數據量
private AtomicInteger currentSize;
//構造器,批次大小傳參
LocalKeyByFlatMap(int batchSize){
this.batchSize = batchSize;
}
@Override
public void flatMap(String in, Collector collector) throws Exception {
// 將新來的數據添加到 buffer 中
Long pv = localPvStat.getOrDefault(in, 0L);
localPvStat.put(in, pv + 1);
// 如果到達設定的批次,則將 buffer 中的數據發送到下游
if(currentSize.incrementAndGet() >= batchSize){
// 遍歷 Buffer 中數據,發送到下游
for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
collector.collect(Tuple2.of(appIdPv.getKey(), appIdPv.getValue()
}
// Buffer 清空,計數器清零
localPvStat.clear();
currentSize.set(0);
}
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotConte
// 將 buffer 中的數據保存到狀態中,來保證 Exactly Once
localPvStatListState.clear();
for(Map.Entry<String, Long> appIdPv: localPvStat.entrySet()) {
localPvStatListState.add(Tuple2.of(appIdPv.getKey(), appIdPv.ge
}
}
@Override
public void initializeState(FunctionInitializationContext context) {
// 從狀態中恢復 buffer 中的數據
localPvStatListState = context.getOperatorStateStore().getListState
new ListStateDescriptor<>("localPvStat",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>})));
localPvStat = new HashMap();
if(context.isRestored()) {
// 從狀態中恢復數據到 localPvStat 中
for(Tuple2<String, Long> appIdPv: localPvStatListState.get()){
long pv = localPvStat.getOrDefault(appIdPv.f0, 0L);
// 如果出現 pv != 0,說明改變了并行度,
// ListState 中的數據會被均勻分發到新的 subtask中
// 所以單個 subtask 恢復的狀態中可能包含兩個相同的 app 的數據
localPvStat.put(appIdPv.f0, pv + appIdPv.f1);
}
// 從狀態恢復時,默認認為 buffer 中數據量達到了 batchSize,需要向下游發
currentSize = new AtomicInteger(batchSize);
} else {
currentSize = new AtomicInteger(0);
}
}
}
如果 keyBy 之前就存在數據傾斜,上游算子的某些實例可能處理的數據較多,某些實例可能處理的數據較少,產生該情況可能是因為數據源的數據本身就不均勻,例如由于某些原因 Kafka 的 topic 中某些 partition 的數據量較大,某些 partition 的數據量較少。對于不存在 keyBy 的 Flink 任務也會出現該情況。
這種情況,需要讓 Flink 任務強制進行shuffle。使用shuffle、rebalance 或 rescale算子即可將數據均勻分配,從而解決數據傾斜的問題。
因為使用了窗口,變成了有界數據的處理(3.2.1已分析過),窗口默認是觸發時才會輸出一條結果發往下游,所以可以使用兩階段聚合的方式:
實現思路:
注意:聚合完不再是WindowedStream,要獲取WindowEnd作為窗口標記作為第二階段分組依據,避免不同窗口的結果聚合到一起)
當 FlinkKafkaConsumer 初始化時,每個 subtask 會訂閱一批 partition,但是當 Flink 任務運行過程中,如果被訂閱的 topic 創建了新的 partition,FlinkKafkaConsumer 如何實現動態發現新創建的 partition 并消費呢?
在使用 FlinkKafkaConsumer 時,可以開啟 partition 的動態發現。通過 Properties指定參數開啟(單位是毫秒):
FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS
該參數表示間隔多久檢測一次是否有新創建的 partition。默認值是Long的最小值,表示不開啟,大于0表示開啟。開啟時會啟動一個線程根據傳入的interval定期獲取Kafka最新的元數據,新 partition 對應的那一個 subtask 會自動發現并從earliest 位置開始消費,新創建的 partition 對其他 subtask 并不會產生影響。
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, 30 * 1000 + "");
Kafka單分區內有序,多分區間無序。在這種情況下,可以使用 Flink 中可識別 Kafka 分區的 watermark 生成機制。使用此特性,將在 Kafka 消費端內部針對每個 Kafka 分區生成 watermark,并且不同分區 watermark 的合并方式與在數據流 shuffle 時的合并方式相同。
在單分區內有序的情況下,使用時間戳單調遞增按分區生成的 watermark 將生成完美的全局 watermark。
可以不使用 TimestampAssigner,直接用 Kafka 記錄自身的時間戳:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node01:9092,node01:9092,node03:9092");
properties.setProperty("group.id", "dsjlg");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
);
env.addSource(kafkaSourceFunction);
如果數據源中的某一個分區/分片在一段時間內未發送事件數據,則意味著 WatermarkGenerator 也不會獲得任何新數據去生成 watermark。我們稱這類數據源為空閑輸入或空閑源。在這種情況下,當某些其他分區仍然發送事件數據的時候就會出現問題。比如Kafka的Topic中,由于某些原因,造成個別Partition一直沒有新的數據。由于下游算子 watermark 的計算方式是取所有不同的上游并行數據源 watermark 的最小值,則其 watermark 將不會發生變化,導致窗口、定時器等不會被觸發。
為了解決這個問題,你可以使用 WatermarkStrategy 來檢測空閑輸入并將其標記為空閑狀態。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
properties.setProperty("group.id", "gzhdsjlg");
FlinkKafkaConsumer<String> kafkaSourceFunction = new FlinkKafkaConsumer<>(
"flinktest",
new SimpleStringSchema(),
properties
);
kafkaSourceFunction.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMinutes(2))
.withIdleness(Duration.ofMinutes(5))
);
env.addSource(kafkaSourceFunction)
FlinkKafkaConsumer可以調用以下API,注意與auto.offset.reset
區分開:
setStartFromGroupOffsets():默認消費策略,默認讀取上次保存的offset信息,如果是應用第一次啟動,讀取不到上次的offset信息,則會根據這個參數auto.offset.reset的值來進行消費數據。建議使用這個。
setStartFromEarliest():從最早的數據開始進行消費,忽略存儲的offset信息
setStartFromLatest():從最新的數據進行消費,忽略存儲的offset信息
setStartFromSpecificOffsets(Map):從指定位置進行消費
setStartFromTimestamp(long):從topic中指定的時間點開始消費,指定時間點之前的數據忽略
當checkpoint機制開啟的時候,KafkaConsumer會定期把kafka的offset信息還有其他operator的狀態信息一塊保存起來。當job失敗重啟的時候,Flink會從最近一次的checkpoint中進行恢復數據,重新從保存的offset消費kafka中的數據(也就是說,上面幾種策略,只有第一次啟動的時候起作用)。
為了能夠使用支持容錯的kafka Consumer,需要開啟checkpoint
MiniBatch是微批處理,原理是緩存一定的數據后再觸發處理,以減少對State的訪問,從而提升吞吐并減少數據的輸出量。MiniBatch主要依靠在每個Task上注冊的Timer線程來觸發微批,需要消耗一定的線程調度性能。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html
微批處理通過增加延遲換取高吞吐,如果有超低延遲的要求,不建議開啟微批處理。通常對于聚合的場景,微批處理可以顯著的提升系統性能,建議開啟。
1)目前,key-value 配置項僅被 Blink planner 支持。
2)1.12之前的版本有bug,開啟miniBatch,不會清理過期狀態,也就是說如果設置狀態的TTL,無法清理過期狀態。1.12版本才修復這個問題。參考ISSUE:https://issues.apache.org/jira/browse/FLINK-17096
LocalGlobal優化將原先的Aggregate分成Local+Global兩階段聚合,即MapReduce模型中的Combine+Reduce處理模式。第一階段在上游節點本地攢一批數據進行聚合(localAgg),并輸出這次微批的增量值(Accumulator)。第二階段再將收到的Accumulator合并(Merge),得到最終的結果(GlobalAgg)。
LocalGlobal本質上能夠靠LocalAgg的聚合篩除部分傾斜數據,從而降低GlobalAgg的熱點,提升性能。結合下圖理解LocalGlobal如何解決數據傾斜的問題。
由上圖可知:
LocalGlobal開啟方式:
1)LocalGlobal優化需要先開啟MiniBatch,依賴于MiniBatch的參數。
2)table.optimizer.agg-phase-strategy: 聚合策略。默認AUTO,支持參數AUTO、TWO_PHASE(使用LocalGlobal兩階段聚合)、ONE_PHASE(僅使用Global一階段聚合)。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
觀察最終生成的拓撲圖的節點名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。
LocalGlobal適用于提升如SUM、COUNT、MAX、MIN和AVG等普通聚合的性能,以及解決這些場景下的數據熱點問題。
1)需要先開啟MiniBatch 2)開啟LocalGlobal需要UDAF實現Merge方法。
LocalGlobal優化針對普通聚合(例如SUM、COUNT、MAX、MIN和AVG)有較好的效果,對于COUNT DISTINCT收效不明顯,因為COUNT DISTINCT在Local聚合時,對于DISTINCT KEY的去重率不高,導致在Global節點仍然存在熱點。
之前,為了解決COUNT DISTINCT的熱點問題,通常需要手動改寫為兩層聚合(增加按Distinct Key取模的打散層)。
從Flink1.9.0版本開始,提供了COU
NT DISTINCT自動打散功能,不需要手動重寫。Split Distinct和LocalGlobal的原理對比參見下圖。
舉例:統計一天的UV
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果手動實現兩階段聚合:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
第一層聚合: 將Distinct Key打散求COUNT DISTINCT。
第二層聚合: 對打散去重后的數據進行SUM匯總。
Split Distinct開啟方式
默認不開啟,使用參數顯式開啟:
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
觀察最終生成的拓撲圖的節點名中是否包含Expand節點,或者原來一層的聚合變成了兩層的聚合。
使用COUNT DISTINCT,但無法滿足聚合節點性能要求。
1)目前不能在包含UDAF的Flink SQL中使用Split Distinct優化方法。
2)拆分出來的兩個GROUP聚合還可參與LocalGlobal優化。
3)從Flink1.9.0版本開始,提供了COUNT DISTINCT自動打散功能,不需要手動重寫(不用像上面的例子去手動實現)。
在某些場景下,可能需要從不同維度來統計UV,如Android中的UV,iPhone中的UV,Web中的UV和總UV,這時,可能會使用如下CASE WHEN語法。
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
在這種情況下,建議使用FILTER語法, 目前的Flink SQL優化器可以識別同一唯一鍵上的不同FILTER參數。如,在上面的示例中,三個COUNT DISTINCT都作用在user_id列上。此時,經過優化器識別后,Flink可以只使用一個共享狀態實例,而不是三個狀態實例,可減少狀態的大小和對狀態的訪問。
將上邊的CASE WHEN替換成FILTER后,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
當TopN的輸入是非更新流(例如Source),TopN只有一種算法AppendRank。當TopN的輸入是更新流時(例如經過了AGG/JOIN計算),TopN有2種算法,性能從高到低分別是:UpdateFastRank 和RetractRank。算法名字會顯示在拓撲圖的節點名字上。
注意:apache社區版的Flink1.12目前還沒有UnaryUpdateRank,阿里云實時計算版Flink才有
需要具備2個條件:
1)輸入流有PK(Primary Key)信息,例如ORDER BY AVG。
2)排序字段的更新是單調的,且單調方向與排序方向相反。例如,ORDER BY COUNT/COUNT_DISTINCT/SUM(正數)DESC。
如果要獲取到優化Plan,則您需要在使用ORDER BY SUM DESC時,添加SUM為正數的過濾條件。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
根據TopN的語法,rownum字段會作為結果表的主鍵字段之一寫入結果表。但是這可能導致數據膨脹的問題。例如,收到一條原排名9的更新數據,更新后排名上升到1,則從1到9的數據排名都發生變化了,需要將這些數據作為更新都寫入結果表。這樣就產生了數據膨脹,導致結果表因為收到了太多的數據而降低更新速度。
TopN的輸出結果無需要顯示rownum值,僅需在最終前端顯式時進行1次排序,極大地減少輸入結果表的數據量。只需要在外層查詢中將rownum字段裁剪掉即可
// 最外層的字段,不寫 rownum
SELECT col1, col2, col3
FROM (
SELECT col1, col2, col3
ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
在無rownum的場景中,對于結果表主鍵的定義需要特別小心。如果定義有誤,會直接導致TopN結果的不正確。 無rownum場景中,主鍵應為TopN上游GROUP BY節點的KEY列表。
TopN為了提升性能有一個State Cache層,Cache層能提升對State的訪問效率。TopN的Cache命中率的計算公式為。
cache_hit = cache_size*parallelism/top_n/partition_key_num
例如,Top100配置緩存10000條,并發50,當PatitionBy的key維度較大時,例如10萬級別時,Cache命中率只有10000*50/100/100000=5%,命中率會很低,導致大量的請求都會擊中State(磁盤),性能會大幅下降。因此當PartitionKey維度特別大時,可以適當加大TopN的CacheS ize,相對應的也建議適當加大TopN節點的Heap Memory。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 默認10000條,調整TopN cahce到20萬,那么理論命中率能達200000*50/100/100000 = 100%
configuration.setString("table.exec.topn.cache-size", "200000");
注意:目前源碼中標記為實驗項,官網中未列出該參數
例如每天的排名,要帶上Day字段。否則TopN的結果到最后會由于State ttl有錯亂。
insert
into print_test
SELECT
cate_id,
seller_id,
stat_date,
pay_ord_amt --不輸出rownum字段,能減小結果表的輸出量(無排名優化)
FROM (
SELECT
*,
ROW_NUMBER () OVER (
PARTITION BY cate_id,
stat_date --注意要有時間字段,否則state過期會導致數據錯亂(分區字段優化)
ORDER
BY pay_ord_amt DESC --根據上游sum結果排序。排序字段的更新是單調的,且單調方向與排序方向相反(走最優算法)
) as rownum
FROM (
SELECT
cate_id,
seller_id,
stat_date,
--重點。聲明Sum的參數都是正數,所以Sum的結果是單調遞增的,因此TopN能使用優化算法,只獲取前100個數據(走最優算法)
sum (total_fee) filter (
where
total_fee >= 0
) as pay_ord_amt
FROM
random_test
WHERE
total_fee >= 0
GROUP
BY cate_name,
seller_id,
stat_date
) a
WHERE
rownum <= 100
);
由于SQL上沒有直接支持去重的語法,還要靈活的保留第一條或保留最后一條。因此我們使用了SQL的ROW_NUMBER OVER WINDOW功能來實現去重語法。去重本質上是一種特殊的TopN。
保留KEY下第一條出現的數據,之后出現該KEY下的數據會被丟棄掉。因為STATE中只存儲了KEY數據,所以性能較優,示例如下:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1
以上示例是將T表按照b字段進行去重,并按照系統時間保留第一條數據。Proctime在這里是源表T中的一個具有Processing Time屬性的字段。如果按照系統時間去重,也可以將Proctime字段簡化PROCTIME()函數調用,可以省略Proctime字段的聲明。
保留KEY下最后一條出現的數據。保留末行的去重策略性能略優于LAST_VALUE函數,示例如下:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
FROM T
)
WHERE rowNum = 1
以上示例是將T表按照b和d字段進行去重,并按照業務時間保留最后一條數據。Rowtime在這里是源表T中的一個具有Event Time屬性的字段。
本地時區定義了當前會話時區id。當本地時區的時間戳進行轉換時使用。在內部,帶有本地時區的時間戳總是以UTC時區表示。但是,當轉換為不包含時區的數據類型時(例如TIMESTAMP, TIME或簡單的STRING),會話時區在轉換期間被使用。為了避免時區錯亂的問題,可以參數指定時區。
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 指定時區
configuration.setString("table.local-time-zone", "Asia/Shanghai");
// 初始化table environment
TableEnvironment tEnv = ...
// 獲取 tableEnv的配置對象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 設置參數:
// 開啟miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量輸出的間隔時間
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止OOM設置每個批次最多緩存數據的條數,可以設為2萬條
configuration.setString("table.exec.mini-batch.size", "20000");
// 開啟LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 開啟Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一層打散的bucket數目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// TopN 的緩存條數
configuration.setString("table.exec.topn.cache-size", "200000");
// 指定時區
configuration.setString("table.local-time-zone", "Asia/Shanghai");
更多精彩內容請關注 <font size="3" color="Blue">微信公眾號</font> 👇「<font size="3" color="**red**">大數據老哥</font>」🔥: ???????? <font size="3" color="Violet">一枚專注分享和探討大數據技術、多年打大廠編程經驗(Flink、Spark、Hadoop、Hive、Kafka等)、分享在工作實際開發中遇到的問題及解決方案、不定期分享面試經驗、技術選型、架構設計等方面。目前幫助數百人成功獲取offer。</font> 👉 關注后回復【<font size="3" color="red">大數據面試題</font>】獲取大數據互聯網大廠面試題匯總一份 👉關注后回復【<font size="3" color="red">簡歷</font>】獲取簡歷模板一份200+ 👉關注后回復【<font size="3" color="red">Java面試題</font>】獲取大廠常面的面試題供你學習 ....
|