# Importerer biblioteker
import pyspark
from delta import *Introduksjon til Delta Lake
Delta Lake er et databaselag som kan legges over parquet-filer i bøtter. Det kan gi oss mye av den funksjonaliteten vi har vært vant til i relasjonelle databaser og datavarehus. I denne notebooken vises hvordan man kan ta det i bruk på Dapla.
Oppsett
Hvis du logger deg inn på https://jupyter.dapla.ssb.no/ kan du ta i bruk Delta Lake via Spark. Men for å gjøre det må du installere delta-spark. For å installere pakken må du jobbe i et ssb-project. I tillegg må du installere delta-spark-versjon som er kompatibel med Spark-versjonen som er installert på Dapla. Gjør derfor følgende:
- Opprett et ssb-project med kommandoen:
ssb-project create test-delta-lake - I terminalen skriver du følgende for å sjekke Spark-versjonen som er installert:
spark-shell --version - Sjekk hvilken delta-spark-versjon som er kompatibel med din Spark-versjon og installer den med kommandoen[1]:
poetry add delta-spark@2.3 - Åpne en ny notebook og velg kernel
test-delta-lake.
Nå har du satt opp et virtuelt miljø med en PySpark-kernel som kjører en maskin (såkalt Pyspark local kernel), der du har installert delta-spark. Vi kan nå importere de bibliotekene vi trenger og sette igang en Spark-session.
[1] I eksempelet er det Spark V3.3.1 som er installert og jeg installerer derfor delta-spark v2.3
Deretter initialiserer jeg en Spark-session. %%capture_output er med for å forhindre at det ikke blir printet ut sensitiv informasjon i en notebook som skal inn i Dapla-manualen.
%%capture captured_output
%run /usr/local/share/jupyter/kernels/pyspark_local/init.pyGenererer noe data
# Genererer noe data med Spark
data = spark.range(10, 15)
data.show()+---+
| id|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
+---+
Skriver ut en Delta Lake Table
La oss skrive ut Spark DataFrame som en Delta Lake Table og bli kjent med strukturen i objektet:
%%time
data.write.format("delta").mode("overwrite").save(
"gs://ssb-dapla-felles-data-produkt-prod/temp4"
)CPU times: user 3.38 ms, sys: 1.7 ms, total: 5.08 ms
Wall time: 5.59 s
Vi kan deretter printe ut hva som ble opprettet når vi skrev ut en Delta Lake Table:
from dapla import FileClient
fs = FileClient.get_gcs_file_system()
fs.glob("gs://ssb-dapla-felles-data-produkt-prod/temp4/**")['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet']
Delta Lake Tabellstruktur
Mappenstrukturen du ser over er typisk for en Delta Lake-tabell. La oss bryte ned komponentene:
_delta_log/:- Dette er transaksjonsloggmappen for Delta Lake-tabellen. Den inneholder en serie med JSON-filer som registrerer alle transaksjoner gjort på tabellen.
- Transaksjonsloggen er avgjørende for Delta Lakes ACID-transaksjonsegenskaper, som muliggjør funksjoner som atomiske forpliktelser, tilbakerullinger og tid-reise-forespørsler.
_delta_log/00000000000000000000.json:- Dette er en JSON-fil innenfor transaksjonsloggkatalogen. Hver JSON-fil i denne mappen tilsvarer en transaksjon (eller en batch av transaksjoner) gjort på tabellen.
- Filnavnet er null-fylt og representerer transaksjonsversjonen. I dette tilfellet representerer
00000000000000000000.jsonden aller første versjonen (versjon 0) av tabellen. Etter hvert som flere transaksjoner blir gjort, vil nye filer bli lagt til med økende versjonsnumre.
part-00000-450715bd-6b0c-4827-bb8a-b0265505ca72-c000.snappy.parquetogpart-00001-977ed55f-5ce5-469f-8a1d-9eafb143c215-c000.snappy.parquet:- Dette er de faktiske datafilene til Delta Lake-tabellen, lagret i Parquet-format.
.snappy.parquet-utvidelsen indikerer at dataene er komprimert ved hjelp av Snappy-komprimeringsalgoritmen.- Filnavnene er typiske for Sparks navngivningskonvensjon for datadeler. Prefiksene
part-00000ogpart-00001indikerer partisjonsnumre. De lange UUID-lignende strengene er unike identifikatorer for datafilene. Suffiksetc000indikerer Spark-oppgaven som skrev filen.
Mappen representerer en Delta Lake-tabell med data lagret i Parquet-format og en transaksjonslogg som sporer endringer i tabellen. Tilstedeværelsen av _delta_log-mappen og innholdet er det som skiller en Delta Lake-tabell fra et vanlig Parquet-datasett.
Lese inn tabell
deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable.toDF().show()+---+
| id|
+---+
| 12|
| 13|
| 14|
| 10|
| 11|
+---+
Modifisere datasettet
La oss modifisere datasettet ved å bytte ut verdien 13 med 15 i id-kolonnen:
from pyspark.sql.functions import col, lit
# Update the cell with value 13 to 15
deltaTable.update(condition=(col("id") == 13), set={"id": lit(15)})Et viktig poeng å få med seg her er at vi nå oppdaterte Delta Lake Table objektet både i minnet og på disk. La oss bevise det ved å lese inn fra disk:
deltaTable2 = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
deltaTable2.toDF().show()+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+
Og deretter ved å printe ut det opprinnelige objektet vi leste inn:
deltaTable.toDF().show()+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
+---+
Append data
La oss legge til verdiene 20 og 21 til datasettet. Først lager vi en Spark dataframe:
new_data = [(20,), (21,)]
new_df = spark.createDataFrame(new_data, ["id"])
new_df.show()+---+
| id|
+---+
| 20|
| 21|
+---+
Deretter kan vi appendere det til vår opprinnelige dataframe:
new_df.write.format("delta").mode("append").save(
"gs://ssb-dapla-felles-data-produkt-prod/temp4"
)deltaTable.toDF().show()+---+
| id|
+---+
| 12|
| 15|
| 14|
| 10|
| 11|
| 21|
| 20|
+---+
Historien og metadata til filen
Nå som vi har gjort noen endringer kan vi se på historien til filen:
# Lister ut filene i bøtta
fs = FileClient.get_gcs_file_system()
fs.glob("gs://ssb-dapla-felles-data-produkt-prod/temp4/**")['ssb-dapla-felles-data-delt-prod/temp4/_delta_log',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000000.json',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000001.json',
'ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000002.json',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-9b3b81a9-2771-4fb4-9f0f-659fd160d643-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00000-d04d0ca2-8e8b-42e9-a8a3-0fed9a0e4e41-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-0f2f8ba5-3161-41e8-b5d1-2084128a5bed-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp4/part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet']
Vi ser av transaksjonsloggen i _delta_log-mappen at det nå har vært 3 transaksjoner på datasettet. vi ser også av navnene på parquet-filene, part-00000 og part-00001, at det finnes to versjoner av filen. Hvis vi ønsker å bli bedre kjent med hva slags informasjon som blir lagret fra transaksjonene, så kan vi printe ut den siste transaksjonen som heter 00000000000000000002.json:
import json
from dapla import FileClient
# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()
# Filsti
path = "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000002.json"
with fs.open(path, "r") as f:
for line in f:
data = json.loads(line)
pretty_content = json.dumps(data, indent=4)
print(pretty_content)
print("-" * 50) # Print separator for clarity{
"commitInfo": {
"timestamp": 1696942544879,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[]"
},
"readVersion": 1,
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {
"numFiles": "2",
"numOutputRows": "2",
"numOutputBytes": "956"
},
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
"txnId": "a3dcd582-8362-4fc2-a8ce-57613d2eb2b8"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00000-73e5052f-1b82-48da-ab37-2cbc01bb46c1-c000.snappy.parquet",
"partitionValues": {},
"size": 478,
"modificationTime": 1696942544755,
"dataChange": true,
"stats": "{\"numRecords\":1,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":20},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00001-30d707e4-dd9a-4bfd-a4c7-7fbb1933e9ae-c000.snappy.parquet",
"partitionValues": {},
"size": 478,
"modificationTime": 1696942544833,
"dataChange": true,
"stats": "{\"numRecords\":1,\"minValues\":{\"id\":21},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
Her ser vi at vi får masse informasjon om endringen, både metadata om transaksjonen i commitInfo, og informasjon om nye data-filer i add-delen. Vi ser at det er en veldig rik beskrivelse av endringer, men det kan være vanskeig å lese innholdet direkte. La oss heller bruke history()-funksjonen som kommer med delta:
history = deltaTable.history()
history.show()+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
| 2|2023-10-10 12:55:...| null| null| WRITE|{mode -> Append, ...|null| null| null| 1| Serializable| true|{numFiles -> 2, n...| null|Apache-Spark/3.3....|
| 1|2023-10-10 12:55:...| null| null| UPDATE|{predicate -> (id...|null| null| null| 0| Serializable| false|{numRemovedFiles ...| null|Apache-Spark/3.3....|
| 0|2023-10-10 12:55:...| null| null| WRITE|{mode -> Overwrit...|null| null| null| null| Serializable| false|{numFiles -> 2, n...| null|Apache-Spark/3.3....|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
Siden det blit trangt i tabellen over så kan vi velge hvilke variabler vi ønsker å se på:
# Oversikt over alle kolonner som finnes i historien
history.columns['version',
'timestamp',
'userId',
'userName',
'operation',
'operationParameters',
'job',
'notebook',
'clusterId',
'readVersion',
'isolationLevel',
'isBlindAppend',
'operationMetrics',
'userMetadata',
'engineInfo']
# Velger de kolonnene jeg er interessert i
selected_history = deltaTable.history().select(
"version", "timestamp", "operation", "operationParameters"
)# Display the selected columns
selected_history.show(truncate=50)+-------+-----------------------+---------+--------------------------------------+
|version| timestamp|operation| operationParameters|
+-------+-----------------------+---------+--------------------------------------+
| 2|2023-10-10 12:55:45.014| WRITE| {mode -> Append, partitionBy -> []}|
| 1|2023-10-10 12:55:37.054| UPDATE| {predicate -> (id#4452L = 13)}|
| 0|2023-10-10 12:55:29.048| WRITE|{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+
Egendefinert metadata
Delta Lake støtter også egendefinert metadata. Det kan for eksempel være nyttig hvis man ønsker å bruke Delta Lake som en backend for et GUI som lar brukeren oppdatere en tabell fra GUI-et. Da ønsker man typisk å lagre hvem som gjorde endringer og når det ble gjort. La oss legge på noe slik metadata:
# Leser inn filen
df = spark.read.format("delta").load("gs://ssb-dapla-felles-data-produkt-prod/temp4")# Lagrer egendefinert metadata i en json-fil
import json
metadata = {
"comment": "Kontaktet oppgavegiver og kranglet!",
"manueltEditert": "True",
"maskineltEditert": "False",
}
metadata{'comment': 'Kontaktet oppgavegiver og kranglet!',
'manueltEditert': 'True',
'maskineltEditert': 'False'}
Vi kan deretter appende dette til den siste versjonen av fila.
(
df.write.format("delta")
.mode("append")
.option("userMetadata", json.dumps(metadata)) # Serialize metadata to a string
.save("gs://ssb-dapla-felles-data-produkt-prod/temp4")
)# Laster inn tabellen
deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-produkt-prod/temp4")
# Henter ut historien
history_df = deltaTable.history()# Show the operation details, including metadata
history_df.select(
"version", "timestamp", "operation", "operationParameters", "userMetadata"
).show(truncate=10)+-------+----------+---------+-------------------+------------+
|version| timestamp|operation|operationParameters|userMetadata|
+-------+----------+---------+-------------------+------------+
| 3|2023-10...| WRITE| {mode -...| {"comme...|
| 2|2023-10...| WRITE| {mode -...| null|
| 1|2023-10...| UPDATE| {predic...| null|
| 0|2023-10...| WRITE| {mode -...| null|
+-------+----------+---------+-------------------+------------+
history_df.select("version", "userMetadata").show(truncate=50)+-------+--------------------------------------------------+
|version| userMetadata|
+-------+--------------------------------------------------+
| 3|{"comment": "Kontaktet oppgavegiver og kranglet...|
| 2| null|
| 1| null|
| 0| null|
+-------+--------------------------------------------------+
Vi ser at vi la til vår egen metadata i versjon 3 av fila. Vi kan printe ut den rå transaksjonsloggen som tidligere, men nå er vi på transaksjon 3 00000000000000000003.json:
from dapla import FileClient
# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()
# Filsti
path = "gs://ssb-dapla-felles-data-produkt-prod/temp4/_delta_log/00000000000000000003.json"
with fs.open(path, "r") as f:
for line in f:
data = json.loads(line)
pretty_content = json.dumps(data, indent=4)
print(pretty_content)
print("-" * 50) # Print separator for clarity{
"commitInfo": {
"timestamp": 1696942553907,
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[]"
},
"readVersion": 2,
"isolationLevel": "Serializable",
"isBlindAppend": false,
"operationMetrics": {
"numFiles": "2",
"numOutputRows": "7",
"numOutputBytes": "989"
},
"userMetadata": "{\"comment\": \"Kontaktet oppgavegiver og kranglet!\", \"manueltEditert\": \"True\", \"maskineltEditert\": \"False\"}",
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.3.0",
"txnId": "e7de92bf-b0f9-4341-8bbb-9b382f2f3eb6"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00000-96369f3d-fe4a-4365-a0df-00c813027399-c000.snappy.parquet",
"partitionValues": {},
"size": 503,
"modificationTime": 1696942553856,
"dataChange": true,
"stats": "{\"numRecords\":5,\"minValues\":{\"id\":10},\"maxValues\":{\"id\":15},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
{
"add": {
"path": "part-00001-0f1bc8e6-093b-49a9-ad0b-78d5a148cfb6-c000.snappy.parquet",
"partitionValues": {},
"size": 486,
"modificationTime": 1696942553853,
"dataChange": true,
"stats": "{\"numRecords\":2,\"minValues\":{\"id\":20},\"maxValues\":{\"id\":21},\"nullCount\":{\"id\":0}}"
}
}
--------------------------------------------------
Vi er da at metadataene ligger som forventet i metadata-delen.