بخش اول: مفاهیم پایه و مبانی اسپارک
بخش دوم: بهینه‌سازی و معماری داخلی اسپارک
بخش سوم : پردازش داده‌های جریانی

آشنایی با RDD ها و انواع پردازش‌ها و اکشن‌های رایج – محتوای ویدئویی


وقتی شما در اسپارک می‌گویید داده‌ها را از یک فایل مثل Parquet، CSV یا HDFS بارگذاری کند، اسپارک این داده‌ها را ابتدا به یک آبجکت از نوع RDD تبدیل می‌کند.

RDD یا Resilient Distributed Dataset، واحد بنیادی داده در اسپارک است که وظیفه دارد داده‌ها را به‌صورت توزیع‌شده و مقاوم در برابر خطا مدیریت کند. RDDها غیرقابل تغییر (Immutable) هستند، یعنی هیچ‌گاه داده‌ها در همان RDD تغییر نمی‌کنند؛ هر بار که عملیاتی روی RDD انجام می‌دهیم، یک RDD جدید ایجاد می‌شود که نتیجه آن محاسبه را در خود دارد.

🔹 RDD در اسپارک چیست؟

RDD (Resilient Distributed Dataset) واحد بنیادی داده در اسپارک است. می‌توان آن را به صورت زیر توصیف کرد:

  • Resilient → تحمل خطا: اگر بخشی از داده یا Task خراب شود، RDD می‌تواند آن را بازسازی کند.
  • Distributed → توزیع شده: داده‌ها روی چندین نود و Executor تقسیم می‌شوند.
  • Dataset → مجموعه داده: RDD همانند یک لیست یا جدول است، اما توزیع شده و قابل پردازش موازی.

هر RDD می‌تواند از فایل‌های HDFS، S3، CSV، یا از RDDهای دیگر ایجاد شود.


🔹 اجزای اصلی یک RDD

  1. List of Partitions
    • داده‌ها در RDD به چند Partition تقسیم می‌شوند، که کوچک‌ترین واحد پردازش موازی در اسپارک هستند.
    • هر Partition یک زیرمجموعه منطقی از داده‌هاست و می‌تواند مستقل توسط Executorهای مختلف پردازش شود.
    • Partitionها همان چیزی هستند که امکان Parallelism و توزیع Taskها روی Workerها را فراهم می‌کنند.
  2. Computation Function
    • این تابع مشخص می‌کند که RDD فرزند یا نتیجه بعدی این RDD چگونه باید ایجاد شود.
    • هر Transformation (مثل map، filter) در واقع یک Computation Function به RDD اضافه می‌کند.
    • هنگام اجرای اکشن، اسپارک این تابع را روی هر Partition اجرا می‌کند تا RDD جدید تولید شود.
    • به عبارت دیگر، Computation Function مسئول تعریف رابطه بین RDDها و ایجاد RDDهای جدید است و باعث می‌شود RDDها Immutable باشند.
  3. Dependencies
    • RDD نگه می‌دارد که چگونه از RDDهای دیگر ایجاد شده و چه وابستگی‌هایی دارد.
    • این ساختار باعث ایجاد lineage DAG می‌شود که برای بازسازی Partitionهای از دست رفته یا بازیابی خطا استفاده می‌شود.
    • این ویژگی، RDD را مقاوم در برابر خطا می‌کند.
  4. Partitioner (اختیاری)
    • برای RDDهای Key-Value، Partitioner مشخص می‌کند داده‌ها چگونه بین Partitionها توزیع شوند.
    • به طور معمول Hash Partitioner استفاده می‌شود، اما می‌توان Partitioning سفارشی هم داشت.
    • Partitioner باعث می‌شود عملیات‌هایی مثل reduceByKey و groupByKey بهینه اجرا شوند.
  5. Preferred Locations (اختیاری)
    • لیستی از مکان‌های ترجیحی برای پردازش هر Partition است، مثل بلوک‌های داده در HDFS.
    • Scheduler با استفاده از این اطلاعات تلاش می‌کند Taskها را روی همان نود یا Executor اجرا کند تا داده کمتر در شبکه جابجا شود.

🔹 نحوه کار RDD و دسترسی از طریق Spark Context

  • RDDها همیشه از طریق Spark Context ایجاد و مدیریت می‌شوند:
rdd = sc.textFile("data.csv")
  • هر Transformation روی RDD (مثلاً map, filter) یک RDD جدید تولید می‌کند و هیچ داده‌ای در RDD اصلی تغییر نمی‌کند.
  • Execution فقط زمانی اتفاق می‌افتد که یک Action (مثل collect, count) صدا زده شود، و این باعث می‌شود Computation Function روی هر Partition اجرا شود.

🔹 نکات کلیدی

  1. Immutability: هیچ RDDی تغییر نمی‌کند؛ هر Transformation یک RDD جدید می‌سازد.
  2. Lazy Evaluation: Transformations فقط RDD جدید می‌سازند و پردازش واقعی با Actions انجام می‌شود.
  3. Parallelism و Partitioning: هر Partition می‌تواند مستقل روی Executorها پردازش شود.
  4. Fault Tolerance: با استفاده از Dependencies و lineage، اسپارک می‌تواند Partitionهای از دست رفته را بازسازی کند.
  5. Optimized Execution: Partitioner و Preferred Locations به اسپارک کمک می‌کنند تا عملیات روی داده‌ها بهینه و نزدیک به محل داده‌ها اجرا شود.

🔹 تفاوت RDD با DataFrame / Dataset

ویژگیRDDDataFrame / Dataset
سطح APIپایین (Functional)سطح بالا (SQL-like)
نوع دادهغیرساخت‌یافته یا ساخت‌یافتهساخت‌یافته
بهینه‌سازیبرنامه‌نویس Catalyst Optimizer خودکار
قابلیت خطادارددارد

🔹 ترنسفورمیشن‌ها (Transformations)

ترنسفورمیشن‌ها عملیات تغییر RDD بدون اجرای فوری هستند.
نتیجه‌ی یک ترنسفورمیشن، یک RDD جدید است و هیچ پردازشی تا زمانی که یک Action صدا زده شود، انجام نمی‌شود (Lazy Evaluation).

ترنسفورمیشن‌های اصلی:
  1. map(func)
    • هر عنصر RDD را با تابع func تبدیل می‌کند
    • مثال:
rdd = sc.parallelize([1, 2, 3])
rdd2 = rdd.map(lambda x: x * 2)  # [2, 4, 6]
  1. filter(func)
    • فقط عناصر که شرط func را دارند نگه می‌دارد
rdd3 = rdd2.filter(lambda x: x > 3)  # [4, 6]
  1. flatMap(func)
    • مشابه map اما اگر داده‌ها به صورت تو در تو باشند، آنها را به صورت Flat تبدیل می کند. مثل در مثال زیر ما دو جمله داریم که تابع Split به ازای هر جمله، آرایه ای از کلمات را بر می گرداند که در این صورت ما یک آرایه اصلی داریم برای دو جمله اولیه و هر جمله هم به یک آرایه از کلمات آن تبدیل می شود. یعنی آرایه درون آرایه . اینجا تابع flatMap بعد از اجرای تابع اصلی که اینجا همان Split است، نتیجه را Flat می کند یعنی آرایه داخلی را حذف میکند تا تنها یک آرایه داشته باشیم.
rdd4 = sc.parallelize(["hello world", "spark"])
rdd5 = rdd4.flatMap(lambda x: x.split(" "))  # ["hello", "world", "spark"]
  1. mapPartitions(func)
    • مشابه map، اما تابع روی کل پارتیشن اجرا می‌شود
    • مناسب وقتی بخواهیم عملیات روی دسته‌ای از داده‌ها انجام دهیم
  2. union(other_rdd)
    • ترکیب دو RDD
  3. distinct()
    • حذف عناصر تکراری
  4. groupByKey() / reduceByKey(func)
    • برای RDDهای کلیدی-مقداری (key-value)
    • reduceByKey عملیات تجمیع را روی هر کلید انجام می‌دهد و بازتوزیع داده‌ها می‌کند

توجه: ترنسفورمیشن‌هایی مانند reduceByKey باعث بازتوزیع داده‌ها بین پارتیشن‌ها می‌شوند.


🔹 اکشن‌ها (Actions)

اکشن‌ها پردازش را اجرا می‌کنند و خروجی به Driver بازمی‌گردد یا روی دیسک ذخیره می‌شود.

اکشن‌های اصلی:
  1. collect()
    • همه داده‌ها را به Driver می‌آورد
    • مناسب داده‌های کوچک
  2. count()
    • تعداد عناصر RDD را برمی‌گرداند
  3. take(n)
    • اولین n عنصر
  4. reduce(func)
    • یک عملیات تجمعی روی کل داده‌ها
rdd.reduce(lambda x, y: x + y)  # جمع کل
  1. saveAsTextFile(path)
    • داده‌ها را روی HDFS یا سیستم فایل ذخیره می‌کند
  2. countByKey()
    • برای RDDهای key-value تعداد هر کلید
  3. foreach(func)
    • اجرای تابع برای هر عنصر، بدون بازگرداندن نتیجه به Driver

🔹 مثال جامع

# ایجاد RDD از یک لیست
rdd = sc.parallelize([("apple", 2), ("banana", 3), ("apple", 4)])

# ترنسفورمیشن: جمع مقدارها بر اساس کلید
rdd2 = rdd.reduceByKey(lambda x, y: x + y)

# ترنسفورمیشن: فقط میوه‌هایی با جمع > 3
rdd3 = rdd2.filter(lambda kv: kv[1] > 3)

# اکشن: نمایش نتیجه
print(rdd3.collect())

خروجی:

[("apple", 6), ("banana", 3)]

توجه کنید: ابتدا ترنسفورمیشن‌ها فقط RDD جدید می‌سازند و پردازش اجرا نمی‌شود.
وقتی collect() صدا زده می‌شود، تمام مراحل Map/Reduce اجرا می‌شوند.


🔹 نکات کلیدی
  1. Lazy Evaluation → ترنسفورمیشن‌ها تا اجرای اکشن اجرا نمی‌شوند
  2. Fault Tolerance → RDD با DAG lineage قادر به بازسازی بخش‌های از دست رفته است
  3. پارتیشن‌بندی → RDD توزیع شده و عملیات‌ها روی پارتیشن‌ها موازی اجرا می‌شوند
  4. بازتوزیع داده‌ها → برخی ترنسفورمیشن‌ها (مثل reduceByKey) باعث جابجایی داده بین پارتیشن‌ها می‌شوند

تکنیک‌های ضروری برای کار حرفه‌ای با RDDها: Cache، Accumulator و Broadcast

حتماً! در ادامه، مفاهیم مهمی که در RDD و Spark کاربرد زیادی دارند و به بهینه‌سازی و مدیریت داده‌ها در کلاستر کمک می‌کنند را به زبان ساده اما دقیق توضیح می‌دهم: Broadcasting، Accumulators و Persist/Cache.


🔹 ۱) Broadcasting

Broadcast Variable وسیله‌ای است برای اشتراک‌گذاری داده‌های ثابت و بزرگ بین همه Executorها بدون کپی چندباره آن‌ها.

  • بدون Broadcast: اگر شما یک جدول کوچک یا لیست مرجع داشته باشید و بخواهید آن را در هر Task استفاده کنید، اسپارک به صورت پیش‌فرض آن داده را برای هر Task ارسال می‌کند → باعث مصرف زیاد حافظه و شبکه می‌شود.
  • با Broadcast، داده یک بار در Driver آماده می‌شود و به صورت بهینه در هر Executor به اشتراک گذاشته می‌شود.

مثال:

# لیست محصولات برای lookup
products = {"A": 100, "B": 200, "C": 300}

# ایجاد Broadcast
bc_products = sc.broadcast(products)

# استفاده در RDD
rdd = sc.parallelize(["A", "B", "C", "A"])
rdd2 = rdd.map(lambda x: bc_products.value[x])
print(rdd2.collect())  # [100, 200, 300, 100]

نکته: Broadcast برای داده‌های ثابت و خواندنی مناسب است و کمک می‌کند از بازتوزیع داده‌های غیرضروری جلوگیری شود.


🔹 ۲) Accumulators

Accumulator وسیله‌ای برای جمع‌آوری مقادیر از Taskهای مختلف به صورت امن و بدون هم‌زمانی (thread-safe) است.

  • به عنوان مثال برای شمارش، جمع‌زدن مقادیر یا جمع‌آوری لاگ‌ها استفاده می‌شود.
  • فقط Driver می‌تواند مقدار نهایی را بخواند، Taskها فقط مقدار اضافه می‌کنند.

مثال:

# ایجاد یک Accumulator
acc = sc.accumulator(0)

# استفاده در RDD
rdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: acc.add(x))

print(acc.value)  # 10

نکته: Accumulatorها برای debug یا آمارگیری مفید هستند و نمی‌توانند داده‌ای برای محاسبات بعدی بازگردانند.


🔹 ۳) Persist / Cache

RDDها به صورت پیش‌فرض Lazy هستند و هر بار که یک Action صدا زده شود، Transformations دوباره اجرا می‌شوند.

  • Cache / Persist به شما اجازه می‌دهد RDDها را در حافظه یا دیسک ذخیره کنید تا اجرای مجدد سریع‌تر شود.

تفاوت Cache و Persist

ویژگیCachePersist
پیش‌فرضMEMORY_ONLYمی‌توان سطح ذخیره‌سازی را مشخص کرد (MEMORY_ONLY، DISK_ONLY، MEMORY_AND_DISK و غیره)
کاربردسریع و سادهکنترل دقیق روی سطح ذخیره‌سازی

مثال:

rdd = sc.parallelize(range(1, 1000000)).map(lambda x: x*2)

# ذخیره در حافظه برای استفاده مجدد
rdd_cached = rdd.cache()

# اولین اکشن → محاسبه و ذخیره
print(rdd_cached.count())

# دومین اکشن → از حافظه خوانده می‌شود، بدون محاسبه مجدد
print(rdd_cached.take(5))

نکته: Cache/Persist برای RDDهایی که چند بار در محاسبات استفاده می‌شوند بسیار کاربردی است و باعث کاهش هزینه پردازش و شبکه می‌شود.


🔹 جمع‌بندی

مفهومکاربرد اصلینکته کلیدی
Broadcastاشتراک داده‌های ثابت بین Executorهافقط یک بار ارسال می‌شود، برای داده‌های کوچک یا متوسط
Accumulatorجمع‌آوری مقادیر از Taskهافقط Driver مقدار نهایی را می‌خواند، برای شمارش و آمارگیری
Persist / Cacheذخیره RDD در حافظه یا دیسک برای استفاده مجددکاهش اجرای مجدد Transformations و بهینه‌سازی پردازش

🎬 محتوای ویدئویی مفاهیم پایه اسپارک – بخش دوم، نگاهی عمیق‌تر به اسپارک

در بخش زیر، مفاهیم عمیق‌تر اسپارک – از جمله RDDها، عملیات رایج روی آن‌ها و سایر مبانی ضروری – در قالب یک ویدئوی آموزشی همراه با اسلایدهای تکمیلی ارائه شده‌اند. هدف این بخش آن است که درک دقیق‌تری از آنچه در پشت صحنه اجرای پردازش‌های داده در اسپارک رخ می‌دهد به شما بدهد. این محتوا نقش یک پیش‌نیاز مفهومی را دارد و شما را برای ورود به بخش عملی و اجرای واقعی پردازش‌ها در اسپارک آماده می‌کند.

⚠️ نکته: اگر ویدئو در قسمت زیر برای شما بارگذاری یا نمایش داده نمی‌شود، ابتدا مطمئن شوید که با آی‌پی ایران صفحه را مشاهده می‌کنید (فیلترشکن خاموش باشد) و در صورت نیاز، یک اینترنت پروایدر دیگر را امتحان کنید.

فروشگاه
جستجو
دوره ها

لطفا کلمات کلیدی را وارد کنید