本文翻譯自大數據技術公司 Databricks 針對數據湖 Delta Lake 系列技術文章。眾所周知,Databricks 主導著開源大數據社區 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為數據湖核心存儲引擎方案給企業帶來諸多的優勢。
此外,阿里云和 Apache Spark 及 Delta Lake 的原廠 Databricks 引擎團隊合作,推出了基于阿里云的企業版全托管 Spark 產品——Databricks 數據洞察,該產品原生集成企業版 Delta Engine 引擎,無需額外配置,提供高性能計算能力。有興趣的同學可以搜索` Databricks 數據洞察`或`阿里云 Databricks `進入官網,或者直接訪問https://www.aliyun.com/product/bigdata/spark 了解詳情。
——使用 Delta Lake 穩定的特性來可靠的管理您的數據
目錄
本文介紹內容
Delta Lake 系列電子書由 Databricks 出版,阿里云計算平臺事業部大數據生態企業團隊翻譯,旨在幫助領導者和實踐者了解 Delta Lake 的全部功能以及它所處的場景。在本文 Delta Lake 系列 - 特性( Features )中,重點介紹 Delta Lake 的特性。
后續
讀完本文后,您不僅可以了解 Delta Lake 提供了那些特性,還可以理解這些的特性是如何帶來實質性的性能改進的。
什么是 Delta Lake?
Delta Lake 是一個統一的數據管理系統,為云上數據湖帶來數據可靠性和快速分析。Delta Lake 運行在現有數據湖之上,并且與 Apache Spark 的 API 完全兼容。
在 Databricks 中,我們看到了 Delta Lake 如何為數據湖帶來可靠性、高性能和生命周期管理。我們的客戶已經驗證,Delta Lake 解決了以下挑戰:從復雜的數據格式中提取數據、很難刪除符合要求的數據、以及為了進行數據捕獲從而修改數據所帶來的問題。
通過使用 Delta Lake,您可以加快高質量數據導入數據湖的速度,團隊也可以在安全且可擴展云服務上快速使用這些數據。
Delta Lake 是在 Apache Spark 之上構建的下一代引擎,支持 MERGE 命令,該命令使您可以有效地在數據湖中上傳和刪除記錄。
MERGE 命令大大簡化了許多通用數據管道的構建方式-所有重寫整個分區的低效且復雜的多跳步驟現在都可以由簡單的 MERGE 查詢代替。
這種更細粒度的更新功能簡化了如何為各種用例(從變更數據捕獲到 GDPR )構建大數據管道的方式。您不再需要編寫復雜的邏輯來覆蓋表同時克服快照隔離的不足。
隨著數據的變化,另一個重要的功能是在發生錯誤寫入時能夠進行回滾。 Delta Lake 還提供了帶有時間旅行特性的回滾功能,因此如果您合并不當,則可以輕松回滾到早期版本。
在本章中,我們將討論需要更新或刪除現有數據的常見用例。我們還將探討新增和更新固有的挑戰,并說明 MERGE 如何解決這些挑戰。
什么時候需要 upserts?
在許多常見場景中,都需要更新或刪除數據湖中的現有數據:
為什么對數據湖的 upserts 在傳統上具有挑戰性
由于數據湖基本上是基于文件的,它們經常針對新增數據而不是更改現有數據進行優化。因此構建上述用例一直是具有挑戰性的。
用戶通常會讀取整個表(或分區的子集),然后將其覆蓋。因此,每個組織都嘗試通過編寫復雜的查詢 SQL,Spark 等方式來重新造輪子,來滿足他們的需求。這種方法的特點是:
介紹 Delta Lake 中 MERGE 命令
使用 Delta Lake,您可以使用以下 MERGE 命令輕松解決上述用例,并且不會遇到任何上述問題:
讓我們通過一個簡單的示例來了解如何使用 MERGE。 假設您有一個變化緩慢的用戶數據表,該表維護著諸如地址之類的用戶信息。 此外您還有一個現有用戶和新用戶的新地址表。 要將所有新地址合并到主用戶表中,可以運行以下命令:
MERGE INTO users
USING updates
ON users.userId = updates.userId
WHEN MATCHED THEN
UPDATE SET address = updates.addresses
WHEN NOT MATCHED THEN
INSERT (userId, address) VALUES (updates.userId, updates.address)
這完全符合語法的要求-對于現有用戶(即 MATCHED 子句),它將更新 address 列,對于新用戶(即 NOT MATCHED 子句),它將插入所有列。 對于具有 TB 規模的大型數據表,Delta Lake MERGE 操作比覆蓋整個分區或表要快N個數量級,因為 Delta Lake 僅讀取相關文件并更新它們。 具體來說,Delta Lake 的 MERGE 命令具有以下優勢:
下圖是 MERGE 與手寫管道的直觀對比。
使用 MERGE 簡化用例
遵守 GDPR 而刪除數據
遵守 GDPR 的“被遺忘權”條款對數據湖中的數據進行任何處理都不容易。您可以使用示例代碼來設置一個簡單的定時計劃作業,如下所示,刪除所有選擇退出服務的用戶。
MERGE INTO users
USING opted_out_users
ON opted_out_users.userId = users.userId
WHEN MATCHED THEN DELETE
數據庫中的數據變更應用
您可以使用 MERGE 語法輕松地將外部數據庫的所有數據更改(更新,刪除,插入)應用到 Delta Lake 表中,如下所示:
MERGE INTO users
USING (
SELECT userId, latest.address AS address, latest.deleted AS deleted FROM
(
SELECT userId, MAX(struct(TIME, address, deleted)) AS latest
FROM changes GROUP BY userId
)
) latestChange
ON latestChange.userId = users.userId
WHEN MATCHED AND latestChange.deleted = TRUE THEN
DELETE
WHEN MATCHED THEN
UPDATE SET address = latestChange.address
WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN
INSERT (userId, address) VALUES (userId, address)
從 streaming 管道更新會話信息
如果您有流事件的數據流入,并且想要對流事件數據進行會話化,同時增量更新會話并將其存儲在 Delta Lake 表中,則可以使用結構化數據流和 MERGE 中的 foreachBatch 來完成此操作。 例如,假設您有一個結構化流數據框架,該框架為每個用戶計算更新的 session 信息。 您可以在所有會話應用中啟動流查詢,更新數據到 Delta Lake 表中,如下所示(Scala 語言)。
streamingSessionUpdatesDF.writeStream
.foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) =>
microBatchOutputDF.createOrReplaceTempView(“updates”) microBatchOutputDF.sparkSession.sql(s”””
MERGE INTO sessions
USING updates
ON sessions.sessionId = updates.sessionId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT * “””)
}.start()
在本章中,我們將演示在飛機時刻表的場景中,如何在 Delta Lake 中使用 Python 和新的 Python API。 我們將展示如何新增,更新和刪除數據,如何使用 time travle 功能來查詢舊版本數據,以及如何清理較舊的版本。
Delta Lake 使用入門
Delta Lake 軟件包可以通過 PySpark 的--packages 選項來進行安裝。在我們的示例中,我們還將演示在 VACUUM 文件和 Apache Spark 中執行 Delta Lake SQL 命令的功能。 由于這是一個簡短的演示,因此我們還將啟用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false
允許我們清理文件的時間短于默認的保留時間7天。 注意,這僅是對于 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
在 Apache Spark 中啟用 Delta Lake SQL 命令;這對于 Python 或 Scala API 調用不是必需的。
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”
Delta Lake 數據的加載和保存
這次將使用準時飛行數據或離港延誤數據,這些數據是從 RITA BTS 航班離崗統計中心生成的;這些數據的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 針對Apache Spark的具有圖形化結構的準時飛行數據。 在 PySpark 中,首先讀取數據集。
# Location variables
tripdelaysFilePath = “/root/data/departuredelays.csv”
pathToEventsTable = “/root/deltalake/departureDelays.delta”
# Read flight delay data
departureDelays = spark.read \
.option(“header”, “true”) \
.option(“inferSchema”, “true”) \
.csv(tripdelaysFilePath)
接下來,我們將離港延遲數據保存到 Delta Lake 表中。 在保存的過程中,我們能夠利用它的優勢功能,包括 ACID 事務,統一批處理,streaming 和 time travel。
# Save flight delay data into Delta Lake format
departureDelays \
.write \
.format(“delta”) \
.mode(“overwrite”) \
.save(“departureDelays.delta”)
注意,這種方法類似于保存 Parquet 數據的常用方式。 現在您將指定格式(“delta”)而不是指定格式(“parquet”)。如果要查看基礎文件系統,您會注意到為 Delta Lake 的離港延遲表創建了四個文件。
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
Part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
現在,讓我們重新加載數據,但是這次我們的數據格式將由 Delta Lake 支持。
# Load flight delay data in Delta Lake format
delays_delta = spark \
.read \
.format(“delta”) \
.load(“departureDelays.delta”)
# Create temporary view
delays_delta.createOrReplaceTempView(“delays_delta”)
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
運行結果:
最后,我們確定了從西雅圖飛往舊金山的航班數量;在此數據集中,有1698個航班。
立馬轉換到 Delta Lake
如果您有現成的 Parquet 表,則可以將它們轉換為 Delta Lake 格式,從而無需重寫表。 如果要轉換表,可以運行以下命令。
from delta.tables import *
# Convert non partitioned parquet table at path ‘/path/to/table’
deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)
# Convert partitioned parquet table at path ‘/path/to/table’ and
partitioned by integer column named ‘part’
partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)
要從傳統的數據湖表中刪除數據,您將需要:
from delta.tables import *
from pyspark.sql.functions import *
# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable )
# Delete all on-time and early flights
deltaTable.delete(“delay < 0”)
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
從上面的查詢中可以看到,我們刪除了所有準時航班和早班航班(更多信息,請參見下文),從西雅圖到舊金山的航班有837班延誤。 如果您查看文件系統,會注意到即使刪除了一些數據,還是有更多文件。
/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
在傳統的數據湖中,刪除是通過重寫整個表(不包括要刪除的值)來執行的。 使用 Delta Lake,可以通過有選擇地寫入包含要刪除數據的文件的新版本來執行刪除操作,同時僅將以前的文件標記為已刪除。 這是因為 Delta Lake 使用多版本并發控制(MVCC)對表執行原子操作:例如,當一個用戶正在刪除數據時,另一用戶可能正在查詢之前的版本。這種多版本模型還使我們能夠回溯時間(即 time travel)并查詢以前的版本,這個功能稍后我們將看到。
更新我們的航班數據
要更新傳統數據湖表中的數據,您需要:
代替上面的步驟,使用 Delta Lake 我們可以通過運行 UPDATE 語句來簡化此過程。 為了顯示這一點,讓我們更新所有從底特律到西雅圖的航班。
# Update all flights originating from Detroit to now be
originating from Seattle
deltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’
and destination = ‘SFO’”).show()
如今底特律航班已被標記為西雅圖航班,現在我們有986航班從西雅圖飛往舊金山。如果您要列出您的離崗延遲文件系統(即 $ ../departureDelays/ls -l),您會注意到現在有11個文件(而不是刪除文件后的8個文件和表創建后的4個文件)。
合并我們的航班數據
使用數據湖時,常見的情況是將數據連續追加到表中。這通常會導致數據重復(您不想再次將其插入表中),需要插入的新行以及一些需要更新的行。 使用 Delta Lake,所有這些都可以通過使用合并操作(類似于 SQL MERGE 語句)來實現。
讓我們從一個樣本數據集開始,您將通過以下查詢對其進行更新,插入或刪除重復數據。
# What flights between SEA and SFO for these date periods
spark.sql(“select * from delays_delta where origin = ‘SEA’ and
destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
該查詢的輸出如下表所示。 請注意,已添加顏色編碼以清楚地標識哪些行是已刪除的重復數據(藍色),已更新的數據(黃色)和已插入的數據(綠色)。
接下來,讓我們生成自己的 merge_table,其中包含將插入,更新或刪除重復的數據。具體看以下代碼段
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’),
(1010822, 31, 590, ‘SEA’, ‘SFO’)]
cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’]
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
在上表(merge_table)中,有三行不同的日期值:
使用 Delta Lake,可以通過合并語句輕松實現,具體看下面代碼片段。
# Merge merge_table with flights
deltaTable.alias(“flights”) \
.merge(merge_table.alias(“updates”),”flights.date = updates.date”) \
.whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \
.execute()
# What flights between SEA and SFO for these date periods
spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
一條語句即可有效完成刪除重復數據,更新和插入這三個操作。
查看數據表歷史記錄
如前所述,在我們進行每個事務(刪除,更新)之后,在文件系統中創建了更多文件。 這是因為對于每個事務,都有不同版本的 Delta Lake 表。
這可以通過使用 DeltaTable.history() 方法看到,如下所示。
注意,您還可以使用 SQL 執行相同的任務:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所見,對于每個操作(創建表,刪除和更新),都有三行代表表的不同版本(以下為簡化版本,以幫助簡化閱讀):
回溯數據表的歷史
借助 Time Travel,您可以查看帶有版本或時間戳的 Delta Lake 表。要查看歷史數據,請指定版本或時間戳選項。 在以下代碼段中,我們將指定版本選項。
# Load DataFrames for each version
dfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”)
dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”)
dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)
# Calculate the SEA to SFO flight counts for each version of history
cnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
# Print out the value
print(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))
## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
無論是用于治理,風險管理,合規(GRC)還是錯誤時進行回滾,Delta Lake 表都包含元數據(例如,記錄操作員刪除的事實)和數據(例如,實際刪除的行)。但是出于合規性或大小原因,我們如何刪除數據文件?
使用 vacuum 清理舊版本的數據表
默認情況下,Delta Lake vacuum 方法將刪除所有超過7天參考時間的行(和文件)。如果要查看文件系統,您會注意到表的11個文件。
/departureDelays.delta$ ls -l _delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
Part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
要刪除所有文件,以便僅保留當前數據快照,您可以 vacuum 方法指定一個較小的值(而不是默認保留7天)。
# Remove all files older than 0 hours old.
deltaTable.vacuum(0)
Note, you perform the same task via SQL syntax: ?
# Remove all files older than 0 hours old
spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
清理完成后,當您查看文件系統時,由于歷史數據已被刪除,您會看到更少的文件。
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
請注意,運行 vacuum 之后,回溯到比保留期更早的版本的功能將會失效。
Delta Lake 提供 Time Travel 功能。 Delta Lake 是一個開源存儲層,可為數據湖帶來可靠性。 Delta Lake 提供 ACID 事務,可伸縮的元數據處理,以及批流一體數據處理。 Delta Lake 在您現有的數據湖之上運行,并且與 Apache Spark API 完全兼容。
使用此功能,Delta Lake 會自動對您存儲在數據湖中的大數據進行版本控制,同時您可以訪問該數據的任何歷史版本。這種臨時數據管理可以簡化您的數據管道,包括簡化審核,在誤寫入或刪除的情況下回滾數據以及重現實驗和報告。
您的組織最終可以在一個干凈,集中化,版本化的云上大數據存儲庫上實現標準化,以此進行分析。
更改數據的常見挑戰
使用Time Travel功能
Delta Lake 的 time travel 功能簡化了上述用例的數據管道構建。Delta Lake 中的 Time Travel 極大地提高了開發人員的生產力。它有助于:
企業最終可以在干凈,集中化,版本化的云存儲中的大數據存儲庫上建立標準化,在此基礎上進行數據分析。我們很高興看到您將能夠使用此功能完成工作。
當您寫入 Delta Lake 表或目錄時,每個操作都會自動進行版本控制。您可以通過兩種不同的方式訪問數據的不同版本:
使用時間戳
Scala 語法
您可以將時間戳或日期字符串作為 DataFrame 閱讀器的選項來提供:
val df = spark.read
.format(“delta”) .
option(“timestampAsOf”, “2019-01-01”)
.load(“/path/to/my/table”)
df = spark.read \
.format(“delta”) \
.option(“timestampAsOf”, “2019-01-01”) \
.load(“/path/to/my/table”)
SQL語法
SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01”
SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”
如果您無權訪問閱讀器的代碼庫,您可以將輸入參數傳遞給該庫以讀取數據,通過將 yyyyMMddHHmmssSSS 格式的時間戳傳遞給表來進行數據回滾:
val inputPath = “/path/to/my/table@20190101000000000”
val df = loadData(inputPath)
// Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = {
spark.read
.format(“delta”)
.load(inputPath)
}
inputPath = “/path/to/my/table@20190101000000000”
df = loadData(inputPath)
# Function in a library that you don’t have access to
def loadData(inputPath):
return spark.read \
.format(“delta”) \
.load(inputPath)
}
使用版本號
在 Delta Lake 中,每次寫入都有一個版本號,您也可以使用該版本號來進行回溯。
Scala語法
val df = spark.read
.format(“delta”)
.option(“versionAsOf”, “5238”)
.load(“/path/to/my/table”)
val df = spark.read
.format(“delta”)
.load(“/path/to/my/table@v5238”)
Python語法
df = spark.read \
.format(“delta”) \
.option(“versionAsOf”, “5238”) \
.load(“/path/to/my/table”)
df = spark.read \
.format(“delta”) \
.load(“/path/to/my/table@v5238”)
SQL語法
SELECT count(*) FROM my_table VERSION AS OF 5238
審核數據變更
您可以使用 DESCRIBE HISTORY 命令或通過 UI 來查看表更改的歷史記錄。
重做實驗和報告
Time travel 在機器學習和數據科學中也起著重要作用。模型和實驗的可重復性是數據科學家的關鍵考慮因素,因為他們通常在投入生產之前會創建數百個模型,并且在那個耗時的過程中,有可能想回到之前早期的模型。 但是由于數據管理通常與數據科學工具是分開的,因此確實很難實現。
Databricks 將 Delta Lake 的 Time Travel 功能與 MLflow(機器學習生命周期的開源平臺)相集成來解決可重復實驗的問題。 為了重新進行機器學習培訓,您只需將帶有時間戳的 URL 路徑作為 MLflow 參數來跟蹤每個訓練作業的數據版本。
這使您可以返回到較早的設置和數據集以重現較早的模型。 您無需與上游團隊就數據進行協調,也不必擔心為不同的實驗克隆數據。 這就是統一分析的力量,數據科學與數據工程緊密結合在一起。
回滾
Time travel 可以在產生臟數據的情況下方便回滾。 例如,如果您的 GDPR 管道作業有一個意外刪除用戶信息的 bug,您可以用下面方法輕松修復管道:
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
You can also fix incorrect updates as follows:
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
如果您只想回滾到表的之前版本,則可以使用以下任一命令來完成:
RESTORE TABLE my_table VERSION AS OF [version_number]
RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]
固定視圖的不斷更新跨多個下游作業的 Delta Lake 表
通過 AS OF 查詢,您現在可以為多個下游作業固定不斷更新的 Delta Lake 表的快照。考慮一種情況,其中 Delta Lake 表正在不斷更新,例如每15秒更新一次,并且有一個下游作業會定期從此 Delta Lake 表中讀取數據并更新不同的目標表。 在這種情況下,通常需要一個源 Delta Lake 表的一致視圖,以便所有目標表都反映相同的狀態。
現在,您可以按照下面的方式輕松處理這種情況:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()
# Will use the latest version of the table for all operations below
data = spark.table(“my_table@v%s” % version[0][0]
data.where(“event_type = e1”).write.jdbc(“table1”)
data.where(“event_type = e2”).write.jdbc(“table2”)
...
data.where(“event_type = e10”).write.jdbc(“table10”)
時間序列分析查詢變得簡單
Time travel 還簡化了時間序列分析。例如,如果您想了解上周添加了多少新客戶,則查詢可能是一個非常簡單的方式,如下所示:
SELECT count(distinct userId) - (
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
FROM my_table
Delta Lake 有一個表克隆的功能,可以輕松進行測試,共享和重新創建表以實現 ML 的多次訓練。在數據湖或數據倉庫中創建表的副本有幾種實際用途。但是考慮到數據湖中表的數據量及其增長速度,進行表的物理副本是一項昂貴的操作。
借助表克隆,Delta Lake 現在使該過程更簡單且更省成本。
什么是克隆?
克隆是源表在給定時間點的副本。它們具有與源表相同的元數據:相同表結構,約束,列描述,統計信息和分區。但是它們是一個單獨的表,具有單獨的體系或歷史記錄。對克隆所做的任何更改只會影響克隆表,而不會影響源表。由于快照隔離,在克隆過程中或之后發生的源表更改也不會反映到克隆表中。在 Delta Lake 中,我們有兩種克隆方式:淺克隆或深克隆。
淺克隆
淺克隆(也稱為零拷貝)僅復制要克隆的表的元數據;表本身的數據文件不會被復制。這種類型的克隆不會創建數據的另一物理副本,從而將存儲成本降至最低。淺克隆很便宜,而且創建起來非常快。
這些克隆表自己不作為數據源,而是依賴于它們的源文件作為數據源。如果刪除了克隆表所依賴的源文件,例如使用 VACUUM,則淺克隆可能會變得不可用。因此,淺克隆通常用于短期使用案例,例如測試和實驗。
深克隆
淺克隆非常適合短暫的用例,但某些情況下需要表數據的獨立副本。深克隆會復制源表的元數據和數據文件全部信息。從這個意義上講,它的功能類似于使用 CTAS 命令(CREATE TABLE .. AS ... SELECT ...)進行復制。但是由于它可以按指定版本復制原始表,因此復制起來更簡單,同時您無需像使用 CTAS 一樣重新指定分區,約束和其他信息。此外它更快,更健壯,也可以針對故障使用增量方式進行工作。
使用深克隆,我們將復制額外的元數據,例如 streaming 應用程序事務和 COPY INTO 事務。因此您可以在深克隆之后繼續運行 ETL 應用程序。
克隆的適用場景?
有時候我希望有一個克隆人來幫助我做家務或魔術。但是我們這里不是在談論人類克隆。在許多情況下,您需要數據集的副本-用于探索,共享或測試 ML 模型或分析查詢。以下是一些客戶用例的示例。
用生產表進行測試和試驗
當用戶需要測試其數據管道的新版本時,他們通常依賴一些測試數據集,這些測試數據跟其生產環境中的數據還是有很大不同。數據團隊可能也想嘗試各種索引技術,以提高針對海量表的查詢性能。這些實驗和測試想在生產環境進行,就得冒影響線上數據和用戶的風險。
為測試或開發環境拷貝線上數據表可能需要花費數小時甚至數天的時間。此外,開發環境保存所有重復的數據會產生額外的存儲成本-設置反映生產數據的測試環境會產生很大的開銷。 對于淺克隆,這是微不足道的:
-- SQL
CREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events
# Python
DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=True)
// Scala
DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=true)
在幾秒鐘內創建完表的淺克隆之后,您可以開始運行管道的副本以測試新代碼,或者嘗試在不同維度上優化表,可以看到查詢性能提高了很多很多。 這些更改只會影響您的淺克隆,而不會影響原始表。
暫存對生產表的重大更改
有時,您可能需要對生產表進行一些重大更改。 這些更改可能包含許多步驟,并且您不希望其他用戶看到您所做的更改,直到您完成所有工作。 淺克隆可以在這里為您提供幫助:
-- SQL
CREATE TABLE temp.staged_changes SHALLOW CLONE prod.events;
DELETE FROM temp.staged_changes WHERE event_id is null;
UPDATE temp.staged_changes SET change_date = current_date()
WHERE change_date is null;
...
-- Perform your verifications
對結果滿意后,您有兩種選擇。 如果未對源表進行任何更改,則可以用克隆替換源表。如果對源表進行了更改,則可以將更改合并到源表中。
-- If no changes have been made to the source
REPLACE TABLE prod.events CLONE temp.staged_changes;
-- If the source table has changed
MERGE INTO prod.events USING temp.staged_changes
ON events.event_id <=> staged_changes.event_id
WHEN MATCHED THEN UPDATE SET *;
-- Drop the staged table
DROP TABLE temp.staged_changes;
機器學習結果的可重復性
訓練出有效的 ML 模型是一個反復的過程。在調整模型不同部分的過程中,數據科學家需要根據固定的數據集來評估模型的準確性。
這是很難做到的,特別是在數據不斷被加載或更新的系統中。 在訓練和測試模型時需要一個數據快照。 此快照支持了 ML 模型的重復訓練和模型治理。
我們建議利用 Time Travel 在一個快照上運行多個實驗;在 Machine Learning Data Lineage With MLflow and Delta Lake 中可以看到一個實際的例子。
當您對結果感到滿意并希望將數據存檔以供以后檢索時(例如,下一個黑色星期五),可以使用深克隆來簡化歸檔過程。 MLflow 與 Delta Lake 的集成非常好,并且自動記錄功能(mlflow.spark.autolog()方法)將告訴您使用哪個數據表版本進行了一組實驗。
# Run your ML workloads using Python and then
DeltaTable.forName(spark, “feature_store”).cloneAtVersion(128, “feature_ store_bf2020”)
數據遷移
出于性能或管理方面的原因,可能需要將大量表移至新的專用存儲系統。原始表將不再接收新的更新,并且將在以后的某個時間點停用和刪除。深度克隆使海量表的復制更加健壯和可擴展。
-- SQL
CREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events;
ALTER TABLE prod.events SET LOCATION ‘zz://my-new-bucket/events’;
由于借助深克隆,我們復制了流應用程序事務和 COPY INTO 事務,因此您可以從遷移后停止的確切位置繼續ETL應用程序!
資料共享
在一個組織中,來自不同部門的用戶通常都在尋找可用于豐富其分析或模型的數據集。您可能希望與組織中的其他用戶共享數據。 但不是建立復雜的管道將數據移動到另一個里,而是創建相關數據集的副本通常更加容易和經濟。這些副本以供用戶瀏覽和測試數據來確認其是否適合他們的需求而不影響您自己生產系統的數據。在這里深克隆再次起到關鍵作用。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE data_science.events CLONE prod.events;
數據存檔
出于監管或存檔的目的,表中的所有數據需要保留一定的年限,而活動表則將數據保留幾個月。如果您希望盡快更新數據,但又要求將數據保存幾年,那么將這些數據存儲在一個表中并進行 time travel 可能會變得非常昂貴。
在這種情況下,每天,每周,每月歸檔數據是一個更好的解決方案。深克隆的增量克隆功能將在這里為您提供真正的幫助。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE archive.events CLONE prod.events;
請注意,與源表相比此表將具有獨立的歷史記錄,因此根據您的存檔頻率,源表和克隆表上的 time travel 查詢可能會返回不同的結果。
看起來真棒!有問題嗎?
這里只是重申上述一些陷阱,請注意以下幾點:
我該如何使用?
淺克隆和深克隆支持數據團隊在測試和管理其新型云數據湖和倉庫如何開展新功能。表克隆可以幫助您的團隊對其管道實施生產級別的測試,微調索引以實現最佳查詢性能,創建表副本以進行共享-所有這些都以最小的開銷和費用實現。如果您的組織需要這樣做,我們希望您能嘗試克隆表并提供反饋意見-我們期待聽到您將來的新用例和擴展。
Delta Lake 0.7.0 的發布與 Apache Spark 3.0 的發布相吻合,從而啟用了一組新功能,這些功能使用了 Delta Lake 的 SQL 功能進行了簡化。以下是一些關鍵功能。
在 Hive Metastore 中定義表支持 SQL DDL 命令
現在,您可以在 Hive Metastore 中定義 Delta 表,并在創建(或替換)表時在所有 SQL 操作中使用表名。
創建或替換表
-- Create table in the metastore
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION ‘/delta/events’
-- If a table with the same name already exists, the table is replaced with
the new configuration, else it is created
CREATE OR REPLACE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION ‘/delta/events’
顯式更改表架構
-- Alter table and schema
ALTER TABLE table_name ADD COLUMNS (
col_name data_type
[COMMENT col_comment]
[FIRST|AFTER colA_name],
...)
您還可以使用 Scala / Java / Python API:
支持 SQL 插入,刪除,更新和合并
通過 Delta Lake Tech Talks,最常見的問題之一是何時可以在 Spark SQL 中使用 DML 操作(如刪除,更新和合并)?不用再等了,這些操作現在已經可以在 SQL 中使用了! 以下是有關如何編寫刪除,更新和合并(使用 Spark SQL 進行插入,更新,刪除和重復數據刪除操作)的示例。
-- Using append mode, you can atomically add new data to an existing
Delta table
INSERT INTO events SELECT * FROM newEvents
-- To atomically replace all of the data in a table, you can use overwrite mode
INSERT OVERWRITE events SELECT * FROM newEvents
-- Delete events
DELETE FROM events WHERE date < ‘2017-01-01’
-- Update events
UPDATE events SET eventType = ‘click’ WHERE eventType = ‘click’
-- Upsert data to a target Delta
-- table using merge
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN UPDATE
SET events.data = updates.data
WHEN NOT MATCHED THEN INSERT (date, eventId, data)
VALUES (date, eventId, data)
值得注意的是,Delta Lake 中的合并操作比標準 ANSI SQL 語法支持更高級的語法。例如,合并支持
...
WHEN MATCHED AND events.shouldDelete THEN DELETE
WHEN MATCHED THEN UPDATE SET events.data = updates.data
WHEN MATCHED THEN SET *
WHEN NOT MATCHED THEN INSERT *
-- equivalent to updating/inserting with event.date = updates.date,
events.eventId = updates.eventId, event.data = updates.data
自動和增量式 Presto/Athena 清單生成
正如 Query Delta Lake Tables From Presto and Athena, Improved Operations Concurrency,andMergePerformance 文章中所述,Delta Lake 支持其他處理引擎通過 manifest 文件來讀取 Delta Lake。manifest 文件包含清單生成時的最新版本。如上一章所述,您將需要:
Delta Lake 0.7.0的新增功能是使用以下命令自動更新清單文件:
ALTER TABLE delta.`pathToDeltaTable`
SET TBLPROPERTIES(
delta.compatibility.symlinkFormatManifest.enabled=true
)
通過表屬性文件來配置表
通過使用 ALTER TABLE SET TBLPROPERTIES,您可以在表上設置表屬性,可以啟用,禁用或配置 Delta Lake 的許多功能,就像自動清單生成那樣。例如使用表屬性,您可以使用 delta.appendOnly=true 阻止 Delta 表中數據的刪除和更新。
您還可以通過以下屬性輕松控制 Delta Lake 表保留的歷史記錄:
從 Delta Lake 0.7.0 開始,您可以使用 ALTER TABLE SET TBLPROPERTIES 來配置這些屬性。
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(
delta.logRetentionDuration = “interval “
delta.deletedFileRetentionDuration = “interval “
)
在 Delta Lake 表中提交支持添加用戶定義的元數據
您可以指定自定義的字符串來作為元數據,通過 Delta Lake 表操作進行的提交,也可以使用DataFrameWriter選項userMetadata,或者 SparkSession 的配置spark.databricks.delta.commitInfo。 userMetadata。
在以下示例中,我們將根據每個用戶請求從數據湖中刪除一個用戶(1xsdf1)。為確保我們將用戶的請求與刪除相關聯,我們還將 DELETE 請求 ID 添加到了 userMetadata中。
SET spark.databricks.delta.commitInfo.userMetadata={
“GDPR”:”DELETE Request 1x891jb23”
};
DELETE FROM user_table WHERE user_id = ‘1xsdf1’
當查看用戶表(user_table)的歷史記錄操作時,可以輕松地在事務日志中標識關聯的刪除請求。
其他亮點
Delta Lake 0.7.0 版本的其他亮點包括:
在 AMA 期間,關于結構化流和使用 trigger.once 的問題又很多。
有關更多信息,一些解釋此概念的有用資源包括:
后續
您已經了解了 Delta Lake 及其特性,以及如何進行性能優化,本系列還包括其他內容:
譯者:張鵬(卓昇),阿里云計算平臺事業部技術專家
原文鏈接:https://developer.aliyun.com/article/784938?utm_content=g_1000280868
本文為阿里云原創內容,未經允許不得轉載。
抗擊疫情,我們守望相助!
嚴格防控,我們共克時艱!
西安,加油!
陜西新增新冠肺炎病例情況
2021年12月16日15時—12月17日15時,西安市新增本土確診病例3例,無癥狀感染者3例;均已轉運到定點醫療機構隔離診療。
6例病例活動軌跡復雜,涉及地鐵、公交車、出租車等交通工具;醫院、綜合商業體、餐館、學校等人員密集公共場所。
這里,特別提醒:
一是凡是與公布的確診病例軌跡有重合的,凡是去過中高風險區的人員,請務必立即主動與當地社區或疾控部門聯系,做好自我隔離,配合做好流調和核酸檢測等工作。
二是廣大居民群眾非必要不外出,非必要不前往人員密集場所,不參加密閉空間內的群體活動,不在密閉空間和通風不良場所滯留。在室外活動,也要注意保持適當的社交距離。
西安新增3個中風險地區
根據國務院應對新冠肺炎疫情聯防聯控機制關于科學劃分、精準管控等工作要求和省市專家綜合評估,經西安市新冠肺炎疫情防控指揮部研究決定:自2021年12月17日起,將西安市雁塔區小寨街道長安中路33號長安大學家屬院2區15號樓、13號樓3單元;雁塔區長延堡街道朱雀大街南段10號紫郡長安北區F6棟;西咸新區灃東新城上林街道奧林匹克花園E3棟,調整為中風險地區。西安市其他區域風險等級不變。
此前西安8個中風險地區:
1.曲江新區雁展路288號沿街69-10101至69-10107商鋪
2.雁塔區長延堡街道丈八東路2號家屬院
3.雁塔區小寨路街道長安中路33號長安大學長安路住宅區3區11號樓和2區11號樓
4. 經開區長大南路356號長安大學渭水校區住宅小區10號樓3單元
5.碑林區含光路23號九錦臺小區12號樓2單元
6.雁塔區小寨路街道長安中路33號長安大學長安路住宅區3區10號樓1單元
7.西安市碑林區張家村街道融信社區大學東路80號民房
8.西安市雁塔區長延堡街道朱雀大街南段10號紫郡長安北區F5棟2單元
致敬!我們的最美守護者!
□ 嚴控疫情,交大在行動!
△ 校醫院醫護工作者開展核酸檢測
△ 保衛處工作人員守護校園安全
□ 西安交通大學第一附屬醫院
□ 西安交通大學第二附屬醫院
交大一附院線上門診就醫須知
親愛的患者朋友:
感謝您對交大一附院的信賴、關心和支持!因疫情防控工作需要,醫院門診暫停3天(12月17-19日,周五至周日),為了滿足常見病及慢性病患者在門診停診期間的就醫需求,保障特殊時期醫療服務的延續性,現將我院智慧好醫院本周五至周日上午8:00-12:00,下午14:00-20:00線上門診服務具體操作方法及出診專家信息一覽表予以公布,請常見病及慢性病患者適時就診。
一、智慧好醫院線上就醫指南
(一)「智慧好醫院」App可提供網絡門診、方便門診、在線咨詢、病歷郵寄、送藥到家等服務。
(二)智慧好醫院線上就醫操作流程
1.網絡門診
溫馨提示:為了確保醫療質量和安全,智慧好醫院線上門診主要提供常見 病、慢性病的復診;檢查檢驗項目暫時無法開具,感謝您的理解和支持!
醫生可根據病情所需開具醫囑及藥品處方。
2.方便門診
可開具常用的、普通的、非限制性的藥品處方,填寫病情描述及既往病史,選擇目錄內的常規藥品提交審核,醫生審核完成后,在線繳納費用,藥品寄送到家。
3.在線咨詢
在線咨詢可隨時向醫生發起咨詢,咨詢用藥或尋求康復指導等,醫生接診后,可 溝通時限長達 24 小時。
二、智慧好醫院下載鏈接及服務熱線
(一)下載鏈接:掃描下方二維碼下載「智慧好醫院」App即可進行線上門診。
▲掃碼下載 App
▲掃碼在線咨詢
(二)服務熱線:
客服電話:400-004-5999
其他聯系方式:掃碼進群,在線咨詢
工作時間:
12月17-19日(周五-周日)
上午8:00-12:00,下午14:00-20:00
智慧好醫院線上門診專家一覽表見下圖
新冠疫情防控形勢依舊嚴峻,在此提醒廣大市民朋友牢固樹立「健康第一責任人」的理念,做好個人防護;保持社交距離,不扎堆,不聚集,如非必要,減少外出。
「出品 / 黨委宣傳部」
內容來源 / 西安市疫情防控指揮部辦公室
西安發布 交大校醫院、保衛處
交大一附院 交大二附院
海報 / 西安發布
責任編輯 / 交小童