# 8.4 ETL-Prozesse ## Was ist ETL? **ETL** = Extract, Transform, Load ``` ETL - Prozess ┌─────────┐ ┌─────────────┐ ┌─────────┐ │ Extract │ → │ Transform │ → │ Load │ │(Extrahieren)│ │(Transformieren)│ │(Laden) │ └─────────┘ └─────────────┘ └─────────┘ ``` ### Anwendungsfälle | Fall | Beschreibung | |------|-------------| | Data Warehouse | Daten für Analysen konsolidieren | | Migration | Daten auf neues System übertragen | | Integration | Daten aus verschiedenen Quellen zusammenführen | | Reporting | Daten für Reports aufbereiten | --- ## Extract (Extraktion) ### Extraktionsarten | Typ | Beschreibung | |-----|-------------| | Full Load | Alle Daten laden | | Incremental | Nur neue/geänderte Daten | | CDC | Change Data Capture | ### Quelldaten ``` Extraktionsquellen ├── Datenbanken ├── Dateien (CSV, JSON, XML) ├── APIs └── Streams ``` ### Beispiel ```python # Datenbank df_quelle = pd.read_sql("SELECT * FROM tabelle", conn) # API response = requests.get('https://api.example.com/daten') df_quelle = pd.DataFrame(response.json()) # Datei df_quelle = pd.read_csv('daten.csv') ``` --- ## Transform (Transformation) ### Transformationstypen ``` Transformationen - Übersicht ├── Datentyp-Konvertierung ├── Berechnungen ├── Aggregationen ├── Filterung ├── JOINs ├── Normalisierung └── Bereinigung ``` ### Bereinigung ```python # Fehlende Werte df['alter'].fillna(df['alter'].mean(), inplace=True) # Duplikate entfernen df.drop_duplicates(inplace=True) # Groß-/Kleinschreibung df['name'] = df['name'].str.lower() # Trimmen df['name'] = df['name'].str.strip() ``` ### Berechnungen ```python # Neue Spalte df['umsatz_mit_mwst'] = df['umsatz'] * 1.19 # Bedingte Werte df['rabatt'] = df.apply( lambda x: x['umsatz'] * 0.1 if x['umsatz'] > 100 else 0, axis=1 ) ``` ### Aggregation ```python # Gruppieren und aggregieren df_grouped = df.groupby('kategorie').agg({ 'umsatz': 'sum', 'anzahl': 'count' }).reset_index() ``` --- ## Load (Laden) ### Ladestrategien | Typ | Beschreibung | |-----|-------------| | Full Load | Tabelle komplett ersetzen | | Incremental Load | Nur neue Daten hinzufügen | | Upsert | Update oder Insert | ### Ziel-Systeme ``` Lade-Ziele ├── Datenbank (MySQL, PostgreSQL) ├── Data Warehouse (Snowflake, BigQuery) ├── Datei (CSV, JSON) └── Cloud Storage ``` ### Beispiel ```python # In Datenbank laden df.to_sql('ziel_tabelle', conn, if_exists='replace', index=False) # In CSV schreiben df.to_csv('ausgabe.csv', index=False) # In Excel schreiben df.to_excel('ausgabe.xlsx', index=False) ``` --- ## Orchestrierung ### Tools | Tool | Beschreibung | |------|-------------| | Apache Airflow | Workflow-Orchestrierung | | Luigi | Python-basiert | | dbt | Data Transformation | | Talend | ETL-Werkzeug | | Apache NiFi | Datenfluss | ### Airflow DAG ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG('etl_pipeline', start_date=datetime(2024, 1, 1)) extract = PythonOperator( task_id='extract', python_callable=extract_daten, dag=dag ) transform = PythonOperator( task_id='transform', python_callable=transform_daten, dag=dag ) load = PythonOperator( task_id='load', python_callable=load_daten, dag=dag ) extract >> transform >> load ``` --- ## Fehlerbehandlung ``` ETL - Fehlerstrategien ├── Logging ├── Benachrichtigung ├── Retry-Logik ├── Quarantäne (Problem-Daten) └── Monitoring ``` --- ## Praktisches Beispiel: Vollständiger ETL-Pipeline ```python import pandas as pd import requests from sqlalchemy import create_engine import logging # Logging konfigurieren logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def etl_pipeline(): """ Vollständiger ETL-Pipeline für Verkaufsdaten """ # === EXTRACT === logger.info("Starte Extraktion...") # Aus CSV kunden_df = pd.read_csv('daten/kunden.csv') bestellungen_df = pd.read_csv('daten/bestellungen.csv') # Aus API try: response = requests.get('https://api.shop.de/produkte', timeout=30) produkte_df = pd.DataFrame(response.json()) except Exception as e: logger.error(f"API Fehler: {e}") produkte_df = pd.DataFrame() logger.info(f"Extrahiert: {len(kunden_df)} Kunden, {len(bestellungen_df)} Bestellungen") # === TRANSFORM === logger.info("Starte Transformation...") # Daten bereinigen kunden_df = kunden_df.drop_duplicates() kunden_df['email'] = kunden_df['email'].str.lower().str.strip() kunden_df['erstellt_am'] = pd.to_datetime(kunden_df['erstellt_am']) # Berechnungen bestellungen_df['umsatz_mit_mwst'] = bestellungen_df['umsatz_netto'] * 1.19 # JOIN: Bestellungen mit Kunden verbinden merged_df = bestellungen_df.merge( kunden_df[['kunden_id', 'name', 'stadt']], on='kunden_id', how='left' ) # Aggregation: Umsatz pro Stadt umsatz_pro_stadt = merged_df.groupby('stadt')['umsatz_netto'].sum().reset_index() logger.info(f"Transformation abgeschlossen: {len(merged_df)} Datensätze") # === LOAD === logger.info("Starte Laden...") # Datenbank-Verbindung engine = create_engine('postgresql://user:pass@localhost:5432/warehouse') # In Datenbank laden merged_df.to_sql('fact_bestellungen', engine, if_exists='replace', index=False) umsatz_pro_stadt.to_sql('dim_umsatz_stadt', engine, if_exists='replace', index=False) logger.info("ETL Pipeline erfolgreich abgeschlossen!") if __name__ == '__main__': etl_pipeline() ``` --- ## Querverweise - [[LF8-03-Datenformate|Zurück: Datenformate]] - [[LF9-Netzwerke-Dienste|Nächstes Lernfeld: Netzwerke und Dienste]] --- *Stand: 2024*