بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

کارگاه عملی: Pair RDDها و عملیات پیشرفته – محتوای ویدئویی


۱. مقدمه‌ای بر Pair RDDها

Pair RDD چیست؟

Pair RDDها، RDDهایی هستند که هر عنصر آن‌ها یک tuple از نوع (key, value) است:

spark = SparkSession.builder \
    .appName('RDD-Part3-PairRDDs') \
    .master('spark://Mojtaba.localdomain:7077') \
    .config('spark.driver.memory', '1024M') \
    .config('spark.executor.memory', '1024M') \
    .config('spark.executor.cores', '1') \
    .getOrCreate()

# مثال‌هایی از Pair RDD
user_ages = [("Alice", 28), ("Bob", 35), ("Charlie", 42)]
word_counts = [("spark", 10), ("python", 15), ("hadoop", 8)]
scores = [("math", 95), ("science", 88), ("math", 92)]
چرا Pair RDD؟
  • امکان انجام عملیات مبتنی بر key (مانند grouping، aggregation و join)
  • پشتیبانی کارآمد از عملیات shuffle
  • پایه‌ای برای عملیات شبیه به SQL
  • امکان بهینه‌سازی co-partitioning
روش‌های ساخت Pair RDD
# روش ۱: از لیست tupleها
pairs = [("apple", 5), ("banana", 3), ("apple", 7)]
pairs_rdd = spark.sparkContext.parallelize(pairs)
print(pairs_rdd.collect())

# روش ۲: با استفاده از map
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
pairs_from_map = numbers.map(lambda x: (x, x ** 2))
print(pairs_from_map.collect())

# روش ۳: از داده‌های متنی
text = ["apple red", "banana yellow", "apple green"]
text_rdd = spark.sparkContext.parallelize(text)
word_color_pairs = text_rdd.map(lambda line: tuple(line.split()))
print(word_color_pairs.collect())

۲. reduceByKey() – تجمیع بر اساس Key

reduceByKey() مقادیر مربوط به یک key را با استفاده از یک تابع ترکیب می‌کند.

مثال ساده
# داده فروش: (محصول, مبلغ)
sales = [
    ("apple", 100),
    ("banana", 50),
    ("apple", 150),
    ("banana", 75),
    ("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)

# جمع فروش هر محصول
total_sales = sales_rdd.reduceByKey(lambda a, b: a + b)
print("جمع فروش هر محصول:", total_sales.collect())
# خروجی: [('banana', 125), ('apple', 450)]
توابع reduce مختلف
scores = [
    ("Alice", 85),
    ("Bob", 90),
    ("Alice", 92),
    ("Bob", 88),
    ("Alice", 78)
]
scores_rdd = spark.sparkContext.parallelize(scores)

# بیشترین نمره هر دانش‌آموز
max_scores = scores_rdd.reduceByKey(lambda a, b: max(a, b))
print("بیشترین نمره:", max_scores.collect())

# کمترین نمره هر دانش‌آموز
min_scores = scores_rdd.reduceByKey(lambda a, b: min(a, b))
print("کمترین نمره:", min_scores.collect())

# شمارش رخدادها (روش بهینه نیست، اما مفهوم را نشان می‌دهد)
count_scores = scores_rdd.mapValues(lambda x: 1).reduceByKey(lambda a, b: a + b)
print("تعداد نمره هر دانش‌آموز:", count_scores.collect())
مثال عملی: Word Count کلاسیک
text = [
    "apache spark is great",
    "spark makes big data easy",
    "apache hadoop and spark work together"
]
text_rdd = spark.sparkContext.parallelize(text)

# Word Count معروف
word_counts = text_rdd.flatMap(lambda line: line.split()) \
                      .map(lambda word: (word, 1)) \
                      .reduceByKey(lambda a, b: a + b) \
                      .sortBy(lambda x: -x[1])

print("تعداد کلمات:")
for word, count in word_counts.collect():
    print(f"  {word}: {count}")

۳. groupByKey() – گروه‌بندی مقادیر بر اساس Key

groupByKey() تمام مقادیر یک key را در یک iterable جمع‌آوری می‌کند.

استفاده ساده
sales = [
    ("apple", 100),
    ("banana", 50),
    ("apple", 150),
    ("banana", 75),
    ("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)

# گروه‌بندی فروش هر محصول
grouped = sales_rdd.groupByKey()

# تبدیل ResultIterable به لیست
grouped_list = grouped.mapValues(list)
print("فروش گروه‌بندی‌شده:", grouped_list.collect())
# خروجی: [('banana', [50, 75]), ('apple', [100, 150, 200])]
محاسبه آمار پس از گروه‌بندی
scores = [
    ("Alice", 85),
    ("Bob", 90),
    ("Alice", 92),
    ("Bob", 88),
    ("Alice", 78),
    ("Bob", 95)
]
scores_rdd = spark.sparkContext.parallelize(scores)

# میانگین نمره هر دانش‌آموز
def calculate_average(values):
    values_list = list(values)
    return sum(values_list) / len(values_list)

avg_scores = scores_rdd.groupByKey() \
                       .mapValues(calculate_average)

print("میانگین نمرات:")
for name, avg in avg_scores.collect():
    print(f"  {name}: {avg:.2f}")
⚠️ هشدار عملکرد: groupByKey()

groupByKey() برای داده‌های بزرگ می‌تواند ناکارآمد باشد:

# ❌ ناکارآمد
inefficient = sales_rdd.groupByKey() \
                       .mapValues(lambda values: sum(values))

# ✅ کارآمد
efficient = sales_rdd.reduceByKey(lambda a, b: a + b)

# هر دو نتیجه یکسان می‌دهند، اما reduceByKey بسیار سریع‌تر است!

دلیل؟ groupByKey() تمام داده‌ها را shuffle می‌کند، در حالی که reduceByKey() ابتدا در هر partition کاهش محلی انجام می‌دهد.


۴. مقایسه reduceByKey و groupByKey

مقایسه تصویری
reduceByKey:
Partition 1: (a,1), (b,2), (a,3) → کاهش محلی → (a,4), (b,2)
Partition 2: (a,2), (b,1), (b,3) → کاهش محلی → (a,2), (b,4)
            ↓ Shuffle (فقط مقادیر کاهش‌یافته)
نهایی: (a,6), (b,6)

groupByKey:
Partition 1: (a,1), (b,2), (a,3)
Partition 2: (a,2), (b,1), (b,3)
            ↓ Shuffle (تمام مقادیر!)
نهایی: (a,[1,3,2]), (b,[2,1,3]) → سپس کاهش در mapValues
مقایسه بنچمارک
import time

# ساخت دیتاست بزرگ
large_data = []
for i in range(10_000_000):
    key = f"key_{i % 100}"
    large_data.append((key, i))

large_rdd = spark.sparkContext.parallelize(large_data, 10)

# تست reduceByKey
start = time.time()
result1 = large_rdd.reduceByKey(lambda a, b: a + b).count()
time1 = time.time() - start

# تست groupByKey + mapValues
start = time.time()
result2 = large_rdd.groupByKey().mapValues(sum).count()
time2 = time.time() - start

print(f"زمان reduceByKey: {time1:.3f} ثانیه")
print(f"زمان groupByKey: {time2:.3f} ثانیه")
print(f"سرعت بیشتر: {time2/time1:.2f}x")

۵. aggregateByKey() – تجمیع پیشرفته

aggregateByKey() انعطاف‌پذیرتر از reduceByKey() است و اجازه می‌دهد نوع ورودی و خروجی متفاوت باشد.

Syntax
rdd.aggregateByKey(
    zeroValue,    # مقدار اولیه برای محاسبات لازم برای هر کلید
    seqFunc,      # ترکیب مقادیر داخل یک پارتیشن
    combFunc      # ترکیب نتایج بین پارتیشن ها
)
مثال: محاسبه میانگین
scores = [
    ("Alice", 85),
    ("Bob", 90),
    ("Alice", 92),
    ("Bob", 88),
    ("Alice", 78)
]
scores_rdd = spark.sparkContext.parallelize(scores, 2)

# محاسبه میانگین با aggregateByKey
avg_scores = scores_rdd.aggregateByKey(
    (۰, ۰),  # اولیه: (جمع, تعداد)
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # داخل partition
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # بین partitionها
).mapValues(lambda x: x[0] / x[1])

print("میانگین نمرات:")
for name, avg in avg_scores.collect():
    print(f"  {name}: {avg:.2f}")
مثال: جمع‌آوری چندین آمار

فرض کنید همزمان نیاز داریم که اطلاعات آماری متناظر با هر کلید را محاسبه کنیم. یعنی مقادیر آن کلید را گروه بندی کرده و سپس اطلاعات آماری زیر را محاسبه کنیم.

def seq_func(acc, value):
    return (
        acc[0] + value,           # sum
        acc[1] + 1,               # count
        min(acc[2], value),       # min
        max(acc[3], value)        # max
    )

def comb_func(acc1, acc2):
    return (
        acc1[0] + acc2[0],
        acc1[1] + acc2[1],
        min(acc1[2], acc2[2]),
        max(acc1[3], acc2[3])
    )

import sys
stats = scores_rdd.aggregateByKey(
    (۰, ۰, sys.maxsize, -sys.maxsize),  # (sum, count, min, max)
    seq_func,
    comb_func
).mapValues(lambda x: {
    'sum': x[0],
    'count': x[1],
    'min': x[2],
    'max': x[3],
    'avg': x[0] / x[1]
})

print("آمار کامل:")
for name, stat in stats.collect():
    print(f"  {name}: {stat}")

۶. countByValue() و countByKey()

countByValue() – شمارش رخدادها

شمارش تعداد تکرار هر عنصر:

fruits = ["apple", "banana", "apple", "orange", "banana", "apple"]
fruits_rdd = spark.sparkContext.parallelize(fruits)

counts = fruits_rdd.countByValue()
print("تعداد میوه‌ها:", dict(counts))
# خروجی: {'apple': 3, 'banana': 2, 'orange': 1}
countByKey() – شمارش مقادیر هر key
sales = [
    ("apple", 100),
    ("banana", 50),
    ("apple", 150),
    ("banana", 75),
    ("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)

transaction_counts = sales_rdd.countByKey()
print("تعداد تراکنش هر محصول:", dict(transaction_counts))
# خروجی: {'apple': 3, 'banana': 2}
مثال عملی: تحلیل لاگ
logs = [
    ("ERROR", "Database connection failed"),
    ("INFO", "User logged in"),
    ("ERROR", "Timeout occurred"),
    ("WARNING", "High memory usage"),
    ("ERROR", "Invalid input"),
    ("INFO", "Request processed")
]
logs_rdd = spark.sparkContext.parallelize(logs)

# شمارش بر اساس سطح لاگ
level_counts = logs_rdd.countByKey()
print("تعداد هر سطح لاگ:")
for level, count in level_counts.items():
    print(f"  {level}: {count}")

۷. mapValues() و flatMapValues()

mapValues() – تغییر فقط مقادیر

تغییر مقادیر بدون تأثیر روی key (بدون shuffle):

prices = [("apple", 100), ("banana", 50), ("orange", 75)]
prices_rdd = spark.sparkContext.parallelize(prices)

# اعمال تخفیف ۱۰٪
discounted = prices_rdd.mapValues(lambda price: price * 0.9)
print("قیمت با تخفیف:", discounted.collect())

# افزودن واحد پول
with_currency = prices_rdd.mapValues(lambda price: f"${price}")
print("با واحد پول:", with_currency.collect())
flatMapValues() – تغییر و flatten کردن مقادیر
enrollments = [
    ("Alice", ["Math", "Physics"]),
    ("Bob", ["Chemistry", "Biology", "Math"]),
    ("Charlie", ["Physics"])
]
enrollments_rdd = spark.sparkContext.parallelize(enrollments)

# flatten کردن درس‌ها با حفظ نام دانش‌آموز
student_courses = enrollments_rdd.flatMapValues(lambda courses: courses)
print("جفت دانش‌آموز-درس:", student_courses.collect())

۸. sortByKey() – مرتب‌سازی Pair RDDها

word_counts = [("spark", 10), ("python", 15), ("hadoop", 8), ("scala", 12)]
word_counts_rdd = spark.sparkContext.parallelize(word_counts)

# مرتب‌سازی بر اساس key (الفبایی)
sorted_by_word = word_counts_rdd.sortByKey()
print("مرتب‌شده بر اساس کلمه:", sorted_by_word.collect())

# مرتب‌سازی معکوس
sorted_reverse = word_counts_rdd.sortByKey(ascending=False)
print("مرتب‌سازی معکوس:", sorted_reverse.collect())

# مرتب‌سازی بر اساس مقدار
sorted_by_count = word_counts_rdd.map(lambda x: (x[1], x[0])) \
                                 .sortByKey(ascending=False) \
                                 .map(lambda x: (x[1], x[0]))
print("مرتب‌شده بر اساس تعداد:", sorted_by_count.collect())

۹. keys() و values() – استخراج کلیدها یا مقادیر

pairs = [("a", 1), ("b", 2), ("c", 3)]
pairs_rdd = spark.sparkContext.parallelize(pairs)

# تمام کلیدها
keys = pairs_rdd.keys()
print("کلیدها:", keys.collect())

# تمام مقادیر
values = pairs_rdd.values()
print("مقادیر:", values.collect())

# کلیدهای منحصربه‌فرد
unique_keys = pairs_rdd.keys().distinct()
print("کلیدهای یکتا:", unique_keys.collect())

۱۰. مثال‌های عملی

مثال ۱: تحلیل فروش
sales_data = [
    ("۲۰۲۴-۰۱", "ProductA", 1000),
    ("۲۰۲۴-۰۱", "ProductB", 1500),
    ("۲۰۲۴-۰۲", "ProductA", 1200),
    ("۲۰۲۴-۰۲", "ProductB", 1300),
    ("۲۰۲۴-۰۱", "ProductA", 800)
]

sales_rdd = spark.sparkContext.parallelize(sales_data)

# فروش کل هر ماه
monthly_sales = sales_rdd.map(lambda x: (x[0], x[2])) \
                         .reduceByKey(lambda a, b: a + b) \
                         .sortByKey()

print("فروش ماهانه:")
for month, total in monthly_sales.collect():
    print(f"  {month}: ${total}")

# فروش کل هر محصول
product_sales = sales_rdd.map(lambda x: (x[1], x[2])) \
                         .reduceByKey(lambda a, b: a + b)

print("\nفروش محصولات:")
for product, total in product_sales.collect():
    print(f"  {product}: ${total}")
مثال ۲: تحلیل نمرات
grades = [
    ("Alice", "Math", 85),
    ("Alice", "Physics", 92),
    ("Bob", "Math", 78),
    ("Bob", "Physics", 88),
    ("Charlie", "Math", 95),
    ("Charlie", "Physics", 91)
]
grades_rdd = spark.sparkContext.parallelize(grades)

# میانگین هر دانش‌آموز
student_avgs = grades_rdd.map(lambda x: (x[0], x[2])) \
                         .aggregateByKey(
                             (۰, ۰),
                             lambda acc, v: (acc[0] + v, acc[1] + 1),
                             lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
                         ) \
                         .mapValues(lambda x: x[0] / x[1])

print("میانگین دانش‌آموزان:")
for student, avg in student_avgs.collect():
    print(f"  {student}: {avg:.2f}")

# میانگین هر درس
subject_avgs = grades_rdd.map(lambda x: (x[1], x[2])) \
                         .aggregateByKey(
                             (۰, ۰),
                             lambda acc, v: (acc[0] + v, acc[1] + 1),
                             lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
                         ) \
                         .mapValues(lambda x: x[0] / x[1])

print("\nمیانگین دروس:")
for subject, avg in subject_avgs.collect():
    print(f"  {subject}: {avg:.2f}")

۱۱. تمرین‌ها

تمرین ۱: تحلیل فروشگاه آنلاین

داده تراکنش‌ها:

transactions = [
    ("user1", "laptop", 1200),
    ("user2", "phone", 800),
    ("user1", "mouse", 25),
    ("user3", "laptop", 1200),
    ("user2", "keyboard", 75)
]

محاسبه کنید:

  1. مجموع هزینه هر کاربر
  2. درآمد کل هر محصول
  3. میانگین ارزش تراکنش هر کاربر
  4. تعداد خرید هر کاربر
تمرین ۲: پردازش لاگ

لاگ‌های سرور:

logs = [
    ("server1", "ERROR", "10:00"),
    ("server2", "INFO", "10:01"),
    ("server1", "WARNING", "10:02"),
    ("server1", "ERROR", "10:03"),
    ("server2", "ERROR", "10:04")
]

پیدا کنید:

  1. شمارش هر سطح لاگ
  2. شمارش سطوح لاگ برای هر سرور
  3. سرورهایی که بیش از ۱ خطا دارند
تمرین ۳: تحلیل متن (bigrams)

متن:

text = "the quick brown fox jumps over the lazy dog"
  1. ساخت bigramها (جفت کلمات متوالی)
  2. شمارش هر bigram
  3. پیدا کردن پرتکرارترین bigram

۱۲. نکات کلیدی

Pair RDDها عملیات مبتنی بر key را ممکن می‌کنند

reduceByKey() معمولاً بهتر از groupByKey() است

aggregateByKey() برای aggregationهای پیچیده

mapValues() مقادیر را بدون shuffle تغییر می‌دهد

countByKey/Value() برای تحلیل فرکانس

انتخاب عملیات مناسب تأثیر زیادی روی عملکرد دارد


۱۳. نکات عملکردی

موقعیتاستفاده کنیداستفاده نکنید
جمع بر اساس keyreduceByKey()groupByKey().mapValues(sum)
شمارش بر اساس keycountByKey()groupByKey().mapValues(len)
تغییر فقط مقادیرmapValues()map(lambda x: (x[0], f(x[1])))
آمار پیچیدهaggregateByKey()groupByKey() سپس محاسبه

محتوای ویدئویی

فیلم آموزشی بخش Pair RDD در قسمت زیر قابل مشاهده است.

نکته : مطمئن شوید که با آی پی ایران متصل هستید که مشکلی در نمایش فیلم نداشته باشید .

د

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید