Introduksjon til Delta Lake

Delta Lake er et databaselag som kan legges over parquet-filer i bøtter. Det kan gi oss mye av den funksjonaliteten vi har vært vant til i relasjonelle databaser og datavarehus. I denne notebooken vises hvordan man kan ta det i bruk på Dapla.

Oppsett

Hvis du logger deg inn på https://jupyter.dapla.ssb.no/ kan du ta i bruk Delta Lake via Spark. Men for å gjøre det må du installere delta-spark. For å installere pakken må du jobbe i et ssb-project. I tillegg må du installere delta-spark-versjon som er kompatibel med Spark-versjonen som er installert på Dapla. Gjør derfor følgende:

  1. Opprett et ssb-project med kommandoen:
    ssb-project create test-delta-lake
  2. I terminalen skriver du følgende for å sjekke Spark-versjonen som er installert:
    spark-shell --version
  3. Sjekk hvilken delta-spark-versjon som er kompatibel med din Spark-versjon og installer den med kommandoen[1]:
    poetry add delta-spark@2.3
  4. Åpne en ny notebook og velg kernel test-delta-lake.

Nå har du satt opp et virtuelt miljø med en PySpark-kernel som kjører en maskin (såkalt Pyspark local kernel), der du har installert delta-spark. Vi kan nå importere de bibliotekene vi trenger og sette igang en Spark-session.

[1] I eksempelet er det Spark V3.3.1 som er installert og jeg installerer derfor delta-spark v2.3

# Importerer biblioteker
import pyspark
from delta import *

Deretter initialiserer jeg en Spark-session. %%capture_output er med for å forhindre at det ikke blir printet ut sensitiv informasjon i en notebook som skal inn i Dapla-manualen.

%%capture captured_output
%run /usr/local/share/jupyter/kernels/pyspark_local/init.py

Genererer noe data

# Genererer noe data med Spark
data = spark.range(10, 15)
data.show()
+---+
| id|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
+---+

Skriver ut en Delta Lake Table

La oss skrive ut Spark DataFrame som en Delta Lake Table og bli kjent med strukturen i objektet:

%%time
data.write.format("delta").mode("overwrite").save(
    "gs://ssb-dapla-felles-data-produkt-prod/temp4"
)
CPU times: user 3.38 ms, sys: 1.7 ms, total: 5.08 ms
Wall time: 5.59 s

Vi kan deretter printe ut hva som ble opprettet når vi skrev ut en Delta Lake Table:

from dapla import FileClient

fs = FileClient.get_gcs_file_system()

fs.glob("gs://ssb-dapla-felles-data-produkt-prod/temp4/**")
['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet']

Delta Lake Tabellstruktur

Mappenstrukturen du ser over er typisk for en Delta Lake-tabell. La oss bryte ned komponentene:

  1. _delta_log/:
    • Dette er transaksjonsloggmappen for Delta Lake-tabellen. Den inneholder en serie med JSON-filer som registrerer alle transaksjoner gjort på tabellen.
    • Transaksjonsloggen er avgjørende for Delta Lakes ACID-transaksjonsegenskaper, som muliggjør funksjoner som atomiske forpliktelser, tilbakerullinger og tid-reise-forespørsler.
  2. _delta_log/00000000000000000000.json:
    • Dette er en JSON-fil innenfor transaksjonsloggkatalogen. Hver JSON-fil i denne mappen tilsvarer en transaksjon (eller en batch av transaksjoner) gjort på tabellen.
    • Filnavnet er null-fylt og representerer transaksjonsversjonen. I dette tilfellet representerer 00000000000000000000.json den aller første versjonen (versjon 0) av tabellen. Etter hvert som flere transaksjoner blir gjort, vil nye filer bli lagt til med økende versjonsnumre.
  3. part-00000-450715bd-6b0c-4827-bb8a-b0265505ca72-c000.snappy.parquet og part-00001-977ed55f-5ce5-469f-8a1d-9eafb143c215-c000.snappy.parquet:
    • Dette er de faktiske datafilene til Delta Lake-tabellen, lagret i Parquet-format.
    • .snappy.parquet-utvidelsen indikerer at dataene er komprimert ved hjelp av Snappy-komprimeringsalgoritmen.
    • Filnavnene er typiske for Sparks navngivningskonvensjon for datadeler. Prefiksene part-00000 og part-00001 indikerer partisjonsnumre. De lange UUID-lignende strengene er unike identifikatorer for datafilene. Suffikset c000 indikerer Spark-oppgaven som skrev filen.

Mappen representerer en Delta Lake-tabell med data lagret i Parquet-format og en transaksjonslogg som sporer endringer i tabellen. Tilstedeværelsen av _delta_log-mappen og innholdet er det som skiller en Delta Lake-tabell fra et vanlig Parquet-datasett.

Lese inn tabell

deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 13|
| 14|
| 10|
| 11|
+---+

Modifisere datasettet

La oss modifisere datasettet ved å bytte ut verdien 13 med 15 i id-kolonnen:

from pyspark.sql.functions import col, lit

# Update the cell with value 13 to 15
deltaTable.update(condition=(col("id") == 13), set={"id": lit(15)})

Et viktig poeng å få med seg her er at vi nå oppdaterte Delta Lake Table objektet både i minnet og på disk. La oss bevise det ved å lese inn fra disk:

deltaTable2 = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable2.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+

Og deretter ved å printe ut det opprinnelige objektet vi leste inn:

deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+

Append data

La oss legge til verdiene 20 og 21 til datasettet. Først lager vi en Spark dataframe:

new_data = [(20,), (21,)]
new_df = spark.createDataFrame(new_data, ["id"])
new_df.show()
+---+
| id|
+---+
| 20|
| 21|
+---+

Deretter kan vi appendere det til vår opprinnelige dataframe:

new_df.write.format("delta").mode("append").save(
    "gs://ssb-dapla-felles-data-produkt-prod/temp4"
)
deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
| 21|
| 20|
+---+

Historien og metadata til filen

Nå som vi har gjort noen endringer kan vi se på historien til filen:

# Lister ut filene i bøtta
fs = FileClient.get_gcs_file_system()
fs.glob("gs://ssb-dapla-felles-data-produkt-prod/temp4/**")
['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000001.json',
 'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000002.json',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00000-d04d0ca2-8e8b-42e9-a8a3-0fed9a0e4e41-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp4/part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet']

Vi ser av transaksjonsloggen i _delta_log-mappen at det nå har vært 3 transaksjoner på datasettet. vi ser også av navnene på parquet-filene, part-00000 og part-00001, at det finnes to versjoner av filen. Hvis vi ønsker å bli bedre kjent med hva slags informasjon som blir lagret fra transaksjonene, så kan vi printe ut den siste transaksjonen som heter 00000000000000000002.json:

import json

from dapla import FileClient

# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()

# Filsti
path = "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000002.json"

with fs.open(path, "r") as f:
    for line in f:
        data = json.loads(line)
        pretty_content = json.dumps(data, indent=4)
        print(pretty_content)
        print("-" * 50)  # Print separator for clarity
{
    "commitInfo": {
        "timestamp": 1696942544879,
        "operation": "WRITE",
        "operationParameters": {
            "mode": "Append",
            "partitionBy": "[]"
        },
        "readVersion": 1,
        "isolationLevel": "Serializable",
        "isBlindAppend": true,
        "operationMetrics": {
            "numFiles": "2",
            "numOutputRows": "2",
            "numOutputBytes": "956"
        },
        "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
        "txnId": "a3dcd582-8362-4fc2-a8ce-57613d2eb2b8"
    }
}
--------------------------------------------------
{
    "add": {
        "path": "part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet",
        "partitionValues": {},
        "size": 478,
        "modificationTime": 1696942544755,
        "dataChange": true,
        "stats": "{\"numRecords\":1,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":20},\"nullCount\":{\"id\":0}}"
    }
}
--------------------------------------------------
{
    "add": {
        "path": "part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet",
        "partitionValues": {},
        "size": 478,
        "modificationTime": 1696942544833,
        "dataChange": true,
        "stats": "{\"numRecords\":1,\"minValues\":{\"id\":21},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
    }
}
--------------------------------------------------

Her ser vi at vi får masse informasjon om endringen, både metadata om transaksjonen i commitInfo, og informasjon om nye data-filer i add-delen. Vi ser at det er en veldig rik beskrivelse av endringer, men det kan være vanskeig å lese innholdet direkte. La oss heller bruke history()-funksjonen som kommer med delta:

history = deltaTable.history()
history.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      2|2023-10-10 12:55:...|  null|    null|    WRITE|{mode -> Append, ...|null|    null|     null|          1|  Serializable|         true|{numFiles -> 2, n...|        null|Apache-Spark/3.3....|
|      1|2023-10-10 12:55:...|  null|    null|   UPDATE|{predicate -> (id...|null|    null|     null|          0|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.3....|
|      0|2023-10-10 12:55:...|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|       null|  Serializable|        false|{numFiles -> 2, n...|        null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+

Siden det blit trangt i tabellen over så kan vi velge hvilke variabler vi ønsker å se på:

# Oversikt over alle kolonner som finnes i historien
history.columns
['version',
 'timestamp',
 'userId',
 'userName',
 'operation',
 'operationParameters',
 'job',
 'notebook',
 'clusterId',
 'readVersion',
 'isolationLevel',
 'isBlindAppend',
 'operationMetrics',
 'userMetadata',
 'engineInfo']
# Velger de kolonnene jeg er interessert i
selected_history = deltaTable.history().select(
    "version", "timestamp", "operation", "operationParameters"
)
# Display the selected columns
selected_history.show(truncate=50)
+-------+-----------------------+---------+--------------------------------------+
|version|              timestamp|operation|                   operationParameters|
+-------+-----------------------+---------+--------------------------------------+
|      2|2023-10-10 12:55:45.014|    WRITE|   {mode -> Append, partitionBy -> []}|
|      1|2023-10-10 12:55:37.054|   UPDATE|        {predicate -> (id#4452L = 13)}|
|      0|2023-10-10 12:55:29.048|    WRITE|{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+

Egendefinert metadata

Delta Lake støtter også egendefinert metadata. Det kan for eksempel være nyttig hvis man ønsker å bruke Delta Lake som en backend for et GUI som lar brukeren oppdatere en tabell fra GUI-et. Da ønsker man typisk å lagre hvem som gjorde endringer og når det ble gjort. La oss legge på noe slik metadata:

# Leser inn filen
df = spark.read.format("delta").load("gs://ssb-dapla-felles-data-produkt-prod/temp4")
# Lagrer egendefinert metadata i en json-fil
import json

metadata = {
    "comment": "Kontaktet oppgavegiver og kranglet!",
    "manueltEditert": "True",
    "maskineltEditert": "False",
}
metadata
{'comment': 'Kontaktet oppgavegiver og kranglet!',
 'manueltEditert': 'True',
 'maskineltEditert': 'False'}

Vi kan deretter appende dette til den siste versjonen av fila.

(
    df.write.format("delta")
    .mode("append")
    .option("userMetadata", json.dumps(metadata))  # Serialize metadata to a string
    .save("gs://ssb-dapla-felles-data-produkt-prod/temp4")
)
# Laster inn tabellen
deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")

# Henter ut historien
history_df = deltaTable.history()
# Show the operation details, including metadata
history_df.select(
    "version", "timestamp", "operation", "operationParameters", "userMetadata"
).show(truncate=10)
+-------+----------+---------+-------------------+------------+
|version| timestamp|operation|operationParameters|userMetadata|
+-------+----------+---------+-------------------+------------+
|      3|2023-10...|    WRITE|         {mode -...|  {"comme...|
|      2|2023-10...|    WRITE|         {mode -...|        null|
|      1|2023-10...|   UPDATE|         {predic...|        null|
|      0|2023-10...|    WRITE|         {mode -...|        null|
+-------+----------+---------+-------------------+------------+
history_df.select("version", "userMetadata").show(truncate=50)
+-------+--------------------------------------------------+
|version|                                      userMetadata|
+-------+--------------------------------------------------+
|      3|{"comment": "Kontaktet oppgavegiver og kranglet...|
|      2|                                              null|
|      1|                                              null|
|      0|                                              null|
+-------+--------------------------------------------------+

Vi ser at vi la til vår egen metadata i versjon 3 av fila. Vi kan printe ut den rå transaksjonsloggen som tidligere, men nå er vi på transaksjon 3 00000000000000000003.json:

from dapla import FileClient

# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()

# Filsti
path = "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000003.json"

with fs.open(path, "r") as f:
    for line in f:
        data = json.loads(line)
        pretty_content = json.dumps(data, indent=4)
        print(pretty_content)
        print("-" * 50)  # Print separator for clarity
{
    "commitInfo": {
        "timestamp": 1696942553907,
        "operation": "WRITE",
        "operationParameters": {
            "mode": "Append",
            "partitionBy": "[]"
        },
        "readVersion": 2,
        "isolationLevel": "Serializable",
        "isBlindAppend": false,
        "operationMetrics": {
            "numFiles": "2",
            "numOutputRows": "7",
            "numOutputBytes": "989"
        },
        "userMetadata": "{\"comment\": \"Kontaktet oppgavegiver og kranglet!\", \"manueltEditert\": \"True\", \"maskineltEditert\": \"False\"}",
        "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
        "txnId": "e7de92bf-b0f9-4341-8bbb-9b382f2f3eb6"
    }
}
--------------------------------------------------
{
    "add": {
        "path": "part-00000-96369f3d-fe4a-4365-a0df-00c813027399-c000.snappy.parquet",
        "partitionValues": {},
        "size": 503,
        "modificationTime": 1696942553856,
        "dataChange": true,
        "stats": "{\"numRecords\":5,\"minValues\":{\"id\":10},\"maxValues\":{\"id\":15},\"nullCount\":{\"id\":0}}"
    }
}
--------------------------------------------------
{
    "add": {
        "path": "part-00001-0f1bc8e6-093b-49a9-ad0b-78d5a148cfb6-c000.snappy.parquet",
        "partitionValues": {},
        "size": 486,
        "modificationTime": 1696942553853,
        "dataChange": true,
        "stats": "{\"numRecords\":2,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
    }
}
--------------------------------------------------

Vi er da at metadataene ligger som forventet i metadata-delen.