بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

کارگاه عملی: RDD Transformations – محتوای ویدئویی

در بخش قبل با مفهوم RDD، نحوه‌ی ایجاد آن و فلسفه‌ی پردازش توزیع‌شده در Apache Spark آشنا شدیم.
در این بخش، وارد هسته‌ی واقعی کار با RDDها می‌شویم؛ جایی که داده‌ها را تبدیل، فیلتر، پردازش و آماده‌ی تحلیل می‌کنیم.

تمرکز اصلی این بخش بر روی Transformations است؛ عملیاتی که به ما اجازه می‌دهند بدون اجرای فوری، یک گراف محاسباتی بهینه بسازیم. درک درست این مفهوم، پایه‌ی نوشتن برنامه‌های سریع، مقیاس‌پذیر و بهینه در Spark است.

در طول این بخش:

  • یاد می‌گیریم Spark چگونه با Lazy Evaluation کار می‌کند
  • تفاوت عملیات‌هایی که فقط تعریف می‌شوند با آن‌هایی که اجرا را آغاز می‌کنند را می‌فهمیم
  • با پرکاربردترین تبدیل‌های RDD در دنیای واقعی کار می‌کنیم
  • و در نهایت، با یک مثال عملی (تخمین عدد π) اثر پارتیشن‌بندی و موازی‌سازی را لمس می‌کنیم

۱. مفاهیم پایه

تبدیل (Transformation)

تعریف: تبدیل‌ها، عملیاتی هستند که یک RDD جدید از یک RDD موجود می‌سازند. آن‌ها یک “دستورالعمل” یا “تغییر” روی داده‌ها را تعریف می‌کنند، اما محاسبات را فوراً اجرا نمی‌کنند.

ویژگی‌های کلیدی:

  • نوع بازگشتی: همیشه یک RDD جدید برمی‌گرداند.
  • اجرا: تنبل (Lazy Evaluation) است. یعنی تبدیل‌ها فوراً اجرا نمی‌شوند. به جای آن، اسپارک یک “گراف محاسباتی” (DAG) از تمام تبدیل‌هایی که باید انجام شوند، می‌سازد. محاسبات واقعی فقط زمانی انجام می‌شود که یک اکشن فراخوانی شود.
  • هدف: برای ساختن یک خط لوله (pipeline) از تبدیل‌های داده. شما با زنجیره‌ای از تبدیل‌ها مشخص می‌کنید که چه کاری باید روی داده‌ها انجام شود، اما هنوز آن را اجرا نکرده‌اید.

مثال‌ها: map(), filter(), flatMap(), sample(), coalesce(), repartition()


اکشن (Action)

تعریف: اکشن‌ها، عملیاتی هستند که محاسبات تعریف‌شده توسط تبدیل‌ها را فعال می‌کنند و نتیجه را به درایور (driver) یا یک سیستم ذخیره‌سازی خارجی برمی‌گردانند.

ویژگی‌های کلیدی:

  • نوع بازگشتی: یک مقدار (مثل یک لیست، عدد، یا None) برمی‌گرداند، نه یک RDD.
  • اجرا: فوری (Eager Evaluation) است. وقتی یک اکشن فراخوانی می‌شود، اسپارک تمام تبدیل‌های موجود در گراف محاسباتی را به ترتیب و به صورت موازی اجرا می‌کند تا نتیجه نهایی را تولید کند.
  • هدف: برای دریافت نتایج یا ذخیره داده‌ها. اکشن‌ها محاسبات را آغاز می‌کنند.

مثال‌ها: collect(), count(), first(), take(), top(), saveAsTextFile()

تفاوت‌های کلیدی
جنبهتبدیل‌ها (Transformations)اکشن‌ها (Actions)
نوع بازگشتیRDD جدیدمقدار یا Unit
اجراتنبل (Lazy – به تأخیر افتاده)فوری (Eager – فوری)
مثال‌هاmap, filter, flatMapcollect, count, reduce
هدفساخت گراف محاسباتیفعال‌سازی اجرا
تخمین تنبل (Lazy Evaluation)

تبدیل‌ها در اسپارک Lazy یا تنبل هستند یعنی به صورت فوری اجرا نمی‌شوند. تمام تبدیلات حتی اگر چندین تبدیل را روی یک مجموعه داده اعمال کرده باشید تا به یک اکشن نرسیم، اسپارک کار انجام محاسبات را شروع نخواهد کرد و شما Job ای را در حال اجرا نخواهید دید:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('RDD-Part2-Transformations') \
    .master('spark://Mojtaba.localdomain:7077') \
    .config('spark.driver.memory', '1024M') \
    .config('spark.executor.memory', '1024M') \
    .config('spark.executor.cores', '1') \
    .getOrCreate()

# این هنوز اجرا نمی‌شود!
numbers_rdd = spark.sparkContext.parallelize(range(0, 1000000))
filtered_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)  # اجرا نمی‌شود
mapped_rdd = filtered_rdd.map(lambda x: x ** 2)  # اجرا نمی‌شود

# حالا اجرا می‌شود!
result = mapped_rdd.take(10)  # اکشن اجرا را فعال می‌کند
print(result)

مزایای تخمین lazy:

  • اسپارک می‌تواند کل برنامه محاسباتی را بهینه‌سازی کند
  • از محاسبات غیرضروری جلوگیری می‌کند
  • می‌تواند از خواندن داده‌های غیرضروری صرف‌نظر کند

۲. filter() – انتخاب عناصر

filter() فقط عناصری را نگه می‌دارد که شرط را برآورده می‌کنند.

فیلتر کردن پایه
# ایجاد RDD با اعداد ۱-۲۰
numbers = spark.sparkContext.parallelize(range(1, 21))

# فیلتر کردن اعداد زوج
even_numbers = numbers.filter(lambda x: x % 2 == 0)
print("اعداد زوج:", even_numbers.collect())

# فیلتر کردن اعداد فرد
odd_numbers = numbers.filter(lambda x: x % 2 != 0)
print("اعداد فرد:", odd_numbers.collect())
شرایط چندگانه
numbers = spark.sparkContext.parallelize(range(1, 101))

# اعداد قابل تقسیم بر ۳ و ۵
divisible_by_3_and_5 = numbers.filter(lambda x: x % 3 == 0 and x % 5 == 0)
print("قابل تقسیم بر ۳ و ۵:", divisible_by_3_and_5.collect())
فیلتر کردن با توابع
def is_prime(n):
    """بررسی اینکه آیا عدد اول است"""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(n**0.5) + 1, 2):
        if n % i == 0:
            return False
    return True

numbers = spark.sparkContext.parallelize(range(0, 100))
primes = numbers.filter(is_prime)

print(f"اعداد اول: {primes.take(20)}")
print(f"تعداد اعداد اول زیر ۱۰۰: {primes.count()}")
فیلتر کردن رشته‌ها
words = ["apache", "spark", "hadoop", "python", "scala", "java", "pandas"]
words_rdd = spark.sparkContext.parallelize(words)

# کلمات طولانی‌تر از ۵ کاراکتر
long_words = words_rdd.filter(lambda word: len(word) > 5)
print("کلمات طولانی:", long_words.collect())

# کلمات شروع‌شون با 's'
s_words = words_rdd.filter(lambda word: word.startswith('s'))
print("کلمات شروع‌شون با 's':", s_words.collect())

۳. map() – تبدیل عناصر

map() هر عنصر را با استفاده از یک تابع تبدیل می‌کند یعنی به ازای هر عنصر یک RDD، تابعی که به آن ارسال شده است را روی آن اجرا می‌کند و خروجی آن یک لیست شامل نتیجه اعمال این تابع است. مثلا هر عنصر لیست را به توان دو می رساند و نتیجه هم یک لیست جدید (یک RDD جدید) خواهد بود.

کاربرد پایه تابع map
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# مربع هر عدد
squares = numbers.map(lambda x: x ** 2)
print("مربع‌ها:", squares.collect())

# دوبرابر هر عدد
doubled = numbers.map(lambda x: x * 2)
print("دوبرابر شده‌ها:", doubled.collect())
تبدیل نوع عناصر یک RDD
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# تبدیل اعداد به رشته
strings = numbers.map(lambda x: f"Number_{x}")
print("رشته‌ها:", strings.collect())

# ایجاد تاپل‌ها
tuples = numbers.map(lambda x: (x, x ** 2, x ** 3))
print("تاپل‌ها (n, n², n³):", tuples.collect())
مپینگ یا نگاشت عناصر به کمک توابع پیچیده
def complex_transform(x):
    """اعمال تبدیل‌های چندگانه"""
    if x % 2 == 0:
        return x ** 2
    else:
        return x ** 3

numbers = spark.sparkContext.parallelize(range(1, 11))
transformed = numbers.map(complex_transform)
print("تبدیل پیچیده:", transformed.collect())
کار با تاپل‌ها
# ایجاد RDD از تاپل‌ها (id, name)
people = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
people_rdd = spark.sparkContext.parallelize(people)

# استخراج فقط نام‌ها
names = people_rdd.map(lambda x: x[1])
print("نام‌ها:", names.collect())

# ایجاد تاپل‌های جدید با داده‌های تغییر یافته
upper_names = people_rdd.map(lambda x: (x[0], x[1].upper()))
print("نام‌ها با حروف بزرگ:", upper_names.collect())

# افزودن فیلد محاسبه‌شده
with_length = people_rdd.map(lambda x: (x[0], x[1], len(x[1])))
print("با طول:", with_length.collect())

۴. flatMap() – هم‌سطح نتایج

flatMap() همان کار مپ یا نگاشت را انجام می‌دهد اما در انتهای کار، اگر هر عنصر خودش یک‌ آرایه باشد، آرایه‌های داخلی را حذف می کند به گونه ای که لیست نهایی، همه هم‌سطح باشند بدون آرایه‌های داخلی .

تفاوت: map در مقابل flatMap
data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
rdd = spark.sparkContext.parallelize(data)

# استفاده از map - ساختار تودرتو را حفظ می‌کند
mapped = rdd.map(lambda x: x)
print("نتیجه map:", mapped.collect())
# خروجی: [[۱, ۲, ۳], [۴, ۵, ۶], [۷, ۸, ۹]]

# استفاده از flatMap - صاف می‌کند
flat_mapped = rdd.flatMap(lambda x: x)
print("نتیجه flatMap:", flat_mapped.collect())
# خروجی: [۱, ۲, ۳, ۴, ۵, ۶, ۷, ۸, ۹]
تبدیل رشته‌ها به کلمات تشکیل دهنده آنها

در مثال زیر چون خروجی تابع split یک آرایه است و خود آرایه ابتدایی هم از قبل موجود است ما یک آرایه درون آرایه خواهیم داشت که تابع flatMap آرایه داخلی را حذف و تنها عناصر داخل آرایه را در خروجی کنار هم قرار می‌دهد.

sentences = [
    "Apache Spark is awesome",
    "Python is great for data science",
    "RDDs are fundamental"
]
sentences_rdd = spark.sparkContext.parallelize(sentences)

# تقسیم هر جمله به کلمات
words = sentences_rdd.flatMap(lambda sentence: sentence.split())
print("تمام کلمات:", words.collect())
print("تعداد کلمات:", words.count())
تولید مقادیر چندگانه
numbers = spark.sparkContext.parallelize([1, 2, 3])

# تولید رنج برای هر عدد
ranges = numbers.flatMap(lambda x: range(1, x + 1))
print("رنج‌ها:", ranges.collect())
# خروجی: [۱, ۱, ۲, ۱, ۲, ۳]

# تولید جفت‌ها
pairs = numbers.flatMap(lambda x: [(x, x * 2), (x, x * 3)])
print("جفت‌ها:", pairs.collect())
مثال عملی: پردازش متن
text = [
    "Hello world",
    "Spark is fast",
    "Python Spark RDD"
]
text_rdd = spark.sparkContext.parallelize(text)

# تبدیل به حروف کوچک، تقسیم و حذف کلمات کوتاه
words = text_rdd.flatMap(lambda line: line.lower().split()) \
                .filter(lambda word: len(word) > 3)

print("کلمات پردازش‌شده:", words.collect())

۵. زنجیره‌سازی تبدیل‌ها

می‌توانید چندین تبدیل را با هم زنجیره کنید:

numbers = spark.sparkContext.parallelize(range(1, 21))

# زنجیره پیچیده: فیلتر کردن زوج‌ها، مربع کردنشان، نگه داشتن آن‌هایی که > ۵۰
result = numbers.filter(lambda x: x % 2 == 0) \
                .map(lambda x: x ** 2) \
                .filter(lambda x: x > 50) \
                .collect()

print("نتیجه زنجیره:", result)
ساخت یک خط پردازش داده (Data Pipeline)
# داده خام: ورودی‌های لاگ
logs = [
    "۲۰۲۴-۰۱-۰۱ INFO User logged in",
    "۲۰۲۴-۰۱-۰۱ ERROR Database connection failed",
    "۲۰۲۴-۰۱-۰۱ INFO User logged out",
    "۲۰۲۴-۰۱-۰۲ ERROR Timeout occurred",
    "۲۰۲۴-۰۱-۰۲ INFO System started"
]

logs_rdd = spark.sparkContext.parallelize(logs)

# خط لوله: فیلتر کردن خطاها، استخراج تاریخ و پیام
error_logs = logs_rdd.filter(lambda log: "ERROR" in log) \
                     .map(lambda log: {
                         'date': log.split()[0],
                         'message': ' '.join(log.split()[2:])
                     })

print("خطاهای لاگ:")
for error in error_logs.collect():
    print(f"  {error['date']}: {error['message']}")

۶. مثال عملی: تخمین پی (Pi) مونت‌کارلو

الگوریتم

پی را با نمونه‌گیری تصادفی نقاط در یک مربع واحد و بررسی اینکه آیا درون دایره واحد قرار می‌گیرند تخمین می‌زنیم.

π ≈ ۴ × (نقاط درون دایره) / (تعداد کل نقاط)
پیاده‌سازی
import random

def is_inside_circle(p):
    """بررسی اینکه آیا یک نقطه تصادفی درون دایره واحد قرار دارد"""
    x, y = random.random(), random.random()
    return x * x + y * y < 1

# تعداد نمونه‌ها
NUM_SAMPLES = 1000000

# ایجاد RDD با اندیس‌های نمونه
samples_rdd = spark.sparkContext.parallelize(range(NUM_SAMPLES), 8)

# فیلتر کردن نقاط درون دایره
inside_circle = samples_rdd.filter(is_inside_circle)

# شمارش نقاط درون دایره
count_inside = inside_circle.count()

# تخمین پی
pi_estimate = 4.0 * count_inside / NUM_SAMPLES

print(f"تعداد کل نمونه‌ها: {NUM_SAMPLES}")
print(f"نقاط درون دایره: {count_inside}")
print(f"تخمین پی: {pi_estimate}")
print(f"پی واقعی: {۳.۱۴۱۵۹۲۶۵۳۵۸۹۷۹۳}")
print(f"خطا: {abs(pi_estimate - 3.141592653589793):.6f}")
تأثیر پارتیشن‌ها
def estimate_pi(num_samples, num_partitions):
    """تخمین پی با پارتیشن‌های مشخص"""
    samples = spark.sparkContext.parallelize(range(num_samples), num_partitions)
    inside = samples.filter(is_inside_circle).count()
    return 4.0 * inside / num_samples

# تست تعداد پارتیشن‌های مختلف
for partitions in [2, 4, 8, 16, 32]:
    pi = estimate_pi(100_000_000, partitions)
    print(f"پارتیشن‌ها: {partitions:2d}, تخمین پی: {pi:.6f}")

۷. sample() – نمونه‌گیری تصادفی

یک نمونه تصادفی از RDD خود برای تست یا بررسی دریافت کنید.

نمونه‌گیری پایه (بدون جایگزینی)
large_rdd = spark.sparkContext.parallelize(range(1, 1001))

# دریافت ~۱۰٪ نمونه بدون جایگزینی
sample = large_rdd.sample(withReplacement=False, fraction=0.1, seed=42)
print(f"اندازه نمونه: {sample.count()}")
print(f"نمونه: {sample.take(20)}")
با و بدون جایگزینی
numbers = spark.sparkContext.parallelize(range(1, 11))

# بدون جایگزینی (هر عنصر حداکثر یک بار انتخاب می‌شود)
sample1 = numbers.sample(False, 0.5, seed=42)
print("بدون جایگزینی:", sample1.collect())

# با جایگزینی (عناصر می‌توانند چندین بار انتخاب شوند)
sample2 = numbers.sample(True, 0.5, seed=42)
print("با جایگزینی:", sample2.collect())
استفاده عملی: تست پردازش های سنگین
# ابتدا روی نمونه توسعه دهید
large_dataset = spark.sparkContext.parallelize(range(1, 1000000))

# تست خط لوله روی ۱٪ نمونه
sample = large_dataset.sample(False, 0.01, seed=42)

# توسعه و تست منطق خود
result = sample.filter(lambda x: x % 7 == 0) \
               .map(lambda x: (x, x ** 2)) \
               .take(10)

print("نتیجه تست:", result)

# پس از رضایت، روی کل مجموعه داده اجرا کنید
# full_result = large_dataset.filter(...).map(...).collect()

۸. coalesce() و repartition()

coalesce() – کاهش پارتیشن‌ها

برای کاهش تعداد پارتیشن‌ها استفاده کنید (به صورت پیش‌فرض بدون shuffling):

# ایجاد RDD با ۱۰ پارتیشن
numbers = spark.sparkContext.parallelize(range(100), 10)
print(f"پارتیشن‌های اصلی: {numbers.getNumPartitions()}")
print(f"توزیع اصلی: {numbers.glom().map(len).collect()}")

# کاهش به ۳ پارتیشن
coalesced = numbers.coalesce(3)
print(f"\nپس از coalesce:")
print(f"پارتیشن‌ها: {coalesced.getNumPartitions()}")
print(f"توزیع: {coalesced.glom().map(len).collect()}")
repartition() – تغییر پارتیشن‌ها با shuffling

برای افزایش یا توزیع مجدد پارتیشن‌ها به صورت یکسان استفاده کنید (شامل shuffling):

# ایجاد RDD با ۲ پارتیشن (توزیع نابرابر)
numbers = spark.sparkContext.parallelize(range(100), 2)
print(f"توزیع اصلی: {numbers.glom().map(len).collect()}")

# repartition به ۵ (توزیع یکسان‌تر)
repartitioned = numbers.repartition(5)
print(f"پس از repartition: {repartitioned.glom().map(len).collect()}")
چه زمانی از کدام استفاده کنیم؟
# از coalesce هنگام کاهش پارتیشن‌ها استفاده کنید (پس از فیلتر)
large_rdd = spark.sparkContext.parallelize(range(1000000), 100)
filtered = large_rdd.filter(lambda x: x % 1000 == 0)  # حالا فقط ~۱۰۰۰ عنصر
reduced = filtered.coalesce(10)  # کاهش پارتیشن‌ها برای تطابق با اندازه داده

# از repartition هنگام:
# ۱. افزایش پارتیشن‌ها
# ۲. نیاز به توزیع یکسان
# ۳. قبل از عملیات‌هایی که از موازی‌سازی بیشتر سود می‌برند

۹. تمرین‌های عملی

تمرین ۱: پردازش کلمات

با لیستی از جملات:

sentences = [
    "Spark is amazing for big data",
    "Python makes Spark easy to use",
    "Big data processing with Spark RDD"
]
  1. یک RDD ایجاد کنید
  2. به کلمات تقسیم کنید
  3. به حروف کوچک تبدیل کنید
  4. کلمات طولانی‌تر از ۳ کاراکتر را فیلتر کنید
  5. تعداد کلمات منحصر به فرد را بشمارید
تمرین ۲: تمیزکاری داده

با داده‌های نامرتب:

data = ["  apple  ", "BANANA", "  Orange", "grape  ", "", "   "]
  1. حذف رشته‌های خالی/فقط فضای خالی
  2. فضای خالی را برش دهید
  3. به حروف کوچک تبدیل کنید
  4. به ترتیب الفبایی مرتب کنید
تمرین ۳: چالش اعداد اول
  1. یک RDD با اعداد ۱-۱۰۰۰ ایجاد کنید
  2. فقط اعداد اول را نگه دارید
  3. هر عدد اول را به توان دو برسانید
  4. فقط آن‌هایی که > ۱۰۰۰۰ هستند را نگه دارید
  5. ۱۰ تای برتر را بگیرید
تمرین ۴: آزمایش تخمین پی
  1. پی را با ۱۰۰، ۱K، ۱۰K، ۱۰۰K، ۱M نمونه تخمین بزنید
  2. دقت در مقابل تعداد نمونه‌ها را رسم کنید
  3. با ۲، ۴، ۸، ۱۶ پارتیشن تست کنید – آیا بر دقت یا سرعت تأثیر دارد؟

۱۰. نکات کلیدی

تبدیل‌ها تنبل هستند – یک گراف محاسباتی بسازید

اکشن‌ها اجرا را فعال می‌کنند – قبل از اجرا بهینه‌سازی کنید

filter() انتخاب می‌کند – اندازه مجموعه داده را کاهش می‌دهد

map() تبدیل می‌کند – نقشه‌برداری یک به یک

flatMap() صاف می‌کند – نقشه‌برداری یک به چند

زنجیره‌سازی عملیات – خط لوله‌های پیچیده بسازید

sample() برای تست – روی نمونه‌های کوچک توسعه دهید

مدیریت پارتیشن‌ها – coalesce پس از فیلتر کردن، repartition برای موازی‌سازی


کارت مرجع سریع

تبدیل (Transformation)ورودی → خروجیکاربرد
filter(func)RDD[T] → RDD[T]انتخاب عناصر
map(func)RDD[T] → RDD[U]تبدیل هر عنصر
flatMap(func)RDD[T] → RDD[U]تبدیل و صاف کردن
sample(...)RDD[T] → RDD[T]نمونه‌گیری تصادفی
coalesce(n)RDD[T] → RDD[T]کاهش پارتیشن‌ها
repartition(n)RDD[T] → RDD[T]تغییر پارتیشن‌ها

محتوای ویدئویی

در فیلم آموزشی زیر مطالب بالا را به صورت عملی با هم مرور می‌کنیم.

نکته : اگر احیانا فیلم آموزشی این بخش در قسمت زیر قابل مشاهده نیست، اطمینان حاصل کنید که با آی پی ایران به مشاهده این صفحه مشغول هستید.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید