بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

آشنایی با مکانیزم کنترل همروندی به کمک تعریف Pool و تنظیم اسلات‌های اجرایی

در محیط‌های عملیاتی، گاهی نیاز است که تعداد Taskهای همزمان محدود شود تا:

  • مصرف منابع (CPU, Memory, I/O) کنترل شود،
  • سرویس‌های خارجی مثل MinIO یا دیتابیس‌ها تحت فشار زیاد قرار نگیرند،
  • اجرای همزمان چند DAG یا Task مشابه باعث مشکلات رقابتی نشود.

Airflow Pool و Slotهای اجرایی (Execution Slots) مکانیزم استاندارد برای مدیریت این همروندی هستند.


۱. تعریف Pool در Airflow

  • Pool باید قبل از استفاده از طریق UI یا CLI ساخته شود.
  • هر Task می‌تواند به یک Pool اختصاص داده شود و هر Pool تعداد Slot همزمان مشخص دارد.
  • مثال ایجاد Pool از CLI:
airflow pools create excel_pool 2 "Pool for Excel processing tasks"

این Pool دو اسلات همزمان دارد، یعنی فقط دو Task از این Pool می‌توانند همزمان اجرا شوند.


۲. اختصاص Task به Pool (حالت کلاسیک)

در DAG کلاسیک، هنگام تعریف Task می‌توانیم پارامتر pool را مشخص کنیم:

task_read_stock_exchange_xlsx_file = BashOperator(
    task_id="Download-Stock-Exchange-Xlsx-File",
    bash_command='curl ...',
    pool="excel_pool",  # <- محدود به Pool تعریف شده
)
  • هر Task که به یک Pool اختصاص داده شود، Slot مورد نظر از Pool را مصرف می‌کند.
  • اگر Slot پر باشد، Task منتظر می‌ماند تا Slot آزاد شود.

در مثال DAG Stock-Exchange-V4-Pool، تمام Taskهای مرتبط با پردازش Excel (Download, FileSensor, PythonOperator) به Pool excel_pool اختصاص داده شده‌اند.


۳. DAG Flow با Pool و کنترل همروندی

task_read_stock_exchange_xlsx_file >> task_waiting_file_xlsx >> task_preprocess_convert_to_csv >> task_remove_exchange_xlsx_file >> task_dummy
  • حتی اگر چند Run از این DAG همزمان اجرا شود، تعداد Taskهای همزمان که روی Pool اجرا می‌شوند محدود خواهد شد.
  • این مکانیزم مانع فشار بیش از حد روی منابع و سرویس‌های خارجی می‌شود.

۴. استفاده از Pool در TaskFlow API

در TaskFlow API هم می‌توان همان محدودیت Pool را برای Taskها تعیین کرد.
مثال:

from airflow.decorators import dag, task
from datetime import timedelta
import pendulum

POOL_NAME = "excel_pool"

@dag(
    dag_id="Stock-Exchange-MinIO-Pool",
    start_date=pendulum.today("UTC").subtract(weeks=1),
    schedule="0 14 * * 6,0-3",
    catchup=True,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=1)},
)
def stock_exchange_taskflow_pool():

    @task(pool=POOL_NAME)
    def download_excel(ds: str):
        print(f"Downloading Excel file for {ds}")
        return f"/data/daily_trades_{ds}.xlsx"

    @task(pool=POOL_NAME)
    def preprocess_excel(file_path: str, ds: str):
        print(f"Processing {file_path}")
        return f"/data/daily_trades_{ds}.csv"

    @task()
    def cleanup(file_path: str):
        print(f"Deleting {file_path}")

    ds = "{{ ds }}"
    xlsx_file = download_excel(ds)
    csv_file = preprocess_excel(xlsx_file, ds)
    cleanup(csv_file)

stock_exchange_dag_pool = stock_exchange_taskflow_pool()
  • با تعیین pool=POOL_NAME، TaskFlow API نیز از مکانیزم Pool برای کنترل همزمانی استفاده می‌کند.
  • Pool محدودیت Slotها را اعمال می‌کند و Taskها منتظر می‌مانند تا Slot آزاد شود.

✅ نکات کلیدی

  1. Pool و Slotها ابزار اصلی کنترل همروندی در Airflow هستند.
  2. تعداد Slot هر Pool باید با منابع سخت‌افزاری و سرویس‌های خارجی هماهنگ باشد.
  3. در حالت کلاسیک و TaskFlow API می‌توان Taskها را به Pool اختصاص داد.
  4. Pool مانع اجرای بیش از حد Taskهای همزمان می‌شود و ثبات DAGها را افزایش می‌دهد.
فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید