در Airflow 3، مدیریت Assetها به سطح پیشرفتهای رسیده است که ترکیبی از Asset Events، Metadata، زمانبندی هوشمند و نامهای مستعار (Asset Aliases) را در اختیار توسعهدهندگان قرار میدهد. این امکانات باعث میشوند پایپلاینهای دادهای هوشمند، قابل ردیابی و انعطافپذیر شوند و کنترل دقیقی روی جریان دادهها و اجرای DAGها ایجاد کنند.
هر Asset Event میتواند اطلاعات متادیتا اضافه دریافت کند که جزئیات بیشتری درباره دادهها یا وضعیت پردازش ثبت میکند. این ویژگی برای ردیابی، دیباگ و گزارشگیری دقیق از داراییها حیاتی است. اطلاعات اضافی میتواند از طریق کلاس Metadata در Taskflow یا با دسترسی مستقیم به outlet_events در context اپراتورها اضافه شود.
کد نمونه با Taskflow و Metadata:
from airflow.sdk import Asset, Metadata, task
my_asset_1 = Asset("x-asset1")
@task(outlets=[my_asset_1])
def attach_extra_using_metadata():
num = 23
# الصاق اطلاعات اضافی به Asset Event
yield Metadata(my_asset_1, {"myNum": num})
return "hello :)"
attach_extra_using_metadata()
کد نمونه با دسترسی مستقیم به outlet_events:
def _attach_extra_to_asset(context, result):
asset = context["outlet_events"][Asset("x-asset1")]
asset.add_extra({"myNum": 23})
# در یک اپراتور سنتی
BashOperator(
task_id="t2",
bash_command="echo hi",
outlets=[Asset("x-asset1")],
post_execute=_attach_extra_to_asset,
)
DAGها میتوانند هم زمانبندی بر اساس یک Asset خاص داشته باشند و هم با Cron یا تایمرهای ثابت اجرا شوند. این امکان باعث میشود که ETLها یا فرآیندهای دورهای تنها زمانی اجرا شوند که دادههای مورد نیاز آماده هستند و همزمان روی یک برنامه زمانبندی مشخص نیز مطابقت داشته باشند.
کد نمونه برای زمانبندی ترکیبی:
from airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator
@dag(
schedule=["@daily", Asset("my_asset")], # اجرا روزانه و همچنین وقتی Asset آماده شد
catchup=False
)
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
Asset Aliases امکان تعریف DAGها روی داراییهایی با نامهای تولید شده در زمان اجرا را فراهم میکند. DAGهای مصرفکننده مستقل از نام ثابت داراییها عمل میکنند و هر بار که یک Asset جدید به Alias متصل شود، DAG مربوطه اجرا میشود.
نمونه تولید Asset Event با Alias و Metadata:
from airflow.sdk import Asset, AssetAlias, Metadata, task
my_alias_name = "my_alias"
@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
bucket_name = "my-bucket" # نام دارایی در زمان اجرا مشخص میشود
yield Metadata(
asset=Asset(f"updated_{bucket_name}"),
extra={"info": "example"},
alias=AssetAlias(my_alias_name),
)
attach_event_to_alias_metadata()
نمونه DAG مصرفکننده با Asset Alias:
from airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator
my_alias_name = "my_alias"
@dag(schedule=[AssetAlias(my_alias_name)], catchup=False)
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
با استفاده از Asset Events، Metadata و Aliases میتوان جریان دادهها را در سطح task یا DAG به صورت هوشمند مدیریت کرد. DAGها تنها پس از آماده شدن دادهها اجرا میشوند، جزئیات کامل پردازش ذخیره میشود و وابستگیها به صورت انعطافپذیر تعریف میگردند. این امکانات باعث میشوند پایپلاینهای دادهای در Airflow 3 قابل ردیابی، قابل نگهداری و آماده برای محیطهای تولیدی پیچیده باشند.