Verteilte Systeme stoßen an ihre Grenzen, wenn lokale Transaktionen nicht automatisch mit anderen Diensten synchronisiert werden. Ein klassisches Beispiel: Ein Nutzer platziert eine Bestellung, die Zahlung wird erfolgreich verarbeitet – doch der Lagerdienst, der die Ware reservieren soll, fällt aus. Das Geld ist weg, die Bestellung bleibt unsichtbar. Solche Szenarien sind kein Mythos, sondern Realität in vielen Microservice-Architekturen.
Genau hier setzt python-cqrs an, ein Framework, das auf den Prinzipien von Command Query Responsibility Segregation (CQRS) und Event-Driven-Architektur basiert. Entwickelt von einem Team um Vadim, ehemaliger Tech-Lead bei Timeweb Cloud, bietet die Bibliothek eine praktische Lösung, um verteilte Systeme ohne manuellen Aufwand konsistent zu halten.
Der Kern: Einfache Handler-Struktur ohne Abhängigkeiten
Das Framework trennt klar zwischen Befehlen (Commands), Abfragen (Queries) und Domain-Events. Jede Operation folgt einem einheitlichen Muster:
- Ein Command beschreibt eine Aktion (z. B.
Bestellung erstellen). - Ein Handler führt die Geschäftslogik aus – unabhängig von HTTP, Kafka oder CLI.
- Der Mediator leitet den Command an den passenden Handler weiter, aufgelöst via Dependency Injection (DI).
Ein minimales Beispiel zur Erstellung eines Nutzers zeigt die Einfachheit:
import di
import cqrs
from cqrs.requests import bootstrap
# 1) Command-Definition (CQRS: Schreiboperation)
class CreateUserCommand(cqrs.Request):
email: str
name: str
class CreateUserResponse(cqrs.Response):
user_id: str
# 2) Handler ohne externe Abhängigkeiten
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
user_id = f"user_{request.email}"
return CreateUserResponse(user_id=user_id)
# 3) Mediator-Konfiguration
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)
# Ausführung
result = await mediator.send(
CreateUserCommand(email="user@example.com", name="John")
)Wichtig: Der Handler enthält ausschließlich businessrelevante Logik. Infrastrukturdetails wie Nachrichtenbroker oder Datenbanken werden extern injiziert.
Sagas: Automatische Fehlerbehebung in verteilten Transaktionen
Ein zentrales Problem verteilter Systeme sind partielle Ausfälle: Eine Transaktion wird in einem Dienst abgeschlossen, scheitert jedoch in einem anderen. Sagas lösen dieses Problem durch kompensierende Aktionen, die bei Fehlern automatisch ausgelöst werden.
python-cqrs speichert den Saga-Status in der Datenbank und ermöglicht:
- Persistente Zustände – selbst nach Server-Neustarts oder Abstürzen.
- Automatische Kompensation – z. B. Rückbuchung bei Zahlungsausfall.
- Wiederherstellung – nach Crashes wird die Saga an der letzten bekannten Position fortgesetzt.
Ein praktisches Beispiel:
- Ein Nutzer platziert eine Bestellung (
BestellungErstellenCommand). - Der Zahlungsdienst verarbeitet die Zahlung erfolgreich.
- Der Lagerdienst reserviert die Ware – falls dies fehlschlägt, wird eine
ZahlungStornierenCommand-Saga ausgelöst.
from cqrs.saga import Saga, SagaStep
class OrderSaga(Saga):
def __init__(self):
super().__init__(
steps=[
SagaStep(
command=PaymentProcessCommand(order_id=self.order_id),
compensation=PaymentRefundCommand(order_id=self.order_id),
),
SagaStep(
command=InventoryReserveCommand(order_id=self.order_id, sku="SKU-123"),
compensation=InventoryReleaseCommand(order_id=self.order_id),
),
]
)Vorteil: Die Saga läuft atomar ab – selbst wenn ein Dienst ausfällt, bleibt die Konsistenz gewahrt.
Transaktionale Outbox: Kein Datenverlust zwischen DB und Broker
Ein häufiger Fehler in Event-Driven-Systemen: Eine Transaktion wird in der Datenbank abgeschlossen, doch die Nachricht an den Broker (z. B. Kafka) geht verloren. Die transaktionale Outbox löst dieses Problem, indem sie:
- Events im selben DB-Transaktionsschritt speichert wie die Hauptdaten.
- Mindestens-einmal-Zustellung an den Broker garantiert (Redundanz durch idempotente Konsumenten).
So funktioniert’s in python-cqrs:
- Ein Handler fügt ein
NotificationEventzur Outbox hinzu – innerhalb der gleichen Transaktion wie die Geschäftslogik. - Ein separater Publisher-Service liest die Outbox-Tabelle aus und veröffentlicht die Events an Kafka.
- Bei einem Neustart wird die Outbox neu verarbeitet.
from pydantic import BaseModel
import cqrs
# Event-Payload definieren
class UserJoinedPayload(BaseModel, frozen=True):
user_id: str
meeting_id: str
# Outbox-Eintrag registrieren
cqrs.OutboxedEventMap.register(
"user_joined", # Kafka-Topic
cqrs.NotificationEvent[UserJoinedPayload],
)
# Im Handler: Event zur Outbox hinzufügen
class CreateUserCommandHandler(cqrs.RequestHandler[...]):
async def handle(self, request: CreateUserCommand) -> ...:
user_id = ...
# Event zur Outbox hinzufügen (innerhalb der DB-Transaktion)
self._outbox_repo.append(
cqrs.NotificationEvent(
event_type="user_joined",
payload=UserJoinedPayload(user_id=user_id, meeting_id="meeting-123"),
)
)Wichtig: Der Publisher muss idempotente Konsumenten unterstützen, da Nachrichten mehrfach gesendet werden können.
Domain-Events: Schnelle In-Memory-Kommunikation
Nicht alle Events müssen persistent und brokerbasiert sein. Für innerhalb eines Dienstes reicht oft eine in-Memory-Kommunikation via Domain-Events. Diese werden direkt vom Mediator an Event-Handler weitergeleitet – ohne Datenbank oder Kafka.
Beispiel: Nach der Erstellung einer Bestellung wird ein OrderCreated-Event ausgelöst, das eine Projektion aktualisiert:
class OrderCreated(cqrs.DomainEvent):
order_id: str
class OrderProjectionUpdater(cqrs.EventHandler[OrderCreated]):
async def handle(self, event: OrderCreated) -> None:
# z. B. Redis-Update oder Cache-Invaliderung
pass
# Mediator-Konfiguration
mediator = bootstrap.bootstrap(
commands_mapper=...,
domain_events_mapper=lambda m: m.bind(OrderCreated, OrderProjectionUpdater),
)Einsatzbereich: Ideal für schnelle, atomare Updates innerhalb eines Microservices. Bei einem Dienstausfall gehen diese Events verloren ("at-most-once").
Fazit: Konsistenz ohne manuellen Aufwand
Verteilte Systeme müssen nicht komplex sein, wenn man die richtigen Muster einsetzt. python-cqrs bietet eine pragmatische Lösung für CQRS, Sagas und transaktionale Outbox – ohne theoretischen Ballast. Die Bibliothek hilft Entwicklern, sich auf die Geschäftslogik zu konzentrieren, während Infrastrukturprobleme wie Konsistenz, Fehlerbehandlung und Nachrichtenverteilung automatisch gelöst werden.
Mit Features wie persistenten Sagas, transaktionaler Outbox und einfacher Mediator-Integration wird die Entwicklung robuster Microservices deutlich einfacher. Für Teams, die nach einer ausgereiften, aber dennoch schlanken Lösung suchen, könnte dies der nächste Schritt in Richtung zuverlässiger verteilter Systeme sein.
KI-Zusammenfassung
Learn how to implement reliable distributed transactions in Python using CQRS, sagas, and transactional outbox patterns with python-cqrs.
Tags