بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

کارگاه عملی : مفاهیم پایه RDD – محتوای ویدئویی


در ادامه مسیر یادگیری اسپارک در این جلسه سراغ یکی از پایه‌ای ترین مفاهیم اسپارک یعنی RDD ها می رویم و همانطور که در جلسه اول هم اشاره کردیم، سنگ بنای توزیع شدگی و پردازش موازی در اسپارک ، توزیع داده‌ها در کلاستر در قالب موجودیتی است که به آن RDD می‌گوییم و پردازش داده، حتی اگر با دیتافریم یا SQL کار کنیم نهایتا و در پشت صحنه، مجموعه‌ای از تبدیل ها و اکشن‌هایی خواهد بود که روی مجموعه‌ای از RDD ها که مجموعه داده‌های توزیع شده در کلاستر هستند، انجام می‌شود.

۱. RDD چیست؟

RDD (Resilient Distributed Dataset) ساختار داده اصلی اسپارک است:

  • Resilient (پایدار): مقاوم در برابر خطا، می‌تواند از خطاهای نودها بازیابی شود
  • Distributed (توزیع‌شده): داده‌ها در چندین نود در یک کلاستر تقسیم می‌شوند
  • Dataset (مجموعه داده): مجموعه‌ای از عناصر داده
ویژگی‌های کلیدی RDDها
  1. Immutable (غیرقابل تغییر): پس از ایجاد، قابل تغییر نیستند (اما می‌توان RDDهای جدید ساخت)
  2. Lazily Evaluated (تخمین تنبل): تبدیل‌ها تا زمان فراخوانی اکشن اجرا نمی‌شوند
  3. Partitioned (پارتیشن‌بندی شده): داده‌ها برای پردازش موازی به پارتیشن‌ها تقسیم می‌شوند
  4. Type-Safe (ایمن از نظر نوع): در Scala به صورت قوی تایپ شده‌اند، در Python به صورت پویا تایپ شده‌اند

۲. راه‌اندازی اسپارک

برای این کارگاه از کلاستر محلی اسپارک که در جلسه اول و در WSL بالا آوردیم استفاده خواهیم کرد. بنابراین در ادامه این جلسه، فرض میکنیم که با بالا آوردن یک کلاستر اسپارک محلی با دو ورکر آشنا هستید.

ایجاد SparkSession – حالت محلی (Local Mode)
import pyspark
from pyspark.sql import SparkSession

# ایجاد SparkSession برای حالت محلی
spark = SparkSession.builder \
    .appName('RDD-Part1-Fundamentals') \
    .master('local[*]') \
    .config('spark.driver.memory', '2g') \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()

# بررسی نسخه اسپارک
print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")
print(f"App Name: {spark.sparkContext.appName}")
درک حالت محلی (Local Mode)
  • local – با ۱ ریسمان Executor اجرا شود
  • local[2] – با ۲ ریسمان Executor اجرا شود
  • local[*] – با تعداد ریسمان‌های Executor برابر با هسته‌های منطقی ماشین اجرا شود

در این حالت، دیگر بخش UI اصلی اسپارک را نخواهیم داشت و تنها به ازای هر برنامه اسپارک که اجرا می‌کنیم روی پورت ۴۰۴۰ و ۴۰۴۱ و … می‌توانیم وضعیت جاب‌های را مشاهده کنیم .

ایجاد SparkSession – حالت کلاستر (Cluster Mode)

در این جلسه از این روش استفاده می کنیم و روش اصلی ما در این دوره خواهد بود .

# فقط اگر کلاستر اسپارک در حال اجراست
spark = SparkSession.builder \
    .appName('RDD-Part1-Fundamentals') \
    .master('spark://localhost:7077') \
    .config('spark.driver.memory', '512M') \
    .config('spark.executor.memory', '512M') \
    .config('spark.executor.cores', '1') \
    .getOrCreate()

۳. ایجاد RDDها

روش ۱: موازی‌سازی یک مجموعه

ساده‌ترین راه برای ایجاد RDD از یک مجموعه پایتون است:

# ایجاد RDD از یک لیست
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = spark.sparkContext.parallelize(numbers)

print(f"Type RDD: {type(numbers_rdd)}")
print(f"RDD: {numbers_rdd}")
روش ۲: ایجاد از range

برای رنج‌های بزرگ کارآمدتر است:

# ایجاد RDD از range (کارآمدتر)
range_rdd = spark.sparkContext.parallelize(range(0, 100))
print(f"Range RDD: {range_rdd}")
روش ۳: خواندن از فایل‌ها (پیش‌نمایش)
# در بخش‌های بعدی به صورت کامل پوشش داده می‌شود
text_rdd = spark.sparkContext.textFile("~/data/news.txt")
print(f"Text RDD: {text_rdd}")

۴. درک پارتیشن‌ها

پارتیشن‌ها بلوک‌های منطقی داده هستند که می‌توانند به صورت موازی پردازش شوند. هر پارتیشن توسط یک هسته اجراکننده پردازش می‌شود.

چرا پارتیشن‌ها مهم هستند؟

  • پردازش موازی را فعال می‌کنند
  • تأثیر قابل توجهی بر عملکرد دارند
  • کم بودن آن‌ها = استفاده ناکامل از کلاستر
  • زیاد بودن آن‌ها = هزینه اضافی
بررسی تعداد پارتیشن‌ها
print(f"تعداد پارتیشن‌ها: {numbers_rdd.getNumPartitions()}")
print(f"تعداد پارتیشن‌ها: {text_rdd.getNumPartitions()}")
print(f"تعداد پارتیشن‌ها: {range_rdd.getNumPartitions()}")
کنترل پارتیشن‌ها در زمان ایجاد
# ایجاد RDD با تعداد مشخص پارتیشن
numbers_rdd_4 = spark.sparkContext.parallelize(range(0, 100), 4)
print(f"تعداد پارتیشن‌ها: {numbers_rdd_4.getNumPartitions()}")

numbers_rdd_8 = spark.sparkContext.parallelize(range(0, 100), 8)
print(f"تعداد پارتیشن‌ها: {numbers_rdd_8.getNumPartitions()}")

text_rdd = spark.sparkContext.textFile("/home/smbanaie/data/news.txt",10)
print(f"تعداد پارتیشن‌ها: {text_rdd.getNumPartitions()}")
مشاهده پارتیشن‌ها با glom()

glom() یک RDD برمی‌گرداند که در آن هر پارتیشن به لیستی از عناصر آن تبدیل شده است:

# ایجاد RDD کوچک برای مشاهده پارتیشن‌ها
small_rdd = spark.sparkContext.parallelize(range(0, 10), 4)

# مشاهده توزیع داده در پارتیشن‌ها
partitions = small_rdd.glom().collect()
print("توزیع داده در پارتیشن‌ها:")
for i, partition in enumerate(partitions):
    print(f"Partition {i}: {partition}")

خروجی نمونه:

Partition 0: [0, 1]
Partition 1: [2, 3, 4]
Partition 2: [5, 6]
Partition 3: [7, 8, 9]

۵. اکشن‌های پایه RDD

اکشن‌ها محاسبات را فعال کرده و نتایج را به درایور برمی‌گردانند یا در حافظه ذخیره می‌کنند.

collect() – دریافت تمام عناصر

⚠️ هشدار: فقط برای RDDهای کوچک استفاده کنید! تمام داده‌ها را به درایور می‌آورد.

numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
data = numbers_rdd.collect()
print(f"Type: {type(data)}")
print(f"Data: {data}")
count() – شمارش عناصر
numbers_rdd = spark.sparkContext.parallelize(range(0, 1000))
count = numbers_rdd.count()
print(f"Count: {count}")
first() – دریافت اولین عنصر
numbers_rdd = spark.sparkContext.parallelize([10, 20, 30, 40, 50])
first_element = numbers_rdd.first()
print(f"First element: {first_element}")
take(n) – دریافت n عنصر اول
numbers_rdd = spark.sparkContext.parallelize(range(0, 100))
first_ten = numbers_rdd.take(10)
print(f"First 10 elements: {first_ten}")
top(n) – دریافت n عنصر برتر

عناصر را به ترتیب نزولی برمی‌گرداند:

numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
top_three = numbers_rdd.top(3)
print(f"Top 3 elements: {top_three}")  # Output: [9, 8, 7]
takeOrdered(n) – دریافت n عنصر به ترتیب صعودی
numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
bottom_three = numbers_rdd.takeOrdered(3)
print(f"Bottom 3 elements: {bottom_three}")  # Output: [1, 2, 3]
takeOrdered با کلید سفارشی
# دریافت ۳ عنصر با کوچکترین مقادیر منفی (بزرگترین مقادیر مثبت)
numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
result = numbers_rdd.takeOrdered(3, key=lambda x: -x)
print(f"Top 3 (using custom key): {result}")  # Output: [9, 8, 7]

text_rdd.takeOrdered(5, key=lambda x: -len(x))

۶. مثال‌های عملی

مثال ۱: یافتن آمار

# ایجاد RDD از اعداد تصادفی
import random
random.seed(42)
random_numbers = [random.randint(1, 100) for _ in range(1000)]
random_rdd = spark.sparkContext.parallelize(random_numbers)

# آمار پایه
print(f"Count: {random_rdd.count()}")
print(f"Min: {random_rdd.min()}")
print(f"Max: {random_rdd.max()}")
print(f"Mean: {random_rdd.mean():.2f}")
print(f"Sum: {random_rdd.sum()}")
print(f"Std Dev: {random_rdd.stdev():.2f}")
مثال ۲: کار با رشته‌ها
# ایجاد RDD از کلمات
words = ["spark", "hadoop", "python", "scala", "java", "apache"]
words_rdd = spark.sparkContext.parallelize(words)

# یافتن طولانی‌ترین کلمه
longest = words_rdd.top(1, key=lambda x: len(x))
print(f"Longest word: {longest[0]}")

# یافتن کوتاه‌ترین کلمه
shortest = words_rdd.takeOrdered(1, key=lambda x: len(x))
print(f"Shortest word: {shortest[0]}")
مثال ۳: درک توزیع پارتیشن
# ایجاد RDD با اندازه‌های مختلف پارتیشن
data = range(0, 20)

for num_partitions in [2, 4, 8]:
    rdd = spark.sparkContext.parallelize(data, num_partitions)
    partitions = rdd.glom().collect()

    print(f"\n{num_partitions} Partitions:")
    for i, partition in enumerate(partitions):
        print(f"  Partition {i}: {len(partition)} elements - {partition}")

۷. تمرین‌های عملی

تمرین ۱: ایجاد و بررسی RDDها

یک RDD با اعداد ۱-۵۰ با ۵ پارتیشن ایجاد کنید، سپس:

  1. تعداد پارتیشن‌ها را چاپ کنید
  2. نحوه توزیع داده را نشان دهید
  3. ۵ عنصر اول را دریافت کنید
  4. ۵ عنصر آخر را دریافت کنید (نکته: از top استفاده کنید)
  5. مجموع را محاسبه کنید
تمرین ۲: چالش آماری

یک RDD با ۱۰۰ عدد صحیح تصادفی بین ۱-۱۰۰۰ ایجاد کنید، سپس:

  1. مقدار حداقل را پیدا کنید
  2. مقدار حداکثر را پیدا کنید
  3. میانگین را محاسبه کنید
  4. تعداد اعداد زوج را بشمارید (شما به filter نیاز دارید – پیش‌نمایش!)

۸. نکات کلیدی

RDDها اساس اسپارک هستند – توزیع‌شده، غیرقابل تغییر، مقاوم در برابر خطا

پارتیشن‌ها موازی‌سازی را فعال می‌کنند – تعداد مناسب را برای اندازه داده خود انتخاب کنید

اکشن‌ها محاسبات را فعال می‌کنند – تبدیل‌ها تنبل هستند، اکشن‌ها اجرا می‌شوند

با collect() احتیاط کنید – فقط برای مجموعه داده‌های کوچک استفاده کنید

اکشن‌های مناسب را استفاده کنید – take() برای نمونه‌گیری، count() برای اندازه و غیره



۹. پاک‌سازی
# اسپارک را پس از اتمام کار متوقف کنید
spark.stop()

کارت مرجع سریع

ActionDescriptionUse Case
collect()Return all elementsSmall datasets only
count()Count elementsGet size
first()Get first elementQuick peek
take(n)Get first n elementsSample data
top(n)Get top n elementsFind maximums
takeOrdered(n)Get n smallestFind minimums
min()Get minimumStatistics
max()Get maximumStatistics
mean()Calculate averageStatistics
sum()Calculate sumAggregation
glom()Show partition contentsDebug partitions

محتوای ویدئویی

در بخش اول از کارگاه عملی کار با RDD‌، به بررسی اکشن‌های اصلی و کنترل تعداد پارتیشن‌ها هنگام ایجاد RDD ها می پردازیم. این فیلم در قسمت زیر قابل مشاهده است.

نکته : اگر احیانا فیلم آموزشی مربوطه را مشاهده نمی‌کنید، مطمئن شوید که با آی پی ایران متصل شده‌اید .

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید