在Pinterest,我們每天都要進(jìn)行數(shù)千個(gè)實(shí)驗(yàn)。我們主要依靠日常實(shí)驗(yàn)指標(biāo)來評估實(shí)驗(yàn)效果。日常實(shí)驗(yàn)管道運(yùn)行一次可能會花費(fèi)10多個(gè)小時(shí),有時(shí)還會超時(shí),因此想要驗(yàn)證實(shí)驗(yàn)設(shè)置、觸發(fā)的正確性以及預(yù)期的實(shí)驗(yàn)性能時(shí)就沒那么方便了。當(dāng)代碼中存在一些錯誤時(shí)這個(gè)問題尤為突出。有時(shí)可能要花幾天時(shí)間才能發(fā)現(xiàn)錯誤,這對用戶體驗(yàn)和重要指標(biāo)造成了更大的損害。我們在Pinterest開發(fā)了一個(gè)近實(shí)時(shí)實(shí)驗(yàn)平臺,以提供更具時(shí)效性的實(shí)驗(yàn)指標(biāo),從而幫助我們盡快發(fā)現(xiàn)這些問題??赡艹霈F(xiàn)的問題有:
1.實(shí)驗(yàn)導(dǎo)致impression的統(tǒng)計(jì)數(shù)據(jù)顯著下降,因此需要盡快關(guān)閉實(shí)驗(yàn)。
2.與對照組相比,實(shí)驗(yàn)導(dǎo)致搜索的執(zhí)行次數(shù)顯著增加。
圖1-帶有置信區(qū)間的實(shí)時(shí)實(shí)驗(yàn)指標(biāo)
上圖的面板顯示了所選事件的實(shí)驗(yàn)組和對照組的流量(也就是動作數(shù))和傾向(也就是unique user的數(shù)量)。自實(shí)驗(yàn)開始以來,這些計(jì)數(shù)已經(jīng)累計(jì)了3天時(shí)間。如果在3天后發(fā)生了re-ramp(分配給實(shí)驗(yàn)組和對照組的用戶數(shù)量增加),則計(jì)數(shù)會歸零0并重新開始累計(jì)3天時(shí)間。
為了確保實(shí)驗(yàn)組與對照組之間的對比在統(tǒng)計(jì)上是有效的,我們做了一些統(tǒng)計(jì)檢驗(yàn)。由于指標(biāo)是實(shí)時(shí)交付的,因此每次按順序收到新記錄時(shí),我們都必須進(jìn)行這些檢驗(yàn)。這需要與傳統(tǒng)的固定視野檢驗(yàn)不一樣的方法,否則會帶來較高的假正率。我們考慮過幾種順序測試方法,包括賭徒破產(chǎn)、貝葉斯A/B檢驗(yàn)和Alpha消耗函數(shù)方法。為了保證數(shù)值穩(wěn)定性,我們從t檢驗(yàn)+Boferroni校正(將我們的案例作為多次檢驗(yàn)進(jìn)行處理)開始,并為我們的初始實(shí)現(xiàn)預(yù)先確定了檢驗(yàn)次數(shù)。
高階設(shè)計(jì)
圖2-實(shí)時(shí)實(shí)驗(yàn)管道的高階設(shè)計(jì)
實(shí)時(shí)實(shí)驗(yàn)管道包括下列主要組件:
·最近ramp的實(shí)驗(yàn)組作業(yè)→每5分鐘將一個(gè)CSV文件發(fā)布到一個(gè)S3位置。這個(gè)CSV是過去3天中所分配用戶有所增加的實(shí)驗(yàn)組的快照。通過查詢托管實(shí)驗(yàn)元數(shù)據(jù)的內(nèi)部Analytics(分析)應(yīng)用程序的MySQL數(shù)據(jù)庫,就能獲得這一信息。
·篩選事件作業(yè)→我們分析了Pinterest上的數(shù)百種用戶動作。這一作業(yè)僅保留最關(guān)key的業(yè)務(wù)事件,這些事件已插入“filtered_events”Kafka主題中。這些事件被剝離掉了不需要的字段,因此filtered_events主題相當(dāng)輕巧。該作業(yè)運(yùn)行在Flink processing時(shí)間內(nèi),并且通過Flink的增量檢查點(diǎn),每隔5秒將其進(jìn)度保存到HDFS中。
·過濾實(shí)驗(yàn)Activation作業(yè)→每當(dāng)一個(gè)用戶被觸發(fā)進(jìn)入一個(gè)實(shí)驗(yàn)時(shí),都會創(chuàng)建一個(gè)Activation(激活)記錄。觸發(fā)規(guī)則取決于實(shí)驗(yàn)邏輯,一名用戶可以被觸發(fā)進(jìn)入一個(gè)實(shí)驗(yàn)數(shù)百次。我們只需要最近3天啟動,或組分配增加的實(shí)驗(yàn)的Activation記錄即可。
為了過濾Activation記錄,此作業(yè)使用Flink的廣播狀態(tài)模式。每10秒檢查一次“最近ramp的實(shí)驗(yàn)組”作業(yè)所發(fā)布的CSV的更改情況,并將其發(fā)布到一個(gè)KeyedBroadcastProcessFunction的所有分區(qū)上,該函數(shù)也消費(fèi)Activation。
KeyedBroadcastProcessFunction將廣播的CSV與Activation流結(jié)合在一起,就可以過濾掉那些最近3天內(nèi)未ramp-up實(shí)驗(yàn)的Activation記錄。此外,“group-ramp-up-time”已添加到Activation記錄中,并插入“filtered_experiment_activations”kafka主題中。
圖3-Scala對象被插入中間層Kafka主題中
圖4-實(shí)時(shí)實(shí)驗(yàn)累積作業(yè)圖
上面是實(shí)時(shí)累積(Aggregation)Flink作業(yè)的高階概覽。這里簡單提及了一些operator,后文中還將詳細(xì)介紹另一些operator。Source operator從Kafka讀取數(shù)據(jù),而sink使用一個(gè)REST接口寫入我們的內(nèi)部Analytics Store上。
刪除重復(fù)事件→這里用一個(gè)KeyedProcessFunction實(shí)現(xiàn),由(event.user_id,event.event_type,event.timestamp)作為key。這里的思想是,如果來自同一用戶的相同事件類型的事件具有相同的時(shí)間戳,則它們是重復(fù)事件。第一個(gè)這樣的事件被發(fā)送到下游,但也會緩存進(jìn)狀態(tài)持續(xù)5分鐘時(shí)間。任何后續(xù)事件都將被丟棄。5分鐘后,一個(gè)計(jì)時(shí)器會啟動并清除狀態(tài)。這里的假定是所有重復(fù)事件之間的間隔都在5分鐘之內(nèi)。
查找首次觸發(fā)時(shí)間→這里是一個(gè)Flink KeyedProcessFunction,由(experiment_hash,experiment_group,user_id)作為key。這里的假設(shè)是,為一個(gè)用戶收到的第一個(gè)實(shí)驗(yàn)Activation記錄也是具有第一個(gè)觸發(fā)時(shí)間的Activation。一個(gè)實(shí)驗(yàn)ramp-up以后,收到的第一個(gè)Activation將發(fā)送至下游,并保存為狀態(tài)并持續(xù)3天時(shí)間(我們累積了實(shí)驗(yàn)組ramp-up以來為期3天的計(jì)數(shù))。經(jīng)過3天的ramp時(shí)間后,一個(gè)計(jì)時(shí)器將清除狀態(tài)。
15分鐘的processing時(shí)間tumbling窗口→事件進(jìn)入并向下游發(fā)送結(jié)果時(shí),Numerator Computer和Denominator computer都將累積計(jì)數(shù)。這意味著數(shù)百萬條記錄,但是我們不需要如此頻繁地將結(jié)果發(fā)送到Analytics Store上。我們可以在processing時(shí)間內(nèi)運(yùn)行一個(gè)持續(xù)15分鐘的Flink tumbling窗口,這樣效率更高。對于Numerator Computer來說,這個(gè)窗口由(“experiment_hash”,“experiment_group”,“event_type”,“timestamp”)作為key。當(dāng)窗口在15分鐘后觸發(fā)時(shí),將獲取帶有max_users的記錄并將其發(fā)送到下游的Analytics Store sink。
連接事件和Activation
圖5-通過用戶ID連接Activation流與事件流
我們使用Flink的IntervalJoin operator實(shí)現(xiàn)流到流的連接。IntervalJoin會在接下來的3天內(nèi)緩沖每位用戶的單個(gè)Activation記錄,并且所有匹配事件都將與Activation記錄中的其他實(shí)驗(yàn)元數(shù)據(jù)一起發(fā)送到下游。
這種方法的局限性:
1.對我們的需求而言,IntervalJoin operator有點(diǎn)不夠靈活,因?yàn)樗拈g隔是固定的而不是動態(tài)的。比如說,用戶可以在實(shí)驗(yàn)啟動2天后加入進(jìn)來,但I(xiàn)ntervalJoin還是會為這名用戶運(yùn)行3天時(shí)間,也就是說我們停止累積數(shù)據(jù)后還會運(yùn)行2天時(shí)間。如果3天后組很快re-ramp,則一位用戶也可以有2個(gè)這樣的連接。這種情況會在下游處理。
2.事件和Activation不同步:如果Activation作業(yè)失敗并且Activation流被延遲,則可能會丟失一些數(shù)據(jù),因?yàn)闆]有匹配Activation的事件還會繼續(xù)流動。這將導(dǎo)致計(jì)數(shù)不足。
我們研究了Flink的IntervalJoin源代碼。它會在“左側(cè)緩沖區(qū)”中緩沖Activation 3天時(shí)間,但事件將被立即刪除。目前似乎無法通過配置更改此行為。我們正在研究使用Flink的協(xié)同處理函數(shù)來實(shí)現(xiàn)這個(gè)Activation到事件的連接,該函數(shù)是用于流到流連接的更通用的函數(shù)。我們可以將事件緩沖X分鐘,這樣即使Activation流延遲了X分鐘,管道也可以處理延遲而不會出現(xiàn)計(jì)數(shù)不足。這將幫助我們避免同一用戶的兩次連接,并能形成更加動態(tài)的管道,其可以立即感知到實(shí)驗(yàn)組的re-ramp,并支持更多動態(tài)行為,例如在組re-ramp時(shí)自動擴(kuò)展累積的覆蓋范圍。
Join Results Deduplicator
圖6-Join Results Deduplicator
Join Results Deduplicator是一個(gè)Flink KeyedProcessFunction,它由experiment_hash,experiment_group,event_type,user_id作為key。這個(gè)operator的主要目的是在向下游發(fā)送記錄時(shí)插入“user_first_time_seen”標(biāo)志——下游Numerator Computer使用這個(gè)標(biāo)志來計(jì)算傾向編號(#unique users),而無需使用設(shè)置的數(shù)據(jù)結(jié)構(gòu)。
這個(gè)operator將狀態(tài)存儲到last-ramp-time+3天,之后狀態(tài)將被清除。
Numerator Computer
圖7-Numerator Computer
Numerator Computer是一個(gè)KeyedProcessFunction,由experiment_hash,experiment_group,event_type作為key。它會在最后2小時(shí)內(nèi)一直滾動15分鐘的存儲桶(bucket),每當(dāng)有新記錄進(jìn)入時(shí)都會更新這些桶。對于流量來說,每個(gè)動作都很重要;因此對于每個(gè)事件,動作計(jì)數(shù)都會增加。對于傾向數(shù)字(unique user)——它取決于"first_time_seen”標(biāo)志(僅在為true時(shí)遞增)。
隨著時(shí)間的流逝,存儲桶會滾動/旋轉(zhuǎn)。每次新事件進(jìn)入時(shí),存儲桶數(shù)據(jù)都會向下游刷新到15分鐘的tumbling窗口中。
它有一個(gè)時(shí)間為3天的計(jì)時(shí)器(從ramp-up時(shí)間→3天),可在觸發(fā)后清除所有狀態(tài),這樣就能在ramp-up3天后重置/清除計(jì)數(shù),完成歸零。
垃圾消息與處理
為了使我們的流管道具有容錯能力,F(xiàn)link的增量檢查點(diǎn)和RocksDB狀態(tài)后端被用來保存應(yīng)用程序檢查點(diǎn)。我們面臨的一項(xiàng)有趣挑戰(zhàn)是檢查點(diǎn)失敗。問題似乎在于檢查點(diǎn)流程需要花費(fèi)很長時(shí)間,并且最終會超時(shí)。我們還注意到,在發(fā)生檢查點(diǎn)故障時(shí)通常也會有很高的背壓。
圖8-Flink UI中顯示的檢查點(diǎn)故障
在仔細(xì)檢查了檢查點(diǎn)故障的內(nèi)部機(jī)制之后,我們發(fā)現(xiàn)超時(shí)是由于某些子任務(wù)未將確認(rèn)發(fā)送給檢查點(diǎn)協(xié)調(diào)器而導(dǎo)致的,整個(gè)檢查點(diǎn)流程都卡住了,如下所示。
圖9-子任務(wù)未發(fā)送確認(rèn)
然后我們針對導(dǎo)致失敗的根本原因應(yīng)用了一些調(diào)試步驟:
1.檢查作業(yè)管理日志
2.檢查在檢查點(diǎn)期間卡住的子任務(wù)的任務(wù)管理器日志
3.使用Jstack詳細(xì)查看子任務(wù)
原來子任務(wù)運(yùn)行很正常,只是抽不出空來處理消息。結(jié)果,這個(gè)特定的子任務(wù)具有很高的背壓,從而阻止了barrier通過。沒有barrier的收據(jù),檢查點(diǎn)流程將無法進(jìn)行。
在進(jìn)一步檢查所有子任務(wù)的Flink指標(biāo)之后,我們發(fā)現(xiàn)其中一個(gè)子任務(wù)產(chǎn)生的消息數(shù)量比其對等任務(wù)多100倍。由于消息是通過user_id在子任務(wù)之間分區(qū)的,這表明有些用戶產(chǎn)生的消息比其他用戶多得多,這就意味著那是垃圾消息。臨時(shí)查詢我們的spam_adjusted數(shù)據(jù)集后也確認(rèn)了這一結(jié)果。
圖10-不同子任務(wù)的消息數(shù)
為了緩解該問題,我們在“過濾器事件作業(yè)”中應(yīng)用了一個(gè)上限規(guī)則:對于一個(gè)小時(shí)內(nèi)的用戶,如果我們看到的消息多于X條,則僅發(fā)送前X條消息。應(yīng)用上限規(guī)則后,檢查點(diǎn)就不再出現(xiàn)故障了。
數(shù)據(jù)穩(wěn)健性和驗(yàn)證
數(shù)據(jù)準(zhǔn)確性對于實(shí)驗(yàn)指標(biāo)的計(jì)算而言更為重要。為了確保我們的實(shí)時(shí)實(shí)驗(yàn)流程按預(yù)期運(yùn)行,并始終提供準(zhǔn)確的指標(biāo),我們啟動了一個(gè)單獨(dú)的每日工作流,其執(zhí)行與流作業(yè)相同的計(jì)算,但使用的是臨時(shí)方式。如果流作業(yè)結(jié)果違反以下任一條件,則會提醒開發(fā)人員:
·在同一累積期間(本例中為3天),計(jì)數(shù)不應(yīng)減少
·如果在第一個(gè)累積期之后進(jìn)行了re-ramp,則計(jì)數(shù)應(yīng)從0開始再累積3天
·流結(jié)果與驗(yàn)證流結(jié)果之間的差異不應(yīng)超過某個(gè)閾值(在我們的例子中為2%)。
通過查詢實(shí)驗(yàn)元數(shù)據(jù),我們分別在3種情況下對實(shí)驗(yàn)進(jìn)行了驗(yàn)證:
1.單次ramp-up實(shí)驗(yàn)
2.在初始累積期間內(nèi)進(jìn)行多次ramp-up實(shí)驗(yàn)
3.在初始累積期后進(jìn)行多次ramp-up實(shí)驗(yàn)
這一流程如下所示:
圖11-驗(yàn)證流程
規(guī)模
在這一部分中,我們提供了一些基本統(tǒng)計(jì)信息,展示實(shí)時(shí)實(shí)驗(yàn)管道的規(guī)模:
1.輸入主題流量(一天的平均值):
2.100G檢查點(diǎn)
3.200~300個(gè)實(shí)驗(yàn)
4.8個(gè)master,50個(gè)worker,每個(gè)都是ec2 c5d.9xlarge
5.計(jì)算的并行度為256
未來計(jì)劃
1.支持更多指標(biāo),例如PWT(pinner等待時(shí)間),這樣如果實(shí)驗(yàn)導(dǎo)致Pinner的延遲異常增加,則可以盡快停止。
2.可能更新管道以使用Flink的協(xié)同處理功能代替“間隔連接”,使管道更具動態(tài)性和彈性,以應(yīng)對事件流和Activation流之間的不同步問題。
3.分區(qū):研究分區(qū)可以支持的分區(qū)類型,因?yàn)榉謪^(qū)會導(dǎo)致狀態(tài)增加。
4.通過電子郵件或Slack支持實(shí)時(shí)警報(bào)。
致謝
實(shí)時(shí)實(shí)驗(yàn)分析是Pinterest在生產(chǎn)環(huán)境中的第一個(gè)基于Flink的應(yīng)用程序。非常感謝我們的大數(shù)據(jù)平臺團(tuán)隊(duì)(特別感謝Steven Bairos-Novak、Jooseong Kim和Ang Zhang)構(gòu)建了Flink平臺并將其作為服務(wù)提供出來。同時(shí)還要感謝Analytics Platform團(tuán)隊(duì)(Bo Sun)出色的可視化效果,Logging Platform團(tuán)隊(duì)提供實(shí)時(shí)數(shù)據(jù)提取,以及Data Science團(tuán)隊(duì)(Brian Karfunkel)提供的統(tǒng)計(jì)咨詢!