Introduksjon til PySpark

Apache Spark er et sterkt verktøy som utvider mulighetsrommet for databehandling med R og Python. Kjernefunksjonaliteten ligger i at den lar deg kjøre en jobb på flere maskiner samtidig, noe som ikke er mulig med klassiske rammeverk som Pandas og Tidyverse. Følgelig er det et rammeverk som blant annet er veldig egnet for å prosessere store datamengder eller gjøre store beregninger. Les om mer om Apache Spark i Dapla-manualen

I denne notebooken vises noen enkle eksempler på hvordan du kan jobbe med data med PySpark, et Python-grensesnitt mot Spark.

Oppsett

Når du logger deg inn på Dapla kan du velge mellom 2 ferdigoppsatte kernels for å jobbe med PySpark:

  1. Pyspark (local)
  2. Pyspark (k8s cluster)

Den første lar deg bruke Spark på en enkeltmaskin, mens den andre lar deg distribuere kjøringen på mange maskiner avhengig av hvor store jobbene er. I eksemplene under brukes Pyspark (local).

# Importer biblioteker
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, explode, expr, sequence, sum, avg
from pyspark.sql.types import DateType, DoubleType, StructField, StructType

# Initialierer en SparkSession
spark = (
    SparkSession.builder.master("local[2]")
    .appName("Dapla-manual-example")
    .getOrCreate()
)

I koden over importerer vi de bibliotekene vi skal bruke under. Grunnen til at vi importerer pyspark.sql er at dette er at Spark SQL er Apache Spark sin modul for å jobbe med strukturerte data. Og som navnet tilsier vil det si at vi kan blande Python og SQL i koden vår. Vi skal nærmere på hvordan man bruke SQL fra PySpark-notebook senere.

Spark tilbyr også et eget grensesnitt, Spark UI, for å monitorere hva som skjer under en SparkSession. Vi kan bruke følgende kommando for å få opp en lenke til Spark UI i notebooken vår:

spark.sparkContext

SparkContext

Spark UI

Version
v3.3.1
Master
local[*]
AppName
pyspark-shell

Klikker du på Spark UI-lenken så tar den deg til dashboard som lar deg monitorere, debugge, optimalisere og forstå kjøringene dine. Det kan være et svært nyttig verktøy i mange tilfeller.

Generere data

Vi kan begynne med å generere en Spark DataFrame med en kolonne som inneholder månedlige datoer for perioden 2000M1-2023M8.

# Genererer månedlige data
dates_df = spark.range(1).select(
    explode(
        sequence(
            start=expr("date '2000-01-01'"),
            stop=expr("date '2023-08-01'"),
            step=expr("interval 1 month"),
        )
    ).alias("Date")
)
dates_df.show(5)
+----------+
|      Date|
+----------+
|2000-01-01|
|2000-02-01|
|2000-03-01|
|2000-04-01|
|2000-05-01|
+----------+
only showing top 5 rows

En Spark DataFrame er en distribuert samling av data som er organisert inn i kolonner. Siden Spark lar deg distribuere kjøringer på flere maskiner, er DataFrames optimalisert for å kunne splittes opp slik at de kan brukes på flere maskiner. Med andre er dette ikke det samme som en Pandas dataframe mange kjenner fra før.

Over genererte vi en datokolonne. For å få litt mer data kan vi også generere 100 kolonner med tidsseriedata og så printer vi de 2 første av disse:

# Genererer random walk data
schema = StructType(
    [StructField(f"serie{i:02d}", DoubleType(), True) for i in range(100)]
)

data = [
    tuple((10 + np.random.normal(0, 1, 100)).cumsum().tolist())
    for _ in range(284)  # 284 months from 2000-01 to 2023-08
]

data_df = spark.createDataFrame(data, schema=schema)

data_df.select("serie00", "serie01").show(5)
+------------------+------------------+
|           serie00|           serie01|
+------------------+------------------+
|10.410703377184355| 21.06318801110079|
|10.509249410154466|  19.5674295298024|
| 9.618310122060274|17.635805093465642|
| 9.691112692298294|18.593842915949082|
| 9.903675228685067|20.012215769058564|
+------------------+------------------+
only showing top 5 rows

Til slutt kan vi joine de to datasettene sammen og lage noen kolonner som viser år, kvartal og måned. Deretter printer vi ut noen av kolonnene med kommandoen show().

# Legger til row index til DataFrame før join med dates_df
data_df = data_df.withColumn("row_index", expr("monotonically_increasing_id()"))

# Joiner de to datasettene
df = (
    dates_df.withColumn("row_index", expr("monotonically_increasing_id()"))
    .join(data_df, "row_index")
    .drop("row_index")
)

# Legger til år, kvartal og mnd
df = df.withColumn("Year", date_format(df.Date, "yyyy"))
df = df.withColumn("Quarter", expr("quarter(Date)"))
df = df.withColumn("Month", date_format(df.Date, "MM"))

df.select("Date", "Year", "Quarter", "Month", "serie00", "serie01").show(5)
+----------+----+-------+-----+------------------+------------------+
|      Date|Year|Quarter|Month|           serie00|           serie01|
+----------+----+-------+-----+------------------+------------------+
|2000-01-01|2000|      1|   01| 9.495232388801012|   19.016168503192|
|2000-02-01|2000|      1|   02| 10.70952411634649|21.404467063442723|
|2000-03-01|2000|      1|   03|11.118293927071951| 21.25035527677261|
|2000-04-01|2000|      2|   04| 9.346911680164684|19.982136698759238|
|2000-05-01|2000|      2|   05| 9.663303382177363|19.925236690504494|
+----------+----+-------+-----+------------------+------------------+
only showing top 5 rows

Og med det har vi noe data vi kan jobbe med i resten av notebooken.

Skrive til Parquet

PySpark tilbyr mange opsjoner ved skriving til parquet-filer som vi kanskje ikke er vant til å forholde oss til med enklere rammeverk som Pandas. Den enkleste måten å skrive ut en fil er som følger:

df.write.parquet(
    "gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)

Dette vil fungere hvis filen ikke finnes fra før. Hvis den finnes fra før så vil den feile. Grunnen er at vi ikke har spesifisert hva vi ønsker at den skal gjøre. Vi kan velge mellom overwrite, append, ignore eller errorifexists. Sistnevnte er også default-oppførsel hvis du ikke ber den gjøre noe annet.

Under bruker vi opsjonen overwrite, det vil si at den skriver over en evt eksisterende fil med samme navn.

df.write.mode("overwrite").parquet(
    "gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)

Vi kan inspisere hva som ble skrevet ved å liste ut innholder i bøtta.

from dapla import FileClient

fs = FileClient.get_gcs_file_system()

fs.glob("gs://ssb-dapla-felles-data-produkt-prod/temp/**")
['ssb-dapla-felles-data-delt-prod/temp/',
 'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet',
 'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/',
 'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/_SUCCESS',
 'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/part-00000-b32e7299-0590-4b31-bcc2-dc3d58725529-c000.snappy.parquet',
 'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/part-00001-b32e7299-0590-4b31-bcc2-dc3d58725529-c000.snappy.parquet']

Hvis denne parquet-filen hadde vært partisjonert etter en kolonne, så ville det vært egne undermapper med navnestruktur column_name=value som indikerte hva filen er partisjonert på. Siden vi her bruker en maskin og har et lite datasett, valgte Spark å ikke partisjonere.

Lese inn Parquet

Apache Spark kan lese inn flere parquet-filer samtidig. Syntaxen er like enkel som den for å skrive ut.

df_ts = spark.read.parquet(
    "gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)
df_ts.select("Date", "Year", "Quarter", "Month", "serie66", "serie55").show(5)
+----------+----+-------+-----+-----------------+-----------------+
|      Date|Year|Quarter|Month|          serie66|          serie55|
+----------+----+-------+-----+-----------------+-----------------+
|2000-01-01|2000|      1|   01|670.2679830025959|562.4312808525777|
|2000-02-01|2000|      1|   02|675.4233411662802|562.5168447360121|
|2000-03-01|2000|      1|   03|687.3412458214908|568.6203957584232|
|2000-04-01|2000|      2|   04|673.1128047244955|557.4633871253379|
|2000-05-01|2000|      2|   05| 667.513406101114|561.7766450346327|
+----------+----+-------+-----+-----------------+-----------------+
only showing top 5 rows

PySpark og SQL

Du kan også skrive SQL med Spark. For å skrive SQL må vi først lage et temporary view. Under kaller vi viewt for tidsserie.

df.createOrReplaceTempView("tidsserie")

Vi kan deretter skrive en SQL-statement som vi ønsker å kjøre på viewet:

query = "SELECT * FROM tidsserie WHERE Year = 2010"

Deretter kan vi bruke det til å filtrere datasettet:

result_df = spark.sql(query)
result_df.select("Date", "Year", "Quarter", "Month", "serie00", "serie01").show(5)
+----------+----+-------+-----+------------------+------------------+
|      Date|Year|Quarter|Month|           serie00|           serie01|
+----------+----+-------+-----+------------------+------------------+
|2010-01-01|2010|      1|   01| 11.26910423907778|21.730128215168268|
|2010-02-01|2010|      1|   02| 8.722783282690738| 17.46851086792347|
|2010-03-01|2010|      1|   03|10.376873608348605|20.109386343182802|
|2010-04-01|2010|      2|   04|11.459959305590926|21.995141825523866|
|2010-05-01|2010|      2|   05|10.441456792180572| 21.25096473981906|
+----------+----+-------+-----+------------------+------------------+
only showing top 5 rows

Aggregering

vi kan aggregere opp enten med PySpark-syntax eller SQL-syntax. Under viser vi begge:

from pyspark.sql import functions as F

# Assuming df_ts is your DataFrame
aggregated_df = df_ts.groupBy("Quarter").agg(F.sum("serie00").alias("Sum"),
                                             F.avg("serie00").alias("Average"),
                                             F.max("serie00").alias("Maximum"))

# Show the result
aggregated_df.show()
+-------+------------------+------------------+------------------+
|Quarter|               Sum|           Average|           Maximum|
+-------+------------------+------------------+------------------+
|      1|363.95869885234185|10.109963857009497|11.829453550532005|
|      3|365.68324879453405| 10.15786802207039|12.233378837422391|
|      4| 342.2334082209804|10.065688477087658|12.210138970053695|
|      2|  361.991445506568|10.055317930738001|12.276030776082463|
+-------+------------------+------------------+------------------+

La oss gjøre det samme med SQL, men grupperer etter to variabler og sorterer output etterpå.

# Assuming df_ts is your DataFrame
df_ts.createOrReplaceTempView("temp_table")

# Now you can run a SQL query
query = """
    SELECT
        Year,
        Quarter,
        SUM(serie00) AS Sum,
        AVG(serie00) AS Average,
        MAX(serie00) AS Maximum
    FROM 
        temp_table
    GROUP BY 
        Year, Quarter
    ORDER BY
        YEAR, QUARTER
"""

aggregated_df_sql = spark.sql(query)

# Show the result
aggregated_df_sql.show(10)
+----+-------+------------------+------------------+------------------+
|Year|Quarter|               Sum|           Average|           Maximum|
+----+-------+------------------+------------------+------------------+
|2000|      1|31.323050432219453|10.441016810739818|11.118293927071951|
|2000|      2|28.911192473027377| 9.637064157675793| 9.900977410685329|
|2000|      3|33.670797229797415|11.223599076599138|12.233378837422391|
|2000|      4|28.094793356286914| 9.364931118762305| 10.32000478359274|
|2001|      1|31.636678535169537|10.545559511723178|11.367822302191831|
|2001|      2|29.629770128521507| 9.876590042840503|11.135215930381191|
|2001|      3| 30.75408440118315| 10.25136146706105|10.723803326978505|
|2001|      4|30.361048932627902|  10.1203496442093|10.368365984482093|
|2002|      1|31.184163218551227|10.394721072850409|10.550579652234951|
|2002|      2|29.128978392451202|   9.7096594641504|10.186365745367246|
+----+-------+------------------+------------------+------------------+
only showing top 10 rows