در کنار ابزارهای خط فرمان، معمولاً در پروژههای واقعی لازم است از داخل یک برنامه (Python, Java, Go و …) پیامها را ارسال یا دریافت کنیم. در این بخش، یک مثال کامل و قدمبهقدم با پایتون ارائه میکنیم تا ساخت اولین Producer و Consumer کاملاً روشن شود.
دو کتابخانه معروف وجود دارد:
| کتابخانه | مزایا |
|---|---|
| kafka-python | ساده، کلاسیک و مناسب آموزش |
| confluent-kafka-python | مبتنی بر librdkafka، سریعتر و مناسب Production |
در این مقاله از kafka-python استفاده میکنیم:
pip install kafka-python
ایده ساده است:
یک producer در پایتون به یک یا چند broker وصل میشود، نام تاپیک را مشخص میکند و پیامها را ارسال میکند.
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for i in range(5):
message = {"id": i, "event": "user_registered"}
producer.send('user-events', message)
print(f"Sent: {message}")
producer.flush()
bootstrap_servers آدرس یک یا چند بروکر است.value_serializer پیام را قبل از ارسال به JSON تبدیل میکنیم.send() پیام را غیرهمزمان ارسال میکند.flush() تضمین میکند تمامی پیامها ارسال شدهاند.یک Consumer از یک یا چند Partition پیام میخواند.
هر Consumer باید یک Consumer Group داشته باشد تا کافکا متوجه شود کدام پیامها را قبلاً تحویل داده است.
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'user-events',
bootstrap_servers='localhost:9092',
group_id='marketing-service',
auto_offset_reset='earliest',
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
print("Consumer started. Waiting for messages...")
for message in consumer:
print(f"Received: {message.value}")
group_id مشخص میکند این Consumer عضو چه گروهی است.earliest یعنی مصرفکننده از اولین پیام شروع کند.latest یعنی فقط پیامهای جدید را دریافت کند.در دنیای واقعی، یک تاپیک مانند user-events ممکن است برای کارهای مختلف استفاده شود:
{"id": 12, "event": "user_registered", "email": "a@test.com"}
| سرویس | وظیفه | Consumer Group |
|---|---|---|
| Marketing Service | ارسال کد تخفیف | marketing-service |
| Email Service | ارسال ایمیل خوشآمد | email-service |
| Analytics Service | تحلیل رفتار کاربران | analytics-service |
هر سه مورد پیام را از همان تاپیک میگیرند، اما چون هرکدام گروه مصرفکننده جداگانه دارند، کافکا پیامها را مستقل به هر گروه تحویل میدهد.
۱) ابتدا یک topic بسازید:
kafka-topics.sh --bootstrap-servers localhost:9092 --create --topic user-events --partitions 3 --replication-factor 1
۲) یک پنجره ترمینال باز کنید و Consumer را اجرا کنید:
python consumer.py
۳) در یک پنجره دیگر Producer را اجرا کنید:
python producer.py
خروجی مشابه زیر خواهید دید:
Sent: {'id': 0, 'event': 'user_registered'}
Sent: {'id': 1, 'event': 'user_registered'}
...
Received: {'id': 0, 'event': 'user_registered'}
Received: {'id': 1, 'event': 'user_registered'}
در پایتون میتوانید acks را تعیین کنید:
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
acks='all'
)
مقادیر:
۰ → بدون انتظار برای تایید۱ → تایید تنها از Leaderall → تایید از همه ISRها (پایداری بالا)در این بخش: