در جلسه چهارم دوره آموزشی ایرفلو، مباحث مهمی پیرامون مدیریت خطا، طراحی پایپلاینهای داده، مدیریت کتابخانههای موردنیاز و بهینهسازی DAGها مطرح شد. در ادامه، نکات کلیدی این جلسه بهصورت منظم مرور میشود:
مدیریت خطا بعد از Backfill
- اگر پس از اجرای Backfill متوجه شویم خطایی رخ داده است و لازم باشد تمامی تسکهای آن DAG متوقف شوند، این کار را میتوانیم از طریق خط فرمان (CLI) انجام دهیم.
- مراحل کار:
- متوقف کردن DAG.
- Clear و پاکسازی تمام تسکهای آن DAG.
- حذف کامل رکوردهای متناظر با آن DAG از متادیتابیس ایرفلو با دستور:
airflow dags delete <DAG_ID>
- با این روش، ردپای آن DAG و اجرایهای ناموفقش از سیستم حذف میشود.
جایگاه صحیح کدنویسی در DAG
- بررسی کردیم چرا نباید کدهای پایتون را خارج از توابع و در سطح DAG بنویسیم.
- سرویس DAG Processor بهطور مداوم کدها را ایمپورت میکند، بنابراین هر کدی بیرون از توابع نوشته شود در هر بار پردازش DAG اجرا خواهد شد.
- در مثال عملی، کدی برای تولید JSON خارج از توابع قرار داده شد و مشاهده کردیم که فایلها بهطور مداوم و ناخواسته ایجاد میشوند.
کار با دادههای واقعی (دانلود فایلهای اکسل بورس)
- یک مثال عملی برای دانلود روزانه فایلهای اکسل بورس پیادهسازی شد.
- علاوه بر اجرای روزانه، مفهوم Backfill برای پردازش دادههای گذشته بررسی شد.
- همچنین با Connections و Variables در ایرفلو آشنا شدیم تا بتوانیم تنظیمات و پارامترها را مدیریت کنیم.
مدیریت کتابخانههای موردنیاز در ایرفلو
- در هنگام اجرای DAGهای پردازش دادههای بورس نیاز به نصب کتابخانههایی مثل:
faker
jdatetime
minio
openpyxl
داشتیم.
- در ابتدا این کتابخانهها را بهصورت دستی داخل کانتینرهای dag processor و workerها نصب کردیم.
- سپس برای اتوماتیکسازی، آنها را به متغیر محیطی
_PIP_ADDITIONAL_REQUIREMENTS در فایل Docker Compose اضافه کردیم.
- با این کار پس از بالا آمدن کلاستر، این کتابخانهها بهصورت خودکار در تمامی کانتینرها نصب شدند.
❌ مشکل: این روش برای کتابخانههای حجیم یا تعداد زیاد کتابخانهها کارایی مناسبی ندارد، چون نصب آنها هر بار زمان زیادی میگیرد.
✅ راهحل حرفهای:
- سفارشیسازی ایمیج پایه ایرفلو و اضافهکردن کتابخانههای موردنیاز به آن.
- بیلدکردن این ایمیج و استفاده از آن در فایل Docker Compose (با فعالکردن بخش
build).
- این روش باعث میشود کتابخانهها یکبار در زمان ساخت ایمیج نصب شوند و هنگام بالا آمدن کلاستر نیازی به نصب مجدد آنها نباشد.
مدیریت همزمانی با Pool و Slots
- برای کنترل اجرای همزمان تسکها از مکانیزم Pool و Execution Slots استفاده کردیم.
- با محدود کردن تعداد اسلاتهای اجرایی، امکان مدیریت بهتر منابع و جلوگیری از تداخل تسکها فراهم میشود.
ذخیرهسازی دادهها در سیستم توزیعشده
- خروجی پردازشها (فایلهای CSV) در یک سیستم ذخیرهسازی توزیعشده ذخیره شدند.
- برای این منظور از MinIO بهعنوان یک استوریج S3-compatible بهره گرفتیم.
طراحی یک پایپلاین دو مرحلهای با دو DAG
یک پایپلاین حرفهای شامل دو DAG طراحی شد:
DAG اول (تولید تراکنشها):
- تولید تراکنشهای بانکی با استفاده از کتابخانه Faker.
- ذخیره فایلهای تولیدشده در MinIO.
- در پایان اجرای DAG، با استفاده از TriggerDagRunOperator، اجرای DAG دوم آغاز میشد.
DAG دوم (پردازش و تجمیع):
- بررسی پوشه ورودی و شمارش تعداد فایلها.
- در صورت رسیدن تعداد فایلها به بیش از ۱۰، این فایلها با کمک Pandas در یک فایل Parquet ادغام میشدند.
- فایل پارکت در پوشه
lakehouse در MinIO ذخیره و فایلهای اولیه حذف میشدند.
به این ترتیب یک پایپلاین دادهای دو مرحلهای و خودکار ایجاد شد.
بهینهسازی ذخیرهسازی دادههای حجیم در Airflow Metadata DB
- در DAG اول، خروجی تسک تولید تراکنشها یک JSON حجیم بود.
- از آنجا که ایرفلو خروجی تسک را در متادیتابیس (Postgres) ذخیره میکند، کل JSON و حتی کلید/مقدارهای آن بهصورت جداگانه ذخیره میشد که برای دادههای حجیم ناکارآمد است.
✅ راهحل:
- استفاده از Pickle برای سریالایز کردن JSON.
- ذخیره فایل سریالایز شده در پوشه
stage در MinIO.
- ارسال آدرس فایل به مرحله بعدی بهجای انتقال کل داده.
- این کار باعث کاهش چشمگیر سربار دیتابیس و افزایش کارایی شد.
جمعبندی
در این جلسه یاد گرفتیم:
- چطور بعد از Backfill، یک DAG مشکلدار را بهطور کامل متوقف و پاکسازی کنیم.
- چرا نباید کدهای اجرایی را خارج از توابع در DAG بنویسیم.
- نحوه کار با فایلهای واقعی (اکسل بورس) و اجرای Backfill.
- مدیریت کتابخانههای موردنیاز در ایرفلو و سفارشیسازی ایمیج پایه.
- کنترل همزمانی تسکها با Pool و Slots.
- ذخیرهسازی دادهها در MinIO بهعنوان استوریج توزیعشده.
- طراحی یک پایپلاین دادهای حرفهای با دو DAG و استفاده از TriggerDagRunOperator.
- بهینهسازی ذخیرهسازی دادههای حجیم با استفاده از Pickle و کاهش سربار روی دیتابیس Airflow.
این مجموعه مفاهیم، دید جامعی نسبت به بهترینروشهای طراحی DAGها، مدیریت منابع و بهینهسازی پردازش داده در ایرفلو ایجاد کرد.