بخش اول: مفاهیم پایه، معماری و Kafka Core
بخش دوم : سایر بازیگران اکوسیستم کافکا
بخش سوم ابزارهای جانبی کافکا : کانکت و رجیستری
بخش چهارم : پردازش جریان در عمل
بخش پنجم :‌ مباحث مدیریتی و تکمیلی

رازهای Kafka Consumer: از اتصال تا Rebalance

پس از آنکه پیام‌ها توسط Producer در Kafka تولید شدند، وظیفه Consumer است که این پیام‌ها را دریافت، پردازش و در صورت نیاز ذخیره کند. به زبان ساده، کانسیومر همان مصرف‌کننده پیام‌ها است که با کلاستر Kafka ارتباط برقرار می‌کند و داده‌ها را از بروکرها می‌خواند.

اما جریان کار Consumer کمی پیچیده‌تر است و شامل چند نکته کلیدی می‌شود:

  1. تعیین پارتیشن‌های اختصاص یافته
    • هر کانسیومر خودش تصمیم نمی‌گیرد که از کدام پارتیشن پیام بخواند.
    • ابتدا هنگام پیوستن به گروه مصرف‌کنندگان (Consumer Group) یا در جریان یک فرآیند Rebalance، مشخص می‌شود که چه پارتیشن‌هایی به او تخصیص داده شده‌اند.
    • این تخصیص بر اساس استراتژی گروه (Range, Round-Robin, Sticky) انجام می‌شود و توسط Group Coordinator مدیریت می‌شود.
  2. شناسایی لیدر هر پارتیشن
    • وقتی پارتیشن‌ها به کانسیومر تخصیص یافت، او باید رهبر هر پارتیشن را شناسایی کند.
    • کانسیومر همیشه از رهبر پارتیشن پیام دریافت می‌کند، نه از رپلیکاها. این تضمین می‌کند که داده‌ها به صورت منظم و بدون تناقض خوانده شوند.
  3. واکشی پیام‌ها (Fetch)
    • کانسیومر شروع به واکشی پیام‌ها از لیدر پارتیشن‌های اختصاص یافته می‌کند.
    • دریافت پیام‌ها می‌تواند به صورت تکی یا به صورت دسته‌ای (Batch) انجام شود، بسته به پیکربندی‌هایی مثل max.poll.records و fetch.min.bytes.
  4. کامیت پیام‌ها (Offset Commit)
    • پس از پردازش پیام‌ها، کانسیومر باید به لیدر اعلام کند که تا چه پیام یا تا چه Offset را دریافت و پردازش کرده است.
    • این کار می‌تواند خودکار (enable.auto.commit=True) یا دستی (commit_sync() یا commit_async()) انجام شود.
    • کامیت کردن Offset، تضمین می‌کند که در صورت قطع اتصال یا ری‌استارت کانسیومر، مصرف از همان نقطه ادامه یابد.

در ادامه این جلسه، ما به صورت دقیق چگونگی تعیین پارتیشن‌ها، فرآیند Rebalance، واکشی پیام‌ها از لیدر و مدیریت Offset را واکاوی می‌کنیم و بررسی می‌کنیم که هر کانسیومر چگونه با Coordinator و Leader Broker تعامل می‌کند تا پیام‌ها با نظم و کارایی بالا مصرف شوند.


۱- وقتی یک Consumer شروع به کار می‌کند – گروه‌بندی و کشف متادیتا

قدم‌به‌قدم:
  1. اتصال اولیه (Bootstrap Connection)
    • Consumer به یکی از بروکرهای موجود در لیست bootstrap.servers متصل می‌شود.
    • اطلاعات متادیتا از بروکر گرفته می‌شود، شامل:
      • نام تاپیک‌ها و پارتیشن‌ها
      • رهبر هر پارتیشن
      • شناسه بروکر کنترلر
  2. یافتن Group Coordinator
    • هر Consumer در یک Consumer Group باید با یک Group Coordinator تعامل داشته باشد.
    • در Kafka 4.x، این Coordinator همان بروکری است که با هَش کردن group.id انتخاب می‌شود.
    • Coordinator مسئول مدیریت اعضای گروه و تخصیص پارتیشن‌ها است.
  3. عضویت در گروه (Join Group)
    • Consumer یک درخواست JoinGroupRequest به Coordinator می‌فرستد.
    • Coordinator اطلاعات زیر را مدیریت می‌کند:
      • لیست اعضای فعلی گروه
      • Member ID و استراتژی تقسیم پارتیشن‌ها (Range, Round-Robin, Sticky)
      • Session Timeout و اطلاعات heartbeat
  4. تخصیص پارتیشن‌ها (Assignment)
    • Coordinator بر اساس استراتژی تقسیم پارتیشن، پارتیشن‌ها را بین اعضا تقسیم می‌کند.
    • هر Consumer لیست پارتیشن‌های مسئولیت خود را دریافت می‌کند.

۲- دریافت پیام‌ها – هماهنگی با رهبر هر پارتیشن

  1. Fetch Request
    • Consumer یک FetchRequest به رهبر هر پارتیشن ارسال می‌کند.
    • رهبر پیام‌ها را از offset مورد نظر Consumer برمی‌گرداند.
  2. ردیابی Offset
    • Consumer به صورت محلی Offsetهای هر پارتیشن را نگه می‌دارد.
    • اگر enable_auto_commit=True باشد، Offsetها به صورت خودکار به تاپیک __consumer_offsets ارسال می‌شوند.
    • در غیر این صورت، می‌توان به صورت دستی با commit_sync() یا commit_async() Offset را ثبت کرد.

۳- Rebalancing – تخصیص مجدد پارتیشن‌ها

مثال:

  • تاپیک numbers چهار پارتیشن دارد.
  • Consumer1 به گروه می‌پیوندد → پارتیشن‌های ۰ و ۱ را می‌گیرد.
  • Consumer2 به گروه می‌پیوندد → یک rebalance رخ می‌دهد و پارتیشن‌ها دوباره تقسیم می‌شوند.

مکانیزم پشت صحنه:

  1. Coordinator یک Rebalance را فعال می‌کند.
  2. همه Consumerها موقتا Fetch را متوقف می‌کنند.
  3. تخصیص جدید پارتیشن‌ها محاسبه می‌شود.
  4. Consumerها با SyncGroup پارتیشن‌های جدید را دریافت می‌کنند.
  5. Fetch از آخرین Offset ثبت شده ادامه پیدا می‌کند.

⚠️ توجه: در طول Rebalance، هر Offset ثبت‌نشده ممکن است از بین برود مگر اینکه مدیریت مناسبی داشته باشید.


۴- پیکربندی‌های مهم Consumer و اثر آن‌ها

پارامترمقدار پیش‌فرضاثر / نکات
group.idنداردشناسه گروه، تعیین کننده Coordinator
enable.auto.commitTrueثبت خودکار Offsetها
auto.offset.resetlatestنقطه شروع خواندن وقتی Offset ثبت نشده است (earliest, latest, none, by_duration:P30D)
max.poll.records۵۰۰حداکثر رکورد در هر Poll
fetch.min.bytes۱حداقل داده‌ای که بروکر منتظر می‌ماند
fetch.max.wait.ms۵۰۰حداکثر مدت انتظار بروکر برای رسیدن به حداقل داده
session.timeout.ms۱۰۰۰۰زمانی که Coordinator منتظر heartbeat است
heartbeat.interval.ms۳۰۰۰فاصله بین ارسال heartbeatها
max.partition.fetch.bytes1MBحداکثر داده دریافت شده از هر پارتیشن در هر fetch
isolation.levelread_uncommittedنمایش پیام‌های تراکنشی
key.deserializer / value.deserializerنداردتبدیل داده‌های بایت به اشیا قابل استفاده

۵- مدیریت Offset

  • پیام‌ها در بروکر با Offset ذخیره می‌شوند.
  • Offsetهای Consumer در تاپیک داخلی __consumer_offsets ذخیره می‌شوند.
  • در زمان Fetch:
    • Consumer پیام‌ها را از next offset که می‌خواهد پردازش کند دریافت می‌کند.
    • Commit کردن Offset = نقطه بازیابی در صورت قطع ارتباط یا ریستارت.

۶- جریان کار Consumer – به صورت شماتیک

Consumer Start:
   → اتصال به bootstrap servers
   → دریافت metadata کلاستر
   → یافتن group coordinator
   → عضویت در گروه
   → دریافت پارتیشن‌های مسئولیت

Fetch Loop:
   → poll()
       → ارسال FetchRequest به رهبر هر پارتیشن
       → دریافت RecordBatch
       → Deserialize داده‌ها
       → بروزرسانی Offsetها
       → Commit (خودکار یا دستی)

Rebalance:
   → Coordinator Rebalance فعال می‌کند
   → همه Consumerها Fetch را متوقف می‌کنند
   → تخصیص جدید پارتیشن‌ها
   → Fetch ادامه پیدا می‌کند از آخرین Offset ثبت شده

۷- نکات پیشرفته و سناریوهای عملی

  1. چند Consumer در یک گروه → پارتیشن‌ها بین آن‌ها تقسیم می‌شوند، هیچ دو Consumer یک پارتیشن را همزمان نمی‌خوانند.
  2. Consumer جدید می‌آید یا Consumer فعلی خارج می‌شود → Rebalance فعال می‌شود.
  3. Kafka 4.1 → امکان auto.offset.reset=by_duration:P30D برای مصرف پیام‌های اخیر.
  4. تولیدکننده تراکنشی + Consumer:
    • isolation.level=read_committed → فقط پیام‌های کامیت‌شده مشاهده می‌شوند.

۸- مثال پایتون ساده

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='marketing',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for msg in consumer:
    print(f"Partition: {msg.partition}, Offset: {msg.offset}, Value: {msg.value}")
  • Consumer به گروه "marketing" تعلق دارد.
  • اگر Offset قبلی نباشد، از ابتدا می‌خواند.
  • Offsetها خودکار ثبت می‌شوند.
  • نمایش شماره پارتیشن، Offset و مقدار پیام.


اگر بخواهیم خیلی خلاصه تصویری بگوییم:

Consumer → Coordinator (joins group)
       → Receives assigned partitions
       → Fetch from partition leaders
       → Update offsets
       → Commit offsets
       ↻ Rebalance triggers → repeat

این توضیحات مرحله به مرحله و با مثال‌های عملی، هم مکانیزم Consumerها را نشان می‌دهد و هم پارامترهای اصلی و تأثیر آن‌ها را توضیح می‌دهد.

بررسی دقیق Kafka Consumer و پارامترهای کلیدی

فرض کنید Consumer پایه‌ای ما به شکل زیر است:

consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='marketing',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

ویژگی‌های این Consumer:

  • عضو یک گروه مصرف‌کننده به نام "marketing" است.
  • در صورتی که Offset قبلی موجود نباشد، از اولین پیام‌ها (earliest) شروع به خواندن می‌کند.
  • Offsets به صورت خودکار و دوره‌ای ثبت می‌شوند (auto_commit=True).
  • پیام‌ها را با Partition، Offset و Value دریافت و پردازش می‌کند.

⚙️ پارامترهای کلیدی و اثرات عملی آن‌ها

🧩 ۱. auto_offset_reset

کاربرد:
مشخص می‌کند اگر Offset معتبر قبلی برای Consumer وجود نداشت، از کجا شروع به خواندن پیام‌ها کند.

مقادیر:

  • "earliest" → از قدیمی‌ترین پیام‌ها شروع می‌کند.
  • "latest" → فقط پیام‌های جدید بعد از اتصال را می‌خواند.
  • "none" → خطا می‌دهد اگر Offset موجود نباشد.
  • "by_duration:P30D" → پیام‌های آخر ۳۰ روز را می‌خواند (Kafka 4.x+).

مثال عملی:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group marketing --delete-offsets --topic numbers
  • earliest → همه پیام‌ها را بازپخش می‌کند.
  • latest → فقط پیام‌های جدید دریافت می‌شود.
  • by_duration:P1D → فقط پیام‌های ۲۴ ساعت اخیر خوانده می‌شوند.

🧩 ۲. enable_auto_commit و auto_commit_interval_ms

کاربرد:
تعیین می‌کند Offsetها خودکار ثبت شوند یا دستی.

حالتتوضیح
enable_auto_commit=TrueOffsets به صورت دوره‌ای ثبت می‌شوند
enable_auto_commit=Falseباید دستی ثبت شود (commit_sync() یا commit_async())

مثال عملی:

for message in consumer:
    print(f"Consumed: {message.value}")
    consumer.commit()  # ثبت دستی Offset

اگر Consumer وسط کار قطع شود، در حالت دستی از آخرین Commit ادامه می‌دهد.


🧩 ۳. max_poll_records

کاربرد:
حداکثر تعداد پیام‌هایی که در هر poll() برمی‌گردد.

  • پیش‌فرض: ۵۰۰ پیام.
  • با مقدار کمتر: Latency کمتر، Throughput کمتر
  • با مقدار بیشتر: Latency بیشتر، Throughput بیشتر
consumer = KafkaConsumer('numbers', max_poll_records=10, ...)

🧩 ۴. fetch_min_bytes و fetch_max_wait_ms

کاربرد:
کنترل اندازه و زمان پاسخ Broker برای Fetch پیام‌ها.

پارامترتوضیح
fetch_min_bytesمنتظر می‌ماند تا حداقل این تعداد بایت آماده شود
fetch_max_wait_msحداکثر مدت زمان انتظار قبل از پاسخ‌دهی
consumer = KafkaConsumer('numbers', fetch_min_bytes=1024, fetch_max_wait_ms=5000)

وقتی تولید کندگان کند عمل می‌کنند، Broker صبر می‌کند تا حداقل داده برسد یا زمان تمام شود.


🧩 ۵. session.timeout.ms و heartbeat.interval.ms

کاربرد:
تعیین زمان زنده بودن Consumer و مدیریت گروه.

پارامترتوضیح
session.timeout.msمدت زمان انتظار Broker قبل از علامت دادن به Consumer به عنوان Dead
heartbeat.interval.msفاصله ارسال Heartbeat از Consumer به Coordinator
session_timeout_ms=6000
heartbeat_interval_ms=2000

اگر Consumer ناگهانی قطع شود، Coordinator فرآیند Rebalance را فعال می‌کند.


🧩 ۶. group_id و Rebalancing

  • هر گروه یک group_id منحصربه‌فرد دارد و Offsetها بر اساس آن ذخیره می‌شوند.
  • وقتی Consumer جدید اضافه یا حذف شود، Rebalance رخ می‌دهد و پارتیشن‌ها بازتخصیص می‌شوند.

مثال عملی:

  • Consumer 1 → ۴ پارتیشن
  • Consumer 2 joins → پارتیشن‌ها تقسیم می‌شوند (۲ & ۲)
  • Consumer 2 leaves → Consumer 1 دوباره همه پارتیشن‌ها را می‌گیرد

Rebalance باعث توقف کوتاه Consumerها می‌شود، ولی داده‌ها از دست نمی‌روند.


🧩 ۷. isolation.level

برای مصرف پیام‌های Transaction-aware:

  • "read_uncommitted" → همه پیام‌ها
  • "read_committed" → فقط پیام‌های Commit شده

🧩 ۸. partition.assignment.strategy

  • تعیین می‌کند پارتیشن‌ها چگونه بین Consumerها تقسیم شود.
  • گزینه‌های رایج:
    • range → پیش‌فرض
    • roundrobin
    • cooperative-sticky → مدرن و کمترین اختلال هنگام Rebalance
partition_assignment_strategy=("org.apache.kafka.clients.consumer.RoundRobinAssignor",)

🧩 ۹. max.poll.interval.ms

  • بیشترین مدت زمانی که Consumer می‌تواند بدون فراخوانی poll() بماند.
  • اگر طولانی شود → Broker Consumer را Dead فرض می‌کند → Rebalance.

جمع‌بندی پارامترهای اصلی کانسیومر

مفهومنقش و اثر
Fetch از رهبرهر Consumer فقط از Leader پارتیشن داده می‌گیرد
Offsetنقطه ریکاوری و کنترل جریان مصرف
Rebalanceتخصیص مجدد پارتیشن‌ها در صورت تغییر اعضا
Coordinatorبروکری که مدیریت گروه و تقسیم پارتیشن‌ها را انجام می‌دهد
auto_offset_resetتعیین نقطه شروع خواندن پیام‌ها
enable_auto_commitکنترل خودکار یا دستی ثبت Offset
max_poll_recordsتعداد پیام در هر Poll و تاثیر بر throughput
session.timeout.ms و heartbeat.interval.msتعیین سلامت و زمان‌بندی Rebalance
partition.assignment.strategyنحوه تقسیم پارتیشن‌ها بین Consumerها
isolation.levelمدیریت پیام‌های تراکنشی
فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید