Files
Jan-Marlon Leibl 7df533c7a2 Add practical examples to multiple files
- LF9-03 Virtualisierung: Docker Compose + Volume examples
- LF6-02 Frontend: To-Do list practical example
- LF8-04 ETL: Complete ETL pipeline example
- LF6-04 Sicherheit: Express.js security headers
- LF2-04 Nutzwertanalyse: Cloud provider selection example
- LF9-04 Monitoring: Prometheus alerts + Python logging
2026-03-13 12:01:15 +01:00

6.0 KiB

8.4 ETL-Prozesse

Was ist ETL?

ETL = Extract, Transform, Load

ETL - Prozess
┌─────────┐    ┌─────────────┐    ┌─────────┐
│ Extract │ →  │  Transform  │ →  │   Load   │
│(Extrahieren)│  │(Transformieren)│ │(Laden)  │
└─────────┘    └─────────────┘    └─────────┘

Anwendungsfälle

Fall Beschreibung
Data Warehouse Daten für Analysen konsolidieren
Migration Daten auf neues System übertragen
Integration Daten aus verschiedenen Quellen zusammenführen
Reporting Daten für Reports aufbereiten

Extract (Extraktion)

Extraktionsarten

Typ Beschreibung
Full Load Alle Daten laden
Incremental Nur neue/geänderte Daten
CDC Change Data Capture

Quelldaten

Extraktionsquellen
├── Datenbanken
├── Dateien (CSV, JSON, XML)
├── APIs
└── Streams

Beispiel

# Datenbank
df_quelle = pd.read_sql("SELECT * FROM tabelle", conn)

# API
response = requests.get('https://api.example.com/daten')
df_quelle = pd.DataFrame(response.json())

# Datei
df_quelle = pd.read_csv('daten.csv')

Transform (Transformation)

Transformationstypen

Transformationen - Übersicht
├── Datentyp-Konvertierung
├── Berechnungen
├── Aggregationen
├── Filterung
├── JOINs
├── Normalisierung
└── Bereinigung

Bereinigung

# Fehlende Werte
df['alter'].fillna(df['alter'].mean(), inplace=True)

# Duplikate entfernen
df.drop_duplicates(inplace=True)

# Groß-/Kleinschreibung
df['name'] = df['name'].str.lower()

# Trimmen
df['name'] = df['name'].str.strip()

Berechnungen

# Neue Spalte
df['umsatz_mit_mwst'] = df['umsatz'] * 1.19

# Bedingte Werte
df['rabatt'] = df.apply(
    lambda x: x['umsatz'] * 0.1 if x['umsatz'] > 100 else 0,
    axis=1
)

Aggregation

# Gruppieren und aggregieren
df_grouped = df.groupby('kategorie').agg({
    'umsatz': 'sum',
    'anzahl': 'count'
}).reset_index()

Load (Laden)

Ladestrategien

Typ Beschreibung
Full Load Tabelle komplett ersetzen
Incremental Load Nur neue Daten hinzufügen
Upsert Update oder Insert

Ziel-Systeme

Lade-Ziele
├── Datenbank (MySQL, PostgreSQL)
├── Data Warehouse (Snowflake, BigQuery)
├── Datei (CSV, JSON)
└── Cloud Storage

Beispiel

# In Datenbank laden
df.to_sql('ziel_tabelle', conn, if_exists='replace', index=False)

# In CSV schreiben
df.to_csv('ausgabe.csv', index=False)

# In Excel schreiben
df.to_excel('ausgabe.xlsx', index=False)

Orchestrierung

Tools

Tool Beschreibung
Apache Airflow Workflow-Orchestrierung
Luigi Python-basiert
dbt Data Transformation
Talend ETL-Werkzeug
Apache NiFi Datenfluss

Airflow DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG('etl_pipeline', start_date=datetime(2024, 1, 1))

extract = PythonOperator(
    task_id='extract',
    python_callable=extract_daten,
    dag=dag
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_daten,
    dag=dag
)

load = PythonOperator(
    task_id='load',
    python_callable=load_daten,
    dag=dag
)

extract >> transform >> load

Fehlerbehandlung

ETL - Fehlerstrategien
├── Logging
├── Benachrichtigung
├── Retry-Logik
├── Quarantäne (Problem-Daten)
└── Monitoring

Praktisches Beispiel: Vollständiger ETL-Pipeline

import pandas as pd
import requests
from sqlalchemy import create_engine
import logging

# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def etl_pipeline():
    """
    Vollständiger ETL-Pipeline für Verkaufsdaten
    """

    # === EXTRACT ===
    logger.info("Starte Extraktion...")

    # Aus CSV
    kunden_df = pd.read_csv('daten/kunden.csv')
    bestellungen_df = pd.read_csv('daten/bestellungen.csv')

    # Aus API
    try:
        response = requests.get('https://api.shop.de/produkte', timeout=30)
        produkte_df = pd.DataFrame(response.json())
    except Exception as e:
        logger.error(f"API Fehler: {e}")
        produkte_df = pd.DataFrame()

    logger.info(f"Extrahiert: {len(kunden_df)} Kunden, {len(bestellungen_df)} Bestellungen")

    # === TRANSFORM ===
    logger.info("Starte Transformation...")

    # Daten bereinigen
    kunden_df = kunden_df.drop_duplicates()
    kunden_df['email'] = kunden_df['email'].str.lower().str.strip()
    kunden_df['erstellt_am'] = pd.to_datetime(kunden_df['erstellt_am'])

    # Berechnungen
    bestellungen_df['umsatz_mit_mwst'] = bestellungen_df['umsatz_netto'] * 1.19

    # JOIN: Bestellungen mit Kunden verbinden
    merged_df = bestellungen_df.merge(
        kunden_df[['kunden_id', 'name', 'stadt']],
        on='kunden_id',
        how='left'
    )

    # Aggregation: Umsatz pro Stadt
    umsatz_pro_stadt = merged_df.groupby('stadt')['umsatz_netto'].sum().reset_index()

    logger.info(f"Transformation abgeschlossen: {len(merged_df)} Datensätze")

    # === LOAD ===
    logger.info("Starte Laden...")

    # Datenbank-Verbindung
    engine = create_engine('postgresql://user:pass@localhost:5432/warehouse')

    # In Datenbank laden
    merged_df.to_sql('fact_bestellungen', engine, if_exists='replace', index=False)
    umsatz_pro_stadt.to_sql('dim_umsatz_stadt', engine, if_exists='replace', index=False)

    logger.info("ETL Pipeline erfolgreich abgeschlossen!")

if __name__ == '__main__':
    etl_pipeline()

Querverweise


Stand: 2024