Pair RDDها، RDDهایی هستند که هر عنصر آنها یک tuple از نوع (key, value) است:
spark = SparkSession.builder \
.appName('RDD-Part3-PairRDDs') \
.master('spark://Mojtaba.localdomain:7077') \
.config('spark.driver.memory', '1024M') \
.config('spark.executor.memory', '1024M') \
.config('spark.executor.cores', '1') \
.getOrCreate()
# مثالهایی از Pair RDD
user_ages = [("Alice", 28), ("Bob", 35), ("Charlie", 42)]
word_counts = [("spark", 10), ("python", 15), ("hadoop", 8)]
scores = [("math", 95), ("science", 88), ("math", 92)]
# روش ۱: از لیست tupleها
pairs = [("apple", 5), ("banana", 3), ("apple", 7)]
pairs_rdd = spark.sparkContext.parallelize(pairs)
print(pairs_rdd.collect())
# روش ۲: با استفاده از map
numbers = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
pairs_from_map = numbers.map(lambda x: (x, x ** 2))
print(pairs_from_map.collect())
# روش ۳: از دادههای متنی
text = ["apple red", "banana yellow", "apple green"]
text_rdd = spark.sparkContext.parallelize(text)
word_color_pairs = text_rdd.map(lambda line: tuple(line.split()))
print(word_color_pairs.collect())
reduceByKey() مقادیر مربوط به یک key را با استفاده از یک تابع ترکیب میکند.
# داده فروش: (محصول, مبلغ)
sales = [
("apple", 100),
("banana", 50),
("apple", 150),
("banana", 75),
("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)
# جمع فروش هر محصول
total_sales = sales_rdd.reduceByKey(lambda a, b: a + b)
print("جمع فروش هر محصول:", total_sales.collect())
# خروجی: [('banana', 125), ('apple', 450)]
scores = [
("Alice", 85),
("Bob", 90),
("Alice", 92),
("Bob", 88),
("Alice", 78)
]
scores_rdd = spark.sparkContext.parallelize(scores)
# بیشترین نمره هر دانشآموز
max_scores = scores_rdd.reduceByKey(lambda a, b: max(a, b))
print("بیشترین نمره:", max_scores.collect())
# کمترین نمره هر دانشآموز
min_scores = scores_rdd.reduceByKey(lambda a, b: min(a, b))
print("کمترین نمره:", min_scores.collect())
# شمارش رخدادها (روش بهینه نیست، اما مفهوم را نشان میدهد)
count_scores = scores_rdd.mapValues(lambda x: 1).reduceByKey(lambda a, b: a + b)
print("تعداد نمره هر دانشآموز:", count_scores.collect())
text = [
"apache spark is great",
"spark makes big data easy",
"apache hadoop and spark work together"
]
text_rdd = spark.sparkContext.parallelize(text)
# Word Count معروف
word_counts = text_rdd.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: -x[1])
print("تعداد کلمات:")
for word, count in word_counts.collect():
print(f" {word}: {count}")
groupByKey() تمام مقادیر یک key را در یک iterable جمعآوری میکند.
sales = [
("apple", 100),
("banana", 50),
("apple", 150),
("banana", 75),
("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)
# گروهبندی فروش هر محصول
grouped = sales_rdd.groupByKey()
# تبدیل ResultIterable به لیست
grouped_list = grouped.mapValues(list)
print("فروش گروهبندیشده:", grouped_list.collect())
# خروجی: [('banana', [50, 75]), ('apple', [100, 150, 200])]
scores = [
("Alice", 85),
("Bob", 90),
("Alice", 92),
("Bob", 88),
("Alice", 78),
("Bob", 95)
]
scores_rdd = spark.sparkContext.parallelize(scores)
# میانگین نمره هر دانشآموز
def calculate_average(values):
values_list = list(values)
return sum(values_list) / len(values_list)
avg_scores = scores_rdd.groupByKey() \
.mapValues(calculate_average)
print("میانگین نمرات:")
for name, avg in avg_scores.collect():
print(f" {name}: {avg:.2f}")
groupByKey() برای دادههای بزرگ میتواند ناکارآمد باشد:
# ❌ ناکارآمد
inefficient = sales_rdd.groupByKey() \
.mapValues(lambda values: sum(values))
# ✅ کارآمد
efficient = sales_rdd.reduceByKey(lambda a, b: a + b)
# هر دو نتیجه یکسان میدهند، اما reduceByKey بسیار سریعتر است!
دلیل؟ groupByKey() تمام دادهها را shuffle میکند، در حالی که reduceByKey() ابتدا در هر partition کاهش محلی انجام میدهد.
reduceByKey:
Partition 1: (a,1), (b,2), (a,3) → کاهش محلی → (a,4), (b,2)
Partition 2: (a,2), (b,1), (b,3) → کاهش محلی → (a,2), (b,4)
↓ Shuffle (فقط مقادیر کاهشیافته)
نهایی: (a,6), (b,6)
groupByKey:
Partition 1: (a,1), (b,2), (a,3)
Partition 2: (a,2), (b,1), (b,3)
↓ Shuffle (تمام مقادیر!)
نهایی: (a,[1,3,2]), (b,[2,1,3]) → سپس کاهش در mapValues
import time
# ساخت دیتاست بزرگ
large_data = []
for i in range(10_000_000):
key = f"key_{i % 100}"
large_data.append((key, i))
large_rdd = spark.sparkContext.parallelize(large_data, 10)
# تست reduceByKey
start = time.time()
result1 = large_rdd.reduceByKey(lambda a, b: a + b).count()
time1 = time.time() - start
# تست groupByKey + mapValues
start = time.time()
result2 = large_rdd.groupByKey().mapValues(sum).count()
time2 = time.time() - start
print(f"زمان reduceByKey: {time1:.3f} ثانیه")
print(f"زمان groupByKey: {time2:.3f} ثانیه")
print(f"سرعت بیشتر: {time2/time1:.2f}x")
aggregateByKey() انعطافپذیرتر از reduceByKey() است و اجازه میدهد نوع ورودی و خروجی متفاوت باشد.
rdd.aggregateByKey(
zeroValue, # مقدار اولیه برای محاسبات لازم برای هر کلید
seqFunc, # ترکیب مقادیر داخل یک پارتیشن
combFunc # ترکیب نتایج بین پارتیشن ها
)
scores = [
("Alice", 85),
("Bob", 90),
("Alice", 92),
("Bob", 88),
("Alice", 78)
]
scores_rdd = spark.sparkContext.parallelize(scores, 2)
# محاسبه میانگین با aggregateByKey
avg_scores = scores_rdd.aggregateByKey(
(۰, ۰), # اولیه: (جمع, تعداد)
lambda acc, value: (acc[0] + value, acc[1] + 1), # داخل partition
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]) # بین partitionها
).mapValues(lambda x: x[0] / x[1])
print("میانگین نمرات:")
for name, avg in avg_scores.collect():
print(f" {name}: {avg:.2f}")
فرض کنید همزمان نیاز داریم که اطلاعات آماری متناظر با هر کلید را محاسبه کنیم. یعنی مقادیر آن کلید را گروه بندی کرده و سپس اطلاعات آماری زیر را محاسبه کنیم.
def seq_func(acc, value):
return (
acc[0] + value, # sum
acc[1] + 1, # count
min(acc[2], value), # min
max(acc[3], value) # max
)
def comb_func(acc1, acc2):
return (
acc1[0] + acc2[0],
acc1[1] + acc2[1],
min(acc1[2], acc2[2]),
max(acc1[3], acc2[3])
)
import sys
stats = scores_rdd.aggregateByKey(
(۰, ۰, sys.maxsize, -sys.maxsize), # (sum, count, min, max)
seq_func,
comb_func
).mapValues(lambda x: {
'sum': x[0],
'count': x[1],
'min': x[2],
'max': x[3],
'avg': x[0] / x[1]
})
print("آمار کامل:")
for name, stat in stats.collect():
print(f" {name}: {stat}")
شمارش تعداد تکرار هر عنصر:
fruits = ["apple", "banana", "apple", "orange", "banana", "apple"]
fruits_rdd = spark.sparkContext.parallelize(fruits)
counts = fruits_rdd.countByValue()
print("تعداد میوهها:", dict(counts))
# خروجی: {'apple': 3, 'banana': 2, 'orange': 1}
sales = [
("apple", 100),
("banana", 50),
("apple", 150),
("banana", 75),
("apple", 200)
]
sales_rdd = spark.sparkContext.parallelize(sales)
transaction_counts = sales_rdd.countByKey()
print("تعداد تراکنش هر محصول:", dict(transaction_counts))
# خروجی: {'apple': 3, 'banana': 2}
logs = [
("ERROR", "Database connection failed"),
("INFO", "User logged in"),
("ERROR", "Timeout occurred"),
("WARNING", "High memory usage"),
("ERROR", "Invalid input"),
("INFO", "Request processed")
]
logs_rdd = spark.sparkContext.parallelize(logs)
# شمارش بر اساس سطح لاگ
level_counts = logs_rdd.countByKey()
print("تعداد هر سطح لاگ:")
for level, count in level_counts.items():
print(f" {level}: {count}")
تغییر مقادیر بدون تأثیر روی key (بدون shuffle):
prices = [("apple", 100), ("banana", 50), ("orange", 75)]
prices_rdd = spark.sparkContext.parallelize(prices)
# اعمال تخفیف ۱۰٪
discounted = prices_rdd.mapValues(lambda price: price * 0.9)
print("قیمت با تخفیف:", discounted.collect())
# افزودن واحد پول
with_currency = prices_rdd.mapValues(lambda price: f"${price}")
print("با واحد پول:", with_currency.collect())
enrollments = [
("Alice", ["Math", "Physics"]),
("Bob", ["Chemistry", "Biology", "Math"]),
("Charlie", ["Physics"])
]
enrollments_rdd = spark.sparkContext.parallelize(enrollments)
# flatten کردن درسها با حفظ نام دانشآموز
student_courses = enrollments_rdd.flatMapValues(lambda courses: courses)
print("جفت دانشآموز-درس:", student_courses.collect())
word_counts = [("spark", 10), ("python", 15), ("hadoop", 8), ("scala", 12)]
word_counts_rdd = spark.sparkContext.parallelize(word_counts)
# مرتبسازی بر اساس key (الفبایی)
sorted_by_word = word_counts_rdd.sortByKey()
print("مرتبشده بر اساس کلمه:", sorted_by_word.collect())
# مرتبسازی معکوس
sorted_reverse = word_counts_rdd.sortByKey(ascending=False)
print("مرتبسازی معکوس:", sorted_reverse.collect())
# مرتبسازی بر اساس مقدار
sorted_by_count = word_counts_rdd.map(lambda x: (x[1], x[0])) \
.sortByKey(ascending=False) \
.map(lambda x: (x[1], x[0]))
print("مرتبشده بر اساس تعداد:", sorted_by_count.collect())
pairs = [("a", 1), ("b", 2), ("c", 3)]
pairs_rdd = spark.sparkContext.parallelize(pairs)
# تمام کلیدها
keys = pairs_rdd.keys()
print("کلیدها:", keys.collect())
# تمام مقادیر
values = pairs_rdd.values()
print("مقادیر:", values.collect())
# کلیدهای منحصربهفرد
unique_keys = pairs_rdd.keys().distinct()
print("کلیدهای یکتا:", unique_keys.collect())
sales_data = [
("۲۰۲۴-۰۱", "ProductA", 1000),
("۲۰۲۴-۰۱", "ProductB", 1500),
("۲۰۲۴-۰۲", "ProductA", 1200),
("۲۰۲۴-۰۲", "ProductB", 1300),
("۲۰۲۴-۰۱", "ProductA", 800)
]
sales_rdd = spark.sparkContext.parallelize(sales_data)
# فروش کل هر ماه
monthly_sales = sales_rdd.map(lambda x: (x[0], x[2])) \
.reduceByKey(lambda a, b: a + b) \
.sortByKey()
print("فروش ماهانه:")
for month, total in monthly_sales.collect():
print(f" {month}: ${total}")
# فروش کل هر محصول
product_sales = sales_rdd.map(lambda x: (x[1], x[2])) \
.reduceByKey(lambda a, b: a + b)
print("\nفروش محصولات:")
for product, total in product_sales.collect():
print(f" {product}: ${total}")
grades = [
("Alice", "Math", 85),
("Alice", "Physics", 92),
("Bob", "Math", 78),
("Bob", "Physics", 88),
("Charlie", "Math", 95),
("Charlie", "Physics", 91)
]
grades_rdd = spark.sparkContext.parallelize(grades)
# میانگین هر دانشآموز
student_avgs = grades_rdd.map(lambda x: (x[0], x[2])) \
.aggregateByKey(
(۰, ۰),
lambda acc, v: (acc[0] + v, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
) \
.mapValues(lambda x: x[0] / x[1])
print("میانگین دانشآموزان:")
for student, avg in student_avgs.collect():
print(f" {student}: {avg:.2f}")
# میانگین هر درس
subject_avgs = grades_rdd.map(lambda x: (x[1], x[2])) \
.aggregateByKey(
(۰, ۰),
lambda acc, v: (acc[0] + v, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
) \
.mapValues(lambda x: x[0] / x[1])
print("\nمیانگین دروس:")
for subject, avg in subject_avgs.collect():
print(f" {subject}: {avg:.2f}")
داده تراکنشها:
transactions = [
("user1", "laptop", 1200),
("user2", "phone", 800),
("user1", "mouse", 25),
("user3", "laptop", 1200),
("user2", "keyboard", 75)
]
محاسبه کنید:
لاگهای سرور:
logs = [
("server1", "ERROR", "10:00"),
("server2", "INFO", "10:01"),
("server1", "WARNING", "10:02"),
("server1", "ERROR", "10:03"),
("server2", "ERROR", "10:04")
]
پیدا کنید:
متن:
text = "the quick brown fox jumps over the lazy dog"
✅ Pair RDDها عملیات مبتنی بر key را ممکن میکنند
✅ reduceByKey() معمولاً بهتر از groupByKey() است
✅ aggregateByKey() برای aggregationهای پیچیده
✅ mapValues() مقادیر را بدون shuffle تغییر میدهد
✅ countByKey/Value() برای تحلیل فرکانس
✅ انتخاب عملیات مناسب تأثیر زیادی روی عملکرد دارد
| موقعیت | استفاده کنید | استفاده نکنید |
|---|---|---|
| جمع بر اساس key | reduceByKey() | groupByKey().mapValues(sum) |
| شمارش بر اساس key | countByKey() | groupByKey().mapValues(len) |
| تغییر فقط مقادیر | mapValues() | map(lambda x: (x[0], f(x[1]))) |
| آمار پیچیده | aggregateByKey() | groupByKey() سپس محاسبه |
فیلم آموزشی بخش Pair RDD در قسمت زیر قابل مشاهده است.
نکته : مطمئن شوید که با آی پی ایران متصل هستید که مشکلی در نمایش فیلم نداشته باشید .
د