Repartitioning in PySpark
The following is a chat transcript between Rob and ChatGPT regarding repartitioning for speed in PySpark jobs
August 14, 2023
I'm running a large Glue job in PySpark. It's many gigabytes of data that I need to sort by date, and I also need to add columns for year, month, and day. The table size is probably 200 GB (I don't know exactly). I have it set to use 40 G.2X workers, which is 80 DPUs. I want to call "repartition" on the dataframe after loading the data. How many partitions should I repartition to?
ChatGPT
The number of partitions is critical to PySpark performance. If you create too few partitions, you will have fewer concurrent tasks, leading to inefficient utilization of your resources. On the other hand, too many partitions could also lead to inefficiencies, as the scheduling overhead and task startup cost could overshadow the computation time.
It's often recommended to have 2-4 partitions for each CPU in your cluster when running computationally intensive tasks. However, for I/O intensive tasks (like reading and writing from/to databases), you can have a higher number of partitions than the number of CPUs.
The number of cores in your setup depends on the specific worker type. Each Glue G.2X worker maps to 2 DPUs and provides 8 vCPUs and 32 GB of memory, so with 40 workers you have roughly 320 vCPUs in total (one worker hosts the driver, so slightly fewer cores are actually available to executors).
For a CPU-intensive task, you might want 2 to 4 partitions per core, which works out to roughly 640 to 1,280 partitions for this cluster. For I/O-intensive tasks, you could increase this further. However, if the data is extremely large (as in your case), you might want to go beyond this guideline.
Keep in mind that these are just general guidelines and optimal settings can vary significantly depending on the specifics of your job and data.
Please also note that the size of each partition is important. A common target is roughly 128 MB to 1 GB of data per partition: large enough that task overhead stays low, but comfortably smaller than the memory available to each executor. With about 200 GB of data, 1 GB partitions would mean roughly 200 partitions, while 256 MB partitions would mean roughly 800.
Ultimately, the optimal number of partitions is something you often need to determine through testing. You might try different numbers of partitions and see which gives you the best performance. You also need to consider the transformations you're doing (like sorting and adding columns), as they might require additional resources.
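To make the guidance above concrete, here is a hedged sketch of the kind of pipeline described. The source path, the date column name (event_date), and the partition count are placeholders, not details from your job:

```python
from pyspark.sql import functions as F

# Illustrative partition count -- tune it per the guidance above
# (roughly 2-4 tasks per core, and about 128 MB-1 GB of data per partition).
num_partitions = 640

# Placeholder source; `spark` is the SparkSession (in a Glue script,
# typically glueContext.spark_session), and the path is illustrative.
df = spark.read.parquet("s3://your-bucket/your-table/")

# Redistribute the data across the cluster before the heavy work.
df = df.repartition(num_partitions)

# Derive year / month / day columns from the date column.
df = (
    df.withColumn("year", F.year("event_date"))
      .withColumn("month", F.month("event_date"))
      .withColumn("day", F.dayofmonth("event_date"))
)

# Global sort by date (this triggers its own shuffle).
df = df.orderBy("event_date")
```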
I have a large Glue job that is writing out to a CSV file. One field called targeted_dma_list throws this error: "Cannot write array field targeted_dma_list to CSV"
ChatGPT
The error occurs because CSV is a simple, tabular format that does not natively support complex data types such as arrays or structs. When you try to write the targeted_dma_list field, which is an array, to CSV, Spark cannot serialize it and throws this error.
There are a few ways to handle this depending on what you need:
Flatten the Array: If the array isn't very large and you can combine the elements into a single string or a new delimited format, you can use the concat_ws function to combine the elements with a delimiter. For example, you can use the pipe character '|' as a delimiter:

```python
from pyspark.sql import functions as F

df = df.withColumn('targeted_dma_list', F.concat_ws('|', 'targeted_dma_list'))
```

This will transform an array like ['a', 'b', 'c'] into a string like 'a|b|c'.
Explode the Array: If you need each element of the array to be on its own row in the CSV, you can use the explode function:

```python
from pyspark.sql import functions as F

df = df.withColumn('targeted_dma_list', F.explode('targeted_dma_list'))
```

Note that this will create a new row for each item in the array, which can significantly increase the size of your data.
Serialize to JSON: Another option is to convert the array to a JSON string. This can be a good option if your array is complex or you want to keep all the data together:

```python
from pyspark.sql import functions as F

df = df.withColumn('targeted_dma_list', F.to_json('targeted_dma_list'))
```

This will transform an array like ['a', 'b', 'c'] into a JSON string like '["a","b","c"]'.
After transforming the targeted_dma_list field, you should be able to write the DataFrame to a CSV file.
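For completeness, here is a hedged sketch of writing out the DataFrame after the flattening approach above; the output path and options are illustrative placeholders:

```python
# Assumes df has already had its array column flattened with concat_ws as shown above.
(
    df.write
      .mode("overwrite")
      .option("header", "true")
      .csv("s3://your-bucket/output/")  # placeholder output location
)
```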
In AWS Glue, to use a working directory for shuffle spill, do I need to pass any special arguments for that?
ChatGPT
AWS Glue, which is built on top of Apache Spark, automatically manages the data shuffling process, including shuffle spills. As per my knowledge cutoff in September 2021, there's no specific configuration or command for directing AWS Glue to use a working directory for shuffle spill.
However, there are several configuration options in Spark that control shuffle behavior, including spills. These are typically specified in the spark-defaults.conf file or through --conf options when submitting a job.
Here are a couple of those configuration options:
spark.shuffle.spill: This controls whether Spark should spill data to disk if it doesn't all fit in memory. The default is true. (Note that since Spark 1.6 this setting is effectively ignored; Spark always spills to disk when needed.)
spark.shuffle.spill.compress: This controls whether to compress data when spilling during shuffles. The default value is true.
These parameters control the behavior of shuffle spill in Spark, and most likely in Glue as well, as Glue uses Spark under the hood.
For your specific need, if there have been any updates or changes to AWS Glue's handling of shuffle spills or any new configurations introduced after my training cutoff in 2021, I recommend checking the latest AWS Glue and Apache Spark documentation or reaching out to AWS support for the most accurate and up-to-date information.
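Purely as an illustration, here is a hedged sketch of setting such Spark options inside a Glue script by constructing the SparkContext yourself before creating the GlueContext. The specific settings shown are examples, and whether a given option actually takes effect depends on your Glue version, so treat this as an assumption to verify:

```python
from pyspark import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Illustrative shuffle-related settings; verify them against your Glue/Spark version.
conf = (
    SparkConf()
    .set("spark.shuffle.compress", "true")
    .set("spark.shuffle.spill.compress", "true")
)

sc = SparkContext.getOrCreate(conf=conf)
glue_context = GlueContext(sc)
spark = glue_context.spark_session
```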
I got the error, "no space left on device" when writing out a dynamic frame.
ChatGPT
The "No space left on device" error indicates that the disk storage on your worker nodes is full. This can occur during data shuffling (when data is rearranged between partitions) or when writing out the results, particularly when dealing with large datasets.
When using AWS Glue, your data processing happens on ephemeral storage that is attached to the instances powering your jobs. If this storage runs out, you'll receive such an error.
Here are a few ways you can handle this issue:
Increase Disk Space by Using a Larger Worker Type: AWS Glue provides different worker types that come with varying amounts of memory and storage. By choosing a larger worker type, you'll get more disk space for your job.
Optimize Your Job to Use Less Disk Space:
Increase the Number of Partitions: By increasing the number of partitions, you can reduce the size of the data each worker node has to process, which could reduce the amount of disk space needed.
Optimize Transformations: Some transformations can cause a large increase in the amount of data (like cartesian joins or exploding arrays). If possible, try to optimize your transformations to use less disk space.
Use Columnar Data Formats: If you're working with Spark or DynamicFrames, it can be more efficient to use a columnar data format like Parquet, as it is often more space-efficient than row-based formats like CSV or JSON.
Clean up Temporary Directories: Spark uses certain directories for shuffle data and other temporary data. If these directories are not being cleaned up properly, they could be using up disk space. You may want to review your configurations related to these directories.
Increase Disk Cleanup Frequency: Spark automatically cleans up old shuffle data, but you may be able to increase the frequency of this cleanup by adjusting the spark.cleaner.periodicGC.interval setting.
Remember that it's crucial to monitor your job's resource usage to anticipate when you might need more resources. AWS Glue provides CloudWatch metrics that you can use for this.
If the problem persists, it might be necessary to take a closer look at your specific job and data to find a more tailored solution.
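To illustrate the partitioning and columnar-format suggestions above, here is a hedged sketch; the DynamicFrame name (dyf), the partition count, the column names, and the output path are all placeholders rather than details from your job:

```python
# dyf: an existing DynamicFrame produced earlier in the job (placeholder name).
df = dyf.toDF()

# Spread the data across more partitions so each task spills less to local disk.
df = df.repartition(800)  # illustrative partition count

# Write a columnar format instead of CSV; Parquet is typically far smaller on disk.
(
    df.write
      .mode("overwrite")
      .partitionBy("year", "month", "day")  # assumes these columns exist
      .parquet("s3://your-bucket/output/")  # placeholder output location
)
```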