در این بخش، به بازنویسی پایپلاین پردازش داده با استفاده از تسکها و پارامتر outlets در Airflow 3 میپردازیم. هدف این مرحله، ارتقاء DAGهای دادهمحور با کنترل دقیقتر جریان داده و وابستگیها است و اجازه میدهد چندین Asset در یک DAG مدیریت شوند و تسکها فقط زمانی اجرا شوند که دادههای upstream آماده باشند.
transaction_extract_dag شامل یک تسک است که تراکنشهای تصادفی تولید و در MinIO به صورت binary ذخیره میکند.outlets=[Asset("transaction_asset")] باعث میشود ایرفلو متوجه بشود که این تسک قرار است یک Asset با نام مشخص شده را به روز رسانی کند. بنابراین در انتهای تابع متناظر، رخدادهای لازم برای آن Asset تولید خواهند شد که به کمک آنها، سایر دگهای وابسته به این دادهها، تریگر خواهند شد. from airflow.sdk import Asset, dag, task
from faker import Faker
from datetime import datetime, timezone
import random, pickle, os, json
import pandas as pd
from minio import Minio
from airflow.sdk import Variable
fake = Faker()
MINIO_BUCKET = "airflow-s3"
MINIO_STAGE_FOLDER = "stage/transactions"
MINIO_FINAL_FOLDER = "transactions"
# ------------------- Extract: Generate Transactions -------------------
@dag(schedule="* * * * *", catchup=False, tags=["transactions"])
def transaction_extract_dag():
@task(outlets=[Asset("raw_transactions")])
def extract_transactions():
"""Generate random transactions and stage them in MinIO"""
client = Minio(
Variable.get("MINIO_ENDPOINT", "minio:9000"),
access_key=Variable.get("MINIO_ACCESS_KEY"),
secret_key=Variable.get("MINIO_SECRET_KEY"),
secure=False
)
if not client.bucket_exists(MINIO_BUCKET):
client.make_bucket(MINIO_BUCKET)
transactions = []
for _ in range(2):
user = fake.simple_profile()
tx = {
"transactionId": fake.uuid4(),
"userId": user['username'],
"timestamp": datetime.now(timezone.utc).isoformat(),
"amount": round(random.uniform(10, 1000), 2),
"currency": random.choice(['USD', 'GBP']),
"city": fake.city(),
"country": fake.country(),
"merchantName": fake.company(),
"paymentMethod": random.choice(['credit_card', 'debit_card', 'online_transfer']),
"ipAddress": fake.ipv4(),
"voucherCode": random.choice(['', 'DISCOUNT10', '']),
"affiliateId": fake.uuid4(),
}
transactions.append(tx)
# Stage as binary
filename = f"tx_{tx['transactionId']}.bin"
local_path = f"/tmp/{filename}"
with open(local_path, "wb") as f:
pickle.dump(tx, f)
object_path = f"{MINIO_STAGE_FOLDER}/{filename}"
client.fput_object(MINIO_BUCKET, object_path, local_path)
os.remove(local_path)
print(f"Extracted {len(transactions)} transactions")
return transactions
extract_transactions()
transaction_extract_dag()
schedule="@daily").transaction_finalize_dag وابسته به Asset تراکنشها است.
# ------------------- Transform: Finalize Transactions -------------------
@dag(schedule=[Asset("raw_transactions")], catchup=False, tags=["transactions"])
def transaction_transform_dag():
@task(outlets=[Asset("finalized_transactions")])
def finalize_transactions(**context):
"""Convert staged binaries to JSON in MinIO"""
transactions = context["ti"].xcom_pull(
dag_id="transaction_extract_dag",
task_ids="extract_transactions",
key="return_value",
include_prior_dates=True
)
# Handle None or nested lists
if not transactions:
print("No transactions found")
return []
# Flatten if nested
if isinstance(transactions[0], list):
flat_transactions = [tx for sublist in transactions for tx in sublist]
else:
flat_transactions = transactions
client = Minio(
Variable.get("MINIO_ENDPOINT", "minio:9000"),
access_key=Variable.get("MINIO_ACCESS_KEY"),
secret_key=Variable.get("MINIO_SECRET_KEY"),
secure=False
)
final_files = []
for tx in flat_transactions:
filename = f"tx_{tx['transactionId']}.json"
local_json = f"/tmp/{filename}"
with open(local_json, "w") as f:
json.dump(tx, f)
final_object_path = f"{MINIO_FINAL_FOLDER}/{filename}"
client.fput_object(MINIO_BUCKET, final_object_path, local_json)
os.remove(local_json)
final_files.append(final_object_path)
print(f"Transformed {len(final_files)} transactions to JSON")
return final_files
finalize_transactions()
transaction_transform_dag()

transaction_load_dag وابسته به Asset نهاییسازی است.combine_jsons_2_parquet حداقل ۱۰ فایل JSON را ترکیب کرده و یک فایل Parquet تولید میکند.
# ------------------- Load: Parquet Lakehouse -------------------
@dag(schedule=[Asset("finalized_transactions")], catchup=False, tags=["transactions"])
def transaction_load_dag():
@task(outlets=[Asset("lakehouse_parquet")], pool="lakehouse_pool", pool_slots=1)
def generate_parquet(**context):
"""Combine finalized JSONs into Parquet"""
transaction_files = context["ti"].xcom_pull(
dag_id="transaction_transform_dag",
task_ids="finalize_transactions",
key="return_value",
include_prior_dates=True
)
# FIX: Flatten nested lists from XCom
if not transaction_files:
print("No transaction files found")
return None
# Handle nested list structures
flat_files = []
if isinstance(transaction_files, list):
for item in transaction_files:
if isinstance(item, list):
# Nested list - flatten it
flat_files.extend(item)
else:
# Single item
flat_files.append(item)
else:
flat_files = [transaction_files]
print(f"Found {len(flat_files)} transaction files after flattening")
if len(flat_files) < 10:
print("Less than 10 files. Skipping Parquet creation.")
return None
client = Minio(
Variable.get("MINIO_ENDPOINT", "minio:9000"),
access_key=Variable.get("MINIO_ACCESS_KEY"),
secret_key=Variable.get("MINIO_SECRET_KEY"),
secure=False
)
dfs, selected_files = [], flat_files[:10]
for f in selected_files:
# Now 'f' is guaranteed to be a string, not a list
local_file = f"/tmp/{os.path.basename(f)}"
client.fget_object(MINIO_BUCKET, f, local_file)
with open(local_file, "r") as file:
data = json.load(file)
dfs.append(pd.DataFrame([data]))
os.remove(local_file)
parquet_path = f"/tmp/transactions_{int(datetime.now().timestamp())}.parquet"
combined_df = pd.concat(dfs, ignore_index=True)
combined_df.to_parquet(parquet_path, engine="pyarrow", index=False)
object_name = f"lakehouse/{os.path.basename(parquet_path)}"
client.fput_object(MINIO_BUCKET, object_name, parquet_path)
os.remove(parquet_path)
for f in selected_files:
client.remove_object(MINIO_BUCKET, f)
print(f"Loaded Parquet asset: {object_name}")
return object_name
generate_parquet()
transaction_load_dag()
schedule=[Asset("finalized_transactions")] اجرا میشود، یعنی تنها زمانی که دادههای upstream آماده باشند.
schedule=[Asset(...)] تضمین میکند که تسکها فقط زمانی اجرا شوند که دادههای upstream آماده باشند.با بازنویسی پایپلاین به صورت task-based:
کارگاه عملی چهارم جلسه پنجم به موضوع Task Based Data Assets میپردازد و به صورت عملی نشان میدهد که چگونه میتوانیم کنترل بیشتری روی تسکها و Asset ها داشته باشیم. مثالی که در بالا در همین خصوص، توضیح داده شده است را می توانید به صورت عملی در فیلم آموزشی زیر مشاهده کنید.
نکته : اگر فیلم در قسمت زیر قابل مشاهده نیست، مطمئن شوید که با آی پی ایران متصل شده اید یا یک اینترنت پروایدر دیگر را امتحان کنید