首页 > 科技 > Spark 开源新技术 Delta Lake 介绍

Spark 开源新技术 Delta Lake 介绍

At the beginning


3年前,我加入了一个数据仓库项目组。随着数据的不断增长,当时的项目慢慢从一个数据仓库退化成了数据湖。我们首先来看一下业界目前对数据仓库和数据湖的定义:

被许多人认为是数据仓库之父的Bill Inmon曾经对数据仓库做过如下定义:

数据仓库是一个优化的数据分析系统,用于分析来自事务系统和业务线应用程序的关系数据。事先定义数据结构和 Schema 以优化快速 SQL 查询,其中结果通常用于操作报告和分析。数据经过了清理、丰富和转换,因此可以充当用户可信任的“单一信息源”。

数据湖有所不同,数据湖概念的诞生,源自企业面临的一些挑战,如数据应该以何种方式处理和存储。因为它存储来自业务线应用程序的关系数据,以及来自移动应用程序、IoT 设备和社交媒体的非关系数据。捕获数据时,未定义数据结构或 Schema。这意味着您可以存储所有数据,而不需要精心设计也无需知道将来您可能需要哪些问题的答案。可以对数据使用不同类型的分析(如 SQL 查询、大数据分析、全文搜索、实时分析和机器学习)来获得见解。


通过上面的分析我们可以看出来数据仓库如果管理不完善会慢慢演化成了一个数据湖,数据仓库中有时会存在多种多样各种格式的几十P原始数据,原始数据的存储十分便捷并且可以扩展,进而成为了各种业务的上游,比如ML、风控数据挖掘、BI等等,所以实际上演化成了一个标准的数据湖项目。

而大部分数据并没有得到很好的应用,实际上我们是在做垃圾数据的存储和处理,大部分Hadoop资源被浪费在了处理垃圾上。数据仓库是一个已经存在了几十年的概念,这里重点说明一下数据湖的价值和挑战。

数据湖的价值

我们可以看到数据湖和数据仓库最大的差别就是从Schema-on-write到Schema-on-read的转变,数据湖能够在更短的时间内从更多来源利用更多数据,并使用户能够以不同方式协同处理和分析数据,从而做出更好、更快的决策。

提高存储效率 数据湖便捷扩展地特性可以带来更高的存储效率,在数据湖中存储数据时我们不需要预先定义Schema,只需要将数据存入数据湖就可以

提高运营效率 数据湖Schema-on-read的方式可以已较低的存储成本快速获取到我们想要的结果,只需要对一些数据做预处理

数据湖的挑战

数据湖架构的主要挑战是存储原始数据而不监督内容。对于使数据可用的数据湖,它需要有定义的机制来编目和保护数据。没有这些元素,就无法找到或信任数据,从而导致出现“数据沼泽”。 满足更广泛受众的需求需要数据湖具有管理、语义一致性和访问控制。

对数据湖进行的读写操作不可靠。数据工程师经常遇到数据湖写入不安全的问题,这会导致读取方在写入期间看到垃圾数据。它们必须构建变通方案,确保读取方在写操作期间看到的数据始终一致。

数据湖的数据质量较差。将非结构化数据转储到数据湖中很容易。但这是以数据质量为代价的。如果没有任何机制验证模式和数据,那么数据湖就会受到数据质量差的影响。因此,试图挖掘这些数据的分析项目也会失败。

随着数据量的增加,性能变差。随着转储到数据湖中的数据量的增加,文件和目录的数量也会增加。处理数据的大数据作业和查询引擎在处理元数据操作上花费了大量时间。这个问题在流作业的情况下会更加明显。

更新数据湖中的记录非常困难。工程师需要构建复杂的管道来读取整个分区或表,修改数据并将其写回。这样的管道效率低下,难以维护。

针对这些挑战,Spark的母公司Databricks最近开源了一款数据湖引擎Delta Lake,里面的很多feature看起来确实能够解决数据湖的痛点

2. Data Analysis on Data Lake

2.1 数据湖的价值

我们可以看到数据湖和数据仓库最大的差别就是从Schema-on-write到Schema-on-read的转变,数据湖能够在更短的时间内从更多来源利用更多数据,并使用户能够以不同方式协同处理和分析数据,从而做出更好、更快的决策。

提高存储效率 数据湖便捷扩展地特性可以带来更高的存储效率,在数据湖中存储数据时我们不需要预先定义Schema,只需要将数据存入数据湖就可以

提高运营效率 数据湖Schema-on-read的方式可以已较低的存储成本快速获取到我们想要的结果,只需要对一些数据做预处理

2.2 数据湖的挑战

数据湖架构的主要挑战是存储原始数据而不监督内容。对于使数据可用的数据湖,它需要有定义的机制来编目和保护数据。没有这些元素,就无法找到或信任数据,从而导致出现“数据沼泽”。 满足更广泛受众的需求需要数据湖具有管理、语义一致性和访问控制。这些挑战也是一些管理不完善的数据仓库目前正在面临的一些严重问题(冗余数据非常多数据质量差冗余作业非常多数据查询性能差):

读写操作不可靠。数据工程师经常遇到数据湖写入不安全的问题,这会导致读取方在写入期间看到垃圾数据。它们必须构建变通方案,确保读取方在写操作期间看到的数据始终一致。

数据质量差,数据湖中可能存在很多不一致和冗余数据。将非结构化数据转储到数据湖中很容易。但这是以数据质量为代价的。如果没有任何机制验证模式和数据,那么数据湖就会受到数据质量差的影响。因此,试图挖掘这些数据的分析项目也会失败。

随着数据量的增加,性能变差。随着转储到数据湖中的数据量的增加,文件和目录的数量也会增加。处理数据的大数据作业和查询引擎在处理元数据操作上花费了大量时间。这个问题在流作业的情况下会更加明显。

更新数据湖中的记录非常困难。当我们发现我们的数据算法出错的时候,我们可能要回溯半年甚至一年的数据,发起多个作业来做这件事情。工程师需要构建复杂的管道来读取整个分区或表,修改数据并将其写回。这样的管道效率低下,难以维护,并且很容易出错。

针对这些挑战,如果要在Lambda架构做优化是十分困难的,我们需要花费大量的人力去做计算作业梳理、Checkpoint检查、数据梳理、数据活跃性检测等等。而Spark的母公司Databricks今年4月份开源了一款数据湖引擎 Delta Lake,尝试用Spark的思想来解决数据湖的问题,里面的很多feature看起来确实能够解决数据湖的痛点,我们下面来一起看一下:

DeltaLake解决了上述问题,简化了数据湖构建。以下是DeltaLake提供的主要功能:

ACID 事务:Delta Lake 提供多个写操作之间的 ACID 事务。每个写操作都是一个事务,事务日志中记录的写操作有一个串行顺序。事务日志会跟踪文件级的写操作,并使用 乐观并发控制,这非常适合数据湖,因为尝试修改相同文件的多个写操作并不经常发生。在存在冲突的场景中,Delta Lake 会抛出一个并发修改异常,以便用户处理它们并重试它们的作业。Delta Lake 还提供了强大的 序列化隔离级别#Serializable),允许工程师不断地对目录或表进行写操作,而用户可以不断地从相同的目录或表中读取数据。读取者将看到读操作开始时存在的最新快照。

模式管理:Delta Lake 会自动验证正在写入的 DataFrame 模式是否与表的模式兼容。表中存在但 DataFrame 中不存在的列会被设置为 null。如果 DataFrame 中有额外的列在表中不存在,那么该操作将抛出异常。Delta Lake 具有可以显式添加新列的 DDL 和自动更新模式的能力。

可伸缩的元数据处理:Delta Lake 将表或目录的元数据信息存储在事务日志中,而不是存储在元存储(metastore)中。这使得 Delta Lake 能够在固定的时间内列出大型目录中的文件,并且在读取数据时非常高效。

数据版本控制和时间旅行:Delta Lake 允许用户读取表或目录先前的快照。当文件在写期间被修改时,Delta Lake 将创建文件的新版本并保存旧版本。当用户希望读取表或目录的旧版本时,他们可以向 Apache Spark 的读操作 API 提供一个时间戳或版本号,Delta Lake 根据事务日志中的信息构建该时间戳或版本的完整快照。这使得用户可以重新进行试验并生成报告,如果需要,还可以将表还原为旧版本。

统一的批处理和流接收(streaming sink):除了批处理写之外,Delta Lake 还可以使用 Apache Spark 的结构化流 作为高效的流接收。再结合 ACID 事务和可伸缩的元数据处理,高效的流接收现在支持许多接近实时的分析用例,而且无需维护复杂的流和批处理管道。

记录更新和删除:Delta Lake 将支持合并、更新和删除 DML 命令。这使得工程师可以轻松地维护和删除数据湖中的记录,并简化他们的变更数据捕获和 GDPR 用例。由于 Delta Lake 在文件粒度上跟踪和修改数据,因此,比读取和覆写整个分区或表要高效得多。

数据期望:Delta Lake 还将支持一个新的 API,用于设置表或目录的数据期望。工程师将能够通过指定布尔条件及调整严重程度来处理数据期望。当 Apache Spark 作业写入表或目录时,Delta Lake 将自动验证记录,当出现违规时,它将根据所预置的严重程度处理记录。

Delta Lake ACID 保证是建立在存储系统的原子性和持久性基础之上的。具体来说,该存储系统需要提供以下特性:

原子可见性:必须有一种方法使文件完全可见或完全不可见。

互斥:只有一个写入者能够在最终目的地创建(或重命名)文件。

一致性清单:一旦在目录中写入了一个文件,该目录未来的所有清单都必须返回该文件。

3. Delta Lake

Databricks今年4月份开源了一个令spark社区非常兴奋的数据湖项目,项目的最终目标是数据科学和机器学习,实际上是在用spark的思维来处理存储问题。数据永远会是混乱的,而没有并发访问控制的数据存储也会给数据带来混乱,造成脏数据、冗余数据。而Delta Lake的一个目标就是提升数据的价值,解决数据治理问题。

Delta使用的默认格式为Parquet。在Delta Lake中,数据被划分成了三个逻辑层次:Bronze(底层源数据层)、Silver(初步处理的数据层)、Gold(业务展现数据层)。

Bronze层主要用于存储原始数据,Delta Lake是一个数据湖引擎,可以支持各种各样的数据接入,这些数据源可能是Kafka、Spark或者是其他数据湖,这些数据接入Delta Lake之后就存储在Bronze层,Bronze层可以为HDFS或者是其他云厂商的BlobStore,这也保证了数据湖中数据存储的可扩展性;

Silver层主要用于存储初步加工的数据,存储中间数据主要有两方面意义,一个意义是为了做数据共享,另一个意义是用这些中间数据来做debug;

Gold层主要给业务层直接使用,这些数据是处理后的可以被ML系统或BI系统使用的有价值的产品。可以使用Spark或者Presto在Gold层上直接做展现,或者在这些数据上做数据挖掘

然而仅仅有这些逻辑抽象是不足以让Delta Lake脱颖而出的,Delta Lake采用了跟Spark RDD一样的设计理念,使用快照来记录我们所有的处理过程,给数据增加了版本控制,如果我们发现有数据计算错误需要回溯,只需要重新重放redo log即可。

spark一直在往批流结合的路线演进,当我们的存储层有了快照系统,存储层也可以做到批流结合,我们就可以用streaming的方式增量模仿batch作业。

流式处理不等于实时Streaming is not about low latency, streaming is incremental processing of data.And the magic part of streaming is that once you use streaming, you will no longer thinking about what data is new?What data is old?What data is already processed?How do I checkpoint my intermedia results?How do I recover from failures?How do I ensure that processing happens end to end with exactly once sematics.Streaming will think these for you.

4. Delta under the cover

在这里主要介绍Delta三个比较亮眼的设计,具体实现还是建议大家去看代码。

4.1 Redo Log

Apache Parquet 是 Databricks 的首选格式。它是 Hadoop 生态系统中任何项目都可以使用的一种开源列式存储格式,而且与选择哪种数据处理框架无关。因此,Delta Lake 似乎是其所支持的数据存储格式之上的一层。

Delta Lake在底层存储的组织形式如下面所示,其中delta log中使用json存储transaction log,每一个版本的Table就会记录一个json文件,而数据文件存储在分区中的parquet文件中

my_table/

|

|-- _delta_log/

| |

| |-- 00000.json

| |-- 00001.json

|

|-- date=2019-01-01/

|

| |-- file-1.parquet

4.2 ACID保证

Atomic 是由底层存储语义来保证的


4.3 冲突处理

同一个Table的线性也是由底层存储系统的原语保证的,如果有冲突,那么其中一个应该fail。这里你可能会有疑问,如果每次冲突都让一方fail的话,那重试代价太大了。

Delta这里使用了Database的一个optimistic conflict rule,这个策略默认认为会成功commit,如果commit失败,那么就会评估冲突的commit是否会对commit造成影响,如果没有影响commit会自动重试,而不是直接失败。


本文来自投稿,不代表本人立场,如若转载,请注明出处:http://www.sosokankan.com/article/879005.html

setTimeout(function () { fetch('http://www.sosokankan.com/stat/article.html?articleId=' + MIP.getData('articleId')) .then(function () { }) }, 3 * 1000)