در محیطهای عملیاتی، گاهی نیاز است که تعداد Taskهای همزمان محدود شود تا:
Airflow Pool و Slotهای اجرایی (Execution Slots) مکانیزم استاندارد برای مدیریت این همروندی هستند.
airflow pools create excel_pool 2 "Pool for Excel processing tasks"
این Pool دو اسلات همزمان دارد، یعنی فقط دو Task از این Pool میتوانند همزمان اجرا شوند.
در DAG کلاسیک، هنگام تعریف Task میتوانیم پارامتر pool را مشخص کنیم:
task_read_stock_exchange_xlsx_file = BashOperator(
task_id="Download-Stock-Exchange-Xlsx-File",
bash_command='curl ...',
pool="excel_pool", # <- محدود به Pool تعریف شده
)
در مثال DAG
Stock-Exchange-V4-Pool، تمام Taskهای مرتبط با پردازش Excel (Download,FileSensor,PythonOperator) به Poolexcel_poolاختصاص داده شدهاند.
task_read_stock_exchange_xlsx_file >> task_waiting_file_xlsx >> task_preprocess_convert_to_csv >> task_remove_exchange_xlsx_file >> task_dummy
در TaskFlow API هم میتوان همان محدودیت Pool را برای Taskها تعیین کرد.
مثال:
from airflow.decorators import dag, task
from datetime import timedelta
import pendulum
POOL_NAME = "excel_pool"
@dag(
dag_id="Stock-Exchange-MinIO-Pool",
start_date=pendulum.today("UTC").subtract(weeks=1),
schedule="0 14 * * 6,0-3",
catchup=True,
default_args={"retries": 3, "retry_delay": timedelta(minutes=1)},
)
def stock_exchange_taskflow_pool():
@task(pool=POOL_NAME)
def download_excel(ds: str):
print(f"Downloading Excel file for {ds}")
return f"/data/daily_trades_{ds}.xlsx"
@task(pool=POOL_NAME)
def preprocess_excel(file_path: str, ds: str):
print(f"Processing {file_path}")
return f"/data/daily_trades_{ds}.csv"
@task()
def cleanup(file_path: str):
print(f"Deleting {file_path}")
ds = "{{ ds }}"
xlsx_file = download_excel(ds)
csv_file = preprocess_excel(xlsx_file, ds)
cleanup(csv_file)
stock_exchange_dag_pool = stock_exchange_taskflow_pool()
pool=POOL_NAME، TaskFlow API نیز از مکانیزم Pool برای کنترل همزمانی استفاده میکند.