بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

کارگاه عملی: بهینه‌سازی RDD و توصیه‌های کاربردی – محتوای ویدئویی


۱. Persistence در RDD: متدهای cache() و persist()

چرا باید RDD را persist کنیم؟

به طور پیش‌فرض، Spark یک RDD را هر بار که یک action (مانند count() یا sum()) فراخوانی می‌شود، از ابتدا محاسبه می‌کند. اگر همان RDD چندین بار در actionهای مختلف استفاده شود، این محاسبه تکراری زمان و منابع زیادی هدر می‌دهد.

مثال بدون persistence (محاسبه دو بار انجام می‌شود):

data_rdd = spark.sparkContext.parallelize(range(1, 1000000))
filtered = data_rdd.filter(lambda x: x % 2 == 0)

count1 = filtered.count()   # محاسبه از صفر
sum1 = filtered.sum()       # دوباره محاسبه از صفر!

راه‌حل: با cache() یا persist()، نتیجه محاسبه را در حافظه (یا دیسک) نگه دارید.

مثال با cache():

filtered = data_rdd.filter(lambda x: x % 2 == 0)
filtered.cache()            # یا filtered.persist()

count1 = filtered.count()   # محاسبه و ذخیره در حافظه
sum1 = filtered.sum()       # از حافظه خوانده می‌شود → خیلی سریع‌تر!

filtered.unpersist()        # در پایان کار، حافظه را آزاد کنید

تفاوت cache() و persist()

  • rdd.cache() → کوتاه‌شده و معادل persist(StorageLevel.MEMORY_ONLY) است (فقط در حافظه RAM).
  • rdd.persist(StorageLevel.X) → امکان انتخاب سطح ذخیره‌سازی دلخواه را می‌دهد (مثلاً حافظه + دیسک).
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)  # توصیه‌شده برای داده‌های بزرگ

۲. سطوح ذخیره‌سازی (Storage Levels)

سطح ذخیره‌سازیتوضیح فارسیموارد استفاده پیشنهادینقاط ضعف/قوت
MEMORY_ONLYفقط در حافظه RAM (پیش‌فرض cache)داده کوچک که کاملاً در RAM جا می‌شودسریع‌ترین، اما اگر حافظه پر شود داده حذف می‌شود
MEMORY_ONLY_2همان بالا + دو کپی (replicated)وقتی تحمل خطا مهم استحافظه دو برابر مصرف می‌کند
MEMORY_AND_DISKابتدا حافظه، سرریز به دیسکتوصیه اصلی برای داده‌های بزرگ و پراستفادهتعادل خوب سرعت و ایمنی
MEMORY_AND_DISK_2همان بالا + دو کپیداده بزرگ + نیاز به تحمل خطا بالامصرف حافظه و دیسک بیشتر
DISK_ONLYفقط روی دیسکداده خیلی بزرگ که استفاده نادر داردکندترین، اما همیشه در دسترس
DISK_ONLY_2همان بالا + دو کپیداده خیلی بزرگ + تحمل خطاکند، اما ایمن
OFF_HEAPحافظه خارج از heap JVM (مثل Tachyon)وقتی مشکل Garbage Collection داریدپیچیده‌تر، اما GC کمتر

نکته عملی: در بیشتر پروژه‌های واقعی، MEMORY_AND_DISK بهترین انتخاب است چون اگر RAM پر شود، به طور خودکار به دیسک سرریز می‌کند.


۳. کی باید RDD را persist کنیم؟

باید persist کنید وقتی:

  • RDD چندین بار در actionهای مختلف استفاده می‌شود
  • محاسبه آن گران است (مثلاً شامل عملیات پیچیده map/filter یا shuffle)
  • نتیجه یک عملیات shuffle است (shuffle گران است)
  • در الگوریتم‌های تکراری (مثل machine learning یا graph processing)

مثال خوب – الگوریتم تکراری:

data = spark.sparkContext.parallelize([(1,2), (2,3), (3,4)])
data.cache()  # چون ۱۰۰ بار استفاده می‌شود

for iteration in range(100):
    gradient = data.map(compute_gradient).reduce(sum_gradients)
    update_model(gradient)

مثال بد – cache غیرضروری:

data = spark.sparkContext.parallelize(range(100))
data.cache()      # بیش از حد! محاسبه آن خیلی ارزان است
result = data.sum()

۴. نظارت و دیباگ RDDهای cached

بررسی وضعیت در کد:

rdd.cache()
rdd.count()  # برای فعال شدن cache

print(f"سطح ذخیره‌سازی: {rdd.getStorageLevel()}")
print(f"آیا cached است؟ {rdd.is_cached}")

در Spark UI:

  • تب Storage: لیست RDDهای cached، درصد ذخیره‌شده، حجم حافظه مصرفی
  • تب Executors: مصرف حافظه هر executor

نام‌گذاری RDD برای دیباگ راحت‌تر:

rdd = spark.sparkContext.parallelize(range(1000))
rdd.setName("RDD مهم من")
rdd.cache()
rdd.count()

print(f"نام RDD: {rdd.name()}")  # در Spark UI با همین نام ظاهر می‌شود

۵. درک lineage با toDebugString()

متد rdd.toDebugString() شجره‌نامه (lineage) RDD را نشان می‌دهد و کمک می‌کند بفهمید برنامه اجرایی چطور ساخته شده.

مثال ساده:

data = spark.sparkContext.parallelize(range(100), 4)
filtered = data.filter(lambda x: x % 2 == 0)
mapped = filtered.map(lambda x: (x, x**2))
reduced = mapped.reduceByKey(lambda a, b: a + b)

print(reduced.toDebugString().decode('utf-8'))

نکات کلیدی در خروجی:

  • (۴) → تعداد پارتیشن‌ها
  • | → وابستگی narrow (بدون shuffle → سریع، pipeline می‌شود)
  • +- → وابستگی wide (شامل shuffle → مرز stage جدید)
  • ShuffledRDD → محل shuffle (گران‌ترین بخش، شامل انتقال شبکه و دیسک)

چرا shuffle مهم است؟
shuffle باعث می‌شود داده بین executorها جابه‌جا شود → شبکه و دیسک درگیر می‌شوند → بزرگ‌ترین bottleneck عملکردی.

مقایسه مهم: reduceByKey خیلی بهتر از groupByKey است چون قبل از shuffle، aggregation محلی انجام می‌دهد و حجم داده انتقالی را به شدت کاهش می‌دهد.

# reduceByKey – کارآمد (combiner محلی دارد)
pairs.reduceByKey(lambda a, b: a + b)

# groupByKey – ناکارآمد (همه مقادیر خام را shuffle می‌کند)
pairs.groupByKey().mapValues(sum)

۶. تکنیک‌های بهینه‌سازی عملکرد

۱. فیلتر کردن قبل از محاسبات (Filter Early)
# بد: عملیات گران اول، سپس فیلتر
result = data.map(expensive_operation).filter(lambda x: x > 100).collect()

# خوب: فیلتر اول، سپس عملیات گران
result = data.filter(lambda x: x > 100).map(expensive_operation).collect()
۲. کاهش تعداد shuffleها

مثال بد (چندین shuffle جداگانه):

bad = pairs.groupByKey()\
           .mapValues(sum)\
           .filter(lambda x: x[1] > 1000)\
           .sortByKey()  # حداقل ۳ stage و چند shuffle

مثال خوب (حداقل shuffle):

good = pairs.reduceByKey(lambda a, b: a + b)\
            .filter(lambda x: x[1] > 1000)\
            .sortByKey()  # فقط ۲ stage
۳. استفاده از Broadcast Variables (برای جداول کوچک lookup)
# بدون broadcast – بد (کپی در هر task)
bad = orders_rdd.map(lambda o: (o, product_catalog[o.product_id]))

# با broadcast – خوب (یک کپی در هر executor)
bc_products = spark.sparkContext.broadcast(product_catalog)
good = orders_rdd.map(lambda o: (o, bc_products.value[o.product_id]))
۴. پارتیشن‌بندی مناسب
# تعداد پارتیشن مناسب: ۲ تا ۴ برابر تعداد هسته‌ها
num_cores = sc.defaultParallelism
data = spark.sparkContext.parallelize(data_list, num_cores * 3)

# پس از فیلتر سنگین که داده را خیلی کم می‌کند:
filtered.coalesce(10)  # کاهش پارتیشن‌های خالی
۵. اجتناب از collect() روی داده بزرگ
# بد
all_data = large_rdd.collect()  # ممکن است OOM بدهد

# خوب
sample = large_rdd.take(100)
count = large_rdd.count()

۷. الگوهای اشتباه رایج (Anti-Patterns)

  • collect() روی RDD بزرگ
  • استفاده از groupByKey به جای reduceByKey
  • ایجاد تعداد زیاد RDD کوچک در حلقه
  • shuffle غیرضروری (مثل sortBy وقتی ترتیب مهم نیست)
  • cache کردن همه چیز بدون دلیل

۸. چند توصیه کاربردی

# مثال بهینه‌شده شمارش کلمات
text = spark.sparkContext.textFile("large_file.txt")

word_counts = text.flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b) \
                  .cache()  # چندین بار استفاده می‌شود

word_counts.setName("Word Counts")  # برای Spark UI

total_words = word_counts.map(lambda x: x[1]).sum()
unique_words = word_counts.count()
top_10 = word_counts.top(10, key=lambda x: x[1])

word_counts.unpersist()  # آزاد کردن حافظه

۹. انتقال به DataFrames

مزایای DataFrames:

  • بهینه‌سازی خودکار توسط Catalyst Optimizer
  • اجرای سریع‌تر با Tungsten
  • کد تمیزتر و عملیات SQL-like

کی از RDD استفاده کنیم؟
فقط وقتی نیاز به کنترل خیلی دقیق یا منطق پیچیده کاملاً سفارشی دارید، یا با داده کاملاً unstructured کار می‌کنید.

برای پروژه‌های جدید → مستقیماً از DataFrames یا Spark SQL شروع کنید.


نکات کلیدی نهایی

  • RDDهای پراستفاده و آنهایی که هزینه بازتولید آنها بالاست را cache کنید
  • برای بیشتر موارد سطح MEMORY_AND_DISK را انتخاب کنید
  • تعداد shuffleها را به حداقل برسانید (مثلا reduceByKey > groupByKey)
  • همیشه Spark UI و toDebugString() را برای دیباگ چک کنید
  • پارتیشن‌ها را منطقی تنظیم کنید و بعد از فیلتر coalesce کنید
  • از broadcast variables برای داده‌های کوچک مشترک استفاده کنید

تبریک! حالا درک عمیقی از بهینه‌سازی RDD دارید 🎉
آماده‌اید به سراغ Spark SQL و DataFrames بروید که عملکرد بهتری دارند و کد کمتری نیاز دارند.

محتوای ویدئویی

آخرین کارگاه عملی بخش RDD که بیان عملی موارد فوق است را در قسمت زیر می‌توانید مشاهده کنید.

نکته : اگر احیانا فیلم آموزشی برای شما نمایش داده نمی‌شود، مطمئن شوید که با آی‌پی ایران در حال مشاهده این صفحه هستید.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید