1. 引入
數(shù)據(jù)湖使組織能夠在更短的時(shí)間內(nèi)利用多個(gè)源的數(shù)據(jù),而不同角色用戶可以以不同的方式協(xié)作和分析數(shù)據(jù),從而實(shí)現(xiàn)更好、更快的決策。Amazon Simple Storage Service(amazon S3)是針對(duì)結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)的高性能對(duì)象存儲(chǔ)服務(wù),可以用來作為數(shù)據(jù)湖底層的存儲(chǔ)服務(wù)。
然而許多用例,如從上游關(guān)系數(shù)據(jù)庫執(zhí)行變更數(shù)據(jù)捕獲(CDC)到基于Amazon S3的數(shù)據(jù)湖,都需要在記錄級(jí)別處理數(shù)據(jù),執(zhí)行諸如從數(shù)據(jù)集中插入、更新和刪除單條記錄的操作需要處理引擎讀取所有對(duì)象(文件),進(jìn)行更改,并將整個(gè)數(shù)據(jù)集重寫為新文件。此外為使數(shù)據(jù)湖中的數(shù)據(jù)以近乎實(shí)時(shí)的方式被訪問,通常會(huì)導(dǎo)致數(shù)據(jù)被分割成許多小文件,從而導(dǎo)致查詢性能較差。Apache Hudi是一個(gè)開源的數(shù)據(jù)管理框架,它使您能夠在Amazon S3 數(shù)據(jù)湖中以記錄級(jí)別管理數(shù)據(jù),從而簡(jiǎn)化了CDC管道的構(gòu)建,并使流數(shù)據(jù)攝取變得高效,Hudi管理的數(shù)據(jù)集使用開放存儲(chǔ)格式存儲(chǔ)在Amazon S3中,通過與Presto、Apache Hive、Apache Spark和AWS Glue數(shù)據(jù)目錄的集成,您可以使用熟悉的工具近乎實(shí)時(shí)地訪問更新的數(shù)據(jù)。Amazon EMR已經(jīng)內(nèi)置Hudi,在部署EMR集群時(shí)選擇Spark、Hive或Presto時(shí)自動(dòng)安裝Hudi。
本篇文章將展示如何構(gòu)建一個(gè)CDC管道,該管道使用AWS數(shù)據(jù)庫遷移服務(wù)(AWS DMS)從Amazon關(guān)系數(shù)據(jù)庫服務(wù)(Amazon RDS)for MySQL數(shù)據(jù)庫中捕獲數(shù)據(jù),并將這些更改應(yīng)用到Amazon S3中的一個(gè)數(shù)據(jù)集上。Apache Hudi包含了HoodieDeltaStreamer實(shí)用程序,它提供了一種從許多源(如分布式文件系統(tǒng)DFS或Kafka)攝取數(shù)據(jù)的簡(jiǎn)單方法,它可以自己管理檢查點(diǎn)、回滾和恢復(fù),因此不需要跟蹤從源讀取和處理了哪些數(shù)據(jù),這使得使用更改數(shù)據(jù)變得很容易,同時(shí)還可以在接收數(shù)據(jù)時(shí)對(duì)數(shù)據(jù)進(jìn)行基于SQL的輕量級(jí)轉(zhuǎn)換,有關(guān)詳細(xì)信息,請(qǐng)參見寫Hudi表。ApacheHudi版本0.5.2提供了對(duì)帶有HoodieDeltaStreamer的AWS DMS支持,并在Amazon EMR 5.30.x和6.1.0上可用。
2. 架構(gòu)
下圖展示了構(gòu)建CDC管道而部署的體系結(jié)構(gòu)。
在該架構(gòu)中,我們?cè)贏mazon RDS上有一個(gè)MySQL實(shí)例,AWS-DMS將完整的增量數(shù)據(jù)(使用AWS-DMS的CDC特性)以Parquet格式存入S3中,EMR集群上的HoodieDeltaStreamer用于處理全量和增量數(shù)據(jù),以創(chuàng)建Hudi數(shù)據(jù)集,當(dāng)更新MySQL數(shù)據(jù)庫中的數(shù)據(jù)后,AWS-DMS任務(wù)將獲取這些更改并將它們變更到原始的S3存儲(chǔ)桶中。HoodieDeltastreamer作業(yè)可以在EMR集群上以特定的頻率或連續(xù)模式運(yùn)行,以將這些更改應(yīng)用于Amazon S3數(shù)據(jù)湖中的Hudi數(shù)據(jù)集,然后可以使用SparkSQL、Presto、運(yùn)行在EMR集群上的Apache Hive和Amazon Athena等工具查詢這些數(shù)據(jù)。
3. 部署解決方案資源
使用AWS CloudFormation在AWS帳戶中部署這些組件,選擇一個(gè)AWS區(qū)域部署以下服務(wù):
·Amazon EMR
·AWS DMS
·Amazon S3
·Amazon RDS
·AWS Glue
·AWS Systems Manager
在部署CloudFormation模板之前需要先滿足如下條件:
·擁有一個(gè)至少有兩個(gè)公共子網(wǎng)的專有網(wǎng)絡(luò)(VPC)。
·有一個(gè)S3存儲(chǔ)桶來從EMR集群收集日志,需要在同一個(gè)AWS區(qū)域。
·具有AWS身份和訪問管理(IAM)角色DMS VPC角色dms-vpc-role。
·如果要使用AWS Lake Formation權(quán)限模型在帳戶中部署,請(qǐng)驗(yàn)證以下設(shè)置:
·用于部署技術(shù)棧的IAM用戶需要被添加為L(zhǎng)ake Formation下的data lake administrator,或者用于部署堆棧的IAM用戶具有在AWS Glue data Catalog中創(chuàng)建數(shù)據(jù)庫的IAM權(quán)限。
·Lake Formation下的數(shù)據(jù)目錄(Data Catalog)設(shè)置配置為僅對(duì)新數(shù)據(jù)庫和新數(shù)據(jù)庫中的新表使用IAM訪問控制,這將確保僅使用IAM權(quán)限控制對(duì)數(shù)據(jù)目錄(Data Catalog)中新創(chuàng)建的數(shù)據(jù)庫和表的所有訪問權(quán)限。
·IAMAllowedPrincipals在Lake Formation database creators頁面上被授予數(shù)據(jù)庫創(chuàng)建者權(quán)限。
如果此權(quán)限不存在,請(qǐng)通過選擇授予并選擇授予創(chuàng)建數(shù)據(jù)庫權(quán)限。
這些設(shè)置是必需的,以便僅使用IAM控制對(duì)數(shù)據(jù)目錄對(duì)象的所有權(quán)限。
4. 啟動(dòng)CloudFormation
要啟動(dòng)CloudFormation棧,請(qǐng)完成以下步驟
·選擇啟動(dòng)CloudFormation棧
·在Parameters部分提供必需的參數(shù),包括一個(gè)用于存儲(chǔ)Amazon EMR日志的S3 Bucket和一個(gè)您想要訪問Amazon RDS for MySQL的CIDR IP范圍。
·遵循CloudFormation創(chuàng)建向?qū)?,保持其余默認(rèn)值不變。
·在最后一個(gè)頁面上,選擇允許AWS CloudFormation可能會(huì)使用自定義名稱創(chuàng)建IAM資源。
·選擇創(chuàng)建。
·當(dāng)創(chuàng)建完成后,在CloudFormation堆棧的Outputs選項(xiàng)卡上記錄S3 Bucket、EMR集群和Amazon RDS for MySQL的詳細(xì)信息。
CloudFormation模板為EMR集群使用m5.xlarge和m5.2xlarge實(shí)例,如果這些實(shí)例類型在你選擇用于部署的區(qū)域或可用性區(qū)域中不可用,那么CloudFormation將會(huì)創(chuàng)建失敗。如果發(fā)生這種情況,請(qǐng)選擇實(shí)例類型可用的區(qū)域或子網(wǎng)。
CloudFormation還使用必要的連接屬性(如dataFormat、timestampColumnName和parquetTimestampInMillisecond)創(chuàng)建和配置AWS DMS端點(diǎn)和任務(wù)。
作為CloudFormation棧的一部分部署的數(shù)據(jù)庫實(shí)例已經(jīng)被創(chuàng)建,其中包含AWS-DMS在數(shù)據(jù)庫的CDC模式下工作所需的設(shè)置。
·binlog_format=ROW
·binlog_checksum=NONE
另外在RDS DB實(shí)例上啟用自動(dòng)備份,這是AWS-DMS進(jìn)行CDC所必需的屬性。
5. 運(yùn)行端到端數(shù)據(jù)流
CloudFormation部署好后就可以運(yùn)行數(shù)據(jù)流,將MySQL中的完整和增量數(shù)據(jù)放入數(shù)據(jù)湖中的Hudi數(shù)據(jù)集。
作為最佳實(shí)踐,請(qǐng)將binlog保留至少24小時(shí)。使用SQL客戶端登錄Amazon RDS for MySQL數(shù)據(jù)庫并運(yùn)行以下命令:
call mysql.rds_set_configuration('binlog retention hours', 24)
1.在dev數(shù)據(jù)庫中創(chuàng)建表:
create table dev.retail_transactions(
tran_id INT,
tran_date DATE,
store_id INT,
store_city varchar(50),
store_state char(2),
item_code varchar(50),
quantity INT,
total FLOAT);
1.創(chuàng)建表時(shí),將一些測(cè)試數(shù)據(jù)插入數(shù)據(jù)庫:
insert into dev.retail_transactions values(1,'2019-03-17',1,'CHICAGO','IL','XXXXXX',5,106.25);
insert into dev.retail_transactions values(2,'2019-03-16',2,'NEW YORK','NY','XXXXXX',6,116.25);
insert into dev.retail_transactions values(3,'2019-03-15',3,'SPRINGFIELD','IL','XXXXXX',7,126.25);
insert into dev.retail_transactions values(4,'2019-03-17',4,'SAN FRANCISCO','CA','XXXXXX',8,136.25);
insert into dev.retail_transactions values(5,'2019-03-11',1,'CHICAGO','IL','XXXXXX',9,146.25);
insert into dev.retail_transactions values(6,'2019-03-18',1,'CHICAGO','IL','XXXXXX',10,156.25);
insert into dev.retail_transactions values(7,'2019-03-14',2,'NEW YORK','NY','XXXXXX',11,166.25);
insert into dev.retail_transactions values(8,'2019-03-11',1,'CHICAGO','IL','XXXXXX',12,176.25);
insert into dev.retail_transactions values(9,'2019-03-10',4,'SAN FRANCISCO','CA','XXXXXX',13,186.25);
insert into dev.retail_transactions values(10,'2019-03-13',1,'CHICAGO','IL','XXXXXX',14,196.25);
insert into dev.retail_transactions values(11,'2019-03-14',5,'CHICAGO','IL','XXXXXX',15,106.25);
insert into dev.retail_transactions values(12,'2019-03-15',6,'CHICAGO','IL','XXXXXX',16,116.25);
insert into dev.retail_transactions values(13,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
insert into dev.retail_transactions values(14,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
現(xiàn)在使用AWS DMS將這些數(shù)據(jù)推送到Amazon S3。
1.在AWS DMS控制臺(tái)上,運(yùn)行hudiblogload任務(wù)。
此任務(wù)將表的全量數(shù)據(jù)加載到Amazon S3,然后開始寫增量數(shù)據(jù)。
如果第一次啟動(dòng)AWS-DMS任務(wù)時(shí)系統(tǒng)提示測(cè)試AWS-DMS端點(diǎn),那么可以先進(jìn)行測(cè)試,在第一次啟動(dòng)AWS-DMS任務(wù)之前測(cè)試源和目標(biāo)端點(diǎn)通常是一個(gè)好的實(shí)踐。
幾分鐘后,任務(wù)的狀態(tài)將變更為"加載完成"、"復(fù)制正在進(jìn)行",這意味著已完成全量加載,并且正在進(jìn)行的復(fù)制已開始,可以轉(zhuǎn)到由CloudFormation創(chuàng)建的S3 Bucket,應(yīng)該會(huì)在S3 Bucket的dmsdata/dev/retail_transactions文件夾下看到一個(gè).parquet文件。
在EMR集群的Hardware選項(xiàng)卡上,選擇主實(shí)例組并記錄主實(shí)例的EC2實(shí)例ID。
在Systems Manager控制臺(tái)上,選擇會(huì)話管理器。
選擇"啟動(dòng)會(huì)話"以啟動(dòng)與群集主節(jié)點(diǎn)的會(huì)話。
通過運(yùn)行以下命令將用戶切換到Hadoop
sql sudo su hadoop
在實(shí)際用例中,AWS DMS任務(wù)在全量加載完成后開始向相同的Amazon S3位置寫入增量文件,區(qū)分全量加載和增量加載文件的方法是,完全加載文件的名稱以load開頭,而CDC文件名具有日期時(shí)間戳,如在后面步驟所示。從處理的角度來看,我們希望將全部負(fù)載處理到Hudi數(shù)據(jù)集中,然后開始增量數(shù)據(jù)處理。為此,我們將滿載文件移動(dòng)到同一S3存儲(chǔ)桶下的另一個(gè)S3文件夾中,并在開始處理增量文件之前處理這些文件。
1.在EMR集群的主節(jié)點(diǎn)上運(yùn)行以下命令(將<s3-bucket-name>替換為實(shí)際的bucket name):
sql aws s3 mv s3://<s3-bucket-name>/dmsdata/dev/retail_transactions/ s3://<s3-bucket-name>/dmsdata/data-full/dev/retail_transactions/ --exclude "*" --include "LOAD*.parquet" --recursive
有了datafull文件夾中的全量表轉(zhuǎn)儲(chǔ),接著使用EMR集群上的HoodieDeltaStreamer實(shí)用程序來向Amazon S3上寫入Hudi數(shù)據(jù)集。
1.運(yùn)行以下命令將Hudi數(shù)據(jù)集填充到同一個(gè)S3 bucket中的Hudi文件夾中(將<S3 bucket name>替換為CloudFormation創(chuàng)建的S3 bucket的名稱):
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--master yarn --deploy-mode cluster \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false \
/usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field dms_received_ts \
--props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-full.properties \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--enable-hive-sync
前面的命令運(yùn)行一個(gè)運(yùn)行HoodieDeltaStreamer實(shí)用程序的Spark作業(yè)。有關(guān)此命令中使用的參數(shù)的詳細(xì)信息,請(qǐng)參閱編寫Hudi表。
當(dāng)Spark作業(yè)完成后,可以導(dǎo)航到AWS Glue控制臺(tái),找到在hudiblogdb數(shù)據(jù)庫下創(chuàng)建的名為retail_transactions的表,表的input format是org.apache.hudi.hadoop.HoodieParquetInputFormat.