بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

استفاده از TriggerDagRunOperator و ایجاد یک پایپ‌لاین چند مرحله‌ای (تبدیل فایلهای تراکنش بانکی به پارکت در دو DAG مجزا)

در دنیای پردازش داده، یکی از وظایف اصلی مهندسین داده، طراحی خطوط پردازش داده و انتقال داده (ETL) است.
Airflow 3 با امکانات جدید خود امکان می‌دهد پایپ‌لاین‌های حرفه‌ای چند مرحله‌ای بسازیم که:

  • با تریگرهای خارجی (API، CLI یا Eventها) اجرا شوند،
  • یا به صورت زنجیره‌ای از DAGها طراحی شوند؛ یعنی DAG اول به پایان برسد، DAG بعدی به صورت خودکار اجرا شود.

در این مثال، ما یک پایپ‌لاین دو مرحله‌ای برای پردازش تراکنش‌های بانکی طراحی کرده‌ایم:


۱️⃣ DAG 1: تولید تراکنش‌ها و ذخیره در MinIO

وظیفه: تولید تراکنش‌های بانکی رندم و ذخیره به صورت فایل JSON در MinIO.

  • کتابخانه Faker برای تولید داده‌های رندم استفاده شده است.
  • MinIO به عنوان Storage توزیع‌شده برای ذخیره فایل‌ها به کار رفته.
  • در پایان DAG، از TriggerDagRunOperator برای اجرای DAG بعدی استفاده می‌کنیم.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_combine = TriggerDagRunOperator(
    task_id="trigger_combine_dag",
    trigger_dag_id="TX_Combine_Transactions",
    wait_for_completion=False,
    trigger_rule=TriggerRule.ALL_DONE
)
  • wait_for_completion=False باعث می‌شود DAG دوم به صورت غیر همزمان اجرا شود.
  • هر تراکنش به صورت JSON ذخیره شده و مسیر فایل در DAG دوم قابل دسترسی است.

۲️⃣ DAG 2: ترکیب تراکنش‌ها و تبدیل به Parquet

وظیفه:

  1. بررسی پوشه MinIO برای فایل‌های تراکنش،
  2. اگر تعداد فایل‌ها به حد نصاب (۱۰ فایل) رسید، همه را دانلود و به یک فایل Parquet تبدیل می‌کند،
  3. فایل Parquet را در مسیر Lakehouse ذخیره می‌کند،
  4. فایل‌های اصلی JSON را حذف می‌کند.

مزیت‌ها:

  • جلوگیری از ایجاد فایل‌های کوچک متعدد در Lakehouse،
  • کاهش فشار روی سیستم فایل،
  • مدیریت مرحله‌ای و قابل توسعه ETL.

۳️⃣ نحوه اتصال دو DAG

  • DAG اول تولید تراکنش را انجام می‌دهد.
  • پس از اتمام تراکنش‌ها، TriggerDagRunOperator DAG دوم را فراخوانی می‌کند.
  • DAG دوم بررسی می‌کند که حداقل ۱۰ فایل تراکنش موجود است و سپس عملیات Combine & Convert انجام می‌دهد.

مزیت طراحی زنجیره‌ای:

  • هر DAG نمایانگر یک مرحله منطقی از پایپ‌لاین است،
  • DAGها مستقل و قابل تست هستند،
  • مدیریت خطا و Retry ساده‌تر است.

۴️⃣ نکات کلیدی و Best Practices

  1. استفاده از TriggerDagRunOperator برای اجرای DAG دوم به جای import کردن DAG در DAG اول.
  2. DAG دوم با schedule=None تعریف می‌شود تا فقط با تریگر اجرا شود.
  3. فایل‌ها ابتدا در /tmp ذخیره می‌شوند و پس از آپلود به MinIO حذف می‌شوند.
  4. فایل‌ها به صورت Parquet ذخیره می‌شوند تا پردازش‌های بعدی در Lakehouse سریع‌تر انجام شود.
  5. برای پروژه‌های بزرگ، می‌توان DAG سوم یا بیشتر اضافه کرد و زنجیره ETL را گسترش داد.

این طراحی یک مثال عملی از ایجاد پایپ‌لاین دو مرحله‌ای با DAGهای مجزا است که هم قابلیت توسعه دارد و هم قابلیت مدیریت منابع و همروندی را افزایش می‌دهد.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید