使用Apache Flink在Pinterest進(jìn)行實時實驗分析

來源:Ververica
作者:Parag Kesar&Ben Liu
時間:2020-08-20
2500
本文分享Pinterest是如何基于Flink做實時分析的實例。

在Pinterest,我們每天都要進(jìn)行數(shù)千個實驗。我們主要依靠日常實驗指標(biāo)來評估實驗效果。日常實驗管道運行一次可能會花費10多個小時,有時還會超時,因此想要驗證實驗設(shè)置、觸發(fā)的正確性以及預(yù)期的實驗性能時就沒那么方便了。當(dāng)代碼中存在一些錯誤時這個問題尤為突出。有時可能要花幾天時間才能發(fā)現(xiàn)錯誤,這對用戶體驗和重要指標(biāo)造成了更大的損害。我們在Pinterest開發(fā)了一個近實時實驗平臺,以提供更具時效性的實驗指標(biāo),從而幫助我們盡快發(fā)現(xiàn)這些問題??赡艹霈F(xiàn)的問題有:

1.實驗導(dǎo)致impression的統(tǒng)計數(shù)據(jù)顯著下降,因此需要盡快關(guān)閉實驗。

2.與對照組相比,實驗導(dǎo)致搜索的執(zhí)行次數(shù)顯著增加。

640.webp (6).jpg

圖1-帶有置信區(qū)間的實時實驗指標(biāo)

上圖的面板顯示了所選事件的實驗組和對照組的流量(也就是動作數(shù))和傾向(也就是unique user的數(shù)量)。自實驗開始以來,這些計數(shù)已經(jīng)累計了3天時間。如果在3天后發(fā)生了re-ramp(分配給實驗組和對照組的用戶數(shù)量增加),則計數(shù)會歸零0并重新開始累計3天時間。

為了確保實驗組與對照組之間的對比在統(tǒng)計上是有效的,我們做了一些統(tǒng)計檢驗。由于指標(biāo)是實時交付的,因此每次按順序收到新記錄時,我們都必須進(jìn)行這些檢驗。這需要與傳統(tǒng)的固定視野檢驗不一樣的方法,否則會帶來較高的假正率。我們考慮過幾種順序測試方法,包括賭徒破產(chǎn)、貝葉斯A/B檢驗和Alpha消耗函數(shù)方法。為了保證數(shù)值穩(wěn)定性,我們從t檢驗+Boferroni校正(將我們的案例作為多次檢驗進(jìn)行處理)開始,并為我們的初始實現(xiàn)預(yù)先確定了檢驗次數(shù)。

高階設(shè)計

640.webp.jpg

圖2-實時實驗管道的高階設(shè)計

實時實驗管道包括下列主要組件:

·最近ramp的實驗組作業(yè)→每5分鐘將一個CSV文件發(fā)布到一個S3位置。這個CSV是過去3天中所分配用戶有所增加的實驗組的快照。通過查詢托管實驗元數(shù)據(jù)的內(nèi)部Analytics(分析)應(yīng)用程序的MySQL數(shù)據(jù)庫,就能獲得這一信息。

·篩選事件作業(yè)→我們分析了Pinterest上的數(shù)百種用戶動作。這一作業(yè)僅保留最關(guān)key的業(yè)務(wù)事件,這些事件已插入“filtered_events”Kafka主題中。這些事件被剝離掉了不需要的字段,因此filtered_events主題相當(dāng)輕巧。該作業(yè)運行在Flink processing時間內(nèi),并且通過Flink的增量檢查點,每隔5秒將其進(jìn)度保存到HDFS中。

·過濾實驗Activation作業(yè)→每當(dāng)一個用戶被觸發(fā)進(jìn)入一個實驗時,都會創(chuàng)建一個Activation(激活)記錄。觸發(fā)規(guī)則取決于實驗邏輯,一名用戶可以被觸發(fā)進(jìn)入一個實驗數(shù)百次。我們只需要最近3天啟動,或組分配增加的實驗的Activation記錄即可。

為了過濾Activation記錄,此作業(yè)使用Flink的廣播狀態(tài)模式。每10秒檢查一次“最近ramp的實驗組”作業(yè)所發(fā)布的CSV的更改情況,并將其發(fā)布到一個KeyedBroadcastProcessFunction的所有分區(qū)上,該函數(shù)也消費Activation。

KeyedBroadcastProcessFunction將廣播的CSV與Activation流結(jié)合在一起,就可以過濾掉那些最近3天內(nèi)未ramp-up實驗的Activation記錄。此外,“group-ramp-up-time”已添加到Activation記錄中,并插入“filtered_experiment_activations”kafka主題中。

640.webp (1).jpg

圖3-Scala對象被插入中間層Kafka主題中

640.webp (2).jpg

圖4-實時實驗累積作業(yè)圖

上面是實時累積(Aggregation)Flink作業(yè)的高階概覽。這里簡單提及了一些operator,后文中還將詳細(xì)介紹另一些operator。Source operator從Kafka讀取數(shù)據(jù),而sink使用一個REST接口寫入我們的內(nèi)部Analytics Store上。

刪除重復(fù)事件→這里用一個KeyedProcessFunction實現(xiàn),由(event.user_id,event.event_type,event.timestamp)作為key。這里的思想是,如果來自同一用戶的相同事件類型的事件具有相同的時間戳,則它們是重復(fù)事件。第一個這樣的事件被發(fā)送到下游,但也會緩存進(jìn)狀態(tài)持續(xù)5分鐘時間。任何后續(xù)事件都將被丟棄。5分鐘后,一個計時器會啟動并清除狀態(tài)。這里的假定是所有重復(fù)事件之間的間隔都在5分鐘之內(nèi)。

查找首次觸發(fā)時間→這里是一個Flink KeyedProcessFunction,由(experiment_hash,experiment_group,user_id)作為key。這里的假設(shè)是,為一個用戶收到的第一個實驗Activation記錄也是具有第一個觸發(fā)時間的Activation。一個實驗ramp-up以后,收到的第一個Activation將發(fā)送至下游,并保存為狀態(tài)并持續(xù)3天時間(我們累積了實驗組ramp-up以來為期3天的計數(shù))。經(jīng)過3天的ramp時間后,一個計時器將清除狀態(tài)。

15分鐘的processing時間tumbling窗口→事件進(jìn)入并向下游發(fā)送結(jié)果時,Numerator Computer和Denominator computer都將累積計數(shù)。這意味著數(shù)百萬條記錄,但是我們不需要如此頻繁地將結(jié)果發(fā)送到Analytics Store上。我們可以在processing時間內(nèi)運行一個持續(xù)15分鐘的Flink tumbling窗口,這樣效率更高。對于Numerator Computer來說,這個窗口由(“experiment_hash”,“experiment_group”,“event_type”,“timestamp”)作為key。當(dāng)窗口在15分鐘后觸發(fā)時,將獲取帶有max_users的記錄并將其發(fā)送到下游的Analytics Store sink。

連接事件和Activation

640.webp (3).jpg

圖5-通過用戶ID連接Activation流與事件流

我們使用Flink的IntervalJoin operator實現(xiàn)流到流的連接。IntervalJoin會在接下來的3天內(nèi)緩沖每位用戶的單個Activation記錄,并且所有匹配事件都將與Activation記錄中的其他實驗元數(shù)據(jù)一起發(fā)送到下游。

這種方法的局限性:

1.對我們的需求而言,IntervalJoin operator有點不夠靈活,因為它的間隔是固定的而不是動態(tài)的。比如說,用戶可以在實驗啟動2天后加入進(jìn)來,但I(xiàn)ntervalJoin還是會為這名用戶運行3天時間,也就是說我們停止累積數(shù)據(jù)后還會運行2天時間。如果3天后組很快re-ramp,則一位用戶也可以有2個這樣的連接。這種情況會在下游處理。

2.事件和Activation不同步:如果Activation作業(yè)失敗并且Activation流被延遲,則可能會丟失一些數(shù)據(jù),因為沒有匹配Activation的事件還會繼續(xù)流動。這將導(dǎo)致計數(shù)不足。

我們研究了Flink的IntervalJoin源代碼。它會在“左側(cè)緩沖區(qū)”中緩沖Activation 3天時間,但事件將被立即刪除。目前似乎無法通過配置更改此行為。我們正在研究使用Flink的協(xié)同處理函數(shù)來實現(xiàn)這個Activation到事件的連接,該函數(shù)是用于流到流連接的更通用的函數(shù)。我們可以將事件緩沖X分鐘,這樣即使Activation流延遲了X分鐘,管道也可以處理延遲而不會出現(xiàn)計數(shù)不足。這將幫助我們避免同一用戶的兩次連接,并能形成更加動態(tài)的管道,其可以立即感知到實驗組的re-ramp,并支持更多動態(tài)行為,例如在組re-ramp時自動擴(kuò)展累積的覆蓋范圍。

Join Results Deduplicator

640.webp (4).jpg

圖6-Join Results Deduplicator

Join Results Deduplicator是一個Flink KeyedProcessFunction,它由experiment_hash,experiment_group,event_type,user_id作為key。這個operator的主要目的是在向下游發(fā)送記錄時插入“user_first_time_seen”標(biāo)志——下游Numerator Computer使用這個標(biāo)志來計算傾向編號(#unique users),而無需使用設(shè)置的數(shù)據(jù)結(jié)構(gòu)。

這個operator將狀態(tài)存儲到last-ramp-time+3天,之后狀態(tài)將被清除。

Numerator Computer

640.webp.jpg

圖7-Numerator Computer

Numerator Computer是一個KeyedProcessFunction,由experiment_hash,experiment_group,event_type作為key。它會在最后2小時內(nèi)一直滾動15分鐘的存儲桶(bucket),每當(dāng)有新記錄進(jìn)入時都會更新這些桶。對于流量來說,每個動作都很重要;因此對于每個事件,動作計數(shù)都會增加。對于傾向數(shù)字(unique user)——它取決于"first_time_seen”標(biāo)志(僅在為true時遞增)。

隨著時間的流逝,存儲桶會滾動/旋轉(zhuǎn)。每次新事件進(jìn)入時,存儲桶數(shù)據(jù)都會向下游刷新到15分鐘的tumbling窗口中。

它有一個時間為3天的計時器(從ramp-up時間→3天),可在觸發(fā)后清除所有狀態(tài),這樣就能在ramp-up3天后重置/清除計數(shù),完成歸零。

垃圾消息與處理

為了使我們的流管道具有容錯能力,F(xiàn)link的增量檢查點和RocksDB狀態(tài)后端被用來保存應(yīng)用程序檢查點。我們面臨的一項有趣挑戰(zhàn)是檢查點失敗。問題似乎在于檢查點流程需要花費很長時間,并且最終會超時。我們還注意到,在發(fā)生檢查點故障時通常也會有很高的背壓。

640.webp (1).jpg

圖8-Flink UI中顯示的檢查點故障

在仔細(xì)檢查了檢查點故障的內(nèi)部機(jī)制之后,我們發(fā)現(xiàn)超時是由于某些子任務(wù)未將確認(rèn)發(fā)送給檢查點協(xié)調(diào)器而導(dǎo)致的,整個檢查點流程都卡住了,如下所示。

640.webp (2).jpg

圖9-子任務(wù)未發(fā)送確認(rèn)

然后我們針對導(dǎo)致失敗的根本原因應(yīng)用了一些調(diào)試步驟:

1.檢查作業(yè)管理日志

2.檢查在檢查點期間卡住的子任務(wù)的任務(wù)管理器日志

3.使用Jstack詳細(xì)查看子任務(wù)

原來子任務(wù)運行很正常,只是抽不出空來處理消息。結(jié)果,這個特定的子任務(wù)具有很高的背壓,從而阻止了barrier通過。沒有barrier的收據(jù),檢查點流程將無法進(jìn)行。

在進(jìn)一步檢查所有子任務(wù)的Flink指標(biāo)之后,我們發(fā)現(xiàn)其中一個子任務(wù)產(chǎn)生的消息數(shù)量比其對等任務(wù)多100倍。由于消息是通過user_id在子任務(wù)之間分區(qū)的,這表明有些用戶產(chǎn)生的消息比其他用戶多得多,這就意味著那是垃圾消息。臨時查詢我們的spam_adjusted數(shù)據(jù)集后也確認(rèn)了這一結(jié)果。

640.webp (3).jpg

圖10-不同子任務(wù)的消息數(shù)

為了緩解該問題,我們在“過濾器事件作業(yè)”中應(yīng)用了一個上限規(guī)則:對于一個小時內(nèi)的用戶,如果我們看到的消息多于X條,則僅發(fā)送前X條消息。應(yīng)用上限規(guī)則后,檢查點就不再出現(xiàn)故障了。

數(shù)據(jù)穩(wěn)健性和驗證

數(shù)據(jù)準(zhǔn)確性對于實驗指標(biāo)的計算而言更為重要。為了確保我們的實時實驗流程按預(yù)期運行,并始終提供準(zhǔn)確的指標(biāo),我們啟動了一個單獨的每日工作流,其執(zhí)行與流作業(yè)相同的計算,但使用的是臨時方式。如果流作業(yè)結(jié)果違反以下任一條件,則會提醒開發(fā)人員:

·在同一累積期間(本例中為3天),計數(shù)不應(yīng)減少

·如果在第一個累積期之后進(jìn)行了re-ramp,則計數(shù)應(yīng)從0開始再累積3天

·流結(jié)果與驗證流結(jié)果之間的差異不應(yīng)超過某個閾值(在我們的例子中為2%)。

通過查詢實驗元數(shù)據(jù),我們分別在3種情況下對實驗進(jìn)行了驗證:

1.單次ramp-up實驗

2.在初始累積期間內(nèi)進(jìn)行多次ramp-up實驗

3.在初始累積期后進(jìn)行多次ramp-up實驗

這一流程如下所示:

640.webp (4).jpg

圖11-驗證流程

規(guī)模

在這一部分中,我們提供了一些基本統(tǒng)計信息,展示實時實驗管道的規(guī)模:

1.輸入主題流量(一天的平均值):

1597907775(1).png

2.100G檢查點

3.200~300個實驗

4.8個master,50個worker,每個都是ec2 c5d.9xlarge

5.計算的并行度為256

未來計劃

1.支持更多指標(biāo),例如PWT(pinner等待時間),這樣如果實驗導(dǎo)致Pinner的延遲異常增加,則可以盡快停止。

2.可能更新管道以使用Flink的協(xié)同處理功能代替“間隔連接”,使管道更具動態(tài)性和彈性,以應(yīng)對事件流和Activation流之間的不同步問題。

3.分區(qū):研究分區(qū)可以支持的分區(qū)類型,因為分區(qū)會導(dǎo)致狀態(tài)增加。

4.通過電子郵件或Slack支持實時警報。

致謝

實時實驗分析是Pinterest在生產(chǎn)環(huán)境中的第一個基于Flink的應(yīng)用程序。非常感謝我們的大數(shù)據(jù)平臺團(tuán)隊(特別感謝Steven Bairos-Novak、Jooseong Kim和Ang Zhang)構(gòu)建了Flink平臺并將其作為服務(wù)提供出來。同時還要感謝Analytics Platform團(tuán)隊(Bo Sun)出色的可視化效果,Logging Platform團(tuán)隊提供實時數(shù)據(jù)提取,以及Data Science團(tuán)隊(Brian Karfunkel)提供的統(tǒng)計咨詢!

立即登錄,閱讀全文
原文鏈接:點擊前往 >
文章來源:Ververica
版權(quán)說明:本文內(nèi)容來自于Ververica,本站不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。文章內(nèi)容系作者個人觀點,不代表快出海對觀點贊同或支持。如有侵權(quán),請聯(lián)系管理員(zzx@kchuhai.com)刪除!
優(yōu)質(zhì)服務(wù)商推薦
更多
掃碼登錄
打開掃一掃, 關(guān)注公眾號后即可登錄/注冊
加載中
二維碼已失效 請重試
刷新
賬號登錄/注冊
個人VIP
小程序
快出海小程序
公眾號
快出海公眾號
商務(wù)合作
商務(wù)合作
投稿采訪
投稿采訪
出海管家
出海管家