در ادامه مسیر یادگیری اسپارک در این جلسه سراغ یکی از پایهای ترین مفاهیم اسپارک یعنی RDD ها می رویم و همانطور که در جلسه اول هم اشاره کردیم، سنگ بنای توزیع شدگی و پردازش موازی در اسپارک ، توزیع دادهها در کلاستر در قالب موجودیتی است که به آن RDD میگوییم و پردازش داده، حتی اگر با دیتافریم یا SQL کار کنیم نهایتا و در پشت صحنه، مجموعهای از تبدیل ها و اکشنهایی خواهد بود که روی مجموعهای از RDD ها که مجموعه دادههای توزیع شده در کلاستر هستند، انجام میشود.
RDD (Resilient Distributed Dataset) ساختار داده اصلی اسپارک است:
برای این کارگاه از کلاستر محلی اسپارک که در جلسه اول و در WSL بالا آوردیم استفاده خواهیم کرد. بنابراین در ادامه این جلسه، فرض میکنیم که با بالا آوردن یک کلاستر اسپارک محلی با دو ورکر آشنا هستید.
import pyspark
from pyspark.sql import SparkSession
# ایجاد SparkSession برای حالت محلی
spark = SparkSession.builder \
.appName('RDD-Part1-Fundamentals') \
.master('local[*]') \
.config('spark.driver.memory', '2g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()
# بررسی نسخه اسپارک
print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")
print(f"App Name: {spark.sparkContext.appName}")
local – با ۱ ریسمان Executor اجرا شودlocal[2] – با ۲ ریسمان Executor اجرا شودlocal[*] – با تعداد ریسمانهای Executor برابر با هستههای منطقی ماشین اجرا شوددر این حالت، دیگر بخش UI اصلی اسپارک را نخواهیم داشت و تنها به ازای هر برنامه اسپارک که اجرا میکنیم روی پورت ۴۰۴۰ و ۴۰۴۱ و … میتوانیم وضعیت جابهای را مشاهده کنیم .
در این جلسه از این روش استفاده می کنیم و روش اصلی ما در این دوره خواهد بود .
# فقط اگر کلاستر اسپارک در حال اجراست
spark = SparkSession.builder \
.appName('RDD-Part1-Fundamentals') \
.master('spark://localhost:7077') \
.config('spark.driver.memory', '512M') \
.config('spark.executor.memory', '512M') \
.config('spark.executor.cores', '1') \
.getOrCreate()
سادهترین راه برای ایجاد RDD از یک مجموعه پایتون است:
# ایجاد RDD از یک لیست
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = spark.sparkContext.parallelize(numbers)
print(f"Type RDD: {type(numbers_rdd)}")
print(f"RDD: {numbers_rdd}")
برای رنجهای بزرگ کارآمدتر است:
# ایجاد RDD از range (کارآمدتر)
range_rdd = spark.sparkContext.parallelize(range(0, 100))
print(f"Range RDD: {range_rdd}")
# در بخشهای بعدی به صورت کامل پوشش داده میشود
text_rdd = spark.sparkContext.textFile("~/data/news.txt")
print(f"Text RDD: {text_rdd}")
پارتیشنها بلوکهای منطقی داده هستند که میتوانند به صورت موازی پردازش شوند. هر پارتیشن توسط یک هسته اجراکننده پردازش میشود.
چرا پارتیشنها مهم هستند؟
print(f"تعداد پارتیشنها: {numbers_rdd.getNumPartitions()}")
print(f"تعداد پارتیشنها: {text_rdd.getNumPartitions()}")
print(f"تعداد پارتیشنها: {range_rdd.getNumPartitions()}")
# ایجاد RDD با تعداد مشخص پارتیشن
numbers_rdd_4 = spark.sparkContext.parallelize(range(0, 100), 4)
print(f"تعداد پارتیشنها: {numbers_rdd_4.getNumPartitions()}")
numbers_rdd_8 = spark.sparkContext.parallelize(range(0, 100), 8)
print(f"تعداد پارتیشنها: {numbers_rdd_8.getNumPartitions()}")
text_rdd = spark.sparkContext.textFile("/home/smbanaie/data/news.txt",10)
print(f"تعداد پارتیشنها: {text_rdd.getNumPartitions()}")
glom() یک RDD برمیگرداند که در آن هر پارتیشن به لیستی از عناصر آن تبدیل شده است:
# ایجاد RDD کوچک برای مشاهده پارتیشنها
small_rdd = spark.sparkContext.parallelize(range(0, 10), 4)
# مشاهده توزیع داده در پارتیشنها
partitions = small_rdd.glom().collect()
print("توزیع داده در پارتیشنها:")
for i, partition in enumerate(partitions):
print(f"Partition {i}: {partition}")
خروجی نمونه:
Partition 0: [0, 1]
Partition 1: [2, 3, 4]
Partition 2: [5, 6]
Partition 3: [7, 8, 9]
اکشنها محاسبات را فعال کرده و نتایج را به درایور برمیگردانند یا در حافظه ذخیره میکنند.
⚠️ هشدار: فقط برای RDDهای کوچک استفاده کنید! تمام دادهها را به درایور میآورد.
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
data = numbers_rdd.collect()
print(f"Type: {type(data)}")
print(f"Data: {data}")
numbers_rdd = spark.sparkContext.parallelize(range(0, 1000))
count = numbers_rdd.count()
print(f"Count: {count}")
numbers_rdd = spark.sparkContext.parallelize([10, 20, 30, 40, 50])
first_element = numbers_rdd.first()
print(f"First element: {first_element}")
numbers_rdd = spark.sparkContext.parallelize(range(0, 100))
first_ten = numbers_rdd.take(10)
print(f"First 10 elements: {first_ten}")
عناصر را به ترتیب نزولی برمیگرداند:
numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
top_three = numbers_rdd.top(3)
print(f"Top 3 elements: {top_three}") # Output: [9, 8, 7]
numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
bottom_three = numbers_rdd.takeOrdered(3)
print(f"Bottom 3 elements: {bottom_three}") # Output: [1, 2, 3]
# دریافت ۳ عنصر با کوچکترین مقادیر منفی (بزرگترین مقادیر مثبت)
numbers_rdd = spark.sparkContext.parallelize([3, 7, 1, 9, 2, 8, 4])
result = numbers_rdd.takeOrdered(3, key=lambda x: -x)
print(f"Top 3 (using custom key): {result}") # Output: [9, 8, 7]
text_rdd.takeOrdered(5, key=lambda x: -len(x))
# ایجاد RDD از اعداد تصادفی
import random
random.seed(42)
random_numbers = [random.randint(1, 100) for _ in range(1000)]
random_rdd = spark.sparkContext.parallelize(random_numbers)
# آمار پایه
print(f"Count: {random_rdd.count()}")
print(f"Min: {random_rdd.min()}")
print(f"Max: {random_rdd.max()}")
print(f"Mean: {random_rdd.mean():.2f}")
print(f"Sum: {random_rdd.sum()}")
print(f"Std Dev: {random_rdd.stdev():.2f}")
# ایجاد RDD از کلمات
words = ["spark", "hadoop", "python", "scala", "java", "apache"]
words_rdd = spark.sparkContext.parallelize(words)
# یافتن طولانیترین کلمه
longest = words_rdd.top(1, key=lambda x: len(x))
print(f"Longest word: {longest[0]}")
# یافتن کوتاهترین کلمه
shortest = words_rdd.takeOrdered(1, key=lambda x: len(x))
print(f"Shortest word: {shortest[0]}")
# ایجاد RDD با اندازههای مختلف پارتیشن
data = range(0, 20)
for num_partitions in [2, 4, 8]:
rdd = spark.sparkContext.parallelize(data, num_partitions)
partitions = rdd.glom().collect()
print(f"\n{num_partitions} Partitions:")
for i, partition in enumerate(partitions):
print(f" Partition {i}: {len(partition)} elements - {partition}")
یک RDD با اعداد ۱-۵۰ با ۵ پارتیشن ایجاد کنید، سپس:
یک RDD با ۱۰۰ عدد صحیح تصادفی بین ۱-۱۰۰۰ ایجاد کنید، سپس:
✅ RDDها اساس اسپارک هستند – توزیعشده، غیرقابل تغییر، مقاوم در برابر خطا
✅ پارتیشنها موازیسازی را فعال میکنند – تعداد مناسب را برای اندازه داده خود انتخاب کنید
✅ اکشنها محاسبات را فعال میکنند – تبدیلها تنبل هستند، اکشنها اجرا میشوند
✅ با collect() احتیاط کنید – فقط برای مجموعه دادههای کوچک استفاده کنید
✅ اکشنهای مناسب را استفاده کنید – take() برای نمونهگیری، count() برای اندازه و غیره
# اسپارک را پس از اتمام کار متوقف کنید
spark.stop()
| Action | Description | Use Case |
|---|---|---|
collect() | Return all elements | Small datasets only |
count() | Count elements | Get size |
first() | Get first element | Quick peek |
take(n) | Get first n elements | Sample data |
top(n) | Get top n elements | Find maximums |
takeOrdered(n) | Get n smallest | Find minimums |
min() | Get minimum | Statistics |
max() | Get maximum | Statistics |
mean() | Calculate average | Statistics |
sum() | Calculate sum | Aggregation |
glom() | Show partition contents | Debug partitions |
محتوای ویدئویی
در بخش اول از کارگاه عملی کار با RDD، به بررسی اکشنهای اصلی و کنترل تعداد پارتیشنها هنگام ایجاد RDD ها می پردازیم. این فیلم در قسمت زیر قابل مشاهده است.
نکته : اگر احیانا فیلم آموزشی مربوطه را مشاهده نمیکنید، مطمئن شوید که با آی پی ایران متصل شدهاید .