# 8.4 ETL-Prozesse ## Was ist ETL? **ETL** = Extract, Transform, Load ``` ETL - Prozess ┌─────────┐ ┌─────────────┐ ┌─────────┐ │ Extract │ → │ Transform │ → │ Load │ │(Extrahieren)│ │(Transformieren)│ │(Laden) │ └─────────┘ └─────────────┘ └─────────┘ ``` ### Anwendungsfälle | Fall | Beschreibung | |------|-------------| | Data Warehouse | Daten für Analysen konsolidieren | | Migration | Daten auf neues System übertragen | | Integration | Daten aus verschiedenen Quellen zusammenführen | | Reporting | Daten für Reports aufbereiten | --- ## Extract (Extraktion) ### Extraktionsarten | Typ | Beschreibung | |-----|-------------| | Full Load | Alle Daten laden | | Incremental | Nur neue/geänderte Daten | | CDC | Change Data Capture | ### Quelldaten ``` Extraktionsquellen ├── Datenbanken ├── Dateien (CSV, JSON, XML) ├── APIs └── Streams ``` ### Beispiel ```python # Datenbank df_quelle = pd.read_sql("SELECT * FROM tabelle", conn) # API response = requests.get('https://api.example.com/daten') df_quelle = pd.DataFrame(response.json()) # Datei df_quelle = pd.read_csv('daten.csv') ``` --- ## Transform (Transformation) ### Transformationstypen ``` Transformationen - Übersicht ├── Datentyp-Konvertierung ├── Berechnungen ├── Aggregationen ├── Filterung ├── JOINs ├── Normalisierung └── Bereinigung ``` ### Bereinigung ```python # Fehlende Werte df['alter'].fillna(df['alter'].mean(), inplace=True) # Duplikate entfernen df.drop_duplicates(inplace=True) # Groß-/Kleinschreibung df['name'] = df['name'].str.lower() # Trimmen df['name'] = df['name'].str.strip() ``` ### Berechnungen ```python # Neue Spalte df['umsatz_mit_mwst'] = df['umsatz'] * 1.19 # Bedingte Werte df['rabatt'] = df.apply( lambda x: x['umsatz'] * 0.1 if x['umsatz'] > 100 else 0, axis=1 ) ``` ### Aggregation ```python # Gruppieren und aggregieren df_grouped = df.groupby('kategorie').agg({ 'umsatz': 'sum', 'anzahl': 'count' }).reset_index() ``` --- ## Load (Laden) ### Ladestrategien | Typ | Beschreibung | |-----|-------------| | Full Load | Tabelle komplett ersetzen | | Incremental Load | Nur neue Daten hinzufügen | | Upsert | Update oder Insert | ### Ziel-Systeme ``` Lade-Ziele ├── Datenbank (MySQL, PostgreSQL) ├── Data Warehouse (Snowflake, BigQuery) ├── Datei (CSV, JSON) └── Cloud Storage ``` ### Beispiel ```python # In Datenbank laden df.to_sql('ziel_tabelle', conn, if_exists='replace', index=False) # In CSV schreiben df.to_csv('ausgabe.csv', index=False) # In Excel schreiben df.to_excel('ausgabe.xlsx', index=False) ``` --- ## Orchestrierung ### Tools | Tool | Beschreibung | |------|-------------| | Apache Airflow | Workflow-Orchestrierung | | Luigi | Python-basiert | | dbt | Data Transformation | | Talend | ETL-Werkzeug | | Apache NiFi | Datenfluss | ### Airflow DAG ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime dag = DAG('etl_pipeline', start_date=datetime(2024, 1, 1)) extract = PythonOperator( task_id='extract', python_callable=extract_daten, dag=dag ) transform = PythonOperator( task_id='transform', python_callable=transform_daten, dag=dag ) load = PythonOperator( task_id='load', python_callable=load_daten, dag=dag ) extract >> transform >> load ``` --- ## Fehlerbehandlung ``` ETL - Fehlerstrategien ├── Logging ├── Benachrichtigung ├── Retry-Logik ├── Quarantäne (Problem-Daten) └── Monitoring ``` --- ## Querverweise - [[LF8-03-Datenformate|Zurück: Datenformate]] - [[LF9-Netzwerke-Dienste|Nächstes Lernfeld: Netzwerke und Dienste]] --- *Stand: 2024*