# Importerer biblioteker
import pyspark
from delta import *
Introduksjon til Delta Lake
Delta Lake er et databaselag som kan legges over parquet-filer i bøtter. Det kan gi oss mye av den funksjonaliteten vi har vært vant til i relasjonelle databaser og datavarehus. I denne notebooken vises hvordan man kan ta det i bruk på Dapla.
Oppsett
Hvis du logger deg inn på https://jupyter.dapla.ssb.no/ kan du ta i bruk Delta Lake via Spark. Men for å gjøre det må du installere delta-spark. For å installere pakken må du jobbe i et ssb-project. I tillegg må du installere delta-spark
-versjon som er kompatibel med Spark-versjonen som er installert på Dapla. Gjør derfor følgende:
- Opprett et ssb-project med kommandoen:
ssb-project create test-delta-lake
- I terminalen skriver du følgende for å sjekke Spark-versjonen som er installert:
spark-shell --version
- Sjekk hvilken delta-spark-versjon som er kompatibel med din Spark-versjon og installer den med kommandoen[1]:
poetry add delta-spark@2.3
- Åpne en ny notebook og velg kernel
test-delta-lake
.
Nå har du satt opp et virtuelt miljø med en PySpark-kernel som kjører en maskin (såkalt Pyspark local kernel), der du har installert delta-spark. Vi kan nå importere de bibliotekene vi trenger og sette igang en Spark-session.
[1] I eksempelet er det Spark V3.3.1 som er installert og jeg installerer derfor delta-spark v2.3
Deretter initialiserer jeg en Spark-session. %%capture_output
er med for å forhindre at det ikke blir printet ut sensitiv informasjon i en notebook som skal inn i Dapla-manualen.
%%capture captured_output
%run /usr/local/share/jupyter/kernels/pyspark_local/init.py
Genererer noe data
# Genererer noe data med Spark
= spark.range(10, 15)
data data.show()
+---+
| id|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
+---+
Skriver ut en Delta Lake Table
La oss skrive ut Spark DataFrame som en Delta Lake Table og bli kjent med strukturen i objektet:
%%time
format("delta").mode("overwrite").save(
data.write."gs://ssb-dapla-felles-data-produkt-prod/temp4"
)
CPU times: user 3.38 ms, sys: 1.7 ms, total: 5.08 ms
Wall time: 5.59 s
Vi kan deretter printe ut hva som ble opprettet når vi skrev ut en Delta Lake Table:
from dapla import FileClient
= FileClient.get_gcs_file_system()
fs
"gs://ssb-dapla-felles-data-produkt-prod/temp4/**") fs.glob(
['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet']
Delta Lake Tabellstruktur
Mappenstrukturen du ser over er typisk for en Delta Lake-tabell. La oss bryte ned komponentene:
_delta_log/
:- Dette er transaksjonsloggmappen for Delta Lake-tabellen. Den inneholder en serie med JSON-filer som registrerer alle transaksjoner gjort på tabellen.
- Transaksjonsloggen er avgjørende for Delta Lakes ACID-transaksjonsegenskaper, som muliggjør funksjoner som atomiske forpliktelser, tilbakerullinger og tid-reise-forespørsler.
_delta_log/00000000000000000000.json
:- Dette er en JSON-fil innenfor transaksjonsloggkatalogen. Hver JSON-fil i denne mappen tilsvarer en transaksjon (eller en batch av transaksjoner) gjort på tabellen.
- Filnavnet er null-fylt og representerer transaksjonsversjonen. I dette tilfellet representerer
00000000000000000000.json
den aller første versjonen (versjon 0) av tabellen. Etter hvert som flere transaksjoner blir gjort, vil nye filer bli lagt til med økende versjonsnumre.
part-00000-450715bd-6b0c-4827-bb8a-b0265505ca72-c000.snappy.parquet
ogpart-00001-977ed55f-5ce5-469f-8a1d-9eafb143c215-c000.snappy.parquet
:- Dette er de faktiske datafilene til Delta Lake-tabellen, lagret i Parquet-format.
.snappy.parquet
-utvidelsen indikerer at dataene er komprimert ved hjelp av Snappy-komprimeringsalgoritmen.- Filnavnene er typiske for Sparks navngivningskonvensjon for datadeler. Prefiksene
part-00000
ogpart-00001
indikerer partisjonsnumre. De lange UUID-lignende strengene er unike identifikatorer for datafilene. Suffiksetc000
indikerer Spark-oppgaven som skrev filen.
Mappen representerer en Delta Lake-tabell med data lagret i Parquet-format og en transaksjonslogg som sporer endringer i tabellen. Tilstedeværelsen av _delta_log
-mappen og innholdet er det som skiller en Delta Lake-tabell fra et vanlig Parquet-datasett.
Lese inn tabell
= DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 13|
| 14|
| 10|
| 11|
+---+
Modifisere datasettet
La oss modifisere datasettet ved å bytte ut verdien 13
med 15
i id
-kolonnen:
from pyspark.sql.functions import col, lit
# Update the cell with value 13 to 15
=(col("id") == 13), set={"id": lit(15)}) deltaTable.update(condition
Et viktig poeng å få med seg her er at vi nå oppdaterte Delta Lake Table objektet både i minnet og på disk. La oss bevise det ved å lese inn fra disk:
= DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable2 deltaTable2.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+
Og deretter ved å printe ut det opprinnelige objektet vi leste inn:
deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+
Append data
La oss legge til verdiene 20
og 21
til datasettet. Først lager vi en Spark dataframe:
= [(20,), (21,)]
new_data = spark.createDataFrame(new_data, ["id"])
new_df new_df.show()
+---+
| id|
+---+
| 20|
| 21|
+---+
Deretter kan vi appendere det til vår opprinnelige dataframe:
format("delta").mode("append").save(
new_df.write."gs://ssb-dapla-felles-data-produkt-prod/temp4"
)
deltaTable.toDF().show()
+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
| 21|
| 20|
+---+
Historien og metadata til filen
Nå som vi har gjort noen endringer kan vi se på historien til filen:
# Lister ut filene i bøtta
= FileClient.get_gcs_file_system()
fs "gs://ssb-dapla-felles-data-produkt-prod/temp4/**") fs.glob(
['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000001.json',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000002.json',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-d04d0ca2-8e8b-42e9-a8a3-0fed9a0e4e41-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet']
Vi ser av transaksjonsloggen i _delta_log
-mappen at det nå har vært 3 transaksjoner på datasettet. vi ser også av navnene på parquet-filene, part-00000
og part-00001
, at det finnes to versjoner av filen. Hvis vi ønsker å bli bedre kjent med hva slags informasjon som blir lagret fra transaksjonene, så kan vi printe ut den siste transaksjonen som heter 00000000000000000002.json
:
import json
from dapla import FileClient
# Kobler oss på bøttene
= FileClient.get_gcs_file_system()
fs
# Filsti
= "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000002.json"
path
with fs.open(path, "r") as f:
for line in f:
= json.loads(line)
data = json.dumps(data, indent=4)
pretty_content print(pretty_content)
print("-" * 50) # Print separator for clarity
{
"commitInfo": {
"timestamp": 1696942544879,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[]"
},
"readVersion": 1,
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "2",
"numOutputRows": "2",
"numOutputBytes": "956"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
"txnId": "a3dcd582-8362-4fc2-a8ce-57613d2eb2b8"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet",
"partitionValues": {},
"size": 478,
"modificationTime": 1696942544755,
"dataChange": true,
"stats": "{\"numRecords\":1,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":20},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet",
"partitionValues": {},
"size": 478,
"modificationTime": 1696942544833,
"dataChange": true,
"stats": "{\"numRecords\":1,\"minValues\":{\"id\":21},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
Her ser vi at vi får masse informasjon om endringen, både metadata om transaksjonen i commitInfo
, og informasjon om nye data-filer i add
-delen. Vi ser at det er en veldig rik beskrivelse av endringer, men det kan være vanskeig å lese innholdet direkte. La oss heller bruke history()
-funksjonen som kommer med delta
:
= deltaTable.history()
history history.show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
| 2|2023-10-10 12:55:...| null| null| WRITE|{mode -> Append, ...|null| null| null| 1| Serializable| true|{numFiles -> 2, n...| null|Apache-Spark/3.3....|
| 1|2023-10-10 12:55:...| null| null| UPDATE|{predicate -> (id...|null| null| null| 0| Serializable| false|{numRemovedFiles ...| null|Apache-Spark/3.3....|
| 0|2023-10-10 12:55:...| null| null| WRITE|{mode -> Overwrit...|null| null| null| null| Serializable| false|{numFiles -> 2, n...| null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
Siden det blit trangt i tabellen over så kan vi velge hvilke variabler vi ønsker å se på:
# Oversikt over alle kolonner som finnes i historien
history.columns
['version',
'timestamp',
'userId',
'userName',
'operation',
'operationParameters',
'job',
'notebook',
'clusterId',
'readVersion',
'isolationLevel',
'isBlindAppend',
'operationMetrics',
'userMetadata',
'engineInfo']
# Velger de kolonnene jeg er interessert i
= deltaTable.history().select(
selected_history "version", "timestamp", "operation", "operationParameters"
)
# Display the selected columns
=50) selected_history.show(truncate
+-------+-----------------------+---------+--------------------------------------+
|version| timestamp|operation| operationParameters|
+-------+-----------------------+---------+--------------------------------------+
| 2|2023-10-10 12:55:45.014| WRITE| {mode -> Append, partitionBy -> []}|
| 1|2023-10-10 12:55:37.054| UPDATE| {predicate -> (id#4452L = 13)}|
| 0|2023-10-10 12:55:29.048| WRITE|{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+
Egendefinert metadata
Delta Lake støtter også egendefinert metadata. Det kan for eksempel være nyttig hvis man ønsker å bruke Delta Lake som en backend for et GUI som lar brukeren oppdatere en tabell fra GUI-et. Da ønsker man typisk å lagre hvem som gjorde endringer og når det ble gjort. La oss legge på noe slik metadata:
# Leser inn filen
= spark.read.format("delta").load("gs://ssb-dapla-felles-data-produkt-prod/temp4") df
# Lagrer egendefinert metadata i en json-fil
import json
= {
metadata "comment": "Kontaktet oppgavegiver og kranglet!",
"manueltEditert": "True",
"maskineltEditert": "False",
} metadata
{'comment': 'Kontaktet oppgavegiver og kranglet!',
'manueltEditert': 'True',
'maskineltEditert': 'False'}
Vi kan deretter appende dette til den siste versjonen av fila.
(format("delta")
df.write."append")
.mode("userMetadata", json.dumps(metadata)) # Serialize metadata to a string
.option("gs://ssb-dapla-felles-data-produkt-prod/temp4")
.save( )
# Laster inn tabellen
= DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable
# Henter ut historien
= deltaTable.history() history_df
# Show the operation details, including metadata
history_df.select("version", "timestamp", "operation", "operationParameters", "userMetadata"
=10) ).show(truncate
+-------+----------+---------+-------------------+------------+
|version| timestamp|operation|operationParameters|userMetadata|
+-------+----------+---------+-------------------+------------+
| 3|2023-10...| WRITE| {mode -...| {"comme...|
| 2|2023-10...| WRITE| {mode -...| null|
| 1|2023-10...| UPDATE| {predic...| null|
| 0|2023-10...| WRITE| {mode -...| null|
+-------+----------+---------+-------------------+------------+
"version", "userMetadata").show(truncate=50) history_df.select(
+-------+--------------------------------------------------+
|version| userMetadata|
+-------+--------------------------------------------------+
| 3|{"comment": "Kontaktet oppgavegiver og kranglet...|
| 2| null|
| 1| null|
| 0| null|
+-------+--------------------------------------------------+
Vi ser at vi la til vår egen metadata i versjon 3 av fila. Vi kan printe ut den rå transaksjonsloggen som tidligere, men nå er vi på transaksjon 3 00000000000000000003.json
:
from dapla import FileClient
# Kobler oss på bøttene
= FileClient.get_gcs_file_system()
fs
# Filsti
= "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000003.json"
path
with fs.open(path, "r") as f:
for line in f:
= json.loads(line)
data = json.dumps(data, indent=4)
pretty_content print(pretty_content)
print("-" * 50) # Print separator for clarity
{
"commitInfo": {
"timestamp": 1696942553907,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[]"
},
"readVersion": 2,
"isolationLevel": "Serializable",
"isBlindAppend": false,
"operationMetrics": {
"numFiles": "2",
"numOutputRows": "7",
"numOutputBytes": "989"
},
"userMetadata": "{\"comment\": \"Kontaktet oppgavegiver og kranglet!\", \"manueltEditert\": \"True\", \"maskineltEditert\": \"False\"}",
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
"txnId": "e7de92bf-b0f9-4341-8bbb-9b382f2f3eb6"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00000-96369f3d-fe4a-4365-a0df-00c813027399-c000.snappy.parquet",
"partitionValues": {},
"size": 503,
"modificationTime": 1696942553856,
"dataChange": true,
"stats": "{\"numRecords\":5,\"minValues\":{\"id\":10},\"maxValues\":{\"id\":15},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00001-0f1bc8e6-093b-49a9-ad0b-78d5a148cfb6-c000.snappy.parquet",
"partitionValues": {},
"size": 486,
"modificationTime": 1696942553853,
"dataChange": true,
"stats": "{\"numRecords\":2,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
Vi er da at metadataene ligger som forventet i metadata-delen.