
A simple filter is 100x slower after a positional join. #11992

Open
cottrell opened this issue May 9, 2024 · 6 comments

Comments

@cottrell

cottrell commented May 9, 2024

Posted as a discussion, but it probably belongs in issues.

Example of the issue below; I can create a full reproduction script later if needed.

There are two parquet datasets, data_raw and pos_join_enriched, with exactly the same file layout: the file names are the same and the rows are aligned. Both are loaded via pyarrow and then con.register, and data_enriched is a view over the positional join query shown below.

The filter query on data_raw is quite efficient. The cik column is an ordered int, so I think zonemaps are implicitly used.

However, the same query on the positionally joined data is 100x slower. I would expect it to be at most 2x slower.
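
Roughly, the setup looks like this (a simplified sketch with placeholder paths, not the exact code I ran):

import duckdb
from pyarrow.dataset import dataset

con = duckdb.connect()

# Two parquet directories with identical file names and aligned rows
# (the paths here are placeholders).
con.register('data_raw', dataset('data/data_raw', format='parquet'))
con.register('pos_join_enriched', dataset('data/pos_join_enriched', format='parquet'))

# data_enriched is a view over the positional join (same definition as in the
# sqlite_master output further down).
con.execute("""
    CREATE VIEW data_enriched AS
    SELECT A.* EXCLUDE (__index_level_0__), B.*
    FROM data_raw AS A POSITIONAL JOIN pos_join_enriched AS B;
""")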

In [13]: %time _ = con.query("select * from data_raw where cik = 746631").df()
CPU times: user 158 ms, sys: 102 ms, total: 260 ms
Wall time: 48 ms

In [14]: %time _ = con.query("select * from data_enriched where cik = 746631").df()
100% ▕████████████████████████████████████████████████████████████▏ 
CPU times: user 25.9 s, sys: 2.96 s, total: 28.8 s
Wall time: 4.5 s

In [15]: print(con.query("explain select * from data_raw where cik = 746631").df().explain_value[0])
┌───────────────────────────┐
│        ARROW_SCAN         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            end            │
│            val            │
│             fy            │
│             fp            │
│            form           │
│           filed           │
│          taxonomy         │
│            item           │
│           units           │
│            cik            │
│           start           │
│     __index_level_0__     │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│Filters: cik=746631 AND cik│
│         IS NOT NULL       │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           EC: 1           │
└───────────────────────────┘                             


In [16]: print(con.query("explain select * from data_enriched where cik = 746631").df().explain_value[0])
┌───────────────────────────┐                             
│           FILTER          │                             
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                             
│       (cik = 746631)      │                             
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                             
│           EC: 1           │                             
└─────────────┬─────────────┘                                                          
┌─────────────┴─────────────┐                             
│      POSITIONAL_SCAN      ├──────────────┐              
└─────────────┬─────────────┘              │                                           
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│        ARROW_SCAN         ││        ARROW_SCAN         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            end            ││            name           │
│            val            ││          tickers          │
│             fy            ││         entityName        │
│             fp            ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            form           ││           EC: 1           │
│           filed           ││                           │
│          taxonomy         ││                           │
│            item           ││                           │
│           units           ││                           │
│            cik            ││                           │
│           start           ││                           │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││                           │
│           EC: 1           ││                           │
└───────────────────────────┘└───────────────────────────┘                             


In [17]: con.query("select * from sqlite_master where name = 'data_enriched'")
Out[17]: 
┌─────────┬───────────────┬───────────────┬──────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│  type   │     name      │   tbl_name    │ rootpage │                                                                  sql                                                                  │
│ varchar │    varchar    │    varchar    │  int32   │                                                                varchar                                                                │
├─────────┼───────────────┼───────────────┼──────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ view    │ data_enriched │ data_enriched │        0 │ CREATE VIEW data_enriched AS SELECT A.* EXCLUDE (__index_level_0__), B.* FROM data_raw AS A POSITIONAL JOIN pos_join_enriched AS B;\n │
└─────────┴───────────────┴───────────────┴──────────┴───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Originally posted by @cottrell in #11991

@soerenwolfers

soerenwolfers commented May 11, 2024

(Disclaimer: I'm not a dev)

This doesn't look like a bug per se, but like a missing filter pushdown.

For normal joins, filtering the joined table is essentially the same as joining filtered tables. For positional joins that's not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and then filter. When you filter a single parquet file, DuckDB instead reads only the relevant parts of the file, based on metadata/indices stored alongside it, which is much faster than reading the whole file.

It should be possible to add the logic needed to push the filter down to the relevant parquet scan and then read the other parquet at the filtered row indices, but it's not trivial and therefore likely a feature request rather than an issue.
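
A toy illustration of the difference (made-up tables, not your data): for an ordinary join the filter can be pushed below the join, but for a POSITIONAL JOIN pre-filtering one side changes which rows line up.

import duckdb

con = duckdb.connect()
con.execute("create table a as select * from (values (1, 'x'), (2, 'y'), (3, 'z')) t(k, va)")
con.execute("create table b as select * from (values (10), (20), (30)) t(vb)")

# Filter applied on top of the positional join: 'y' is paired with 20.
print(con.execute(
    "select * from (select * from a positional join b) j where va = 'y'"
).fetchall())

# Naively pushing the filter below the join changes the alignment, so 'y'
# now lines up with the first row of b instead.
print(con.execute(
    "select * from (select * from a where va = 'y') a2 positional join b"
).fetchall())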

@cottrell
Author

> (Disclaimer: I'm not a dev)
>
> This doesn't look like a bug per se, but like a missing filter pushdown.
>
> For normal joins, filtering the joined table is essentially the same as joining filtered tables. For positional joins that's not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and then filter. When you filter a single parquet file, DuckDB instead reads only the relevant parts of the file, based on metadata/indices stored alongside it, which is much faster than reading the whole file.
>
> It should be possible to add the logic needed to push the filter down to the relevant parquet scan and then read the other parquet at the filtered row indices, but it's not trivial and therefore likely a feature request rather than an issue.

Yes, that is what I was guessing after doing some code skimming. It did not seem trivial to figure out how to add it, but conceptually it must be a "simple" feature omission. I suspect positional joins are not yet used in anger, but this would be a key piece of getting there. I think they are a very important feature direction for DuckDB.

@cottrell
Author

cottrell commented May 15, 2024

I've got some kind of repro, which was interesting because the discrepancy appears to be huge in the case of ordered integers (Zonemaps?). So there must be some optimization/pushdown in that case that is missing.

import logging
import os
import shutil
import string
import time

import duckdb
import numpy as np
import pandas as pd
import tqdm
from pyarrow.dataset import dataset

_mydir = os.path.dirname(__file__)

_data_dir = os.path.join(_mydir, 'data_example')
_data_dir_a = os.path.join(_data_dir, 'a')
_data_dir_b = os.path.join(_data_dir, 'b')

_duckdb_filename = os.path.join(_mydir, 'duckdb.db')


def create_data_on_disk(n_partitions=100, partition_size=1_000_000, seed=0, string_col_arity=1000):
    np.random.seed(seed)

    def write(df, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        logging.warning(f'writing {path}')
        df.to_parquet(path, index=False)

    if os.path.exists(_data_dir):
        shutil.rmtree(_data_dir)

    string_col_values = ['AAA'] + [
        ''.join(x)
        for x in [
            np.random.choice(list(string.ascii_letters + string.digits), size=5) for _ in range(string_col_arity - 1)
        ]
    ]
    eps = 1e-6
    p = np.array([eps] + [(1 - eps) / (string_col_arity - 1)] * (string_col_arity - 1))
    assert np.abs(p.sum() - 1) < 1e-6

    min_int = 0
    for partition in tqdm.tqdm(range(n_partitions)):
        filename = f'data_{partition:03d}.parquet'
        df = pd.DataFrame(np.random.randn(partition_size, 4), columns=['a', 'b', 'c', 'd'])
        df['a'] = np.random.choice(string_col_values, size=partition_size, p=p)
        max_int = min_int + np.random.randint(10, 20)
        df['b'] = np.random.randint(min_int, max_int, size=partition_size)
        df['d'] = np.random.randint(0, 100_000, size=partition_size)  # NOTE: this one no ordering
        min_int = max_int
        path = os.path.join(_data_dir_a, filename)
        write(df, path)
        df_ = pd.DataFrame(np.random.randn(partition_size, 3), columns=['e', 'f', 'g'])
        df_['e'] = df['d']  # NOTE: just as simple way of sanity checking
        path = os.path.join(_data_dir_b, filename)
        write(df_, path)


def get_duckdb_con():
    con = duckdb.connect(_duckdb_filename)
    dsa = dataset(_data_dir_a, format='parquet')
    con.register('A1', dsa)
    dsb = dataset(_data_dir_b, format='parquet')
    con.register('B1', dsb)
    con.execute(
        f"""
    create temp view A0 as (select * from parquet_scan('{_data_dir_a}/*'));
    create temp view B0 as (select * from parquet_scan('{_data_dir_b}/*'));
    create temp view E0 as (select * from A0 positional join B0);
    create temp view E1 as (select * from A1 positional join B1);
    """
    )
    return con


def test_against(table, query_flavour):
    """table should be A0, A1, E0, E1"""
    con = get_duckdb_con()
    query = {
        'A': f"select count(*) as ct from {table} where c > 0 and c <= 0.000001",
        'B': f"select count(*) as ct from {table} where a = 'AAA'",
        'C': f"select count(*) as ct from {table} where b = 999",
        'D': f"select count(*) as ct from {table} where d = 999",
    }[query_flavour]
    T = time.time()
    res = con.execute(query).df()
    T = time.time() - T
    return dict(res=res, time=T, query=query)


def test_all():
    """Example results:

      query_flavour                     name      time                                                        query  factor
    0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
    1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
    2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
    3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
    4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
    5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
    6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
    7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5


    """
    out = list()
    for query_flavour in tqdm.tqdm(['A', 'B', 'C', 'D']):
        df = None
        for k, name in [
            # these are much slower
            # ('A0', 'raw_parquet_scan'),
            # ('E0', 'positional join_parquet_scan'),
            ('A1', 'raw_dataset'),
            ('E1', 'positional join_dataset'),
        ]:
            res = test_against(k, query_flavour=query_flavour)
            if df is None:
                df = res['res']
            else:
                df_ = res['res']
                assert df.shape[0] == df_.shape[0]
                pd.testing.assert_frame_equal(df, df_)
                # pd.testing.assert_frame_equal(df[['a', 'b', 'c']], df_[['a', 'b', 'c']])
            out.append(dict(query_flavour=query_flavour, name=name, time=res['time'], query=res['query']))
    df = pd.DataFrame(out)
    df['factor'] = (df['time'] / df.groupby('query_flavour')['time'].transform('min')).round(0).astype(int)
    return df

Results look like this (also pasted in the code above):

In [4]: create_data_on_disk()  # run this once to set up
In [5]: test_all()
Out[5]: 
  query_flavour                     name      time                                                        query  factor
0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5

If someone could confirm this and say whether it's sane, maybe we can spin it over to feature requests? It would be good to at least get some advice on how hard this is to implement, if it is a sane repro.

@Mytherin
Collaborator

As mentioned this is not a bug and more like a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support. Supporting this efficiently would therefore require us to extend the functionality of individual scanners to allow for this. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).

@cottrell
Author

cottrell commented May 18, 2024

> As mentioned this is not a bug and more like a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support. Supporting this efficiently would therefore require us to extend the functionality of individual scanners to allow for this. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).

I understand that this is a (missing) feature, but can you clarify here? It doesn't really make sense to me to have positional joins without filter pushdown; or rather, they have limited use without it. It would mean you need to replicate the ENTIRE set of filter columns in each column partition (or whatever we are calling the things we join), which would obviate the benefits of the positional join to begin with. But I might be missing some key piece of understanding.

Mechanically, I think one has a filter which results in some row numbering (like a slice per file), and that numbering should be broadcast across to the positional join chunks before the positional join is applied.

I think perhaps what you are saying is that most of the predicate pushdowns and efficiency tricks are highly manual and implementation-specific, especially in the case of the novel positional join, and that it would involve a lot of work to implement this? Or is there something I am missing that makes this operationally impossible and not merely painful/costly?
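
For what it's worth, the mechanism I have in mind can be emulated by hand on the Arrow side. A rough sketch (reusing _data_dir_a/_data_dir_b from the repro above, and assuming both datasets list their files in the same order so positions line up; not a proposal for how DuckDB should implement it):

import pyarrow as pa
import pyarrow.compute as pc
from pyarrow.dataset import dataset

dsa = dataset(_data_dir_a, format='parquet')
dsb = dataset(_data_dir_b, format='parquet')

# Evaluate the predicate on one side only, yielding global row positions.
mask = pc.equal(dsa.to_table(columns=['b'])['b'], 999)
indices = pc.indices_nonzero(mask)

# "Broadcast" those positions to both sides and stitch the columns together.
left = dsa.take(indices)
right = dsb.take(indices)
enriched = pa.table({**{c: left[c] for c in left.column_names},
                     **{c: right[c] for c in right.column_names}})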

@Mytherin
Collaborator

In order for a positional join to work with filter pushdowns, we essentially need two callbacks that are efficiently implemented for each scan:

  • Filter out rows that do not fulfill a given criterion, but report how many rows were skipped
  • Skip N rows, but do not scan them

We could then use the first callback to apply the filter in the first result set, and the second to find the corresponding rows that belong to the subsequent (not yet filtered) rows. Adding these callbacks and efficiently implementing them for all the storage back-ends is not trivial, and they cannot be very efficiently implemented for all scanners (e.g. we cannot efficiently skip to the middle of a row group in Parquet files).
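
In pseudo-code, the per-scanner interface would need to look roughly like this (a sketch of the idea only, not the actual scanner API):

class PositionalAwareScan:
    """Pseudo-code sketch -- not DuckDB's actual scanner interface."""

    def scan_filtered(self, predicate):
        """Return the next batch of rows matching `predicate`, along with how
        many non-matching rows were skipped before each of them."""
        ...

    def skip(self, n):
        """Advance the scan position by n rows without materializing them."""
        ...

# The left scan uses scan_filtered() and reports the skipped counts; the right
# scan uses skip() to jump to the matching positions before scanning the
# corresponding rows.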

In cases where we are scanning e.g. an Arrow Record Batch Reader, these callbacks do not exist, and hence this will not be possible to support unless these or similar callbacks are also implemented for these back-ends.

All-in-all this is a rather large effort and I would not count on this being done in the near-term.

When working with Parquet files, perhaps you could try using file_row_number and joining on that instead.
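
Something along these lines, for example (an untested sketch reusing con and the data directories from the repro above; whether the filter actually ends up inside the scan should be double-checked with EXPLAIN):

con.execute(
    f"""
create or replace temp view A2 as (
    select *, regexp_extract(filename, '[^/]+$') as fname
    from parquet_scan('{_data_dir_a}/*', filename = true, file_row_number = true)
);
create or replace temp view B2 as (
    select *, regexp_extract(filename, '[^/]+$') as fname
    from parquet_scan('{_data_dir_b}/*', filename = true, file_row_number = true)
);
"""
)

# A regular join on (file name, row number within the file) replaces the
# positional join, so the filter can be pushed into the scan of A2 as usual.
res = con.execute("""
    select count(*) as ct
    from A2 join B2 using (fname, file_row_number)
    where A2.b = 999
""").df()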
