پایهای برای مدیریت بهینه منابع و کارایی بالا در پردازشهای توزیعشده
قبل از آنکه به دنیای جذاب RDDها (Resilient Distributed Datasets) و توابع قدرتمند آنها شیرجه بزنیم، باید یک سوال اساسی را پاسخ دهیم: “کدهای ما دقیقاً کجا و چگونه اجرا میشوند؟“.
تصور کنید میخواهید یک آسمانخراش بسازید (پردازش دادههای بزرگ). RDDها آجرها و سیمانهای شما هستند، اما اگر از فونداسیون و سازه ساختمان (معماری و مدیریت منابع اسپارک) چیزی ندانید، بهترین آجرها هم نمیتوانند ساختمانی محکم بسازند.
در این جلسه، یاد میگیریم که چگونه به برنامه خود “بودجه” (رم و CPU) تخصیص دهیم و میبینیم که این بودجهبندی چگونه مستقیماً بر سرعت، پایداری و کارایی کارهای بعدی ما تأثیر میگذارد. این دانش، به شما کمک میکند دید عمیقتری نسبت به پشت صحنه اجرای برنامهها و نحوه اجرای یک برنامه و تقسیم آن بین چند نود پیدا کنید.
بیایید با یک تصویر ساده شروع کنیم. فرض کنید کلاستری با دو تا نود Worker و یک نود مستر داریم و به هر نود ورکر، دو گیگابایت رم و دو تا هسته سی پی یو اختصاص داده ایم. ورکرها در دنیای واقعی، نودهای اصلی اجرا کننده تسک ها در اسپارک هستند که Executor های هر برنامه، روی آنها ایجاد شده و منابع لازم را برای اجرای برنامه از ورکر دریافت کرده و بعد از اجرای تسکها، مجددا منابع را به ورکر بر می گردانند.
حال فرض کنید یک برنامه ساده پایتون را به کمک PySpark می خواهیم در این کلاستر اجرا کنیم. با اجرای این برنامه ، اجزای زیر را خواهیم داشت :
main() شما اجرا میشود.SparkContext را ایجاد میکند (نقطه ورود به اسپارک).فرض کنید خوشه شما به شکل زیر است:
کل منابع خوشه:
├── کارگر ۱: ۲ گیگابایت رم، ۲ هسته پردازنده
└── کارگر ۲: ۲ گیگابایت رم، ۲ هسته پردازنده
کل منابع قابل دسترس:
├── حافظه (RAM): ۴ گیگابایت
└── هسته (CPU): ۴ هسته
حالا با چند مثال عملی میبینیم که چگونه این تنظیمات را در کد اعمال کنیم و تأثیر آنها چیست.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('App1-Config-Koochak') \
.master('spark://localhost:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '512M') \
.config('spark.executor.cores', '1') \
.config('spark.cores.max', '2') \
.getOrCreate()
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4040")
این تنظیمات یعنی چه:
spark.driver.memory: به Driver برنامه ۵۱۲ مگابایت رم اختصاص میدهد.spark.executor.memory: به هر Executor ۵۱۲ مگابایت رم میدهد.spark.executor.cores: هر Executor فقط از ۱ هسته CPU استفاده میکند.spark.cores.max: کل برنامه از حداکثر ۲ هسته استفاده میکند (یعنی ۲ Executor).import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('App2-Config-Motavaset') \
.master('spark://Mojtaba.localdomain:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '1G') \
.config('spark.executor.cores', '1') \
.config('spark.cores.max', '2') \
.config('spark.executor.instances', '2') \
.getOrCreate()
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4041")
این تنظیمات یعنی چه:
spark.executor.memory: هر Executor حالا ۱ گیگابایت رم دارد (۲ Executor = ۲ گیگابایت در مجموع).spark.executor.instances: ما به صراحت درخواست ۲ Executor کردهایم.spark.cores.max: کل برنامه از حداکثر ۲ هسته استفاده میکند (از کل خوشه استفاده میکند).import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('App3-Config-Parkhatar') \
.master('spark://Mojtaba.localdomain:7077') \
.config('spark.driver.memory', '1G') \
.config('spark.executor.memory', '1G') \
.config('spark.executor.cores', '2') \
.config('spark.cores.max', '4') \
.getOrCreate()
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4042")
این تنظیمات یعنی چه:
spark.executor.cores: هر Executor از ۲ هسته استفاده میکند.spark.cores.max: برنامه تلاش میکند از تمام ۴ هسته خوشه استفاده کند.| پارامتر | توضیحات | مثال | تأثیر |
|---|---|---|---|
spark.driver.memory | حافظه برای برنامه Driver | 512M, 1G | عملیات مربوط به مدیریت و نظارت |
spark.executor.memory | حافظه به ازای هر Executor | 512M, 1G | حافظه در دسترس برای اجرای وظایف |
spark.executor.cores | تعداد هستههای CPU به ازای هر Executor | ۱, ۲, ۴ | تعداد وظایف موازی در هر Executor |
spark.cores.max | کل هستههای قابل استفاده توسط برنامه | ۲, ۴ | محدودیت منابع CPU برای کل برنامه |
spark.executor.instances | تعداد Executorهای ثابت | ۲, ۴ | سطح موازیسازی کلی برنامه |
spark.dynamicAllocation.enabled | فعالسازی تخصیص دینامیک Executorها | true, false | انعطافپذیری در مصرف منابع |
تخصیص دینامیک منابع (Dynamic Allocation):
این قابلیت به اسپارک اجازه میدهد تا بر اساس بار کاری، به صورت خودکار تعداد Executorها را افزایش یا کاهش دهد. این کار باعث بهینهسازی مصرف منابع و کاهش هزینهها میشود.
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=20
وقتی به یک Executor ۱ گیگابایت رم میدهید، این مقدار به شکل زیر تقسیم میشود:
حافظه Executor (1G) =
├── حافظه اجرا (Execution Memory - 60%): 600MB - برای محاسبات (مثل join, sort)
├── حافظه ذخیرهسازی (Storage Memory - 40%): 400MB - برای کش کردن دادهها (cache, persist)
└── سربار (Overhead - ~10%): ~100MB - برای سربار JVM و سایر فرآیندها
نکته مهم: این نسبت ۶۰/۴۰ پیشفرض است و با پارامتر spark.memory.fraction قابل تغییر است.
برای درک بهتر رقابت بر سر منابع، بیایید ۴ برنامه با پیکربندیهای مختلف را همزمان اجرا کنیم.
این فایلها را ایجاد کرده و در ترمینالهای جداگانه اجرا کنید (با دستور spark-submit app1_light.py).
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder \
.appName('App1-Sabik') \
.master('spark://localhost:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '512M') \
.config('spark.executor.cores', '1') \
.config('spark.cores.max', '1') \
.getOrCreate()
print(f"\n{'='*60}")
print(f"اپلیکیشن سبک (App1) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4040")
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")
# یک کار ساده
rdd = spark.sparkContext.parallelize(range(1000000), 10)
result = rdd.map(lambda x: x * 2).reduce(lambda a, b: a + b)
print(f"نتیجه: {result}")
print("برنامه به مدت ۱۲۰ ثانیه در حالت خواب قرار میگیرد... به Spark UI مراجعه کنید!")
time.sleep(120)
spark.stop()
print("App1 متوقف شد")
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder \
.appName('App2-Motavaset') \
.master('spark://localhost:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '512M') \
.config('spark.executor.cores', '1') \
.config('spark.cores.max', '2') \
.getOrCreate()
print(f"\n{'='*60}")
print(f"اپلیکیشن متوسط (App2) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4041") # پورت تغییر میکند!
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")
# یک کار متوسط
rdd = spark.sparkContext.parallelize(range(5000000), 20)
result = rdd.filter(lambda x: x % 2 == 0).count()
print(f"نتیجه: {result}")
print("برنامه به مدت ۱۸۰ ثانیه در حالت خواب قرار میگیرد... به Spark UI مراجعه کنید!")
time.sleep(180)
spark.stop()
print("App2 متوقف شد")
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder \
.appName('App3-Sangin') \
.master('spark://localhost:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '1G') \
.config('spark.executor.cores', '2') \
.config('spark.cores.max', '4') \
.getOrCreate()
print(f"\n{'='*60}")
print(f"اپلیکیشن سنگین (App3) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4042") # پورت تغییر میکند!
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")
# یک کار سنگین
rdd = spark.sparkContext.parallelize(range(10000000), 40)
result = rdd.map(lambda x: x ** 2).reduce(lambda a, b: a + b)
print(f"نتیجه: {result}")
print("برنامه به مدت ۱۲۰ ثانیه در حالت خواب قرار میگیرد... به Spark UI مراجعه کنید!")
time.sleep(120)
spark.stop()
print("App3 متوقف شد")
چه چیزی را باید مشاهده کنید؟
app3_heavy را اجرا میکنید، مشاهده کنید که چگونه اجرای app1_light و app2_medium کند میشود یا در صف منتظر میمانند.spark.executor.memory را افزایش دهید یا با افزایش تعداد پارتیشنها (مثلاً با repartition) حجم داده هر پارتیشن را کاهش دهید.spark.cores.max برای محدود کردن منابع هر برنامه استفاده کنید و از Queue Managers (مثل Fair Scheduler در YARN) برای مدیریت عادلانه منابع بین چندین کاربر بهره ببرید.در بخش اول از کارگاه عملی امروز، ابتدا موضوع تخصیص منابع به برنامه های اسپارک را بر اساس توضیحات بالا به صورت عملی انجام دادهایم و همزمان با اجرای چند برنامه اسپارک، نحوه تخصیص منابع و برنامه های در حال اجرا و همچنین برنامههای منتظر تخصیص منابع و همچنین تعداد کل تسک ها و نحوه اجرای تسک ها به ازای هر Executor را در Spark UI متناظر با درایور هر برنامه، به صورت عملی بررسی کردیم. این فیلم آموزشی در قسمت زیر قابل مشاهده است.
نکته : اگر احیانا موفق به مشاهده فیلم نشدهاید، مطمئن شوید که با آی پی ایران متصل شدهاید.