بخش اول : مفاهیم پایه و پیش‌نیازها
بخش دوم : جعبه ابزار یک مهندس داده

مدیریت Offset و Commitو Ack دستی

در کافکا، Offset نشان‌دهنده موقعیت آخرین پیام مصرف‌شده در یک پارتیشن است. مدیریت صحیح Offsetها، کلید اطمینان از پردازش دقیق و قابل‌اعتماد پیام‌ها است. به‌خصوص در سناریوهای Production، نیاز داریم کنترل کامل روی Commit و Ack داشته باشیم.


۱) Offset چیست؟

  • هر پیام در یک پارتیشن یک شماره ترتیبی دارد که به آن Offset گفته می‌شود.
  • مصرف‌کننده‌ها از این شماره برای پیگیری موقعیت خود استفاده می‌کنند.
  • Kafka این اطلاعات را برای هر Consumer Group ذخیره می‌کند تا در دفعات بعدی اتصال، ادامه مصرف از همان Offset شروع شود.

۲) Commit خودکار (Auto Commit)

به‌صورت پیش‌فرض، Kafka Consumer در پایتون دارای Auto Commit است:

consumer = KafkaConsumer(
    'demo-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,    # فعال بودن Auto Commit
    auto_commit_interval_ms=5000
)
  • enable_auto_commit=True → Kafka هر چند ثانیه (پیش‌فرض ۵ ثانیه) آخرین Offset مصرف‌شده را ذخیره می‌کند.
  • مزایا:
    • ساده و راحت
  • معایب:
    • اگر مصرف‌کننده قبل از Commit قطع شود، ممکن است بعضی پیام‌ها دوباره خوانده شوند یا برخی از دست بروند.
    • کنترل دقیق روی پردازش پیام‌ها ندارد.

۳) Commit دستی (Manual Commit)

برای پردازش‌های حساس، بهتر است Commit دستی انجام دهیم:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'demo-topic',
    bootstrap_servers='localhost:9092',
    group_id='manual-commit-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

for msg in consumer:
    print(f"Processing: {msg.value} (Partition={msg.partition}, Offset={msg.offset})")
    
    # فرض کنید پردازش پیام زمان‌بر است
    success = True  # نتیجه پردازش
    if success:
        consumer.commit()
نکات مهم:
  • enable_auto_commit=False → Kafka خودکار Commit نمی‌کند.
  • با فراخوانی consumer.commit()، آخرین Offset ذخیره می‌شود.
  • اگر مصرف‌کننده قطع شود قبل از Commit → پیام‌ها دوباره دریافت می‌شوند.
  • این روش برای پردازش دقیق Exactly-Once یا کاری که نباید پیام از دست برود بسیار مناسب است.

۴) Commit به صورت Partition مشخص

گاهی می‌خواهیم Commit را برای هر پارتیشن جداگانه انجام دهیم:

from kafka import TopicPartition

tp = TopicPartition('demo-topic', 0)
consumer.commit(offsets={tp: 10})
  • این روش امکان کنترل دقیق روی پردازش هر پارتیشن را می‌دهد.
  • برای پردازش موازی و پیچیده در چند Thread/Process کاربرد دارد.

۵) رابطه Commit و Ack در Producer/Consumer

  • Producer:acks مشخص می‌کند پیام با چه سطحی تایید شود:
    • acks=0 → بدون انتظار برای تایید
    • acks=1 → رهبر یا لیدر پارتیشن تأیید می‌کند
    • acks=all → تمام ISRها تأیید می‌کنند
  • Consumer: Commit دقیق Offset حکم Ack در سمت مصرف‌کننده را دارد.
    • پیام تا زمانی که Offset Commit نشده باشد، برای گروه مصرف‌کننده در دسترس باقی می‌ماند.
    • Commit دستی اجازه می‌دهد پیام تنها بعد از پردازش موفق ذخیره شود.

۶) سناریوی عملی با Python

  1. Producer ساده ایجاد کنید (همان producer.py).
  2. Consumer با Commit دستی بسازید:
consumer = KafkaConsumer(
    'demo-topic',
    bootstrap_servers='localhost:9092',
    group_id='ack-demo-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

for msg in consumer:
    print(f"Received: {msg.value}")
    # شبیه‌سازی پردازش طولانی
    import time
    time.sleep(1)
    consumer.commit()
مشاهده رفتار:
  • اگر Consumer را قبل از Commit قطع کنیم → پیام دوباره خوانده می‌شود.
  • با Commit دستی، می‌توانیم پردازش امن و قابل‌اعتماد داشته باشیم.

۷) جمع‌بندی

مفهومتوضیح
Offsetشماره ترتیبی پیام در یک پارتیشن
Auto CommitKafka خودکار Offset را ذخیره می‌کند (ساده اما کم‌قابل‌کنترل)
Manual Commitکنترل کامل روی پردازش پیام و ذخیره Offset
Partition Commitذخیره Offset برای پارتیشن خاص، مناسب پردازش موازی
Ack (Producer)میزان تأیید پیام هنگام ارسال به کلاستر

با فهم دقیق Offset و Commit، می‌توان سیستم‌های Exactly-Once Processing یا At-Least-Once Processing طراحی کرد و از از دست رفتن یا دوباره‌خواندن پیام‌ها جلوگیری کرد.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید