1. This is truly impressive work from AWS. Patrick Ames began speaking about this a couple years ago, though at this point the blog post is probably the best reference. https://www.youtube.com/watch?v=h7svj_oAY14
2. This is not a "typical" Ray use case. I'm not aware of any other exabyte scale data processing workloads. Our bread and butter is ML workloads: training, inference, and unstructured data processing.
3. We have a data processing library called Ray Data for ingesting and processing data, often done in conjunction with training and inference. However, I believe in this particular use case the heavy lifting is largely done with Ray's core APIs (tasks & actors), which are lower level and more flexible; that flexibility makes sense for highly custom use cases. Most Ray users use the Ray libraries (train, data, serve), but power users often use the core APIs directly (see the sketch after this list).
4. People often ask about data processing with Ray vs. Spark. Spark use cases tend to be more geared toward structured data and CPU processing. If you are joining a bunch of tables together or running SQL queries, Spark is going to be way better. If you're working with unstructured data (images, text, video, audio, etc.), need mixed CPU & GPU compute, are doing deep learning and running inference, etc., then Ray is going to be much better.
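For readers who haven't used the core APIs mentioned in point 3, here is a minimal, illustrative sketch of a Ray task and a Ray actor (not code from the post, just the two primitives):

    import ray

    ray.init()

    # A Ray "task": a stateless function that can be scheduled anywhere in the cluster.
    @ray.remote
    def square(x):
        return x * x

    # A Ray "actor": a stateful worker process you can call repeatedly.
    @ray.remote
    class Counter:
        def __init__(self):
            self.n = 0

        def add(self, k):
            self.n += k
            return self.n

    print(ray.get([square.remote(i) for i in range(4)]))  # [0, 1, 4, 9]
    counter = Counter.remote()
    print(ray.get(counter.add.remote(5)))  # 5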
The paper mentions support for zero-copy intranode object sharing, which links to serialization in the Ray docs - https://docs.ray.io/en/latest/ray-core/objects/serialization...
I'm really curious how this is performant - I recently tried building a pipeline that leveraged substantial multiprocessing in Python, and found that my process was bottlenecked by the serialization/deserialization that occurs during Python multiprocessing. Would love any reading or explanation you can provide as to how this doesn't also bottleneck a process in Ray, since it seems that data transferred between workers and nodes will need to be serialized and deserialized.
Thanks in advance! Really cool tool, hopefully I'll be able to use it sooner rather than later.
You're right that the serialization/deserialization overhead can quickly exceed the compute time. To avoid this you have to get a lot of small things right. And given our focus on ML workloads, this is particularly important when sharing large numerical arrays between processes (especially processes running on the same node).
One of the key things is to make sure the serialized object is stored in a data format where the serialized object does not need to be "transformed" in order to access it. For example, a numpy array can be created in O(1) time from a serialized blob by initializing a Python object with the right shape and dtype and a pointer to the right offset in the serialized blob. We also use projects like Apache Arrow that put a lot of care into this.
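As a tiny illustration of that O(1) reconstruction idea (plain numpy, no Ray involved, and not Ray's actual implementation): if the stored format is just a little metadata plus the raw array bytes, "deserialization" can be a zero-copy view over the buffer.

    import numpy as np

    arr = np.arange(12, dtype=np.float64).reshape(3, 4)
    blob = arr.tobytes()                      # the "serialized" payload: raw bytes
    meta = {"dtype": "float64", "shape": (3, 4)}

    # Reconstruct without touching the data: a read-only view over the blob.
    view = np.frombuffer(blob, dtype=meta["dtype"]).reshape(meta["shape"])
    assert view[2, 3] == 11.0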
Example in more detail:
Imagine the object you are passing from process A to process B is a 1GB numpy array of floats. In the serialization step, process A produces a serialized blob of bytes that is basically just the 1GB numpy array plus a little bit of metadata. Process A writes that serialized blob into shared memory. This step of "writing into shared memory" still involves O(N) work, where N is the size of the array (though you can have multiple threads do the memcpy in parallel and be limited just by memory bandwidth).
In the deserialization step, process B accesses the same shared memory blob (process A and B are on the same machine). It reads a tiny bit of metadata to figure out the type of the serialized object and shape and so on. Then it constructs a numpy array with the correct shape and type and with a pointer to the actual data in shared memory at the right offset. Therefore it doesn't need to touch all of the bytes of data, it just does O(1) work instead of O(N).
That's the basic idea. You can imagine generalizing this beyond numpy arrays, but it's most effective for objects that include large numerical data (e.g., objects that include numpy arrays).
There are a bunch of little details to get right, e.g., serializing directly into shared memory instead of creating a serialized copy in process A and then copying it into shared memory. Doing the write into shared memory in parallel with a bunch of threads. Getting the deserialization right. You also have to make sure that the starting addresses of the numpy arrays are 64-byte aligned (if memory serves) so that you don't accidentally trigger a copy later on.
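A minimal sketch of how this surfaces in Ray's Python API (the array size here is arbitrary): a worker on the same node gets the array back as a read-only view into the shared-memory object store rather than a private copy.

    import numpy as np
    import ray

    ray.init()

    @ray.remote
    def total(arr):
        # `arr` is a read-only numpy view backed by the node's shared-memory
        # object store; the large payload is not copied during deserialization.
        assert not arr.flags.writeable
        return float(arr.sum())

    big = np.ones(100_000_000, dtype=np.float32)  # ~400 MB
    ref = ray.put(big)                            # one O(N) write into shared memory
    print(ray.get(total.remote(ref)))             # workers on this node read it zero-copy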
I've also looked at ray for running data pipelines before (at much much smaller scales) for the reasons you suggest (unstructured data, mixed CPU/GPU compute).
One thing I've wanted is an incremental computation framework (i.e., salsa [1]) built on ray so that I can write jobs that transparently reuse intermediate results from an object store if their dependents haven't changed.
Do you know if anyone has thought of building something like this?
[1] https://github.com/salsa-rs/salsa
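For what it's worth, a very rough, hypothetical sketch of what task-level memoization on top of Ray core could look like (this is not an existing library, and a real salsa-style framework would also track dependencies and invalidation):

    import hashlib
    import pickle

    import ray

    ray.init()

    _memo = {}  # cache key -> ObjectRef of a previously computed result

    def incremental(func):
        remote_func = ray.remote(func)

        def wrapper(*args):
            # Key on the function name and its pickled arguments. A real framework
            # would also hash the function's code and its upstream dependencies.
            key = hashlib.sha256(pickle.dumps((func.__name__, args))).hexdigest()
            if key not in _memo:
                _memo[key] = remote_func.remote(*args)
            return _memo[key]  # holding the ObjectRef keeps the result in the object store

        return wrapper

    @incremental
    def expensive_transform(x):
        return x * 2

    ref1 = expensive_transform(21)
    ref2 = expensive_transform(21)  # reuses the cached ObjectRef, no recompute
    assert ref1 is ref2
    print(ray.get(ref1))  # 42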
I asked the same question to one of the core devs at a recent event and he (1) said that some people in finance have done related things and (2) suggested using the Ray slack to connect with developers and power users who might have helpful advice.
I agree this is a very interesting area to consider Ray for. There are lots of projects/products that provide core components that could be used, but there's no widely used library. It feels like one is overdue. But I'm not aware of anything exactly like what you're referring to!
Curious if you know how well Ray works with multithreaded Python libraries? For example, when using JAX with Ray, I have to make sure Ray is imported first, since forking a threaded process leads to deadlocks in Python. Do you know how to ensure that Ray handles forking the Python interpreter correctly?
Multi-threaded libraries (e.g., numpy and PyTorch on CPUs come to mind) are well supported. In scenarios where many processes are each running heavily multi-threaded computations, it can help to pin specific processes to specific cores (e.g., using tools like psutil) to avoid contention.
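A hedged sketch of the pinning idea (the core IDs and task shape are made up for illustration; psutil's cpu_affinity works on Linux and Windows, not macOS):

    import numpy as np
    import psutil
    import ray

    ray.init()

    @ray.remote(num_cpus=4)
    def heavy_numeric_work(slot):
        # Pin this worker process to its own block of 4 cores so that several
        # multi-threaded tasks (numpy/BLAS, PyTorch, ...) don't fight over CPUs.
        # Assumes the machine actually has enough cores for each slot.
        psutil.Process().cpu_affinity(list(range(slot * 4, (slot + 1) * 4)))
        a = np.random.rand(2000, 2000)
        return float((a @ a).sum())

    print(ray.get([heavy_numeric_work.remote(slot) for slot in range(2)]))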
The scenario where a Ray task forks is probably not very well supported. You can certainly start a subprocess from within a Ray task, but I think forking could easily cause issues.
You can definitely use Ray + Jax, but you probably need to avoid forking a process within a Ray worker.
This is not badmouthing either project, just an observation: if you architected one system, you'd be good at attacking the same problem better the next time.
Mesos was a derivative idea, drawn from sporadic ideas about two-level scheduling inside Google that grew out of MapReduce.
Mesos was wrong from day one. Its creators thought they had the right idea, but they were really carried along by a small group of Google engineers who happened to come from academia and were very good at producing theoretically nice ideas.
Around the time Mesos was invented, Google had its own Mesos: a similar project built by learning the wrong lessons from Borg. That project was Omega.
Eventually everyone decided that Borg had been right all along, hence Kubernetes.
I'm curious, how do data scientists use these massive datasets, especially the old stuff? Is it more of a compliance and need/should-save type thing, or is the data actually useful? I'm baffled by these numbers, having never used a large BI tool, and am genuinely curious how the data is actually used operationally.
As a layman, I imagine lots of it loses relevancy very quickly, e.g., Amazon sales data from 5 years ago is only marginally useful for determining future trends and analyzing new consumer behavior regimes?
If you have seasonal demand patterns, you generally need three years of history to do good predictive analytics.
I do tend to agree that data from five years ago is rarely relevant, BUT our business is still using data from the fiscal year before COVID for some BI purposes, as a comparison baseline for certain analytics/business processes that have been slow to return to pre-COVID levels of performance. So that means we are now using data that is 6 years old, comparing this year to that pre-COVID year for certain analytics!
Yeah, 100%. I worked in wind energy for a while, and the DS team would pull as much data as they could get to establish a baseline for normality, due to seasonal trends in the wind. This also varied enormously around the world - e.g., the UK is fairly windy all year, but India typically gets 2/3 of its generated wind energy in the monsoon season, which is about 3 months.
I work in finance, and it's great having big historical datasets. Even if the figures are far lower in previous years, it's good to see system 'shocks', and these can be used at a different magnitude/scaled for future forecasting.
Anyone know enough about Ray to comment on what the exact performance unlock was? They mention that it gave them enough control over the distribution of work that they could avoid unnecessary reads/writes. That seems like a good win, but I would assume that doing compaction in Python would be quite slow.
Some of the initial differentiators are described at the bottom of our design doc at https://github.com/ray-project/deltacat/blob/main/deltacat/c.... But yes, controlling file I/O was also an important part of this since it allowed us to (1) run more targeted downloads/reads of only the Parquet row groups and columns participating in compaction and (2) track dirty/clean files to skip unnecessary re-writes of "clean" files that weren't altered by compaction. Also, just better leveraging catalog metadata (e.g., primary key indexes if available) to filter out more files in the initial scan, and to copy clean files into the compacted variant by reference (when supported by the underlying catalog format).
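As a small, self-contained illustration of that kind of targeted read (the file and column names here are made up), pyarrow lets you pull just the row groups and columns you care about rather than whole files:

    import pyarrow as pa
    import pyarrow.parquet as pq

    # Write a toy Parquet file with two row groups so the example runs anywhere.
    pq.write_table(
        pa.table({
            "primary_key": [1, 2, 3, 4],
            "last_updated": [10, 20, 30, 40],
            "payload": ["a", "b", "c", "d"],
        }),
        "toy_partition.parquet",
        row_group_size=2,
    )

    pf = pq.ParquetFile("toy_partition.parquet")
    print(pf.metadata.num_row_groups)  # 2

    # Read only the first row group, restricted to the columns compaction needs.
    tbl = pf.read_row_group(0, columns=["primary_key", "last_updated"])
    print(tbl.num_rows, tbl.column_names)  # 2 ['primary_key', 'last_updated']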
The trick with doing compaction in Python was to ensure that the most performance-sensitive code was delegated to more optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code paths. If we did all of our per-record processing ops in pure Python, compaction would indeed be much slower.
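A toy example of what delegating per-record work can mean in practice (not the actual compactor logic): finding the latest mutation per primary key with Arrow's vectorized C++ group-by instead of a Python loop over records.

    import pyarrow as pa

    table = pa.table({
        "pk": [1, 2, 1, 3, 2],
        "ts": [10, 11, 12, 13, 9],
    })

    # One vectorized call into Arrow's C++ kernels; a real compactor would then
    # merge or copy forward the surviving records.
    latest = table.group_by("pk").aggregate([("ts", "max")])
    print(latest.to_pydict())  # e.g. {'ts_max': [12, 11, 13], 'pk': [1, 2, 3]}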
Hi mannyv - one of the devs that worked on the migration here. It has been a pretty long project - approached with caution due to the criticality of keeping our BI datasets healthy - but the preliminary results produced year-over-year kept looking promising enough to keep after it. =)
Also, we mostly have Parquet data cataloged in S3 today, but delimited text is indeed ubiquitous and surprisingly sticky, so we continue to maintain some very large datasets natively in this format. However, while a table's data producer may prefer to write delimited text, those files are almost always converted to Parquet during the compaction process to produce a read-optimized table variant downstream.
Are you all shifting over to storing Iceberg-enriched Parquet yet and letting it (within, say, Athena) manage compaction, or thinking about it? Or is it not worth it since this new Ray+Parquet approach is working for you?
Most people are moving away from CSV for big datasets, except in exceptional cases involving linear reads (append-only ETL). CSV has one big upside, which is human readability, but many downsides: poor random access, no typing, no compression, and a complex parser that has to handle exceptions.
Most people don't directly query or otherwise operate on raw CSV, though. Large source datasets in CSV format still reign in many enterprises, but these are typically read into a dataframe, manipulated and stored as Parquet or the like, then operated upon by DuckDB, Polars, etc., or modeled (e.g., dbt) and pushed to an OLAP target.
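A minimal sketch of that pattern (tiny toy data; in practice the CSV would come from an upstream producer): land the CSV, convert it to Parquet once, and point an engine like DuckDB at the Parquet from then on.

    import duckdb
    import pyarrow.csv as pacsv
    import pyarrow.parquet as pq

    # Toy "source" CSV standing in for a producer's delimited-text drop.
    with open("raw.csv", "w") as f:
        f.write("id,amount\n1,10.5\n2,7.25\n")

    # One-time conversion to a read-optimized format.
    pq.write_table(pacsv.read_csv("raw.csv"), "raw.parquet")

    # Downstream engines query the Parquet, not the CSV.
    print(duckdb.sql("SELECT sum(amount) FROM 'raw.parquet'").fetchall())  # [(17.75,)]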
I remember when everyone shifted from Apache Hadoop to Apache Spark. This seems like a possibly similar sea change. I am not sure if many other users will embrace Ray over Spark, but this is a sign that people are looking to either improve Spark on some fundamental levels, or are going to reach out to new technologies to resolve their problems. Cool stuff.
I work on Daft and we’ve been collaborating with the team at Amazon to make this happen for about a year now!
We love Ray, and are excited about the awesome ecosystem of useful + scalable tools that run on it for model training and serving. We hope that Daft can complement the rest of the Ray ecosystem to enable large scale ETL/analytics to also run on your existing Ray clusters. If you have an existing Ray cluster setup, you absolutely should have access to best-in-class ETL/analytics without having to run a separate Spark cluster.
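For anyone curious what that looks like from the user side, a rough sketch of running Daft on an existing Ray cluster; the exact calls and the S3 path here are assumptions on my part based on Daft's documented API, so check the current docs.

    import daft

    # Assumed API: tell Daft to execute on the already-running Ray cluster.
    daft.context.set_runner_ray()

    # Hypothetical dataset path and column names, purely for illustration.
    df = daft.read_parquet("s3://my-bucket/events/*.parquet")
    df = df.where(daft.col("event_type") == "purchase")
    df.show()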
Also, on the nerdier side of things - the primitives that Ray provides give us a real opportunity to build a solid non-JVM-based, vectorized distributed query engine. We're already seeing extremely good performance improvements here vs Spark, and are really excited about some of the upcoming work to get even better performance and memory stability.
This collaboration with Amazon really battle-tested our framework :) happy to answer any questions if folks have them.
Good to see you here! It's been great working with Daft to further improve data processing on Ray, and the early results of incorporating Daft into the compactor have been very impressive. Also agree with the overall sentiment here that Ray clusters should be able to run best-in-class ETL without requiring a separate cluster maintained by another framework (Spark or otherwise). This also creates an opportunity to avoid many inefficient, high-latency cross-cluster data exchange ops often run out of necessity today (e.g., through an intermediate cloud storage layer like S3).
Must be good enough if you're willing to dogfood it though?
Data processing workloads are quite common on Ray, especially with unstructured data.
Also, I work on Ray, which is the underlying framework used here, but all the work in the post was done by the Amazon team.
Related, I rather enjoyed reading this other thread from June: "Ask HN: Is KDB a sane choice for a datalake in 2024?" https://news.ycombinator.com/item?id=40625800
I had no idea anything at AWS had that long of an attention span.
It's funny and telling that in the end, it's all backed by CSVs in s3. Long live CSV!