پس از آنکه پیامها توسط Producer در Kafka تولید شدند، وظیفه Consumer است که این پیامها را دریافت، پردازش و در صورت نیاز ذخیره کند. به زبان ساده، کانسیومر همان مصرفکننده پیامها است که با کلاستر Kafka ارتباط برقرار میکند و دادهها را از بروکرها میخواند.
اما جریان کار Consumer کمی پیچیدهتر است و شامل چند نکته کلیدی میشود:
max.poll.records و fetch.min.bytes.enable.auto.commit=True) یا دستی (commit_sync() یا commit_async()) انجام شود.در ادامه این جلسه، ما به صورت دقیق چگونگی تعیین پارتیشنها، فرآیند Rebalance، واکشی پیامها از لیدر و مدیریت Offset را واکاوی میکنیم و بررسی میکنیم که هر کانسیومر چگونه با Coordinator و Leader Broker تعامل میکند تا پیامها با نظم و کارایی بالا مصرف شوند.
bootstrap.servers متصل میشود.group.id انتخاب میشود.JoinGroupRequest به Coordinator میفرستد.FetchRequest به رهبر هر پارتیشن ارسال میکند.enable_auto_commit=True باشد، Offsetها به صورت خودکار به تاپیک __consumer_offsets ارسال میشوند.commit_sync() یا commit_async() Offset را ثبت کرد.مثال:
numbers چهار پارتیشن دارد.مکانیزم پشت صحنه:
SyncGroup پارتیشنهای جدید را دریافت میکنند.⚠️ توجه: در طول Rebalance، هر Offset ثبتنشده ممکن است از بین برود مگر اینکه مدیریت مناسبی داشته باشید.
| پارامتر | مقدار پیشفرض | اثر / نکات |
|---|---|---|
group.id | ندارد | شناسه گروه، تعیین کننده Coordinator |
enable.auto.commit | True | ثبت خودکار Offsetها |
auto.offset.reset | latest | نقطه شروع خواندن وقتی Offset ثبت نشده است (earliest, latest, none, by_duration:P30D) |
max.poll.records | ۵۰۰ | حداکثر رکورد در هر Poll |
fetch.min.bytes | ۱ | حداقل دادهای که بروکر منتظر میماند |
fetch.max.wait.ms | ۵۰۰ | حداکثر مدت انتظار بروکر برای رسیدن به حداقل داده |
session.timeout.ms | ۱۰۰۰۰ | زمانی که Coordinator منتظر heartbeat است |
heartbeat.interval.ms | ۳۰۰۰ | فاصله بین ارسال heartbeatها |
max.partition.fetch.bytes | 1MB | حداکثر داده دریافت شده از هر پارتیشن در هر fetch |
isolation.level | read_uncommitted | نمایش پیامهای تراکنشی |
key.deserializer / value.deserializer | ندارد | تبدیل دادههای بایت به اشیا قابل استفاده |
__consumer_offsets ذخیره میشوند.Consumer Start:
→ اتصال به bootstrap servers
→ دریافت metadata کلاستر
→ یافتن group coordinator
→ عضویت در گروه
→ دریافت پارتیشنهای مسئولیت
Fetch Loop:
→ poll()
→ ارسال FetchRequest به رهبر هر پارتیشن
→ دریافت RecordBatch
→ Deserialize دادهها
→ بروزرسانی Offsetها
→ Commit (خودکار یا دستی)
Rebalance:
→ Coordinator Rebalance فعال میکند
→ همه Consumerها Fetch را متوقف میکنند
→ تخصیص جدید پارتیشنها
→ Fetch ادامه پیدا میکند از آخرین Offset ثبت شده
auto.offset.reset=by_duration:P30D برای مصرف پیامهای اخیر.isolation.level=read_committed → فقط پیامهای کامیتشده مشاهده میشوند.from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='marketing',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for msg in consumer:
print(f"Partition: {msg.partition}, Offset: {msg.offset}, Value: {msg.value}")
"marketing" تعلق دارد.اگر بخواهیم خیلی خلاصه تصویری بگوییم:
Consumer → Coordinator (joins group)
→ Receives assigned partitions
→ Fetch from partition leaders
→ Update offsets
→ Commit offsets
↻ Rebalance triggers → repeat
این توضیحات مرحله به مرحله و با مثالهای عملی، هم مکانیزم Consumerها را نشان میدهد و هم پارامترهای اصلی و تأثیر آنها را توضیح میدهد.
فرض کنید Consumer پایهای ما به شکل زیر است:
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='marketing',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
ویژگیهای این Consumer:
"marketing" است.auto_commit=True).auto_offset_resetکاربرد:
مشخص میکند اگر Offset معتبر قبلی برای Consumer وجود نداشت، از کجا شروع به خواندن پیامها کند.
مقادیر:
"earliest" → از قدیمیترین پیامها شروع میکند."latest" → فقط پیامهای جدید بعد از اتصال را میخواند."none" → خطا میدهد اگر Offset موجود نباشد."by_duration:P30D" → پیامهای آخر ۳۰ روز را میخواند (Kafka 4.x+).مثال عملی:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group marketing --delete-offsets --topic numbers
earliest → همه پیامها را بازپخش میکند.latest → فقط پیامهای جدید دریافت میشود.by_duration:P1D → فقط پیامهای ۲۴ ساعت اخیر خوانده میشوند.enable_auto_commit و auto_commit_interval_msکاربرد:
تعیین میکند Offsetها خودکار ثبت شوند یا دستی.
| حالت | توضیح |
|---|---|
enable_auto_commit=True | Offsets به صورت دورهای ثبت میشوند |
enable_auto_commit=False | باید دستی ثبت شود (commit_sync() یا commit_async()) |
مثال عملی:
for message in consumer:
print(f"Consumed: {message.value}")
consumer.commit() # ثبت دستی Offset
اگر Consumer وسط کار قطع شود، در حالت دستی از آخرین Commit ادامه میدهد.
max_poll_recordsکاربرد:
حداکثر تعداد پیامهایی که در هر poll() برمیگردد.
consumer = KafkaConsumer('numbers', max_poll_records=10, ...)
fetch_min_bytes و fetch_max_wait_msکاربرد:
کنترل اندازه و زمان پاسخ Broker برای Fetch پیامها.
| پارامتر | توضیح |
|---|---|
fetch_min_bytes | منتظر میماند تا حداقل این تعداد بایت آماده شود |
fetch_max_wait_ms | حداکثر مدت زمان انتظار قبل از پاسخدهی |
consumer = KafkaConsumer('numbers', fetch_min_bytes=1024, fetch_max_wait_ms=5000)
وقتی تولید کندگان کند عمل میکنند، Broker صبر میکند تا حداقل داده برسد یا زمان تمام شود.
session.timeout.ms و heartbeat.interval.msکاربرد:
تعیین زمان زنده بودن Consumer و مدیریت گروه.
| پارامتر | توضیح |
|---|---|
session.timeout.ms | مدت زمان انتظار Broker قبل از علامت دادن به Consumer به عنوان Dead |
heartbeat.interval.ms | فاصله ارسال Heartbeat از Consumer به Coordinator |
session_timeout_ms=6000
heartbeat_interval_ms=2000
اگر Consumer ناگهانی قطع شود، Coordinator فرآیند Rebalance را فعال میکند.
group_id و Rebalancinggroup_id منحصربهفرد دارد و Offsetها بر اساس آن ذخیره میشوند.مثال عملی:
Rebalance باعث توقف کوتاه Consumerها میشود، ولی دادهها از دست نمیروند.
isolation.levelبرای مصرف پیامهای Transaction-aware:
"read_uncommitted" → همه پیامها"read_committed" → فقط پیامهای Commit شدهpartition.assignment.strategyrange → پیشفرضroundrobincooperative-sticky → مدرن و کمترین اختلال هنگام Rebalancepartition_assignment_strategy=("org.apache.kafka.clients.consumer.RoundRobinAssignor",)
max.poll.interval.mspoll() بماند.| مفهوم | نقش و اثر |
|---|---|
| Fetch از رهبر | هر Consumer فقط از Leader پارتیشن داده میگیرد |
| Offset | نقطه ریکاوری و کنترل جریان مصرف |
| Rebalance | تخصیص مجدد پارتیشنها در صورت تغییر اعضا |
| Coordinator | بروکری که مدیریت گروه و تقسیم پارتیشنها را انجام میدهد |
auto_offset_reset | تعیین نقطه شروع خواندن پیامها |
enable_auto_commit | کنترل خودکار یا دستی ثبت Offset |
max_poll_records | تعداد پیام در هر Poll و تاثیر بر throughput |
session.timeout.ms و heartbeat.interval.ms | تعیین سلامت و زمانبندی Rebalance |
partition.assignment.strategy | نحوه تقسیم پارتیشنها بین Consumerها |
isolation.level | مدیریت پیامهای تراکنشی |