A simple filter is 100x slower after a positional join. #11992
(Disclaimer: I'm not a dev.) This looks like a missing filter pushdown rather than a bug per se. For normal joins, filtering the joined table is essentially the same as joining the filtered tables. For positional joins that is not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and then filter. When you filter the individual parquet files, DuckDB instead reads only the relevant parts of each file, based on metadata/indices stored alongside the data, which is much faster than reading everything. It should be possible to push the filter down to the relevant parquet scan and then read the other parquet file at the filtered row indices, but that is not trivial, so this is likely a feature request rather than a bug.
Yes, that is what I was guessing after some code skimming. It did not seem trivial to figure out how to add, but it must be a "simple" feature omission, at least conceptually. I suspect positional joins are not yet used in anger, but this would be a key piece of getting there. I think they are a very important feature direction for DuckDB.
I've got some kind of repro, which was interesting because the discrepancy appears to be huge in the case of ordered integers (zonemaps?). So there must be some optimization/pushdown in that case that is missing.

```python
import logging
import os
import shutil
import string
import time

import duckdb
import numpy as np
import pandas as pd
import tqdm
from pyarrow.dataset import dataset

_mydir = os.path.dirname(__file__)
_data_dir = os.path.join(_mydir, 'data_example')
_data_dir_a = os.path.join(_data_dir, 'a')
_data_dir_b = os.path.join(_data_dir, 'b')
_duckdb_filename = os.path.join(_mydir, 'duckdb.db')


def create_data_on_disk(n_partitions=100, partition_size=1_000_000, seed=0, string_col_arity=1000):
    np.random.seed(seed)

    def write(df, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        logging.warning(f'writing {path}')
        df.to_parquet(path, index=False)

    if os.path.exists(_data_dir):
        shutil.rmtree(_data_dir)
    string_col_values = ['AAA'] + [
        ''.join(x)
        for x in [
            np.random.choice(list(string.ascii_letters + string.digits), size=5) for _ in range(string_col_arity - 1)
        ]
    ]
    eps = 1e-6
    p = np.array([eps] + [(1 - eps) / (string_col_arity - 1)] * (string_col_arity - 1))
    assert np.abs(p.sum() - 1) < 1e-6
    min_int = 0
    for partition in tqdm.tqdm(range(n_partitions)):
        filename = f'data_{partition:03d}.parquet'
        df = pd.DataFrame(np.random.randn(partition_size, 4), columns=['a', 'b', 'c', 'd'])
        df['a'] = np.random.choice(string_col_values, size=partition_size, p=p)
        max_int = min_int + np.random.randint(10, 20)
        df['b'] = np.random.randint(min_int, max_int, size=partition_size)
        df['d'] = np.random.randint(0, 100_000, size=partition_size)  # NOTE: this one has no ordering
        min_int = max_int
        path = os.path.join(_data_dir_a, filename)
        write(df, path)
        df_ = pd.DataFrame(np.random.randn(partition_size, 3), columns=['e', 'f', 'g'])
        df_['e'] = df['d']  # NOTE: just a simple way of sanity checking
        path = os.path.join(_data_dir_b, filename)
        write(df_, path)


def get_duckdb_con():
    con = duckdb.connect(_duckdb_filename)
    dsa = dataset(_data_dir_a, format='parquet')
    con.register('A1', dsa)
    dsb = dataset(_data_dir_b, format='parquet')
    con.register('B1', dsb)
    con.execute(
        f"""
        create temp view A0 as (select * from parquet_scan('{_data_dir_a}/*'));
        create temp view B0 as (select * from parquet_scan('{_data_dir_b}/*'));
        create temp view E0 as (select * from A0 positional join B0);
        create temp view E1 as (select * from A1 positional join B1);
        """
    )
    return con


def test_against(table, query_flavour):
    """table should be A0, A1, E0, E1"""
    con = get_duckdb_con()
    query = {
        'A': f"select count(*) as ct from {table} where c > 0 and c <= 0.000001",
        'B': f"select count(*) as ct from {table} where a = 'AAA'",
        'C': f"select count(*) as ct from {table} where b = 999",
        'D': f"select count(*) as ct from {table} where d = 999",
    }[query_flavour]
    T = time.time()
    res = con.execute(query).df()
    T = time.time() - T
    return dict(res=res, time=T, query=query)


def test_all():
    """Example results:
      query_flavour                     name      time                                                        query  factor
    0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
    1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
    2             B              raw_dataset  0.484304  select count(*) as ct from A1 where a = 'AAA'                      1
    3             B  positional join_dataset  1.915267  select count(*) as ct from E1 where a = 'AAA'                      4
    4             C              raw_dataset  0.012928  select count(*) as ct from A1 where b = 999                        1
    5             C  positional join_dataset  1.171541  select count(*) as ct from E1 where b = 999                       91
    6             D              raw_dataset  0.273574  select count(*) as ct from A1 where d = 999                        1
    7             D  positional join_dataset  1.271827  select count(*) as ct from E1 where d = 999                        5
    """
    out = list()
    for query_flavour in tqdm.tqdm(['A', 'B', 'C', 'D']):
        df = None
        for k, name in [
            # these are much slower
            # ('A0', 'raw_parquet_scan'),
            # ('E0', 'positional join_parquet_scan'),
            ('A1', 'raw_dataset'),
            ('E1', 'positional join_dataset'),
        ]:
            res = test_against(k, query_flavour=query_flavour)
            if df is None:
                df = res['res']
            else:
                df_ = res['res']
                assert df.shape[0] == df_.shape[0]
                pd.testing.assert_frame_equal(df, df_)
                # pd.testing.assert_frame_equal(df[['a', 'b', 'c']], df_[['a', 'b', 'c']])
            out.append(dict(query_flavour=query_flavour, name=name, time=res['time'], query=res['query']))
    df = pd.DataFrame(out)
    df['factor'] = (df['time'] / df.groupby('query_flavour')['time'].transform('min')).round(0).astype(int)
    return df
```

Results look like this (also pasted in the docstring of `test_all`):

```
In [4]: create_data_on_disk()  # run this once to set up

In [5]: test_all()
Out[5]:
  query_flavour                     name      time                                                        query  factor
0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
2             B              raw_dataset  0.484304  select count(*) as ct from A1 where a = 'AAA'                      1
3             B  positional join_dataset  1.915267  select count(*) as ct from E1 where a = 'AAA'                      4
4             C              raw_dataset  0.012928  select count(*) as ct from A1 where b = 999                        1
5             C  positional join_dataset  1.171541  select count(*) as ct from E1 where b = 999                       91
6             D              raw_dataset  0.273574  select count(*) as ct from A1 where d = 999                        1
7             D  positional join_dataset  1.271827  select count(*) as ct from E1 where d = 999                        5
```
If someone could confirm this and report whether it is sane, maybe we can spin it over to feature requests? It would be good to at least get some advice on how hard this is to implement, if it is a sane repro.
As mentioned, this is not a bug and is more of a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc.). Pushing a filter into a scan while preserving row numbers is not generically possible; supporting this efficiently would require us to extend the functionality of individual scanners. It would also not be possible for all scanners (e.g. this would most likely not be supported for arbitrary Arrow data sets).
I understand that this is a (missing) feature, but can you clarify? It does not really make sense to me to have positional joins without filter pushdowns, or rather they have limited use. It would mean you need to replicate the ENTIRE set of filter columns in each column partition (or whatever we are calling the things we join), which would obviate the benefits of the positional join to begin with. But I might be missing some key piece of understanding.

Mechanically, I think a filter results in some row numbering (like a slice per file), and that numbering should be broadcast across to the positional-join chunks before the positional join is applied.

I think perhaps what you are saying is that most of the predicate pushdowns and efficiency tricks are highly manual and implementation-specific, especially in the case of the novel positional join, and that it would involve a lot of work to implement this? Or is there something I am missing that makes this operationally impossible and not merely painful/costly?
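The "broadcast the row numbering" idea can be sketched in isolation (a toy illustration with numpy, not DuckDB internals): filter one side first, keep the surviving row positions, then fetch only those positions from the row-aligned partner instead of scanning it fully.

```python
import numpy as np

# Two row-aligned columns, as if they lived in two parquet files.
left_b = np.array([5, 999, 17, 999, 3])        # filter column on the left side
right_g = np.array([0.1, 0.2, 0.3, 0.4, 0.5])  # row-aligned column on the right side

# Step 1: apply the filter on the left side, recording surviving row numbers.
positions = np.flatnonzero(left_b == 999)

# Step 2: gather only those positions from the partner instead of a full scan.
joined_g = right_g[positions]
```

The hard part is that step 2 assumes random access by row number, which columnar file formats only support at coarse granularity (e.g. row groups), not per row.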
In order for a positional join to work with filter pushdowns, we essentially need two callbacks that are efficiently implemented for each scan:

1. Apply a filter during a scan while reporting the positions (row numbers) of the rows that pass.
2. Skip directly to a given set of row positions in a scan.

We could then use the first callback to apply the filter in the first result set, and the second to find the corresponding rows that belong to the subsequent (not yet filtered) result sets. Adding these callbacks and implementing them efficiently for all the storage back-ends is not trivial, and they cannot be implemented very efficiently for all scanners (e.g. we cannot efficiently skip to the middle of a row group in Parquet files). In cases where we are scanning e.g. an Arrow Record Batch Reader, these callbacks do not exist, and hence this will not be possible to support unless these or similar callbacks are also implemented for those back-ends. All in all this is a rather large effort and I would not count on it being done in the near term. When working with Parquet files, perhaps you could try using
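One way to picture the two callbacks (names and interface invented for illustration; DuckDB's actual scanner API differs and is in C++):

```python
from typing import Protocol, Sequence


class PositionalScanner(Protocol):
    """Hypothetical per-scanner interface needed for positional-join pushdown."""

    def scan_filtered(self, predicate) -> tuple[list, list[int]]:
        """Scan with a pushed-down filter, returning (rows, positions that passed)."""
        ...

    def scan_at(self, positions: Sequence[int]) -> list:
        """Fetch rows at the given absolute positions without a full scan."""
        ...


class InMemoryScanner:
    # Trivially efficient for in-memory data; a parquet scanner could only
    # approximate scan_at at row-group granularity, and an Arrow Record
    # Batch Reader cannot implement it at all (it is forward-only).
    def __init__(self, rows):
        self.rows = rows

    def scan_filtered(self, predicate):
        hits = [(row, i) for i, row in enumerate(self.rows) if predicate(row)]
        return [r for r, _ in hits], [i for _, i in hits]

    def scan_at(self, positions):
        return [self.rows[i] for i in positions]


# Usage: push the filter into the left scan, then position-lookup on the right.
left = InMemoryScanner([10, 999, 30, 999])
right = InMemoryScanner(['a', 'b', 'c', 'd'])
rows, pos = left.scan_filtered(lambda v: v == 999)
partner_rows = right.scan_at(pos)
```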
Posted as a discussion but probably belongs in issues.
Example of the issue. I can create some full reproduction script later if needed.
These are some kind of `data_raw` and `pos_join_enriched`, parquet files with exactly the same structure. The names of the files are the same and the rows are aligned. These datasets are loaded via pyarrow and then `con.register`, and then a `data_enriched` is created via the positional join query as shown below. The filter query is quite efficient. The `cik` column is an ordered int, so I think zonemaps are implicitly used.

However, the same query on the positionally joined data is 100x slower. I would think it should be at most 2x slower.
Originally posted by @cottrell in #11991