Replies: 1 comment
Since s3_additional_kwargs is not supported, is there any acceptable solution to pass SSE-KMS information at this point?
AWS SDK for pandas on Ray - Runbook
The following runbook is a collection of tips, common errors and performance issues related to using AWS SDK for pandas on Ray.
Tips
Overriding the Ray runtime initialization
In distributed mode with Ray, AWS SDK for pandas attempts to detect an existing Ray cluster or to start a runtime by executing ray.init. Advanced users can, however, override this process by running the same command before the awswrangler import command.
This approach gives more flexibility to the user when it comes to configuring the Ray runtime. For instance, they can decide whether to include a dashboard or to turn logging on/off:
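A minimal sketch of this override, assuming Ray and awswrangler are installed. include_dashboard, configure_logging and log_to_driver are existing ray.init parameters; the helper names ray_init_options and start_runtime are hypothetical and exist only for this illustration.

```python
def ray_init_options(dashboard: bool = False, verbose: bool = True) -> dict:
    """Build keyword arguments for ray.init (hypothetical helper)."""
    return {
        "include_dashboard": dashboard,  # whether to start the Ray dashboard
        "configure_logging": verbose,    # whether Ray may configure logging itself
        "log_to_driver": verbose,        # whether worker output is sent to the driver
    }


def start_runtime(dashboard: bool = False, verbose: bool = True):
    """Start Ray with custom options, then import awswrangler."""
    import ray  # lazy import: the helper above stays usable without Ray installed

    ray.init(**ray_init_options(dashboard, verbose))

    # Imported only after ray.init, so that awswrangler finds the running
    # runtime instead of starting one itself (as described in the text above).
    import awswrangler as wr

    return wr
```

Calling start_runtime(dashboard=False, verbose=False) would start a runtime without the dashboard and with Ray's log handling switched off, then return the awswrangler module.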
Please note that other libraries might also start the Ray runtime on import. Chief among them is Modin. In that case, AWS SDK for pandas does not attempt to start the runtime and just connects to the existing one instead.
Tune partitioning in Modin
Modin data frames rely on the concept of partitioning. Advanced users can achieve better performance by understanding how to tune them. More details are available in the Modin documentation.
Common Errors
API Layer Objects
Solution
When AWS SDK for pandas is running in distributed mode with Ray, APIs require Modin Data Frames instead of pandas Data Frames as inputs.
Rather than working with pandas Data Frames, you can change your import statement so that your script works with Modin Frames by default:
Alternatively, you can transform your existing Data Frame into a Modin Data Frame:
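Both options can be sketched as follows, assuming Modin is installed. The first option is simply replacing "import pandas as pd" with "import modin.pandas as pd" at the top of the script. For the second, the helper names is_modin_frame and ensure_modin are hypothetical; the module-name check is a convention of this sketch, not something the library does.

```python
def is_modin_frame(df) -> bool:
    """Return True when the object's type is defined in the modin package."""
    return type(df).__module__.split(".")[0] == "modin"


def ensure_modin(df):
    """Return df unchanged if it is already a Modin frame, else convert it."""
    if is_modin_frame(df):
        return df

    import modin.pandas as mpd  # lazy import: requires Modin to be installed

    # modin.pandas.DataFrame accepts an existing pandas DataFrame as input.
    return mpd.DataFrame(df)
```

A script could then call ensure_modin(df) on any frame before handing it to an AWS SDK for pandas API in distributed mode.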
Unsupported arguments
Most AWS SDK for pandas calls support passing the boto3_session argument. While this is acceptable for an application running in a single process, distributed applications require the session to be serialized and passed to the worker nodes in the cluster. This constitutes a security risk. As a result, passing boto3_session when using the Ray runtime is not supported.
Performance
Avoid writing data to a single S3 object
When using AWS SDK for pandas without the distributed mode, it’s common to tell a function such as to_parquet to write to a path pointing to a single file. However, when using the distributed/Ray mode, the data in a Modin Data Frame may be stored across multiple nodes in your cluster. Telling to_parquet to write to a single object in S3 means that the data from the Data Frame may need to be transferred onto a single node, so that it can be written into a single file. Instead, it’s more efficient to provide to_parquet with an S3 path and provide the parameter dataset=True.
When trying to write a distributed Modin Data Frame into a single file, the following warning is surfaced:
Writing bucketed data or using max_rows_by_file causes repartitioning
Setting bucketing_info or max_rows_by_file as input arguments to write functions such as to_parquet or to_csv might lead to data movement within the cluster. This is because a repartitioning of the Data Frame is required to honor the desired bucketing or max rows by file configuration.
Using pandas flags leads to slower performance when reading or writing CSV
The read_csv and to_csv function definitions both have an argument named pandas_kwargs which allows the customer to pass any pandas argument for reading/writing Data Frames, without AWS SDK for pandas needing to explicitly support it. When it comes to distributing code using Ray, PyArrow’s I/O functions offer significantly better performance than the equivalent functions in pandas.
To preserve the ability to use any pandas parameter while still optimizing performance, the implementations of s3.read_csv and s3.to_csv attempt to use the underlying PyArrow I/O functions wherever possible. When this is not possible, the functions fall back to the slower pandas I/O functions and output the warning above.
Writing bucketed leads to slower performance
Writing bucketed data (i.e. splitting the data into the user-specified number of buckets based on a hash) requires a groupby operation. As a result, data blocks must be shuffled over the entire cluster which is a highly inefficient operation, especially when the data set size cannot fit into memory.
The size of input files is too large for the target block size
Input files to read methods may throw the above warning when their size is higher than the target block size (512 MiB by default). It’s important to note that a block size refers to the size in memory, not on disk. Indeed an S3 object is likely to be compressed and increase significantly in size when represented in memory as a block.
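The difference between on-disk and in-memory size can be sketched with a small calculation. The 512 MiB target comes from the text above; the expansion factor of 4 is an illustrative assumption only, since the real ratio depends on the file format, compression codec and data.

```python
MIB = 1024 ** 2
TARGET_BLOCK_SIZE = 512 * MIB  # default target block size quoted in the text


def estimated_block_size(on_disk_bytes: int, expansion_factor: float) -> float:
    """Estimate the in-memory size of an object from its size in S3."""
    return on_disk_bytes * expansion_factor


def exceeds_target(on_disk_bytes: int, expansion_factor: float,
                   target: int = TARGET_BLOCK_SIZE) -> bool:
    """Return True when the estimated in-memory size is above the target."""
    return estimated_block_size(on_disk_bytes, expansion_factor) > target


# A 200 MiB compressed object that grows 4x in memory needs about 800 MiB,
# which is above the 512 MiB target even though the file itself is smaller.
large = exceeds_target(200 * MIB, expansion_factor=4.0)

# A 100 MiB object with the same assumed factor needs about 400 MiB and fits.
small = exceeds_target(100 * MIB, expansion_factor=4.0)
```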
The above can be mitigated by reducing the size of input files or by modifying the DEFAULT_TARGET_MAX_BLOCK_SIZE Ray environment variable.
WARNING: Partitions of this data frame are detected to be split along column axis...
Modin data frames can be split across both column and row axes based on the provided configuration and number of columns in the data set. Some operations in AWS SDK for pandas require the data frame to be split across the row axis only to ensure blocks contain the complete set of columns. For those cases, when processing a Modin data frame, AWS SDK for pandas will automatically repartition the data frame along the row axis and display the warning below:
This might be an expensive operation. To avoid repartitioning, it is recommended to configure Modin parameters to fit your data. See an example below:
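A minimal sketch of setting a Modin configuration parameter, assuming Modin is installed. NPartitions is an existing option in modin.config and is used here only as one example; which parameter and value actually avoid a column-axis split depends on the shape of your data and on the Modin version. The helper name configure_modin_partitions is hypothetical.

```python
def configure_modin_partitions(n_partitions: int) -> None:
    """Set the number of partitions Modin uses along an axis."""
    if n_partitions < 1:
        raise ValueError("n_partitions must be a positive integer")

    import modin.config as cfg  # lazy import: requires Modin to be installed

    # Apply the setting before creating the data frames it should affect.
    cfg.NPartitions.put(n_partitions)
```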
For a complete list of Modin configuration options refer to the Modin documentation.
DataFrame columns with undefined type impact performance as data type must be inferred
When writing data frames with s3.to_parquet, if some columns have undefined types, the library attempts to infer their dtype, which can have a negative performance impact. This latency will increase as data size increases.
To mitigate this, s3.to_parquet allows passing the dtype argument, which casts columns to the given types before writing to S3. See an example below:
A RayOutOfMemoryError exception is raised
The error is raised when 95% of memory on the machine/cluster is reached. The ray memory CLI command can help debug and identify the source of the exception.
Ray attempts to mitigate memory errors by spilling objects to disk. A warning is raised to inform the user that this process will impact performance. If you know that spilling to disk is likely, it’s recommended to use SSD storage instead of
Bucketing and partitioning
Bucketing requires a hash value to be calculated for the column that you wish to bucket by, which is then used to shuffle the data across M buckets. This means that bucketing requires a full data reshuffle, thus forgoing the parallelization benefit from distributed computing.
On the other hand, partitioning is implemented in such a way that each node is in charge of its own partitioning. If node A contains a block which is to be divided into two partitions, then node A will write two files to S3. Other nodes on the cluster will do the same, without needing to reshuffle any of the data.
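The contrast between the two approaches can be illustrated with a toy model in plain Python. This is not the library's implementation: the node names, the sample rows and the use of a modulo in place of the real hash function are all assumptions made for the illustration.

```python
from collections import defaultdict

# Hypothetical cluster state: two nodes, each holding one block of
# (country, user_id) rows.
BLOCKS = {
    "node_a": [("US", 1), ("FR", 2), ("US", 3)],
    "node_b": [("FR", 4), ("US", 5)],
}


def partition_locally(blocks):
    """Partition by country: every node splits only its own block."""
    files = {}
    for node, rows in blocks.items():
        by_country = defaultdict(list)
        for row in rows:
            by_country[row[0]].append(row)
        for country, part in by_country.items():
            files[(node, country)] = part  # one file per node and partition value
    return files


def bucket_globally(blocks, n_buckets):
    """Bucket by user_id: rows from all nodes are routed to shared buckets."""
    buckets = defaultdict(list)
    for node, rows in blocks.items():
        for row in rows:
            bucket = row[1] % n_buckets  # modulo stands in for the real hash
            buckets[bucket].append((node, row))  # keep the node of origin
    return dict(buckets)


files = partition_locally(BLOCKS)
buckets = bucket_globally(BLOCKS, n_buckets=2)
```

In this model each node writes its own files for US and FR without exchanging rows, whereas every bucket ends up holding rows that originated on both nodes, which is the reshuffle described above.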
Please note that AWS SDK for pandas supports both bucketing and partitioning in a single write API call. This allows you to bucket by one column and partition on another. In this scenario, the data will need to be reshuffled due to the bucketing.
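A sketch of such a combined call, assuming awswrangler is installed and the S3 path is writable. dataset, partition_cols and bucketing_info are existing parameters of s3.to_parquet (partition_cols is not mentioned elsewhere in this runbook); the helper names and any column names passed to them are hypothetical.

```python
def bucketed_partitioned_kwargs(partition_col: str, bucket_col: str,
                                n_buckets: int) -> dict:
    """Build the write arguments for partitioning and bucketing together."""
    return {
        "dataset": True,                             # write a dataset under a prefix
        "partition_cols": [partition_col],           # handled locally by each node
        "bucketing_info": ([bucket_col], n_buckets)  # triggers the reshuffle
    }


def write_bucketed_and_partitioned(df, path: str, partition_col: str,
                                   bucket_col: str, n_buckets: int):
    """Write df to S3, partitioned on one column and bucketed by another."""
    import awswrangler as wr  # lazy import: requires awswrangler to be installed

    kwargs = bucketed_partitioned_kwargs(partition_col, bucket_col, n_buckets)
    return wr.s3.to_parquet(df=df, path=path, **kwargs)
```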
S3FS
In distributed mode, S3Fs is used instead of boto3 for certain API calls.
These include listing a large number of S3 objects for example. This choice was made for performance reasons as a boto3 implementation can be much slower in some cases.
As a side effect, users won't be able to use the s3_additional_kwargs input parameter as it's currently not supported by S3Fs.