Validere at filer er prosessert i Kildomaten

Eksempelkode som kan benyttes etter driftstans

kildomaten
Forfatter
Tilhører

Øyvind Bruer-Skarsbø

Seksjon for dataplattform (724)

Opprettet

May 14, 2025

Sist endret

May 19, 2025

Ifm. driftstansen, og driftsproblemene i etterkant, i Kildomaten fra 12. mai kl. 15.00 til 14. mai kl. 13.00 så kan det være team som ønsker å sjekke om det kom inn filer i kildebøtta som ikke trigget Kildomaten. I denne artikkelen så finner man eksempelkode for å gjøre denne sjekken.

Støtte fra Kundeservice

Hvis teamet ønsker hjelp til å validere om Kildomaten har prosessert alle filer, så kan man ta kontakt med Kundeservice.

Sjekke kildebøtta

Det første man bør gjøre er å sjekke om det, i tidsintevallet for driftsstansen, har blitt skrevet filer til mapper i kildebøtta som vanligvis trigger Kildomaten-jobber. Har det ikke skjedd blitt skrevet filer, så trenger man ikke å gjøre noe mer.

Sjekken kan gjøres av en data-admins på teamet på følgende måte:

  1. Åpne Jupyter på Dapla Lab
  2. Under tjenestekonfigurasjonen for Jupyter velger du å representere data-admins for det aktuelle teamet
  3. Åpne en notebook og velg en kernel som har Python-pakken gcsfs installert. Hvis du ikke har en slik kernel kan du opprette et ssb-project fra terminalen med kommandoen ssb-project create test, gå inn i mappen og installere pakken ved å skrive poetry add gcsfs. Da vil kernelen du skal velge hete test.
  4. Kjør denne koden etter at du har endret verdiene til variablene source_bucket og source_folder:
Notebook
from datetime import datetime
from gcsfs import GCSFileSystem
import pytz

utc = pytz.UTC

source_bucket = "gs://ssb-dapla-felles-kilde-prod"
source_folder = "freg"

filter_start_date = datetime(year=2025, month= 5, day=12, hour=15)
filter_end_date = datetime(year=2025, month=5, day=14, hour=13)



def is_file_modified_on_date(file_path: str) -> bool:
    """
    Returns True if file at file path was modified on the date "filter_date", returns False otherwise
    """
    file_info = GCSFileSystem().info(path=file_path)
    modified_date: datetime | None = file_info.get("mtime")
    if not isinstance(modified_date, datetime):
        return False

    return modified_date < utc.localize(
        filter_end_date
    ) and modified_date > utc.localize(filter_start_date)


source_files = GCSFileSystem().find(
    path=f"{source_bucket}/{source_folder}", maxdepth=None, withdirs=False
)
source_files_at_date = [f for f in source_files if is_file_modified_on_date(f) is True]

print(f"Source files found between date {str(filter_start_date)} and {str(filter_end_date)}:")
print(source_files_at_date)

Hvis man kjører koden over og den returnerer en tom liste så har man verifisert at det ikke ble skrevet noen filer til mappen under driftsstansen. Hvis koden returnerer en liste med filnavn så bør man undersøke om filene ble prosessert av Kildomaten eller ikke.

Verifisere produktbøtta

Når man har en liste med filnavn som ble skrevet til kildebøtta i tidsperioden med driftsstans, så må man verifisere om filene ble skrevet til produktbøtta som forventet. Hvordan man gjør dette vil avhenge av logikken i teamets kildomaten-skript. Hvis Kildomaten-skriptet skriver en fil til produktbøtta med samme filnavn som er brukt i kildebøtta, så kan man verfisere at samme fil også finne i kildebøtta. Hvis Kildomaten-skriptet endrer navnet, så må man først gjenskape det nye navnet, og deretter verifisere.

For å verifisere om filen er prosessert må man gjøre følgende:

  1. hente ut liste med filer fra forrige steg (krever innloggin som data-admins i Dapla Lab)
  2. åpne en ny tjeneste i Dapla Lab som developers og kopiere over listen fra tjenesten man åpnet som data-admins.
  3. åpne en notebook og velg en kernel som har Python-pakken gcsfs installert. Hvis du ikke har en slik kernel kan du opprette et ssb-project fra terminalen med kommandoen ssb-project create test, gå inn i mappen og installere pakken ved å skrive poetry add gcsfs. Da vil kernelen du skal velge hete test.
  4. Kjør koden under, der source_files_at_date er listen du kopierte over fra Sjekke kildebøtta-delen. I tillegg må du oppgi hvilken bøtte som skal sjekkes under bucket, og hvilken undermappe i bøtta som det skal søkes gjennom i folder.
from gcsfs import GCSFileSystem

# Denne listen inneholder filnavnene vi identifiserte tidligere
source_files_at_date = [
  'ssb-etlev-data-kilde-prod/kildetilgang/access-grant_abc.json',  
  'ssb-etlev-data-kilde-prod/kildetilgang/access-grant_def.json',]

# Disse variablene må brukeren oppgi selv  
bucket = "gs://my-bucket-name"
folder = "my/folder/name"


def find_missing_files(bucket_name: str, folder_name: str, source_files: list[str]) -> list[str]:
    file_names = [f.split(sep="/")[-1].split(sep=".")[0] for f in source_files]

    fs_prefix = "" if "gs://" in bucket_name else "gs://"
    fs_path = f"{fs_prefix}{bucket_name}/{folder_name}"
    fs = GCSFileSystem()

    missing_files = []

    for i, f in enumerate(file_names):
        found = fs.glob(f"{fs_path}/**/{f}.*")
        if len(found) == 0:
            source_file = f"gs://{source_files[i]}" if "gs://" not in source_files[i] else source_files[i]
            missing_files.append(source_file)
    
    return missing_files

find_missing_files(
  bucket_name=bucket,
  folder_name=folder,
  source_files=source_files_at_date)

Funksjonen over returnerer alle filer den ikke finner på den oppgitte stien, og som derfor ikke er prosessert av Kildomaten. I kapitlet som heter Trigge prosessering manuelt finner du en beskrivelse av hvordan man trigger prosesseringen av disse filene manuelt.

Fra Altinn3 til ISEE

Statistikkproduksjoner som har datafangst fra Altinn3 og synkroniserer filer prosessert med Kildomaten ned til ISEE, må selv hente ut en liste over hvilke filer som er lastet til ISEE. En fremgangsmåte er sammenligne hvilke filer som innkvittert i SFU med hva som ligger i ISEE-databasen. Disse kan identifiseres ved deres unike referansenummer i SFU. Hvis man har en liste med referansenummer som finnes i SFU, men ikke i ISEE, så kan søke etter disse filene i kildebøtta.

Ta kontakt med Kundeservice dersom man ønsker hjelp til dette.

Trigge prosessering manuelt

Hvis koden i forrige avsnitt returnerte en liste med filer, så kan man manuelt trigge prosessering av disse filene med Kildomaten.

Logg deg inn som data-admins for trigge Kildomaten manuelt

For å trigge Kildomaten manuelt må man være logget inn som data-admins i en Dapla Lab tjeneste. Det betyr at man kan liste ut alle filer som ble skrevet i et gitt tidspunkt, og trigge prosesseringen av disse i samme notebook eller script. Men man må oppgi project_id for standardprosjektet (ikke kildeprosjektet) i koden under, noe som kan være forvirrende for brukeren. Dette er planlagt endret slik at det blir lettere bruke fremover.

Koden i forrige avsnitt returnerte en liste med alle filer som heter source_files_at_date, og vi kan loope over disse og trigge prosesseringen. For å gjøre dette må du først fylle inn riktig verdier for project-id og source_name.

  1. project-id for standardprosjektet. Slik finner du prosjekt-id. Merk at dette ikke er kilde-prosjektet!
  2. source_name er navnet på mappen i iac-repoet hvor kilden du skal trigge er definert. Navnet på kilden i eksempelet med team dapla-example var altinn.
Notebook
from dapla import trigger_source_data_processing

# Brukeren fyller inn her
project_id = ""
source_name = ""

# Fjerner bøttenavn fra filstier
stripped_paths = [path.split('/', 1)[1] if '/' in path else path for path in source_files_at_date]

# Trigger prosessering for alle filer i listen
for file in stripped_paths:
    trigger_source_data_processing(project_id, source_name, file, kuben=True)