در معماری کافکا، پارتیشنها واحد اصلی مقیاسپذیری و Consumer Groupها مکانیزم اصلی پردازش موازی هستند. بهترین راه برای درک رفتار آنها، اجرای یک سناریوی عملی است.
در این بخش، با ساخت یک تاپیک دوپارتیشنه و اجرای چند برنامه پایتونی ساده، رفتار واقعی کافکا را کاملاً مشاهده میکنیم.
ابتدا یک تاپیک به نام demo-topic با دو پارتیشن ایجاد میکنیم:
kafka-topics.sh \
--bootstrap-servers localhost:9092 \
--create \
--topic demo-topic \
--partitions 2 \
--replication-factor 1
فایل producer.py:
from kafka import KafkaProducer
import json
import time
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
i = 0
while True:
msg = {"id": i, "event": "test_message"}
producer.send("demo-topic", msg)
print("Sent:", msg)
i += 1
time.sleep(0.5)
این Producer هر نیمثانیه یک پیام تولید میکند.
برنامهای مینویسیم که عضو یک Consumer Group باشد و نشان دهد کدام پیام از کدام پارتیشن دریافت شده است.
فایل consumer.py:
from kafka import KafkaConsumer
import json
import sys
group = "demo-group" # later we change this
consumer = KafkaConsumer(
"demo-topic",
bootstrap_servers='localhost:9092',
group_id=group,
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)
print(f"Consumer started. PID={sys.argv[0]} | Group={group}")
for msg in consumer:
print(f"[{sys.argv[0]}] Partition={msg.partition}, Offset={msg.offset}, Value={msg.value}")
دو ترمینال باز کنید و هر دو را اجرا کنید:
python consumer.py
python consumer.py
[consumer1] Partition=0 Value=...
[consumer2] Partition=1 Value=...
یکی از دو برنامه را متوقف کنید.
[consumer2] Partition=0 Value=...
[consumer2] Partition=1 Value=...
برنامه متوقفشده را دوباره اجرا کنید:
python consumer.py
حالا یک برنامه سوم اجرا کنید:
python consumer.py
| Consumer | وضعیت |
|---|---|
| C1 | یک پارتیشن |
| C2 | یک پارتیشن |
| C3 | Idle (بیکار) |
اگر یکی از دو Consumer اول را متوقف کنید:
حالا فایل را ویرایش کنید:
group = "another-group"
و برنامه جدید را اجرا کنید:
python consumer.py
گروه جدید، تاریخچه کامل پیامها را دارد چون کافکا برای هر گروه، offset را جداگانه نگه میدارد.
✔️ یک پارتیشن → در هر لحظه فقط یک Consumer از یک گروه آن را میخواند.
✔️ تعداد مصرفکنندههای یک گروه بیشتر از تعداد پارتیشنها شود → بقیه Consumerها بیکار میمانند.
✔️ با قطع یکی از Consumerها → Rebalancing انجام میشود و مصرفکنندههای باقیمانده پارتیشنها را تحویل میگیرند.
✔️ نسخه ۴ Kafka → Sticky Assignor
تا حد ممکن پارتیشنها را به همان مصرفکننده قبلی میدهد.
✔️ گروه مصرفکننده جدید → تمام پیامها را (مطابق offset خودش) دریافت میکند.