6. Juni 2024 von Christian Del Monte
Change Data Capture für Data Lakehouse
Change Data Capture (CDC) ist eine Technik, die alle Datenänderungen in einem Datenarchiv erfasst, sammelt und für die Übertragung und Replikation in andere Systeme aufbereitet, entweder als Batch-Prozess oder als Stream. Dieser Blog-Beitrag konzentriert sich auf die Anwendung von CDC in Data Lakehouses am Beispiel von Change Data Feed, einer Variante von CDC, die von Databricks im Rahmen von Delta-Lake-basierten Data Lakehouses entwickelt wurde.
Einführung in Data Lakehouse und Open Table Format Technologien
Ein Data Lakehouse kombiniert die Vorteile von Data Lakes und Data Warehouses, indem es sowohl die skalierbare Speicherung von Rohdaten als auch die leistungsfähige Analyse und Verwaltung von strukturierten und semistrukturierten Daten ermöglicht. Ein zentrales Element ist dabei das Open Table Format (OTF), das strukturierte Daten effizient speichert und es den Benutzern ermöglicht, wie in einer traditionellen Datenbank zu arbeiten.
Gängige OTFs wie Apache Iceberg, Apache Hudi und Delta Lake sind Open Source und mit Technologien wie Apache Spark und Presto kompatibel. Diese Formate bieten effiziente Lese-, Schreib- und Verwaltungsoperationen, die ACID-Transaktionen unterstützen. Sie ermöglichen eine effiziente Datenpartitionierung zur Verbesserung der Abfrageleistung, den Zugriff auf historische Datenversionen für Analyse und Wiederherstellung sowie die native Unterstützung von Change Data Capture zur Verfolgung und Weiterverarbeitung von Datenänderungen.
Anwendung der CDC in der Medaillon-Datenarchitektur
Ein Data Lakehouse organisiert Daten häufig mit Hilfe einer Medaillon-Datenarchitektur, die in drei Ebenen unterteilt ist: Bronze, Silber und Gold, um eine effiziente Verwaltung und Verteilung von Datenänderungen (CDC) zu ermöglichen.
- Bronze-Ebene: Speichert die Rohdaten. Änderungen und neue Daten werden von externen CDC-Tools wie Debezium und Kafka erfasst und in die Bronze-Tabellen geschrieben.
- Silber-Ebene: Bereinigt, filtert und reichert die Rohdaten an, um nutzbare Datensätze zu erzeugen. Änderungen aus der Bronze-Ebene werden über temporäre Sichten und Merge-Operationen in die Silber-Tabellen integriert.
- Gold-Ebene: Aggregiert und optimiert Daten für Geschäftsanalysen und Berichte. Änderungen aus der Silber-Ebene werden über temporäre Views und Merge-Operationen in die Gold-Tabellen übernommen.
Diese Struktur verbessert die Datenqualität und -zuverlässigkeit, während CDC sicherstellt, dass alle Datenänderungen effizient durch die Ebenen propagiert werden, um stets aktuelle und analysierbare Daten bereitzustellen.
Beispiel für den Einsatz von CDC in einem Data Lakehouse
Ein E-Commerce-Unternehmen verwendet sowohl eine herkömmliche Datenbank für die Verwaltung des Tagesgeschäfts als auch ein Data Lakehouse für erweiterte Analysen und Berichte. Das Unternehmen muss große Mengen an Verkaufs-, Kunden- und Produktdaten verwalten. Nehmen wir an, dass die folgenden anfänglichen Verkaufsdaten in einer PostgreSQL-Datenbank vorhanden sind:
Nun treten Änderungen in der operativen Datenbank auf. Ein neuer Verkauf wird in die operative Datenbank eingegeben:
INSERT INTO sales (sale_id, product_id, customer_id, amount, sale_date)
VALUES (3, 103, 1003, 300, '2023-01-03');
Debezium erkennt diese Eingabe und sendet ein Ereignis an Kafka. Kafka sendet das Ereignis an einen Verbraucher, der mit Apache Spark Structured Streaming konfiguriert ist. Structured Streaming verarbeitet das Ereignis und fügt den neuen Datensatz in die Bronze-Tabelle ein. Außerdem wird ein bestehender Datensatz aktualisiert:
UPDATE sales
SET amount = 250
WHERE sale_id = 2;
Diese Änderung wird ebenfalls von Debezium erkannt und als Ereignis an Kafka gesendet. Der Kafka Consumer, der mit Apache Spark Structured Streaming konfiguriert ist, verarbeitet dieses Ereignis ebenfalls und fügt den aktualisierten Datensatz als Duplikat in die Bronze-Tabelle ein. Die Daten in der Bronze-Tabelle sehen nun wie folgt aus:
In der nächsten Phase (Silber-Ebene) werden die Daten bereinigt und transformiert. Dabei werden Duplikate entfernt und fehlende Werte korrigiert. In diesem Beispiel wird die doppelte Zeile mit sale_id 2 entfernt und ein eventuell fehlender Wert für customer_id korrigiert. Ist eine Korrektur nicht möglich, wird die entsprechende Zeile entfernt. Nach der Bereinigung und Transformation sehen die Daten in der Silber-Tabelle wie folgt aus:
Schließlich werden die Daten auf der Gold-Ebene aggregiert. Hier wird der Gesamtumsatz pro Kundin oder Kunde berechnet:
Ein Beispiel für CDC: Der Change Data Feed (CDF) in Delta Lake
Delta Lake ist ein OTF, das ACID-Transaktionen garantiert. Jede Datenänderungsoperation, sei es Einfügen, Aktualisieren oder Löschen, wird als vollständige und unabhängige Transaktion behandelt. Ein zentrales Merkmal von Delta Lake ist das Transaktionsprotokoll, in dem alle Änderungen aufgezeichnet werden. Dieses Protokoll ermöglicht die Wiederherstellung früherer Versionen der Daten (Zeitreisefunktion) und stellt sicher, dass alle Änderungen korrekt aufgezeichnet und angewendet werden. Darüber hinaus kann Delta Lake Änderungen am Datenschema automatisch erkennen und anwenden, was die Verwaltung der Datenstruktur erheblich vereinfacht.
Delta Tables verwendet Apache Parquet als Speicherformat, das durch Techniken wie Datenindizierung und fortgeschrittene Komprimierung optimiert wurde, um eine effiziente Speicherung und performante Lese- und Schreibvorgänge für strukturierte und semistrukturierte Daten zu gewährleisten.
Für CDC bietet Delta Lake eine spezielle Funktion, den Change Data Feed (CDF), der die Implementierung von CDC in Data Engineering Pipelines erheblich vereinfacht. In der Vergangenheit mussten viele Entwickler CDC im Kontext einer Medaillon-Architektur manuell verwalten, was mit erheblichem Aufwand und Kosten verbunden war. Databricks hat dieses Problem erkannt und CDF in Delta Lake integriert, um die Ineffizienzen der manuellen CDC-Implementierung zu beseitigen. Diese Integration verbessert die Performance, vereinfacht die Verwaltung und erhöht die Skalierbarkeit von CDC-Operationen in großen Data Lakes.
Die folgenden Codeausschnitte zeigen, wie Delta Lakes Change Data Feed (CDF) in PySpark implementiert wird, um den Datentransfer zwischen Bronze-, Silber- und Gold-Tabellen effizient zu verwalten. Die vollständige Implementierung, einschließlich des gesamten Codes, ist im GitHub Repository https://github.com/cdelmonte-zg/delta-table-example verfügbar.
Der erste Codebeispiel zeigt, wie Datenaktualisierungen und neue Dateneinträge in der Silber-Tabelle simuliert werden können, wobei nur die letzte Änderung für jede sale_id
berücksichtigt wird. Zuerst werden Aktualisierungen und neue Daten zur Bronze-Tabelle hinzugefügt. Dann werden die Änderungen aus der Bronze-Tabelle mit Hilfe der CDF abgerufen. Eine Window-Funktion wird verwendet, um nur den neuesten Datensatz für jede sale_id
auszuwählen. Zuletzt werden diese Änderungen mit einem merge
in die Silber-Tabelle integriert. Wenn eine passende sale_id
gefunden wird, wird die Zeile aktualisiert, andernfalls wird eine neue Zeile eingefügt.
def simulate_data_flow_to_silver(
spark_session, bronze_path, silver_path, starting_from_timestamp
):
# Updates, die der Bronze-Tabelle hinzugefügt werden sollen
updates = [
# Aktualisierter Betrag
(
1,
101,
1001,
250,
datetime.strptime("2020-08-21 10:00:00", "%Y-%m-%d %H:%M:%S"),
),
# Weitere Aktualisierung
(
1,
101,
1001,
260,
datetime.strptime("2020-08-21 11:00:00", "%Y-%m-%d %H:%M:%S"),
),
# Neuer Verkauf
(
3,
103,
1001,
300,
datetime.strptime("2020-08-22 10:00:00", "%Y-%m-%d %H:%M:%S"),
),
]
df_updates = spark_session.createDataFrame(updates, schema)
df_updates.write.format("delta").mode("append").save(bronze_path)
# Änderungen aus der Bronze-Tabelle mit CDF lesen
df_changes = (
spark_session.read.format("delta")
.option("readChangeData", "true")
.option("startingTimestamp", starting_from_timestamp)
.table("sales_bronze")
)
# Nur den neuesten Datensatz für jede sale_id mit einer Fensterfunktion auswählen
window_spec = Window.partitionBy("sale_id").orderBy(col("sale_date").desc())
df_latest_changes = (
df_changes.withColumn("rn", row_number().over(window_spec))
.filter("rn = 1")
.drop("rn")
)
# Änderungen mit Merge auf die Silber-Tabelle anwenden
silver_table = DeltaTable.forPath(spark_session, silver_path)
(
silver_table.alias("silver")
.merge(df_latest_changes.alias("updates"), "silver.sale_id = updates.sale_id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Das zweite Codebeispiel zeigt, wie Datenaktualisierungen von der Silber- in die Gold-Tabelle mit CDF simuliert werden. Änderungen werden aus der Silber-Tabelle gelesen und eine Fensterfunktion wird verwendet, um nur die neuesten Datensätze für jede sale_id
auszuwählen. Diese neuesten Änderungen werden in einer temporären Ansicht gespeichert und dann aggregiert, um den Gesamtbetrag für jeden Kunden zu berechnen. Die aggregierten Daten werden dann in die Gold-Tabelle gemerged.
def simulate_data_flow_to_gold(
spark_session, silver_path, gold_path, starting_from_timestamp
):
# Änderungen aus der Silber-Tabelle mit CDF lesen
df_silver_changes = (
spark_session.read.format("delta")
.option("readChangeData", "true")
.option("startingTimestamp", starting_from_timestamp)
.table("sales_silver")
)
# Nur den neuesten Datensatz für jede sale_id mit einer Fensterfunktion auswählen
window_spec = Window.partitionBy("sale_id").orderBy(col("sale_date").desc())
df_silver_latest = (
df_silver_changes.withColumn("rn", row_number().over(window_spec))
.filter("rn = 1")
.drop("rn")
)
# Temporäre Ansicht mit den neuesten Änderungen erstellen
df_silver_latest.createOrReplaceTempView("temp_latest_changes")
# Neueste Änderungen aggregieren
df_gold_aggregate = spark_session.sql(
"""
SELECT customer_id, SUM(amount) AS total_amount
FROM temp_latest_changes
GROUP BY customer_id
"""
)
# Aggregierte Daten in die Gold-Tabelle mergen
gold_table = DeltaTable.forPath(spark_session, gold_path)
(
gold_table.alias("gold")
.merge(
df_gold_aggregate.alias("updates"), "gold.customer_id = updates.customer_id"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Fazit
Change Data Capture ist entscheidend für die Synchronisierung von Datenspeichern in Data Lakes und ermöglicht die Verwaltung von Datenänderungen auf den drei Ebenen Bronze, Silber und Gold. Der Change Data Feed in Delta Lake vereinfacht die Implementierung von CDC erheblich und verbessert die Leistung und Skalierbarkeit von CDC-Operationen in großen Data Lakes.
Weitere spannende Themen aus der adesso-Welt findet ihr in unseren bisher erschienenen Blog-Beiträgen.
Auch interessant: