بخش اول : مفاهیم پایه و پیش‌نیازها
بخش دوم : جعبه ابزار یک مهندس داده
1 از 2

نوشتن یک برنامه Producer & Consumer ساده با پایتون

در کنار ابزارهای خط فرمان، معمولاً در پروژه‌های واقعی لازم است از داخل یک برنامه (Python, Java, Go و …) پیام‌ها را ارسال یا دریافت کنیم. در این بخش، یک مثال کامل و قدم‌به‌قدم با پایتون ارائه می‌کنیم تا ساخت اولین Producer و Consumer کاملاً روشن شود.


۱) پیش‌نیازها

نصب کتابخانه Kafka برای پایتون

دو کتابخانه معروف وجود دارد:

کتابخانهمزایا
kafka-pythonساده، کلاسیک و مناسب آموزش
confluent-kafka-pythonمبتنی بر librdkafka، سریع‌تر و مناسب Production

در این مقاله از kafka-python استفاده می‌کنیم:

pip install kafka-python

۲) تولید پیام (Producer)

ایده ساده است:
یک producer در پایتون به یک یا چند broker وصل می‌شود، نام تاپیک را مشخص می‌کند و پیام‌ها را ارسال می‌کند.

✔️ مثال یک Producer ساده
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for i in range(5):
    message = {"id": i, "event": "user_registered"}
    producer.send('user-events', message)
    print(f"Sent: {message}")

producer.flush()
توضیحات مهم:
  • bootstrap_servers آدرس یک یا چند بروکر است.
  • با value_serializer پیام را قبل از ارسال به JSON تبدیل می‌کنیم.
  • send() پیام را غیرهمزمان ارسال می‌کند.
  • flush() تضمین می‌کند تمامی پیام‌ها ارسال شده‌اند.
  • اگر ack یا idempotency نیاز داشته باشید، می‌توانیم Producer را کامل‌تر پیکربندی کنیم (در ادامه دوره).

۳) مصرف پیام‌ها (Consumer)

یک Consumer از یک یا چند Partition پیام می‌خواند.
هر Consumer باید یک Consumer Group داشته باشد تا کافکا متوجه شود کدام پیام‌ها را قبلاً تحویل داده است.

✔️ مثال یک Consumer ساده
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers='localhost:9092',
    group_id='marketing-service',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

print("Consumer started. Waiting for messages...")

for message in consumer:
    print(f"Received: {message.value}")
نکات مهم:
  • group_id مشخص می‌کند این Consumer عضو چه گروهی است.
  • اگر پیام‌ها از قبل در تاپیک وجود داشته باشند:
    • earliest یعنی مصرف‌کننده از اولین پیام شروع کند.
    • latest یعنی فقط پیام‌های جدید را دریافت کند.
  • کافکا دقیقاً متوجه می‌شود هر گروه مصرف‌کننده تا کدام “offset” را دریافت کرده است.

۴) سناریوی واقعی: پردازش رویداد ثبت‌نام کاربر

در دنیای واقعی، یک تاپیک مانند user-events ممکن است برای کارهای مختلف استفاده شود:

مثال پیام:
{"id": 12, "event": "user_registered", "email": "a@test.com"}
سرویس‌ها:
سرویسوظیفهConsumer Group
Marketing Serviceارسال کد تخفیفmarketing-service
Email Serviceارسال ایمیل خوش‌آمدemail-service
Analytics Serviceتحلیل رفتار کاربرانanalytics-service

هر سه مورد پیام را از همان تاپیک می‌گیرند، اما چون هرکدام گروه مصرف‌کننده جداگانه دارند، کافکا پیام‌ها را مستقل به هر گروه تحویل می‌دهد.


۵) اجرای Producer و Consumer

۱) ابتدا یک topic بسازید:

kafka-topics.sh --bootstrap-servers localhost:9092 --create --topic user-events --partitions 3 --replication-factor 1

۲) یک پنجره ترمینال باز کنید و Consumer را اجرا کنید:

python consumer.py

۳) در یک پنجره دیگر Producer را اجرا کنید:

python producer.py

خروجی مشابه زیر خواهید دید:

Sent: {'id': 0, 'event': 'user_registered'}
Sent: {'id': 1, 'event': 'user_registered'}
...
Received: {'id': 0, 'event': 'user_registered'}
Received: {'id': 1, 'event': 'user_registered'}

۶) نکات تکمیلی

Ack‌ها در Producer (نسخه ساده)

در پایتون می‌توانید acks را تعیین کنید:

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all'
)

مقادیر:

  • ۰ → بدون انتظار برای تایید
  • ۱ → تایید تنها از Leader
  • all → تایید از همه ISR‌ها (پایداری بالا)

جمع‌بندی

در این بخش:

  • کتابخانه Kafka برای پایتون را نصب کردیم.
  • یک Producer ساده ساختیم.
  • یک Consumer با group_id ساختیم.
  • فرایندی مشابه سرویس‌های واقعی (Marketing / Email / Analytics) را شبیه‌سازی کردیم.
  • نکات مهم مانند offset و گروه مصرف‌کننده و bootstrap servers را مرور کردیم.
فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید