《数据“科学家”必读》 | 与Cosmos DB实现数据同步

作为一款致力于成为数字化企业「最强大脑」的服务,Azure Synapse Analysis高效高弹性的架构设计、简单易用的操作、强大的功能和澎湃的数据处理和分析能力,能够帮助我们解决与数据准备、数据管理、数据仓库、大数据和AI等方面有关的很多挑战。

我们将通过《数据“科学家”必读》系列文章带领大家全面体验Azure Synapse Analysis。本系列共分为六期内容,本篇是其中的第二期:

  1. 第一次亲密接触:开箱初体验,概括了解Azure Synapse Analysis的功能与价值

  2. 围绕Cosmos DB自行DIY的Azure Synapse Analysis解决方案

  3. Azure Synapse Analysis与Azure Function服务的配合使用;

  4. 通过增量数据CDC对Azure Synapse Analysis中的数据进行更新;

  5. 借助Azure Data Factory工具实现数据处理水线的自动化操作;

  6. 借助Synapse Link的一键同步省略ETL过程,实现最新数据的直接访问。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

在上一期内容中,我们已经从整体角度,概括介绍了Azure Synapse Analysis的功能和价值,尤其是该服务中包含的Synapse Link功能。同时在上期末尾,我们提出了一种适合数据科学家的解决方案架构。从本期开始,我们将深入介绍这个架构的一些技术细节。本篇,重点将会介绍其中的Cosmos DB Change Feed。

首先,回顾一下这个架构:

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

再联想一下大家平时接触的各类OLTP系统,当我们需要让此类系统中的数据向下游数据服务进行同步时,通常会有两种做法,全量同步增量同步

如果将Cosmos DB作为OLTP数据源,其内置的ChangeFeed功能可以帮助用户捕捉数据库中增量变化的数据,从而提升同步效率。ChangeFeed通过将增量数据通过SDK暴露访问节点,用户可以通过SDK获取增量变化数据,同时其支持断点(CheckPoint),这也使得用户可以*选择获取增量数据的起始位置。

Cosmos DB数据库支持多种数据库引擎API接口,如SQL、Mongo、Cassandra等。过去一年间,Cosmos DB对ChangeFeed的支持范围进行了更新,从最开始只支持SQL API和.NET SDK,发现到现在已经可以支持多种数据库引擎API及多种开发语言,这也为开发人员提供了一致的体验,使产品获得更好的兼容性。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

为了帮助大家快速入手,本文我们选择了SQL API + Python的方式为大家进行演示。

在整个演示过程中,我们先来设计一下原始数据表格的Schema,为此创建了一个结构如下所示的商品价目表,其中包含商品ID、QUANTITY数量、PRICE价格信息。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

下面我们开始准备Cosmos DB环境。大致上,需要分别执行如下操作:

1、创建Cosmos DB,可参考这里,注意在创建时需要选择API为Core(SQL)。

2、开启Synapse Link后续使用,详情可参考这里

3、记录Cosmos DB访问**,详情可参考这里。请按照文档获取Master Key并替换后续代码中Config.py中的master_key参数。

4、准备Config.py配置文件。其中Host在Azure门户中选中你所创建的Cosmos DB,在Overview页面URI中获取。Master_key替换为步骤3记录的**,database_id和container_id可自行设定,例如可命名为“demo”。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

5、创建Database和Container:

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

6、生成仿真演示数据,演示中插入了10条演示数据:

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

7、通过ChangeFeed SDK读取增量变更数据,在query_items_change_feed函数中带入了参数is_start_from_beginning表示从头开始读取增量变化数据。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

8、通过断点读取ChangeFeed增量变更数据,通过获取签署ChangeFeed返回中的etag来标记下一次读取的起始位置,演示中通过再次插入两条记录来出发变更。

《数据“科学家”必读》 | 与Cosmos DB实现数据同步

至此,我们通过SDK获取增量变化数据的基本方式已经走通了。

回到开篇的架构中,我们还需要将增量数据持续的更新到下游OLAP数据仓库中。一种做法是,将上述代码逻辑进一步完善将其跑在虚拟机中持续执行;另一种方法可直接通过Azure Functions来托管上述抽取增量变化数据逻辑。

下期文章将会介绍如何通过Azure Functions服务实现这一目标。借助Azure Functions服务,我们可以实现代码运行环境的托管,按需执行(通过定期轮询的方式),以及通过内置连接器简化代码开发。

最后需要注意,上述演示中只是简单介绍了Cosmos ChangeFeed功能,大家可以参考下述链接进一步了解该功能。此外需要注意,Cosmos DB ChangeFeed目前还不支持捕获Delete条目,目前只能通过一些比较间接的方式来实现,这里不做赘述,感兴趣的小伙伴可访问下列ChangeFeed介绍链接。

不过对Delete的支持目前已经在支持计划中,详见这里

其他资料参考: