# Importer biblioteker
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, explode, expr, sequence, sum, avg
from pyspark.sql.types import DateType, DoubleType, StructField, StructType
# Initialierer en SparkSession
= (
spark "local[2]")
SparkSession.builder.master("Dapla-manual-example")
.appName(
.getOrCreate() )
Introduksjon til PySpark
Apache Spark er et sterkt verktøy som utvider mulighetsrommet for databehandling med R og Python. Kjernefunksjonaliteten ligger i at den lar deg kjøre en jobb på flere maskiner samtidig, noe som ikke er mulig med klassiske rammeverk som Pandas og Tidyverse. Følgelig er det et rammeverk som blant annet er veldig egnet for å prosessere store datamengder eller gjøre store beregninger. Les om mer om Apache Spark i Dapla-manualen
I denne notebooken vises noen enkle eksempler på hvordan du kan jobbe med data med PySpark, et Python-grensesnitt mot Spark.
Oppsett
Når du logger deg inn på Dapla kan du velge mellom 2 ferdigoppsatte kernels for å jobbe med PySpark:
- Pyspark (local)
- Pyspark (k8s cluster)
Den første lar deg bruke Spark på en enkeltmaskin, mens den andre lar deg distribuere kjøringen på mange maskiner avhengig av hvor store jobbene er. I eksemplene under brukes Pyspark (local).
I koden over importerer vi de bibliotekene vi skal bruke under. Grunnen til at vi importerer pyspark.sql
er at dette er at Spark SQL er Apache Spark sin modul for å jobbe med strukturerte data. Og som navnet tilsier vil det si at vi kan blande Python og SQL i koden vår. Vi skal nærmere på hvordan man bruke SQL fra PySpark-notebook senere.
Spark tilbyr også et eget grensesnitt, Spark UI, for å monitorere hva som skjer under en SparkSession. Vi kan bruke følgende kommando for å få opp en lenke til Spark UI i notebooken vår:
spark.sparkContext
Klikker du på Spark UI-lenken så tar den deg til dashboard som lar deg monitorere, debugge, optimalisere og forstå kjøringene dine. Det kan være et svært nyttig verktøy i mange tilfeller.
Generere data
Vi kan begynne med å generere en Spark DataFrame med en kolonne som inneholder månedlige datoer for perioden 2000M1-2023M8.
# Genererer månedlige data
= spark.range(1).select(
dates_df
explode(
sequence(=expr("date '2000-01-01'"),
start=expr("date '2023-08-01'"),
stop=expr("interval 1 month"),
step
)"Date")
).alias(
)5) dates_df.show(
+----------+
| Date|
+----------+
|2000-01-01|
|2000-02-01|
|2000-03-01|
|2000-04-01|
|2000-05-01|
+----------+
only showing top 5 rows
En Spark DataFrame er en distribuert samling av data som er organisert inn i kolonner. Siden Spark lar deg distribuere kjøringer på flere maskiner, er DataFrames optimalisert for å kunne splittes opp slik at de kan brukes på flere maskiner. Med andre er dette ikke det samme som en Pandas dataframe mange kjenner fra før.
Over genererte vi en datokolonne. For å få litt mer data kan vi også generere 100 kolonner med tidsseriedata og så printer vi de 2 første av disse:
# Genererer random walk data
= StructType(
schema f"serie{i:02d}", DoubleType(), True) for i in range(100)]
[StructField(
)
= [
data tuple((10 + np.random.normal(0, 1, 100)).cumsum().tolist())
for _ in range(284) # 284 months from 2000-01 to 2023-08
]
= spark.createDataFrame(data, schema=schema)
data_df
"serie00", "serie01").show(5) data_df.select(
+------------------+------------------+
| serie00| serie01|
+------------------+------------------+
|10.410703377184355| 21.06318801110079|
|10.509249410154466| 19.5674295298024|
| 9.618310122060274|17.635805093465642|
| 9.691112692298294|18.593842915949082|
| 9.903675228685067|20.012215769058564|
+------------------+------------------+
only showing top 5 rows
Til slutt kan vi joine de to datasettene sammen og lage noen kolonner som viser år, kvartal og måned. Deretter printer vi ut noen av kolonnene med kommandoen show()
.
Code
# Legger til row index til DataFrame før join med dates_df
= data_df.withColumn("row_index", expr("monotonically_increasing_id()"))
data_df
# Joiner de to datasettene
= (
df "row_index", expr("monotonically_increasing_id()"))
dates_df.withColumn("row_index")
.join(data_df, "row_index")
.drop(
)
# Legger til år, kvartal og mnd
= df.withColumn("Year", date_format(df.Date, "yyyy"))
df = df.withColumn("Quarter", expr("quarter(Date)"))
df = df.withColumn("Month", date_format(df.Date, "MM"))
df
"Date", "Year", "Quarter", "Month", "serie00", "serie01").show(5) df.select(
+----------+----+-------+-----+------------------+------------------+
| Date|Year|Quarter|Month| serie00| serie01|
+----------+----+-------+-----+------------------+------------------+
|2000-01-01|2000| 1| 01| 9.495232388801012| 19.016168503192|
|2000-02-01|2000| 1| 02| 10.70952411634649|21.404467063442723|
|2000-03-01|2000| 1| 03|11.118293927071951| 21.25035527677261|
|2000-04-01|2000| 2| 04| 9.346911680164684|19.982136698759238|
|2000-05-01|2000| 2| 05| 9.663303382177363|19.925236690504494|
+----------+----+-------+-----+------------------+------------------+
only showing top 5 rows
Og med det har vi noe data vi kan jobbe med i resten av notebooken.
Skrive til Parquet
PySpark tilbyr mange opsjoner ved skriving til parquet-filer som vi kanskje ikke er vant til å forholde oss til med enklere rammeverk som Pandas. Den enkleste måten å skrive ut en fil er som følger:
df.write.parquet("gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)
Dette vil fungere hvis filen ikke finnes fra før. Hvis den finnes fra før så vil den feile. Grunnen er at vi ikke har spesifisert hva vi ønsker at den skal gjøre. Vi kan velge mellom overwrite
, append
, ignore
eller errorifexists
. Sistnevnte er også default-oppførsel hvis du ikke ber den gjøre noe annet.
Under bruker vi opsjonen overwrite
, det vil si at den skriver over en evt eksisterende fil med samme navn.
"overwrite").parquet(
df.write.mode("gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)
Vi kan inspisere hva som ble skrevet ved å liste ut innholder i bøtta.
from dapla import FileClient
= FileClient.get_gcs_file_system()
fs
"gs://ssb-dapla-felles-data-produkt-prod/temp/**") fs.glob(
['ssb-dapla-felles-data-delt-prod/temp/',
'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet',
'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/',
'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/_SUCCESS',
'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/part-00000-b32e7299-0590-4b31-bcc2-dc3d58725529-c000.snappy.parquet',
'ssb-dapla-felles-data-delt-prod/temp/timeseries.parquet/part-00001-b32e7299-0590-4b31-bcc2-dc3d58725529-c000.snappy.parquet']
Hvis denne parquet-filen hadde vært partisjonert etter en kolonne, så ville det vært egne undermapper med navnestruktur column_name=value
som indikerte hva filen er partisjonert på. Siden vi her bruker en maskin og har et lite datasett, valgte Spark å ikke partisjonere.
Lese inn Parquet
Apache Spark kan lese inn flere parquet-filer samtidig. Syntaxen er like enkel som den for å skrive ut.
= spark.read.parquet(
df_ts "gs://ssb-dapla-felles-data-produkt-prod/temp/timeseries.parquet"
)"Date", "Year", "Quarter", "Month", "serie66", "serie55").show(5) df_ts.select(
+----------+----+-------+-----+-----------------+-----------------+
| Date|Year|Quarter|Month| serie66| serie55|
+----------+----+-------+-----+-----------------+-----------------+
|2000-01-01|2000| 1| 01|670.2679830025959|562.4312808525777|
|2000-02-01|2000| 1| 02|675.4233411662802|562.5168447360121|
|2000-03-01|2000| 1| 03|687.3412458214908|568.6203957584232|
|2000-04-01|2000| 2| 04|673.1128047244955|557.4633871253379|
|2000-05-01|2000| 2| 05| 667.513406101114|561.7766450346327|
+----------+----+-------+-----+-----------------+-----------------+
only showing top 5 rows
PySpark og SQL
Du kan også skrive SQL med Spark. For å skrive SQL må vi først lage et temporary view
. Under kaller vi viewt for tidsserie.
"tidsserie") df.createOrReplaceTempView(
Vi kan deretter skrive en SQL-statement som vi ønsker å kjøre på viewet:
= "SELECT * FROM tidsserie WHERE Year = 2010" query
Deretter kan vi bruke det til å filtrere datasettet:
= spark.sql(query)
result_df "Date", "Year", "Quarter", "Month", "serie00", "serie01").show(5) result_df.select(
+----------+----+-------+-----+------------------+------------------+
| Date|Year|Quarter|Month| serie00| serie01|
+----------+----+-------+-----+------------------+------------------+
|2010-01-01|2010| 1| 01| 11.26910423907778|21.730128215168268|
|2010-02-01|2010| 1| 02| 8.722783282690738| 17.46851086792347|
|2010-03-01|2010| 1| 03|10.376873608348605|20.109386343182802|
|2010-04-01|2010| 2| 04|11.459959305590926|21.995141825523866|
|2010-05-01|2010| 2| 05|10.441456792180572| 21.25096473981906|
+----------+----+-------+-----+------------------+------------------+
only showing top 5 rows
Aggregering
vi kan aggregere opp enten med PySpark-syntax eller SQL-syntax. Under viser vi begge:
from pyspark.sql import functions as F
# Assuming df_ts is your DataFrame
= df_ts.groupBy("Quarter").agg(F.sum("serie00").alias("Sum"),
aggregated_df "serie00").alias("Average"),
F.avg(max("serie00").alias("Maximum"))
F.
# Show the result
aggregated_df.show()
+-------+------------------+------------------+------------------+
|Quarter| Sum| Average| Maximum|
+-------+------------------+------------------+------------------+
| 1|363.95869885234185|10.109963857009497|11.829453550532005|
| 3|365.68324879453405| 10.15786802207039|12.233378837422391|
| 4| 342.2334082209804|10.065688477087658|12.210138970053695|
| 2| 361.991445506568|10.055317930738001|12.276030776082463|
+-------+------------------+------------------+------------------+
La oss gjøre det samme med SQL, men grupperer etter to variabler og sorterer output etterpå.
# Assuming df_ts is your DataFrame
"temp_table")
df_ts.createOrReplaceTempView(
# Now you can run a SQL query
= """
query SELECT
Year,
Quarter,
SUM(serie00) AS Sum,
AVG(serie00) AS Average,
MAX(serie00) AS Maximum
FROM
temp_table
GROUP BY
Year, Quarter
ORDER BY
YEAR, QUARTER
"""
= spark.sql(query)
aggregated_df_sql
# Show the result
10) aggregated_df_sql.show(
+----+-------+------------------+------------------+------------------+
|Year|Quarter| Sum| Average| Maximum|
+----+-------+------------------+------------------+------------------+
|2000| 1|31.323050432219453|10.441016810739818|11.118293927071951|
|2000| 2|28.911192473027377| 9.637064157675793| 9.900977410685329|
|2000| 3|33.670797229797415|11.223599076599138|12.233378837422391|
|2000| 4|28.094793356286914| 9.364931118762305| 10.32000478359274|
|2001| 1|31.636678535169537|10.545559511723178|11.367822302191831|
|2001| 2|29.629770128521507| 9.876590042840503|11.135215930381191|
|2001| 3| 30.75408440118315| 10.25136146706105|10.723803326978505|
|2001| 4|30.361048932627902| 10.1203496442093|10.368365984482093|
|2002| 1|31.184163218551227|10.394721072850409|10.550579652234951|
|2002| 2|29.128978392451202| 9.7096594641504|10.186365745367246|
+----+-------+------------------+------------------+------------------+
only showing top 10 rows