Skip to content

Commit

Permalink
feat(spanner/spannertest): implement FULL JOIN (#3218)
Browse files Browse the repository at this point in the history
This doesn't fit into the same structure as the other JOINs; it is
implemented, approximately, as a LEFT JOIN followed by a catch-up phase
that emits the unmatched RHS rows.
  • Loading branch information
dsymonds committed Nov 18, 2020
1 parent 5c44019 commit 99f7212
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 19 deletions.
2 changes: 1 addition & 1 deletion spanner/spannertest/README.md
Expand Up @@ -21,7 +21,7 @@ by ascending esotericism:
- more aggregation functions
- SELECT HAVING
- case insensitivity
- FULL JOIN, multiple joins
- multiple joins
- alternate literal types (esp. strings)
- STRUCT types
- transaction simulation
Expand Down
104 changes: 86 additions & 18 deletions spanner/spannertest/db_query.go
Expand Up @@ -741,7 +741,10 @@ func newJoinIter(lhs, rhs *rawIter, lhsEC, rhsEC evalContext, sfj spansql.Select
ji.primary, ji.secondaryOrig = rhs, lhs
ji.primaryOffset, ji.secondaryOffset = len(rhsEC.cols), 0
case spansql.FullJoin:
return nil, evalContext{}, fmt.Errorf("TODO: can't yet evaluate FULL JOIN")
// FULL JOIN is implemented as a LEFT JOIN with tracking for which rows of the RHS
// have been used. Then, at the end of the iteration, the unused RHS rows are emitted.
ji.nullPad = true
ji.used = make([]bool, 0, 10) // arbitrary preallocation
}
ji.ec.cols, ji.ec.row = nil, nil

Expand Down Expand Up @@ -781,11 +784,12 @@ func (ji *joinIter) prepNonUsing(on spansql.BoolExpr, lhsEC, rhsEC evalContext)
}
return b != nil && *b, nil
}
ji.zero = func(primary row) {
ji.zero = func(primary, secondary row) {
for i := range ji.ec.row {
ji.ec.row[i] = nil
}
copy(ji.ec.row[ji.primaryOffset:], primary)
copy(ji.ec.row[ji.secondaryOffset:], secondary)
}
}

Expand Down Expand Up @@ -844,11 +848,18 @@ func (ji *joinIter) prepUsing(using []spansql.ID, lhsEC, rhsEC evalContext, flip
return r[i]
}
// populate writes the data to ji.ec.row in the correct positions.
populate := func(primary, secondary row) { // secondary may be nil
populate := func(primary, secondary row) { // either may be nil
j := 0
for _, pi := range primaryUsing {
ji.ec.row[j] = primary[pi]
j++
if primary != nil {
for _, pi := range primaryUsing {
ji.ec.row[j] = primary[pi]
j++
}
} else {
for _, si := range secondaryUsing {
ji.ec.row[j] = secondary[si]
j++
}
}
lhs, rhs := primary, secondary
if flipped {
Expand All @@ -873,8 +884,8 @@ func (ji *joinIter) prepUsing(using []spansql.ID, lhsEC, rhsEC evalContext, flip
populate(primary, secondary)
return true, nil
}
ji.zero = func(primary row) {
populate(primary, nil)
ji.zero = func(primary, secondary row) {
populate(primary, secondary)
}
return nil
}
Expand All @@ -894,16 +905,24 @@ type joinIter struct {
// should be yielded with null padding (e.g. OUTER JOINs).
nullPad bool

primaryRow row // current row from primary, or nil if it is time to advance
secondary *rawIter // current clone of secondary
any bool // true if any secondary rows have matched primaryRow
primaryRow row // current row from primary, or nil if it is time to advance
secondary *rawIter // current clone of secondary
secondaryRead int // number of rows already read from secondary
any bool // true if any secondary rows have matched primaryRow

// cond reports whether the primary and secondary rows "join" (e.g. the ON clause is true).
// It populates ec.row with the output.
cond func(primary, secondary row) (bool, error)
// zero populates ec.row with the primary row and sets the remainder to NULL.
// This is used when nullPad is true and a primary row doesn't match any secondary row.
zero func(primary row)
// zero populates ec.row with the primary or secondary row data (either of which may be nil),
// and sets the remainder to NULL.
// This is used when nullPad is true and a primary or secondary row doesn't match.
zero func(primary, secondary row)

// For FULL JOIN, used[i] records whether the i'th secondary row has matched a primary row.
// The slice is non-nil exactly when this tracking is active.
used []bool
zeroUnused bool // set when emitting unused secondary rows
unusedIndex int // next index of used to check
}

// Cols reports the column information held in the join iterator's
// evaluation context.
func (ji *joinIter) Cols() []colInfo {
	cols := ji.ec.cols
	return cols
}
Expand All @@ -915,37 +934,82 @@ func (ji *joinIter) nextPrimary() error {
return err
}
ji.secondary = ji.secondaryOrig.clone()
ji.secondaryRead = 0
ji.any = false
return nil
}

func (ji *joinIter) Next() (row, error) {
if ji.primaryRow == nil {
if err := ji.nextPrimary(); err != nil {
if ji.primaryRow == nil && !ji.zeroUnused {
err := ji.nextPrimary()
if err == io.EOF && ji.used != nil {
// Drop down to emitting unused secondary rows.
ji.zeroUnused = true
ji.secondary = nil
goto scanJiUsed
}
if err != nil {
return nil, err
}
}
scanJiUsed:
if ji.zeroUnused {
if ji.secondary == nil {
ji.secondary = ji.secondaryOrig.clone()
ji.secondaryRead = 0
}
for ji.unusedIndex < len(ji.used) && ji.used[ji.unusedIndex] {
ji.unusedIndex++
}
if ji.unusedIndex >= len(ji.used) || ji.secondaryRead >= len(ji.used) {
// Truly finished.
return nil, io.EOF
}
var secondaryRow row
for ji.secondaryRead <= ji.unusedIndex {
var err error
secondaryRow, err = ji.secondary.Next()
if err != nil {
return nil, err
}
ji.secondaryRead++
}
ji.zero(nil, secondaryRow)
return ji.ec.row, nil
}

for {
secondaryRow, err := ji.secondary.Next()
if err == io.EOF {
// Finished the current primary row.

if !ji.any && ji.nullPad {
ji.zero(ji.primaryRow)
ji.zero(ji.primaryRow, nil)
ji.primaryRow = nil
return ji.ec.row, nil
}

// Advance to next one.
if err := ji.nextPrimary(); err != nil {
err := ji.nextPrimary()
if err == io.EOF && ji.used != nil {
ji.zeroUnused = true
ji.secondary = nil
goto scanJiUsed
}
if err != nil {
return nil, err
}
continue
}
if err != nil {
return nil, err
}
ji.secondaryRead++
if ji.used != nil {
for len(ji.used) < ji.secondaryRead {
ji.used = append(ji.used, false)
}
}

// We have a pair of rows to consider.
match, err := ji.cond(ji.primaryRow, secondaryRow)
Expand All @@ -956,6 +1020,10 @@ func (ji *joinIter) Next() (row, error) {
continue
}
ji.any = true
if ji.used != nil {
// Make a note that we used this secondary row.
ji.used[ji.secondaryRead-1] = true
}
return ji.ec.row, nil
}
}
Expand Down
28 changes: 28 additions & 0 deletions spanner/spannertest/integration_test.go
Expand Up @@ -959,6 +959,34 @@ func TestIntegration_ReadsAndQueries(t *testing.T) {
{int64(2), "b", int64(3), "d"},
},
},
{
// Same as in docs, but with a weird ORDER BY clause to match the row ordering.
`SELECT * FROM JoinA FULL OUTER JOIN JoinB ON JoinA.w = JoinB.y ORDER BY w IS NULL, w, x, y, z`,
nil,
[][]interface{}{
{int64(1), "a", nil, nil},
{int64(2), "b", int64(2), "k"},
{int64(3), "c", int64(3), "m"},
{int64(3), "c", int64(3), "n"},
{int64(3), "d", int64(3), "m"},
{int64(3), "d", int64(3), "n"},
{nil, nil, int64(4), "p"},
},
},
{
// Same as the previous, but using a USING clause instead of an ON clause.
`SELECT * FROM JoinC FULL OUTER JOIN JoinD USING (x) ORDER BY x, y, z`,
nil,
[][]interface{}{
{int64(1), "a", nil},
{int64(2), "b", "k"},
{int64(3), "c", "m"},
{int64(3), "c", "n"},
{int64(3), "d", "m"},
{int64(3), "d", "n"},
{int64(4), nil, "p"},
},
},
{
`SELECT * FROM JoinA LEFT OUTER JOIN JoinB AS B ON JoinA.w = B.y ORDER BY w, x, y, z`,
nil,
Expand Down

0 comments on commit 99f7212

Please sign in to comment.