PySpark Data Skew Handling – Complete Guide
🔴 1. Problem Statement: Skewed Aggregation

df.groupBy("user_id").count()

❗ Issue

  • One user_id contains ~40% of total data
  • Spark sends same key → same partition
  • Result:
    • One task becomes extremely heavy
    • Other tasks finish early
    • Straggler problem → slow job

🧠 2. Why Skew Happens

Spark distributes data based on keys:

user_id = A → goes to one partition
user_id = B → another partition

If:

A = 40% of data

Then:

Partition for A = huge → bottleneck
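The routing above can be sketched in plain Python (a simulation, not Spark — `partition_for` is a hypothetical stand-in for Spark's hash partitioner):

```python
num_partitions = 8

def partition_for(key):
    # Stand-in for Spark's Murmur3 hash partitioner; Python's built-in
    # hash() is salted per process but stable within one run.
    return hash(key) % num_partitions

rows = ["A"] * 40 + ["B"] * 3 + ["C"] * 2  # key A holds ~89% of rows

loads = {}
for key in rows:
    p = partition_for(key)
    loads[p] = loads.get(p, 0) + 1

# Every "A" row lands in the same partition, so one task carries
# at least 40 of the 45 rows — the straggler.
print(sorted(loads.values(), reverse=True))
```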

🔍 3. How to Identify Skew (Practical Approach)

✅ Method 1: Distribution Check

from pyspark.sql.functions import count

df.groupBy("user_id") \
    .agg(count("*").alias("cnt")) \
    .orderBy("cnt", ascending=False) \
    .show(10)

👉 Example Output:

user_id    cnt
A          40,000,000   ← skew
B          1,000
C          900

✅ Method 2: Percentile Analysis

df.groupBy("user_id") \
    .count() \
    .selectExpr(
        "percentile(`count`, array(0.5, 0.9, 0.99)) as percentiles"
    ).show(truncate=False)

👉 Example:

[100, 500, 1000000]
  • Median = 100
  • 99th percentile = 1,000,000 → 🚨 skew confirmed
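A quick way to turn that percentile output into a yes/no signal is to compare the tail to the median (the 10x threshold below is an assumption to tune, not a Spark default):

```python
def is_skewed(p50, p99, ratio_threshold=10.0):
    # Flag skew when the heaviest keys sit far above the median key.
    # The 10x threshold is a starting assumption; tune per workload.
    return p99 / max(p50, 1.0) > ratio_threshold

percentiles = [100, 500, 1_000_000]  # [p50, p90, p99] from the query above
print(is_skewed(percentiles[0], percentiles[2]))  # → True
```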

✅ Method 3: Spark UI (Production Standard)

Go to:

Spark UI → Stages → Tasks

Look for:

  • One task running much longer
  • Uneven input sizes (GB vs MB)

🛠️ 4. Solution: Salting Technique

🧠 Concept

Split heavy key into multiple sub-keys → distribute load


🔧 Step 1: Add Salt Column

from pyspark.sql.functions import rand, floor

num_salts = 10

df_salted = df.withColumn(
    "salt",
    floor(rand() * num_salts)
)

👉 Transforms:

A → (A,0), (A,1), (A,2)...(A,9)

🔧 Step 2: Partial Aggregation

df_partial = df_salted.groupBy("user_id", "salt").count()

👉 Heavy key is now split across multiple partitions


🔧 Step 3: Final Aggregation

from pyspark.sql.functions import sum as spark_sum  # avoid shadowing built-in sum

df_final = df_partial.groupBy("user_id") \
    .agg(spark_sum("count").alias("count"))

📌 Full Code

from pyspark.sql.functions import rand, floor, sum as spark_sum

num_salts = 10

# Step 1: add a random salt to split the hot key
df_salted = df.withColumn("salt", floor(rand() * num_salts))

# Step 2: partial aggregation per (user_id, salt)
df_partial = df_salted.groupBy("user_id", "salt").count()

# Step 3: recombine partial counts per user_id
df_final = df_partial.groupBy("user_id") \
    .agg(spark_sum("count").alias("count"))

🚀 5. Why Salting Works

Without Salting                 With Salting
One partition handles key A     Key A split across 10 partitions
Slow                            Parallel
Straggler task                  Balanced workload
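The comparison can be made concrete with a plain-Python simulation (not Spark): the largest single group determines how much work the heaviest reduce task gets, and salting caps that group size.

```python
import random
from collections import Counter

random.seed(0)
num_salts = 10
rows = ["A"] * 1000 + ["B"] * 10 + ["C"] * 10  # "A" is the hot key

# Without salting: the shuffle groups by key alone, so the heaviest
# task processes the entire hot key as one group.
before = Counter(rows)

# With salting: grouping by (key, salt) splits the hot key into up
# to num_salts smaller groups that can run in parallel.
after = Counter((k, random.randrange(num_salts)) for k in rows)

print(max(before.values()), max(after.values()))
```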

⚖️ 6. Trade-offs of Salting

👍 Advantages

  • Fixes skew effectively
  • Improves parallelism
  • Reduces stragglers

👎 Disadvantages

1. Extra Computation

  • Two aggregations instead of one

2. More Shuffle

  • Additional column (salt)

3. Configuration Required

  • Need to choose num_salts

4. Not Always Needed

  • Overkill for small skew

🎯 Rule of Thumb

num_salts ≈ 5–10 (start small, tune)
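One way to operationalize this rule (a hypothetical heuristic, not a Spark API): split the hot key until each sub-key is roughly median-sized, capped at 10 to limit shuffle overhead.

```python
import math

def suggest_num_salts(hot_key_count, median_count, cap=10):
    # Hypothetical heuristic: aim for median-sized sub-keys, but never
    # exceed `cap` — every extra salt adds shuffle volume and grows
    # the second aggregation.
    if median_count <= 0:
        return 1
    return max(1, min(cap, math.ceil(hot_key_count / median_count)))

print(suggest_num_salts(40_000_000, 1_000))  # → 10 (hits the cap)
print(suggest_num_salts(5_000, 1_000))       # → 5
```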

⚡ 7. AQE (Adaptive Query Execution)

✅ Enable AQE

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

🔥 8. What AQE Actually Does

AQE can:

  • Detect skew at runtime
  • Split large shuffle partitions
  • Optimize joins

❗ 9. Critical Limitation of AQE

Operation    AQE Effectiveness
JOIN         ✅ Handles skew well
GROUP BY     ❌ Limited

⚠️ Why AQE Fails for Aggregation

Example:

user_id = A → 40% data

👉 To keep the result correct, Spark must:

  • Bring every record for A to the same reducer
  • Produce exactly one output row per key

👉 So:

AQE cannot automatically split A across reducers for the final aggregation
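Contrast this with manual salting: the two-stage aggregation stays correct because partial counts recombine losslessly. A plain-Python sketch (a simulation, not Spark):

```python
import random
from collections import Counter

random.seed(1)
rows = ["A"] * 400 + ["B"] * 30 + ["C"] * 20

# Single-stage: equivalent of df.groupBy("user_id").count()
direct = Counter(rows)

# Two-stage salted: count per (key, salt), then sum partials per key.
partial = Counter((k, random.randrange(10)) for k in rows)
recombined = Counter()
for (key, _salt), cnt in partial.items():
    recombined[key] += cnt

print(recombined == direct)  # → True: salting never changes the result
```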

✅ 10. Do You Still Need Skew Detection?

✔️ YES

percentile(count, array(0.5, 0.9, 0.99))

Reason:

  • AQE does NOT show skewed keys
  • AQE does NOT provide distribution insights
  • Manual analysis is required

✅ 11. Do You Still Need Salting?

Scenario               Need Salting
Skewed JOIN            ❌ Usually AQE handles it
Skewed GROUP BY        ✅ YES
Extreme skew (40%+)    ✅ MUST
Mild skew              ❌ Optional

🧠 12. Real-World Decision Framework

Step 1: Enable AQE

spark.conf.set("spark.sql.adaptive.enabled", "true")

Step 2: Detect Skew

df.groupBy("user_id").count().orderBy("count", ascending=False).show()

Step 3: Decide

  • JOIN skew → AQE sufficient
  • Aggregation skew → Use salting
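The three steps can be condensed into a small helper (the function name and thresholds are illustrative assumptions, not Spark APIs):

```python
def skew_strategy(operation, hot_key_share):
    # operation: "join" or "groupby"; hot_key_share: the heaviest key's
    # fraction of total rows. Thresholds are assumptions to tune.
    if operation == "join":
        return "AQE skew-join is usually sufficient"
    if hot_key_share >= 0.40:
        return "salting required"
    if hot_key_share >= 0.10:
        return "salting recommended"
    return "AQE alone; salting is overkill"

print(skew_strategy("join", 0.40))
print(skew_strategy("groupby", 0.40))
```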

💡 13. Senior-Level Insights

  • Common skew keys:
    • NULL
    • Default values
    • Country = 'US'
  • Always validate via Spark UI
  • AQE is an optimization, not a correctness fix

🎯 14. Final Interview Answer

“I identify skew using groupBy distribution and Spark UI task imbalance. For skewed aggregations, I apply salting—adding a random suffix to distribute heavy keys, performing partial aggregation, and then re-aggregating. While AQE helps with skewed joins, it does not fully solve aggregation skew because all records of a key must be processed together. Therefore, salting is still required in such cases.”
