BIGO 是一家面向海外的以短視頻直播業(yè)務為主的公司, 目前公司的主要業(yè)務包括 BigoLive (全球直播服務),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內擁有 4 億用戶。
一、業(yè)務背景
BIGO 是一家面向海外的以短視頻直播業(yè)務為主的公司, 目前公司的主要業(yè)務包括 BigoLive (全球直播服務),Likee (短視頻創(chuàng)作分享平臺),IMO (免費通信工具) 三部分,在全球范圍內擁有 4 億用戶。伴隨著業(yè)務的發(fā)展,對數據平臺處理能力的要求也是越來越高,平臺所面臨的問題也是日益凸顯,接下來將介紹 BIGO 大數據平臺及其所面臨的問題。BIGO 大數據平臺的數據流轉圖如下所示:
用戶在 APP,Web 頁面上的行為日志數據,以及關系數據庫的 Binlog 數據會被同步到 BIGO 大數據平臺消息隊列,以及離線存儲系統(tǒng)中,然后通過實時的,離線的數據分析手段進行計算,以應用于實時推薦、監(jiān)控、即席查詢等使用場景。然而存在以下幾個問題:OLAP 分析平臺入口不統(tǒng)一:Presto/Spark 分析任務入口并存,用戶不清楚自己的 SQL 查詢適合哪個引擎執(zhí)行,盲目選擇,體驗不好;另外,用戶會在兩個入口同時提交相同查詢,以更快的獲取查詢結果,導致資源浪費;
離線任務計算時延高,結果產出太慢:典型的如 ABTest 業(yè)務,經常計算到下午才計算出結果;
各個業(yè)務方基于自己的業(yè)務場景獨立開發(fā)應用,實時任務煙囪式的開發(fā),缺少數據分層,數據血緣。
面對以上的問題,BIGO 大數據平臺建設了 OneSQL OLAP 分析平臺,以及實時數倉。- 通過 OneSQL OLAP 分析平臺,統(tǒng)一 OLAP 查詢入口,減少用戶盲目選擇,提升平臺的資源利用率;
- 通過 Flink 構建實時數倉任務,通過 Kafka/Pulsar 進行數據分層;
- 將部分離線計算慢的任務遷移到 Flink 流式計算任務上,加速計算結果的產出。
另外建設實時計算平臺 Bigoflow 管理這些實時計算任務,建設實時任務的血緣關系。
二、落地實踐 & 特色改進
2.1 OneSQL OLAP 分析平臺實踐和優(yōu)化
OneSQL OLAP 分析平臺是一個集 Flink、Spark、Presto 于一體的 OLAP 查詢分析引擎。用戶提交的 OLAP 查詢請求通過 OneSQL 后端轉發(fā)到不同執(zhí)行引擎的客戶端,然后提交對應的查詢請求到不同的集群上執(zhí)行。其整體架構圖如下:

該分析平臺整體結構從上到下分為入口層、轉發(fā)層、執(zhí)行層、資源管理層。為了優(yōu)化用戶體驗,減少執(zhí)行失敗的概率,提升各集群的資源利用率,OneSQL OLAP 分析平臺實現了以下功能:- 統(tǒng)一查詢入口:入口層,用戶通過統(tǒng)一的 Hue 查詢頁面入口以 Hive SQL 語法為標準提交查詢;
- 統(tǒng)一查詢語法:集 Flink、Spark、Presto 等多種查詢引擎于一體,不同查詢引擎通過適配 Hive SQL 語法來執(zhí)行用戶的 SQL 查詢任務;
- 智能路由:在選擇執(zhí)行引擎的過程中,會根據歷史 SQL 查詢執(zhí)行的情況 (在各引擎上是否執(zhí)行成功,以及執(zhí)行耗時),各集群的繁忙情況,以及各引擎對該 SQL 語法的是否兼容,來選擇合適的引擎提交查詢;
- 失敗重試:OneSQL 后臺會監(jiān)控 SQL 任務的執(zhí)行情況,如果 SQL 任務在執(zhí)行過程中失敗,將選擇其他的引擎執(zhí)行重試提交任務。
如此一來,通過 OneSQL OLAP 分析平臺,BIGO 大數據平臺實現了 OLAP 分析入口的統(tǒng)一,減少用戶的盲目選擇,同時充分利用各個集群的資源,減少資源空閑情況。2.1.1 Flink OLAP 分析系統(tǒng)建設
在 OneSQL 分析平臺上,Flink 也作為 OLAP 分析引擎的一部分。Flink OLAP 系統(tǒng)分成兩個組成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作為 SQL 提交的入口,查詢 SQL 經過 Gateway 提交到 Flink Session 集群上執(zhí)行,同時獲取 SQL 執(zhí)行查詢的進度,以及返回查詢的結果給客戶端。其執(zhí)行 SQL 查詢的流程如下:
首先用戶提交過來的 SQL,在 SQL Gateway 進行判斷:是否需要將結果持久化寫入到 Hive 表,如果需要,則會先通過 HiveCatalog 的接口創(chuàng)建一個 Hive 表,用于持久化查詢任務的計算結果;之后,任務通過 SQL Gateway 上執(zhí)行 SQL 解析,設置作業(yè)運行的并行度,生成 Pipeline 并提交到 Session 集群上執(zhí)行。
為了保證整個 Flink OLAP 系統(tǒng)的穩(wěn)定性,以及高效的執(zhí)行 SQL 查詢,在這個系統(tǒng)中,進行了以下功能增強:
穩(wěn)定性:
- 基于 zookeeper HA 來保證 Flink Session 集群的可靠性,SQL Gateway 監(jiān)聽 Zookeeper 節(jié)點,感知 Session 集群;
- 控制查詢掃描 Hive 表的數據量,分區(qū)個數,以及返回結果數據量,防止 Session 集群的 JobManager,TaskManager 因此出現 OOM 情況。
性能:
Flink Session 集群預分配資源,減少作業(yè)提交后申請資源所需的時間;
Flink JobManager 異步解析 Split,Split 邊解析任務邊執(zhí)行,減少由于解析 Split 阻塞任務執(zhí)行的時間;
控制作業(yè)提交過程中掃描分區(qū),以及 Split 最大的個數,減少設置任務并行所需要的時間。
Hive SQL 兼容:
針對 Flink 對于 Hive SQL 語法的兼容性進行改進,目前針對 Hive SQL 的兼容性大致為 80%。監(jiān)控告警:
監(jiān)控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的內存,CPU 使用情況,以及任務的提交情況,一旦出現問題,及時告警和處理。
2.1.2 OneSQL OLAP 分析平臺取得的成果
基于以上實現的 OneSQL OLAP 分析平臺,取得了以下幾個收益:- 統(tǒng)一查詢入口,減少用戶的盲目選擇,用戶執(zhí)行出錯率下降 85.7%,SQL 執(zhí)行的成功率提升 3%;
- SQL 執(zhí)行時間縮短 10%,充分利用了各個集群的資源,減少任務排隊等待的時間;
- Flink 作為 OLAP 分析引擎的一部分,實時計算集群的資源利用率提升了 15%。
2.2 實時數倉建設和優(yōu)化
為了提升 BIGO 大數據平臺上某些業(yè)務指標的產出效率,以及更好的管理 Flink 實時任務,BIGO 大數據平臺建設了實時計算平臺 Bigoflow,并將部分計算慢的任務遷移到實時計算平臺上,通過 Flink 流式計算的方式來執(zhí)行,通過消息隊列 Kafka/Pulsar 來進行數據分層,構建實時數倉;在 Bigoflow 上針對實時數倉的任務進行平臺化管理,建立統(tǒng)一的實時任務接入入口,并基于該平臺管理實時任務的元數據,構建實時任務的血緣關系。2.2.1 建設方案
BIGO 大數據平臺主要基于 Flink + ClickHouse 建設實時數倉,大致方案如下:
按照傳統(tǒng)數據倉庫的數據分層方法,將數據劃分成 ODS、DWD、DWS、ADS 等四層數據:ODS 層:基于用戶的行為日志,業(yè)務日志等作為原始數據,存放于 Kafka/Pulsar 等消息隊列中;
DWD 層:這部分數據根據用戶的 UserId 經過 Flink 任務進行聚合后,形成不同用戶的行為明細數據,保存到 Kafka/Pulsar 中;
DWS 層:用戶行為明細的 Kafka 流表與用戶 Hive/MySQL 維表進行流維表 JOIN,然后將 JOIN 之后產生的多維明細數據輸出到 ClickHouse 表中;
ADS 層:針對 ClickHouse 中多維明細數據按照不同維度進行匯總,然后應用于不同的業(yè)務中。
按照以上方案建設實時數據倉庫的過程中,遇到了一些問題:- 將離線任務轉為實時計算任務后,計算邏輯較為復雜 (多流 JOIN,去重),導致作業(yè)狀態(tài)太大,作業(yè)出現 OOM (內存溢出) 異?;蛘咦鳂I(yè)算子背壓太大;
- 維表 Join 過程中,明細流表與大維表 Join,維表數據過多,加載到內存后 OOM,作業(yè)失敗無法運行;
- Flink 將流維表 Join 產生的多維明細數據寫入到 ClickHouse,無法保證 Exactly-once,一旦作業(yè)出現 Failover,就會導致數據重復寫入。
2.2.2 問題解決 & 優(yōu)化
優(yōu)化作業(yè)執(zhí)行邏輯,減小狀態(tài)離線的計算任務邏輯較為復雜,涉及多個 Hive 表之間的 Join 以及去重操作,其大致邏輯如下:
當將離線的作業(yè)轉為 Flink 的流式任務之后,原先離線 Join 多個 Hive 表的場景就轉變?yōu)?Join 多個 Kafka Topic 的場景。由于 Join 的 Kafka topic 的流量較大,且 Join 的窗口時間較長 (窗口最長的為 1 天),當作業(yè)運行一段時間內,Join 算子上就積累了大量的狀態(tài) (一小時后狀態(tài)就接近 1T),面對如此大的狀態(tài),Flink 作業(yè)采取 Rocksdb State Backend 來存放狀態(tài)數據,但是仍然避免不了 Rocksdb 內存使用超過導致被 YARN kill 的問題,或者是 Rocksdb State 上存的狀態(tài)太多,吞吐下降導致作業(yè)嚴重背壓。針對這個問題,我們將這多個 Topic,按照相同的 Schema 進行 Unoin all 處理,得到一個大的數據流,然后在這個大的數據流中,再根據不同事件流的 event_id 進行判斷,就能知道這條數據來自哪一個事件流的 Topic,再進行聚合計算,獲取對應事件流上的計算指標。
這樣一來,通過 UNION ALL 代替 JOIN,避免了因為 JOIN 計算帶來的大 State 帶來的影響。另外,在計算任務中還存在有比較多的 count distinct 計算,類似如下:

這些 count distinct 計算在同一個 group by 中,并基于相同的 postid 進行去重計算,因而可以讓這些 distinct state 可以共享一組 key 來進行去重計算,那么就可以通過一個 MapState 來存儲這若干個 count distinct 的狀態(tài),如下:
這些 count distinct 函數去重的 key 相同,因而可以共享 MapState 中的 key 值,從而優(yōu)化存儲空間;而 Mapstate 的 Value 是 Byte 數組,每個 Byte 8 個 bit,每個 bit 為 0 或者 1,第 n 個 bit 對應了 n 個 count distinct 函數在該 key 上的取值:1 表示該 count disitnct 函數在對應的 key 上需要進行計數,0 表示不需要計數;當計算聚合結果的時候,則將所有 key 第 n 位的數字相加,即為第 n 個 count distinct 的取值,這樣一來,就更進一步節(jié)約了狀態(tài)的存儲空間。通過以上優(yōu)化,成功的將 ABTest 的離線任務遷移到 Flink 流式計算任務上,將作業(yè)的狀態(tài)控制在 100GB 以內,讓作業(yè)正常的運行起來。流維表 JOIN 優(yōu)化
生成多維明細寬表的過程中,需要進行流維表 JOIN, 使用了 Flink Join Hive 維表的功能:Hive 維表的數據會被加載到任務的 HashMap 的內存數據結構中,流表中的數據再根據 Join Key 與 HashMap 中的數據進行 Join。但是面對上億,十億行的 Hive 大維表,加載到內存的數據量太大,很容易導致 OOM (內存溢出)。針對以上問題,我們將 Hive 大維表按照 Join Key 進行 Hash 分片,如下圖:
這樣一來,Hive 大維表的數據經過 Hash 函數計算后分布到 Flink 作業(yè)的不同并行子任務的 HashMap 中,每個 HashMap 只存放大維表的一部分數據,只要作業(yè)的并行度夠大,就能夠將大維表的數據拆分成足夠多份,進行分片保存;對于一些太大的維表,也可以采取 Rocksdb Map State 來保存分片數據。Kafka 流表中的數據,當要下發(fā)到不同的 subtask 上進行 Join 時,也通過相同的 Join Key 按照相同的 Hash 函數進行計算,從而將數據分配到對應的 subtask 進行 Join,輸出 Join 后的結果。通過以上優(yōu)化,成功 Join 了一些 Hive 大維表任務來執(zhí)行流維表 Join 計算,最大的維表超過 10 億行。ClickHouse Sink 的 Exactly-Once 語義支持將流維表 Join 生成的多維明細數據輸出到 ClickHouse 表的過程中,由于社區(qū)的 ClickHouse 不支持事務,所以沒辦法保證數據 sink 到 ClickHouse 過程中的 Exactly-Once 語義。在此過程中,一旦出現作業(yè) Failover,數據就會重復寫入到 ClickHouse。針對這個問題,BIGO ClickHouse 實現了一個二階段提交事務機制:當需要寫入數據到 ClickHouse 時,可以先設置寫入的模式為 temporary,表明現在寫入的數據是臨時數據;當數據執(zhí)行插入完成后,返回一個 Insert id,然后根據該 Insert id 執(zhí)行 Commit 操作,那么臨時數據就轉為正式數據。基于 BIGO ClickHouse 的二階段提交事務機制,并結合 Flink 的 checkpoint 機制,實現了一個 ClickHouse Connector,保證 ClickHouse Sink 的 Exactly Once 寫入語義,如下:
在正常寫入的情況下,Connector 隨機選擇 ClickHouse 的某一個 shard 寫入,根據用戶配置寫單副本,或者雙副本來執(zhí)行 insert 操作,并記錄寫入后的 insert id;在兩次 checkpoint 之間就會有多次這種 insert 操作,從而產生多個 insert id,當 checkpoint 完成時,再將這些 insert id 批量提交,將臨時數據轉為正式數據,即完成了兩次 checkpoint 間數據的寫入;
一旦作業(yè)出現 Failover,Flink 作業(yè) Failover 重啟完成后,將從最近一次完成的 checkpoint 來恢復狀態(tài),此時 ClickHouse Sink 中的 Operator State 可能會包含上一次還沒有來得及提交完成的 Insert id,針對這些 insert id 進行重試提交;針對那些數據已經寫入 ClickHouse 中之后,但是 insert id 并沒有記錄到 Opeator State 中的數據,由于是臨時數據,在 ClickHouse 中并不會被查詢到,一段時間后,將會由 ClickHouse 的過期清理機制,被清理掉,從而保證了狀態(tài)回滾到上一次 checkpoint 之后,數據不會重復。
通過以上機制,成功保證了數據從 Kafka 經過 Flink 計算后寫入到 ClickHouse 整個鏈路中端到端的 Exactly-Once 語義,數據不重復也不丟失。2.2.3 平臺建設
為了更好的管理 BIGO 大數據平臺的實時計算任務,公司內部建設了 BIGO 實時計算平臺 Bigoflow,為用戶提供統(tǒng)一的 Flink實時任務接入,平臺建設如下:

- 支持 Flink JAR、SQL、Python 等多種類型作業(yè);支持不同的 Flink 版本,覆蓋公司內部大部分實時計算相關業(yè)務;
- 一站式管理:集作業(yè)開發(fā)、提交、運行、歷史展示、監(jiān)控、告警于一體,便于隨時查看作業(yè)的運行狀態(tài)和發(fā)現問題;
- 血緣關系:方便查詢每個作業(yè)的數據源、數據目的、數據計算的來龍去脈。
三、應用場景
3.1 Onesql OLAP 分析平臺應用場景
Onesql OLAP 分析平臺在公司內部的應用場景是:應用于 AdHoc 查詢,如下:
用戶通過 Hue 頁面提交的 SQL,通過 OneSQL 后端轉發(fā)給 Flink SQL Gateway,并提交到 Flink Session 集群上執(zhí)行查詢任務,Flink SQL Gateway 獲取查詢任務的執(zhí)行進度返回給 Hue 頁面,并返回查詢結果。
3.2 實時數據倉庫應用場景
實時數據倉庫應用場景目前主要是 ABTest 業(yè)務,如下:
用戶的原始行為日志數據經過 Flink 任務聚合后生成用戶明細數據,然后與維表數據進行流維表 JOIN,輸出到 ClickHouse 生成多維明細寬表,按照不同維度匯總后,應用于不同的業(yè)務。通過改造 ABTest 業(yè)務,將該業(yè)務的結果指標的生成時間提前了 8 個小時,同時減少了使用資源一倍以上。
四、未來規(guī)劃
為了更好的建設 OneSQL OLAP 分析平臺以及 BIGO 實時數據倉庫,實時計算平臺的規(guī)劃如下: