بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

آشنایی عملی با Data Assets و ایجاد یک پایپ‌ لاین پردازش داده – محتوای ویدئویی

در این بخش، به آشنایی عملی با Data Assets در ایرفلو ۳ و ایجاد یک پایپ‌لاین پردازش داده می‌پردازیم. با استفاده از قابلیت Assets، می‌توانیم DAGها را به گونه‌ای طراحی کنیم که وابسته به داده باشند؛ یعنی به جای اجرای صرفاً زمان‌بندی‌شده، DAGها فقط زمانی اجرا شوند که داده‌های ورودی آماده باشند. این رویکرد به‌ویژه برای خطوط پردازش داده و پروژه‌های یادگیری ماشین بسیار کاربردی است.


اهمیت استفاده از Assets

  • DAGهای سنتی صرفاً بر اساس زمان اجرا می‌شوند (cron-based).
  • در پروژه‌های واقعی، بسیاری از تسک‌ها وابسته به داده‌های تولیدشده توسط تیم‌های دیگر هستند.
  • با Assets، می‌توان اجرای DAGهای downstream را تنها زمانی که داده آماده است برنامه‌ریزی کرد.
  • مزایای اصلی:
    • زمان‌بندی وابسته به داده
    • تعریف وابستگی‌های داده‌ای بین DAGها
    • ایجاد پایپ‌لاین‌های ساده‌تر و قابل نگهداری‌تر

مفهوم پایه @asset

  • @asset یک دکوراتور است که یک DAG با یک تسک تولیدکننده‌ی داده (Asset) می‌سازد.
  • مثال:
from airflow.sdk import asset

@asset(schedule="@daily")
def my_asset():
    # منطق تسک
    pass
  • schedule می‌تواند زمان‌بندی دوره‌ای یا وابسته به داده‌های upstream باشد.
  • خروجی Asset از طریق XCom برای تسک‌های downstream قابل دسترسی است.

نکته مهم : مبنای زمان‌بندی و شروع یک کار در ایرفلو، دگ DAG است یعنی زمان‌بندی کارها روی DAG تعریف می‌شود. بنابراین اگر بخواهیم که بر اساس آماده شدن دیتا، کار جدیدی در ایرفلو شروع و پردازش مرحله بعد روی آن انجام شود، باید یک دگ تعریف کنیم. به همین دلیل در بخش Data Asset ها ما به ازای هر مرحله از پردازش داده، یک دگ ایجاد می کنیم و زمان‌بندی آنرا وابسته به یک Data Asset می‌کنیم. در سایر مثال‌های این بخش هم که از Task Based Data Assets استفاده خواهیم کرد، همین منطق برقرار است و به ازای هر مرحله از پردازش، زمانی که قرار است یک کار جدید بر اساس داده‌های مرحله قبل شروع شود، یک دگ خواهیم ساخت.


ایجاد DAG Asset برای تراکنش‌ها

  • داده‌های تولید شده در Step 1 به عنوان یک Asset تعریف می‌شوند.
  • MinIO برای staging داده‌ها به صورت binary استفاده می‌شود. این مرحله برای شبیه سازی دنیای واقعی استفاده شده است که در آن، ما از یک یا چند گام میانی برای پردازش داده استفاده میکنیم و سپس در انتهای مراحل مختلف پردازشی، دیتای نهایی را به یک دیتابیس یا یک مقصد مورد نیاز دیگر منتقل می‌شود. به این فضایی که برای پردازش های میانی نیاز داریم، Stage می گوییم و اینجا هم به همین منظور از MiniO برای ذخیره موقت داده‌ها استفاده شده است.
  • DAG نمونه:
@asset(schedule="* * * * *")  # هر دقیقه برای نمونه
def transaction_asset():
    """Generate random transactions and stage them in MinIO"""
    ...
    return transactions
  • هر تراکنش به صورت فایل binary ذخیره می‌شود.
  • خروجی به تسک‌های بعدی برای پردازش نهایی منتقل می‌شود.

نهایی‌سازی تراکنش‌ها به عنوان Asset downstream

  • تبدیل فایل‌های binary به JSON.
  • اجرا فقط زمانی که transaction_asset آماده باشد.
  • نمونه کد:
@asset(schedule=transaction_asset)
def finalize_transactions_asset(context):
    """Convert staged binaries to JSON in MinIO"""
    ...
    return final_files
  • XCom برای انتقال داده‌ها بین Assetها استفاده می‌شود.
  • فایل‌های JSON در MinIO ذخیره و فایل‌های binary حذف می‌شوند.

تولید فایل‌های Parquet برای Lakehouse

  • Asset سوم، JSONهای نهایی را می‌گیرد و فایل Parquet تولید می‌کند.
  • این تسک در pool اختصاصی اجرا می‌شود تا منابع کنترل شوند.
  • نمونه کد:
@asset(schedule=finalize_transactions_asset, pool="lakehouse_pool", pool_slots=1)
def lakehouse_generate_parquets(context):
    """Combine JSON files into Parquet"""
    ...
    return object_name
  • ترکیب داده‌ها با Pandas و ذخیره در MinIO.
  • فایل‌های JSON ترکیبی حذف می‌شوند تا فضای ذخیره‌سازی آزاد شود.

نکات کلیدی

  1. Data-aware scheduling: اجرا وابسته به آماده شدن داده‌های upstream.
  2. زنجیره Assetها: تراکنش‌ها → نهایی‌سازی JSON → تولید Parquet.
  3. XComها: انتقال امن داده بین Assetها.
  4. Pools: کنترل منابع برای تسک‌های سنگین.
  5. ادغام با MinIO: ذخیره‌سازی واقعی داده‌ها.

✅ ساختار نهایی DAG

transaction_asset (* * * * *)  
        ↓  
finalize_transactions_asset (triggered after transactions)  
        ↓  
lakehouse_generate_parquets (pooled, runs when ≥۱۰ files)  

🎯 جمع‌بندی

در این مرحله، DAG تراکنش‌ها به یک پایپ‌لاین داده‌محور تبدیل شد:

  • تراکنش‌ها به عنوان Asset اصلی مطرح شدند.
  • نهایی‌سازی JSON و تولید Parquet تنها زمانی که داده آماده است انجام می‌شوند.
  • استفاده از Pools، اجرای تسک‌های سنگین را مدیریت می‌کند.
  • با Assets در Airflow 3، از زمان‌بندی صرفاً زمانی به جریان‌های کاری داده‌محور منتقل می‌شویم، که pipelines را هوشمندتر و آماده تولید می‌کند.

محتوای ویدئویی

کارگاه عملی سوم جلسه پنجم که شروع کار با Data Asset‌ هاست به اجرای مثال فوق به کمک تعریف سه Asset‌ که نمایانگر مراحل مختلف پردازش داده هستند می پردازد و همزمان به کمک امکانات گرافیکی ایرفلو، جریان داده بین مراحل مختلف (گراف پردازش داده‌) را هم به کاربر نشان می دهد. این محتوای آموزشی را می توانید به صورت عملی در فیلم آموزشی زیر مشاهده کنید.

نکته : اگر فیلم در قسمت زیر قابل مشاهده نیست، مطمئن شوید که با آی پی ایران متصل شده اید یا یک اینترنت پروایدر دیگر را امتحان کنید .

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید