[浮云]活动推荐:DataFun五周年直播
[礼物]直播亮点:发布业界首个数据智能知识地图
[中国赞]观看方式:重磅!业界首个数据智能知识地图发布
导读:Databricks 是一家由 Apache Spark 创始者创立的软件公司,其主要的产品是一个基于公有云的 Data+AI 平台,目前服务了超过 7000 个客户。Databricks 于近几年提出了 Data Lakeshouse 的架构试图将数据仓库和数据湖整合在一个统一的平台上以处理所有数据,分析和 AI 用例。
我们今天分享的主要内容分为以下三个方面:
- Lakehouse 的定义以及为什么我们需要 Lakehouse
- Databricks 在构建 Lakehouse 上的实践经验
- Databricks 正在研发项目的最新动态
分享嘉宾|范文臣 Databricks 技术主管
编辑整理|王宇翔
出品社区|DataFun
01
Lakehouse systems: what are they and why now?
首先在开始前请大家思考一个问题,对于数据平台来讲用户最关心的事情是什么?大家可能会觉得是性能或者是各种丰富的功能,这些其实都是次要的。
我们发现,用户最关心的问题是数据本身的可达性,也就是是否可以通过各种工具例如 BI 客户端访问数据。
第二个是数据的可靠性,客户所需的数据是不是可靠的,会不会产生错误。
第三个是数据的时效性,客户获得的数据是不是最新的,如果我们可以取得的数据有一个礼拜甚至上月的延迟的话,对于数据分析决策就失去了价值。所以对于数据平台我们的核心是提供高质量高时效性的数据,否则其他任何的东西都是无根之水。
以上结论并不是 Databricks 的独立发现,业界对此也有很多共识。比如 Fivetran 针对数据分析场景的用户做了一个问卷发现 60% 的人都报告说数据质量是他们最头疼的问题,是他们最大的挑战;并且有 86% 以上的分析师表示他们只能使用一些过时的数据,甚至有 41% 的受访者使用的数据超过两个月未更新;最后有 90% 的受访者认为他们的数据源不够可靠,经常会出现数据不可用,数据错误的问题。所以数据质量对于数据平台来说是非常重要的问题。接下来我们就来讨论一下如何通过改造数据平台架构来产生高质量高时效性的数据。
我们先来回顾历史。从 1980 年代开始数据仓库的产品随着关系模型的成熟应运而生。数仓是非常成功的产品,它会将一家公司中不同业务系统中的数据通过ETL管道汇聚到一起存储到数仓中,之后便可以对外提供 BI,报表以及查询的服务。数仓提供了非常丰富的管理功能,并且提供了很多性能优化的特性来加速查询。比如它的数据都是以表的形式存储的,每个表都有固定的 Schema,这样可以保证它的字段类型都是固定的,同时数仓一般都可以建立索引以便可以更好地加速查询。此外数仓支持 Transaction,这样对于业务侧来说就有了事务性的保障,否则某些业务根本无法正常进行。目前来讲,数仓也是应用非常广泛的产品,大部分的数据平台都会提供数仓服务。
但是数仓产品随着时间的发展也发现了新的问题。首先近十年我们多了很多非结构化和半结构化数据,比如像时间序列,日志,文档和图片等数据,这些数据很难在数仓高效地保存。同时数仓拥有其历史局限性,在数仓刚被发明时他的主要场景的数据都是人类行为所产生的,数据量不会特别多,因为人类行为是有限的,不会有太高的增长。但是现在很多的数据都是由设备而不是人所产生的,其产生的数据量级就有很大的变化。数仓为了各种功能和性能它的存储结构是高度优化过的,储存成本非常高,如果这时我们在数仓中存储大量新的半结构化或非结构化的数据类型,它就会面临很大的存储成本问题的挑战。同时近些年兴起的数据科学和机器学习的数据接口可能并不是以 SQL 查询而是直接以文件的形式作为输入去运行训练任务,这都是数仓无法直接支持的。
所以在 2010 年左右数据湖就开始兴起了。首先它提供了非常低成本的存储系统可以帮你储存各种各样的原始数据,并且可以以文件 API 的方式直接读取文件,例如 S3,HDFS。与此同时开放的文件格式也在这个阶段开始兴起,这些文件格式的主要优点是可以被直接读取, 也就是所有的生态都可以使用这些开放格式来访问数据;比如说我们现在大部分的机器学习以及数据科学的工具和库都可以直接访问这些文件来做分析;于此同时我们仍然有数仓的需求存在,也就需要额外的 ETL 作业来不断的将数据从数据湖经过一系列清洗之后存到数仓之中以之后以支持 BI 和报表的业务。这是目前在各个公司中比较常见的架构,也就是企业将大部分数据存到数据湖之后,在数据湖之上可以直接支撑一些数据科学和机器学习的任务,同时还会有 ETL 任务将一部分数据同步到数仓以提供传统的数仓服务。
以上架构目前存在的问题是会导致存在两套数据架构,一套是传统的数仓架构,它有自己的存储层,有自己的数据治理功能,比如表级别的访问控制,他的上层应用可以直接提供 BI 和 SQL 查询的接口;另一套就是数据湖的架构,它的底层是文件 API 的储存系统,并且数据湖也有安全需求和数据治理的模块,可以帮助做到文件和数据块级别的访问控制。它的上层接入的是机器学习和数据科学的应用。
接下来我们来看这个架构有什么问题:
- 数据同步。数据需要先从业务系统同步到数据湖再同步到数仓,额外的同步过程会造成有冗余数据同时存在于数据湖和数仓,这就带来了额外的储存成本。额外的同步业务很难保证 100% 是可靠的,一旦出现了数据的遗失,错误或者较大的延迟,这些问题都很难解决。
- 安全和管理模型不适配。对于数仓来说有自身的可以做到表级别的权限管理,对于数据湖来说一般会有云商提供的基于文件和块的权限管理。这两个是完全不同的产品,导致管理员要同时维护两个系统,权限管理复杂,增加工作成本。
- 应用层合作问题。由于数仓应用和数据湖应用是基于两套不同的系统,不同的组来维护的,不同的组之间合作就会增加很多沟通成本,从而降低合作效率。
下面我们来看如何通过 Lakehouse 架构来解决以上三个问题。
1. 数据存储
对于数据湖架构有额外同步任务的缺点,我们采用了一种开放可靠高效的数据格式来处理应对数仓查询任务和请求。这样我们就可以统一数据储存的结构,无论数仓还是数据湖都可以使用统一的储存结构进行数据访问。
2. 数据安全和治理层
我们建立了统一的数据安全和治理服务能够管理所有的数据资产,包括表,文件,以及机器学习的模型都可以通过统一的服务来管理。
3. 应用层
在统一了数据存储和管理层之后,我们就可以将所有的上层应用基于统一的存储和管理架构来实现,这样用户就不需要在两套不同的架构中去实现不同的业务。
在整合了以上的方案之后,我们就可以使用开放的数据湖格式来支撑统一的数据管理服务处理几乎所有方面的需求,包括传统数仓、数据科学以及机器学习的任务。
--
02
Building Lakehouse Systems
第二点我们来讲一下 Databricks 在构建 Lakehouse 的过程中的一些实践经验。
在数据储存层,我们采用了基于公有云的存储介质,在此之上加上 Delta Lake 的数据湖格式。
在数据安全和管理层,我们研发了名为 Unity Catalog 的细粒度的数据管理工具,能够管理所有的数据资产,包括表,文件和机器学习的模型。
在数据应用层,我们提供了丰富的数据应用来提供用户 SQL,BI 以及 ML 的能力。
接下来我们来介绍一下为了实现如上的数仓架构的核心技术是什么,在这里我们主要介绍三点:
- 元数据层。在数据湖上实现高可靠性的数据湖格式要求我们有对元数据的管控,包括事务和多版本支持等。
- 引擎架构设计。在数据湖上的数据需要提供具有数仓性能的查询请求,所以引擎的设计也至关重要。
- 声明式 I/O 接口。既然要支持同一平台的所有应用,那对于机器学习和数据科学的数据访问需要我们有声明式的 IO 接口来支持。
1. 数据湖元数据层
因为数据湖是以文件 API 来提供数据的储存服务的,所以元数据层也主要是对文件的管理。元数据层的主要任务是记录表的各版本中包含了哪些文件。目前主要的数据湖格式都是以这个方向来设计的,包括 Delta Lake,Iceberg,Hudi 以及 Hive ACID。
接下来我们来举个例子,假设我们有一张名为 Events 的表,该表之前已经经过了两次改动。现在有一个新的请求要求将 17 号用户所有 Event 的数据全部删除来符合 GDPR 的要求。这类请求比较直接的一个解决方案是定位到数据所在的文件,例如图中的 File1与 File3,我们将文件读取之后删除相关用户的 Event 数据再将数据重写回去生成一个新的文件以覆盖旧文件。但是这种方案存在的问题是如果在删除过程中有其他用户也在读取这张表该怎办,我们该如何保证原子性?
对于 Delta Lake 来说,首先我们还是重写文件,因为它的兼容性比较好,也不需要 Merge On Read 的设计。但是在重写过程中我们不删除旧的文件,同时我们还有 Log 系统来跟踪版本所对应的文件列表。假设表的版本经历过两次改动后目前的版本号是 v2,其中包括 File1,File2,File3 三个文件。当需要删掉数据时我们会启动一个作业进行读取以及重写的过程,在重写没有完成时文件不会进行提交。在进行完删除操作之后它会进行提交并加上新的版本号 v3 到 Log 中去。新提交的文件文件名与原文件名不同,对于后续表的读取它们会拿到最新的文件列表,读到删除之后的数据。对于实现细节可以参考论文 Armbrust et al,VLDB 2020。
让我们再来看一下 Delta Lake 的其他管理功能:
- Time Travel to OldTtable Versions:Delta Lake 在读取数据时可以指定历史版本。
- Zero-copy CLONE by forking the log:对于表的复制可以通过只复制 Log 而非数据文件的方式做到零拷贝。
- DESCRIBE HISTORY:由于 Log 中保存了所有的提交记录,所以我们可以通过一些命令来查看表的所有变更记录。
- Schema Enforcement & Constraints:我们还可以在 Log 中加一些额外的信息,比如说我们我想针对某些列做一些条件控制,例如限定列为非 NULL,从而对客户写入的数据进行验证和约束。
- Streaming I/O:由于我们是通过 Commit来提交的改动,它本身就有 Streaming 的概念,我们可以将 Delta 表看作为流数据的更改,我们也就可以通过 Streaming 的API来读取 Delta Lake 表来消除对于消息总线例如 Kafka的依赖,虽然它的性能相较于 Kafka 还有一定差距。
- Delta Sharing:由于我们的数据是存储在云上的,如 AWS,如果我们想跨公司跨部门的共享数据就可以建立一个额外的服务,通过一些临时的秘钥来分享数据。首先它是非常高效的,它可以直接用到公有云的能力来传输数据,同时也通过临时秘钥的功能对安全性进行了保证。
2. 数据湖仓引擎设计
我们要求湖仓查询引擎的设计能让其在基于数据湖文件上的查询可以达到数仓性能。
让我们来看一下要实现与数仓相当的性能面临的挑战有什么。对于传统数仓来讲它对数据储存和查询引擎是有完整的控制的,可以更加紧密的联动,实现更好的性能设计。但是对于 Lakehouse 来说它的核心思路是基于开放的文件格式,比如说 Parquet 格式可以直接被不同的工具来访问,这就导致了我们在做优化时我们无法对开放的文件格式进行进行改动。
对于如何在开放格式上达到能与传统数仓媲美的性能 Databricks 提出了四点思路:
- 辅助数据结构。针对 Parquet 文件提供一些统计数据和索引结构进行 Data Skipping 来减轻 I/O 的开销。
- 优化文件布局。进行合理的分区和分桶以及优化文件布局以减少 I/O 开销。
- 缓存。将经常访问的热数据缓存到 SSD 中以提高数据的访问速率。
- 执行优化。采用最新的向量化查询引擎来加速整体的计算效率。例如 Databricks 开发了 Photon 引擎来加速计算。
下面让我们来看具体的优化细节:
① 辅助数据结构
即使基准数据是 Parquet 的文件格式,我们依然可以通过构建其他数据来加速查询,并且保证事务性。接下来让我们举一个简单的例子,假设我们针对每个文件的某些列会去跟踪列的最大最小值,如图我们记录了文件中 Year 列与 UID 列的最大最小值。例如当我们查询 year=2020 and uid=24000 的时候我们就可以通过最大最小值来过滤,我们可以发现图中只有 File3 符合要求,其他两个文件都不符合要求,那我们就可以跳过 File1,2 直接使用 File3 来达到降低数据量的需求。
② 数据布局
刚刚提到的分桶分片是一种方法,现在我们来介绍一下目前比较流行的 Z-order 数据布局方式。我们知道最大最小值可以帮助过滤掉一些文件,它的局限性在于如果数据是均匀分布的,那最大最小值就失去了过滤数据的意义,所以我们需要数据按照一定顺序排列好之后再写入。但是当我们对数据进行 Order By 的时候不同列是有先后顺序的。这会导致一个问题,如果我们根据 a,b 两列进行排序写入,文件会根据列 a 的值进行分割,这样仅针对列 a 过滤会非常有效率,但是对于每个文件中的列 b 值来说可能是非常分散的,这时每个文件中列 b 的最大最小值都可能非常接近,从而起不到针对列 b 的数据过滤。对于这样的缺陷我们使用 Z-order 将 Order By 中多个列的权重比较均匀的分配,而不是只针对一列有效,从而发挥更好的数据过滤效果。
③ 缓存
我们会在计算节点本地的 SSD 或内存上根据 LRU 算法来缓存数据,这样的缓存是非常简单的也没有任何一致性的问题,因为我们的数据湖格式都是基于文件来提交事务的,我们的缓存也是基于文件名作为 Key 来缓存的,假设表在进行 Insert 操作之后,新生成的文件会有不同的文件名,所以客户端只要看文件名是否存在于本地缓存就可以了,而不用考虑文件修改的问题,因为文件本身是不可以被修改的,这样就可以利用文件名作为唯一的 ID 来管理缓存层。其他要注意的事项还有,我们在缓存数据使会先将数据解码解压缩之后再缓存,这样可以节省一些后续重复的 CPU 计算,不然每次要查缓存数据都需要解压缩和解码,这也是浪费 CPU 资源的操作。如图是我们的一些性能测试数据,我们可以看到在 S3 上直接读取 Parquet 文件是最慢的,一旦将其落地到 SSD 做缓存就可以快 3 倍,在我们使用 Photon 引擎将其解压缩解码之后再缓存可以再快 2-3 倍左右,最终的效果也是非常好的。
④ 向量化执行
通过利用 CPU 的指令集成批的处理数据,这样整个查询引擎的性能会更加高效,因为它充分利用了 CPU 的并发度。在此我们就不深入细节,从最终的测试结果可以看出使用了 Photon 引擎之后在执行 TPC-DS 测试集时性能对比 S 有了 8-10 倍的提升。如果大家对 Photon 引擎有兴趣可以去读一下 Photon: A Fast Query Engine for Lakehouse Systems 这篇论文。
当我们将以上所有讲的东西拼到一起,让我们看一下我们的 Lakehouse 引擎能否达到数仓的性能,我们的答案是可以的。Databricks 在去年参加了 TPC-DS 竞赛并且打破了记录,这可以充分证明我们 Lakehouse 的架构是完全能够达到甚至超过目前数仓的要求的,同时我们还保存了存储层的开放以适配各种不同的业务场景。
3. 数据科学和机器学习的声明式 I/O 接口
我们再来讲一下如何在 Lakehouse 架构上支撑数据科学和机器学习两种不同的业务。
如果基于传统数仓做 ML 是非常痛苦的,首先 ML 的作业与传统数仓的作业非常不一样,对于ML 来说一般需要能够直接读取大量的数据通过算法进行训练之后得到最终模型,这时我们最关注的东西是吞吐量而不是查询性能方面的事情,如果使用 JDBC/ODBC 来支撑 ML 的吞吐量是完全不够的。这时就有两种思路,一种是将数仓中的数据导出成一种开放的文件格式(如 Parquet)到数据湖中使用,而额外的数据导出步骤会造成数据的时延较高。另一种方法是在写数据的时候写两份数据,一份写到数仓,一份写到数据湖,这样会导致情况更加复杂产生更多的问题。
以上的问题在使用了 Lakehouse 之后就可以迎刃而解了,因为我们的存储层是开放的,这时就完全可以使用ML相关的库和工具直接访问底层文件。如图让我们举个例子,我们用 Spark 读取 Delta 表之后做一些简单的数据清洗,之后直接对接 ML 的一些作业开始用 Parquet 文件去训练模型。中间我们可以将ETL的逻辑翻译成 Spark Dataframe 利用其优化器优化查询性能,最后产生的文件会直接提供给 ML 的库或者服务进行读取从而用于训练模型,这整个流程都是非常高效的。
在 Databricks 我们将数仓和 ML 的功能进行了进一步的整合。我们将ML输出的模型写入Delta 表,通过 ML Flow 管理整个 ML 的流程。对于 Feature Store 我们可以使用 Delta 表作为存储,然后使用 Spark Streaming 构建Pipeline。输出的模型可以通过 UDF 的方式直接在 SQL 或者 ETL 作业中使用。我们通过以上方式统一了用户的体验,比单独使用两套不同的平台去做 DS 和 ML 要方便很多。
接下来让我们对这一节进行一下总结:
Lakehouse 的架构结合了数仓和数据湖两者的优势,它通过开放的数据访问格式,可以使用各种各样不同的工具直接访问数据湖上的文件;同时 Lakehouse 提供了与数仓一样的丰富的管理功能,通过元数据层实现了对事务和多版本的支持;在性能上 Lakehouse 通过全新的引擎设计达到和数仓一样的性能来满足 BI 和报表的业务需求;最后基于云厂商的低成本数据存储可以降低整个数据架构的成本。由于我们的数据都存储在同一平台的数据湖之上,从而不需要不同应用之间数据的导入导出,这在简化了数据架构的同时也增加了数据的可靠性,并且可以保证数据的时效性。
--
03
Ongoing Projects
Databricks 对整个数据行业还是非常乐观的,我们认为企业对大规模数据和机器学习的使用才刚刚开始。在未来的五年中,我们认为还有 10-100 倍的用户会开始使用这些新的工具,也会有 10-100 倍的数据和机器学习应用会被开发出来。
我们再来讲一下我们在一些正在做的项目的最新进展:
1. Delta Live Tables
一般来说数据管道都是由 SQL 构建,但是一个完整的管道一般不仅仅由一个过程来完成的,它一般需要多次转换落地,这时我们就需要一些管理工具来管理所有单独的查询语句进行串行并做失败重启等一些的工作。这个事情是比较复杂的,DBT 是业界目前比较常用的一个工具来对数据管道进行管理,我们可以通过它进行单元测试和版本控制等一些事情。对于 Delta Live Table 来说它与我们平台的结合更加紧密,我们提供了 Dataframe API 可以让我们通过 Python 或 Scala 来构建 Pipeline,而不只是通过 SQL 来构建。并且我们也提供了一些非常丰富的功能让我们可以看到作业的进度和指标的一些信息。
更多信息可以参考博文:Announcing the Launch of Delta Live Tables: Reliable Data Engineering Made Easy
2. Unity Catalog
Unity Catalog 可以让我们在一个统一的数据治理的服务上管理所有的数据资产。由先前的例子我们可以看出我们需要管理的数据资产是多样的,包括文件,表,ML 模型以及 Delta Sharing。我们可以通过标准的 SQL API Grant 来对以上所有数据资产来进行权限管理的操作。同时它也整合血缘管理的功能。
对于 Unity Catalog 的设计思路可以查看博:Introducing Databricks Unity Catalog: Fine-grained Governance for Data and AI on the Lakehouse
3. 新的引擎
① Photon:使用原生的向量引擎改善查询性能
② Aether:我们目前的任务调度还是 Spark 的方式,在某些对于时效性很高的场景下任务调度的效率还不够高,所以我们计划重写整个的调度和执行框架以满足对于短查询场景的支持。
③ Streaming:由于目前对于实时计算的要求越来越高涨,我们也成立了新的团队尝试重构Streaming 引擎。目前该项目还在立项阶段,我们也准备投入更多的资源来进行该方面的研发。
总结:Databricks 在近几年一直在主推 Lakehouse 这个新的架构来尝试解决数据平台的核心问题。我们希望能够构建统一的易用的平台可以让不同需求的用户都在一个平台上处理它们所有的数据。我们认为在这个方向上还有很多事情可以挖掘,也希望大家能够参与到 Lakehouse 的生态建设之中。
今天的分享就到这里,谢谢大家。
|分享嘉宾|
范文臣|Databricks 技术主管
范文臣,Databricks 开源组技术主管,Apache Spark PMC member,Spark社区最活跃的贡献者之一。从2013年开始参与Spark的研发,2015年加入Databricks,目前主要负责Spark Core/SQL 的设计开发和开源社区管理。
|DataFun新媒体矩阵|
|关于DataFun|
专注于大数据、人工智能技术应用的分享与交流。发起于2017年,在北京、上海、深圳、杭州等城市举办超过100+线下和100+线上沙龙、论坛及峰会,已邀请超过2000位专家和学者参与分享。其公众号 DataFunTalk 累计生产原创文章800+,百万+阅读,15万+精准粉丝。