接下來查詢數(shù)據(jù)并查看目錄中retail_transactions表的數(shù)據(jù)。
1.在先前建立的Systems Manager會話中,運(yùn)行以下命令(確保已完成post的所有先前條件,包括在Lake Formation中將IAMAllowedPrincipals添加為數(shù)據(jù)庫創(chuàng)建者):
```shell spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar
```
2.對retail_transactions表運(yùn)行以下查詢:
sql spark.sql("select * from hudiblogdb.retail_transactions order by tran_id").show()
接著可以在表中看到與MySQL數(shù)據(jù)庫相同的數(shù)據(jù),其中有幾個列是由HoodieDeltaStreamer自動添加Hudi元數(shù)據(jù)。
現(xiàn)在在MySQL數(shù)據(jù)庫上運(yùn)行一些DML語句,并將這些更改傳遞到Hudi數(shù)據(jù)集。
1.在MySQL數(shù)據(jù)庫上運(yùn)行以下DML語句
insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);
update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;
delete from dev.retail_transactions where tran_id=2;
幾分鐘后將看到在S3存儲桶中的dmsdata/dev/retail_transactions文件夾下創(chuàng)建了一個新的.parquet文件。
1.在EMR集群上運(yùn)行以下命令,將增量數(shù)據(jù)獲取到Hudi數(shù)據(jù)集(將<s3 bucket name>替換為CloudFormation模板創(chuàng)建的s3 bucket的名稱):
shell spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \ --master yarn --deploy-mode cluster \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --conf spark.sql.hive.convertMetastoreParquet=false \ /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \ --table-type COPY_ON_WRITE \ --source-ordering-field dms_received_ts \ --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties \ --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \ --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions \ --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \ --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --enable-hive-sync \ --checkpoint 0
此命令與上一個命令之間的關(guān)鍵區(qū)別在于屬性文件,該文件包含–-props和--checkpoint參數(shù),對于先前執(zhí)行全量加載的命令,我們使用dfs-source-retail-transactions-full.properties進(jìn)行全量加載、dfs-source-retail-transactions-incremental.properties進(jìn)行增量加載,這兩個屬性文件之間的區(qū)別是:
·源數(shù)據(jù)的位置在AmazonS3中的全量數(shù)據(jù)和增量數(shù)據(jù)之間發(fā)生變化。
·SQL transformer查詢包含了一個全量任務(wù)的Op字段,因?yàn)锳WS DMS首次全量加載不包括parquet數(shù)據(jù)集的Op字段,Op字段可有I、U和D值,表示插入、更新和刪除。
本文后面的"部署到生產(chǎn)環(huán)境時的注意事項(xiàng)"部分討論--checkpoint參數(shù)的詳細(xì)信息。
1.作業(yè)完成后,在spark shell中運(yùn)行相同的查詢。
將會看到這些更新應(yīng)用于Hudi數(shù)據(jù)集。
另外還可以使用Hudi Cli來管理Hudi數(shù)據(jù)集,以查看有關(guān)提交、文件系統(tǒng)、統(tǒng)計(jì)信息等的信息。
1.為此在Systems Manager會話中,運(yùn)行以下命令
sql /usr/lib/hudi/cli/bin/hudi-cli.sh
2.在Hudi Cli中,運(yùn)行以下命令(將<s3 bucket name>替換為CloudFormation模板創(chuàng)建的s3 bucket的名稱):
sql connect --path s3://<s3-bucket-name>/hudi/retail_transactions
3.要檢查Hudi數(shù)據(jù)集上的提交(commit),請運(yùn)行以下命令
sql commits show
還可以從Hudi數(shù)據(jù)集查詢增量數(shù)據(jù),這對于希望將增量數(shù)據(jù)用于下游處理(如聚合)時非常有用,Hudi提供了多種增量提取數(shù)據(jù)的方法,Hudi快速入門指南中提供了如何使用此功能的示例。
6. 部署到生產(chǎn)環(huán)境時的注意事項(xiàng)
前面展示了一個如何從關(guān)系數(shù)據(jù)庫到基于Amazon S3的數(shù)據(jù)湖構(gòu)建CDC管道的示例,但如果要將此解決方案用于生產(chǎn),則應(yīng)考慮以下事項(xiàng):
·為了確保高可用性,可以在多AZ配置中設(shè)置AWS-DMS實(shí)例。
·CloudFormation將deltastreamer實(shí)用程序所需的屬性文件部署到S3://<s3bucket name>/properties/處的S3 bucket中,你可以根據(jù)需求定制修改,其中有幾個參數(shù)需要注意
·deltastreamer.transformer.sql – 此屬性是Deltastreamer實(shí)用程序的一個非常強(qiáng)大的特性:它使您能夠在數(shù)據(jù)被攝取并保存在Hudi數(shù)據(jù)集中時動態(tài)地轉(zhuǎn)換數(shù)據(jù),在本文中,我們展示了一個基本的轉(zhuǎn)換,它將tran_date列強(qiáng)制轉(zhuǎn)換為字符串,但是您可以將任何轉(zhuǎn)換作為此查詢的一部分應(yīng)用。
·parquet.small.file.limit– 此字段以字節(jié)為單位,是一個關(guān)鍵存儲配置,指定Hudi如何處理Amazon S3上的小文件,由于每個分區(qū)的每個插入操作中要處理的記錄數(shù),可能會出現(xiàn)小文件,設(shè)置此值允許Hudi繼續(xù)將特定分區(qū)中的插入視為對現(xiàn)有文件的更新,從而使文件的大小小于此值small.file.limit被重寫。
·parquet.max.file.size – 這是Hudi數(shù)據(jù)集中單個parquet文件的最大文件大小,之后將創(chuàng)建一個新文件來存儲更多數(shù)據(jù)。對于Amazon S3的存儲和數(shù)據(jù)查詢需求,我們可以將其保持在256MB-1GB(256x1024x1024=268435456)。
·[Insert|Upsert|bulkinsert].shuffle.parallelism。本篇文章中我們只處理了少量記錄的小數(shù)據(jù)集。然而,在實(shí)際情況下可能希望在第一次加載時引入數(shù)億條記錄,然后增量CDC數(shù)據(jù)達(dá)百萬,當(dāng)希望對每個Hudi數(shù)據(jù)集分區(qū)中的文件數(shù)量進(jìn)行非??深A(yù)測的控制時,需要設(shè)置一個非常重要的參數(shù),這也需要確保在處理大量數(shù)據(jù)時,不會達(dá)到Apache Spark對數(shù)據(jù)shuffle的2GB限制。例如,如果計(jì)劃在第一次加載時加載200 GB的數(shù)據(jù),并希望文件大小保持在大約256 MB,則將此數(shù)據(jù)集的shuffle parallelism參數(shù)設(shè)置為800(200×1024/256)。有關(guān)詳細(xì)信息,請參閱調(diào)優(yōu)指南。
·在增量加載deltastreamer命令中,我們使用了一個附加參數(shù):--checkpoint 0。當(dāng)Deltastreamer寫Hudi數(shù)據(jù)集時,它將檢查點(diǎn)信息保存在.hoodie文件夾下的.commit文件中,它在隨后的運(yùn)行中使用這些信息,并且只從Amazon S3讀取數(shù)據(jù),后者是在這個檢查點(diǎn)時間之后創(chuàng)建的,在生產(chǎn)場景中,在啟動AWS-DMS任務(wù)之后,只要完成全量加載,該任務(wù)就會繼續(xù)向目標(biāo)S3文件夾寫入增量數(shù)據(jù)。在接下來的步驟中,我們在EMR集群上運(yùn)行了一個命令,將全量文件手動移動到另一個文件夾中,并從那里處理數(shù)據(jù)。當(dāng)我們這樣做時,與S3對象相關(guān)聯(lián)的時間戳將更改為最新的時間戳,如果在沒有checkpoint參數(shù)的情況下運(yùn)行增量加載,deltastreamer在手動移動滿載文件之前不會提取任何寫入Amazon S3的增量數(shù)據(jù),要確保Deltastreamer第一次處理所有增量數(shù)據(jù),請將檢查點(diǎn)設(shè)置為0,這將使它處理文件夾中的所有增量數(shù)據(jù)。但是,只對第一次增量加載使用此參數(shù),并讓Deltastreamer從該點(diǎn)開始使用自己的檢查點(diǎn)方法。
·對于本文,我們手動運(yùn)行Spark submit命令,但是在生產(chǎn)集群中可以運(yùn)行這一步驟。
·可以使用調(diào)度或編排工具安排增量數(shù)據(jù)加載命令以固定間隔運(yùn)行,也可以通過向spark submit命令傳遞附加參數(shù)--min-sync-interval-seconds *XX* –continuous,以特定的頻率以連續(xù)方式運(yùn)行它,其中XX是數(shù)據(jù)拉取每次運(yùn)行之間的秒數(shù)。例如,如果要每5分鐘運(yùn)行一次處理,請將XX替換為300。
7. 清理
當(dāng)完成對解決方案的探索后,請完成以下步驟以清理CloudFormation部署的資源
·清空CloudFormation堆棧創(chuàng)建的S3 bucket
·刪除在s3://<EMR-Logs-s3-Bucket>/HudiBlogEMRLogs/下生成的任何Amazon EMR日志文件。
·停止AWS DMS任務(wù)Hudiblogload。
·刪除CloudFormation。
·刪除CloudFormation模板后保留的所有Amazon RDS for MySQL數(shù)據(jù)庫快照。
8. 結(jié)束
越來越多的數(shù)據(jù)湖構(gòu)建在Amazon S3,當(dāng)對數(shù)據(jù)湖的數(shù)據(jù)進(jìn)行變更時,使用傳統(tǒng)方法處理數(shù)據(jù)刪除和更新涉及到許多繁重的工作,在這篇文章中,我們看到了如何在Amazon EMR上使用AWS DMS和HoodieDeltaStreamer輕松構(gòu)建解決方案。我們還研究了在將數(shù)據(jù)集成到數(shù)據(jù)湖時如何執(zhí)行輕量級的記錄級轉(zhuǎn)換,以及如何將這些數(shù)據(jù)用于聚合等下游流程。我們還討論了使用的重要設(shè)置和命令行選項(xiàng),以及如何修改它們以滿足個性化的需求。