یکی از مهمترین توصیهها در بهترین شیوههای توسعه با Airflow این است که در فایلهای DAG فقط باید تعریف تسکها و جریان دادهها (Workflow Orchestration) انجام شود.
قرار دادن منطق پردازشی یا کدهای اجرایی خارج از Taskها میتواند مشکلات جدی ایجاد کند.
در این مثال (Stock-Exchange-V1) قبل از تعریف DAG، یک کد Python برای تولید پروفایل جعلی با Faker نوشته شده:
import json
import uuid
from faker import Faker
from pathlib import Path
from datetime import date, datetime
# ---- Generate Random JSON Profile ----
fake = Faker()
profiles_dir = Path("/data/profiles")
profiles_dir.mkdir(parents=True, exist_ok=True)
profile = fake.simple_profile()
def json_serializer(obj):
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return str(obj)
file_path = profiles_dir / f"{uuid.uuid4()}.json"
with open(file_path, "w") as f:
json.dump(profile, f, indent=2, default=json_serializer)
print(f"[INFO] Faker profile saved to {file_path}")
# --------------------------------------
این کد بیرون از DAG و خارج از هر Task قرار گرفته.
در Airflow 3، فایلهای DAG توسط یک سرویس مستقل به نام DAG Processor Parse میشوند.
👉 نتیجه این تغییر این است که هر بار DAG Processor فایل DAG را بررسی کند، تمام کدی که بیرون از Task نوشته شده اجرا میشود.
اگر به پوشه /data/profiles نگاه کنید که در Docker Compose به پوشه data کنار پروژه مونت شده است، خواهید دید که حدود هر ۳۰ ثانیه یک فایل JSON جدید تولید میشود.
این دقیقاً همان Side Effect ناخواستهای است که از قرار دادن کد خارج از Task به وجود میآید.
مثلاً در مسیر plugins/utils/profile_generator.py:
import json, uuid
from faker import Faker
from pathlib import Path
from datetime import date, datetime
fake = Faker()
profiles_dir = Path("/data/profiles")
profiles_dir.mkdir(parents=True, exist_ok=True)
def generate_profile():
profile = fake.simple_profile()
file_path = profiles_dir / f"{uuid.uuid4()}.json"
def json_serializer(obj):
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return str(obj)
with open(file_path, "w") as f:
json.dump(profile, f, indent=2, default=json_serializer)
print(f"[INFO] Faker profile saved to {file_path}")
from airflow.operators.python import PythonOperator
from utils.profile_generator import generate_profile
task_generate_profile = PythonOperator(
task_id="Generate-Profile",
python_callable=generate_profile,
)
/data/profiles در محیط Docker Compose میتوانید اثر این اشتباه را بهصورت عملی مشاهده کنید: فایلها هر ۳۰ ثانیه تولید میشوند.