iToverDose/Yazılım· 23 NISAN 2026 · 12:02

Python ile Dağıtık Sistemler Nasıl Kolaylaştırılır? CQRS ve Outbox Modeli

Python'da dağıtık sistemler geliştirirken karşılaşılan tutarlılık sorunlarına karşı python-cqrs çatısını kullanarak nasıl basit ve güvenilir çözümler üretebilirsiniz? İşte detaylı rehber.

DEV Community4 dk okuma0 Yorumlar

Python'da dağıtık mimariler geliştirmek, özellikle işlemlerin birden fazla hizmet arasında senkronize edilmesi gerektiğinde oldukça karmaşık hale gelebilir. Bir siparişin ödendiği anda envanterin de güncellenmesi gerektiğinde, herhangi bir hata anında tüm sistemin tutarlılığını korumak kritik önem taşır. Bu noktada CQRS (Command Query Responsibility Segregation) ve Outbox Pattern gibi desenler devreye giriyor.

Bugün sizlere, Vadim liderliğindeki Timeweb Cloud ekibinin finansal altyapı projelerinde kullanılan python-cqrs çatısını tanıtacağız. Bu kütüphane, Python'da dağıtık sistemler geliştirirken karşılaşılan tutarlılık ve güvenilirlik sorunlarını çözmek için özel olarak tasarlanmış. İşte nasıl çalıştığına dair detaylar.

python-cqrs Nedir ve Nasıl Çalışır?

python-cqrs, CQRS ve olay güdümlü mimarilerin temel prensiplerini uygulayan bir Python çatısıdır. Bu kütüphane, özellikle komutlar (commands), sorgu işleyicileri (query handlers) ve etki alan olayları (domain events) gibi bileşenleri yönetmek için tasarlanmıştır. Temel avantajı, iş mantığını HTTP, Kafka ya da CLI gibi dış bağımlılıklardan soyutlamasıdır.

Örneğin, bir kullanıcı oluşturma komutu düşünün:

import di
import cqrs
from cqrs.requests import bootstrap

# 1) Komut ve yanıt yapıları tanımlanıyor
class CreateUserCommand(cqrs.Request):
    email: str
    name: str

class CreateUserResponse(cqrs.Response):
    user_id: str

# 2) İş mantığı içeren temiz bir komut işleyici
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
    async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
        user_id = f"user_{request.email}"
        return CreateUserResponse(user_id=user_id)

# 3) Mediator aracılığıyla komutlar ve işleyiciler eşleştiriliyor
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)

# Komut gönderiliyor
result = await mediator.send(
    CreateUserCommand(email="user@example.com", name="John"),
)

Bu örnekte, CreateUserCommand bir komutu temsil ederken, CreateUserHandler iş mantığını içeriyor. Mediator, komutları ilgili işleyicilere yönlendiriyor ve tüm sürecin bağımsızlığını sağlıyor.

Sagalar ve Otomatik Düzeltme Mekanizmaları

Dağıtık sistemlerde karşılaşılan en büyük zorluklardan biri, sistemin farklı parçaları arasındaki işlemlerin tutarlılığını sağlamaktır. Örneğin, bir siparişin ödendiği anda envanterin de güncellenmesi gerektiğinde, ödeme servisinin başarılı olması ancak envanter servisinin başarısız olması durumunda ne olacaktır?

python-cqrs, bu sorunu çözmek için saga desenini kullanıyor. Saga'lar, uzun süren işlemleri yönetmek ve herhangi bir hata durumunda otomatik düzeltme (compensating actions) işlemlerini gerçekleştirmek için tasarlanmıştır. Örneğin:

  • Ödeme başarılı, ancak envanter güncellenemediğinde, ödeme geri alınabilir.
  • Sistemin çökmesi durumunda, saga'lar yeniden başlatılabilir ve kaldığı yerden devam edebilir.

Bu sayede, sistemin herhangi bir parçasında meydana gelebilecek aksaklıklar, diğer parçaları olumsuz etkilemeden çözümlenebilir.

Transactional Outbox: Veritabanı ve Mesajlaşma Arasındaki Güvenlik Ağı

Dağıtık sistemlerdeki en yaygın sorunlardan biri, veritabanındaki bir işlemin başarılı olması ancak mesajlaşma sistemine (örneğin Kafka) ulaşamaması durumudur. Bu durum, sistemin farklı parçaları arasında senkronizasyon kaybına yol açabilir.

Transactional Outbox deseni, bu sorunu çözmek için geliştirilmiştir. Bu desen sayesinde, entegrasyon olayları (integration events) veritabanındaki bir işlemin parçası olarak kaydedilir. Ardından, ayrı bir süreç bu olayları mesajlaşma sistemine aktarır. Bu sayede, veritabanındaki işlem başarılı olduğu sürece, olayın da mesajlaşma sistemine ulaşacağı garanti altına alınır.

Örneğin, bir kullanıcının sisteme katılması durumunda bir olay yayınlanması gerektiğinde:

import cqrs
from pydantic import BaseModel

# Olayın yapısını tanımlıyoruz
class UserJoinedPayload(BaseModel, frozen=True):
    user_id: str
    meeting_id: str

# Olayın mesajlaşma sistemiyle eşleştirilmesi
cqrs.OutboxedEventMap.register(
    "user_joined",  # Kafka topic ya da RabbitMQ routing key
    cqrs.NotificationEvent[UserJoinedPayload],
)

Ardından, komut işleyicisi içinde bu olayı outbox tablosuna ekleyebilirsiniz. Ayrı bir süreç, outbox tablosundaki olayları okuyarak mesajlaşma sistemine aktarır ve bu sayede en az bir kez (at-least-once) teslimat garantisi sağlanır.

Alan Olayları (Domain Events) ve Projeksiyonlar

Alan olayları, aynı işlem içinde gerçekleşen ve sistemdeki diğer bileşenlere bildirilen olaylardır. Bu olaylar, genellikle veritabanı işlemleriyle birlikte kaydedilir ve aynı anda ilgili diğer bileşenlere iletilir. Örneğin, bir sipariş oluşturulduğunda, bu siparişle ilgili bir alan olayı yayınlanabilir ve bu olay, siparişin detaylarını içeren bir projeksiyonu güncelleyebilir.

import cqrs
import di
from cqrs.requests import bootstrap

# Alan olayı tanımlanıyor
class OrderLineAdded(cqrs.DomainEvent):
    order_id: str
    sku: str

# Komut ve yanıt yapıları
class CreateOrder(cqrs.Request):
    order_id: str

class OrderCreated(cqrs.Response):
    pass

# Komut işleyicisi
class CreateOrderHandler(cqrs.RequestHandler[CreateOrder, OrderCreated]):
    def __init__(self) -> None:
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events

    async def handle(self, request: CreateOrder) -> OrderCreated:
        self._events.append(
            OrderLineAdded(order_id=request.order_id, sku="sku-1"),
        )
        return OrderCreated()

# Olay işleyicisi
class OnOrderLineAdded(cqrs.EventHandler[OrderLineAdded]):
    async def handle(self, event: OrderLineAdded) -> None:
        # Projeksiyon güncellenebilir
        pass

# Olayların eşleştirilmesi
def events_mapper(m: cqrs.EventMap) -> None:
    m.bind(OrderLineAdded, OnOrderLineAdded)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateOrder, CreateOrderHandler),
    domain_events_mapper=events_mapper,
    max_concurrent_event_handlers=4,  # Paralel çalışan olay işleyicilerinin sayısı sınırlandırılıyor
)

Bu örnekte, CreateOrderHandler bir sipariş oluştururken aynı zamanda bir alan olayı yayınlıyor. OnOrderLineAdded ise bu olayı dinleyerek ilgili projeksiyonu güncelliyor. Bu sayede, sistemdeki farklı bileşenler arasında gerçek zamanlı senkronizasyon sağlanıyor.

Sonuç: Dağıtık Sistemlerinizi Basitleştirin

Python'da dağıtık sistemler geliştirmek, özellikle tutarlılık ve güvenilirlik gereksinimleri göz önünde bulundurulduğunda oldukça karmaşık bir süreç olabilir. Ancak, python-cqrs gibi kütüphaneler sayesinde, bu süreci önemli ölçüde basitleştirmek ve güvenilir çözümler üretmek mümkün hale geliyor.

Saga'lar, Transactional Outbox ve alan olayları gibi desenler, sisteminizin herhangi bir parçasında meydana gelebilecek aksaklıklara karşı otomatik düzeltme mekanizmaları sunarken, aynı zamanda sisteminizin genelinde tutarlılığı ve güvenilirliği artırıyor.

Eğer siz de Python'da dağıtık sistemler geliştiriyor ve bu tür sorunlarla karşılaşıyorsanız, python-cqrs kütüphanesini değerlendirmenizi öneriyoruz. Bu sayede, daha güvenilir ve ölçeklenebilir sistemler inşa etmek çok daha kolay hale gelecektir.

Yapay zeka özeti

Learn how to implement reliable distributed transactions in Python using CQRS, sagas, and transactional outbox patterns with python-cqrs.

Yorumlar

00
YORUM BIRAK
ID #WW20D7

0 / 1200 KARAKTER

İnsan doğrulaması

6 + 5 = ?

Editör onayı sonrası yayına girer

Moderasyon · Spam koruması aktif

Henüz onaylı yorum yok. İlk yorumu sen bırak.