Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner/spannertest): support multiple aggregations #3965

Merged
merged 5 commits into from May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 60 additions & 58 deletions spanner/spannertest/db_query.go
Expand Up @@ -518,8 +518,7 @@ func (d *database) evalSelect(sel spansql.Select, qc *queryContext) (si *selIter
}

// Handle aggregation.
// TODO: Support more than one aggregation function; does Spanner support that?
aggI := -1
var aggI []int
for i, e := range sel.List {
// Supported aggregate funcs have exactly one arg.
f, ok := e.(spansql.Func)
Expand All @@ -530,12 +529,9 @@ func (d *database) evalSelect(sel spansql.Select, qc *queryContext) (si *selIter
if !ok {
continue
}
if aggI > -1 {
return nil, fmt.Errorf("only one aggregate function is supported")
}
aggI = i
aggI = append(aggI, i)
}
if aggI > -1 {
if len(aggI) > 0 {
raw, err := toRawIter(ri)
if err != nil {
return nil, err
Expand All @@ -545,20 +541,6 @@ func (d *database) evalSelect(sel spansql.Select, qc *queryContext) (si *selIter
// This may result in a [0,0) entry for empty inputs.
rowGroups = [][2]int{{0, len(raw.rows)}}
}
fexpr := sel.List[aggI].(spansql.Func)
fn := aggregateFuncs[fexpr.Name]
starArg := fexpr.Args[0] == spansql.Star
if starArg && !fn.AcceptStar {
return nil, fmt.Errorf("aggregate function %s does not accept * as an argument", fexpr.Name)
}
var argType spansql.Type
if !starArg {
ci, err := ec.colInfo(fexpr.Args[0])
if err != nil {
return nil, fmt.Errorf("evaluating aggregate function %s arg type: %v", fexpr.Name, err)
}
argType = ci.Type
}

// Prepare output.
rawOut := &rawIter{
Expand All @@ -569,29 +551,8 @@ func (d *database) evalSelect(sel spansql.Select, qc *queryContext) (si *selIter
cols: append([]colInfo(nil), raw.cols...),
}

var aggType spansql.Type
aggType := make([]*spansql.Type, len(aggI))
for _, rg := range rowGroups {
// Compute aggregate value across this group.
var values []interface{}
for i := rg[0]; i < rg[1]; i++ {
ec.row = raw.rows[i]
if starArg {
// A non-NULL placeholder is sufficient for aggregation.
values = append(values, 1)
} else {
x, err := ec.evalExpr(fexpr.Args[0])
if err != nil {
return nil, err
}
values = append(values, x)
}
}
x, typ, err := fn.Eval(values, argType)
if err != nil {
return nil, err
}
aggType = typ

var outRow row
// Output for the row group is the first row of the group (arbitrary,
// but it should be representative), and the aggregate value.
Expand All @@ -609,27 +570,68 @@ func (d *database) evalSelect(sel spansql.Select, qc *queryContext) (si *selIter
outRow = append(outRow, nil)
}
}
outRow = append(outRow, x)

for j, aggI := range aggI {
fexpr := sel.List[aggI].(spansql.Func)
fn := aggregateFuncs[fexpr.Name]
starArg := fexpr.Args[0] == spansql.Star
if starArg && !fn.AcceptStar {
return nil, fmt.Errorf("aggregate function %s does not accept * as an argument", fexpr.Name)
}
var argType spansql.Type
if !starArg {
ci, err := ec.colInfo(fexpr.Args[0])
if err != nil {
return nil, fmt.Errorf("evaluating aggregate function %s arg type: %v", fexpr.Name, err)
}
argType = ci.Type
}

// Compute aggregate value across this group.
var values []interface{}
for i := rg[0]; i < rg[1]; i++ {
ec.row = raw.rows[i]
if starArg {
// A non-NULL placeholder is sufficient for aggregation.
values = append(values, 1)
} else {
x, err := ec.evalExpr(fexpr.Args[0])
if err != nil {
return nil, err
}
values = append(values, x)
}
}
x, typ, err := fn.Eval(values, argType)
if err != nil {
return nil, err
}
aggType[j] = &typ

outRow = append(outRow, x)
}
rawOut.rows = append(rawOut.rows, outRow)
}

if aggType == (spansql.Type{}) {
// Fallback; there might not be any groups.
// TODO: Should this be in aggregateFunc?
aggType = int64Type
for j, aggI := range aggI {
fexpr := sel.List[aggI].(spansql.Func)
if aggType[j] == nil {
// Fallback; there might not be any groups.
// TODO: Should this be in aggregateFunc?
aggType[j] = &int64Type
}
rawOut.cols = append(rawOut.cols, colInfo{
Name: spansql.ID(fexpr.SQL()), // TODO: this is a bit hokey, but it is output only
Type: *aggType[j],
AggIndex: aggI + 1,
})
sel.List[aggI] = aggSentinel{ // Mutate query so evalExpr in selIter picks out the new value.
Type: *aggType[j],
AggIndex: aggI + 1,
}
}
rawOut.cols = append(raw.cols, colInfo{
Name: spansql.ID(fexpr.SQL()), // TODO: this is a bit hokey, but it is output only
Type: aggType,
AggIndex: aggI + 1,
})

ri = rawOut
ec.cols = rawOut.cols
sel.List[aggI] = aggSentinel{ // Mutate query so evalExpr in selIter picks out the new value.
Type: aggType,
AggIndex: aggI + 1,
}
}

// TODO: Support table sampling.
Expand Down
56 changes: 56 additions & 0 deletions spanner/spannertest/integration_test.go
Expand Up @@ -1145,6 +1145,62 @@ func TestIntegration_ReadsAndQueries(t *testing.T) {
{"Daniel"},
},
},
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could use some additional test cases to cover:

  • Combination of aggregated values of different types
  • Combinations of agg_func(*) and agg_func(col_name)
  • Several aggregation expressions in combination with a GROUP BY clause

`SELECT MIN(Name), MAX(Name) FROM Staff`,
nil,
[][]interface{}{
{"Daniel", "Teal'c"},
},
},
{
`SELECT Cool, MIN(Name), MAX(Name), COUNT(*) FROM Staff GROUP BY Cool ORDER BY Cool`,
nil,
[][]interface{}{
{nil, "George", "Jack", int64(2)},
{false, "Daniel", "Sam", int64(2)},
{true, "Teal'c", "Teal'c", int64(1)},
},
},
{
`SELECT Tenure/2, Cool, Name FROM Staff WHERE Tenure/2 > 5`,
nil,
[][]interface{}{
{float64(5.5), false, "Daniel"},
},
},
{
`SELECT Tenure/2, MAX(Cool) FROM Staff WHERE Tenure/2 > 5 GROUP BY Tenure/2`,
nil,
[][]interface{}{
{float64(5.5), false},
},
},
{
`SELECT Tenure/2, Cool, MIN(Name) FROM Staff WHERE Tenure/2 >= 4 GROUP BY Tenure/2, Cool ORDER BY Cool DESC, Tenure/2`,
nil,
[][]interface{}{
{float64(4), true, "Teal'c"},
{float64(4.5), false, "Sam"},
{float64(5.5), false, "Daniel"},
{float64(5), nil, "Jack"},
},
},
{
`SELECT MIN(Cool), MAX(Cool), MIN(Tenure), MAX(Tenure), MIN(Height), MAX(Height), MIN(Name), MAX(Name), COUNT(*) FROM Staff`,
nil,
[][]interface{}{
{false, true, int64(6), int64(11), 1.73, 1.91, "Daniel", "Teal'c", int64(5)},
},
},
{
`SELECT Cool, MIN(Tenure), MAX(Tenure), MIN(Height), MAX(Height), MIN(Name), MAX(Name), COUNT(*) FROM Staff GROUP BY Cool ORDER BY Cool`,
nil,
[][]interface{}{
{nil, int64(6), int64(10), 1.73, 1.85, "George", "Jack", int64(2)},
{false, int64(9), int64(11), 1.75, 1.83, "Daniel", "Sam", int64(2)},
{true, int64(8), int64(8), 1.91, 1.91, "Teal'c", "Teal'c", int64(1)},
},
},
}
var failures int
for _, test := range tests {
Expand Down