به طور پیشفرض، Spark یک RDD را هر بار که یک action (مانند count() یا sum()) فراخوانی میشود، از ابتدا محاسبه میکند. اگر همان RDD چندین بار در actionهای مختلف استفاده شود، این محاسبه تکراری زمان و منابع زیادی هدر میدهد.
مثال بدون persistence (محاسبه دو بار انجام میشود):
data_rdd = spark.sparkContext.parallelize(range(1, 1000000))
filtered = data_rdd.filter(lambda x: x % 2 == 0)
count1 = filtered.count() # محاسبه از صفر
sum1 = filtered.sum() # دوباره محاسبه از صفر!
راهحل: با cache() یا persist()، نتیجه محاسبه را در حافظه (یا دیسک) نگه دارید.
مثال با cache():
filtered = data_rdd.filter(lambda x: x % 2 == 0)
filtered.cache() # یا filtered.persist()
count1 = filtered.count() # محاسبه و ذخیره در حافظه
sum1 = filtered.sum() # از حافظه خوانده میشود → خیلی سریعتر!
filtered.unpersist() # در پایان کار، حافظه را آزاد کنید
rdd.cache() → کوتاهشده و معادل persist(StorageLevel.MEMORY_ONLY) است (فقط در حافظه RAM).rdd.persist(StorageLevel.X) → امکان انتخاب سطح ذخیرهسازی دلخواه را میدهد (مثلاً حافظه + دیسک).from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK) # توصیهشده برای دادههای بزرگ
| سطح ذخیرهسازی | توضیح فارسی | موارد استفاده پیشنهادی | نقاط ضعف/قوت |
|---|---|---|---|
MEMORY_ONLY | فقط در حافظه RAM (پیشفرض cache) | داده کوچک که کاملاً در RAM جا میشود | سریعترین، اما اگر حافظه پر شود داده حذف میشود |
MEMORY_ONLY_2 | همان بالا + دو کپی (replicated) | وقتی تحمل خطا مهم است | حافظه دو برابر مصرف میکند |
MEMORY_AND_DISK | ابتدا حافظه، سرریز به دیسک | توصیه اصلی برای دادههای بزرگ و پراستفاده | تعادل خوب سرعت و ایمنی |
MEMORY_AND_DISK_2 | همان بالا + دو کپی | داده بزرگ + نیاز به تحمل خطا بالا | مصرف حافظه و دیسک بیشتر |
DISK_ONLY | فقط روی دیسک | داده خیلی بزرگ که استفاده نادر دارد | کندترین، اما همیشه در دسترس |
DISK_ONLY_2 | همان بالا + دو کپی | داده خیلی بزرگ + تحمل خطا | کند، اما ایمن |
OFF_HEAP | حافظه خارج از heap JVM (مثل Tachyon) | وقتی مشکل Garbage Collection دارید | پیچیدهتر، اما GC کمتر |
نکته عملی: در بیشتر پروژههای واقعی، MEMORY_AND_DISK بهترین انتخاب است چون اگر RAM پر شود، به طور خودکار به دیسک سرریز میکند.
باید persist کنید وقتی:
مثال خوب – الگوریتم تکراری:
data = spark.sparkContext.parallelize([(1,2), (2,3), (3,4)])
data.cache() # چون ۱۰۰ بار استفاده میشود
for iteration in range(100):
gradient = data.map(compute_gradient).reduce(sum_gradients)
update_model(gradient)
مثال بد – cache غیرضروری:
data = spark.sparkContext.parallelize(range(100))
data.cache() # بیش از حد! محاسبه آن خیلی ارزان است
result = data.sum()
بررسی وضعیت در کد:
rdd.cache()
rdd.count() # برای فعال شدن cache
print(f"سطح ذخیرهسازی: {rdd.getStorageLevel()}")
print(f"آیا cached است؟ {rdd.is_cached}")
در Spark UI:
نامگذاری RDD برای دیباگ راحتتر:
rdd = spark.sparkContext.parallelize(range(1000))
rdd.setName("RDD مهم من")
rdd.cache()
rdd.count()
print(f"نام RDD: {rdd.name()}") # در Spark UI با همین نام ظاهر میشود
متد rdd.toDebugString() شجرهنامه (lineage) RDD را نشان میدهد و کمک میکند بفهمید برنامه اجرایی چطور ساخته شده.
مثال ساده:
data = spark.sparkContext.parallelize(range(100), 4)
filtered = data.filter(lambda x: x % 2 == 0)
mapped = filtered.map(lambda x: (x, x**2))
reduced = mapped.reduceByKey(lambda a, b: a + b)
print(reduced.toDebugString().decode('utf-8'))
نکات کلیدی در خروجی:
(۴) → تعداد پارتیشنها| → وابستگی narrow (بدون shuffle → سریع، pipeline میشود)+- → وابستگی wide (شامل shuffle → مرز stage جدید)ShuffledRDD → محل shuffle (گرانترین بخش، شامل انتقال شبکه و دیسک)چرا shuffle مهم است؟
shuffle باعث میشود داده بین executorها جابهجا شود → شبکه و دیسک درگیر میشوند → بزرگترین bottleneck عملکردی.
مقایسه مهم: reduceByKey خیلی بهتر از groupByKey است چون قبل از shuffle، aggregation محلی انجام میدهد و حجم داده انتقالی را به شدت کاهش میدهد.
# reduceByKey – کارآمد (combiner محلی دارد)
pairs.reduceByKey(lambda a, b: a + b)
# groupByKey – ناکارآمد (همه مقادیر خام را shuffle میکند)
pairs.groupByKey().mapValues(sum)
# بد: عملیات گران اول، سپس فیلتر
result = data.map(expensive_operation).filter(lambda x: x > 100).collect()
# خوب: فیلتر اول، سپس عملیات گران
result = data.filter(lambda x: x > 100).map(expensive_operation).collect()
مثال بد (چندین shuffle جداگانه):
bad = pairs.groupByKey()\
.mapValues(sum)\
.filter(lambda x: x[1] > 1000)\
.sortByKey() # حداقل ۳ stage و چند shuffle
مثال خوب (حداقل shuffle):
good = pairs.reduceByKey(lambda a, b: a + b)\
.filter(lambda x: x[1] > 1000)\
.sortByKey() # فقط ۲ stage
# بدون broadcast – بد (کپی در هر task)
bad = orders_rdd.map(lambda o: (o, product_catalog[o.product_id]))
# با broadcast – خوب (یک کپی در هر executor)
bc_products = spark.sparkContext.broadcast(product_catalog)
good = orders_rdd.map(lambda o: (o, bc_products.value[o.product_id]))
# تعداد پارتیشن مناسب: ۲ تا ۴ برابر تعداد هستهها
num_cores = sc.defaultParallelism
data = spark.sparkContext.parallelize(data_list, num_cores * 3)
# پس از فیلتر سنگین که داده را خیلی کم میکند:
filtered.coalesce(10) # کاهش پارتیشنهای خالی
# بد
all_data = large_rdd.collect() # ممکن است OOM بدهد
# خوب
sample = large_rdd.take(100)
count = large_rdd.count()
collect() روی RDD بزرگgroupByKey به جای reduceByKeysortBy وقتی ترتیب مهم نیست)# مثال بهینهشده شمارش کلمات
text = spark.sparkContext.textFile("large_file.txt")
word_counts = text.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.cache() # چندین بار استفاده میشود
word_counts.setName("Word Counts") # برای Spark UI
total_words = word_counts.map(lambda x: x[1]).sum()
unique_words = word_counts.count()
top_10 = word_counts.top(10, key=lambda x: x[1])
word_counts.unpersist() # آزاد کردن حافظه
مزایای DataFrames:
کی از RDD استفاده کنیم؟
فقط وقتی نیاز به کنترل خیلی دقیق یا منطق پیچیده کاملاً سفارشی دارید، یا با داده کاملاً unstructured کار میکنید.
برای پروژههای جدید → مستقیماً از DataFrames یا Spark SQL شروع کنید.
MEMORY_AND_DISK را انتخاب کنیدtoDebugString() را برای دیباگ چک کنیدتبریک! حالا درک عمیقی از بهینهسازی RDD دارید 🎉
آمادهاید به سراغ Spark SQL و DataFrames بروید که عملکرد بهتری دارند و کد کمتری نیاز دارند.
آخرین کارگاه عملی بخش RDD که بیان عملی موارد فوق است را در قسمت زیر میتوانید مشاهده کنید.
نکته : اگر احیانا فیلم آموزشی برای شما نمایش داده نمیشود، مطمئن شوید که با آیپی ایران در حال مشاهده این صفحه هستید.