
The Problem: The Union Function Isn’t as Simple as It Seems

Picture this: You have a large dataset that you need to process in different ways, so you:

  • Split it into smaller pieces
  • Transform each piece differently
  • Put them back together using union

Sounds straightforward, right? Well, there’s a catch that most developers don’t know about.

The Hidden Performance Killer 🐌

Here’s what’s actually happening behind the scenes when you use union:

    // Assumes a SparkSession is available as `spark`
    import org.apache.spark.sql.functions.col

    // Create two DataFrames with values 0 to 9,999,999
    val df1 = spark.range(10000000).toDF("value")
    val df2 = spark.range(10000000).toDF("value")

    // Perform an inner join on the "value" column
    val df = df1.join(df2, Seq("value"), "inner")

    // Split into odd and even numbers
    val dfOdd = df.filter(col("value") % 2 === 1)
    val dfEven = df.filter(col("value") % 2 === 0)

    // Transform each branch differently
    val dfOddAdded = dfOdd.withColumn("add_value", col("value") + 1)
    val dfEvenDivided = dfEven.withColumn("divide_value", col("value") / 2)

    // Union the two branches and count
    val result = dfOddAdded.union(dfEvenDivided).count()

    // Print the result
    println(s"Total count: $result")

    // Optional: show the execution plan
    dfOddAdded.union(dfEvenDivided).explain("formatted")

When Spark sees this code, it doesn’t realize it can reuse the joined data. Instead, it goes back to the beginning and re-executes the entire pipeline, including the join, once for each branch of the union!

Think of it like running an entire production line twice to make identical toys, just to paint half of them red and half blue. Instead, you could run the production line once and split the toys for different paint jobs at the end!

Look at the execution plan below: the same sub-plan appears twice, which means the union triggers a full recomputation of the upstream stages:

[recompute-image: execution plan with the same sub-plan computed twice]

The Simple Fix: Cache is Your Friend! ⚡

Here’s how to make your unions lightning fast:

    val df = df1.join(df2, Seq("value"), "inner")

    // Keep the join result around so both union branches can reuse it
    df.cache()

By adding cache(), you’re telling Spark: “Hey, keep this data in memory - we’ll need it again soon!”
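
One subtlety worth knowing: cache() is lazy, so nothing is actually stored until an action runs. Here’s a minimal sketch of the fixed pipeline (same DataFrames as above; the explicit count() to warm the cache is optional, since the first action that touches df populates it either way):

    val df = df1.join(df2, Seq("value"), "inner")
    df.cache()   // only marks df for caching; nothing is stored yet
    df.count()   // an action materializes the cache

    // Both branches now read the cached join output instead of recomputing it
    val dfOddAdded = df.filter(col("value") % 2 === 1).withColumn("add_value", col("value") + 1)
    val dfEvenDivided = df.filter(col("value") % 2 === 0).withColumn("divide_value", col("value") / 2)
    println(s"Total count: ${dfOddAdded.union(dfEvenDivided).count()}")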

Now imagine these scenarios:

  • You have billions of records to process 😧
  • Your job is running on AWS Glue where you pay per DPU hour 💰
  • Without caching, you’re essentially:
    • Processing those billions of records twice
    • Doubling your computation time
    • Doubling your AWS costs 🔄

The cost impact is real:

  • 2x processing time = 2x DPU hours
  • 2x DPU hours = 2x your bill

This is why understanding caching isn’t just about performance; it’s about your bottom line! 💡

Let’s see the optimized execution plan:

[optimized-union-image: execution plan with InMemoryRelation reuse]

Let me explain how we can see caching in action in this execution plan:

  1. The key indicator is InMemoryRelation at the top. This shows that Spark is using a cached version of the data with these specifications (you can confirm this programmatically; see the sketch after this list):

    • StorageLevel(disk, memory, deserialized, 1 replicas): data is kept in memory and can spill to disk (the default MEMORY_AND_DISK level)
    • CachedRDDBuilder: shows the plan is built from the cached RDD
  2. The execution plan shows:

    • Instead of recomputing the Range and Project operations multiple times
    • It’s reusing the cached data for subsequent operations
    • BroadcastHashJoin is working with the cached data, not recomputing from scratch
  3. Without caching, you would see:

    • Duplicate Range operations (0 to 10,000,000)
    • Multiple Project operations
    • A more complex execution plan with repeated computations
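
A quick way to confirm a DataFrame is really cached is the Dataset.storageLevel API. A minimal sketch, where df is the joined DataFrame from the example above:

    import org.apache.spark.storage.StorageLevel

    df.cache()
    df.count()  // materialize the cache with an action

    // Prints the effective level, e.g. StorageLevel(disk, memory, deserialized, 1 replicas)
    println(df.storageLevel)

    // cache() defaults to MEMORY_AND_DISK for DataFrames
    println(df.storageLevel == StorageLevel.MEMORY_AND_DISK)  // true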

Caveat

There’s an important caveat about caching and disk spillage:

When using cache(), be careful! If you don’t have enough memory, Spark will “spill” the cached data to disk, which could actually make your job slower than not caching at all. Why? Because now you’re adding:

  • Disk I/O operations (writing to and reading from disk)
  • Data serialization/deserialization overhead
  • Network traffic if using distributed storage
  • Additional storage space management

It’s like ordering takeout but your fridge is full, so you have to store the food in the basement. Now every time you want to eat (access the data), you have to walk down to the basement, bring the food up, heat it up (deserialize), and then do this all over again. Sometimes it’s faster to just order fresh food (recompute the data) than deal with all that overhead! 🏃‍♂️🔄

The key is to be strategic about what you cache and always monitor your memory usage. Remember: not all data needs to be cached! 💡
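
If memory pressure is a concern, you can make the trade-off explicit with persist. A short sketch using the standard StorageLevel API: MEMORY_ONLY never spills to disk (partitions that don’t fit are simply recomputed), and unpersist() frees the cache once the reuse is over:

    import org.apache.spark.storage.StorageLevel

    // MEMORY_ONLY: partitions that don't fit in memory are recomputed, not spilled
    df.persist(StorageLevel.MEMORY_ONLY)

    // ... run the union and any other operations that reuse df ...

    // Release the cached blocks when you're done
    df.unpersist()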


Final Thoughts

Understanding Spark’s union operation and caching mechanism isn’t just about technical optimization; it’s about being a thoughtful data engineer. Let’s recap the key takeaways:

  1. Know Your Data Pipeline:

    • Before adding cache(), understand if and where your data is being reused
    • Monitor your execution plans to identify redundant computations
    • Be strategic about what you cache: not everything needs to be cached
  2. Consider the Trade-offs:

    • Caching can significantly improve performance when used correctly
    • But remember: insufficient memory leads to disk spillage, which can make things worse

Remember, the goal isn’t to cache everything; it’s to cache smartly. Whether you’re processing millions of records or working with complex transformations, understanding these concepts will help you build more efficient and cost-effective data pipelines.

If you missed my previous deep dive into Spark’s union operation, you can catch up here. It complements this caching discussion nicely!

Happy Spark-ing! 🚀


Found this helpful? Follow me for more data engineering tips and tricks