بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

نگاهی عملی به تخصیص منابع به برنامه‌های اسپارک – محتوای ویدئویی

پایه‌ای برای مدیریت بهینه منابع و کارایی بالا در پردازش‌های توزیع‌شده

مقدمه: چرا قبل از RDD، به معماری نیاز داریم؟

قبل از آنکه به دنیای جذاب RDDها (Resilient Distributed Datasets) و توابع قدرتمند آن‌ها شیرجه بزنیم، باید یک سوال اساسی را پاسخ دهیم: “کدهای ما دقیقاً کجا و چگونه اجرا می‌شوند؟“.

تصور کنید می‌خواهید یک آسمان‌خراش بسازید (پردازش داده‌های بزرگ). RDDها آجرها و سیمان‌های شما هستند، اما اگر از فونداسیون و سازه ساختمان (معماری و مدیریت منابع اسپارک) چیزی ندانید، بهترین آجرها هم نمی‌توانند ساختمانی محکم بسازند.

در این جلسه، یاد می‌گیریم که چگونه به برنامه خود “بودجه” (رم و CPU) تخصیص دهیم و می‌بینیم که این بودجه‌بندی چگونه مستقیماً بر سرعت، پایداری و کارایی کارهای بعدی ما تأثیر می‌گذارد. این دانش، به شما کمک می‌کند دید عمیق‌تری نسبت به پشت صحنه اجرای برنامه‌ها و نحوه اجرای یک برنامه و تقسیم آن بین چند نود پیدا کنید.


۱. مروری بر معماری اسپارک

تصویر کلی

بیایید با یک تصویر ساده شروع کنیم. فرض کنید کلاستری با دو تا نود Worker و یک نود مستر داریم و به هر نود ورکر، دو گیگابایت رم و دو تا هسته سی پی یو اختصاص داده ایم. ورکرها در دنیای واقعی، نودهای اصلی اجرا کننده تسک ها در اسپارک هستند که Executor های هر برنامه، روی آنها ایجاد شده و منابع لازم را برای اجرای برنامه از ورکر دریافت کرده و بعد از اجرای تسک‌ها، مجددا منابع را به ورکر بر می گردانند.

حال فرض کنید یک برنامه ساده پایتون را به کمک PySpark می خواهیم در این کلاستر اجرا کنیم. با اجرای این برنامه ، اجزای زیر را خواهیم داشت :

اجزای کلیدی
  1. برنامه Driver (The Conductor)
    • نقش: این برنامه، “مغز متفکر” یا “مدیر ارکستر” عملیات شماست. جایی که تابع main() شما اجرا می‌شود.
    • وظایف:
      • SparkContext را ایجاد می‌کند (نقطه ورود به اسپارک).
      • وظایف (Tasks) را برای Executorها برنامه‌ریزی و تقسیم می‌کند.
      • وضعیت Executorها را نظارت می‌کند.
      • در یک فرآیند JVM جداگانه اجرا می‌شود.
    • تشبیه: Driver مانند سرآشپز یک رستوران است که دستور پخت را می‌داند و به آشپزها (Executorها) می‌گوید چه کاری انجام دهند، اما خودش مستقیماً در آشپزخانه پخت‌وپز نمی‌کند.
  2. مدیر خوشه (Cluster Manager)
    • نقش: “منبع‌بان” یا “تخصیص‌دهنده منابع” خوشه است.
    • وظایف:
      • منابع (رم و CPU) را به برنامه‌های مختلف اختصاص می‌دهد.
      • گره‌های کارگر (Worker Nodes) را مدیریت می‌کند.
    • انواع: Standalone، YARN، Mesos، Kubernetes. در حال حاضر ما از حالت Standalone استفاده می‌کنیم و این وظیفه را در این حالت، نود مستر بر عهده دارد.
  3. گره‌های کارگر (Worker Nodes)
    • نقش: این‌ها ماشین‌های فیزیکی یا مجازی هستند که کار واقعی روی آن‌ها انجام می‌شود.
    • وظایف:
      • میزبان فرآیندهای Executor هستند.
      • در تنظیمات ما: ۲ کارگر، هر کدام با ۲ گیگابایت رم و ۲ هسته پردازنده.
  4. Executorها (The Workers)
    • نقش: این‌ها “کارگران ماهر” در کارخانه ما هستند. آن‌ها وظایف محاسباتی را واقعاً اجرا می‌کنند.
    • وظایف:
      • فرآیندهای JVM هستند که روی گره‌های کارگر اجرا می‌شوند.
      • وظایف (Tasks) را که از Driver دریافت می‌کنند، اجرا می‌کنند.
      • داده‌ها را در حافظه (RAM) یا دیسک برای دسترسی سریع ذخیره می‌کنند (Caching).
      • هر Executor مقدار مشخصی از هسته‌های CPU و رم را در اختیار دارد.
    • نکته کلیدی: تخصیص رم و CPU به ازای هر Executor، موضوع اصلی این جلسه است و تعیین می‌کند که هر “کارگر” چقدر قدرتمند باشد.

۲. درک منابع خوشه شما

پیکربندی خوشه شما

فرض کنید خوشه شما به شکل زیر است:

کل منابع خوشه:
├── کارگر ۱: ۲ گیگابایت رم، ۲ هسته پردازنده
└── کارگر ۲: ۲ گیگابایت رم، ۲ هسته پردازنده

کل منابع قابل دسترس:
├── حافظه (RAM): ۴ گیگابایت
└── هسته (CPU): ۴ هسته

۳. پیکربندی منابع در اسپارک

حالا با چند مثال عملی می‌بینیم که چگونه این تنظیمات را در کد اعمال کنیم و تأثیر آن‌ها چیست.

پیکربندی ۱: برنامه کوچک
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('App1-Config-Koochak') \
    .master('spark://localhost:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '512M') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '2') \
    .getOrCreate()

print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4040")

این تنظیمات یعنی چه:

  • spark.driver.memory: به Driver برنامه ۵۱۲ مگابایت رم اختصاص می‌دهد.
  • spark.executor.memory: به هر Executor ۵۱۲ مگابایت رم می‌دهد.
  • spark.executor.cores: هر Executor فقط از ۱ هسته CPU استفاده می‌کند.
  • spark.cores.max: کل برنامه از حداکثر ۲ هسته استفاده می‌کند (یعنی ۲ Executor).
  • نتیجه: این برنامه می‌تواند حداکثر ۲ وظیفه (Task) را به صورت موازی اجرا کند. یک پیکربندی محتاطانه و کم‌مصرف.
پیکربندی ۲: برنامه متوسط
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('App2-Config-Motavaset') \
    .master('spark://Mojtaba.localdomain:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '1G') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '2') \
    .config('spark.executor.instances', '2') \
    .getOrCreate()

print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4041")

این تنظیمات یعنی چه:

  • spark.executor.memory: هر Executor حالا ۱ گیگابایت رم دارد (۲ Executor = ۲ گیگابایت در مجموع).
  • spark.executor.instances: ما به صراحت درخواست ۲ Executor کرده‌ایم.
  • spark.cores.max: کل برنامه از حداکثر ۲ هسته استفاده می‌کند (از کل خوشه استفاده می‌کند).
  • نتیجه: این برنامه از تمام هسته‌های خوشه استفاده می‌کند اما با حافظه بیشتری برای هر Executor. برای کارهای متوسط مناسب است.
پیکربندی ۳: برنامه سنگین
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('App3-Config-Parkhatar') \
    .master('spark://Mojtaba.localdomain:7077') \
    .config('spark.driver.memory', '1G') \
    .config('spark.executor.memory', '1G') \
    .config('spark.executor.cores', '2') \
    .config('spark.cores.max', '4') \
    .getOrCreate()

print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4042")

این تنظیمات یعنی چه:

  • spark.executor.cores: هر Executor از ۲ هسته استفاده می‌کند.
  • spark.cores.max: برنامه تلاش می‌کند از تمام ۴ هسته خوشه استفاده کند.
  • نتیجه: این برنامه تمام منابع خوشه را به خود اختصاص می‌دهد و از اجرای همزمان برنامه‌های دیگر جلوگیری می‌کند. اگر حافظه کافی وجود نداشته باشد (مثلاً ۲ Executor با ۱ گیگابایت رم روی خوشه ۴ گیگابایتی نگیرد)، برنامه ممکن است با خطا متوقف شود یا کارایی بسیار پایینی داشته باشد.

۴. پارامترهای کلیدی پیکربندی

پیکربندی‌های ضروری
پارامترتوضیحاتمثالتأثیر
spark.driver.memoryحافظه برای برنامه Driver512M, 1Gعملیات مربوط به مدیریت و نظارت
spark.executor.memoryحافظه به ازای هر Executor512M, 1Gحافظه در دسترس برای اجرای وظایف
spark.executor.coresتعداد هسته‌های CPU به ازای هر Executor۱, ۲, ۴تعداد وظایف موازی در هر Executor
spark.cores.maxکل هسته‌های قابل استفاده توسط برنامه۲, ۴محدودیت منابع CPU برای کل برنامه
spark.executor.instancesتعداد Executorهای ثابت۲, ۴سطح موازیسازی کلی برنامه
spark.dynamicAllocation.enabledفعال‌سازی تخصیص دینامیک Executorهاtrue, falseانعطاف‌پذیری در مصرف منابع

تخصیص دینامیک منابع (Dynamic Allocation):
این قابلیت به اسپارک اجازه می‌دهد تا بر اساس بار کاری، به صورت خودکار تعداد Executorها را افزایش یا کاهش دهد. این کار باعث بهینه‌سازی مصرف منابع و کاهش هزینه‌ها می‌شود.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=20
تجزیه حافظه Executor

وقتی به یک Executor ۱ گیگابایت رم می‌دهید، این مقدار به شکل زیر تقسیم می‌شود:

حافظه Executor (1G) =
├── حافظه اجرا (Execution Memory - 60%): 600MB - برای محاسبات (مثل join, sort)
├── حافظه ذخیره‌سازی (Storage Memory - 40%): 400MB - برای کش کردن داده‌ها (cache, persist)
└── سربار (Overhead - ~10%): ~100MB - برای سربار JVM و سایر فرآیندها

نکته مهم: این نسبت ۶۰/۴۰ پیش‌فرض است و با پارامتر spark.memory.fraction قابل تغییر است.


۵. کارگاه عملی ۱: اجرای چندین برنامه به صورت همزمان

برای درک بهتر رقابت بر سر منابع، بیایید ۴ برنامه با پیکربندی‌های مختلف را همزمان اجرا کنیم.

راه‌اندازی: ایجاد ۴ برنامه متفاوت

این فایل‌ها را ایجاد کرده و در ترمینال‌های جداگانه اجرا کنید (با دستور spark-submit app1_light.py).

app1_light.py (برنامه سبک)
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName('App1-Sabik') \
    .master('spark://localhost:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '512M') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .getOrCreate()

print(f"\n{'='*60}")
print(f"اپلیکیشن سبک (App1) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4040")
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")

# یک کار ساده
rdd = spark.sparkContext.parallelize(range(1000000), 10)
result = rdd.map(lambda x: x * 2).reduce(lambda a, b: a + b)

print(f"نتیجه: {result}")
print("برنامه به مدت ۱۲۰ ثانیه در حالت خواب قرار می‌گیرد... به Spark UI مراجعه کنید!")

time.sleep(120)

spark.stop()
print("App1 متوقف شد")
app2_medium.py (برنامه متوسط)
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName('App2-Motavaset') \
    .master('spark://localhost:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '512M') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '2') \
    .getOrCreate()

print(f"\n{'='*60}")
print(f"اپلیکیشن متوسط (App2) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4041")  # پورت تغییر می‌کند!
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")

# یک کار متوسط
rdd = spark.sparkContext.parallelize(range(5000000), 20)
result = rdd.filter(lambda x: x % 2 == 0).count()

print(f"نتیجه: {result}")
print("برنامه به مدت ۱۸۰ ثانیه در حالت خواب قرار می‌گیرد... به Spark UI مراجعه کنید!")

time.sleep(180)

spark.stop()
print("App2 متوقف شد")
app3_heavy.py (برنامه سنگین)
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder \
    .appName('App3-Sangin') \
    .master('spark://localhost:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '1G') \
    .config('spark.executor.cores', '2') \
    .config('spark.cores.max', '4') \
    .getOrCreate()

print(f"\n{'='*60}")
print(f"اپلیکیشن سنگین (App3) شروع به کار کرد")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Spark UI: http://localhost:4042")  # پورت تغییر می‌کند!
print(f"Master UI: http://localhost:8080")
print(f"{'='*60}\n")

# یک کار سنگین
rdd = spark.sparkContext.parallelize(range(10000000), 40)
result = rdd.map(lambda x: x ** 2).reduce(lambda a, b: a + b)

print(f"نتیجه: {result}")
print("برنامه به مدت ۱۲۰ ثانیه در حالت خواب قرار می‌گیرد... به Spark UI مراجعه کنید!")

time.sleep(120)

spark.stop()
print("App3 متوقف شد")

چه چیزی را باید مشاهده کنید؟

  1. Master UI (http://localhost:8080): ببینید که چگونه برنامه‌ها در صف قرار می‌گیرند و منابع به آن‌ها اختصاص داده می‌شود.
  2. Spark UI هر برنامه (پورت‌های ۴۰۴۰, ۴۰۴۱, ۴۰۴۲): به تب “Executors” بروید و تعداد Executorها، حافظه و هسته‌های تخصیص‌ داده شده به هر برنامه را بررسی کنید.
  3. رفتار برنامه‌ها: وقتی app3_heavy را اجرا می‌کنید، مشاهده کنید که چگونه اجرای app1_light و app2_medium کند می‌شود یا در صف منتظر می‌مانند.

۶. بهترین شیوه‌ها و توصیه‌های عملی

  1. اندازه Executor: یک قانون کلی خوب، تخصیص ۵ هسته به ازای هر Executor است. Executorهای خیلی بزرگ (مثلاً با بیش از ۶ هسته) می‌توانند باعث “سربار GC” (Garbage Collection) شوند. Executorهای خیلی کوچک هم سربار ارتباطی را افزایش می‌دهند.
  2. تخصیص حافظه: همیشه ۱۰-۱۵ درصد از حافظه کل هر ورکر را برای سربار JVM در نظر بگیرید. مثلاً اگر یک نود ورکر ۱۶ گیگابایت رم دارد، حدود ۱۴-۱۵ گیگابایت آن را به اسپارک بدهید.
  3. تعداد Executorها: تعداد Executorها را بر اساس تعداد هسته‌ها تنظیم کنید. مثلاً در یک خوشه با ۱۰۰ هسته، اگر هر Executor ۵ هسته بگیرد، می‌توانید ۲۰ Executor داشته باشید.
  4. از تخصیص دینامیک استفاده کنید: مگر اینکه دلیل بسیار مشخصی برای ثابت نگه داشتن تعداد Executorها دارید، Dynamic Allocation را فعال کنید. این کار باعث استفاده بهینه از منابع در محیط‌های اشتراکی می‌شود.

۷. تله‌های رایج و نحوه اجتناب از آنها

  1. خطای OutOfMemoryError:
    • علت: تخصیص حافظه ناکافی به Executorها یا پارتیشن‌بندی‌های بسیار بزرگ داده.
    • راه‌حل: مقدار spark.executor.memory را افزایش دهید یا با افزایش تعداد پارتیشن‌ها (مثلاً با repartition) حجم داده هر پارتیشن را کاهش دهید.
  2. گرسنگی منابع (Resource Starvation):
    • علت: یک برنامه تمام منابع خوشه را اشغال می‌کند (مانند پیکربندی ۳).
    • راه‌حل: از spark.cores.max برای محدود کردن منابع هر برنامه استفاده کنید و از Queue Managers (مثل Fair Scheduler در YARN) برای مدیریت عادلانه منابع بین چندین کاربر بهره ببرید.
  3. استفاده ناکافی از منابع (Under-utilization):
    • علت: تخصیص هسته یا حافظه خیلی کم، باعث می‌شود برنامه‌ها به کندی اجرا شوند و منابع خوشه بلا استفاده بمانند.
    • راه‌حل: در Spark UI به تب “Executors” نگاه کنید. اگر هسته‌ها یا حافظه‌ها در حالت idle (بیکار) زیاد هستند، پیکربندی خود را بازبینی کنید.

محتوای ویدئویی

در بخش اول از کارگاه عملی امروز، ابتدا موضوع تخصیص منابع به برنامه های اسپارک را بر اساس توضیحات بالا به صورت عملی انجام داده‌ایم و همزمان با اجرای چند برنامه اسپارک، نحوه تخصیص منابع و برنامه های در حال اجرا و همچنین برنامه‌های منتظر تخصیص منابع و همچنین تعداد کل تسک ها و نحوه اجرای تسک ها به ازای هر Executor را در Spark UI متناظر با درایور هر برنامه، به صورت عملی بررسی کردیم. این فیلم آموزشی در قسمت زیر قابل مشاهده است.

نکته : اگر احیانا موفق به مشاهده فیلم نشده‌اید، مطمئن شوید که با آی پی ایران متصل شد‌ه‌اید.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید