互联网技术 / 互联网资讯 · 2024年3月13日

数仓数据同步方法与实现架构

数据仓库的核心特性之一是数据集成,即将来自不同来源、不同形式的未加工数据统一进入ODS层,通常包括日志数据与业务数据库数据。对于业务数据库数据(如存储在 MySQL 中的表),将数据采集并导入到数仓中(通常是 Hive 或其他计算引擎)是关键环节。

如何高效、准确地把业务 DB 数据同步到数仓?常见的方案有两类:直连同步与实时增量同步(数据库日志解析)。直连同步的思路是直接连接数据库进行 SELECT,将查询结果暂存为本地文件再加载到数仓。该方法简单易用,但随业务增长会遇到瓶颈,下文将详细分析。

为解决上述问题,通常采用实时增量的方式进行数据同步,核心原理是 CDC(Change Data Capture)+ Merge,即实时日志采集 + 离线日志还原数据的一体化方案。

常见数据同步方式

• 流式数据集成

• 数据同步方式:直连同步

直连同步是指通过规范接口 API 与动态库(如 ODBC/JDBC)直接连接业务库,统一标准接口由各数据库提供驱动,支持相同的函数调用与 SQL 实现。常用工具如 Sqoop 就采用此方式进行批量数据同步。

直连同步配置简单、易上手,适合操作型业务系统的数据同步,但存在以下问题:

数据同步时延随数据规模增长而增加,难以满足下游数仓的时效性要求;性能瓶颈明显,直接查询数据库对源库影响较大,若未部署主备方案可能影响业务服务;即便有主备,数据量较大时性能仍可能不足。

数仓 | 几种常见的数据同步方式

日志解析

所谓日志解析,是指解析数据库的变更日志,例如 MySQL 的 Binlog、Oracle 的归档日志等。通过读取日志信息,提取变化数据并将其解析到目标存储中实现实时同步。该方式的工作通常在操作系统层完成,不直接通过数据库查询,因此对源数据库的性能压力较小。

日志解析的同步可以达到实时或准实时,延迟通常在毫秒级别,最大的优势在于高性能与对源数据库影响小。实践中,实时增量同步常通过 CDC+Merge 的组合实现。但也存在挑战,如批量补数时可能产生大量数据更新,日志解析处理速度可能成为瓶颈;此外,需要一套实时抽取系统来抓取并解析日志,投入较大、实现较复杂。

数仓 | 几种常见的数据同步方式

在直连基础上增加流式同步的链路:通过流式计算引擎将 Binlog 采集到 Kafka,并通过 Kafka to Hive 的流程将数据导入到原始层,随后再经过 Merge 产生所需的 ODS 数据。

该数据集成方式的优势在于将数据传输时间压到 T+0,当天完成一次 Merge,显著节省时间与算力资源。

流式数据集成实现思路

数仓 | 几种常见的数据同步方式

1) 采用 Flink 从 Kafka 上拉取 Binlog 数据并写入 HDFS,形成增量表。

2) 对每张 ODS 表,先创建快照(Snapshot),将 MySQL 中的存量数据读取到 Hive。底层可通过直连 MySQL 的方式进行一次性全量导入,生成全量表。

3) 对每张 ODS 表,基于存量数据与当天增量的 Binlog 进行每日 Merge,还原出完整的业务数据。

4) Binlog 持续产出,通过对 Binlog 的实时采集,将部分数据处理需求从每日批处理平滑到实时流处理。无论从性能还是对 MySQL 的访问压力来看,都会有明显改善。Binlog 本身记录了数据变更类型(Insert、Update、Delete),经过语义化处理即可实现精准还原。

关于 Binlog 解析,可以使用 canal 等工具。将数据采集到 Kafka 之后,可用 Flink 解析并写回 HDFS,解析可通过 Flink DataStream API 或 Flink SQL 的 canal-json 数据源来实现,使用 Flink SQL 通常更简单。下面展示 canal-json 格式的 Kafka 数据源示例。

数据解析完成后,进入合并还原完整数据的阶段。常见做法之一是使用全外连接(FULL OUTER JOIN)进行合并。示例逻辑包括:当天的增量数据与昨天的全量数据进行全外连接,生成当天的目标表分区数据。

INSERT OVERWRITE TABLE User_oRdeR PARTITION(ds=’20211012′) SELECT CASE WHEN n.id IS NULL THEN o.id ELSE n.id END ,CASE WHEN n.id IS NULL THEN o.cReate_tiMe ELSE n.cReate_tiMe END ,CASE WHEN n.id IS NULL THEN o.Modified_tiMe ELSE n.Modified_tiMe END ,CASE WHEN n.id IS NULL THEN o.User_id ELSE n.User_id END ,CASE WHEN n.id IS NULL THEN o.sku_code ELSE n.sku_code END ,CASE WHEN n.id IS NULL THEN o.pay_fee ELSE n.pay_fee END FROM ( SELECT * FROM User_oRdeR_delta WHERE ds = ‘20211012’ AND id IS NOT NULL AND User_id IS NOT NULL ) n FULL OUTER JOIN ( SELECT * FROM User_oRdeR WHERE ds = ‘20211011’ AND id IS NOT NULL AND User_id IS NOT NULL ) o ON o.id = n.id AND o.User_id = n.User_id ;

通过上述步骤,即可实现数据的完整还原。

总结

本文概述了数据仓库在构建 ODS 层时常见的数据同步方式,并给出相应的示意与说明。随后介绍了 CDC+Merge 的数据同步方案。需要注意的是,Flink 1.11 引入了 CDC 的连接器,支持 MySQL CDC、PostgreSQL CDC,并对 Kafka 的 Connectors 提供 canal-json、debezium-json 与 changelog-json 等格式的支持,大幅简化数据变更捕获与处理过程,降低数据同步的复杂度。