Skip to content

Commit

Permalink
[SQL] Support more features for window aggregates (#1742)
Browse files Browse the repository at this point in the history
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
  • Loading branch information
mihaibudiu committed May 14, 2024
1 parent 3624a5e commit 561dd6d
Show file tree
Hide file tree
Showing 44 changed files with 2,564 additions and 291 deletions.
91 changes: 91 additions & 0 deletions crates/dbsp/src/algebra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub use zset::{
OrdZSetFactories, ZBatch, ZBatchReader, ZCursor, ZSet, ZSetReader, ZTrace, ZWeight,
};

use core::fmt::{Debug, Display};
use num::PrimInt;
use rust_decimal::{prelude::One, prelude::Zero, Decimal};
use size_of::SizeOf;
use std::{
Expand All @@ -29,6 +31,95 @@ use std::{
rc::Rc,
};

/// Trait for types where the minimum value is known
pub trait MinValue {
fn min_value() -> Self;
}

/// Trait for types where the maximum value is known
pub trait MaxValue {
fn max_value() -> Self;
}

/// Trait for integer types returning the first
/// value representable in this type which is
/// too large for the previous narrower type
pub trait FirstLargeValue {
fn large() -> Self;
}

/// Trait for primitive integers that are unsigned
pub trait UnsignedPrimInt: PrimInt + FirstLargeValue + HasZero + Debug + Display {}

impl FirstLargeValue for u8 {
fn large() -> Self {
// There is no narrower unsigned type
0x1
}
}

impl FirstLargeValue for u16 {
fn large() -> Self {
0x100
}
}

impl FirstLargeValue for u32 {
fn large() -> Self {
0x10000
}
}

impl FirstLargeValue for u64 {
fn large() -> Self {
0x1_0000_0000
}
}

impl FirstLargeValue for u128 {
fn large() -> Self {
0x1_0000_0000_0000_0000
}
}

impl UnsignedPrimInt for u8 {}
impl UnsignedPrimInt for u16 {}
impl UnsignedPrimInt for u32 {}
impl UnsignedPrimInt for u64 {}
impl UnsignedPrimInt for u128 {}

/// Trait for primitive integers that are signed
pub trait SignedPrimInt:
PrimInt + Neg<Output = Self> + HasZero + HasOne + Ord + Debug + Display
{
}

// For all primitive signed integers we also implement
// MinValue and MaxValue
macro_rules! make_signed {
($type: ty) => {
impl MinValue for $type {
fn min_value() -> Self {
<$type>::MIN
}
}

impl MaxValue for $type {
fn max_value() -> Self {
<$type>::MAX
}
}

impl SignedPrimInt for $type {}
};
}

make_signed!(i8);
make_signed!(i16);
make_signed!(i32);
make_signed!(i64);
make_signed!(i128);

/// A trait for types that have a zero value.
///
/// This is similar to the standard Zero trait, but that
Expand Down
4 changes: 2 additions & 2 deletions crates/dbsp/src/operator/time_series/rolling_aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::dynamic::Erase;
use crate::trace::BatchReaderFactories;
use crate::{
algebra::MulByRef,
algebra::{MulByRef, UnsignedPrimInt},
dynamic::{DowncastTrait, DynData, DynDataTyped, DynOpt, DynPair, DynWeight},
operator::{
dynamic::{
Expand Down Expand Up @@ -147,7 +147,7 @@ where
impl<PK, TS, V> Stream<RootCircuit, OrdPartitionedIndexedZSet<PK, TS, DynDataTyped<TS>, V, DynData>>
where
PK: DBData,
TS: DBData + PrimInt,
TS: DBData + UnsignedPrimInt,
V: DBData,
{
/// Rolling aggregate of a partitioned stream over time range.
Expand Down
5 changes: 4 additions & 1 deletion docs/sql/grammar.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ withItem
AS '(' query ')'
orderItem
: expression [ ASC | DESC ]
: expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]
projectItem
: expression [ [ AS ] columnAlias ]
Expand Down Expand Up @@ -286,6 +286,9 @@ windowRange
Where `agg` is a window aggregate function as described in the [section
on aggregation](aggregates.md#window-aggregate-functions).

Currently we require window ranges to have constant values. This
precludes ranges such as `INTERVAL 1 YEAR`, which have variable sizes.

### LATENESS

See [Streaming SQL Extensions](streaming.md#lateness-expressions)
Expand Down
5 changes: 5 additions & 0 deletions sql-to-dbsp-compiler/SQL-compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@
</build>

<dependencies>
<dependency>
<groupId>org.apiguardian</groupId>
<artifactId>apiguardian-api</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,24 @@
* It is implemented as a sequence of 2 DBSP operators: partitioned_rolling_aggregate and
* map_index.
* This operator only operates correctly on deltas. To operate on collections it
* must differentiate its input, and integrate its output.
*/
* must differentiate its input, and integrate its output. */
public class DBSPWindowAggregateOperator extends DBSPAggregateOperatorBase {
public final DBSPExpression window;
// TODO: these fields should not be here
public final boolean ascending;
public final boolean nullsLast;

public DBSPWindowAggregateOperator(
CalciteObject node,
@Nullable DBSPExpression function, @Nullable DBSPAggregate aggregate,
DBSPExpression window,
DBSPTypeIndexedZSet outputType,
boolean ascending, boolean nullsLast,
DBSPOperator input) {
super(node, "window_aggregate", outputType, function, aggregate, true, input, false);
this.window = window;
this.ascending = ascending;
this.nullsLast = nullsLast;
// Expect a tuple with 2 fields
DBSPTypeTuple partAndTime = outputType.keyType.to(DBSPTypeTuple.class);
if (partAndTime.size() != 2)
Expand All @@ -63,6 +68,7 @@ public DBSPOperator withFunction(@Nullable DBSPExpression expression, DBSPType o
return new DBSPWindowAggregateOperator(
this.getNode(), expression, this.aggregate, this.window,
outputType.to(DBSPTypeIndexedZSet.class),
this.ascending, this.nullsLast,
this.input());
}

Expand All @@ -71,7 +77,8 @@ public DBSPOperator withInputs(List<DBSPOperator> newInputs, boolean force) {
if (force || this.inputsDiffer(newInputs))
return new DBSPWindowAggregateOperator(
this.getNode(), this.function, this.aggregate, this.window,
this.getOutputIndexedZSetType(), newInputs.get(0));
this.getOutputIndexedZSetType(),
this.ascending, this.nullsLast, newInputs.get(0));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public void postorder(DBSPWindowAggregateOperator node) {
DBSPExpression function = impl.asFold();
DBSPOperator result = new DBSPWindowAggregateOperator(node.getNode(),
function, null, node.window,
node.getOutputIndexedZSetType(), this.mapped(node.input()));
node.getOutputIndexedZSetType(), node.ascending, node.nullsLast,
this.mapped(node.input()));
this.map(node, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPSortExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPTupleExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPUnaryExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPUnsignedUnwrapExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPUnsignedWrapExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPUnwrapExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPVariablePath;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPBinaryLiteral;
Expand All @@ -71,6 +73,7 @@
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPDecimalLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPDoubleLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPGeoPointLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPI128Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPI16Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPI32Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPI64Literal;
Expand All @@ -86,6 +89,8 @@
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPStringLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPTimeLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPTimestampLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPU128Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPU16Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPU32Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPU64Literal;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPUSizeLiteral;
Expand Down Expand Up @@ -201,6 +206,53 @@ public VisitDecision preorder(DBSPSomeExpression expression) {
return VisitDecision.STOP;
}

void codegen(DBSPUnsignedWrapExpression.TypeSequence sequence) {
this.builder.append("<");
// In the type parameter we do not put the Option<>
sequence.dataType.setMayBeNull(false).accept(this);
this.builder.append(", ");
sequence.dataConvertedType.accept(this);
this.builder.append(", ");
sequence.intermediateType.accept(this);
this.builder.append(", ");
sequence.unsignedType.accept(this);
this.builder.append(">");
}

@Override
public VisitDecision preorder(DBSPUnsignedWrapExpression expression) {
this.builder.append("UnsignedWrapper")
.append("::")
.append(expression.getMethod())
.append("::");
this.codegen(expression.sequence);
this.builder.append("(");
expression.source.accept(this);
this.builder.append(", ")
.append(Boolean.toString(expression.ascending))
.append(", ")
.append(Boolean.toString(expression.nullsLast))
.append(")");
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPUnsignedUnwrapExpression expression) {
this.builder.append("UnsignedWrapper")
.append("::")
.append(expression.getMethod())
.append("::");
this.codegen(expression.sequence);
this.builder.append("(");
expression.source.accept(this);
this.builder.append(", ")
.append(Boolean.toString(expression.ascending))
.append(", ")
.append(Boolean.toString(expression.nullsLast))
.append(")");
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPFieldComparatorExpression expression) {
expression.source.accept(this);
Expand Down Expand Up @@ -491,12 +543,19 @@ public VisitDecision preorder(DBSPI32Literal literal) {
}

@Override
public VisitDecision preorder(DBSPU32Literal literal) {
public VisitDecision preorder(DBSPU16Literal literal) {
String val = Integer.toString(Objects.requireNonNull(literal.value));
this.builder.append(literal.wrapSome(val + literal.getIntegerType().getRustString()));
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPU32Literal literal) {
String val = Long.toString(Objects.requireNonNull(literal.value));
this.builder.append(literal.wrapSome(val + literal.getIntegerType().getRustString()));
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPI64Literal literal) {
if (literal.isNull)
Expand All @@ -506,9 +565,25 @@ public VisitDecision preorder(DBSPI64Literal literal) {
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPI128Literal literal) {
if (literal.isNull)
return this.doNull(literal);
String val = Objects.requireNonNull(literal.value).toString();
this.builder.append(literal.wrapSome(val + literal.getIntegerType().getRustString()));
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPU64Literal literal) {
String val = Long.toString(Objects.requireNonNull(literal.value));
String val = Objects.requireNonNull(literal.value).toString();
this.builder.append(literal.wrapSome(val + literal.getIntegerType().getRustString()));
return VisitDecision.STOP;
}

@Override
public VisitDecision preorder(DBSPU128Literal literal) {
String val = Objects.requireNonNull(literal.value).toString();
this.builder.append(literal.wrapSome(val + literal.getIntegerType().getRustString()));
return VisitDecision.STOP;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.dbsp.sqlCompiler.ir.expression.DBSPFieldComparatorExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPNoComparatorExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPOpcode;
import org.dbsp.sqlCompiler.ir.expression.DBSPUnsignedUnwrapExpression;
import org.dbsp.sqlCompiler.ir.expression.DBSPVariablePath;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPBoolLiteral;
import org.dbsp.sqlCompiler.ir.expression.literal.DBSPISizeLiteral;
Expand All @@ -74,6 +75,7 @@
import org.dbsp.sqlCompiler.ir.type.DBSPType;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeCode;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeIndexedZSet;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeRawTuple;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeStream;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeStruct;
import org.dbsp.sqlCompiler.ir.type.DBSPTypeTuple;
Expand Down Expand Up @@ -981,15 +983,29 @@ public VisitDecision preorder(DBSPWindowAggregateOperator operator) {
builder.append(");")
.newline();

// TODO: this is ugly https://github.com/feldera/feldera/issues/1733
this.builder.append("let ")
.append(operator.getOutputName())
.append(": ");
streamType.accept(this.innerVisitor);
DBSPTypeIndexedZSet ix = operator.getOutputIndexedZSetType();
// ix has the shape Tup2<Tup2<keyType, timestamp>, aggregate>
DBSPType aggregateType = ix.elementType;
DBSPTypeTuple key_ts = ix.keyType.to(DBSPTypeTuple.class);
assert key_ts.tupFields.length == 2;
DBSPType tsType = key_ts.tupFields[1];
DBSPVariablePath var = new DBSPVariablePath("ts_agg", new DBSPTypeRawTuple(tsType, aggregateType));
DBSPExpression ts = var.field(0);
DBSPUnsignedUnwrapExpression unwrap = new DBSPUnsignedUnwrapExpression(
operator.getNode(), ts, tsType, operator.ascending, operator.nullsLast);

builder.append(" = " )
.append(tmp)
.append(".map_index(|(key, ts_agg)| { ")
.append("( Tup2::new(key.clone(), ts_agg.0), ts_agg.1.unwrap_or_default() )")
.append("});");
.append("( Tup2::new(key.clone(), ");
// the next generates e.g., UnsignedWrapper::to_signed::<i32, i32, i64, u64>(ts_agg.0)
unwrap.accept(this.innerVisitor);
builder.append("), ts_agg.1.unwrap_or_default() ) });");
return VisitDecision.STOP;
}

Expand Down

0 comments on commit 561dd6d

Please sign in to comment.