使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)

时间:2021-07-15 | 标签: | 作者:Q8 | 来源:ApacheHudi网络

小提示:您能找到这篇{使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)}绝对不是偶然,我们能帮您找到潜在客户,解决您的困扰。如果您对本页介绍的使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)内容感兴趣,有相关需求意向欢迎拨打我们的服务热线,或留言咨询,我们将第一时间联系您!

< font-size: 16px;">< ">接下来查询数据并查看目录中retail_transactions表的数据。

< font-size: 16px;">1.在先前建立的Systems Manager会话中,运行以下命令(确保已完成post的所有先前条件,包括在Lake Formation中将IAMAllowedPrincipals添加为数据库创建者):

< font-size: 16px;">```shell spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 --jars /usr/lib/hudi/hudi-spark-bundle_2.11-0.5.2-incubating.jar,/usr/lib/spark/external/lib/spark-avro.jar

< font-size: 16px;">```

< font-size: 16px;">2.对retail_transactions表运行以下查询:

< font-size: 16px;">sql spark.sql("select * from hudiblogdb.retail_transactions order by tran_id").show()

< font-size: 16px;">接着可以在表中看到与MySQL数据库相同的数据,其中有几个列是由HoodieDeltaStreamer自动添加Hudi元数据。

< font-size: 16px;">现在在MySQL数据库上运行一些DML语句,并将这些更改传递到Hudi数据集。

< font-size: 16px;">1.在MySQL数据库上运行以下DML语句

insert into dev.retail_transactions values(15,'2019-03-16',7,'CHICAGO','IL','XXXXXX',17,126.25);

update dev.retail_transactions set store_city='SPRINGFIELD' where tran_id=12;

delete from dev.retail_transactions where tran_id=2;

< font-size: 16px;">几分钟后将看到在S3存储桶中的dmsdata/dev/retail_transactions文件夹下创建了一个新的.parquet文件。

< font-size: 16px;">1.在EMR集群上运行以下命令,将增量数据获取到Hudi数据集(将<s3 bucket name>替换为CloudFormation模板创建的s3 bucket的名称):

shell spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 --master yarn --deploy-mode cluster --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false /usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar --table-type COPY_ON_WRITE --source-ordering-field dms_received_ts --props s3://<s3-bucket-name>/properties/dfs-source-retail-transactions-incremental.properties --source-class org.apache.hudi.utilities.sources.ParquetDFSSource --target-base-path s3://<s3-bucket-name>/hudi/retail_transactions --target-table hudiblogdb.retail_transactions --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer --payload-class org.apache.hudi.payload.AWSDmsAvroPayload --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --enable-hive-sync --checkpoint 0

< font-size: 16px;">此命令与上一个命令之间的关键区别在于属性文件,该文件包含–-props和--checkpoint参数,对于先前执行全量加载的命令,我们使用dfs-source-retail-transactions-full.properties进行全量加危机公关机构载、dfs-source-retail-transactions-incremental.properties进行增量加载,这两个属性文件之间的区别是:

< font-size: 16px;">源数据的位置在AmazonS3中的全量数据和增量数据之间发生变化。

< font-size: 16px;">SQL t各种危机公关的例子ransformer查询包含了一个全量任务的Op字段,因为AWS DMS首次全量加载不包括parquet数据集的Op字段,Op字段可有I、U和D值,表示插入、更新和删除。

< font-size: 16px;">本文后面的"部署到生产环境时的注意事项"部分讨论--checkpoint参数的详细信息。

< font-size: 16px;">1.作业完成后,在spark shell中运行相同的查询。

< font-size: 16px;">将会看到这些更新应用于Hudi数据集。

< font-size: 16px;">另外还可以使用Hudi Cli来管理Hudi数据集,以查看有关提交、文件系统、统计信息等的信息。

< font-size: 16px;">1.为此在Systems Manager会话中,运行以下命令

< font-size: 16px;">sql /usr/lib/hudi/cli/bin/hudi-cli.sh

< font-size: 16px;">2.在Hudi Cli中,运行以下命令(将<s3 bucket name>替换为CloudFormation模板创建的s3 bucket的名称):

< font-size: 16px;">sql connect --path s3://<s3-bucket-name>/hudi/retail_transactions

< font-size: 16px;">3.要检查Hudi数据集上的提交(commit),请运行以下命令



< font-size: 16px;">sql commits show



< font-size: 16px;">还可以从Hudi数据集查询增量数据,这对于希望将增量数据用于下游处理(如聚合)时非常有用,Hudi提供了多种增量提取数据的方法,Hudi快速入门指南中提供了如何使用此功能的示例。

< font-size: 16px;">6. 部署到生产环境时的注意事项

< font-size: 16px;">前面展示了一个如何从关系数据库到基于Amazon S3的数据湖构建CDC管道的示例,但如果要将此解决方案用于生产,则应考虑以下事项:

< font-size: 16px;">为了确保高可用性,可以在多AZ配置中设置AWS-DMS实例。

< font-size: 16px;">CloudFormation将deltastreamer实用程序所需的属性文件部署到S3://<s3bucket name>/properties/处的S3 bucket中,你可以根据需求定制修改,其中有几个参数需要注意

< font-size: 16px;">deltastreamer.transformer.sql – 此属性是Deltastreamer实用程序的一个非常强大的特性:它使您能够在数据被摄取并保存在Hudi数据集中时动态地转换数据,在本文中,我们展示了一个基本的转换,它将tran_date列强制转换为字符串,但是您可以将任何转换作为此查询的一部分应用。

< font-size: 16px;">parquet.small.file.limit– 此字段以字节为单位,是一个关键存储配置,指定Hudi如何处理Amazon S3上的小文件,由于每个分区的每个插入操作中要处理的记录数,可能会出现小文件,设置此值允许Hudi继续将特定分区中的插入视为对现有文件的更新,从而使文件的大小小于此值small.file.limit被重写。

< font-size: 16px;">parquet.max.file.size – 这是Hudi数据集中单个parquet文件的最大文件大小,之后将创建一个新文件来存储更多数据。对于Amazon S3的存储和数据查询需求,我们可以将其保持在256MB-1GB(256x1024x1024=268435456)。

< font-size: 16px;">[Insert|Upsert|bulkinsert].shuffle.parallelism。本篇文章中我们只处理了少量记录的小数据集。然而,在实际情况下可能希望在第一次加载时引入数亿条记录,然后增量CDC数据达百万,当希望对每个Hudi数据集分区中的文件数量进行非常可预测的控制时,需要设置一个非常重要的参数,这也需要确保在处理大量数据时,不会达到Apache Spark对数据shuffle的2GB限制。例如,如果计划在第一次加载时加载200 GB的数据,并希望文件大小保持在大约256 MB,则将此数据集的shuffle parallelism参数设置为800(2001024/256)。有关详细信息,请参阅调优指南。

< font-size: 16px;">在增量加载deltastreamer命令中,我们使用了一个附加参数:--checkpoint 0。当Deltastreamer写Hudi数据集时,它将检查点信息保存在.hoodie文件夹下的.commit文件中,它在随后的运行中使用这些信息,并且只从Amazon S3读取数据,后者是在这个检查点时间之后创建的,在生产场景中,在启动AWS-DMS任务之后,只要完成全量加载,该任务就会继续向目标S3文件夹写入增量数据。在接下来的步骤中,我们在EMR集群上运行了一个命令,将全量文件手动移动到另一个文件夹中,并从那里处理数据。当我们这样做时,与S3对象相关联的时间戳将更改为最新的时间戳,如果在没有checkpoint参数的情况下运行增量加载,deltastreamer在手动移动满载文件之前不会提取任何写入Amazon S3的增量数据,要确保Deltastreamer第一次处理所有增量数据,请将检查点设置为0,这将使它处理文件夹中的所有增量数据。但是,只对第一次增量加载使用此参数,并让Deltastreamer从该点开始使用自己的检查点方法。

< font-size: 16px;">对于本文,我们手动运行Spark submit命令,但是在生产集群中可以运行这一步骤。

< font-size: 16px;">可以使用调度或编排工具安排增量数据加载命令以固定间隔运行,也可以通过向spark submit命令传递附加参数--min-sync-interval-seconds *XX* –continuous,以特定的频率以连续方式运行它,其中XX是数据拉取每次运行之间的秒数。例如,如果要每5分钟运行一次处理,请将XX替换为300。

< font-size: 16px;">7. 清理

< font-size: 16px;">当完成对解决方案的探索后,请完成以下步骤以清理CloudFormation部署的资源

< font-size: 16px;">清空CloudFormation堆栈创建的S3 bucket

< font-size: 16px;">删除在s3://<EMR-Logs-s3-Bucket>/HudiBlogEMRLogs/下生成的任何Amazon EMR日志文件。

< font-size: 16px;">停止AWS DMS任务Hudiblogload。

< font-size: 16px;">删除CloudFormation。

< font-size: 16px;">删除CloudFormation模板后保留的所有Amazon RDS for MySQL数据库快照。

< font-size: 16px;">8. 结束

< font-size: 16px;">越来越多的数据湖构建在Amazon S3,当对数据湖的数据进行变更时,使用传统方法处理数据删除和更新涉及到许多繁重的工作,在这篇文章中,我们看到了如何在Amazon EMR上使用AWS DMS和HoodieDeltaStreamer轻松构建解决方案。我们还研究了在将数据集成到数据湖时如何执行轻量级的记录级转换,以及如何将这些数据用于聚合等下游流程。我们还讨论了使用的重要设置和命令行选项,以及如何修改它们以满足个性化的需求。

使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)

上一篇:在 Azure上购买域名并进行域名解析
下一篇:Tiktok运营 以下是使用计时器捕获视频的方法


版权声明:以上主题为“使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)"的内容可能是本站网友自行发布,或者来至于网络。如有侵权欢迎联系我们客服QQ处理,谢谢。
相关内容
推荐内容
扫码咨询
    使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)
    打开微信扫码或长按识别二维码

小提示:您应该对本页介绍的“使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)”相关内容感兴趣,若您有相关需求欢迎拨打我们的服务热线或留言咨询,我们尽快与您联系沟通使用Apache Hudi + Amazon S3 + AWS DMS构建数据湖(下)的相关事宜。

关键词:使用Apache,Hudi,+,Amazon,S3,+

关于 | 业务 | 案例 | 免责 | 隐私
客服邮箱:sales@1330.com.cn
电话:400-021-1330 | 客服QQ:865612759
沪ICP备12034177号 | 沪公网安备31010702002418号