در دنیای پردازش داده، یکی از وظایف اصلی مهندسین داده، طراحی خطوط پردازش داده و انتقال داده (ETL) است.
Airflow 3 با امکانات جدید خود امکان میدهد پایپلاینهای حرفهای چند مرحلهای بسازیم که:
- با تریگرهای خارجی (API، CLI یا Eventها) اجرا شوند،
- یا به صورت زنجیرهای از DAGها طراحی شوند؛ یعنی DAG اول به پایان برسد، DAG بعدی به صورت خودکار اجرا شود.
در این مثال، ما یک پایپلاین دو مرحلهای برای پردازش تراکنشهای بانکی طراحی کردهایم:
۱️⃣ DAG 1: تولید تراکنشها و ذخیره در MinIO
وظیفه: تولید تراکنشهای بانکی رندم و ذخیره به صورت فایل JSON در MinIO.
- کتابخانه Faker برای تولید دادههای رندم استفاده شده است.
- MinIO به عنوان Storage توزیعشده برای ذخیره فایلها به کار رفته.
- در پایان DAG، از
TriggerDagRunOperator برای اجرای DAG بعدی استفاده میکنیم.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_combine = TriggerDagRunOperator(
task_id="trigger_combine_dag",
trigger_dag_id="TX_Combine_Transactions",
wait_for_completion=False,
trigger_rule=TriggerRule.ALL_DONE
)
wait_for_completion=False باعث میشود DAG دوم به صورت غیر همزمان اجرا شود.
- هر تراکنش به صورت JSON ذخیره شده و مسیر فایل در DAG دوم قابل دسترسی است.
۲️⃣ DAG 2: ترکیب تراکنشها و تبدیل به Parquet
وظیفه:
- بررسی پوشه MinIO برای فایلهای تراکنش،
- اگر تعداد فایلها به حد نصاب (۱۰ فایل) رسید، همه را دانلود و به یک فایل Parquet تبدیل میکند،
- فایل Parquet را در مسیر Lakehouse ذخیره میکند،
- فایلهای اصلی JSON را حذف میکند.
مزیتها:
- جلوگیری از ایجاد فایلهای کوچک متعدد در Lakehouse،
- کاهش فشار روی سیستم فایل،
- مدیریت مرحلهای و قابل توسعه ETL.
۳️⃣ نحوه اتصال دو DAG
- DAG اول تولید تراکنش را انجام میدهد.
- پس از اتمام تراکنشها، TriggerDagRunOperator DAG دوم را فراخوانی میکند.
- DAG دوم بررسی میکند که حداقل ۱۰ فایل تراکنش موجود است و سپس عملیات Combine & Convert انجام میدهد.
مزیت طراحی زنجیرهای:
- هر DAG نمایانگر یک مرحله منطقی از پایپلاین است،
- DAGها مستقل و قابل تست هستند،
- مدیریت خطا و Retry سادهتر است.
۴️⃣ نکات کلیدی و Best Practices
- استفاده از TriggerDagRunOperator برای اجرای DAG دوم به جای import کردن DAG در DAG اول.
- DAG دوم با
schedule=None تعریف میشود تا فقط با تریگر اجرا شود.
- فایلها ابتدا در
/tmp ذخیره میشوند و پس از آپلود به MinIO حذف میشوند.
- فایلها به صورت Parquet ذخیره میشوند تا پردازشهای بعدی در Lakehouse سریعتر انجام شود.
- برای پروژههای بزرگ، میتوان DAG سوم یا بیشتر اضافه کرد و زنجیره ETL را گسترش داد.
این طراحی یک مثال عملی از ایجاد پایپلاین دو مرحلهای با DAGهای مجزا است که هم قابلیت توسعه دارد و هم قابلیت مدیریت منابع و همروندی را افزایش میدهد.