بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

نگاهی دقیق‌تر به Asset Events و کار با Metadata و Inlets – محتوای ویدئویی

Airflow 3 با معرفی مفهوم Asset و Asset Event یک گام مهم در جهت «Data-Aware Scheduling» برداشته است.
به کمک این قابلیت، می‌توان بین DAGهای تولیدکننده و مصرف‌کننده یک جریان داده‌ای واقعی ایجاد کرد: Producer داده را تولید می‌کند، Metadata آن را به شکل یک Asset Event منتشر می‌کند، و Consumer بر اساس همان رویداد — نه صرفاً زمان‌بندی — اجرا می‌شود.

در این راهنما، به‌صورت ساده و عملی توضیح می‌دهم:

  • Asset چیست؟
  • متادیتا چطور Attach می‌شود؟
  • Consumer چطور رویدادها را دریافت می‌کند؟
  • تفاوت مهم بین inlet_events و triggering_asset_events چیست؟

همه چیز را بر اساس کد کامل زیر پیش می‌بریم.


💡 کد مرجع (Producer + Consumer)

from airflow.sdk import dag, task, Asset, Metadata, get_current_context
from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log
my_asset = Asset("transaction_asset_test")

# Producer DAG
@dag(schedule="@daily")
def producer_dag():
    @task(outlets=[my_asset])
    def generate_transactions():
        log.info("Generating transactions...")
        
        # Publish metadata with the asset event
        yield Metadata(
            my_asset,
            extra={
                "transaction_count": 1,
                "total_amount": 100,
                "custom_data": "whatever you need",
                "records": [{"id": 1, "amount": 100}]
            }
        )
        
        log.info("Asset metadata added successfully")
        return [{"id": 1, "amount": 100}]
    
    generate_transactions()

# Consumer DAG
@dag(schedule=[my_asset])
def consumer_dag():
    @task(inlets=[my_asset])
    def process_transactions():
        log.info("Processing transactions...")
        
        context = get_current_context()
        
        # ۱) Events that triggered THIS run
        inlet_events = context.get('inlet_events')
        
        # ۲) ALL events ever generated for this asset
        triggering_events = context.get('triggering_asset_events', {})
        
        if triggering_events:
            events_for_asset = triggering_events.get(my_asset, [])
            for event in events_for_asset:
                log.info(f"Event: {event.extra}")
        else:
            log.info("No triggering asset events found")
        
        log.info("Done processing.")
    
    process_transactions()

producer_dag()
consumer_dag()

۱) Asset چیست و چطور تعریف می‌شود؟

my_asset = Asset("transaction_asset_test")

در Airflow 3، یک Asset نشان‌دهنده‌ی یک منبع داده، خروجی محاسبات، فایل، یا هر موجودیت منطقی داده است.
این نام، نقطه اتصال Producer و Consumer محسوب می‌شود.


۲) Producer DAG – تولید رویداد + متادیتا

@task(outlets=[my_asset])
def generate_transactions():

وجود outlets=[my_asset] یعنی این تسک یک رویداد برای این Asset تولید می‌کند.
اما نکته مهم‌تر، خط زیر است:

yield Metadata(my_asset, extra={...})

این همان جایی است که Metadata (بخش extra) به Event چسبانده می‌شود.
این متادیتا می‌تواند شامل:

  • تعداد رکوردها
  • جمع مبالغ
  • شناسه Batch
  • نمونه رکوردها
  • مسیر فایل
  • نسخه Schema

باشد — هر چیزی که Consumer باید بداند.

نکته:
در Airflow 3 یک Task می‌تواند هم Metadata Event تولید کند (yield) و هم خروجی معمولی داشته باشد (return).


۳) Consumer DAG – اجرا بر اساس Asset

@dag(schedule=[my_asset])

این یعنی DAG فقط زمانی اجرا می‌شود که رویدادی برای این Asset منتشر شود.

و تسک اصلی:

@task(inlets=[my_asset])

به Airflow می‌گوید که این تسک به Asset Event دسترسی نیاز دارد.


۴) چطور رویدادها را از Context بخوانیم؟

Airflow 3 با TaskFlow SDK از این الگو استفاده می‌کند:

context = get_current_context()

و دو مقدار کلیدی را در اختیار ما می‌گذارد:


🔥 تفاوت کلیدی: inlet_events vs triggering_asset_events

این بخش مهم‌ترین قسمت کار است.

۱) inlet_events: فقط رویدادهایی که همین Run را Trigger کرده‌اند
inlet_events = context.get('inlet_events')

ویژگی‌ها:

  • معمولاً شامل یک رویداد است
  • فقط رویداد «جدیدترین اجرای Producer»
  • فقط چیزی که الان باعث اجرای DAG شده

🔍 کاربرد:

  • پردازش Incremental
  • واکنش به آخرین تغییر
  • Data freshness

۲) triggering_asset_events: تاریخچه کامل تمام رویدادهای Asset
triggering_events = context.get('triggering_asset_events', {})
events_for_asset = triggering_events.get(my_asset, [])

ویژگی‌ها:

  • لیست تمام رویدادهایی که تاکنون برای Asset ثبت شده‌اند
  • شامل تمام Metadataهای قبلی
  • امکان تحلیل تاریخی و Trend Analysis

🔍 کاربرد:

  • محاسبه مجموع همه batchها
  • تحلیل رفتار داده
  • پاک‌سازی و Audit
  • تصمیم‌گیری با توجه به تاریخچه داده

مقایسه سریع
ویژگیinlet_eventstriggering_asset_events
چه چیزی را نشان می‌دهد؟فقط رویداد Trigger فعلیهمه رویدادهای تاریخی
معمولاً چندتاست؟۱چندین
کاربردپردازش incrementalتحلیل و پردازش تجمیعی
حجم اطلاعاتکمبیشتر
متادیتافقط متادیتای آخرین runمتادیتای تمام runهای قبلی

۵) چطور متادیتا را پردازش کنیم؟

هر Event شامل:

  • source_dag_id
  • source_task_id
  • source_run_id
  • timestamp
  • extra (همان Metadata شما)

مثال:

for event in events_for_asset:
    meta = event.extra
    log.info(f"Count: {meta.get('transaction_count')}")
    log.info(f"Amount: {meta.get('total_amount')}")

۶) چه چیزهایی را در Metadata قرار بدهیم؟

بهترین شیوه‌ها:

  • transaction_count
  • run_timestamp
  • batch_id
  • file_path
  • data_quality_score
  • sample_records

قانون عمومی:

Metadata باید «کم‌حجم ولی معنادار» باشد.


۷) اشتباهات رایج

❌ چرا print() لاگ نمی‌دهد؟

Airflow 3 با TaskFlow SDK همیشه print() را capture نمی‌کند.
از log.info() استفاده کنید.

❌ Assetها همنام هستند

همه Assetها باید نام یکتا داشته باشند.


۸) یک مثال پیشرفته – پردازش هوشمند

@task(inlets=[my_asset])
def smart_process():
    context = get_current_context()
    
    latest_event = context.get("inlet_events")[0]
    all_events = context.get("triggering_asset_events", {}).get(my_asset, [])

    total_transactions = sum(e.extra.get("transaction_count", 0) for e in all_events)

    if total_transactions > 10000:
        log.info("High volume detected — use fast path")
    else:
        log.info("Normal volume — use standard path")

جمع‌بندی – چه چیزی را یاد گرفتیم؟

  • Producer با yield Metadata(...) رویداد + متادیتا منتشر می‌کند.
  • Consumer بر اساس رویداد Asset اجرا می‌شود.
  • در Context دو چیز داریم:

inlet_events

رویدادی که الان DAG را Trigger کرده است.

triggering_asset_events

تمام رویدادهای تاریخی Asset.


محتوای ویدئویی

کارگاه عملی پنجم جلسه پنجم به موضوع مدیریت رخدادها به ازای Asset ها و موضوع Metadata و همچنین Inlets می‌پردازد که به کم این مفاهیم، می توانیم کنترل کامل چرخه تولید و پردازش Data Asset ها را مدیریت کنیم.. این محتوای آموزشی را می توانید به صورت عملی در فیلم آموزشی زیر مشاهده کنید.

نکته : اگر فیلم در قسمت زیر قابل مشاهده نیست، مطمئن شوید که با آی پی ایران متصل شده اید یا یک اینترنت پروایدر دیگر را امتحان کنید .

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید