PySpark Data Skew Handling – Complete Guide
🔴 1. Problem Statement: Skewed Aggregation
```python
df.groupBy("user_id").count()
```
❗ Issue
- One `user_id` contains ~40% of the total data
- Spark sends the same key → same partition

Result:
- One task becomes extremely heavy
- Other tasks finish early
- Straggler problem → slow job
🧠 2. Why Skew Happens
Spark distributes rows based on their key:

- user_id = A → goes to one partition
- user_id = B → another partition

If A = 40% of the data, then the partition for A becomes huge → bottleneck.
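The mechanics above can be sketched in plain Python (illustrative only: Spark uses its own Murmur3-based hash partitioner, not Python's `hash()`, and the 8-partition dataset here is made up):

```python
from collections import Counter

def partition_for(key, num_partitions):
    # Every row with the same key lands in the same partition.
    return hash(key) % num_partitions

# Simulated dataset: key "A" holds 40% of 1,000 rows.
rows = ["A"] * 400 + ["B"] * 300 + ["C"] * 300
sizes = Counter(partition_for(k, 8) for k in rows)

# All 400 "A" rows end up in a single partition → that task is the straggler.
print(sizes)
```

However many partitions you add, hash partitioning can never split a single hot key: it always maps to exactly one partition.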
🔍 3. How to Identify Skew (Practical Approach)
✅ Method 1: Distribution Check
```python
from pyspark.sql.functions import count

df.groupBy("user_id") \
    .agg(count("*").alias("cnt")) \
    .orderBy("cnt", ascending=False) \
    .show(10)
```
👉 Example Output:
| user_id | cnt |
|---|---|
| A | 40,000,000 ← skew |
| B | 1,000 |
| C | 900 |
✅ Method 2: Percentile Analysis
```python
df.groupBy("user_id") \
    .count() \
    .selectExpr(
        "percentile(count, array(0.5, 0.9, 0.99)) AS percentiles"
    ).show(truncate=False)
```
👉 Example:
[100, 500, 1000000]
- Median = 100
- 99th percentile = 1,000,000 → 🚨 skew confirmed
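A simple way to turn the percentile output above into a yes/no check is to compare the tail to the median. The helper and the 100× ratio below are hypothetical, not a Spark API:

```python
# Hypothetical skew heuristic: flag skew when the 99th-percentile key
# count dwarfs the median key count.
def looks_skewed(p50, p99, ratio=100):
    return p99 >= ratio * p50

# Median 100 vs p99 1,000,000 (the example above) → clearly skewed
print(looks_skewed(100, 1_000_000))   # → True
print(looks_skewed(100, 500))         # → False
```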
✅ Method 3: Spark UI (Production Standard)
Go to:
Spark UI → Stages → Tasks
Look for:
- One task running much longer
- Uneven input sizes (GB vs MB)
🛠️ 4. Solution: Salting Technique
🧠 Concept
Split heavy key into multiple sub-keys → distribute load
🔧 Step 1: Add Salt Column
```python
from pyspark.sql.functions import rand, floor

num_salts = 10

# floor(rand() * num_salts) yields a random integer in [0, num_salts)
df_salted = df.withColumn(
    "salt",
    floor(rand() * num_salts)
)
```
👉 Transforms:
A → (A,0), (A,1), (A,2)...(A,9)
🔧 Step 2: Partial Aggregation
```python
df_partial = df_salted.groupBy("user_id", "salt").count()
```
👉 Heavy key is now split across multiple partitions
🔧 Step 3: Final Aggregation
```python
from pyspark.sql.functions import sum as spark_sum  # alias avoids shadowing Python's built-in sum

df_final = df_partial.groupBy("user_id") \
    .agg(spark_sum("count").alias("count"))
```
📌 Full Code
```python
from pyspark.sql.functions import rand, floor, sum as spark_sum

num_salts = 10

# Step 1: add a random salt to split the heavy key into sub-keys
df_salted = df.withColumn("salt", floor(rand() * num_salts))

# Step 2: partial aggregation on (user_id, salt)
df_partial = df_salted.groupBy("user_id", "salt").count()

# Step 3: final aggregation back to user_id
df_final = df_partial.groupBy("user_id") \
    .agg(spark_sum("count").alias("count"))
```
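The three steps above can be simulated in plain Python to show why the result stays correct: the partial counts per `(user_id, salt)` sum back to exactly the unsalted counts (no Spark required; dataset and seed are made up):

```python
import random
from collections import Counter

random.seed(42)
num_salts = 10

rows = ["A"] * 1000 + ["B"] * 10 + ["C"] * 5

# Step 1: attach a random salt → (user_id, salt) sub-keys
salted = [(uid, random.randrange(num_salts)) for uid in rows]

# Step 2: partial counts per (user_id, salt) — hot key "A" is now spread
# over up to 10 sub-keys instead of one
partial = Counter(salted)

# Step 3: final counts per user_id — sum the partial counts back up
final = Counter()
for (uid, _salt), cnt in partial.items():
    final[uid] += cnt

assert final == Counter(rows)  # salting preserves the exact counts
```

This works because `count` (like `sum`) is decomposable: partial results can be merged without seeing all rows of a key at once.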
🚀 5. Why Salting Works
| Without Salting | With Salting |
|---|---|
| One partition handles key A | Key A split across 10 partitions |
| Slow | Parallel |
| Straggler task | Balanced workload |
⚖️ 6. Trade-offs of Salting
👍 Advantages
- Fixes skew effectively
- Improves parallelism
- Reduces stragglers
👎 Disadvantages
1. Extra Computation
- Two aggregations instead of one
2. More Shuffle
- Additional `salt` column increases shuffled data
3. Configuration Required
- Need to choose `num_salts`
4. Not Always Needed
- Overkill for small skew
🎯 Rule of Thumb
num_salts ≈ 5–10 (start small, tune)
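One hypothetical way to size `num_salts` (the helper below is not a standard API, just a sketch): pick enough salts that the heaviest key's rows, divided across sub-keys, come down to a reasonable per-task volume, and cap the value so the extra shuffle stays bounded.

```python
import math

def suggest_num_salts(heavy_key_rows, target_rows_per_task, cap=50):
    # Enough salts that each sub-key holds roughly target_rows_per_task rows,
    # capped so the second aggregation stays cheap.
    return max(2, min(cap, math.ceil(heavy_key_rows / target_rows_per_task)))

# 40M rows under one key, aiming for ~5M rows per task
print(suggest_num_salts(40_000_000, 5_000_000))  # → 8
```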
⚡ 7. AQE (Adaptive Query Execution)
✅ Enable AQE
```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```
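AQE's skew handling can be tuned further. A sketch of the related Spark 3.x properties, assuming an active SparkSession named `spark` (the values shown are the documented defaults, not recommendations):

```python
# A shuffle partition is treated as skewed if it is larger than this
# factor times the median partition size (default: 5)...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

# ...and also larger than this absolute threshold (default: 256MB)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Target size AQE aims for when splitting or coalescing partitions (default: 64MB)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```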
🔥 8. What AQE Actually Does
AQE can:
- Detect skew at runtime
- Split large shuffle partitions
- Optimize joins
❗ 9. Critical Limitation of AQE
| Operation | AQE Effectiveness |
|---|---|
| JOIN | ✅ Handles skew well |
| GROUP BY | ❌ Limited |
⚠️ Why AQE Fails for Aggregation
Example:

user_id = A → 40% of data

👉 To compute the aggregate correctly, Spark must bring all records for A together on one reducer.

👉 So AQE cannot automatically split A across reducers the way it splits oversized join partitions.
✅ 10. Do You Still Need Skew Detection?
✔️ YES
```python
percentile(count, array(0.5, 0.9, 0.99))
```
Reason:
- AQE does NOT show skewed keys
- AQE does NOT provide distribution insights
- Manual analysis is required
✅ 11. Do You Still Need Salting?
| Scenario | Need Salting |
|---|---|
| Skewed JOIN | ❌ Usually AQE handles |
| Skewed GROUP BY | ✅ YES |
| Extreme skew (40%+) | ✅ MUST |
| Mild skew | ❌ Optional |
🧠 12. Real-World Decision Framework
Step 1: Enable AQE

```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
```

Step 2: Detect Skew

```python
df.groupBy("user_id").count().orderBy("count", ascending=False).show()
```

Step 3: Decide
- JOIN skew → AQE sufficient
- Aggregation skew → Use salting
💡 13. Senior-Level Insights
- Common skew keys:
  - NULL
  - Default values
  - Country = 'US'
- Always validate via Spark UI
- AQE is an optimization, not a logic correction
🎯 14. Final Interview Answer
“I identify skew using groupBy distribution and Spark UI task imbalance. For skewed aggregations, I apply salting—adding a random suffix to distribute heavy keys, performing partial aggregation, and then re-aggregating. While AQE helps with skewed joins, it does not fully solve aggregation skew because all records of a key must be processed together. Therefore, salting is still required in such cases.”