بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

نکات پیشرفته مدیریت Assetها و زمان‌بندی هوشمند

در Airflow 3، مدیریت Assetها به سطح پیشرفته‌ای رسیده است که ترکیبی از Asset Events، Metadata، زمان‌بندی هوشمند و نام‌های مستعار (Asset Aliases) را در اختیار توسعه‌دهندگان قرار می‌دهد. این امکانات باعث می‌شوند پایپ‌لاین‌های داده‌ای هوشمند، قابل ردیابی و انعطاف‌پذیر شوند و کنترل دقیقی روی جریان داده‌ها و اجرای DAGها ایجاد کنند.


الصاق اطلاعات اضافی به Asset Events

هر Asset Event می‌تواند اطلاعات متادیتا اضافه دریافت کند که جزئیات بیشتری درباره داده‌ها یا وضعیت پردازش ثبت می‌کند. این ویژگی برای ردیابی، دیباگ و گزارش‌گیری دقیق از دارایی‌ها حیاتی است. اطلاعات اضافی می‌تواند از طریق کلاس Metadata در Taskflow یا با دسترسی مستقیم به outlet_events در context اپراتورها اضافه شود.

کد نمونه با Taskflow و Metadata:

from airflow.sdk import Asset, Metadata, task

my_asset_1 = Asset("x-asset1")

@task(outlets=[my_asset_1])
def attach_extra_using_metadata():
    num = 23
    # الصاق اطلاعات اضافی به Asset Event
    yield Metadata(my_asset_1, {"myNum": num})
    return "hello :)"

attach_extra_using_metadata()

کد نمونه با دسترسی مستقیم به outlet_events:

def _attach_extra_to_asset(context, result):
    asset = context["outlet_events"][Asset("x-asset1")]
    asset.add_extra({"myNum": 23})

# در یک اپراتور سنتی
BashOperator(
    task_id="t2",
    bash_command="echo hi",
    outlets=[Asset("x-asset1")],
    post_execute=_attach_extra_to_asset,
)

زمان‌بندی ترکیبی (Asset + زمان)

DAGها می‌توانند هم زمان‌بندی بر اساس یک Asset خاص داشته باشند و هم با Cron یا تایمرهای ثابت اجرا شوند. این امکان باعث می‌شود که ETLها یا فرآیندهای دوره‌ای تنها زمانی اجرا شوند که داده‌های مورد نیاز آماده هستند و همزمان روی یک برنامه زمان‌بندی مشخص نیز مطابقت داشته باشند.

کد نمونه برای زمان‌بندی ترکیبی:

from airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator

@dag(
    schedule=["@daily", Asset("my_asset")],  # اجرا روزانه و همچنین وقتی Asset آماده شد
    catchup=False
)
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")

my_consumer_dag()

نام‌های مستعار برای دارایی‌ها (Asset Aliases)


Asset Aliases امکان تعریف DAGها روی دارایی‌هایی با نام‌های تولید شده در زمان اجرا را فراهم می‌کند. DAGهای مصرف‌کننده مستقل از نام ثابت دارایی‌ها عمل می‌کنند و هر بار که یک Asset جدید به Alias متصل شود، DAG مربوطه اجرا می‌شود.

نمونه تولید Asset Event با Alias و Metadata:

from airflow.sdk import Asset, AssetAlias, Metadata, task

my_alias_name = "my_alias"

@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
    bucket_name = "my-bucket"  # نام دارایی در زمان اجرا مشخص می‌شود
    yield Metadata(
        asset=Asset(f"updated_{bucket_name}"),
        extra={"info": "example"},
        alias=AssetAlias(my_alias_name),
    )

attach_event_to_alias_metadata()

نمونه DAG مصرف‌کننده با Asset Alias:

from airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator

my_alias_name = "my_alias"

@dag(schedule=[AssetAlias(my_alias_name)], catchup=False)
def my_consumer_dag():
    EmptyOperator(task_id="empty_task")

my_consumer_dag()

یکپارچه‌سازی و کنترل جریان داده‌ها


با استفاده از Asset Events، Metadata و Aliases می‌توان جریان داده‌ها را در سطح task یا DAG به صورت هوشمند مدیریت کرد. DAGها تنها پس از آماده شدن داده‌ها اجرا می‌شوند، جزئیات کامل پردازش ذخیره می‌شود و وابستگی‌ها به صورت انعطاف‌پذیر تعریف می‌گردند. این امکانات باعث می‌شوند پایپ‌لاین‌های داده‌ای در Airflow 3 قابل ردیابی، قابل نگهداری و آماده برای محیط‌های تولیدی پیچیده باشند.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید