Skip to content

Commit

Permalink
Proposal for batch support
Browse files Browse the repository at this point in the history
Related to golang#5171

Change-Id: I46a6d12b46d3802a338e5733ca81e8a0fb2ae125
  • Loading branch information
muhlemmer committed Oct 17, 2020
1 parent 5faa828 commit b4dc545
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/database/sql/driver/driver.go
Expand Up @@ -542,3 +542,24 @@ func (noRows) LastInsertId() (int64, error) {
func (noRows) RowsAffected() (int64, error) {
return 0, errors.New("no RowsAffected available after DDL statement")
}

// CopyResult signals termination of a CopyIn.
// Both Res and Err may be populated in case of a partial write.
type CopyResult struct {
Res Result
Err error
}

// Copier is an optional interface that may be implemented by a Conn.
type Copier interface {

// CopyIn sends a batch of data to the server in a single execution.
// The driver may do this asynchronously.
//
// The data channel may be buffered.
// The sender closes the data channel when all data is sent.
//
// Exact one of CopyResult must always be send on the result channel.
// Either after all data is flushed, or after encountering an error.
CopyIn(ctx context.Context, table string, columns ...string) (data chan<- []NamedValue, result <-chan CopyResult)
}
93 changes: 93 additions & 0 deletions src/database/sql/sql.go
Expand Up @@ -3293,3 +3293,96 @@ func withLock(lk sync.Locker, fn func()) {
defer lk.Unlock() // in case fn panics
fn()
}

// Batch is input to a CopyIn command.
type Batch interface {
Table() string
Columns() []string
// Next returns a row of data in a batch.
// It should return io.EOF when no more data is available.
Next() ([]interface{}, error)
}

// ErrNotCopier is returned when the driver does not support batch operations.
var ErrNotCopier = errors.New("sql: driver does not support CopyIn")

// CopyIn sends the provided batch in a single, asynchronous operation.
//
// In case of an error, partial data may have been written to the database.
// If supported, a driver may return both Result and error to indicate the amount of rows written.
func (tx *Tx) CopyIn(ctx context.Context, batch Batch) (Result, error) {
dc, release, err := tx.grabConn(ctx)
if err != nil {
return nil, err
}
return tx.db.copyDC(ctx, dc, release, batch)
}

func (db *DB) copyDC(ctx context.Context, dc *driverConn, release func(error), batch Batch) (res Result, err error) {
defer func() {
release(err)
}()

copier, ok := dc.ci.(driver.Copier)
if !ok {
err = ErrNotCopier
return
}

withLock(dc, func() {
data, result := copier.CopyIn(ctx, batch.Table(), batch.Columns()...)

for {
var (
args []interface{}
nvdargs []driver.NamedValue
)

args, err = batch.Next()
if err != nil {
close(data)
r := <-result

if err == io.EOF {
res, err = r.Res, r.Err
} else {
res, err = r.Res, fmt.Errorf("sql Batch: %w", err)
}

return
}

nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
if err != nil {
return
}

select {

case r := <-result:
res, err = r.Res, r.Err
return

case <-ctx.Done():
close(data)

r := <-result

// Give priority for the error reported from the driver.
// If its nil, we set the context error.
if err = r.Err; err == nil {
err = ctx.Err()
}

return

case data <- nvdargs:
continue
}

}

})

return
}

0 comments on commit b4dc545

Please sign in to comment.