بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

آشنایی با Task Based Data Assets و پارامتر Outlets – محتوای ویدئویی

در این بخش، به بازنویسی پایپ‌لاین پردازش داده با استفاده از تسک‌ها و پارامتر outlets در Airflow 3 می‌پردازیم. هدف این مرحله، ارتقاء DAGهای داده‌محور با کنترل دقیق‌تر جریان داده و وابستگی‌ها است و اجازه می‌دهد چندین Asset در یک DAG مدیریت شوند و تسک‌ها فقط زمانی اجرا شوند که داده‌های upstream آماده باشند.


اهمیت Task-Based Assets

  • هر تسک می‌تواند یک یا چند Asset تولید کند و محدود به DAG نیست.
  • چندین Asset می‌توانند داخل یک DAG وجود داشته باشند.
  • اما باز هم اگر نیاز به شروع یک تسک جدید بر اساس دیتای آماده شده مرحله قبل داشته باشیم باید یک دگ جدید تعریف کنیم و آنرا به این Data Asset وابسته کنیم.
  • مزایا:
    • زمان‌بندی دقیق و وابسته به داده
    • مدیریت داده‌های nested در XCom
    • طراحی ETL pipelineهای پیچیده و قابل نگهداری

تولید Asset تراکنش‌ها

  • DAG transaction_extract_dag شامل یک تسک است که تراکنش‌های تصادفی تولید و در MinIO به صورت binary ذخیره می‌کند.
  • استفاده از outlets=[Asset("transaction_asset")] باعث می‌شود ایرفلو متوجه بشود که این تسک قرار است یک Asset با نام مشخص شده را به روز رسانی کند. بنابراین در انتهای تابع متناظر، رخدادهای لازم برای آن Asset تولید خواهند شد که به کمک آنها، سایر دگ‌های وابسته به این داده‌ها، تریگر خواهند شد.
  • نمونه کد:
from airflow.sdk import Asset, dag, task
from faker import Faker
from datetime import datetime, timezone
import random, pickle, os, json
import pandas as pd
from minio import Minio
from airflow.sdk import Variable

fake = Faker()
MINIO_BUCKET = "airflow-s3"
MINIO_STAGE_FOLDER = "stage/transactions"
MINIO_FINAL_FOLDER = "transactions"

# ------------------- Extract: Generate Transactions -------------------
@dag(schedule="* * * * *", catchup=False, tags=["transactions"])
def transaction_extract_dag():

    @task(outlets=[Asset("raw_transactions")])
    def extract_transactions():
        """Generate random transactions and stage them in MinIO"""
        client = Minio(
            Variable.get("MINIO_ENDPOINT", "minio:9000"),
            access_key=Variable.get("MINIO_ACCESS_KEY"),
            secret_key=Variable.get("MINIO_SECRET_KEY"),
            secure=False
        )

        if not client.bucket_exists(MINIO_BUCKET):
            client.make_bucket(MINIO_BUCKET)

        transactions = []
        for _ in range(2):
            user = fake.simple_profile()
            tx = {
                "transactionId": fake.uuid4(),
                "userId": user['username'],
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "amount": round(random.uniform(10, 1000), 2),
                "currency": random.choice(['USD', 'GBP']),
                "city": fake.city(),
                "country": fake.country(),
                "merchantName": fake.company(),
                "paymentMethod": random.choice(['credit_card', 'debit_card', 'online_transfer']),
                "ipAddress": fake.ipv4(),
                "voucherCode": random.choice(['', 'DISCOUNT10', '']),
                "affiliateId": fake.uuid4(),
            }
            transactions.append(tx)

            # Stage as binary
            filename = f"tx_{tx['transactionId']}.bin"
            local_path = f"/tmp/{filename}"
            with open(local_path, "wb") as f:
                pickle.dump(tx, f)

            object_path = f"{MINIO_STAGE_FOLDER}/{filename}"
            client.fput_object(MINIO_BUCKET, object_path, local_path)
            os.remove(local_path)

        print(f"Extracted {len(transactions)} transactions")
        return transactions

    extract_transactions()

transaction_extract_dag()
  • این DAG به صورت روزانه اجرا می‌شود (schedule="@daily").

نهایی‌سازی تراکنش‌ها

  • DAG transaction_finalize_dag وابسته به Asset تراکنش‌ها است.
  • داده‌های مورد نیاز از XCom خوانده می شوند و مکانیزم دریافت داده‌ها از طریق XCOM است.
  • JSONهای نهایی در MinIO ذخیره می‌شوند.
  • نمونه کد:

# ------------------- Transform: Finalize Transactions -------------------
@dag(schedule=[Asset("raw_transactions")], catchup=False, tags=["transactions"])
def transaction_transform_dag():

    @task(outlets=[Asset("finalized_transactions")])
    def finalize_transactions(**context):
        """Convert staged binaries to JSON in MinIO"""
        transactions = context["ti"].xcom_pull(
            dag_id="transaction_extract_dag",
            task_ids="extract_transactions",
            key="return_value",
            include_prior_dates=True
        )

        # Handle None or nested lists
        if not transactions:
            print("No transactions found")
            return []

        # Flatten if nested
        if isinstance(transactions[0], list):
            flat_transactions = [tx for sublist in transactions for tx in sublist]
        else:
            flat_transactions = transactions

        client = Minio(
            Variable.get("MINIO_ENDPOINT", "minio:9000"),
            access_key=Variable.get("MINIO_ACCESS_KEY"),
            secret_key=Variable.get("MINIO_SECRET_KEY"),
            secure=False
        )

        final_files = []
        for tx in flat_transactions:
            filename = f"tx_{tx['transactionId']}.json"
            local_json = f"/tmp/{filename}"
            with open(local_json, "w") as f:
                json.dump(tx, f)

            final_object_path = f"{MINIO_FINAL_FOLDER}/{filename}"
            client.fput_object(MINIO_BUCKET, final_object_path, local_json)
            os.remove(local_json)
            final_files.append(final_object_path)

        print(f"Transformed {len(final_files)} transactions to JSON")
        return final_files


    finalize_transactions()

transaction_transform_dag()

ترکیب JSONها و تولید Parquet

  • DAG transaction_load_dag وابسته به Asset نهایی‌سازی است.
  • تسک combine_jsons_2_parquet حداقل ۱۰ فایل JSON را ترکیب کرده و یک فایل Parquet تولید می‌کند.
  • JSONهای ترکیب‌شده پس از تولید Parquet حذف می‌شوند.
  • نمونه کد:

# ------------------- Load: Parquet Lakehouse -------------------
@dag(schedule=[Asset("finalized_transactions")], catchup=False, tags=["transactions"])
def transaction_load_dag():

    @task(outlets=[Asset("lakehouse_parquet")], pool="lakehouse_pool", pool_slots=1)
    def generate_parquet(**context):
        """Combine finalized JSONs into Parquet"""
        transaction_files = context["ti"].xcom_pull(
            dag_id="transaction_transform_dag",
            task_ids="finalize_transactions",
            key="return_value",
            include_prior_dates=True
        )

        # FIX: Flatten nested lists from XCom
        if not transaction_files:
            print("No transaction files found")
            return None
            
        # Handle nested list structures
        flat_files = []
        if isinstance(transaction_files, list):
            for item in transaction_files:
                if isinstance(item, list):
                    # Nested list - flatten it
                    flat_files.extend(item)
                else:
                    # Single item
                    flat_files.append(item)
        else:
            flat_files = [transaction_files]
        
        print(f"Found {len(flat_files)} transaction files after flattening")

        if len(flat_files) < 10:
            print("Less than 10 files. Skipping Parquet creation.")
            return None

        client = Minio(
            Variable.get("MINIO_ENDPOINT", "minio:9000"),
            access_key=Variable.get("MINIO_ACCESS_KEY"),
            secret_key=Variable.get("MINIO_SECRET_KEY"),
            secure=False
        )

        dfs, selected_files = [], flat_files[:10]

        for f in selected_files:
            # Now 'f' is guaranteed to be a string, not a list
            local_file = f"/tmp/{os.path.basename(f)}"
            client.fget_object(MINIO_BUCKET, f, local_file)

            with open(local_file, "r") as file:
                data = json.load(file)
            dfs.append(pd.DataFrame([data]))
            os.remove(local_file)

        parquet_path = f"/tmp/transactions_{int(datetime.now().timestamp())}.parquet"
        combined_df = pd.concat(dfs, ignore_index=True)
        combined_df.to_parquet(parquet_path, engine="pyarrow", index=False)

        object_name = f"lakehouse/{os.path.basename(parquet_path)}"
        client.fput_object(MINIO_BUCKET, object_name, parquet_path)
        os.remove(parquet_path)

        for f in selected_files:
            client.remove_object(MINIO_BUCKET, f)

        print(f"Loaded Parquet asset: {object_name}")
        return object_name

    generate_parquet()

transaction_load_dag()
  • این DAG با schedule=[Asset("finalized_transactions")] اجرا می‌شود، یعنی تنها زمانی که داده‌های upstream آماده باشند.

🔑 نکات کلیدی

  1. Task-based assets امکان تولید چند Asset در یک DAG را فراهم می‌کنند.
  2. Flatten کردن XCom داده‌های nested را ایمن می‌کند.
  3. schedule=[Asset(...)] تضمین می‌کند که تسک‌ها فقط زمانی اجرا شوند که داده‌های upstream آماده باشند.
  4. این Assetها می‌توانند داده را تولید، تبدیل و ذخیره کنند (مثلاً در MinIO یا دیگر storageها).

🎯 جمع‌بندی

با بازنویسی پایپ‌لاین به صورت task-based:

  • DAGهای ETL وابسته به داده و کنترل‌شده ساخته شدند.
  • داده‌ها از مرحله تولید تا ترکیب و ذخیره Parquet به صورت خودکار و ایمن جریان پیدا می‌کنند.
  • این روش، زنجیره داده‌ای Flatten شده، کنترل منابع و وابستگی‌های دقیق بین تسک‌ها را فراهم می‌کند.

محتوای ویدئویی

کارگاه عملی چهارم جلسه پنجم به موضوع Task Based Data Assets می‌پردازد و به صورت عملی نشان می‌دهد که چگونه می‌توانیم کنترل بیشتری روی تسک‌ها و Asset ها داشته باشیم. مثالی که در بالا در همین خصوص، توضیح داده شده است را می توانید به صورت عملی در فیلم آموزشی زیر مشاهده کنید.

نکته : اگر فیلم در قسمت زیر قابل مشاهده نیست، مطمئن شوید که با آی پی ایران متصل شده اید یا یک اینترنت پروایدر دیگر را امتحان کنید

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید