بخش اول : مفاهیم پایه و شروع کار با ایرفلو
بخش دوم : مهارت‌های حرفه‌ای در طراحی جریان‌های کار
بخش سوم : طراحی و مدیریت جریان‌های کار پیچیده
بخش چهارم: کارگاه عملی و آشنایی با جایگزین‌های ایرفلو

بازنویسی پایپ‌لاین پردازش داده‌های بورس با TaskFlow API و نحوه به کارگیری متغیرهای Context در تسک‌ها

در این جلسه یاد گرفتیم که چگونه می‌توان پایپ‌لاین پردازش داده‌ها را با TaskFlow API بازنویسی کرد و از متغیرهای کانتکست برای دسترسی به اطلاعات زمان‌بندی DAG استفاده نمود.


۱. مفهوم Context Variables در TaskFlow

  • TaskFlow API امکان می‌دهد که پارامترهای Execution Context مثل ds، execution_date و غیره را مستقیماً به عنوان آرگومان تابع Task مشخص کنیم.
  • Airflow هنگام اجرای DAG این پارامترها را به طور خودکار پر می‌کند.
  • مثال:
@task()
def download_stock_exchange_xlsx_file(ds: str) -> str:
    url = f"https://members.tsetmc.com/tsev2/excel/MarketWatchPlus.aspx?d={ds}"
    file_path = f"/data/daily_trades_{ds}.xlsx"
    # Download logic...
    return file_path

توجه: ds به طور خودکار مقدار تاریخ اجرای DAG را دریافت می‌کند.


۲. استفاده از Variable و Connection

  • Variable: برای ذخیره مقادیر پیکربندی مثل MINIO_ACCESS_KEY و MINIO_SECRET_KEY استفاده می‌شود.
  • Connection: برای اتصال به سرویس‌های خارجی (مثل MinIO، پایگاه داده‌ها و غیره) استفاده می‌شود.
  • مثال Task برای خواندن Credentialها:
@task()
def read_minio_credentials() -> Dict[str, str]:
    access_key = Variable.get("MINIO_ACCESS_KEY")
    secret_key = Variable.get("MINIO_SECRET_KEY")
    endpoint = Variable.get("MINIO_ENDPOINT", default_var="minio:9000")
    return {"access_key": access_key, "secret_key": secret_key, "endpoint": endpoint}

۳. پردازش فایل‌های Excel و تبدیل به CSV

  • فایل XLSX را دانلود و سپس به CSV تبدیل می‌کنیم و برخی ستون‌های زمان فارسی (fa_date, fa_year) و میلادی (en_date) را اضافه می‌کنیم.
  • پارامتر ds از Context برای تعیین تاریخ استفاده می‌شود.
@task()
def preprocess_convert_to_csv(file_path: str, ds: str) -> str:
    csv_file_path = f"/data/daily_trades_{ds}.csv"
    today_jdate = jdatetime.date.today()
    df = pd.read_excel(file_path, header=0, skiprows=2, engine="openpyxl")
    df = df.assign(fa_date=today_jdate.strftime("%Y-%m-%d"),
                   en_date=ds,
                   fa_year=today_jdate.year)
    df.to_csv(csv_file_path, index=False, encoding="utf-8")
    return csv_file_path

۴. آپلود فایل به MinIO و پاک‌سازی فایل‌ها

  • از کتابخانه MinIO برای ذخیره CSVها در Bucket استفاده می‌کنیم.
  • سپس فایل‌های محلی حذف می‌شوند تا فضای دیسک آزاد شود.
@task()
def upload_csv_to_minio(csv_file_path: str, minio_creds: Dict[str, str]) -> str:
    client = Minio(minio_creds["endpoint"],
                   access_key=minio_creds["access_key"],
                   secret_key=minio_creds["secret_key"],
                   secure=False)
    object_name = f"{jdatetime.date.today().year}/{path.basename(csv_file_path)}"
    if not client.bucket_exists(MINIO_BUCKET):
        client.make_bucket(MINIO_BUCKET)
    client.fput_object(MINIO_BUCKET, object_name, csv_file_path)
    return object_name

@task()
def cleanup_files(*file_paths):
    for file_path in file_paths:
        if path.exists(file_path):
            remove(file_path)

۵. DAG Flow با TaskFlow API

minio_creds = read_minio_credentials()
xlsx_file_path = download_stock_exchange_xlsx_file(ds="{{ ds }}")
csv_file_path = preprocess_convert_to_csv(xlsx_file_path, ds="{{ ds }}")
minio_object = upload_csv_to_minio(csv_file_path, minio_creds)
cleanup = cleanup_files(xlsx_file_path, csv_file_path)

minio_object >> cleanup
  • Dependencyها به طور واضح مشخص شده‌اند.
  • ds مستقیماً از Context به Taskها منتقل می‌شود.

۶. دسترسی به Context در حالت کلاسیک DAG

  • حتی در DAGهای کلاسیک هم می‌توان به Context دسترسی داشت و پارامترهایی مثل ds، ti و غیره را دریافت کرد.
  • مثال استفاده از ti برای XCom:
def preprocess_convert_to_csv(**kwargs):
    ti = kwargs["ti"]
    ds = kwargs["ds"]
    csv_file_path = f"/data/daily_trades_{ds}.csv"
    # پردازش Excel...
    ti.xcom_push(key="fa_year", value=jdatetime.date.today().year)

def print_xcom(**kwargs):
    ti = kwargs["ti"]
    fa_year = ti.xcom_pull(task_ids="Preprocess-Convert-To-CSV", key="fa_year")
    print(f"📌 Pulled XCom value fa_year = {fa_year}")

۷. نکات کلیدی

  1. TaskFlow API ساده‌تر و خواناتر از DAGهای کلاسیک است.
  2. پارامترهای Context را می‌توان مستقیماً به عنوان آرگومان Task نوشت (ds, execution_date, …).
  3. XCom برای انتقال مقادیر بین Taskها استفاده می‌شود.
  4. استفاده از Variable و Connection باعث می‌شود DAG از مقادیر حساس و کانکشن‌ها جدا باشد و کد قابل حمل‌تر شود.
فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید