簡(jiǎn)介:BIGO 從 Kafka 轉(zhuǎn)向 Pulsar,并借助 Apache Pulsar 和 Flink 構(gòu)造實(shí)時(shí)消息流處理系統(tǒng)。
本文整理自 BIGO Staff Engineer 陳航在 Flink Forward Asia 2020 分享的議題《借助 Flink 與 Pulsar,BIGO 打造實(shí)時(shí)消息處理系統(tǒng)》。主要內(nèi)容包括: 關(guān)于 BIGO BIGO 為什么會(huì)選擇 Apache Pulsar Apache Pulsar 在 BIGO 中的角色 BIGO 借助 Apache Pulsar 和 Flink 構(gòu)造自己的實(shí)時(shí)消息流處理系統(tǒng)。
一、關(guān)于 BIGO
借助于大數(shù)據(jù)和人工智能技術(shù),BIGO 基于視頻的服務(wù)和產(chǎn)品獲得了廣泛的歡迎,在 150 多個(gè)國家和地區(qū)獲得了大量的用戶。BIGO 主要有兩款非常流行的產(chǎn)品,第一款是 BIGO Live,另外一款是 Likee。BIGO Live 是一個(gè)直播平臺(tái),而 Likee 是一個(gè)短視頻平臺(tái)。
二、為什么選擇 Apache Pulsar
在過去的幾年里,BIGO 的消息平臺(tái)主要還是以開源的 Kafka 集群為主,但是隨著業(yè)務(wù)的不斷增長、用戶不斷擴(kuò)充,整個(gè)消息流平臺(tái)所承載的消息量和數(shù)據(jù)量也出現(xiàn)了成倍的增長,同時(shí)也對(duì)整個(gè)消息流系統(tǒng)提出了更高的要求。
主要體現(xiàn)在以下幾個(gè)方面:
第一,它對(duì)整個(gè)系統(tǒng)的穩(wěn)定性、可擴(kuò)展性以及魯棒性提出了更高的要求。
第二,由于我們是短視頻推薦相關(guān)的服務(wù),所以對(duì)整個(gè)消息流的低延遲也提出了非常高的要求。
隨著數(shù)量增長,整個(gè) BIGO 的消息流平臺(tái)的團(tuán)隊(duì)在維護(hù)多個(gè) Kafka 集群上付出了大量的工作,擺在我們面前的有很多 Kafka 集群運(yùn)維相關(guān)的問題。這個(gè)時(shí)候,我們就在思考,我們是選擇開源 Kafka 的一個(gè)基線版本進(jìn)行自己的迭代開發(fā)呢?還是看一下開源社區(qū)里面有哪些可以借鑒的方案,來打造一個(gè)符合我們應(yīng)用場(chǎng)景需求的消息流平臺(tái)。
于是我們進(jìn)行了一系列調(diào)研。在調(diào)研的過程中,我們的目光注意到了 Apache Pulsar,它有以下幾點(diǎn) feature 比較 match 我們的應(yīng)用場(chǎng)景:
首先,它能夠水平地?cái)U(kuò)展。我們知道對(duì)于 Kafka 而言,它是一個(gè)服務(wù)和存儲(chǔ)綁定的系統(tǒng)。當(dāng)我們需要去擴(kuò)容一個(gè)集群的時(shí)候,單單把機(jī)器上線是不能夠滿足需求的,我們需要對(duì)整個(gè) topic 的 partition 進(jìn)行相應(yīng)操作,這個(gè)時(shí)候就是耗人力去運(yùn)維的。所以,我們需要有一個(gè)能夠水平擴(kuò)展的系統(tǒng)。而 Apache Pulsar 提供的是存儲(chǔ)和服務(wù)分離的一個(gè)架構(gòu),使用的是 bookkeeper 作為底層的數(shù)據(jù)存儲(chǔ),上層有一個(gè) broker 來提供相關(guān)的服務(wù)。
另外,就是它的 low latency 還有高吞吐、低延遲以及在雅虎的生產(chǎn)環(huán)境上面經(jīng)受了大數(shù)據(jù)量的考驗(yàn)。
跨集群的復(fù)制等一系列的 feature 對(duì)于我們而言也是非常需要的。
并且,這樣一個(gè)存儲(chǔ)和服務(wù)分離的架構(gòu)也極大地減少了人工運(yùn)維的成本。
所以我們選擇了 Apache Pulsar。
三、Apache Pulsar 在 BIGO 中的角色1. 引入 Pulsar 的歷程
在 2019 年 11 月,我們重新開始思考 BIGO 的應(yīng)用場(chǎng)景下面所需要的消息流平臺(tái)到底是什么樣的。是基于一個(gè)開源的 Kakfa 框架去開發(fā),還是選擇另外一套整個(gè)消息流系統(tǒng)?
在 2019 年 11 月份,我們做了一次整個(gè)消息流平臺(tái)的調(diào)研工作。在調(diào)研過程中,我們對(duì)比了 Kafka、RocketMQ、Apache Pulsar 等業(yè)界相近的相對(duì)的消息隊(duì)列的實(shí)現(xiàn)。然后我們做了一系列的橫向?qū)Ρ?,并且跟我們的業(yè)務(wù)需求進(jìn)行了相應(yīng)的比較。最終發(fā)現(xiàn)使用 Apache Pulsar 能夠解決我們生產(chǎn)上的一些問題,能夠?yàn)槲覀兊南⒘髌脚_(tái)提供非常好的運(yùn)維相關(guān)的負(fù)擔(dān)的減輕,以及整個(gè)系統(tǒng)的穩(wěn)定性和吞吐的提升,所以我們就選擇了 Apache Pulsar。
在 2019 年 12 月份,我們進(jìn)行了一系列的壓測(cè)。任何一個(gè)開源的框架,如果沒有經(jīng)過公司內(nèi)部的大流量場(chǎng)景下的壓測(cè),是不敢上線的。所以從 2019 年 12 月份一直到 2020 年 4 月份,經(jīng)過了一系列的長時(shí)間的壓測(cè)工作。
在壓測(cè)的過程中,我們同時(shí)也發(fā)現(xiàn)了 Apache Pulsar 的一些問題,并且給社區(qū)修了一系列的 bug。在 2020 年 4 月份,我們把 Apache Pulsar 部署在了我們的生產(chǎn)測(cè)試環(huán)境;在穩(wěn)定運(yùn)行一個(gè)月之后,我們就把它部署到了生產(chǎn)環(huán)境;在 2020 年 5 月份,正式上線。
現(xiàn)有的 Apache Pulsar 集群規(guī)模,目前有十幾個(gè) Apache Pulsar 的節(jié)點(diǎn)。整個(gè)集群的入流量是在 2~3 GB/s。隨著時(shí)間的推移,也有越來越多的應(yīng)用會(huì)不斷地遷移到 Apache Pulsar 來替代現(xiàn)有的 Kafka 集群。
2. Apache Pulsar 的角色
Apache Pulsar 在整個(gè)流處理過程中提供的是一個(gè) PUB-SUB 的角色。
首先,有 BIGO 這邊的 Baina,一個(gè) C++ 實(shí)現(xiàn)的消息收集服務(wù),把相關(guān)的數(shù)據(jù)寫到 Apache Pulsar 相關(guān)的 topic 里面去,這是第一種場(chǎng)景。
第二場(chǎng)景就是 KMM,也就是 Kafka 的 Mirror Maker。
第三種場(chǎng)景是 Flink。另外就是一些各種語言的客戶端所實(shí)現(xiàn)的 producer。它的下游主要有 Flink、Flink SQL 以及各個(gè)語言所實(shí)現(xiàn)的 consumer,比如說 golang、JAVA,C++ 的等等。
3. 下游支撐的業(yè)務(wù)場(chǎng)景
第一個(gè)是實(shí)時(shí)數(shù)倉,第二個(gè)是實(shí)時(shí)的 ETL,第三個(gè)是實(shí)時(shí)數(shù)據(jù)分析,另外就是實(shí)時(shí)推薦,還有更多的業(yè)務(wù)場(chǎng)景也在逐漸的介入。下游的數(shù)據(jù)會(huì)寫到 Hive、Pulsar 的 topic、ClickHouse、Hadoop、redis 等一系列下游的相關(guān)存儲(chǔ)系統(tǒng)。
四、Apache Pulsar 和 Flink 構(gòu)造實(shí)時(shí)消息流處理系統(tǒng)。
這里需要分為以下三個(gè)方面來講:
第一,是關(guān)于 Pulsar-Flink-Connector 的一些內(nèi)幕。我相信在介紹 Pulsar-Flink-Connector 的一些內(nèi)幕之后,大家會(huì)對(duì)整個(gè) Flink 與 Pulsar 之間結(jié)合的關(guān)系會(huì)更加地清晰明亮,在使用過程中也會(huì)更加地清晰;
第二,是 BIGO 的一個(gè) use case,就是使用 Apache Pulsar 和 Flink 來打造自己的實(shí)時(shí) ETL 處理系統(tǒng);
第三,是借助 Apache Pulsar 和 Flink 打造 AB-test 系統(tǒng)。
首先看一下 Pulsar-Flink-Connector 整個(gè)生產(chǎn)和消費(fèi)的邏輯。它主要包括一個(gè) source 的 API 和 sink 的 API。對(duì)于消費(fèi)的時(shí)候,也就是使用一個(gè) Pulsar-Flink-Connector 的 source 來訂閱 Pulsar 的一個(gè) topic。另外一個(gè)就是我們寫一個(gè) sink,會(huì)把 Flink 里面的數(shù)據(jù)寫出到 Pulsar 的 topic 里面。下圖左邊的代碼展示怎么去訂閱這樣一個(gè) topic,實(shí)際上只需要 new 一個(gè) FlinkPulsarSource 的一個(gè)流,然后把這條流加入到 DataStream 里面去就可以了。
對(duì)于 Flink 數(shù)據(jù)的寫出而言,只需要 new 一個(gè) FlinkPulsar 的 Sink,然后我們調(diào)用第二個(gè) DataStream 的 sink 就可以把數(shù)據(jù)給寫出去了。實(shí)際上,整個(gè)的實(shí)現(xiàn)而言,跟 Kafka 的 API 是非常類似的。這里需要注意的幾點(diǎn)就是,對(duì)于 FlinkPulsarSource 里面需要傳入的是 serviceUrl 以及 adminUrl。
serviceUrl 類似于 Kafka 的 broker_list;
adminUrl 就是我們?nèi)ヒ怨芾韱T的方式來控制 Pulsar 的一些相關(guān)的操作。
Pulsar Flink 怎么樣來訂閱 Pulsar 的 topic,怎么樣消費(fèi)以及它的 offset 是怎么樣 commit 回去的?
這里就會(huì)涉及到 Pulsar Flink 的 exactly-once source。咱們首先來看一下圖左邊部分。這個(gè)圖里面有一個(gè) Pulsar 的 topic,當(dāng)我們 new 一個(gè) PulsarFlinkSource 的時(shí)候,實(shí)際上會(huì)對(duì)每一個(gè) Pulsar topic 的 partition 創(chuàng)建一個(gè) reader。這個(gè) reader 使用的是 Non-Durable Cursor,當(dāng)這個(gè) reader 訂閱了這個(gè) topic 之后,這個(gè) topic 的數(shù)據(jù)流就會(huì)源源不斷地流到這個(gè) reader 的線程里面去。當(dāng) reader 的線程觸發(fā)一次 checkpoint 的時(shí)候,這個(gè) Flink 任務(wù)就會(huì)把自己的一些狀態(tài) checkpoint 起來。當(dāng) checkpoint 完成的時(shí)候,就會(huì)調(diào)用一次 Notify checkpoint complete 這樣的一個(gè)方法。觸發(fā)的是另外一個(gè) subscription 的一個(gè) commit。
這個(gè) subscription 實(shí)際上是一個(gè) durable cursor。當(dāng)它 commit offset 的時(shí)候,這個(gè) offset 會(huì)保存在 bookkeeper 里面,這是一個(gè)永久保存的 offset。這樣做的好處是,當(dāng) checkpoint 失敗或者 checkpoint 丟了的時(shí)候,我們需要以一個(gè) subscription name 從 Pulsar 里面去恢復(fù)的時(shí)候,就可以從 bookkeeper 里面去把 message id 讀出來,然后從這邊恢復(fù)。
實(shí)際上對(duì)于 Pulsar-Flink-Connector 的消費(fèi)而言,它是由一條數(shù)據(jù)流和一條控制流來組成的:
對(duì)于數(shù)據(jù)流,就是 Pulsar Topic 的數(shù)據(jù)源源不斷的會(huì)流入到 reader 的這樣一個(gè)線程里面,由 reader 線程進(jìn)行相應(yīng)的處理。
控制流就是通過 subscription name 來提交消費(fèi)的 message id,也就是類似于 Kafka 的一個(gè) offset,會(huì)提交到 Pulsar 的客戶端來保證消費(fèi)的位置。
接下來看一下 checkpoint 實(shí)現(xiàn)的流程。
首先,當(dāng)我們?nèi)プ?checkpoint N 的時(shí)候,當(dāng) N 結(jié)束了之后,它會(huì)把 durable cursor 進(jìn)行一次 commit;
當(dāng)我們?nèi)プ?checkpoint N+1 的時(shí)候,N+1 完成之后,會(huì)接著去 commit N+1 的 durable cursor。
這樣做的好處是,當(dāng)這個(gè)任務(wù)失敗之后,如果需要從某一個(gè) checkpoint 恢復(fù),只需要從 checkpoint 里面去讀到上一次 checkpoint 成功的 offset 的 durable cursor 的 message id 的位置,就可以從上一次的位置去消費(fèi)了。這也是保證 source 的 exactly once 的實(shí)現(xiàn)。
Topic/Partition 的 Discovery
第一點(diǎn)是,在 Pulsar-Flink-Connector 實(shí)現(xiàn)的邏輯里,會(huì)為每一個(gè) Topic/Partition 分配一個(gè) reader 的線程。
第二點(diǎn)是,每一個(gè) task manager 會(huì)包括多個(gè) reader 的線程,這地方會(huì)有一個(gè)什么樣的映射關(guān)系?
舉個(gè)例子:假設(shè)我們訂閱的 Topic 里面,有 10 個(gè) partition,F(xiàn)link 里面只給它分配 5 個(gè) task manager,那么怎么將 partition 映射到 5 個(gè) task manager 里面去?這就會(huì)涉及到一個(gè)分配的邏輯。整個(gè)分配的邏輯,實(shí)際上是使用一個(gè)哈希的方式把某一個(gè) Topic/Partition hash 到目標(biāo)的 task manager 上面。
這就會(huì)存在一些隱患:當(dāng)我們訂閱了幾百個(gè)甚至上千個(gè) topic 的時(shí)候,可能會(huì)存在一定的分配不均衡。成百上千個(gè) Topic/Partition 里面,并不是每一個(gè) partition 的流量都是均衡的。假設(shè)我們訂閱了十個(gè) Topic,其中有九個(gè) Topic 的流量很小,另外一個(gè) Topic 的流量很大,那么均攤到某一個(gè) partition 時(shí)候也是這樣的。這個(gè)很大的 topic 的 Partition 的流量很大,另外 Topic/Partition 的流量很小。如果我們只是單純地進(jìn)行一次 hash 的話,就會(huì)造成某些 task manager 上面的流量不均衡,可能會(huì)導(dǎo)致頻繁 GC 的問題。這個(gè)問題在下一個(gè) use case 里會(huì)詳細(xì)地提到,以及怎么樣去解決它。
另外就是當(dāng)某一個(gè) Topic/Partition 進(jìn)行一次分區(qū)擴(kuò)容時(shí),怎么樣去自動(dòng)訂閱這樣一個(gè)新的分區(qū)?在 Pulsar-Flink-Connector 里面會(huì)啟動(dòng)一個(gè)定時(shí) check 的線程的邏輯。假設(shè)我們每一分鐘 check 一次,是否有新的 partition 的加入,并且這個(gè)新 Topic/Partition 分配到了某一個(gè) task manager 上面,那么這個(gè) task manager 就會(huì)自動(dòng)地新創(chuàng)建一個(gè) reader 的線程,然后把這個(gè) partition 訂閱下來。
這整個(gè)的流程,會(huì)有一個(gè) discover 會(huì)不斷的去 check。當(dāng)有新的 partition 的時(shí)候就會(huì) new 一個(gè) reader 起來。每一個(gè) reader 獨(dú)立消費(fèi)某一個(gè) Topic/Partition,把數(shù)據(jù)拿過來之后會(huì)定期進(jìn)行自己的反序列化操作以及后續(xù)的處理。
上面講到的是整個(gè) connector 的一個(gè)邏輯。在 Pulsar-Flink-Connector 里面提供了 job 的方式,還提供了 catalog 的方式來消費(fèi) Pulsar 的 topic。但是目前它是沒有提供 SQL DDL 的方式,在 BIGO 的應(yīng)用場(chǎng)景里面大部分的 topic 都是以 json 的格式。大部分的數(shù)據(jù),都是以 json 格式寫入的。
對(duì)于這一類 json 格式的topic,它可能沒有事先配置自己的 schema 就寫進(jìn)來了,那么我們?cè)谙M(fèi)的時(shí)候,如果想用 SQL,怎么辦呢?這里就需要用到 Flink DDL 的框架,所以 BIGO 的消息流平臺(tái)團(tuán)隊(duì)在我們的使用過程中為 Pulsar-Flink-Connector 開發(fā)了 Flink SQL DDL 的支持。接下來看一下 Flink SQL DDL 的框架。
第一步,圖左邊就是 fetch message from Pulsar topic,首先會(huì)定義這個(gè) topic 的里面數(shù)據(jù)的一個(gè)字段信息,也就是 create table test_Flink_SQL,這里面有 rid 等字段。下面的位置里面包含的是怎樣去和 Pulsar 的服務(wù)端建立連接的,這里會(huì)指定 topic 名稱,指定 service url,admin url 以及 subscribe name,還有一些一系列相關(guān)的配置操作。這樣一段 SQL 的代碼就能夠很好地完成把數(shù)據(jù)從 Pulsar topic 里面給消費(fèi)出來。
第二步,就可以進(jìn)行一系列應(yīng)用層相關(guān)邏輯的處理。比如做 join,count、union 等操作。另外就是一些應(yīng)用層邏輯的處理,比如說去做統(tǒng)計(jì)相關(guān)的一些操作。在第二步操作完了之后,我們需要將最終的結(jié)果寫出到第三方存儲(chǔ)里面。第三方存儲(chǔ)會(huì)包括 Hive 表、HDFS 和 Pulsar 的 topic 等。
對(duì)于最終的寫入寫出就會(huì)進(jìn)入到第三步,我們會(huì)調(diào)用一個(gè) insert into 的方法,直接把我們處理的結(jié)果,寫出到相關(guān)的 Hive 表里面去,這就是整個(gè) Flink SQL DDL 的一個(gè)處理邏輯。我們借助 Flink SQL DDL 能夠很好地來實(shí)現(xiàn)我們的 AB test 相關(guān)的操作。那么在前面的講解里面,我們可能會(huì)使用一個(gè) job 的方式來提交,有了 Flink SQL DDL 的支持,我們就可以很方便地使用一個(gè) SQL 的方式來消費(fèi) Pulsar 的 topic,會(huì)進(jìn)行一系列邏輯處理,最終把結(jié)果寫出去。
現(xiàn)在來看一下基于 SQL 方式的 use case。
Case 1
首先來看一下 BIGO reall-time ETL 的實(shí)現(xiàn)。這個(gè)實(shí)時(shí) ETL 的背景,是我們?cè)?Pulsar 里面,會(huì)有成百上千個(gè) topic,每一個(gè) topic 會(huì)有自己獨(dú)立的 schema。我們現(xiàn)在的一個(gè)需求是想要把每一個(gè) topic 使用自己的 schema 進(jìn)行一次解析,把最終解析的結(jié)果以 bucket 的格式落到 HDFS 的 Hive 表上面去。對(duì)于這樣一個(gè)需求,我們可能會(huì)有幾種方案:
第一種方案,我們會(huì)直接使用 Pulsar 的 HDFS 的 connector,會(huì)把 topic 里面的數(shù)據(jù)會(huì)消費(fèi)出來然后落到 HDFS 上面去,這樣做的話,當(dāng)我們需要對(duì) topic 里面進(jìn)行一系列的處理的時(shí)候,可能就不大好辦了。另外一個(gè)就是我們有成百上千個(gè) topic,那么也會(huì)有成百上千個(gè) schema,也就是說我們可能要維護(hù)成百上千個(gè)線程,去解相應(yīng)的 topic 里面的數(shù)據(jù),然后把它落出去。這樣對(duì)于整個(gè)任務(wù)的維護(hù)成本可能會(huì)比較高。
第二種方案。我們可以用 Flink SQL 去消費(fèi)每個(gè)topic,每一個(gè) SQL 指令自己的 schema,然后把這個(gè) topic 給消費(fèi)出來,之后進(jìn)行一系列的處理,然后寫出去。這種方式,實(shí)際上也會(huì)帶來幾百個(gè)甚至上千個(gè) SQL 任務(wù)的維護(hù)工作。
第三個(gè)方案,我們想到了使用一個(gè) Flink 任務(wù)來消費(fèi)成百上千個(gè) Pulsar 的 topic。然后進(jìn)行一系列的 ETL 處理,首先進(jìn)行 schema 的解析,然后進(jìn)行一系列邏輯處理,最終把它寫出到 HDFS 上面去。下面這張圖,就是我們采用的第三種方案:使用一個(gè) Flink 的 job 把成百上千個(gè) topic 訂閱了。訂閱完了之后,獲取相應(yīng)的線程去消費(fèi)。解析完了之后會(huì)經(jīng)過一系列邏輯處理,最終顯示到 HDFS 上面去。
這個(gè) case 可能存在數(shù)據(jù)分布不均的問題。假設(shè),我們有 500 個(gè) topic,其中 400 個(gè) topic 的流量很小,另外 100 個(gè) topic 的流量很大。那么我們?cè)谟嗛喌臅r(shí)候,假設(shè)我起了 100 個(gè) task manager 去消費(fèi)。那么這可能就會(huì)按平均來算,有 5-10 個(gè) topic partition 會(huì)落到同一個(gè) task manager 上面去。如果我們不干預(yù)的話,由于這個(gè) partition 自身的流量不均衡,可能會(huì)導(dǎo)致它從運(yùn)行任務(wù)的進(jìn)程的流量也是不均衡的,帶來了頻繁 GC 的問題。
為了解決消費(fèi)端上面的 task manager 流量不均衡的情況。我們引入了一個(gè) slot group 的概念。我們會(huì)事先對(duì) topic partition 的流量進(jìn)行一個(gè)預(yù)估,預(yù)估完了之后,會(huì)通過人工計(jì)算的方式把幾個(gè)低流量的 topic 組合成同一個(gè) group 里面。同一個(gè) group 的 topic 會(huì)被放在同一個(gè) slot 里面,然后去進(jìn)行調(diào)度,這樣就能夠很好的去把 task manager 上面的消費(fèi)流量不均的問題解決掉了,整個(gè) Flink job 就會(huì)運(yùn)行的很好。
Case 2
第二個(gè) case 是一個(gè) AB test 的應(yīng)用場(chǎng)景,做這個(gè) AB test 場(chǎng)景的一個(gè)初衷是什么呢?在我們實(shí)時(shí)的數(shù)倉里面,需要去產(chǎn)出小時(shí)級(jí)別的中間表,以及天級(jí)的中間表,給推薦算法的工程師以及數(shù)據(jù)分析師來使用。對(duì)于小時(shí)級(jí)別的中間表以及天級(jí)的中間表的產(chǎn)生,需要通過實(shí)時(shí)的去計(jì)算底層的各種類型的打點(diǎn),比如用戶觀看的打點(diǎn)、某個(gè)視頻的下發(fā)打點(diǎn),還有用戶其他行為的打點(diǎn)等等,會(huì)按照某一個(gè)維度進(jìn)行聚合。聚合了之后會(huì)進(jìn)行相關(guān)的一些統(tǒng)計(jì),最終會(huì)形成一張寬帶供推薦算法工程師以及數(shù)據(jù)分析師來使用。
如果我們不提供這樣一個(gè)寬表的話,那么對(duì)于上層的業(yè)務(wù)方而言,可能要不斷的去訪問底層的表,對(duì)底層表進(jìn)行各種相應(yīng)的操作。這樣不但會(huì)浪費(fèi)數(shù)據(jù)分析師以及推薦算法工程師的時(shí)間,也會(huì)造成整個(gè)集群計(jì)算資源的浪費(fèi)。那么在 BIGO 這邊,之前的解決方案是使用 Hive。使用 Map Reduce 的方式,來把每張底層的表進(jìn)行一次聚合操作。聚合完了之后會(huì)提供一個(gè)小時(shí)級(jí)別中間表以及天級(jí)的中間表給上層業(yè)務(wù)使用,這樣做的弊端是:Hive Map Reduce 的時(shí)效性是沒法保證的。所以我們就在想能否使用 Flink 流式計(jì)算的方式來提高實(shí)時(shí)數(shù)倉的數(shù)據(jù)產(chǎn)出效率。
接下來就是我們這邊的一個(gè)解決方案:首先我們會(huì)用 Flink SQL 去消費(fèi) Pulsar 的 topic。從下圖的左邊來看,我們有 Topic A、Topic B 和 Topic K。每個(gè) topic 有自己的 DDL。我們首先會(huì)使用 Flink SQL 加上每一個(gè) topic 的 scanner,也就是 DDL 會(huì)把 topic 的數(shù)據(jù)從 Pulsar 里面加載出來,然后把它做成每個(gè) topic 的一個(gè)視圖。
這個(gè)地方我們就會(huì)有 Table A、Table B 和 Table K 的一個(gè)表。假設(shè)有 K 張表,那么我們需要對(duì) K 張表進(jìn)行一次聚合操作。假設(shè)是按照 uid 進(jìn)行一次聚合,那么這個(gè)聚合有兩種方式:
第一種方式是做 join。對(duì)于 Flink 而言,它的流式 join 可能耗時(shí)會(huì)比較長,整個(gè)計(jì)算資源的消耗也是非常大的。所以我們這邊做了一個(gè)比較巧妙的方案就是使用 union 代替 join。我們會(huì)把 Table A、Table B 和 Table K 通過 union 的方式會(huì)生成一個(gè) View X。然后把 View X 直接寫出以小時(shí)為粒度,到 ClickHouse 供用戶查詢。在 union 的過程當(dāng)中,我們還會(huì)做一些相關(guān)的聚合的操作。來把相關(guān)的指標(biāo)給聚合起來供用戶使用。這個(gè)就是小時(shí)級(jí)別的中間表。
對(duì)于天級(jí)的中間表而言,我們所遇到的挑戰(zhàn)是:它并不是單單的只依賴了 Table A、Table B 和 Table K,可能還依賴了離線的表。假設(shè)有 Table a1、Table a2 和 Table a3 三張表。我們?cè)趺礃影褜?shí)時(shí)的表和離線的表做一個(gè)關(guān)聯(lián)?這里我們也是使用的一個(gè)比較巧妙的方式。
首先。在左邊 Table A、Table B 和 Table K 會(huì)使用 Flink SQL 把數(shù)據(jù)從 Pulsar 消費(fèi)出來,然后做成一個(gè)獨(dú)立的 table。然后同樣也是以 union 的方式把實(shí)時(shí)的流表給 union 起來,做一些統(tǒng)計(jì)相關(guān)的處理生成一個(gè)視圖,一個(gè)View X。這個(gè) View X 會(huì)根據(jù)我們精心設(shè)計(jì)過的一個(gè) row-key,把它以天為維度寫出到 HBase 里面去。
另外,對(duì)于離線而言。因?yàn)槲覀?Table A、Table B 和 Table K 只是代表了咱們實(shí)時(shí)的一些數(shù)據(jù),對(duì)于離線的數(shù)據(jù),也是需要 join 進(jìn)來的,那么就會(huì)使用一個(gè) Spark 來把 Table a1、Table a2 和 Table a3 相關(guān)的數(shù)據(jù)給 join 起來,然后也以相同的規(guī)則生成一個(gè) row-key 寫在 HBase 里面去。
對(duì)于 HBase 而言,它實(shí)際上提供的就是一個(gè) join 操作,寫到 HBase 就很好的避免了我們將 View X 以及 Spark 所生成的這樣一張表做 join 了。因?yàn)槿绻怯邢嗤?key,那么假設(shè) HBase 這樣一張寬表有 100 列,View X 占了前 80 列,那么后面的 Spark 所算出來的這個(gè)表會(huì)自動(dòng)地填充到那個(gè)后 20 列里面去,那么最終會(huì)生成同一個(gè) row-key 的一個(gè) 100 維的一張寬表。那么我們接下來會(huì)把 HBase 里面這樣一張寬表讀出來,然后寫到 ClickHouse 供上層用戶去查詢。這樣就能夠很好的去避免表之間的 join 操作,極大地提高 join 的效率。
五、未來工作
首先,我們會(huì)接著在 Pulsar-Flink-Connector 上面繼續(xù)的去開發(fā)新的 feature 并且持續(xù)的去進(jìn)行一系列的 bug 修復(fù);
第二點(diǎn),我們會(huì)更多的將 Flink 任務(wù)持續(xù)地從 Kakfa 遷移到 Apache Pulsar 上面去;
第三點(diǎn),在我們整個(gè)的消息流平臺(tái)里,之前使用的是 Kakfa,可能有成百上千個(gè) Flink 的任務(wù)或者是其他的任務(wù),使用 Kafka 的 API 來消費(fèi) Kafka 的 topic。如果不提供一個(gè)簡(jiǎn)單的方式讓用戶來消費(fèi) Pulsar 的 topic 的話,這個(gè)遷移工作是非常難進(jìn)行的。所以我們會(huì)借助于 KOP,也就是 Kakfa on Pulsar,方便上層應(yīng)用的遷移,有了這樣一層 KOP 的一個(gè) proxy,對(duì)于上面應(yīng)用程序是不需要改任何的代碼就能夠自動(dòng)的從 Kafka 切到 Pulsar 上面的;
第四點(diǎn),我們打算實(shí)現(xiàn)一個(gè)批流統(tǒng)一的數(shù)據(jù)的消費(fèi),從 Pulsar topic 里面以批或者是流的方式來消費(fèi) topic 里的數(shù)據(jù);
第五點(diǎn),我們會(huì)持續(xù)加強(qiáng) Pulsar 以及 bookkeeper 的穩(wěn)定性以及吞吐的調(diào)優(yōu);
第六點(diǎn),我們會(huì)持續(xù)的去優(yōu)化 Bookkeeper 的 IO 協(xié)議棧。