Skip to content

Commit

Permalink
Merge pull request #199 from goccy/sync-table-metadata
Browse files Browse the repository at this point in the history
Sync table metadata
  • Loading branch information
goccy committed Jun 17, 2023
2 parents 456c584 + 3ea31db commit a4f2d6c
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -12,7 +12,7 @@ require (
github.com/goccy/go-json v0.10.0
github.com/goccy/go-yaml v1.9.5
github.com/goccy/go-zetasql v0.5.1
github.com/goccy/go-zetasqlite v0.14.1
github.com/goccy/go-zetasqlite v0.16.0
github.com/google/go-cmp v0.5.9
github.com/googleapis/gax-go/v2 v2.7.1
github.com/gorilla/mux v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -79,8 +79,8 @@ github.com/goccy/go-yaml v1.9.5 h1:Eh/+3uk9kLxG4koCX6lRMAPS1OaMSAi+FJcya0INdB0=
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
github.com/goccy/go-zetasql v0.5.1 h1:vP9wgUH5gylw8yL+gwO0wF7uW/fW5Dr1AAAzi8Kgevg=
github.com/goccy/go-zetasql v0.5.1/go.mod h1:6W14CJVKh7crrSPyj6NPk4c49L2NWnxvyDLsRkOm4BI=
github.com/goccy/go-zetasqlite v0.14.1 h1:Jt1mtRY/xpln6dbVo4AuMdAEWC8xKYF0hAJg2hYb3So=
github.com/goccy/go-zetasqlite v0.14.1/go.mod h1:ikLN7nRFum4sXL6kDxgIWrhH/9iZSdwXWXZzMUnuTjM=
github.com/goccy/go-zetasqlite v0.16.0 h1:kjd6g8tY1OIEefCHjeZYvfu0tQwVswEaYHR844XFTLY=
github.com/goccy/go-zetasqlite v0.16.0/go.mod h1:ikLN7nRFum4sXL6kDxgIWrhH/9iZSdwXWXZzMUnuTjM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
Expand Down
13 changes: 9 additions & 4 deletions internal/contentdata/repository.go
Expand Up @@ -172,6 +172,10 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
return nil, err
}
defer rows.Close()
changedCatalog, err := zetasqlite.ChangedCatalogFromRows(rows)
if err != nil {
return nil, fmt.Errorf("failed to get changed catalog: %w", err)
}
colNames, err := rows.Columns()
if err != nil {
return nil, fmt.Errorf("failed to get columns: %w", err)
Expand Down Expand Up @@ -238,10 +242,11 @@ func (r *Repository) Query(ctx context.Context, tx *connection.Tx, projectID, da
Schema: &bigqueryv2.TableSchema{
Fields: fields,
},
TotalRows: uint64(len(tableRows)),
JobComplete: true,
Rows: tableRows,
TotalBytes: totalBytes,
TotalRows: uint64(len(tableRows)),
JobComplete: true,
Rows: tableRows,
TotalBytes: totalBytes,
ChangedCatalog: changedCatalog,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions internal/types/types.go
Expand Up @@ -21,12 +21,13 @@ type (
}

QueryResponse struct {
JobReference *bigqueryv2.JobReference `json:"jobReference"`
Schema *bigqueryv2.TableSchema `json:"schema"`
Rows []*TableRow `json:"rows"`
TotalRows uint64 `json:"totalRows,string"`
JobComplete bool `json:"jobComplete"`
TotalBytes int64 `json:"-"`
JobReference *bigqueryv2.JobReference `json:"jobReference"`
Schema *bigqueryv2.TableSchema `json:"schema"`
Rows []*TableRow `json:"rows"`
TotalRows uint64 `json:"totalRows,string"`
JobComplete bool `json:"jobComplete"`
TotalBytes int64 `json:"-"`
ChangedCatalog *zetasqlite.ChangedCatalog `json:"-"`
}

TableDataList struct {
Expand Down
159 changes: 137 additions & 22 deletions server/handler.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/storage"
"github.com/goccy/go-json"
"github.com/goccy/go-zetasqlite"
"go.uber.org/zap"
bigqueryv2 "google.golang.org/api/bigquery/v2"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -1478,10 +1479,112 @@ func (h *jobsInsertHandler) Handle(ctx context.Context, r *jobsInsertRequest) (*
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit job: %w", err)
}
if response != nil && response.ChangedCatalog.Changed() {
if err := syncCatalog(ctx, r.server, response.ChangedCatalog); err != nil {
return nil, err
}
}
}

return job, nil
}

func syncCatalog(ctx context.Context, server *Server, cat *zetasqlite.ChangedCatalog) error {
for _, table := range cat.Table.Added {
if err := addTableMetadata(ctx, server, table); err != nil {
return err
}
}
for _, table := range cat.Table.Deleted {
if err := deleteTableMetadata(ctx, server, table); err != nil {
return err
}
}
return nil
}

func addTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.TableSpec) error {
if len(spec.NamePath) != 3 {
return fmt.Errorf("unexpected table name path: %v", spec.NamePath)
}
projectID := spec.NamePath[0]
datasetID := spec.NamePath[1]
tableID := spec.NamePath[2]
project, err := server.metaRepo.FindProject(ctx, projectID)
if err != nil {
return err
}
dataset := project.Dataset(datasetID)
if dataset == nil {
return fmt.Errorf("dataset %s is not found", datasetID)
}
fields := make([]*bigqueryv2.TableFieldSchema, 0, len(spec.Columns))
for _, column := range spec.Columns {
zetasqlType, err := column.Type.ToZetaSQLType()
if err != nil {
return err
}
fields = append(fields, types.TableFieldSchemaFromZetaSQLType(column.Name, zetasqlType))
}
conn, err := server.connMgr.Connection(ctx, projectID, datasetID)
if err != nil {
return err
}
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
defer tx.RollbackIfNotCommitted()
if _, err := createTableMetadata(ctx, tx, server, project, dataset, &bigqueryv2.Table{
TableReference: &bigqueryv2.TableReference{
ProjectId: projectID,
DatasetId: datasetID,
TableId: tableID,
},
Schema: &bigqueryv2.TableSchema{Fields: fields},
}); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}

func deleteTableMetadata(ctx context.Context, server *Server, spec *zetasqlite.TableSpec) error {
if len(spec.NamePath) != 3 {
return fmt.Errorf("unexpected table name path: %v", spec.NamePath)
}
projectID := spec.NamePath[0]
datasetID := spec.NamePath[1]
tableID := spec.NamePath[2]
project, err := server.metaRepo.FindProject(ctx, projectID)
if err != nil {
return err
}
dataset := project.Dataset(datasetID)
if dataset == nil {
return fmt.Errorf("dataset %s is not found", datasetID)
}
table := dataset.Table(tableID)
conn, err := server.connMgr.Connection(ctx, projectID, datasetID)
if err != nil {
return err
}
tx, err := conn.Begin(ctx)
if err != nil {
return err
}
defer tx.RollbackIfNotCommitted()
if err := table.Delete(ctx, tx.Tx()); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}

func (h *jobsInsertHandler) addQueryResultToDynamicDestinationTable(ctx context.Context, tx *connection.Tx, r *jobsInsertRequest, response *internaltypes.QueryResponse) error {
projectID := r.project.ID
jobID := r.job.JobReference.JobId
Expand Down Expand Up @@ -1618,6 +1721,11 @@ func (h *jobsQueryHandler) Handle(ctx context.Context, r *jobsQueryRequest) (*in
if err := tx.Commit(); err != nil {
return nil, err
}
if response.ChangedCatalog.Changed() {
if err := syncCatalog(ctx, r.server, response.ChangedCatalog); err != nil {
return nil, err
}
}
}
jobID := r.queryRequest.RequestId
if jobID == "" {
Expand Down Expand Up @@ -2434,10 +2542,9 @@ const (
SnapshotTableType TableType = "SNAPSHOT"
)

func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest) (*bigqueryv2.Table, *ServerError) {
table := r.table
func createTableMetadata(ctx context.Context, tx *connection.Tx, server *Server, project *metadata.Project, dataset *metadata.Dataset, table *bigqueryv2.Table) (*bigqueryv2.Table, *ServerError) {
now := time.Now().Unix()
table.Id = fmt.Sprintf("%s:%s.%s", r.project.ID, r.dataset.ID, r.table.TableReference.TableId)
table.Id = fmt.Sprintf("%s:%s.%s", project.ID, dataset.ID, table.TableReference.TableId)
table.CreationTime = now
table.LastModifiedTime = uint64(now)
table.Type = string(DefaultTableType) // TODO: need to handle other table types
Expand All @@ -2447,10 +2554,10 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
table.Kind = "bigquery#table"
table.SelfLink = fmt.Sprintf(
"http://%s/bigquery/v2/projects/%s/datasets/%s/tables/%s",
r.server.httpServer.Addr,
r.project.ID,
r.dataset.ID,
r.table.TableReference.TableId,
server.httpServer.Addr,
project.ID,
dataset.ID,
table.TableReference.TableId,
)
encodedTableData, err := json.Marshal(table)
if err != nil {
Expand All @@ -2460,24 +2567,14 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
if err := json.Unmarshal(encodedTableData, &tableMetadata); err != nil {
return nil, errInternalError(err.Error())
}

conn, err := r.server.connMgr.Connection(ctx, r.project.ID, r.dataset.ID)
if err != nil {
return nil, errInternalError(err.Error())
}
tx, err := conn.Begin(ctx)
if err != nil {
return nil, errInternalError(err.Error())
}
defer tx.RollbackIfNotCommitted()
if err := r.dataset.AddTable(
if err := dataset.AddTable(
ctx,
tx.Tx(),
metadata.NewTable(
r.server.metaRepo,
r.project.ID,
r.dataset.ID,
r.table.TableReference.TableId,
server.metaRepo,
project.ID,
dataset.ID,
table.TableReference.TableId,
tableMetadata,
),
); err != nil {
Expand All @@ -2486,6 +2583,24 @@ func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest
}
return nil, errInternalError(err.Error())
}
return table, nil
}

func (h *tablesInsertHandler) Handle(ctx context.Context, r *tablesInsertRequest) (*bigqueryv2.Table, *ServerError) {
conn, err := r.server.connMgr.Connection(ctx, r.project.ID, r.dataset.ID)
if err != nil {
return nil, errInternalError(err.Error())
}
tx, err := conn.Begin(ctx)
if err != nil {
return nil, errInternalError(err.Error())
}
defer tx.RollbackIfNotCommitted()

table, serverErr := createTableMetadata(ctx, tx, r.server, r.project, r.dataset, r.table)
if serverErr != nil {
return nil, serverErr
}
if r.table.Schema != nil {
if err := r.server.contentRepo.CreateTable(ctx, tx, r.table); err != nil {
return nil, errInternalError(err.Error())
Expand Down
10 changes: 10 additions & 0 deletions server/middleware.go
Expand Up @@ -4,6 +4,7 @@ import (
"compress/gzip"
"fmt"
"net/http"
"runtime"
"sync"

"github.com/gorilla/mux"
Expand All @@ -30,6 +31,15 @@ func recoveryMiddleware(s *Server) func(http.Handler) http.Handler {
if err := recover(); err != nil {
ctx := logger.WithLogger(r.Context(), s.logger)
errorResponse(ctx, w, errInternalError(fmt.Sprintf("%+v", err)))
var frame int = 1
for {
_, file, line, ok := runtime.Caller(frame)
if !ok {
break
}
s.logger.Error(fmt.Sprintf("%d: %v:%d", frame, file, line))
frame++
}
return
}
}()
Expand Down

0 comments on commit a4f2d6c

Please sign in to comment.