
Support for DatasourceV2: Sampling Pushdown and Limit Pushdown [Spark] #175

Open
1 of 6 tasks
osopardo1 opened this issue Mar 16, 2023 · 10 comments

Comments

@osopardo1
Member

osopardo1 commented Mar 16, 2023

Related to #166.

Qbeast-Spark should be compatible with the latest versions of Delta Lake and Apache Spark, to benefit from any new features and major upgrades.
The upgrade to Delta 2.1.0 and Spark 3.3.0 reveals a set of interesting pushdown operations that could be empowered by the Qbeast metadata.

We should:

Thoughts on #68

  • Ideally, we would no longer need to compute the hash of the indexed columns in order to filter the records in memory. But then we need to ensure that the sample returned from OTreeIndex (or from any other class involved, such as ParquetFileFormat) is correct.
  • Since we know the minWeight and maxWeight of each file, we can determine how many rows to read from it. The only thing we need to find out is where those records get filtered (see the sketch after this list).
  • Here's a detailed blog post that explains how Spark processes Parquet files: https://animeshtrivedi.github.io/spark-parquet-reading
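
Below is a minimal sketch of that file-level pruning idea, assuming (hypothetically) that weights are normalized to [0, 1] and that the sample fraction maps directly onto that range; `FileWeights` and `classify` are illustrative names, not project code.

```scala
// Hypothetical sketch: classify index files for a sample fraction using their
// minWeight/maxWeight, so that only boundary files need in-memory filtering.
final case class FileWeights(path: String, minWeight: Double, maxWeight: Double)

object SamplePruning {

  /** Returns (filesToRead, filesNeedingRowFilter).
    *  - Files with minWeight > fraction contribute nothing and are skipped.
    *  - Files with maxWeight <= fraction can be read fully, no hashing needed.
    *  - Files straddling the cut are read but still filtered row by row.
    */
  def classify(
      files: Seq[FileWeights],
      fraction: Double): (Seq[FileWeights], Seq[FileWeights]) = {
    val toRead = files.filter(_.minWeight <= fraction)
    val needRowFilter = toRead.filter(_.maxWeight > fraction)
    (toRead, needRowFilter)
  }
}
```

For example, with a 10% sample (fraction = 0.1), a file with weights in [0.0, 0.05] is read entirely, one with [0.2, 0.9] is skipped, and one with [0.05, 0.3] is the only case that still needs per-row filtering.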
@osopardo1 osopardo1 added the enhancement New feature or request label Mar 16, 2023
@osopardo1
Member Author

In DataSource V2 there's also the possibility to build your own scan of the table, with more options than DataSource V1 (which we are currently using).

Maybe it's worth exploring the DSv2 API.

@alexeiakimov
Contributor

IMHO we need to explore the DataSource V2 API, and we will possibly end up dropping V1. Supporting both could require too much conditional logic.

@osopardo1
Member Author

Yes, I agree. Do you think this can be done in the same PR #167, or is it better to do a workaround first for Sampling and Limit Pushdown and migrate everything to V2 in a separate issue?

@alexeiakimov
Contributor

alexeiakimov commented Mar 17, 2023

Well, I prefer to separate the migration to the new versions of Spark/Delta from the rework of QbeastTable on top of DataSource V2. Migration would mean that everything compiles and runs without new problems. The rework is a complex task, because the DataSource SPI changed a lot even though it is still labeled V2. A good overview of the Spark 3.0 SPI can be found here: https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/

@osopardo1 osopardo1 added status: in-progress This issue is in progress priority: normal This issue has normal priority labels Mar 17, 2023
@alexeiakimov
Contributor

I would like to share some thoughts on the Spark 3.x.x DataSource API V2.

  1. Surprisingly, DataSource API V2 in Spark 2.x and in Spark 3.x are different. A good general overview can be found at https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/
  2. The Read API seems straightforward: the mixins for filter, sampling and limit pushdown should be implemented on the ScanBuilder, which passes the filters, sampling and limit to the Scan and Batch. The Batch can compute the necessary files and pass them to the PartitionReaderFactory, which in turn creates a PartitionReader for each partition (we have just one). The PartitionReader returns the table rows one by one, like an iterator (see the sketch at the end of this comment).
  3. The Write API is much more challenging. An implementation does not have direct access to the original DataFrame; instead, the rows are written by the DataWriter one by one. In other words, the DataWriter is a callback, so there is no explicit notification when the writing starts. This means that we cannot assign weights by transforming the original DataFrame as we do now. It also means that we have to start the transaction lazily, when we read the current index.

@osopardo1, @cugni, @Jiaweihu08 Could it make sense to create a temporary DataFrame that copies the data being written, and then apply the algorithm we use now?
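
A minimal sketch of the Read API wiring described in point 2, against the Spark 3.3 connector interfaces; `QbeastScanBuilder` and `QbeastScan` are hypothetical names, and the bodies only record the pushed-down information rather than implementing the real pruning.

```scala
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical ScanBuilder mixing in the three pushdown interfaces.
class QbeastScanBuilder(schema: StructType)
    extends ScanBuilder
    with SupportsPushDownFilters
    with SupportsPushDownTableSample
    with SupportsPushDownLimit {

  private var filters: Array[Filter] = Array.empty
  private var sample: Option[(Double, Double)] = None
  private var limit: Option[Int] = None

  // Record the filters; returning them all means Spark still re-evaluates
  // them after the scan, which is the safe default.
  override def pushFilters(pushed: Array[Filter]): Array[Filter] = {
    filters = pushed
    pushed
  }
  override def pushedFilters(): Array[Filter] = filters

  // Returning true signals that the source will apply the sampling itself.
  override def pushTableSample(
      lowerBound: Double,
      upperBound: Double,
      withReplacement: Boolean,
      seed: Long): Boolean = {
    sample = Some((lowerBound, upperBound))
    true
  }

  // Returning true signals that the limit has been pushed to the source.
  override def pushLimit(l: Int): Boolean = {
    limit = Some(l)
    true
  }

  override def build(): Scan = new QbeastScan(schema, filters, sample, limit)
}

// The Scan doubles as the Batch: it would plan the pruned files as input
// partitions and hand them to a PartitionReaderFactory.
class QbeastScan(
    schema: StructType,
    filters: Array[Filter],
    sample: Option[(Double, Double)],
    limit: Option[Int])
    extends Scan
    with Batch {
  override def readSchema(): StructType = schema
  override def toBatch: Batch = this
  override def planInputPartitions(): Array[InputPartition] = Array.empty // file listing goes here
  override def createReaderFactory(): PartitionReaderFactory = ???        // one PartitionReader per split
}
```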

@osopardo1
Member Author

osopardo1 commented Mar 20, 2023

Thank you for the overview!

  1. Very nice, a lot of code can be reused from OTreeIndex once the filters and everything else are pushed down.

  2. One solution for the Write API is to keep a fallback to Version 1, which is what we have implemented for the moment. The WriteBuilder returns a V1Write, which creates an InsertableRelation that calls our methods in IndexedTable for indexing and writing the DataFrame (see the sketch below). I think we can migrate just the Read features for now, while we consider moving everything else in the future.
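
A minimal sketch of that fallback, using Spark's V1Write bridge; `QbeastWriteBuilder` and `indexAndWrite` are hypothetical placeholders for the project's IndexedTable write path, not the actual classes.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation

// Hypothetical WriteBuilder that falls back to the V1 write path.
class QbeastWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {

  override def build(): Write = new V1Write {
    // Spark hands the whole DataFrame to this relation, so the existing
    // pipeline (weight assignment, OTree indexing, file writing) can run as-is.
    override def toInsertableRelation: InsertableRelation = new InsertableRelation {
      override def insert(data: DataFrame, overwrite: Boolean): Unit =
        indexAndWrite(data, overwrite)
    }
  }

  // Placeholder for the call into IndexedTable's indexing and writing logic.
  private def indexAndWrite(data: DataFrame, overwrite: Boolean): Unit = ()
}
```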

@osopardo1 osopardo1 mentioned this issue Mar 20, 2023
3 tasks
@osopardo1
Member Author

Well, I prefer to separate the migration to the new versions of Spark/Delta from the rework of QbeastTable on top of DataSource V2. Migration would mean that everything compiles and runs without new problems. The rework is a complex task, because the DataSource SPI changed a lot even though it is still labeled V2. A good overview of the Spark 3.0 SPI can be found here: https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/

Noted. We are going to merge #167 first and then migrate to V2. We can also split the development of the migration in two:

  1. Migrate the Read features (plus add Sampling and Limit Pushdown)
  2. Migrate the Writer (if needed)

@alexeiakimov
Contributor

alexeiakimov commented Mar 20, 2023

Technically I prefer 4 steps:

  1. Implement the Read API V2 to have a working pipeline.
  2. Add sampling pushdown.
  3. Add limit pushdown.
  4. Implement the Write API V2, falling back to V1Write.

Small changes will probably be easier to review and to demonstrate.

@osopardo1
Member Author

Plan looks good to me. 👍

@osopardo1 osopardo1 self-assigned this Mar 20, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 24, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 27, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 27, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 27, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 29, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Mar 29, 2023
alexeiakimov added a commit to alexeiakimov/qbeast-spark that referenced this issue Apr 3, 2023
@osopardo1 osopardo1 changed the title Support for Spark 3.3.x Sampling Pushdown and Limit Pushdown Support for DatasourceV2: Sampling Pushdown and Limit Pushdown Sep 21, 2023
@osopardo1 osopardo1 changed the title Support for DatasourceV2: Sampling Pushdown and Limit Pushdown Support for DatasourceV2: Sampling Pushdown and Limit Pushdown [Spark] Sep 21, 2023
@osopardo1 osopardo1 added status: on-hold This issue is on hold and removed status: in-progress This issue is in progress priority: normal This issue has normal priority labels Sep 21, 2023
@osopardo1 osopardo1 removed the status: on-hold This issue is on hold label Oct 23, 2023
@osopardo1
Member Author

I am keeping this issue open for future development plans. We need to rethink the design, the utility, and the properties involved.
