tum-db/pg-cv

Continuous Views for PostgreSQL

This repository contains the source code that we used to generate and benchmark continuous views in PostgreSQL for our paper Meet Me Halfway: Split Maintenance of Continuous Views. The feature set and syntax of this proof-of-concept implementation differ from those in the paper. However, the underlying strategy of split maintenance is nearly identical, albeit less integrated and optimized. We evaluated our continuous view approach based on the split maintenance strategy for queries 1, 3, 6, 15, and 20 of the TPC-H benchmark and compared it to eager and deferred maintenance strategies.

The Code

Features of Continuous Views in PostgreSQL

Supported:

  • semi-automatic generation of continuous views
  • CTE-like query format
  • external integration through PL/Python
  • sum-aggregates
  • count-aggregates
  • min-aggregates
  • max-aggregates
  • concurrent inserts and queries

Not supported:

  • query analysis and feasibility checks
  • deep integration and query optimization
  • fully-automatic query reshaping and splitting
  • avg-aggregates (possible through count- and sum-aggregates)
  • automatic deletion of continuous views
  • multiple continuous views on a single stream

Due to the external realization of the views in PostgreSQL and the limitations mentioned above, we cannot guarantee that all views accepted by our interface are correct. However, all views used for evaluation and specified as examples in this repo are transformed correctly.

Setup

The project depends on the PL/Python extension of PostgreSQL for parsing SQL queries. We provide a Dockerfile that performs all necessary installation steps in a PostgreSQL container. Execute make up in the root folder to start the system and create the default database continuous-view. For authentication, use the default user postgres with the password 123456. The command make down stops the running container.

Repository Structure

  • plpython3.dockerfile: Dockerfile to start a PostgreSQL container with the PL/Python extension.
  • benchmark.sh: script for automatically executing the benchmark.
  • src/python/view_generator: Python scripts for generating continuous views.
  • src/sql: SQL scripts for the TPC-H benchmark and the code generated for our proposed maintenance strategy.
  • benchmark/python/benchmark: Python scripts for benchmarking different maintenance strategies within PostgreSQL.
  • benchmark/sql: SQL scripts for evaluating different maintenance strategies for continuous views.
  • data: place TPC-H data as generated by tpch-dbgen in CSV-format in here.

TPC-H data contains a trailing separator, which poses problems for PostgreSQL. Therefore, please run data/truncate_last.sh before starting the container for manual experiments. The benchmark will do this automatically.
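The contents of data/truncate_last.sh are not reproduced in this README, so the following is only a sketch of the same idea in Python. It assumes the separator is `|` and that each record sits on its own line; the function names are hypothetical and not part of the repository.

```python
def strip_trailing_separator(line: str, sep: str = "|") -> str:
    """Remove one trailing separator from a record, keeping a final newline."""
    newline = "\n" if line.endswith("\n") else ""
    body = line.rstrip("\r\n")
    if body.endswith(sep):
        body = body[: -len(sep)]
    return body + newline


def strip_file(src_path: str, dst_path: str, sep: str = "|") -> None:
    """Write a copy of src_path in which no record ends with the separator."""
    with open(src_path, encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        for line in src:
            dst.write(strip_trailing_separator(line, sep))
```

Only a single trailing separator is removed per record, so an empty last column is kept as an empty field.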

The Makefile automatically binds the directories src, benchmark, and data into the container. This allows changes to both scripts and data while the container runs.

Continuous Views using Split Maintenance Strategy

Our split maintenance strategy allows both high insert throughput and high query performance while automatically generating all relevant parts of the code. As described in our paper, we identify pipeline breakers in the query to divide it for maintenance. We distinguish three different pipeline types:

  1. stream pipelines: pipelines that directly depend on a stream.
  2. upper pipelines: pipelines that indirectly depend on streams (i.e., they consume another upper pipeline or a stream pipeline).
  3. static pipelines: pipelines that do not depend on streams directly or indirectly.

For stream pipelines, we currently only support aggregating pipeline breakers. We evaluate inserts up to this pipeline breaker and store the result in an auxiliary table. Our approach is implemented on top of PostgreSQL and requires no changes to its source code. Therefore, we have no access to the internal query pipeline. We perform the following steps for the external implementation of split maintenance:

  1. Split the query into the different pipelines.
  2. Identify static pipelines, stream pipelines, and upper pipelines based on pipeline breakers (aggregations) and queried tables.
  3. Generate insert functions and tables for stream pipelines and materialized views for static and upper pipelines.

Steps 2 and 3 are performed fully automatically. Step 1 cannot be done without access to the internal query plan. Therefore, the user has to identify the different pipelines using common table expressions in the PostgreSQL version of continuous views. The type of a pipeline is specified using the following rules:

  • stream pipelines are identified by the names of the tables referenced in the from clause of the CTE. If one of the tables starts with stream_, the CTE is recognized as a stream pipeline.
  • upper pipelines either depend on a stream pipeline or another upper pipeline.
  • the remaining pipelines are classified as static pipelines.
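The three rules above can be sketched in a few lines of Python. This is not the code from src/python/view_generator; it is a minimal illustration that assumes the tables in each from clause have already been extracted. The function name and the `main` key for the final select are hypothetical.

```python
def classify_pipelines(from_tables: dict[str, list[str]]) -> dict[str, str]:
    """Map each CTE name to 'stream', 'upper', or 'static'.

    from_tables maps a CTE name to the names listed in its from clause.
    """
    kinds: dict[str, str] = {}

    # Rule 1: a CTE that reads from a table named stream_* is a stream pipeline.
    for name, tables in from_tables.items():
        if any(table.startswith("stream_") for table in tables):
            kinds[name] = "stream"

    # Rule 2: a CTE that reads from a stream or upper pipeline is an upper
    # pipeline. Repeat until no new upper pipeline is found.
    changed = True
    while changed:
        changed = False
        for name, tables in from_tables.items():
            if name not in kinds and any(table in kinds for table in tables):
                kinds[name] = "upper"
                changed = True

    # Rule 3: everything else is a static pipeline.
    for name in from_tables:
        kinds.setdefault(name, "static")
    return kinds
```

For the TPC-H query 20 example further below, `grouped_lineitem` would be classified as a stream pipeline, `forestpartsupp` and `canadasupp` as static pipelines, and the final select as the upper pipeline.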

For every static pipeline, we create a materialized view using the query extracted from the common table expression. For each stream pipeline, we create an auxiliary table that stores the aggregated results and a specialized insert function. The insert function takes an array of tuples, aggregates them, and inserts or updates the aggregated results in the table. For the upper pipelines, we create one materialized view that evaluates all remaining parts of the query. Refreshing the view updates the result of the query.
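To make the behavior of the insert function concrete, here is a small in-memory sketch in Python. It stands in for the generated PL/pgSQL function and uses a dictionary in place of the auxiliary table. The filter, grouping keys, and `0.5 * sum(l_quantity)` aggregate mirror the TPC-H query 20 example further below; the function names and the dictionary representation are assumptions made for illustration.

```python
from collections import defaultdict
from datetime import date


def aggregate_batch(entries):
    """Evaluate the stream pipeline on one batch: filter, group, 0.5 * sum."""
    partial = defaultdict(float)
    for entry in entries:
        if date(1994, 1, 1) <= entry["l_shipdate"] < date(1995, 1, 1):
            key = (entry["l_partkey"], entry["l_suppkey"])
            partial[key] += 0.5 * entry["l_quantity"]
    return dict(partial)


def insert_batch(aux_table, entries):
    """Merge the aggregated batch into the auxiliary table (insert or update)."""
    for key, value in aggregate_batch(entries).items():
        if key in aux_table:
            aux_table[key] += value   # existing group: add the partial sum
        else:
            aux_table[key] = value    # new group: insert it
```

Merging by addition works here because the aggregate is a sum. A min- or max-aggregate would merge with `min` or `max` instead.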

Usage:

The command psql -h localhost -p 5432 -U postgres -d continuous-view opens a connection to the database (the password is 123456).

To create the TPC-H schema and base tables for manual experiments, run \i src/sql/tpch/setup.sql.

When executing \i src/sql/continuous-views.sql, all relevant schemas and functions are loaded into the current PostgreSQL database. The meta information and materialized views for the split maintenance are stored in the schema cv.

The functions cv.createContinuousView(<query_name>, <query>) and cv.createContinuousViewFromFile(<query_name>, <filename>) are used to generate all tables, views, and functions needed by the query. The first argument is a unique identifier, the second one is either a string of the query or the path to the file containing the query. To avoid conflicts, we create the tables and views for the static and stream pipelines in a different schema named cv_<query_name>. For inserting tuples into the stream, we generate the function cv.<stream_table_name>_insert(tuples).
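The naming scheme described above can be written down as a few string helpers, for example when scripting experiments from a client. The helpers below are hypothetical and not part of the repository. They only assemble names and a statement string, and the quoting is deliberately naive (single quotes are doubled).

```python
def schema_name(query_name: str) -> str:
    """Schema that holds the tables and views of one continuous view."""
    return f"cv_{query_name}"


def insert_function_name(stream_table: str) -> str:
    """Name of the generated insert function for a stream table."""
    return f"cv.{stream_table}_insert"


def create_from_file_statement(query_name: str, filename: str) -> str:
    """SQL statement that creates a continuous view from a query file."""
    name = query_name.replace("'", "''")
    path = filename.replace("'", "''")
    return f"select cv.createContinuousViewFromFile('{name}', '{path}');"
```

For the query name `q20` and the stream table `stream_lineitem`, these helpers yield `cv_q20` and `cv.stream_lineitem_insert`.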

Restrictions:

  • All views, functions, and tables generated by our scripts must be deleted manually by dropping the schemas cv and cv_<query_name>.
  • Currently, we support only one continuous view per stream.
  • In contrast to continuous views in Umbra, this implementation does not ensure that queries are supportable as a continuous view. Further, it cannot modify join order or perform similar optimizations. The queries specified therefore have to be optimized and checked by the user beforehand.

Example:

Here we provide a small example of how a query has to be transformed into the expected CTE format and explain the generated SQL statements for the PostgreSQL version. For this, we use the TPC-H query 20 without the order by clause:

select s_name, s_address
from supplier, nation
where s_suppkey in (select ps_suppkey
                    from partsupp
                    where ps_partkey in (select p_partkey
                                         from part
                                         where p_name like 'forest%')
                      and ps_availqty > (select 0.5 * sum(l_quantity)
                                         from lineitem
                                         where l_partkey = ps_partkey
                                           and l_suppkey = ps_suppkey
                                           and l_shipdate >= date '1994-01-01'
                                           and l_shipdate < date '1995-01-01'))
  and s_nationkey = n_nationkey
  and n_name = 'CANADA'

The following transformation was done by hand (see src/sql/tpch/queries/q20.sql):

with grouped_lineitem as (select l_partkey, l_suppkey, 0.5 * sum(l_quantity) as sum_quantity
                          from stream_lineitem
                          where l_shipdate >= date '1994-01-01'
                            and l_shipdate < date '1995-01-01'
                          group by l_partkey, l_suppkey),
     forestpartsupp as (select ps_suppkey, ps_partkey, ps_availqty
                        from partsupp, part
                        where p_name like 'forest%'
                          and ps_partkey = p_partkey),
     canadasupp as (select s_suppkey, s_name, s_address
                    from supplier, nation
                    where s_nationkey = n_nationkey
                      and n_name = 'CANADA')

select s_name, s_address
from canadasupp
where s_suppkey in (
    select ps_suppkey
    from forestpartsupp
    where ps_availqty > (
        select sum_quantity
        from grouped_lineitem
        where l_suppkey = ps_suppkey
          and l_partkey = ps_partkey
));

First, we extracted the subquery on lineitem into the stream pipeline grouped_lineitem. Note that the name of lineitem changed to stream_lineitem so that the stream pipeline is identified automatically. Furthermore, we defined two common table expressions, forestpartsupp and canadasupp, that act as static pipelines. The rest of the query is recognized as the upper pipeline.

To generate the relevant items for the split maintenance we create a continuous view:

select cv.createContinuousViewFromFile('q20', '/src/sql/tpch/queries/q20.sql');

This will execute the following commands:

  1. Store some metadata and create a new schema for the query:
    insert into cv.continuous_views values ('q20');
    
    drop schema if exists cv_q20 cascade;
    create schema cv_q20;
    grant all on schema cv_q20 to postgres;
    grant all on schema cv_q20 to public;
    
    select set_config('search_path', 'cv_q20,' || current_setting('search_path'), true);
  2. Create an auxiliary table for the aggregated results for the stream pipeline:
    create table if not exists cv_q20.grouped_lineitem as (
        select l_partkey, l_suppkey, 0.5 * sum(l_quantity) as sum_quantity
        from stream_lineitem
        where l_shipdate >= date '1994-01-01' and l_shipdate < date '1995-01-01'
        group by l_partkey, l_suppkey);
    
    truncate table cv_q20.grouped_lineitem;
    create index cv_q20_grouped_lineitem_index on cv_q20.grouped_lineitem (l_partkey,l_suppkey);
  3. Create the insert function for the stream that updates the aggregation table:
    create or replace function cv.stream_lineitem_insert(entries stream_lineitem[]) returns void
    language  plpgsql
    as
    $$
    declare
        search_path text := current_setting('search_path');
        cv_q20_grouped_lineitem_tuple cv_q20.grouped_lineitem;
        cv_q20_grouped_lineitem_found_tuple cv_q20.grouped_lineitem;
    begin
        perform set_config('search_path', 'cv_q20, ' || search_path, true);
        for cv_q20_grouped_lineitem_tuple in with stream_lineitem as (select * from unnest(entries))
            (select l_partkey, l_suppkey, 0.5 * sum(l_quantity) as sum_quantity
             from stream_lineitem where l_shipdate >= date '1994-01-01' and l_shipdate < date '1995-01-01'
             group by l_partkey, l_suppkey)
        loop
            select * into cv_q20_grouped_lineitem_found_tuple
            from cv_q20.grouped_lineitem
            where cv_q20.grouped_lineitem.l_partkey = cv_q20_grouped_lineitem_tuple.l_partkey
              and cv_q20.grouped_lineitem.l_suppkey = cv_q20_grouped_lineitem_tuple.l_suppkey;
    
            if not found then
                insert into cv_q20.grouped_lineitem
                select cv_q20_grouped_lineitem_tuple.*;
            else
                update cv_q20.grouped_lineitem
                set sum_quantity = cv_q20_grouped_lineitem_tuple.sum_quantity +
                                   cv_q20_grouped_lineitem_found_tuple.sum_quantity
                where cv_q20.grouped_lineitem.l_partkey = cv_q20_grouped_lineitem_tuple.l_partkey
                  and cv_q20.grouped_lineitem.l_suppkey = cv_q20_grouped_lineitem_tuple.l_suppkey;
            end if;
        end loop;
        perform set_config('search_path', search_path, true);
    end
    $$;
    First, we update the search path to avoid renaming tables. Then, we evaluate the query for the pipeline on the inserted tuples. Using this result, we update the entries in the auxiliary table: if we find an existing entry for a group, we update it; otherwise, we insert a new tuple.
  4. Create materialized views for the static pipelines:
    create materialized view if not exists cv_q20.forestpartsupp as (
        select ps_suppkey, ps_partkey, ps_availqty
        from partsupp, part
        where p_name like 'forest%'
          and ps_partkey = p_partkey);
    
    create index cv_q20_forestpartsupp_index on
        cv_q20.forestpartsupp (ps_suppkey,ps_partkey,ps_availqty);
    
    create materialized view if not exists cv_q20.canadasupp as (
        select s_suppkey, s_name, s_address
        from supplier, nation
        where s_nationkey = n_nationkey
          and n_name = 'CANADA');
    
    create index cv_q20_canadasupp_index on
        cv_q20.canadasupp (s_suppkey,s_name,s_address);
  5. Create one materialized view for the upper pipelines:
    create materialized view if not exists cv.q20 as
        select s_name, s_address
        from canadasupp
        where s_suppkey in (
            select ps_suppkey
            from forestpartsupp
            where ps_availqty > (
                select sum_quantity
                from grouped_lineitem
                where l_suppkey = ps_suppkey
                  and l_partkey = ps_partkey
    ));
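The statements in step 4 follow a regular pattern, which the sketch below reproduces with simple string templates in Python. It is an illustration of the pattern visible in the generated SQL above, not the generator from src/python/view_generator; the function name and parameters are hypothetical.

```python
def static_pipeline_ddl(query_name, cte_name, cte_body, index_columns):
    """Return the view and index statements for one static pipeline."""
    schema = f"cv_{query_name}"
    view = (
        f"create materialized view if not exists {schema}.{cte_name} as (\n"
        f"{cte_body});"
    )
    index = (
        f"create index {schema}_{cte_name}_index on "
        f"{schema}.{cte_name} ({','.join(index_columns)});"
    )
    return [view, index]
```

Called with `q20`, `canadasupp`, the body of the CTE, and the three selected columns, it produces statements equivalent to the ones shown for canadasupp in step 4, apart from line breaks.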

After generating the continuous view, records can be inserted using:

select cv.stream_lineitem_insert(
    (array [(1, 155190, 7706, 1, 17.00, 21168.23, 0.04, 0.02, 'N', 'O', '1996-03-13', '1996-02-12',
                '1996-03-22', 'DELIVER IN PERSON', 'TRUCK', 'egular courts above the'),
            (1, 67310, 7311, 2, 36.00, 45983.16, 0.09, 0.06, 'N', 'O', '1996-04-12', '1996-02-28',
                '1996-04-20', 'TAKE BACK RETURN', 'MAIL', 'ly final dependencies: slyly bold')
            ])::stream_lineitem[]);

or copied from our provided lineitem base table using:

select cv.stream_lineitem_insert(
    array(
        select row(
            l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
            l_linestatus , l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct,l_shipmode, l_comment)::stream_lineitem 
        from base_lineitem
        )
    );

Running refresh materialized view cv.q20; updates the result, and select * from cv.q20; fetches it.

Benchmarking

We compare our split maintenance strategy against multiple other maintenance strategies:

  1. deferred maintenance (dv): materialize the entire stream and evaluate the query every time the result is read. (Regular PostgreSQL materialized views)
  2. eager maintenance with inserts (ev): materialize the entire stream and update the result with every insert.
  3. eager maintenance without inserts (fv): only update the result with every insert, but do not materialize the stream.
  4. batched eager maintenance (bv): insert the data of the stream into a temporary table until a threshold is reached; then update the result and truncate the temporary table.
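As an illustration of the batched eager strategy (bv), the following Python sketch buffers stream values and folds them into a running sum once a threshold is reached. It is an in-memory analogy, not the SQL used in the benchmark: the list plays the role of the temporary table, and the class and attribute names are hypothetical.

```python
class BatchedEagerView:
    """Buffer stream tuples and update the result once the threshold is reached."""

    def __init__(self, threshold: int):
        self.threshold = threshold
        self.buffer = []    # plays the role of the temporary table
        self.result = 0.0   # maintained query result, here a running sum

    def insert(self, quantity: float) -> None:
        self.buffer.append(quantity)
        if len(self.buffer) >= self.threshold:
            self.flush()

    def flush(self) -> None:
        self.result += sum(self.buffer)  # update the result with the batch
        self.buffer.clear()              # corresponds to truncating the table
```

In this sketch, the result does not reflect buffered tuples until the next flush. With a threshold of 1, it behaves like eager maintenance without materializing the stream (fv).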

We denote our split maintenance approach as cv. We provide a PL/Python script for executing the benchmarks within PostgreSQL. The SQL scripts for the five different TPC-H queries are located in the benchmark/sql/tpch directory. For every maintenance strategy, we provide the function used for inserting the stream data, refreshing the result, and querying it. Apart from split and deferred maintenance, these functions cannot be generated automatically.

Due to the overhead of the external integration, the insert functions perform best for batched data. Our tests have shown that 1000 records per batch is a reasonable size for streaming data. We also support small batch sizes, or even single tuples (1 - 10 records per batch). However, this leads to instability in the runtime and, thereby, in the experiment. Therefore, we recommend at least 100 tuples per batch for stable results.
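A client that feeds the insert functions can split its input into fixed-size batches before each call. The generator below is a minimal Python sketch of that step; the function name is hypothetical, and the default of 1000 follows the batch size mentioned above.

```python
from itertools import islice


def batches(records, size=1000):
    """Yield consecutive lists of at most `size` records."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

Each yielded list would then be passed as one array argument to the generated insert function, so the last batch may be smaller than the others.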