در این جلسه یاد گرفتیم که چگونه میتوان پایپلاین پردازش دادهها را با TaskFlow API بازنویسی کرد و از متغیرهای کانتکست برای دسترسی به اطلاعات زمانبندی DAG استفاده نمود.
ds، execution_date و غیره را مستقیماً به عنوان آرگومان تابع Task مشخص کنیم.@task()
def download_stock_exchange_xlsx_file(ds: str) -> str:
url = f"https://members.tsetmc.com/tsev2/excel/MarketWatchPlus.aspx?d={ds}"
file_path = f"/data/daily_trades_{ds}.xlsx"
# Download logic...
return file_path
توجه:
dsبه طور خودکار مقدار تاریخ اجرای DAG را دریافت میکند.
MINIO_ACCESS_KEY و MINIO_SECRET_KEY استفاده میشود.@task()
def read_minio_credentials() -> Dict[str, str]:
access_key = Variable.get("MINIO_ACCESS_KEY")
secret_key = Variable.get("MINIO_SECRET_KEY")
endpoint = Variable.get("MINIO_ENDPOINT", default_var="minio:9000")
return {"access_key": access_key, "secret_key": secret_key, "endpoint": endpoint}
fa_date, fa_year) و میلادی (en_date) را اضافه میکنیم.ds از Context برای تعیین تاریخ استفاده میشود.@task()
def preprocess_convert_to_csv(file_path: str, ds: str) -> str:
csv_file_path = f"/data/daily_trades_{ds}.csv"
today_jdate = jdatetime.date.today()
df = pd.read_excel(file_path, header=0, skiprows=2, engine="openpyxl")
df = df.assign(fa_date=today_jdate.strftime("%Y-%m-%d"),
en_date=ds,
fa_year=today_jdate.year)
df.to_csv(csv_file_path, index=False, encoding="utf-8")
return csv_file_path
@task()
def upload_csv_to_minio(csv_file_path: str, minio_creds: Dict[str, str]) -> str:
client = Minio(minio_creds["endpoint"],
access_key=minio_creds["access_key"],
secret_key=minio_creds["secret_key"],
secure=False)
object_name = f"{jdatetime.date.today().year}/{path.basename(csv_file_path)}"
if not client.bucket_exists(MINIO_BUCKET):
client.make_bucket(MINIO_BUCKET)
client.fput_object(MINIO_BUCKET, object_name, csv_file_path)
return object_name
@task()
def cleanup_files(*file_paths):
for file_path in file_paths:
if path.exists(file_path):
remove(file_path)
minio_creds = read_minio_credentials()
xlsx_file_path = download_stock_exchange_xlsx_file(ds="{{ ds }}")
csv_file_path = preprocess_convert_to_csv(xlsx_file_path, ds="{{ ds }}")
minio_object = upload_csv_to_minio(csv_file_path, minio_creds)
cleanup = cleanup_files(xlsx_file_path, csv_file_path)
minio_object >> cleanup
ds مستقیماً از Context به Taskها منتقل میشود.ds، ti و غیره را دریافت کرد.ti برای XCom:def preprocess_convert_to_csv(**kwargs):
ti = kwargs["ti"]
ds = kwargs["ds"]
csv_file_path = f"/data/daily_trades_{ds}.csv"
# پردازش Excel...
ti.xcom_push(key="fa_year", value=jdatetime.date.today().year)
def print_xcom(**kwargs):
ti = kwargs["ti"]
fa_year = ti.xcom_pull(task_ids="Preprocess-Convert-To-CSV", key="fa_year")
print(f"📌 Pulled XCom value fa_year = {fa_year}")
ds, execution_date, …).