使用Apache Hudi + Amazon S3 + AWS DMS構(gòu)建數(shù)據(jù)湖(上)

來源: 知乎
作者:ApacheHudi
時(shí)間:2020-11-11
17383
Amazon Simple Storage Service(amazon S3)是針對(duì)結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)的高性能對(duì)象存儲(chǔ)服務(wù),可以用來作為數(shù)據(jù)湖底層的存儲(chǔ)服務(wù)。

1. 引入

數(shù)據(jù)湖使組織能夠在更短的時(shí)間內(nèi)利用多個(gè)源的數(shù)據(jù),而不同角色用戶可以以不同的方式協(xié)作和分析數(shù)據(jù),從而實(shí)現(xiàn)更好、更快的決策。Amazon Simple Storage Service(amazon S3)是針對(duì)結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)的高性能對(duì)象存儲(chǔ)服務(wù),可以用來作為數(shù)據(jù)湖底層的存儲(chǔ)服務(wù)。

然而許多用例,如從上游關(guān)系數(shù)據(jù)庫執(zhí)行變更數(shù)據(jù)捕獲(CDC)到基于Amazon S3的數(shù)據(jù)湖,都需要在記錄級(jí)別處理數(shù)據(jù),執(zhí)行諸如從數(shù)據(jù)集中插入、更新和刪除單條記錄的操作需要處理引擎讀取所有對(duì)象(文件),進(jìn)行更改,并將整個(gè)數(shù)據(jù)集重寫為新文件。此外為使數(shù)據(jù)湖中的數(shù)據(jù)以近乎實(shí)時(shí)的方式被訪問,通常會(huì)導(dǎo)致數(shù)據(jù)被分割成許多小文件,從而導(dǎo)致查詢性能較差。Apache Hudi是一個(gè)開源的數(shù)據(jù)管理框架,它使您能夠在Amazon S3 數(shù)據(jù)湖中以記錄級(jí)別管理數(shù)據(jù),從而簡(jiǎn)化了CDC管道的構(gòu)建,并使流數(shù)據(jù)攝取變得高效,Hudi管理的數(shù)據(jù)集使用開放存儲(chǔ)格式存儲(chǔ)在Amazon S3中,通過與Presto、Apache Hive、Apache Spark和AWS Glue數(shù)據(jù)目錄的集成,您可以使用熟悉的工具近乎實(shí)時(shí)地訪問更新的數(shù)據(jù)。Amazon EMR已經(jīng)內(nèi)置Hudi,在部署EMR集群時(shí)選擇Spark、Hive或Presto時(shí)自動(dòng)安裝Hudi。

本篇文章將展示如何構(gòu)建一個(gè)CDC管道,該管道使用AWS數(shù)據(jù)庫遷移服務(wù)(AWS DMS)從Amazon關(guān)系數(shù)據(jù)庫服務(wù)(Amazon RDS)for MySQL數(shù)據(jù)庫中捕獲數(shù)據(jù),并將這些更改應(yīng)用到Amazon S3中的一個(gè)數(shù)據(jù)集上。Apache Hudi包含了HoodieDeltaStreamer實(shí)用程序,它提供了一種從許多源(如分布式文件系統(tǒng)DFS或Kafka)攝取數(shù)據(jù)的簡(jiǎn)單方法,它可以自己管理檢查點(diǎn)、回滾和恢復(fù),因此不需要跟蹤從源讀取和處理了哪些數(shù)據(jù),這使得使用更改數(shù)據(jù)變得很容易,同時(shí)還可以在接收數(shù)據(jù)時(shí)對(duì)數(shù)據(jù)進(jìn)行基于SQL的輕量級(jí)轉(zhuǎn)換,有關(guān)詳細(xì)信息,請(qǐng)參見寫Hudi表。ApacheHudi版本0.5.2提供了對(duì)帶有HoodieDeltaStreamer的AWS DMS支持,并在Amazon EMR 5.30.x和6.1.0上可用。

2. 架構(gòu)

下圖展示了構(gòu)建CDC管道而部署的體系結(jié)構(gòu)。

v2-ec3b0e8916b3ce5f0eb21efbb9877255_720w.jpg

在該架構(gòu)中,我們?cè)贏mazon RDS上有一個(gè)MySQL實(shí)例,AWS-DMS將完整的增量數(shù)據(jù)(使用AWS-DMS的CDC特性)以Parquet格式存入S3中,EMR集群上的HoodieDeltaStreamer用于處理全量和增量數(shù)據(jù),以創(chuàng)建Hudi數(shù)據(jù)集,當(dāng)更新MySQL數(shù)據(jù)庫中的數(shù)據(jù)后,AWS-DMS任務(wù)將獲取這些更改并將它們變更到原始的S3存儲(chǔ)桶中。HoodieDeltastreamer作業(yè)可以在EMR集群上以特定的頻率或連續(xù)模式運(yùn)行,以將這些更改應(yīng)用于Amazon S3數(shù)據(jù)湖中的Hudi數(shù)據(jù)集,然后可以使用SparkSQL、Presto、運(yùn)行在EMR集群上的Apache Hive和Amazon Athena等工具查詢這些數(shù)據(jù)。

3. 部署解決方案資源

使用AWS CloudFormation在AWS帳戶中部署這些組件,選擇一個(gè)AWS區(qū)域部署以下服務(wù):

·Amazon EMR

·AWS DMS

·Amazon S3

·Amazon RDS

·AWS Glue

·AWS Systems Manager

在部署CloudFormation模板之前需要先滿足如下條件:

·擁有一個(gè)至少有兩個(gè)公共子網(wǎng)的專有網(wǎng)絡(luò)(VPC)。

·有一個(gè)S3存儲(chǔ)桶來從EMR集群收集日志,需要在同一個(gè)AWS區(qū)域。

·具有AWS身份和訪問管理(IAM)角色DMS VPC角色dms-vpc-role。

·如果要使用AWS Lake Formation權(quán)限模型在帳戶中部署,請(qǐng)驗(yàn)證以下設(shè)置:

·用于部署技術(shù)棧的IAM用戶需要被添加為L(zhǎng)ake Formation下的data lake administrator,或者用于部署堆棧的IAM用戶具有在AWS Glue data Catalog中創(chuàng)建數(shù)據(jù)庫的IAM權(quán)限。

·Lake Formation下的數(shù)據(jù)目錄(Data Catalog)設(shè)置配置為僅對(duì)新數(shù)據(jù)庫和新數(shù)據(jù)庫中的新表使用IAM訪問控制,這將確保僅使用IAM權(quán)限控制對(duì)數(shù)據(jù)目錄(Data Catalog)中新創(chuàng)建的數(shù)據(jù)庫和表的所有訪問權(quán)限。

v2-2211279f9a70a9e4478603603eb98fab_720w.jpg

·IAMAllowedPrincipals在Lake Formation database creators頁面上被授予數(shù)據(jù)庫創(chuàng)建者權(quán)限。

v2-43a8473f443f7fed5a6afa5fe0769cac_720w.jpg

如果此權(quán)限不存在,請(qǐng)通過選擇授予并選擇授予創(chuàng)建數(shù)據(jù)庫權(quán)限。

v2-b6da0a57d438d63faf127fa410db6942_720w.jpg

這些設(shè)置是必需的,以便僅使用IAM控制對(duì)數(shù)據(jù)目錄對(duì)象的所有權(quán)限。

4. 啟動(dòng)CloudFormation

要啟動(dòng)CloudFormation棧,請(qǐng)完成以下步驟

·選擇啟動(dòng)CloudFormation棧

·在Parameters部分提供必需的參數(shù),包括一個(gè)用于存儲(chǔ)Amazon EMR日志的S3 Bucket和一個(gè)您想要訪問Amazon RDS for MySQL的CIDR IP范圍。

v2-7a15fdf555680660451f36adc16397d2_720w.jpg

·遵循CloudFormation創(chuàng)建向?qū)?,保持其余默認(rèn)值不變。

·在最后一個(gè)頁面上,選擇允許AWS CloudFormation可能會(huì)使用自定義名稱創(chuàng)建IAM資源。

·選擇創(chuàng)建。

·當(dāng)創(chuàng)建完成后,在CloudFormation堆棧的Outputs選項(xiàng)卡上記錄S3 Bucket、EMR集群和Amazon RDS for MySQL的詳細(xì)信息。

v2-b8eefd2b0e9c30c15d5576b84534e48c_720w.jpg

CloudFormation模板為EMR集群使用m5.xlarge和m5.2xlarge實(shí)例,如果這些實(shí)例類型在你選擇用于部署的區(qū)域或可用性區(qū)域中不可用,那么CloudFormation將會(huì)創(chuàng)建失敗。如果發(fā)生這種情況,請(qǐng)選擇實(shí)例類型可用的區(qū)域或子網(wǎng)。

CloudFormation還使用必要的連接屬性(如dataFormat、timestampColumnName和parquetTimestampInMillisecond)創(chuàng)建和配置AWS DMS端點(diǎn)和任務(wù)。

作為CloudFormation棧的一部分部署的數(shù)據(jù)庫實(shí)例已經(jīng)被創(chuàng)建,其中包含AWS-DMS在數(shù)據(jù)庫的CDC模式下工作所需的設(shè)置。

·binlog_format=ROW

·binlog_checksum=NONE

另外在RDS DB實(shí)例上啟用自動(dòng)備份,這是AWS-DMS進(jìn)行CDC所必需的屬性。

5. 運(yùn)行端到端數(shù)據(jù)流

CloudFormation部署好后就可以運(yùn)行數(shù)據(jù)流,將MySQL中的完整和增量數(shù)據(jù)放入數(shù)據(jù)湖中的Hudi數(shù)據(jù)集。

作為最佳實(shí)踐,請(qǐng)將binlog保留至少24小時(shí)。使用SQL客戶端登錄Amazon RDS for MySQL數(shù)據(jù)庫并運(yùn)行以下命令:

call mysql.rds_set_configuration('binlog retention hours', 24)

1.在dev數(shù)據(jù)庫中創(chuàng)建表:

create table dev.retail_transactions(

tran_id INT,

tran_date DATE,

store_id INT,

store_city varchar(50),

store_state char(2),

item_code varchar(50),

quantity INT,

total FLOAT);

1.創(chuàng)建表時(shí),將一些測(cè)試數(shù)據(jù)插入數(shù)據(jù)庫:

insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);

insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);

insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);

insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);

insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);

insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);

insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);

insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);

insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);

insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);

insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);

insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);

insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);

insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);

現(xiàn)在使用AWS DMS將這些數(shù)據(jù)推送到Amazon S3。

1.在AWS DMS控制臺(tái)上,運(yùn)行hudiblogload任務(wù)。

此任務(wù)將表的全量數(shù)據(jù)加載到Amazon S3,然后開始寫增量數(shù)據(jù)。

v2-67256b2cec4391f1193d64640d25becf_720w.jpg

如果第一次啟動(dòng)AWS-DMS任務(wù)時(shí)系統(tǒng)提示測(cè)試AWS-DMS端點(diǎn),那么可以先進(jìn)行測(cè)試,在第一次啟動(dòng)AWS-DMS任務(wù)之前測(cè)試源和目標(biāo)端點(diǎn)通常是一個(gè)好的實(shí)踐。

幾分鐘后,任務(wù)的狀態(tài)將變更為"加載完成"、"復(fù)制正在進(jìn)行",這意味著已完成全量加載,并且正在進(jìn)行的復(fù)制已開始,可以轉(zhuǎn)到由CloudFormation創(chuàng)建的S3 Bucket,應(yīng)該會(huì)在S3 Bucket的dmsdata/dev/retail_transactions文件夾下看到一個(gè).parquet文件。

v2-dc174cea40389f5b0eb08a6bc1b68741_720w.jpg

在EMR集群的Hardware選項(xiàng)卡上,選擇主實(shí)例組并記錄主實(shí)例的EC2實(shí)例ID。

在Systems Manager控制臺(tái)上,選擇會(huì)話管理器。

選擇"啟動(dòng)會(huì)話"以啟動(dòng)與群集主節(jié)點(diǎn)的會(huì)話。

通過運(yùn)行以下命令將用戶切換到Hadoop

sql sudo su hadoop

在實(shí)際用例中,AWS DMS任務(wù)在全量加載完成后開始向相同的Amazon S3位置寫入增量文件,區(qū)分全量加載和增量加載文件的方法是,完全加載文件的名稱以load開頭,而CDC文件名具有日期時(shí)間戳,如在后面步驟所示。從處理的角度來看,我們希望將全部負(fù)載處理到Hudi數(shù)據(jù)集中,然后開始增量數(shù)據(jù)處理。為此,我們將滿載文件移動(dòng)到同一S3存儲(chǔ)桶下的另一個(gè)S3文件夾中,并在開始處理增量文件之前處理這些文件。

1.在EMR集群的主節(jié)點(diǎn)上運(yùn)行以下命令(將<s3-bucket-name>替換為實(shí)際的bucket name):

sql aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/ --exclude "*" --include "LOAD*.parquet" --recursive

有了datafull文件夾中的全量表轉(zhuǎn)儲(chǔ),接著使用EMR集群上的HoodieDeltaStreamer實(shí)用程序來向Amazon S3上寫入Hudi數(shù)據(jù)集。

1.運(yùn)行以下命令將Hudi數(shù)據(jù)集填充到同一個(gè)S3 bucket中的Hudi文件夾中(將<S3 bucket name>替換為CloudFormation創(chuàng)建的S3 bucket的名稱):

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \

  --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \

  --master yarn --deploy-mode cluster \

--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \

--conf spark.sql.hive.convertMetastoreParquet=false \

/usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \

  --table-type COPY_ON_WRITE \

  --source-ordering-field dms_received_ts \

  --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \

  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \

  --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \

  --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \

    --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \

--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \

  --enable-hive-sync

前面的命令運(yùn)行一個(gè)運(yùn)行HoodieDeltaStreamer實(shí)用程序的Spark作業(yè)。有關(guān)此命令中使用的參數(shù)的詳細(xì)信息,請(qǐng)參閱編寫Hudi表。

當(dāng)Spark作業(yè)完成后,可以導(dǎo)航到AWS Glue控制臺(tái),找到在hudiblogdb數(shù)據(jù)庫下創(chuàng)建的名為retail_transactions的表,表的input format是org.apache.hudi.hadoop.HoodieParquetInputFormat.

v2-d2f87e895d28f3a8263eedbdc8dcda7e_720w.jpg

立即登錄,閱讀全文
版權(quán)說明:
本文內(nèi)容來自于知乎,本站不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。文章內(nèi)容系作者個(gè)人觀點(diǎn),不代表快出海對(duì)觀點(diǎn)贊同或支持。如有侵權(quán),請(qǐng)聯(lián)系管理員(zzx@kchuhai.com)刪除!
優(yōu)質(zhì)服務(wù)商推薦
更多
掃碼登錄
打開掃一掃, 關(guān)注公眾號(hào)后即可登錄/注冊(cè)
加載中
二維碼已失效 請(qǐng)重試
刷新
賬號(hào)登錄/注冊(cè)
小程序
快出海小程序
公眾號(hào)
快出海公眾號(hào)
商務(wù)合作
商務(wù)合作
投稿采訪
投稿采訪
出海管家
出海管家