در بخش قبل با مفهوم RDD، نحوهی ایجاد آن و فلسفهی پردازش توزیعشده در Apache Spark آشنا شدیم.
در این بخش، وارد هستهی واقعی کار با RDDها میشویم؛ جایی که دادهها را تبدیل، فیلتر، پردازش و آمادهی تحلیل میکنیم.
تمرکز اصلی این بخش بر روی Transformations است؛ عملیاتی که به ما اجازه میدهند بدون اجرای فوری، یک گراف محاسباتی بهینه بسازیم. درک درست این مفهوم، پایهی نوشتن برنامههای سریع، مقیاسپذیر و بهینه در Spark است.
در طول این بخش:
تعریف: تبدیلها، عملیاتی هستند که یک RDD جدید از یک RDD موجود میسازند. آنها یک “دستورالعمل” یا “تغییر” روی دادهها را تعریف میکنند، اما محاسبات را فوراً اجرا نمیکنند.
ویژگیهای کلیدی:
مثالها: map(), filter(), flatMap(), sample(), coalesce(), repartition()
تعریف: اکشنها، عملیاتی هستند که محاسبات تعریفشده توسط تبدیلها را فعال میکنند و نتیجه را به درایور (driver) یا یک سیستم ذخیرهسازی خارجی برمیگردانند.
ویژگیهای کلیدی:
None) برمیگرداند، نه یک RDD.مثالها: collect(), count(), first(), take(), top(), saveAsTextFile()
| جنبه | تبدیلها (Transformations) | اکشنها (Actions) |
|---|---|---|
| نوع بازگشتی | RDD جدید | مقدار یا Unit |
| اجرا | تنبل (Lazy – به تأخیر افتاده) | فوری (Eager – فوری) |
| مثالها | map, filter, flatMap | collect, count, reduce |
| هدف | ساخت گراف محاسباتی | فعالسازی اجرا |
تبدیلها در اسپارک Lazy یا تنبل هستند یعنی به صورت فوری اجرا نمیشوند. تمام تبدیلات حتی اگر چندین تبدیل را روی یک مجموعه داده اعمال کرده باشید تا به یک اکشن نرسیم، اسپارک کار انجام محاسبات را شروع نخواهد کرد و شما Job ای را در حال اجرا نخواهید دید:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('RDD-Part2-Transformations') \
.master('spark://Mojtaba.localdomain:7077') \
.config('spark.driver.memory', '1024M') \
.config('spark.executor.memory', '1024M') \
.config('spark.executor.cores', '1') \
.getOrCreate()
# این هنوز اجرا نمیشود!
numbers_rdd = spark.sparkContext.parallelize(range(0, 1000000))
filtered_rdd = numbers_rdd.filter(lambda x: x % 2 == 0) # اجرا نمیشود
mapped_rdd = filtered_rdd.map(lambda x: x ** 2) # اجرا نمیشود
# حالا اجرا میشود!
result = mapped_rdd.take(10) # اکشن اجرا را فعال میکند
print(result)
مزایای تخمین lazy:
filter() فقط عناصری را نگه میدارد که شرط را برآورده میکنند.
# ایجاد RDD با اعداد ۱-۲۰
numbers = spark.sparkContext.parallelize(range(1, 21))
# فیلتر کردن اعداد زوج
even_numbers = numbers.filter(lambda x: x % 2 == 0)
print("اعداد زوج:", even_numbers.collect())
# فیلتر کردن اعداد فرد
odd_numbers = numbers.filter(lambda x: x % 2 != 0)
print("اعداد فرد:", odd_numbers.collect())
numbers = spark.sparkContext.parallelize(range(1, 101))
# اعداد قابل تقسیم بر ۳ و ۵
divisible_by_3_and_5 = numbers.filter(lambda x: x % 3 == 0 and x % 5 == 0)
print("قابل تقسیم بر ۳ و ۵:", divisible_by_3_and_5.collect())
def is_prime(n):
"""بررسی اینکه آیا عدد اول است"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(n**0.5) + 1, 2):
if n % i == 0:
return False
return True
numbers = spark.sparkContext.parallelize(range(0, 100))
primes = numbers.filter(is_prime)
print(f"اعداد اول: {primes.take(20)}")
print(f"تعداد اعداد اول زیر ۱۰۰: {primes.count()}")
words = ["apache", "spark", "hadoop", "python", "scala", "java", "pandas"]
words_rdd = spark.sparkContext.parallelize(words)
# کلمات طولانیتر از ۵ کاراکتر
long_words = words_rdd.filter(lambda word: len(word) > 5)
print("کلمات طولانی:", long_words.collect())
# کلمات شروعشون با 's'
s_words = words_rdd.filter(lambda word: word.startswith('s'))
print("کلمات شروعشون با 's':", s_words.collect())
map() هر عنصر را با استفاده از یک تابع تبدیل میکند یعنی به ازای هر عنصر یک RDD، تابعی که به آن ارسال شده است را روی آن اجرا میکند و خروجی آن یک لیست شامل نتیجه اعمال این تابع است. مثلا هر عنصر لیست را به توان دو می رساند و نتیجه هم یک لیست جدید (یک RDD جدید) خواهد بود.
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# مربع هر عدد
squares = numbers.map(lambda x: x ** 2)
print("مربعها:", squares.collect())
# دوبرابر هر عدد
doubled = numbers.map(lambda x: x * 2)
print("دوبرابر شدهها:", doubled.collect())
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# تبدیل اعداد به رشته
strings = numbers.map(lambda x: f"Number_{x}")
print("رشتهها:", strings.collect())
# ایجاد تاپلها
tuples = numbers.map(lambda x: (x, x ** 2, x ** 3))
print("تاپلها (n, n², n³):", tuples.collect())
def complex_transform(x):
"""اعمال تبدیلهای چندگانه"""
if x % 2 == 0:
return x ** 2
else:
return x ** 3
numbers = spark.sparkContext.parallelize(range(1, 11))
transformed = numbers.map(complex_transform)
print("تبدیل پیچیده:", transformed.collect())
# ایجاد RDD از تاپلها (id, name)
people = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
people_rdd = spark.sparkContext.parallelize(people)
# استخراج فقط نامها
names = people_rdd.map(lambda x: x[1])
print("نامها:", names.collect())
# ایجاد تاپلهای جدید با دادههای تغییر یافته
upper_names = people_rdd.map(lambda x: (x[0], x[1].upper()))
print("نامها با حروف بزرگ:", upper_names.collect())
# افزودن فیلد محاسبهشده
with_length = people_rdd.map(lambda x: (x[0], x[1], len(x[1])))
print("با طول:", with_length.collect())
flatMap() همان کار مپ یا نگاشت را انجام میدهد اما در انتهای کار، اگر هر عنصر خودش یک آرایه باشد، آرایههای داخلی را حذف می کند به گونه ای که لیست نهایی، همه همسطح باشند بدون آرایههای داخلی .
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
rdd = spark.sparkContext.parallelize(data)
# استفاده از map - ساختار تودرتو را حفظ میکند
mapped = rdd.map(lambda x: x)
print("نتیجه map:", mapped.collect())
# خروجی: [[۱, ۲, ۳], [۴, ۵, ۶], [۷, ۸, ۹]]
# استفاده از flatMap - صاف میکند
flat_mapped = rdd.flatMap(lambda x: x)
print("نتیجه flatMap:", flat_mapped.collect())
# خروجی: [۱, ۲, ۳, ۴, ۵, ۶, ۷, ۸, ۹]
در مثال زیر چون خروجی تابع split یک آرایه است و خود آرایه ابتدایی هم از قبل موجود است ما یک آرایه درون آرایه خواهیم داشت که تابع flatMap آرایه داخلی را حذف و تنها عناصر داخل آرایه را در خروجی کنار هم قرار میدهد.
sentences = [
"Apache Spark is awesome",
"Python is great for data science",
"RDDs are fundamental"
]
sentences_rdd = spark.sparkContext.parallelize(sentences)
# تقسیم هر جمله به کلمات
words = sentences_rdd.flatMap(lambda sentence: sentence.split())
print("تمام کلمات:", words.collect())
print("تعداد کلمات:", words.count())
numbers = spark.sparkContext.parallelize([1, 2, 3])
# تولید رنج برای هر عدد
ranges = numbers.flatMap(lambda x: range(1, x + 1))
print("رنجها:", ranges.collect())
# خروجی: [۱, ۱, ۲, ۱, ۲, ۳]
# تولید جفتها
pairs = numbers.flatMap(lambda x: [(x, x * 2), (x, x * 3)])
print("جفتها:", pairs.collect())
text = [
"Hello world",
"Spark is fast",
"Python Spark RDD"
]
text_rdd = spark.sparkContext.parallelize(text)
# تبدیل به حروف کوچک، تقسیم و حذف کلمات کوتاه
words = text_rdd.flatMap(lambda line: line.lower().split()) \
.filter(lambda word: len(word) > 3)
print("کلمات پردازششده:", words.collect())
میتوانید چندین تبدیل را با هم زنجیره کنید:
numbers = spark.sparkContext.parallelize(range(1, 21))
# زنجیره پیچیده: فیلتر کردن زوجها، مربع کردنشان، نگه داشتن آنهایی که > ۵۰
result = numbers.filter(lambda x: x % 2 == 0) \
.map(lambda x: x ** 2) \
.filter(lambda x: x > 50) \
.collect()
print("نتیجه زنجیره:", result)
# داده خام: ورودیهای لاگ
logs = [
"۲۰۲۴-۰۱-۰۱ INFO User logged in",
"۲۰۲۴-۰۱-۰۱ ERROR Database connection failed",
"۲۰۲۴-۰۱-۰۱ INFO User logged out",
"۲۰۲۴-۰۱-۰۲ ERROR Timeout occurred",
"۲۰۲۴-۰۱-۰۲ INFO System started"
]
logs_rdd = spark.sparkContext.parallelize(logs)
# خط لوله: فیلتر کردن خطاها، استخراج تاریخ و پیام
error_logs = logs_rdd.filter(lambda log: "ERROR" in log) \
.map(lambda log: {
'date': log.split()[0],
'message': ' '.join(log.split()[2:])
})
print("خطاهای لاگ:")
for error in error_logs.collect():
print(f" {error['date']}: {error['message']}")
پی را با نمونهگیری تصادفی نقاط در یک مربع واحد و بررسی اینکه آیا درون دایره واحد قرار میگیرند تخمین میزنیم.
π ≈ ۴ × (نقاط درون دایره) / (تعداد کل نقاط)
import random
def is_inside_circle(p):
"""بررسی اینکه آیا یک نقطه تصادفی درون دایره واحد قرار دارد"""
x, y = random.random(), random.random()
return x * x + y * y < 1
# تعداد نمونهها
NUM_SAMPLES = 1000000
# ایجاد RDD با اندیسهای نمونه
samples_rdd = spark.sparkContext.parallelize(range(NUM_SAMPLES), 8)
# فیلتر کردن نقاط درون دایره
inside_circle = samples_rdd.filter(is_inside_circle)
# شمارش نقاط درون دایره
count_inside = inside_circle.count()
# تخمین پی
pi_estimate = 4.0 * count_inside / NUM_SAMPLES
print(f"تعداد کل نمونهها: {NUM_SAMPLES}")
print(f"نقاط درون دایره: {count_inside}")
print(f"تخمین پی: {pi_estimate}")
print(f"پی واقعی: {۳.۱۴۱۵۹۲۶۵۳۵۸۹۷۹۳}")
print(f"خطا: {abs(pi_estimate - 3.141592653589793):.6f}")
def estimate_pi(num_samples, num_partitions):
"""تخمین پی با پارتیشنهای مشخص"""
samples = spark.sparkContext.parallelize(range(num_samples), num_partitions)
inside = samples.filter(is_inside_circle).count()
return 4.0 * inside / num_samples
# تست تعداد پارتیشنهای مختلف
for partitions in [2, 4, 8, 16, 32]:
pi = estimate_pi(100_000_000, partitions)
print(f"پارتیشنها: {partitions:2d}, تخمین پی: {pi:.6f}")
یک نمونه تصادفی از RDD خود برای تست یا بررسی دریافت کنید.
large_rdd = spark.sparkContext.parallelize(range(1, 1001))
# دریافت ~۱۰٪ نمونه بدون جایگزینی
sample = large_rdd.sample(withReplacement=False, fraction=0.1, seed=42)
print(f"اندازه نمونه: {sample.count()}")
print(f"نمونه: {sample.take(20)}")
numbers = spark.sparkContext.parallelize(range(1, 11))
# بدون جایگزینی (هر عنصر حداکثر یک بار انتخاب میشود)
sample1 = numbers.sample(False, 0.5, seed=42)
print("بدون جایگزینی:", sample1.collect())
# با جایگزینی (عناصر میتوانند چندین بار انتخاب شوند)
sample2 = numbers.sample(True, 0.5, seed=42)
print("با جایگزینی:", sample2.collect())
# ابتدا روی نمونه توسعه دهید
large_dataset = spark.sparkContext.parallelize(range(1, 1000000))
# تست خط لوله روی ۱٪ نمونه
sample = large_dataset.sample(False, 0.01, seed=42)
# توسعه و تست منطق خود
result = sample.filter(lambda x: x % 7 == 0) \
.map(lambda x: (x, x ** 2)) \
.take(10)
print("نتیجه تست:", result)
# پس از رضایت، روی کل مجموعه داده اجرا کنید
# full_result = large_dataset.filter(...).map(...).collect()
برای کاهش تعداد پارتیشنها استفاده کنید (به صورت پیشفرض بدون shuffling):
# ایجاد RDD با ۱۰ پارتیشن
numbers = spark.sparkContext.parallelize(range(100), 10)
print(f"پارتیشنهای اصلی: {numbers.getNumPartitions()}")
print(f"توزیع اصلی: {numbers.glom().map(len).collect()}")
# کاهش به ۳ پارتیشن
coalesced = numbers.coalesce(3)
print(f"\nپس از coalesce:")
print(f"پارتیشنها: {coalesced.getNumPartitions()}")
print(f"توزیع: {coalesced.glom().map(len).collect()}")
برای افزایش یا توزیع مجدد پارتیشنها به صورت یکسان استفاده کنید (شامل shuffling):
# ایجاد RDD با ۲ پارتیشن (توزیع نابرابر)
numbers = spark.sparkContext.parallelize(range(100), 2)
print(f"توزیع اصلی: {numbers.glom().map(len).collect()}")
# repartition به ۵ (توزیع یکسانتر)
repartitioned = numbers.repartition(5)
print(f"پس از repartition: {repartitioned.glom().map(len).collect()}")
# از coalesce هنگام کاهش پارتیشنها استفاده کنید (پس از فیلتر)
large_rdd = spark.sparkContext.parallelize(range(1000000), 100)
filtered = large_rdd.filter(lambda x: x % 1000 == 0) # حالا فقط ~۱۰۰۰ عنصر
reduced = filtered.coalesce(10) # کاهش پارتیشنها برای تطابق با اندازه داده
# از repartition هنگام:
# ۱. افزایش پارتیشنها
# ۲. نیاز به توزیع یکسان
# ۳. قبل از عملیاتهایی که از موازیسازی بیشتر سود میبرند
با لیستی از جملات:
sentences = [
"Spark is amazing for big data",
"Python makes Spark easy to use",
"Big data processing with Spark RDD"
]
با دادههای نامرتب:
data = [" apple ", "BANANA", " Orange", "grape ", "", " "]
✅ تبدیلها تنبل هستند – یک گراف محاسباتی بسازید
✅ اکشنها اجرا را فعال میکنند – قبل از اجرا بهینهسازی کنید
✅ filter() انتخاب میکند – اندازه مجموعه داده را کاهش میدهد
✅ map() تبدیل میکند – نقشهبرداری یک به یک
✅ flatMap() صاف میکند – نقشهبرداری یک به چند
✅ زنجیرهسازی عملیات – خط لولههای پیچیده بسازید
✅ sample() برای تست – روی نمونههای کوچک توسعه دهید
✅ مدیریت پارتیشنها – coalesce پس از فیلتر کردن، repartition برای موازیسازی
| تبدیل (Transformation) | ورودی → خروجی | کاربرد |
|---|---|---|
filter(func) | RDD[T] → RDD[T] | انتخاب عناصر |
map(func) | RDD[T] → RDD[U] | تبدیل هر عنصر |
flatMap(func) | RDD[T] → RDD[U] | تبدیل و صاف کردن |
sample(...) | RDD[T] → RDD[T] | نمونهگیری تصادفی |
coalesce(n) | RDD[T] → RDD[T] | کاهش پارتیشنها |
repartition(n) | RDD[T] → RDD[T] | تغییر پارتیشنها |
محتوای ویدئویی
در فیلم آموزشی زیر مطالب بالا را به صورت عملی با هم مرور میکنیم.
نکته : اگر احیانا فیلم آموزشی این بخش در قسمت زیر قابل مشاهده نیست، اطمینان حاصل کنید که با آی پی ایران به مشاهده این صفحه مشغول هستید.