Airflow 3 با معرفی مفهوم Asset و Asset Event یک گام مهم در جهت «Data-Aware Scheduling» برداشته است.
به کمک این قابلیت، میتوان بین DAGهای تولیدکننده و مصرفکننده یک جریان دادهای واقعی ایجاد کرد: Producer داده را تولید میکند، Metadata آن را به شکل یک Asset Event منتشر میکند، و Consumer بر اساس همان رویداد — نه صرفاً زمانبندی — اجرا میشود.
در این راهنما، بهصورت ساده و عملی توضیح میدهم:
inlet_events و triggering_asset_events چیست؟همه چیز را بر اساس کد کامل زیر پیش میبریم.
from airflow.sdk import dag, task, Asset, Metadata, get_current_context
from airflow.utils.log.logging_mixin import LoggingMixin
log = LoggingMixin().log
my_asset = Asset("transaction_asset_test")
# Producer DAG
@dag(schedule="@daily")
def producer_dag():
@task(outlets=[my_asset])
def generate_transactions():
log.info("Generating transactions...")
# Publish metadata with the asset event
yield Metadata(
my_asset,
extra={
"transaction_count": 1,
"total_amount": 100,
"custom_data": "whatever you need",
"records": [{"id": 1, "amount": 100}]
}
)
log.info("Asset metadata added successfully")
return [{"id": 1, "amount": 100}]
generate_transactions()
# Consumer DAG
@dag(schedule=[my_asset])
def consumer_dag():
@task(inlets=[my_asset])
def process_transactions():
log.info("Processing transactions...")
context = get_current_context()
# ۱) Events that triggered THIS run
inlet_events = context.get('inlet_events')
# ۲) ALL events ever generated for this asset
triggering_events = context.get('triggering_asset_events', {})
if triggering_events:
events_for_asset = triggering_events.get(my_asset, [])
for event in events_for_asset:
log.info(f"Event: {event.extra}")
else:
log.info("No triggering asset events found")
log.info("Done processing.")
process_transactions()
producer_dag()
consumer_dag()
my_asset = Asset("transaction_asset_test")
در Airflow 3، یک Asset نشاندهندهی یک منبع داده، خروجی محاسبات، فایل، یا هر موجودیت منطقی داده است.
این نام، نقطه اتصال Producer و Consumer محسوب میشود.
@task(outlets=[my_asset])
def generate_transactions():
وجود outlets=[my_asset] یعنی این تسک یک رویداد برای این Asset تولید میکند.
اما نکته مهمتر، خط زیر است:
yield Metadata(my_asset, extra={...})
این همان جایی است که Metadata (بخش extra) به Event چسبانده میشود.
این متادیتا میتواند شامل:
باشد — هر چیزی که Consumer باید بداند.
نکته:
در Airflow 3 یک Task میتواند هم Metadata Event تولید کند (yield) و هم خروجی معمولی داشته باشد (return).
@dag(schedule=[my_asset])
این یعنی DAG فقط زمانی اجرا میشود که رویدادی برای این Asset منتشر شود.
و تسک اصلی:
@task(inlets=[my_asset])
به Airflow میگوید که این تسک به Asset Event دسترسی نیاز دارد.
Airflow 3 با TaskFlow SDK از این الگو استفاده میکند:
context = get_current_context()
و دو مقدار کلیدی را در اختیار ما میگذارد:
inlet_events vs triggering_asset_eventsاین بخش مهمترین قسمت کار است.
inlet_events: فقط رویدادهایی که همین Run را Trigger کردهاندinlet_events = context.get('inlet_events')
ویژگیها:
🔍 کاربرد:
triggering_asset_events: تاریخچه کامل تمام رویدادهای Assettriggering_events = context.get('triggering_asset_events', {})
events_for_asset = triggering_events.get(my_asset, [])
ویژگیها:
🔍 کاربرد:
| ویژگی | inlet_events | triggering_asset_events |
|---|---|---|
| چه چیزی را نشان میدهد؟ | فقط رویداد Trigger فعلی | همه رویدادهای تاریخی |
| معمولاً چندتاست؟ | ۱ | چندین |
| کاربرد | پردازش incremental | تحلیل و پردازش تجمیعی |
| حجم اطلاعات | کم | بیشتر |
| متادیتا | فقط متادیتای آخرین run | متادیتای تمام runهای قبلی |
هر Event شامل:
source_dag_idsource_task_idsource_run_idtimestampextra (همان Metadata شما)مثال:
for event in events_for_asset:
meta = event.extra
log.info(f"Count: {meta.get('transaction_count')}")
log.info(f"Amount: {meta.get('total_amount')}")
بهترین شیوهها:
transaction_countrun_timestampbatch_idfile_pathdata_quality_scoresample_recordsقانون عمومی:
Metadata باید «کمحجم ولی معنادار» باشد.
print() لاگ نمیدهد؟Airflow 3 با TaskFlow SDK همیشه print() را capture نمیکند.
از log.info() استفاده کنید.
همه Assetها باید نام یکتا داشته باشند.
@task(inlets=[my_asset])
def smart_process():
context = get_current_context()
latest_event = context.get("inlet_events")[0]
all_events = context.get("triggering_asset_events", {}).get(my_asset, [])
total_transactions = sum(e.extra.get("transaction_count", 0) for e in all_events)
if total_transactions > 10000:
log.info("High volume detected — use fast path")
else:
log.info("Normal volume — use standard path")
yield Metadata(...) رویداد + متادیتا منتشر میکند.inlet_eventsرویدادی که الان DAG را Trigger کرده است.
triggering_asset_eventsتمام رویدادهای تاریخی Asset.
کارگاه عملی پنجم جلسه پنجم به موضوع مدیریت رخدادها به ازای Asset ها و موضوع Metadata و همچنین Inlets میپردازد که به کم این مفاهیم، می توانیم کنترل کامل چرخه تولید و پردازش Data Asset ها را مدیریت کنیم.. این محتوای آموزشی را می توانید به صورت عملی در فیلم آموزشی زیر مشاهده کنید.
نکته : اگر فیلم در قسمت زیر قابل مشاهده نیست، مطمئن شوید که با آی پی ایران متصل شده اید یا یک اینترنت پروایدر دیگر را امتحان کنید .