I have a personal pet peeve about Parquet that is solved, incompatibly, by basically every "data lake / lakehouse" layer on top, and I'd love to see it become compatible: ranged partitioning.
I have an application which ought to be a near-perfect match for Parquet. I have a source of timestamped data (basically a time series, except that the intervals might not be evenly spaced -- think log files). A row is a timestamp and a bunch of other columns, and all the columns have data types that Parquet handles just fine [0]. The data accumulates, and it's written out in batches, and the batches all have civilized sizes. The data is naturally partitioned on some partition column, and there is only one writer for each value of the partition column. So far, so good -- the operation of writing a batch is a single file creation or create call to any object store. The partition column maps to the de-facto sort-of-standard Hive partitioning scheme.
Except that the data is (obviously) also partitioned on the timestamp -- each batch covers a non-overlapping range of timestamps. And Hive partitioning can't represent this. So none of the otherwise excellent query tools can naturally import the data unless I engage in a gross hack:
I could also partition on a silly column like "date". This involves aligning batches to date boundaries and also makes queries uglier.
I could just write the files and import "*.parquet". This kills performance and costs lots of money.
I could use Iceberg or Delta Lake or whatever for the sole benefit that their client tools can handle ranged partitions. Gee thanks. I don't actually need any of the other complexity.
It would IMO be really really nice if everyone could come up with a directory-name or filename scheme for ranged partitioning.
[0] My other peeve is that a Parquet row and an Arrow row and a Thrift message and a protobuf message, etc., are almost, but not quite, the same thing. It would be awesome if there were a companion binary format for a single Parquet row or a stream of rows, so that tools could cooperate more easily on producing the data that eventually gets written into Parquet files.
Why is the footer metadata not sufficient for this need? The metadata contains the min and max values of the timestamp column of interest, so a query tool can optimize by reading the metadata to determine whether a given Parquet file needs to be read at all, depending on what time range is in the query.
Because the footer metadata is in the Parquet file, which is already far too late to give an efficient query.
If I have an S3 bucket containing five years worth of Parquet files, each covering a few days worth of rows, and I tell my favorite query tool (DuckDB, etc) about that bucket, then the tool will need to do a partial read (which is multiple operations, I think, since it will need to find the footer and then read the footer) of ~500 files just to find out which ones contain the data of interest. A good query plan would be to do a single list operation on the bucket to find the file names and then to read the file or files needed to answer my query.
Iceberg and Delta Lake (I think -- I haven't actually tried it) can do this, but plain Parquet plus Hive partitioning can't, and I'm not aware of any other lightweight scheme that is well supported that can do it. My personal little query tool (which predates Parquet) can do it just fine by the simple expedient of reading directory names.
This can also be done using row group metadata within the Parquet file. The row group metadata can include the value ranges of ordered columns, so you can "partition" on timestamps without having to have a file per time range.
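The row-group pruning described above can be sketched as follows. This is a minimal illustration of the logic only: the (min, max) pairs are plain tuples here, whereas a real reader would pull them from the footer statistics (e.g. pyarrow's `ParquetFile.metadata.row_group(i).column(j).statistics`).

```python
# Sketch of row-group pruning via min/max statistics. Stats are hardcoded
# tuples here so the example is self-contained; in practice they come from
# the Parquet footer.

def row_groups_to_scan(row_group_stats, query_lo, query_hi):
    """Return indices of row groups whose [min_ts, max_ts] range overlaps
    the half-open query range [query_lo, query_hi)."""
    return [
        i
        for i, (min_ts, max_ts) in enumerate(row_group_stats)
        if min_ts < query_hi and max_ts >= query_lo
    ]

# Each row group covers a non-overlapping slice of timestamps.
stats = [(0, 99), (100, 199), (200, 299), (300, 399)]
print(row_groups_to_scan(stats, 150, 250))  # -> [1, 2]
```

The point of the thread still stands, though: this pruning happens only after the footer has been fetched, so it saves decode work within a file but not the per-file metadata reads.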
Hive supports two kinds of partitioning, injected and dynamic. You can totally use a partition key like the hour in UNIX time: an integer starting at some epoch and incrementing by 3600.
Now your query engine might require you to specify the partitions or range of partitions you want to query; you absolutely can use datepartition >= a AND datepartition < b in your query. Iceberg seems to fix that and just lets you use the timestamp; presumably the metadata is smart enough to exclude the partitions you don't care about.
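The "hour in UNIX time" key can be sketched like this. The column name `datepartition` is just the example name used in the comment above, and the path layout is the usual Hive `key=value` convention; both the writer (building the path) and the query (computing predicate bounds) derive the bucket the same way.

```python
# An epoch-hour partition key: an integer starting at the UNIX epoch and
# incrementing once per 3600 seconds.

HOUR = 3600

def hour_bucket(unix_ts: int) -> int:
    """Map a UNIX timestamp to its hour bucket."""
    return unix_ts // HOUR

def hive_path(unix_ts: int) -> str:
    """Hive-style path for the batch containing unix_ts.
    'datepartition' is a hypothetical column name for illustration."""
    return f"datepartition={hour_bucket(unix_ts)}/data.parquet"

# A query over [t0, t1) becomes:
#   datepartition >= hour_bucket(t0) AND datepartition <= hour_bucket(t1 - 1)
t0, t1 = 1_700_000_000, 1_700_010_000
print(hive_path(t0))  # -> datepartition=472222/data.parquet
print(hour_bucket(t0), hour_bucket(t1 - 1))
```

This is exactly the "silly column" workaround from the top of the thread: it works, but batches must align to hour boundaries and every query has to translate its timestamp range into bucket arithmetic.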
This is exactly what I meant by “I could also partition on a silly column like "date". This involves aligning batches to date boundaries and also makes queries uglier.”
In the lower level arrow/parquet libraries you can control the row groups, and even the data pages (although it’s a lot more work). I have used this heavily with the arrow-rs crate to drastically improve (like 10x) how quickly data could be queried from files. Some row groups will have just a few rows, others will have thousands, but being able to bypass searching in many row groups makes the skew irrelevant.
Just beware that one issue you can have is the limit of row groups per file (2^15).
Time series data is naturally difficult to work with, but the difficulty is avoidable. One solution is not to query raw time series data files. Instead, segment your time series data before you store it, normalizing the timestamps as part of event processing. Sliding-window observations will help you find where an event begins, and then you adjust the offset until you find where the time series returns to its zero-like position. That's your event.
Segmenting data is exactly what writing it into non-overlapping Parquet files is. My point is that many tools can read a bucket full of these segments, and most of them can handle a scheme where each file corresponds to a single value of a column, but none of them can agree on how to efficiently segment the data where each segment contains a range, unless a new column is invented for the purpose and all queries add complexity to map onto this type of segmentation.
There’s nothing conceptually or algorithmically difficult about what I want to do. All that’s needed is to encode a range of times into the path of a segment. But Hive didn’t do this, and everyone implemented Hive’s naming scheme, and that’s the status quo now.
I think maybe this is a pet peeve of Hive and not of Parquet? Yes, it does require opening the Parquet file to look at the min/max range for the column, but it reads only that data, and if the data isn't in range there shouldn't be further requests.
That is the kind of metadata that is useful to push up, into something like DuckLake.
I guess my peeve could be restated as: Hive’s naming scheme doesn’t handle this, and Parquet per se can’t handle it because it’s out of scope, but all the awesome tools (e.g. DuckDB), when used on Parquet files without something with “lake” or “ice” in the name, use the Hive scheme.
Someone could buck the trend and extend Hive’s scheme to support ranges.
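To make the proposal concrete, here is a sketch of one such hypothetical extension (no tool supports this today, and the `ts=<start>--<end>` spelling is invented for illustration): encode each segment's half-open timestamp range in its directory name, so a single list operation is enough to prune files without touching any footers.

```python
# Hypothetical ranged variant of Hive-style path naming:
#   ts=<start>--<end>/part.parquet
# where [start, end) is the half-open timestamp range the segment covers.

def parse_range(dirname: str):
    """Parse 'ts=<start>--<end>' into an (int, int) half-open range."""
    key, _, value = dirname.partition("=")
    assert key == "ts", f"not a ranged partition dir: {dirname!r}"
    start, _, end = value.partition("--")
    return int(start), int(end)

def files_for_query(listing, query_lo, query_hi):
    """Prune a bucket listing down to the segments overlapping [lo, hi),
    using only the names returned by a single list operation."""
    keep = []
    for path in listing:
        start, end = parse_range(path.split("/")[0])
        if start < query_hi and end > query_lo:
            keep.append(path)
    return keep

listing = [
    "ts=0--1000/part.parquet",
    "ts=1000--2000/part.parquet",
    "ts=2000--3000/part.parquet",
]
print(files_for_query(listing, 1500, 2500))
# -> ['ts=1000--2000/part.parquet', 'ts=2000--3000/part.parquet']
```

Nothing about this is hard; the entire problem is that no two tools agree on the naming convention.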
This looks awesome. One of my biggest gripes personally with Iceberg (less so Delta Lake, but similar) is how difficult it is to just try out on a laptop. Delta Lake has vanilla Python implementations, but those are fragmented and buggy IME. Iceberg has just never worked locally; you need a JVM cluster and a ton of setup. I went down a similar road of trying to use sqlite/postgres + duckdb + parquet files in blob storage, but it was a lot of work.
It seems like this will just work out of the box, and just scale up to very reasonable data sizes. And the work from the DuckDB folks is typically excellent. It's clear they understand this space. Excited to try it out!
Have you tried out PyIceberg yet? It's a pure Python implementation and it works pretty well. It supports a SQL Catalog as well as an In-Memory Catalog via a baked in SQLite SQL Catalog.
I was thinking of putting something together for this. Like a helm chart that works with k3s.
datapains has some good stuff to get Trino running, and you can get a Hive metastore running with some hacking. I dorked around with it, got the Iceberg connector working with Trino, and saw how it all works. I load data into a dumb Hive with a Trino table pointed at it and then insert from select ... into Iceberg.
If the Duck guys have some simple-to-get-running stuff, they could probably start to eat everyone else's lunch.
They make a really good criticism of Iceberg: if we have a database anyway, why are we bothering to store metadata in files?
I don’t think DuckLake itself will succeed in getting adopted beyond DuckDB, but I would not be surprised if over time the catalog just absorbs the metadata, and the original Iceberg format fades into history as a transitional form.
Hopefully this clarifies the value proposition for others:
Existing Lakehouse systems like Iceberg store crucial table information (like schema and file lists) as many small "metadata files" in cloud object storage (like S3). Accessing these files requires numerous network calls, making operations like query planning and updating tables inefficient and prone to conflicts. DuckLake solves this by putting all that metadata into a fast, transactional SQL database, using a single query to get what's needed, which is much quicker and more reliable.
They support syncing to Iceberg by writing the manifest and metadata files on demand, and they already have read support for Iceberg. They just fixed Iceberg's core issues but it's not a direct competitor as you can use DuckLake along with Iceberg in a very nice and bidirectional way.
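The core DuckLake idea can be sketched in miniature with stdlib sqlite3: keep the file list and per-file min/max statistics in one transactional SQL table, so planning a query is a single SELECT instead of many object-store round trips. The table and column names below are made up for illustration; DuckLake's actual schema differs.

```python
# A toy "metadata in a SQL database" catalog: file list plus per-file
# min/max timestamp stats, committed transactionally, queried in one shot.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE data_files (path TEXT PRIMARY KEY, min_ts INT, max_ts INT)"
)
with db:  # one transaction per commit of newly written files
    db.executemany(
        "INSERT INTO data_files VALUES (?, ?, ?)",
        [
            ("s3://bucket/a.parquet", 0, 999),
            ("s3://bucket/b.parquet", 1000, 1999),
            ("s3://bucket/c.parquet", 2000, 2999),
        ],
    )

# Query planning: one SELECT returns exactly the files to scan for [lo, hi).
lo, hi = 1500, 2500
rows = db.execute(
    "SELECT path FROM data_files WHERE min_ts < ? AND max_ts >= ? ORDER BY path",
    (hi, lo),
).fetchall()
print([r[0] for r in rows])
# -> ['s3://bucket/b.parquet', 's3://bucket/c.parquet']
```

Compare this with the Iceberg layout, where answering the same question means fetching a chain of metadata and manifest files from object storage first.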
Metadata bloat can be due to a few things, but it's manageable:
* number of snapshots
* frequent large schema changes
* lots of small files/row level updates
* lots of stats
The last one IIRC used to be pretty bad especially with larger schemas.
Most engines have ways to help with this - compaction, snapshot expiration, etc. Though it can still be up to the user. S3 Tables is supposed to do some of this for you.
If metadata is below 1-5MB it’s really not an issue. Your commit rate is effectively limited by the size of your metadata and the number of writers you have.
I’ve written scripts to fix 1GB+ metadata files in production. Usually it was pruning snapshots without deleting files (relying on bucket policy to later clean things up) or removing old schema versions.
I'm building a poor man's data lake at work, basically putting Parquet files in blob storage using deltalake-rs's Python bindings and DuckDB for querying.
However, I constantly run into problems with concurrent writes. I have a cloud function triggered every x minutes to pull data from an API, and that's fine.
But if I need to run a backfill, I risk that process running at the same time as the timer-triggered function, especially if I load my backfill queue with hundreds of runs that need to be pulled and they start saturating the workers in the cloud function.
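One common fix for uncoordinated writers like this is optimistic concurrency: each commit is a consecutively numbered object written with a put-if-absent operation, so when the timer function and the backfill race, exactly one wins and the other re-reads and retries. The sketch below uses an in-memory dict standing in for the object store; real S3 can provide the conditional put via `If-None-Match`, and this is a simplified version of what Delta Lake's commit log does, not that library's actual API.

```python
# Optimistic commits for two uncoordinated writers. A dict stands in for
# the object store; put_if_absent models a conditional PUT.

class FakeStore:
    def __init__(self):
        self.objects = {}

    def put_if_absent(self, key, data):
        """Atomically create key; fail if another writer got there first."""
        if key in self.objects:
            return False
        self.objects[key] = data
        return True

def commit(store, payload, max_retries=10):
    """Find the next free commit number and claim it atomically."""
    for _ in range(max_retries):
        version = 1 + max(
            (int(k.split(".")[0]) for k in store.objects), default=-1
        )
        if store.put_if_absent(f"{version:020d}.json", payload):
            return version
        # Lost the race: another writer claimed this version; retry.
    raise RuntimeError("too much contention")

store = FakeStore()
print(commit(store, "timer batch"))     # -> 0
print(commit(store, "backfill batch"))  # -> 1
```

With this scheme the backfill and the timer function never need to know about each other; contention just costs the loser a retry.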
https://py.iceberg.apache.org/
https://www.definite.app/blog/cloud-iceberg-duckdb-aws
(Disclosure, I am a developer of marimo.)
https://delta-io.github.io/delta-rs/
https://quesma.com/blog-detail/apache-iceberg-practical-limi...
Even Snowflake was using FoundationDB for metadata, whereas Iceberg attempts to use blob storage even for the metadata layer.