I know: Spark is sometimes frustrating to work with.
Why? Because Spark throws memory errors a lot, and you only truly learn to work with Spark once you run it on genuinely big datasets.
This post is about practical Spark and memory management tips for data scientists.
1. Map Side Joins
The syntax of joins in Spark is pretty similar to pandas:
df3 = df1.join(df2, df1.column == df2.column, how='left')
But I faced a problem. df1 had around 1 billion rows while df2 had around 100 rows. When I tried the above join, it failed with memory-exhausted errors after running for 20 minutes.
I was writing this code on a pretty big cluster with more than 400 executors, each with more than 4 GB of RAM. I was stumped: I tried repartitioning my data frames using multiple schemes, but nothing seemed to work.
So what should I do? Is Spark not able to work with a mere billion rows? Not really. I just needed to use a map-side join, or broadcasting in Spark terminology.
from pyspark.sql.functions import broadcast
df3 = df1.join(broadcast(df2), df1.column == df2.column, how='left')
Using the simple broadcasting code above, I was able to send the smaller df2 to all the nodes, and this didn’t take a lot of time or memory. What happens in the backend is that a copy of df2 is sent to every executor, and each partition of df1 is joined against that local copy. That means df1, which is a lot bigger than df2, does not need to be moved across the cluster.
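To make the idea concrete, here is a minimal pure-Python sketch of a map-side left join. It is not Spark code: the lists stand in for partitions of the large table, the dict stands in for the broadcast copy of the small table, and the names `build_lookup` and `map_side_left_join` are made up for illustration.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

LargeRow = Tuple[str, int]                   # (join key, value) from the big table
JoinedRow = Tuple[str, int, Optional[str]]   # big-table row plus the matched label


def build_lookup(small_rows: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Hash the small table once; this dict plays the role of the broadcast copy."""
    return dict(small_rows)


def map_side_left_join(
    partition: Iterable[LargeRow], lookup: Dict[str, str]
) -> Iterator[JoinedRow]:
    """Join one partition of the big table against the local lookup.

    No rows of the big table leave their partition; unmatched keys get None,
    which mirrors how='left'.
    """
    for key, value in partition:
        yield (key, value, lookup.get(key))


# Two "partitions" of the large table and a tiny lookup table.
large_partitions: List[List[LargeRow]] = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("z", 4)],
]
lookup = build_lookup([("a", "apple"), ("b", "banana")])

# Each partition is joined independently, using only its own rows plus the lookup.
joined = [list(map_side_left_join(part, lookup)) for part in large_partitions]
print(joined)
```

The trade-off is that every worker has to hold the whole small table in memory, which is why this only pays off when one side of the join is small.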
2. Spark Cluster Configurations
Set the Parallelism and worker nodes based on your task size
What also made my life difficult when I was starting out with Spark was the way the Spark cluster needs to be configured. Your Spark cluster might need a lot of custom configuration and tuning based on the job you want to run.
Some of the most important configurations and options are as follows:
a. spark.sql.shuffle.partitions and spark.default.parallelism:
spark.sql.shuffle.partitions configures the number of partitions to use when shuffling data for joins or aggregations; its default value is 200. spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user. Its default is not a fixed number: it depends on the cluster manager and the operation, and on a cluster it is typically the total number of cores across all executors.
In simple words, these set the degree of parallelism you want to have in your cluster.
If you don’t have a lot of data, the value of 200 is fine, but if you have a huge amount of data, you might want to increase these numbers. It also depends on the number of executors you have. My cluster was pretty big with 400 executors, so I kept this at 1200. A rule of thumb is to keep it as a multiple of the number of executors so that each executor ends up with multiple tasks.
sqlContext.setConf("spark.sql.shuffle.partitions", "800")
sqlContext.setConf("spark.default.parallelism", "800")
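The rule of thumb above is simple arithmetic, sketched here as a small helper. The function name and the default of three tasks per executor are assumptions chosen for illustration, not Spark settings.

```python
def shuffle_partitions(num_executors: int, tasks_per_executor: int = 3) -> int:
    """Return a partition count that is a whole multiple of the executor count."""
    if num_executors < 1 or tasks_per_executor < 1:
        raise ValueError("both arguments must be positive integers")
    return num_executors * tasks_per_executor


# With 400 executors, three tasks each gives 1200 and two tasks each gives 800.
partitions_3x = shuffle_partitions(400, 3)
partitions_2x = shuffle_partitions(400, 2)
print(partitions_3x, partitions_2x)
```

The result can then be passed, as a string, to the shuffle partition setting shown above.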
b. spark.sql.parquet.binaryAsString:
I was working with .parquet files in Spark, and most of my data columns were strings. But somehow, whenever I loaded the data in Spark, the string columns came back in binary format, and I was not able to use any string manipulation functions on them. The way I solved this was by setting spark.sql.parquet.binaryAsString to true, for example with sqlContext.setConf("spark.sql.parquet.binaryAsString", "true").
This configuration tells Spark to treat binary columns as strings while loading parquet files. It is now a default configuration I set whenever I work with Spark.
c. Yarn Configurations:
There are other configurations that you might need to tune that define your cluster. But these need to be set when the cluster is starting and are not as dynamic as the ones above. The few I want to put down here are for managing memory spills on the executor nodes, since sometimes a single executor gets a lot of work.
- spark.yarn.executor.memoryOverhead: 8192
- yarn.nodemanager.vmem-check-enabled: False
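To see what the overhead setting changes, here is a rough sketch of how much memory each executor container asks YARN for. It assumes Spark's documented default overhead of 10% of executor memory with a 384 MiB floor, and it ignores YARN rounding requests up to its allocation increment. The 4096 MiB executor size is only an illustrative figure, and the function names are made up.

```python
from typing import Optional

MIN_OVERHEAD_MB = 384   # Spark's documented minimum overhead
OVERHEAD_FACTOR = 0.10  # Spark's documented default fraction of executor memory


def default_overhead_mb(executor_memory_mb: int) -> int:
    """Overhead used when no explicit memoryOverhead value is configured."""
    return max(int(executor_memory_mb * OVERHEAD_FACTOR), MIN_OVERHEAD_MB)


def container_request_mb(executor_memory_mb: int,
                         overhead_mb: Optional[int] = None) -> int:
    """Executor heap plus off-heap overhead, in MiB, before any YARN rounding."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


# A 4 GiB executor with the default overhead versus an explicit 8192 MiB.
with_default = container_request_mb(4096)
with_explicit = container_request_mb(4096, overhead_mb=8192)
print(with_default, with_explicit)
```

A larger overhead gives each executor more room outside the JVM heap, at the cost of fitting fewer executors on each node.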
There are a lot of configurations that you might want to tune while setting up your Spark cluster. You can take a look at them in the official docs.
3. Repartitioning
Keeping the workers happy by having them handle an equal amount of data
You might want to repartition your data if you feel your data has been skewed while working with all the transformations and joins. The simplest way to do it is by using:
df = df.repartition(1000)
Sometimes you might also want to repartition by a known scheme as this scheme might be used by a certain join or aggregation operation later on. You can use multiple columns to repartition using:
df = df.repartition('cola', 'colb', 'colc', 'cold')
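Repartitioning by columns is a form of hash partitioning: rows with the same values in those columns always land in the same partition. The sketch below shows the idea in plain Python. Spark uses its own internal hash function, so `zlib.crc32` is only a deterministic stand-in, and the helper name, the partition count of 4, and the sample rows are all hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Any, Dict, List, Sequence


def partition_index(row: Dict[str, Any], columns: Sequence[str],
                    num_partitions: int) -> int:
    """Map a row to a partition using a hash of the chosen column values."""
    # Join the values with a separator that is unlikely to appear in the data.
    key = "\x1f".join(str(row[c]) for c in columns).encode("utf-8")
    return zlib.crc32(key) % num_partitions


rows = [
    {"cola": "x", "colb": 1, "amount": 10},
    {"cola": "x", "colb": 1, "amount": 20},
    {"cola": "y", "colb": 2, "amount": 30},
]

# Group the rows by the partition they hash to.
partitions: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
for row in rows:
    partitions[partition_index(row, ["cola", "colb"], 4)].append(row)

# The first two rows share ("x", 1), so they end up in the same partition.
same_partition = (
    partition_index(rows[0], ["cola", "colb"], 4)
    == partition_index(rows[1], ["cola", "colb"], 4)
)
print(same_partition)
```

This is also why repartitioning on a heavily repeated key does not remove skew: every row with that key still hashes to a single partition.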
You can get the number of partitions in a data frame using df.rdd.getNumPartitions().
You can also check the distribution of records across partitions by using the glom function, for example df.rdd.glom().map(len).collect(), which returns the number of records in each partition. This helps in understanding the skew in the data that builds up while working with various transformations.
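Once you have the per-partition record counts, a single number makes the skew easier to judge. Here is a minimal sketch, assuming the counts are already collected into a plain Python list. The ratio of the largest partition to the average is one possible measure rather than anything Spark reports, and the sample counts are made up.

```python
from typing import Sequence


def skew_ratio(partition_sizes: Sequence[int]) -> float:
    """Largest partition size divided by the mean partition size.

    A value near 1.0 means the partitions are balanced; larger values mean
    one partition is doing a disproportionate share of the work.
    """
    if not partition_sizes:
        raise ValueError("need at least one partition")
    average = sum(partition_sizes) / len(partition_sizes)
    if average == 0:
        return 1.0  # all partitions are empty, so nothing is skewed
    return max(partition_sizes) / average


balanced = skew_ratio([250, 250, 250, 250])
skewed = skew_ratio([100, 100, 100, 700])
print(balanced, skewed)
```

If the ratio is well above 1, repartitioning, or choosing a different partitioning key, is probably worth trying before the next join or aggregation.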
There are a lot of things we don’t know that we don’t know. These are called unknown unknowns. It is only through multiple code failures and reading multiple Stack Overflow threads that we understand what we need.
Here I have tried to summarize a few of the problems that I faced around memory issues and configurations while working with Spark and how to solve them. There are a lot of other configuration options in Spark, which I have not covered, but I hope this post gave you some clarity on how to set these and use them.