Producerها همان نقطه ورود داده به Kafka هستند. وظیفه اصلی آنها ارسال پیامها به پارتیشنهای مناسب و لیدر مربوطه است. اما فرآیند ساده “ارسال یک پیام” پشت پرده شامل چندین مرحله مهم است:
acks).Producer علاوه بر ارسال، مسئول مدیریت حافظه محلی، batching، retry و ترتیب پیامها نیز هست.
قبل از ارسال اولین پیام، Producer نیاز دارد بداند:
این اطلاعات metadata در cache Producer ذخیره میشود و هر چند دقیقه یکبار یا در صورت خطا، بروزرسانی میگردد.
۱. اتصال به یکی از bootstrap brokers
۲. ارسال MetadataRequest
۳. دریافت:
- لیست brokerها
- Controller ID
- Topic → Partition → Leader mapping
۴. ذخیره metadata محلی
بدون این مرحله، Producer نمیتواند تصمیم بگیرد که پیام را به کدام پارتیشن و لیدر ارسال کند.
.send() و آمادهسازی Batchهنگامی که Producer پیامی را برای ارسال، دریافت میکند:
(topic, partition) ذخیره میشود.این مرحله مهم است چون باعث افزایش throughput و کاهش تعداد درخواستهای شبکه میشود.
acksProducer میتواند تعیین کند که چه تعداد ack از Broker قبل از موفقیت ارسال نیاز است.
acks| acks | رفتار | نتیجه |
|---|---|---|
| ۰ | هیچ ack | Fire-and-forget، کمترین دوام، بیشترین سرعت |
| ۱ | Leader فقط | پیام امن اگر Leader زنده بماند، اما replication انجام نشده |
| all / -1 | همه ISR | بالاترین دوام، منتظر ack از همه replicaهای in-sync |
این پارامتر مستقیماً بر latency و durability پیام تاثیر میگذارد.
در سناریوهای واقعی، Producer ممکن است به دلیل خطا یا timeout، پیامها را دوباره ارسال کند. بدون idempotency، این کار باعث تکرار پیامها در log پارتیشن میشود.
enable.idempotence=True(topic, partition) sequence number نگه میدارد.نتیجه: حتی با retry یا crash، هر پیام فقط یکبار ذخیره میشود.
گاهی نیاز است که پیامها چند پارتیشن یا چند تاپیک را به صورت atomic ارسال کنند.( این موضوع را با جزییات بیشتر در ادامه دوره بررسی می کنیم )
transactional.id + enable.idempotence=Trueinit_transactions()
begin_transaction()
produce(... multiple topics)
commit_transaction() or abort_transaction()
تمام پیامها یا commit میشوند یا abort، تضمین میکند که هیچ داده نیمهکارهای در کلاستر نماند.
Producer از حافظه محلی (buffer.memory) برای نگهداری پیامها تا زمان ارسال استفاده میکند.
این پارامترها Tradeoff بین Latency و Throughput را مدیریت میکنند.
این تصمیم باعث میشود ترتیب پیام برای entity-specific یا session-specific حفظ شود.
۱. Producer Start
→ Connect bootstrap servers
→ Fetch metadata
→ Assign PID (if idempotent)
→ Prepare RecordAccumulator
۲. Send()
→ Serialize key/value
→ Append to per-partition batch
→ Flush batch when batch.size پر شد یا linger.timeout
→ Send to partition leader
→ Leader assigns offset, writes to log
→ Replicas fetch and ack
→ Producer receives ack (based on acks)
→ If idempotent, update (PID, sequence)
| Config | نقش | دلیل و نیاز | تاثیر عملی |
|---|---|---|---|
acks | سطح دوام پیام | مشخص کردن چند replica قبل از موفقیت | Latency vs Durability |
retries | Retry در خطاهای موقت | جلوگیری از پیام از دست رفته | بدون idempotence → duplicates |
enable.idempotence | جلوگیری از duplicate | PID + Sequence | Safe retry، exactly-once per partition |
max.in.flight.requests | حفظ ترتیب | تعداد درخواستهای بلاک نشده | >1 → سریعتر ولی order ممکن است خراب شود |
batch.size / linger.ms | Batching | کنترل ارسال گروهی پیامها | Tradeoff Latency vs Throughput |
compression.type | کاهش پهنای باند | فشردهسازی batch | CPU overhead vs network savings |
buffer.memory | حافظه پیامهای unsent | نگهداری در Producer | پر شدن → block یا Timeout |
max.block.ms | مدت انتظار بلاک | حداکثر زمان انتظار send() | TimeoutException در صورت طولانی شدن |
key | انتخاب پارتیشن | Hash(key) → partition | حفظ ترتیب پیامها |
transactional.id | تراکنشها | Atomicity across partitions/topics | Exactly-once delivery |