بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

بررسی دیتابیس پستگرس ایرفلو و مشاهده نحوه ذخیره تسک‌ها و مقادیر برگشتی

Airflow برای مدیریت و ردیابی DAGها و Taskها از یک دیتابیس متادیتا استفاده می‌کند. در محیط ما، این دیتابیس PostgreSQL است که مشخصات اتصال آن در فایل docker-compose.yml به شکل زیر تعریف شده:

services:
  postgres:
    image: docker.arvancloud.ir/postgres:15-alpine
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "۵۴۵۴:۵۴۳۲"
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - iceberg_net

۱️⃣ اتصال به دیتابیس با DBeaver

  • از ابزارهایی مثل DBeaver یا pgAdmin می‌توان برای بررسی دیتابیس استفاده کرد.
  • اطلاعات اتصال:
    • Host: localhost
    • Port: ۵۴۵۴
    • Database: airflow
    • User: airflow
    • Password: airflow

با اتصال به دیتابیس، می‌توانیم جداول متادیتای Airflow و خصوصاً جدول xcom را مشاهده کنیم.


۲️⃣ جدول XCom

  • XCom (Cross-Communication) برای انتقال داده بین Taskها در DAG استفاده می‌شود.
  • هر Task می‌تواند داده‌ای را به XCom ارسال کند و Taskهای بعدی آن را دریافت کنند.

نکته مهم در نسخه کلاسیک:

  • اگر یک دیکشنری بزرگ یا JSON پیچیده را در یک Task قرار دهیم، Airflow به صورت پیش‌فرض دو نوع رکورد ایجاد می‌کند:
    1. یک رکورد کلی برای کل JSON،
    2. یک رکورد به ازای هر کلید و مقدار در JSON.
  • این کار باعث می‌شود برای JSONهای بزرگ، جدول xcom بسیار سنگین شود و روی عملکرد دیتابیس تاثیر بگذارد.

۳️⃣ مشاهده XCom برای یک Transaction نمونه

فرض کنید یک Task با TaskFlow API یک تراکنش تولید کرده است:

@task()
def generate_transaction() -> dict:
    transaction = {
        "transactionId": "123",
        "userId": "ali",
        "amount": 500,
        "currency": "USD"
    }
    return transaction
  • با اجرای این Task، ایرفلو به طور خودکار return value را به XCom می‌فرستد.
  • در جدول xcom مشاهده می‌کنیم:
    • رکورد اصلی با کل JSON،
    • رکوردهای جداگانه برای هر کلید (transactionId, userId, amount, currency)
  • اگر تعداد تراکنش‌ها زیاد باشد، این روش باعث افزایش شدید حجم دیتابیس می‌شود.

۴️⃣ بهترین روش برای JSONهای حجیم

  • برای جلوگیری از سنگینی دیتابیس، بهتر است داده‌های بزرگ را در فایل یا Storage خارجی (مثل MinIO) ذخیره کنیم و تنها مسیر فایل یا شناسه آن را در XCom قرار دهیم:
@task()
def save_transaction(transaction: dict) -> str:
    path = f"/tmp/tx_{transaction['transactionId']}.json"
    with open(path, "w") as f:
        json.dump(transaction, f)
    return path  # فقط مسیر فایل در XCom ذخیره می‌شود
  • این روش باعث می‌شود XCom سبک بماند و دیتابیس دچار فشار نشود.

۵️⃣ نکات تکمیلی

موضوعتوضیح
جدول dag_runرکوردهای اجرای DAGها، وضعیت و زمان اجرا
جدول task_instanceوضعیت هر Task، Retryها و زمان شروع/پایان
جدول xcomداده‌های برگشتی Taskها و پیام‌های بین Taskها
  • TaskFlow API به صورت خودکار return value را به XCom می‌فرستد.
  • برای داده‌های کوچک، این کار مناسب است.
  • برای داده‌های حجیم، بهتر است فایل یا مسیر خارجی ذخیره شود و XCom تنها مسیر را نگه دارد.

اگر بخواهی، می‌توانم یک تصویر یا دیاگرام جریان XCom بین Taskها و DAGها همراه با جدول xcom واقعی از یک transaction نمونه هم رسم کنم تا دید بصری از ساختار داشته باشیم.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید