Airflow برای مدیریت و ردیابی DAGها و Taskها از یک دیتابیس متادیتا استفاده میکند. در محیط ما، این دیتابیس PostgreSQL است که مشخصات اتصال آن در فایل docker-compose.yml به شکل زیر تعریف شده:
services:
postgres:
image: docker.arvancloud.ir/postgres:15-alpine
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "۵۴۵۴:۵۴۳۲"
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
retries: 5
start_period: 5s
restart: always
networks:
- iceberg_net
localhost۵۴۵۴airflowairflowairflowبا اتصال به دیتابیس، میتوانیم جداول متادیتای Airflow و خصوصاً جدول xcom را مشاهده کنیم.
xcom بسیار سنگین شود و روی عملکرد دیتابیس تاثیر بگذارد.فرض کنید یک Task با TaskFlow API یک تراکنش تولید کرده است:
@task()
def generate_transaction() -> dict:
transaction = {
"transactionId": "123",
"userId": "ali",
"amount": 500,
"currency": "USD"
}
return transaction
xcom مشاهده میکنیم:
transactionId, userId, amount, currency)@task()
def save_transaction(transaction: dict) -> str:
path = f"/tmp/tx_{transaction['transactionId']}.json"
with open(path, "w") as f:
json.dump(transaction, f)
return path # فقط مسیر فایل در XCom ذخیره میشود
| موضوع | توضیح |
|---|---|
جدول dag_run | رکوردهای اجرای DAGها، وضعیت و زمان اجرا |
جدول task_instance | وضعیت هر Task، Retryها و زمان شروع/پایان |
جدول xcom | دادههای برگشتی Taskها و پیامهای بین Taskها |
اگر بخواهی، میتوانم یک تصویر یا دیاگرام جریان XCom بین Taskها و DAGها همراه با جدول xcom واقعی از یک transaction نمونه هم رسم کنم تا دید بصری از ساختار داشته باشیم.