
Amazon's exabyte-scale migration from Apache Spark to Ray on EC2

mannyv
13 replies
4d16h

Crazy that the project took almost 4 years end-to-end, and it's still ongoing.

I had no idea anything at AWS had that long of an attention span.

It's funny and telling that in the end, it's all backed by CSVs in S3. Long live CSV!

wenc
5 replies
2d23h

Most people are moving away from CSV for big datasets, except in special cases involving linear reads (append-only ETL). CSV has one big upside: human readability. But it has many downsides: poor random access, no typing, no compression, and a complex parser that needs to handle edge cases.

100pctremote
4 replies
2d22h

Most people don't directly query or otherwise operate on raw CSV, though. Large source datasets in CSV format still reign in many enterprises, but these are typically read into a dataframe, manipulated and stored as Parquet and the like, then operated upon by DuckDB, Polars, etc., or modeled (e.g., dbt) and pushed to an OLAP target.

wenc
3 replies
2d21h

There are folks who still directly query CSV formats in a data lake using a query engine like Athena, Spark, or Redshift Spectrum, which ends up being much slower and consuming more resources than necessary due to full table scans.

CSV is only good for append-only workloads.

But so is Parquet, and if you can write Parquet from the get-go, you save on storage and have a directly queryable column store from the start.

CSV still persists because of legacy data-generating processes and a dearth of Parquet familiarity among software engineers. CSV is simple to generate and easy to troubleshoot without specialized tools (compared to Parquet, which requires tools like VisiData). But you pay for it elsewhere.
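For illustration, a minimal sketch of what "writing Parquet from the get-go" looks like with PyArrow (the table contents and file name are made up):

    import pyarrow as pa
    import pyarrow.parquet as pq
    from datetime import datetime

    # A small, typed table built in memory; the schema is inferred from the Python values.
    table = pa.table({
        "order_id": [1, 2, 3],
        "amount": [19.99, 5.00, 42.50],
        "created_at": [datetime(2024, 6, 1), datetime(2024, 6, 2), datetime(2024, 6, 3)],
    })

    # Typed, compressed, and columnar on disk from the start.
    pq.write_table(table, "orders.parquet", compression="zstd")

    # Readers can then scan only the columns they need, no full-table parse required.
    subset = pq.read_table("orders.parquet", columns=["order_id", "amount"])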

fragmede
1 replies
2d17h

How about using SQLite database files as an interchange format?

wenc
0 replies
2d15h

I haven't thought about sqlite as a data interchange format, but I was looking at deploying sqlite as a data lake format some time ago, and found it wanting.

1. Dynamically typed (with type affinity) [1]. This causes problems when there are multiple data-generating processes. Newer SQLite versions have a STRICT table option that enforces types, but only for the few basic types it has.

2. Doesn't have a date/time type [1]. This is problematic because you can store dates as TEXT, REAL, or INTEGER (it's up to the developer), so if you have sqlite files from more than one source, date fields could be any of those types and you have to convert between them (see the sketch below).

3. Isn't columnar, so complex analytics at scale is not performant.
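To make point 2 concrete, here's a minimal sketch (table and values are made up) of how two well-behaved writers can leave incompatible date representations in the same column, thanks to type affinity:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER, created_at)")  # no real date/time type

    # Writer A stores ISO-8601 strings; writer B stores Unix epoch integers.
    # sqlite happily accepts both in the same column.
    conn.execute("INSERT INTO events VALUES (1, '2024-06-01T12:00:00')")
    conn.execute("INSERT INTO events VALUES (2, 1717243200)")

    for row in conn.execute("SELECT id, created_at, typeof(created_at) FROM events"):
        print(row)
    # (1, '2024-06-01T12:00:00', 'text')
    # (2, 1717243200, 'integer')

Any reader of these files now has to detect and convert both representations.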

I guess one can use sqlite as a data interchange format, but it's not ideal.

One area where sqlite does excel is as an application file format [2], and that's where it is mostly used [3].

[1] https://www.sqlite.org/datatype3.html

[2] https://www.sqlite.org/appfileformat.html

[3] https://en.wikipedia.org/wiki/SQLite#Notable_uses

cmollis
0 replies
2d20h

exactly.. parquet is good for append only.. stream mods to parquet in new partitions.. compact, repeat.

thedood
2 replies
4d16h

Hi mannyv - one of the devs that worked on the migration here. It has been a pretty long project - approached with caution due to the criticality of keeping our BI datasets healthy - but the preliminary results produced year-over-year kept looking promising enough to keep after it. =)

Also, we mostly have Parquet data cataloged in S3 today, but delimited text is indeed ubiquitous and surprisingly sticky, so we continue to maintain some very large datasets natively in this format. However, while a table's data producer may prefer to write delimited text, those files are almost always converted to Parquet during the compaction process to produce a read-optimized table variant downstream.

gregw2
1 replies
2d20h

Are you all shifting over to storing as Iceberg-enriched Parquet yet and letting it (within, say, Athena) manage compaction, or thinking about it? Or is it not worth it since this new Ray+Parquet setup is working for you?

thedood
0 replies
2d20h

As alluded to in the blog post, Ray+Parquet+Iceberg is the next frontier. We'd like to make our compactor and similar procedures available in open source so that the community can start bringing similar benefits to their Iceberg workloads. Stay tuned. =)

whoevercares
1 replies
2d20h

Is it really AWS? I don’t recall any service called BDT

p0rkbelly
0 replies
2d16h

The second paragraph explains that BDT is an internal team at Amazon Retail. They used AWS and Ray to do this.

qwerp
0 replies
2d16h

our plans are measured in centuries

jerrygenser
0 replies
4d16h

They reference Parquet files; I'm not sure it's only CSV, or that CSV even figures in that heavily beyond the first iteration before migrating to Spark.

robertnishihara
11 replies
2d21h

I'm one of the creators of Ray. A few thoughts :)

1. This is truly impressive work from AWS. Patrick Ames began speaking about this a couple years ago, though at this point the blog post is probably the best reference. https://www.youtube.com/watch?v=h7svj_oAY14

2. This is not a "typical" Ray use case. I'm not aware of any other exabyte scale data processing workloads. Our bread and butter is ML workloads: training, inference, and unstructured data processing.

3. We have a data processing library called Ray Data for ingesting and processing data, often done in conjunction with training and inference. However, I believe in this particular use case the heavy lifting is largely done with Ray's core APIs (tasks & actors), which are lower level and more flexible; that makes sense for highly custom use cases. Most Ray users use the Ray libraries (train, data, serve), but power users often use the Ray core APIs (see the sketch after this list).

4. Since people often ask about data processing with Ray and Spark, Spark use cases tend to be more geared toward structured data and CPU processing. If you are joining a bunch of tables together or running SQL queries, Spark is going to be way better. If you're working with unstructured data (images, text, video, audio, etc), need mixed CPU & GPU compute, are doing deep learning and running inference, etc, then Ray is going to be much better.
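For anyone who hasn't used the core APIs, here's a minimal sketch of tasks and actors (the functions are toy placeholders, not anything from the Amazon compactor):

    import ray

    ray.init()

    # A task: a stateless function scheduled somewhere in the cluster.
    @ray.remote
    def process_file(path):
        return len(path)  # stand-in for real per-file work

    # An actor: a stateful worker process that lives on one node.
    @ray.remote
    class Counter:
        def __init__(self):
            self.total = 0
        def add(self, n):
            self.total += n
            return self.total

    futures = [process_file.remote(p) for p in ["a.parquet", "b.parquet"]]
    counter = Counter.remote()
    for size in ray.get(futures):          # gather task results
        ray.get(counter.add.remote(size))  # update actor state

Everything here is just "run this Python function or class somewhere in the cluster," which is why it's flexible enough to build something as custom as a compactor on top of.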

justsocrateasin
3 replies
2d20h

I'm just learning about this tool now and had a brief question if you have the time:

The paper mentions support for zero-copy intranode object sharing which links to serialization in the Ray docs - https://docs.ray.io/en/latest/ray-core/objects/serialization...

I'm really curious how this is performant - I recently tried building a pipeline that leveraged substantial multiprocessing in Python, and found that my process was bottlenecked by the serialization/deserialization that occurs during Python multiprocessing. Would love any reading or explanation you can provide as to how this doesn't also bottleneck a process in Ray, since it seems that data transferred between workers and nodes will need to be serialized and deserialized.

Thanks in advance! Really cool tool, hopefully I'll be able to use it sooner rather than later.

robertnishihara
2 replies
2d20h

You're right that the serialization/deserialization overhead can quickly exceed the compute time. To avoid this you have to get a lot of small things right. And given our focus on ML workloads, this is particularly important when sharing large numerical arrays between processes (especially processes running on the same node).

One of the key things is to make sure the serialized object is stored in a data format where the serialized object does not need to be "transformed" in order to access it. For example, a numpy array can be created in O(1) time from a serialized blob by initializing a Python object with the right shape and dtype and a pointer to the right offset in the serialized blob. We also use projects like Apache Arrow that put a lot of care into this.

Example in more detail:

Imagine the object you are passing from process A to process B is a 1GB numpy array of floats. In the serialization step, process A produces a serialized blob of bytes that is basically just the 1GB numpy array plus a little bit of metadata. Process A writes that serialized blob into shared memory. This step of "writing into shared memory" still involves O(N) work, where N is the size of the array (though you can have multiple threads do the memcpy in parallel and be limited just by memory bandwidth).

In the deserialization step, process B accesses the same shared memory blob (process A and B are on the same machine). It reads a tiny bit of metadata to figure out the type of the serialized object and shape and so on. Then it constructs a numpy array with the correct shape and type and with a pointer to the actual data in shared memory at the right offset. Therefore it doesn't need to touch all of the bytes of data, it just does O(1) work instead of O(N).

That's the basic idea. You can imagine generalizing this beyond numpy arrays, but it's most effective for objects that include large numerical data (e.g., objects that include numpy arrays).

There are a bunch of little details to get right, e.g., serializing directly into shared memory instead of creating a serialized copy in process A and then copying it into shared memory. Doing the write into shared memory in parallel with a bunch of threads. Getting the deserialization right. You also have to make sure that the starting addresses of the numpy arrays are 64-byte aligned (if memory serves) so that you don't accidentally trigger a copy later on.
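Here's a minimal sketch of the same trick using only the standard library and numpy (an illustration of the idea, not Ray's actual object store; the array is scaled down from the 1GB example):

    import numpy as np
    from multiprocessing import shared_memory

    # "Process A": one O(N) write of the array bytes into shared memory.
    arr = np.arange(1_000_000, dtype=np.float32)
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)[:] = arr

    # "Process B": O(1) "deserialization" -- read a little metadata (name, shape,
    # dtype), then wrap a numpy view directly around the shared buffer.
    # No array bytes are copied or even touched.
    shm_b = shared_memory.SharedMemory(name=shm.name)
    view = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm_b.buf)
    print(view[:3])  # reads go straight to shared memory

    shm_b.close(); shm.close(); shm.unlink()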

EDIT: I edited the above to add more detail.

Xophmeister
1 replies
2d19h

This is probably a naive question, but how do two processes share address space? mmap?

robertnishihara
0 replies
2d19h

Yeah, mmap, I think this is the relevant line [1].

Fun fact, very early on, we used to create one mmapped file per serialized object, but that very quickly broke down.

Then we switched to mmapping one large file at the start and storing all of the serialized objects in that file. But then as objects get allocated and deallocated, you need to manage the memory inside of that mmapped file, and we just repurposed a malloc implementation to handle that.

[1] https://github.com/ray-project/ray/blob/21202f6ddc3ceaf74fbc...

zacmps
2 replies
2d19h

Super cool to see you here.

I've also looked at ray for running data pipelines before (at much much smaller scales) for the reasons you suggest (unstructured data, mixed CPU/GPU compute).

One thing I've wanted is an incremental computation framework (i.e., salsa [1]) built on Ray so that I can write jobs that transparently reuse intermediate results from an object store if their dependencies haven't changed.

Do you know if anyone has thought of building something like this?

[1] https://github.com/salsa-rs/salsa

robertnishihara
0 replies
2d17h

Other folks have built data processing libraries on top of Ray: Modin and Daft come to mind.

But I'm not aware of anything exactly like what you're referring to!

jonmoore
0 replies
2d2h

I asked the same question to one of the core devs at a recent event and he (1) said that some people in finance have done related things and (2) suggested using the Ray slack to connect with developers and power users who might have helpful advice.

I agree this is a very interesting area to consider Ray for. There are lots of projects/products that provide core components that could be used but there’s no widely used library. It feels like one is overdue.

theLiminator
1 replies
2d18h

Curious if you know how well Ray works with multithreaded Python libraries? For example, when using JAX with Ray, I have to ensure that the import ordering imports Ray first, as forking a threaded process leads to deadlocks in Python. Do you know how to ensure that Ray handles forking the Python interpreter correctly?

robertnishihara
0 replies
2d18h

Multi-threaded libraries (e.g., numpy and PyTorch on CPUs come to mind) are well supported. In scenarios where many processes are each running heavily multi-threaded computations, it can help to pin specific processes to specific cores (e.g., using tools like psutil) to avoid contention.
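A rough sketch of that pinning idea (Linux-oriented; the actor and core assignments are made up for illustration):

    import psutil
    import ray

    ray.init()

    @ray.remote(num_cpus=4)
    class PinnedWorker:
        def __init__(self, cores):
            # Restrict this worker process (and the threads that numpy/PyTorch
            # spawn inside it) to a fixed set of cores to avoid contention.
            psutil.Process().cpu_affinity(cores)

        def run(self, n):
            import numpy as np
            return float(np.linalg.norm(np.random.rand(n, n)))

    workers = [PinnedWorker.remote(list(range(i * 4, (i + 1) * 4))) for i in range(2)]
    results = ray.get([w.run.remote(1000) for w in workers])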

The scenario where a Ray task forks is probably not very well supported. You can certainly start a subprocess from within a Ray task, but I think forking could easily cause issues.

You can definitely use Ray + Jax, but you probably need to avoid forking a process within a Ray worker.

nubinetwork
1 replies
2d17h

this is not a typical ray use case

Must be good enough if you're willing to dogfood it though?

robertnishihara
0 replies
17h32m

To clarify, what I mean is that working with "exabytes" is atypical. Most use cases are at a slightly smaller scale :)

Data processing workloads are quite common on Ray, especially with unstructured data.

Also, I work on Ray, which is the underlying framework used here, but all the work in the post was done by the Amazon team.

parhamn
6 replies
2d20h

I'm curious, how do data scientists use these massive datasets, especially the old stuff? Is it more of a compliance and need/should-save type thing, or is the data actually useful? I'm baffled by these numbers, having never used a large BI tool, and am genuinely curious how the data is actually used operationally.

As a layman, I imagine lots of it loses relevancy very quickly, e.g., Amazon sales data from 5 years ago is only marginally useful for determining future trends and analyzing new consumer behavior regimes?

smfjaw
1 replies
2d19h

I work in finance and it's great having big historical datasets. Even if the figures are far lower in previous years, it's good to see system 'shocks', and these can be used at a different magnitude/scale for future forecasting.

refset
0 replies
2d6h

Yep, backtesting is a critical step in the deployment of new trading algorithms.

Related, I rather enjoyed reading this other thread from June: "Ask HN: Is KDB a sane choice for a datalake in 2024?" https://news.ycombinator.com/item?id=40625800

gregw2
1 replies
2d20h

If you have seasonal demand patterns, you generally need three years history to do good predictive analytics.

I do tend to agree data from five years ago is rarely relevant, BUT our business is still using data from the fiscal year before COVID for some BI purposes, as a comparison baseline for certain analytics/business processes which have been slow to reach pre-COVID levels of performance. So that means we are now using data 6 years old, comparing this year to that pre-COVID year for certain analytics!

physicsguy
0 replies
2d12h

Yeah, 100%. I worked in wind energy for a while and the DS team would be pulling as much data as they could get to establish a baseline for normality due to seasonal trends in the wind. This also varied enormously around the world - e.g., the UK is fairly windy all year, but India typically gets 2/3 of its generated wind energy in the monsoon season, which is about 3 months.

mr_toad
0 replies
2d13h

Customer retention, infrequent purchases, and time series forecasts all benefit from having at least several years of data.

jedberg
0 replies
2d

Training a foundation model that understands shopping queries. 25 years of shopping data goes a long way.

jiripospisil
5 replies
2d22h

From the typical Amazon EC2 customer’s perspective, this translates to saving over $120MM/year on Amazon EC2 on-demand R5 instance charges.

Does the sales team know about this? /jk

andrewxdiamond
3 replies
2d22h

Amazon sales and business teams are constantly focused on reducing costs for customers and they celebrate this internally too.

I have seen dozens of big ticket “we saved this customer $xxxK/year” posts on slack and other internal venues, the customer obsession is real.

immibis
2 replies
2d21h

Even the reduced cost is still ten times what it costs on a traditional provider, though, right?

sofixa
1 replies
2d9h

Wildly depends. How many people would it take to set up, deploy, maintain, secure, back up something like S3, RDS, SQS similarly redundantly based on top of just raw compute?

For some organisations, it will make sense to internalise and DIY everything. For many others, AWS/equivalent enables them to break down internal silos (instead of having to file a ticket to get the storage team to provision storage for your new fancy RabbitMQ cluster, multiplied by 10 for networking, security, AV bullshit, etc.) and just consume everything as a service. It's much faster, and in some cases maybe even cheaper.

Cf. Dropbox, which started on S3 and, a decade later, migrated to self-managed datacenters.

immibis
0 replies
2d2h

If we ignore data egress fees, S3 is a rare exception to the rule that all of AWS is 10 times overpriced.

taeric
0 replies
2d22h

This assumes they aren't reinvesting the money in other tangible improvements to the services, though?

igmor
5 replies
2d21h

Can you share any data on how big of a cluster is running Ray jobs?

thedood
4 replies
2d21h

From the blog post, the largest individual Ray cluster that was observed running a production compaction job in Q1 had 26,846 vCPUs and ~210TiB of RAM. This is roughly equivalent to a Ray cluster composed of 839 r5.8xlarge EC2 nodes (w/ 32 vCPUs and 256GiB RAM per node).

layoric
3 replies
2d18h

For those interested, this would be at a cost of:

- ~$1691/hour on demand

- ~$1065/hour reserved

- ~$521/hour spot

Not including any related data transfer costs.

LarsDu88
2 replies
1d21h

Wow that is not NEARLY as expensive as I would have imagined considering the scale of the data involved.

artemisart
1 replies
1d20h

That's also $41k / 26k / 13k per day or $1.2M / 767k / 375k per month!

LarsDu88
0 replies
1d20h

I mean, for all of Amazon's business intelligence, running on servers operated by Amazon... tis a mere pittance.

esafak
5 replies
3d15h

Are we talking about big data ETL here? I did not know Ray was suited for it.

thedood
4 replies
3d2h

This is a specialized ETL use-case - similar to taking a single SQL query and creating a dedicated distributed application tailored to run only that query. The lower-level primitives in Ray Core (tasks and actors) are general purpose enough to make building this type of application possible, but you'll be hard pressed to quickly (i.e., with less than 1 day of effort) make any arbitrary SQL query or dataframe operation run with better efficiency or scale on Ray than on dedicated data processing frameworks like Spark. IMO, the main value add of frameworks like Spark lies more in unlocking "good enough" efficiency and scale for almost any ETL job relatively quickly and easily, even if it may not run your ETL job optimally.

dekhn
3 replies
2d19h

Speaking as a distributed computing nerd, Ray is definitely one of the more interesting and exciting frameworks I've seen in a while. It's one of those systems where reading the manual, I can see that I'm not going to have to learn anything new, because the mental model resembles so many distributed systems I've worked with before (I dunno about anybody else, but tensorflow is an example of a distributed system that forced me to forget basically everything I knew before I could be even remotely productive in it).

Unclear if it's in the best interests of Anyscale to promote Ray as a general-purpose cluster productivity tool, even if it's good at that more general use case.

robertnishihara
2 replies
2d19h

I'm glad you find it exciting!

Our intention from the start was for Ray to be general purpose. And the core Ray APIs are quite general (basically just scheduling a Python function somewhere in a cluster or instantiating a Python class as a process somewhere in the cluster).

We had AI use cases in mind from the start, since we were grad students in AI. But the generality has really been important since AI workloads encompass a huge variety of computational patterns (allreduce style communication patterns on GPUs for training, embarrassingly parallel data processing workloads on spot instances, and so on).

refset
0 replies
2d6h

Is there anybody trying to build a SQL database on Ray yet? Asking for a friend.

dekhn
0 replies
2d18h

Oh, I know all that, I used to work at Google and gave lots of money to the various groups associated with Ion Stoica at Berkeley to help stimulate more open-source alternatives to Borg/MapReduce/Flume/TensorFlow. Keep up the good work.

zitterbewegung
4 replies
2d22h

I was in a workshop that taught me Ray. It was interesting to know that the people who started Spark were also involved in making Ray.

This is not badmouthing either project, just an observation that having architected one such system, you'd be better at attacking the same problem the next time.

fs111
3 replies
2d21h

Well, Spark was really a showcase project for Mesos when it was created. Now everyone knows a lot more.

bigcat12345678
2 replies
2d11h

Spark was never a showcase for Mesos.

Mesos was a derivative idea, drawn from sporadic two-level-scheduling ideas from inside Google, based on MapReduce.

Mesos was wrong from day one. They thought they had the right idea, but it was really driven by a small group of Google engineers who happened to be from academia, and those engineers were very good at having theoretically nice ideas.

Around the time Mesos was invented, Google had its own Mesos-like project, born from learning the wrong lessons from Borg. That thing is Omega.

Eventually everyone decided that Borg was right all along, hence Kubernetes.

fs111
1 replies
2d8h

I have no idea what any of the Google tech has to do with anything here.

Quoting from the original spark paper:

Spark is built on top of Mesos [16, 15], a "cluster operating system" that lets multiple parallel applications share a cluster in a fine-grained manner and provides an API for applications to launch tasks on a cluster

https://people.csail.mit.edu/matei/papers/2010/hotcloud_spar...

Note how Matei Zaharia, the inventor of Spark, is also on the Mesos paper:

https://people.eecs.berkeley.edu/~alig/papers/mesos.pdf

dekhn
0 replies
2d3h

The RAD Lab folks who built Mesos were aware of Borg and how it approached the problem of scheduling a bunch of different jobs on a collection of disparate hardware. Prior to Borg, most large-scale clusters were managed with batch-queue software, while Borg and Mesos come more from the "service management" side: a collection of jobs that run concurrently, with priority levels used to preempt lower-priority jobs so that higher-priority jobs can schedule and run "immediately".

The need for this pops up for nearly every large-scale data processing enterprise, with k8s replacing Mesos, YARN, and other systems as the cluster scheduler du jour.

One of the big advantages of a service scheduler versus a batch queue is that you can implement a batch queue on top of a service scheduler much more easily than you can implement a service scheduler on top of a batch queue.

jgalt212
4 replies
2d20h

Slightly flip, but it's interesting that no one believes in or brags about cost savings via statistical sampling techniques these days.

dekhn
3 replies
2d19h

well, I can save money by eating only lentils, but I prefer a richer diet. As do BI folks in a highly profitable company.

jgalt212
1 replies
2d5h

It's a terribly inelegant and inefficient solution that no one should be "proud" of. The only time you need N=all is for the general ledger.

disgruntledphd2
0 replies
2d2h

The only time you need N=all is for the general ledger.

If you're predicting for each user, you need all of the data.

And generally you probably wouldn't want to sample too much for BI as it could lead to people making wrong decisions.

But yeah, in general sampling rocks and is super effective.

rubenvanwyk
0 replies
2d7h

Winning comment.

quadrature
3 replies
4d14h

Anyone know enough about Ray to comment on what the exact performance unlock was? They mention that it gave them enough control over the distribution of work so that they could avoid unnecessary reads/writes. That seems like a good win, but I would assume that doing compaction in Python would be quite slow.

thedood
2 replies
4d13h

Some of the initial differentiators are described at the bottom of our design doc at https://github.com/ray-project/deltacat/blob/main/deltacat/c.... But yes, controlling file I/O was also an important part of this since it allowed us to (1) run more targeted downloads/reads of only the Parquet row groups and columns participating in compaction and (2) track dirty/clean files to skip unnecessary re-writes of "clean" files that weren't altered by compaction. Also, just better leveraging catalog metadata (e.g., primary key indexes if available) to filter out more files in the initial scan, and to copy clean files into the compacted variant by reference (when supported by the underlying catalog format).

The trick with doing compaction in Python was to ensure that the most performance-sensitive code was delegated to more optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code paths. If we did all of our per-record processing ops in pure Python, compaction would indeed be much slower.
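To illustrate the targeted-read idea (not DeltaCAT's actual code; the file and column names are made up), PyArrow lets you read just the footer metadata and then only the row groups and columns a job actually needs:

    import pyarrow.parquet as pq

    pf = pq.ParquetFile("some_partition/data.parquet")

    # Footer metadata only -- no data pages are downloaded or decoded yet.
    meta = pf.metadata
    print(meta.num_row_groups, meta.num_rows)

    # Read a single row group, restricted to the columns compaction cares about
    # (e.g., a primary key and a last-updated timestamp).
    batch = pf.read_row_group(0, columns=["pk", "updated_at"])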

quadrature
0 replies
4d3h

Thanks a lot for the explanation. Sounds a lot like how PySpark allows for declarative definitions of computation that are actually executed in Java.

ZeroCool2u
0 replies
2d20h

This is one of the first times I've heard of people using Daft in the wild. Would you be able to elaborate on where Daft came in handy?

Edit: Nvm, I kept reading! Thanks for the interesting post!

e28eta
2 replies
2d2h

I wonder what they’re doing to combat the growth rate of their data. A 13x speed up, or 82% cost reduction is great, but doesn’t seem significant enough compared to the growth of the business and (my assumption) demand for adding new data sources and added data to existing sources.

Like, if the current latency is ~60 minutes for 90% of updates, will it ever be better than that? Won’t it just slowly degrade until the next multi-year migration?

PS: this article was infuriating to read on iPad - it kept jumping back to the top of the page and I couldn't figure out why

thedood
1 replies
2d

As noted at the bottom of the design doc at https://github.com/ray-project/deltacat/blob/main/deltacat/c..., we also improved the runtime efficiency of compaction from O(n log n) to O(n). However, a lot of this also comes down to making intentional data engineering decisions to control how physical data is laid out (and retained) across files to keep reads/writes as localized as possible. For example, we found grouping records according to the date they were last updated to be very helpful, as outlined in our 2022 Ray Summit talk: https://youtu.be/u1XqELIRabI?t=1589.
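As a toy illustration of the asymptotic difference (this is not the DeltaCAT implementation, just the idea): deduplicating by hashing primary keys into buckets touches each record once, instead of globally sorting all records by key first.

    from collections import defaultdict

    def compact(records, num_buckets=16):
        # records: iterable of (primary_key, last_updated, payload) tuples.
        # Keep only the latest version of each key in a single O(n) pass,
        # avoiding an O(n log n) global sort. In a cluster, each bucket
        # would map to a worker.
        buckets = defaultdict(dict)
        for pk, ts, payload in records:
            bucket = hash(pk) % num_buckets
            best = buckets[bucket].get(pk)
            if best is None or ts > best[0]:
                buckets[bucket][pk] = (ts, payload)
        return buckets

    compacted = compact([("a", 1, "v1"), ("b", 1, "v1"), ("a", 2, "v2")])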

e28eta
0 replies
1d20h

That’s a nice durable improvement! Thanks

whoiscroberts
1 replies
4d4h

Ray user here, what language actors are they using? Ray supports Python, Java, and C++ actors…

thedood
0 replies
4d2h

We wrote the compactor in Python but, as noted in my previous response to quadrature, most of the performance sensitive code is written in C++ and Rust (but still invoked from Python).

whoevercares
1 replies
2d19h

I wonder if similar performance could be achieved with a Spark accelerator like https://github.com/apache/datafusion-comet. Of course it didn't exist 4 years ago, but would it be cheaper to build?

spott
0 replies
2d14h

It sounds like a lot of the value here was in a more appropriate level of abstraction, not in the speed of the compute itself.

Ray allowed them to optimize elements that Spark didn't, and that was what improved performance, not that Spark itself was slow.

jaychia
1 replies
2d14h

I work on Daft and we’ve been collaborating with the team at Amazon to make this happen for about a year now!

We love Ray, and are excited about the awesome ecosystem of useful + scalable tools that run on it for model training and serving. We hope that Daft can complement the rest of the Ray ecosystem to enable large scale ETL/analytics to also run on your existing Ray clusters. If you have an existing Ray cluster setup, you absolutely should have access to best-in-class ETL/analytics without having to run a separate Spark cluster.

Also, on the nerdier side of things - the primitives that Ray provides give us a real opportunity to build a solid non-JVM based, vectorized distributed query engine. We're already seeing extremely good performance improvements here vs Spark, and are really excited about some of the upcoming work to get even better performance and memory stability.

This collaboration with Amazon really battle-tested our framework :) happy to answer any questions if folks have them.

thedood
0 replies
2d

Good to see you here! It's been great working with Daft to further improve data processing on Ray, and the early results of incorporating Daft into the compactor have been very impressive. Also agree with the overall sentiment here that Ray clusters should be able to run best-in-class ETL without requiring a separate cluster maintained by another framework (Spark or otherwise). This also creates an opportunity to avoid many inefficient, high-latency cross-cluster data exchange ops often run out of necessity today (e.g., through an intermediate cloud storage layer like S3).

jameskraus
1 replies
2d13h

I wonder if there are any good primers to these technologies. Maybe a DDIA-like book or some lectures?

OJFord
0 replies
2d3h

Need a phonebook just for all the Apache ones.

alberth
1 replies
2d6h

Has anyone found a good ELI5 site for all the different AI toolsets?

E.g., Ray, Databricks, notebooks, etc.

OutOfHere
1 replies
2d20h

Can you help us understand how others can use and derive value from Ray DeltaCAT? What would be the specific use cases?

thedood
0 replies
2d19h

Some of DeltaCAT's goals and use cases have been discussed in this 2022 talk: https://youtu.be/M3pZDp1zock?t=4676

Today, our immediate next goal for DeltaCAT is to ensure that the compactor, and similar procedures for Ray, can run on Apache Iceberg. So, if you're an Iceberg user relying on procedures like Spark's "rewrite_data_files" and/or "rewrite_positional_delete_files" to compact your datasets today, then DeltaCAT will let you easily run similar compaction procedures on Ray to realize similar efficiency/scale improvements (even if it winds up delegating some of the work to other projects like PyIceberg, Daft, etc. along the way).

Going forward, we'd like DeltaCAT to also provide better general-purpose abstractions (e.g., reading/writing/altering large datasets) to simplify writing Ray apps in Python that work across (1) different catalog formats like Iceberg, Hudi, and Delta and (2) different distributed data processing frameworks like Ray Data, Daft, Modin, etc.

From the perspective of an internal DeltaCAT developer, another goal is to just reduce the maintainability burden and dev hours required to write something like a compactor that works across multiple catalogs (i.e., by ensuring that all interfaces used by such a procedure can be readily implemented for multiple catalog formats like Iceberg, Hudi, Delta, etc.).

Narhem
1 replies
4d17h

Absolutely insane work. With so much data you'd think they would come up with a custom solution instead of using the "newest available toolkit", but I understand how much of a mess dealing with that much data is.

thedood
0 replies
4d15h

Hi Narhem - one of the devs that worked on the migration here. The data volume, and subsequent compute power required to process it, is actually one of the things that led us to Ray (or Ray Core specifically) since it had the distributed computing primitives (tasks and actors) that we needed to build out our envisioned solution with very few compromises. One thing we DIDN'T want to do was just throw another one-liner SQL statement running on a new data processing framework at the problem, since that leads us back to the problems we had with Spark - not enough low-level control for such an important problem.

In short, after evaluating our options, Ray seemed to strike the best balance between the one efficiency extreme of, say, building out custom "compaction-optimized" hardware/clusters, and the other maintainability extreme of just letting the latest managed cloud service run a 1-liner SQL statement for us without ever looking under the hood.

Regardless, I expect both our existing solution and the distributed compute frameworks leveraged to deliver it to continue to evolve over time.

uptownfunk
0 replies
2d12h

Are they that bored over there at Amazon?

maxnevermind
0 replies
1d17h

Why did it take multiple years to do that, I wonder? After a new compaction framework is ironed out, it should not be that difficult to onboard/spread it across all the tables, especially considering they still had a parallel setup in Spark, so they could afford hiccups in the new setup.

hiyer
0 replies
2d14h

We chose Ray over Spark in my previous company mostly because we were a Python shop and Ray is Python-native (though it's implemented in C++ I believe). It worked very well for us even for real-time queries - though we were obviously nowhere near the scale that AWS is at.

PeterCorless
0 replies
2d2h

I remember when everyone shifted from Apache Hadoop to Apache Spark. This seems like a possibly similar sea change. I am not sure if many other users will embrace Ray over Spark, but this is a sign that people are looking to either improve Spark on some fundamental levels, or are going to reach out to new technologies to resolve their problems. Cool stuff.

LarsDu88
0 replies
1d21h

As someone who used Ray for ML, this is impressive as hell.

I did not even realize it was that viable a substitute for straight data processing tasks.

100pctremote
0 replies
2d22h

Rather nuts. New challenge: build datacenters quickly enough to support the new platform.