Fun story time: I was at twitter for a few years and tended to write quick hacks that people wanted to replace with better engineering.
We never had scala thrift bindings, and the Java ones were awkward from Scala, so I wrote a thrift plugin in JRuby that used the Ruby thrift parser and ERb web templates to output some Scala code. Integrated with our build pipeline, worked great for the company.
I also wrote one era of twitter's service deploy system on a hacked up Capistrano.
These projects took a few days because they were dirty hacks, but I still got a below perf review for getting easily distracted, because I didn't yet know how to sell those company-wide projects.
Anyhow, about a month before that team kicked off Parquet, I showed them a columnar format I made for a hackweek based on Lucene's codec packages, and was using to power a mixpanel-alike analytics system.
I'm not sure whether they were inspired or terrified that my hack would reach production, but I like to think I had a small hand in getting Parquet kickstarted.
Tangential to the topic, but regarding the supposed Snowball Effect: in real life there is no such thing. I have pushed large 'snowballs' down slopes -- in reality they are snow cylinders, as shown in the photo -- and they invariably do not get far. The reason is that when one side of the cylinder randomly thickens slightly with respect to the other, the whole thing turns in the opposite direction.
For example, if the RHS of your cylinder has a slightly larger radius than the LHS the cylinder will commence turning to the left.
The upshot is that the thick side picks up more snow than the thin side, and the disparity in radii increases more rapidly still. The cylinder becomes a truncated cone which turns sideways and halts!
It is highly dependent on the snow conditions and the recent weather. Sometimes even just a couple of hours are enough to change the conditions and give a good chance of rollerballs. The climate also has an impact; in my experience, more coastal areas have more periods when they form.
And in some cases the rollerballs get too tall for the bonding strength of the snow, so they break into parts that can restart the cycle if the slope is steep enough.
Reading through this blog, it seems to me that Parquet is a lot like the ClickHouse native data format.
The best part of the ClickHouse native format is that I can use the same ClickHouse queries locally or against a remote server/cluster, and let ClickHouse decide how to use the available resources in the most performant way.
ClickHouse also has a native, very fast integration with Parquet, so I can:
- Query local/S3 parquet data from the command line using clickhouse-local.
- Query large amounts of local/S3 data programmatically by offloading it to a ClickHouse server/cluster, which can do the processing in a distributed fashion.
I've been struggling with a tough parquet problem for a few months now.
I have a 15 GB parquet file in an S3 bucket, and I need to "unzip" and extract every row from the file to write into my database. The contents of the file are emails, and I need to integrate them into our search function.
Is this possible to do without an unreasonable amount of RAM? Are there any affordable services that can help here?
Feel free to contact me (email in bio), happy to pay for a consult at the minimum.
I work with pyspark and parquet quite a lot. I never had to deal with parquet outside spark, but this is how I would do this:
- Write a pandas_udf function in pyspark.
- Partition your data into smaller bits so that the pandas_udf does not get too much data at the same time.
Something like:
```
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pandas as pd

@f.pandas_udf("long")
def ingest(doc: pd.Series) -> pd.Series:
    # doc is a pandas Series now
    # your processing goes here -> write to DB etc.
    # return a Series of zeros just to make Spark happy
    return pd.Series([0] * len(doc))

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3 path")
df = df.repartition(1000)  # bump up this number if you run into memory issues
df = df.withColumn("foo", ingest(f.col("doc_column")))
```
Now the trick is, you can limit how much data is given to your pandas_udf by repartitioning your data. The more the partitions, the smaller the pd.Series that your pandas_udf gets. There's also the `spark.sql.execution.arrow.maxRecordsPerBatch` config that you can set in spark to limit memory consumption.
^ Probably overkill to bring spark into the equation, but this is one way to do it.
You can use a normal udf (i.e. `f.udf()`) instead of a pandas_udf, but apparently that's slower due to Java <-> Python serialization.
I just wanted to mention that AWS Athena eats 15G parquet files for breakfast.
It is trivial to map the file into Athena.
But you can't connect it to anything other than file output. Still, it can help you, for example, write the data back out in smaller chunks, or choose another output format such as CSV (although arbitrary email content in a CSV feels like a setup for parsing errors).
The benefit is that there is virtually no setup cost. And processing cost for a 15G file will be just a few cents.
It's been a while (~5yr) since I've done anything with Spark, but IIRC it used to be very difficult to make reliable jobs with the Java or Python APIs due to the impedance mismatch between Scala's lazy evaluation semantics and the eager evaluation of Java and Python. I'd encounter perplexing OOMs whenever I tried to use the Python or Java APIs, so I (reluctantly) learned enough Scala to make the Spark go brr and all was well. Is it still like this?
Our company (scratchdata.com, open source) is literally built to solve the problem of schlepping large amounts of data between sources and destinations, so I have worked on this problem a lot personally and happy to nerd out about what works.
Coming from an HPC background, I'm wondering quite a bit how 15 GB files came to be considered large data. Not being a heavy parquet user, but:
- does this decompress to giant sizes?
- can't you split the file easily, because it includes row-based segments?
- why does it take months to solve this for one file?
It might not be fast, but a quick 1-off solution that you let run for a while would probably do that job. There shouldn't be a need to load the whole file into memory.
Have you given DuckDB a try? I'm using it to shuttle some hefty data between postgres and some parquet files on S3 and it's only a couple lines. Haven't noted any memory issues so far
```
import json
import polars as pl

df = pl.scan_parquet('tmp/'+DUMP_NAME+'_cleaned.parquet')
with open('tmp/'+DUMP_NAME+'_cleaned.jsonl', mode='w', newline='\n', encoding='utf8') as f:
    for row in df.collect(streaming=True).iter_rows(named=True):
        row = {k: v for k, v in row.items() if (v is not None and v != [] and v != '')}
        f.write(json.dumps(row, default=str) + '\n')
```
My suggestion is to load each row group individually, as they generally will be much smaller than your total file size. You can do this via pyarrow.ParquetFile.read_row_group. To truly optimize this for reading from s3 you could use fsspec’s open_parquet_file library which would allow you to only load each row group one at a time.
I had a similar issue, but for aggregations. The use case was to "compress" large datasets into smaller aggregations for insertion into a costly DB.
At first we used DuckDB, but memory became an issue there, and we also bumped into a couple of issues with how DuckDB handles arrays.
We then moved this workload to clickhouse-local, which was faster and had more fine-tuning options to our liking; in this case that meant limiting RAM usage with e.g. max_bytes_before_external_group_by.
I'm puzzled as to why this is a problem that has lasted months. My phone has enough RAM to work with this file in memory. Do not use pyspark; it is unbelievably slow and memory-hogging if you hold it even slightly wrong. Spark is for TB-sized data, at minimum.
Have you tried downloading the file from s3 to /tmp, opening it with pandas, iterating through 1000 row chunks, pushing to DB? The default DF to SQL built into pandas doesn't batch the inserts so it will be about 10x slower than necessary, but speeding that up is a quick google->SO away.
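A sketch of that flow, with sqlite as a stand-in for the real database and a toy frame in place of `pd.read_parquet("/tmp/emails.parquet")` (`chunksize` bounds how many rows go per batch, and `method="multi"` is the usual `to_sql` speedup):

```python
import sqlite3
import pandas as pd

# Toy frame standing in for the real parquet read.
df = pd.DataFrame({"id": range(5), "body": ["hello"] * 5})

conn = sqlite3.connect(":memory:")
# chunksize batches the rows; method="multi" packs many rows into each INSERT.
df.to_sql("emails", conn, if_exists="append", index=False,
          chunksize=1000, method="multi")
count = conn.execute("SELECT count(*) FROM emails").fetchone()[0]
print(count)
```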
I've spent many many hours trying these suggestions, didn't have much luck. iter_batches loads the whole file (or some very large amount of it) into memory.
This is the answer for a one-off or occasional problem unless your time is worthless.
$200 to rent a machine that can run the naive solution for an entire day is peanuts compared to the dev time for a “better” solution. Running that machine for eight hours would only cost enough to purchase about a half day of junior engineer time.
Perhaps look into using dlt from https://dlthub.com, using pyarrow or polars. It handles large datasets well, especially when using generators to process the data in chunks.
It's very interesting to see how a new "enterprise open source" project is born: the part where, right at the start, the author knows they should have more than one company on board, and how the other companies each contribute their part.
No one with the combination of motivation, time, and skills needed has gotten it into Debian. Someone wanted to get a Python implementation in, but it looks like they never found the time.
pandas is a Python-centric, tabular data handler that works well in clouds (and on desktop Debian). pandas can read parquet data today, among other libs mentioned. The binary .so driver style is single-host centric and not the emphasis of these cloudy projects (and their cloudy funders).
Perhaps more alarm would be called for if this python+pandas+parquet combination did not work on Debian, but that is not the case today.
ps- data access in clouds often uses the s3:// endpoint, in contrast to a POSIX endpoint using _fread()_ or similar. Many parquet-aware clients prefer the cloudy, un-POSIX method to access data, and that is another reason it is not a simple package in Debian today.
As I understand it, pandas can read parquet if the pyarrow or fastparquet packages are available, but that's not the case and attempts to fix that have been underway for several years.
> I also wrote one era of twitter's service deploy system on a hacked up Capistrano.
Partially explains how murder came to be https://github.com/lg/murder
> below perf review
That's some cheap bullshit. Fuck marketing-oriented corporate engineering.
> ClickHouse has a native and the fastest integration with Parquet
- https://clickhouse.com/blog/apache-parquet-clickhouse-local-...
- https://clickhouse.com/blog/apache-parquet-clickhouse-local-...
> I have a 15gb parquet file in a s3 bucket and I need to "unzip" and extract every row from the file to write into my database.
https://duckdb.org/2024/03/29/external-aggregation.html
https://duckdb.org/2021/06/25/querying-parquet.html
If your DB is MySQL or Postgres, then you could read a stream from parquet, transform inline, and write out to your DB:
https://duckdb.org/2024/01/26/multi-database-support-in-duck...
And an unrelated but interesting read about the parquet bomb:
https://duckdb.org/2024/03/26/42-parquet-a-zip-bomb-for-the-...
Streams batches of rows
https://arrow.apache.org/docs/python/generated/pyarrow.parqu...
Edit — You may need to do some extra work with s3fs too, from what I recall, with the default pandas S3 reading.
Edit 2 — or check out pyarrow.fs.S3FileSystem :facepalm:
Look at the parquet file metadata: use whatever tool you want for that. The Python parquet library is useful and supports s3.
How big are your row groups? If it’s one large row group then you will run into this issue.
What’s the number of rows in each row group?
I'm curious if anyone has experience with OpenLineage/Marquez or similar they'd like to share
https://search.debian.org/cgi-bin/omega?DB=en&P=parquet
https://bugs.debian.org/838338
These days Debian packaging has become a bit irrelevant, since you can just shove upstream releases into a container and go for it.
https://pandas.pydata.org/docs/reference/api/pandas.read_par...
https://packages.debian.org/buster/python3-pandas
Polars and DuckDB are much better about memory management.
https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=970021
What crap. That's 'source-available', NOT open-source.
But at least co-option of terminology is an indicator of success.