Skip to content

Commit

Permalink
add table and column metadata for bigquery (#141)
Browse files Browse the repository at this point in the history
Co-authored-by: poundifdef <jay@scratchdata.com>
  • Loading branch information
poundifdef and poundifdef committed Mar 27, 2024
1 parent 82cb218 commit 25a72e4
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 0 deletions.
6 changes: 6 additions & 0 deletions models/models.go
Expand Up @@ -13,3 +13,9 @@ type StorageServices struct {
Queue queue.Queue
BlobStore blobstore.BlobStore
}

type Column struct {
Name string `json:"name"`
Type string `json:"type"`
JSONType string `json:"-"`
}
4 changes: 4 additions & 0 deletions pkg/api/router.go
Expand Up @@ -24,6 +24,8 @@ var responseSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
type ScratchDataAPI interface {
Select(w http.ResponseWriter, r *http.Request)
Insert(w http.ResponseWriter, r *http.Request)
Tables(w http.ResponseWriter, r *http.Request)
Columns(w http.ResponseWriter, r *http.Request)

CreateQuery(w http.ResponseWriter, r *http.Request)
ShareData(w http.ResponseWriter, r *http.Request)
Expand All @@ -47,6 +49,8 @@ func CreateMux(apiFunctions ScratchDataAPI) *chi.Mux {
api.Post("/data/insert/{table}", apiFunctions.Insert)
api.Get("/data/query", apiFunctions.Select)
api.Post("/data/query", apiFunctions.Select)
api.Get("/tables", apiFunctions.Tables)
api.Get("/tables/{table}/columns", apiFunctions.Columns)

api.Get("/destinations", apiFunctions.GetDestinations)
api.Post("/destinations", apiFunctions.CreateDestination)
Expand Down
44 changes: 44 additions & 0 deletions pkg/api/table.go
@@ -0,0 +1,44 @@
package api

import (
"net/http"

"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
)

func (a *ScratchDataAPIStruct) Tables(w http.ResponseWriter, r *http.Request) {
databaseID := a.AuthGetDatabaseID(r.Context())
dest, err := a.destinationManager.Destination(r.Context(), databaseID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

tables, err := dest.Tables()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

render.JSON(w, r, tables)
}

func (a *ScratchDataAPIStruct) Columns(w http.ResponseWriter, r *http.Request) {
table := chi.URLParam(r, "table")
databaseID := a.AuthGetDatabaseID(r.Context())

dest, err := a.destinationManager.Destination(r.Context(), databaseID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

columns, err := dest.Columns(table)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

render.JSON(w, r, columns)
}
71 changes: 71 additions & 0 deletions pkg/destinations/bigquery/tables.go
@@ -0,0 +1,71 @@
package bigquery

import (
"context"
"errors"
"fmt"
"strings"

"github.com/rs/zerolog/log"
"github.com/scratchdata/scratchdata/models"
"google.golang.org/api/iterator"
)

func (b *BigQueryServer) Columns(table string) ([]models.Column, error) {
rc := []models.Column{}

tokens := strings.Split(table, ".")
if len(tokens) != 2 {
return nil, errors.New("Table should be in the format dataset.table")
}

datasetID := tokens[0]
tableID := tokens[1]

ctx := context.TODO()
tableInfo := b.conn.Dataset(datasetID).Table(tableID)
meta, err := tableInfo.Metadata(ctx)
if err != nil {
return nil, err
}

for _, field := range meta.Schema {
rc = append(rc, models.Column{
Name: field.Name,
Type: string(field.Type),
})
}
return rc, nil
}

func (b *BigQueryServer) Tables() ([]string, error) {
rc := []string{}

ctx := context.TODO()
it := b.conn.Datasets(ctx)
for {
dataset, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}

// For each dataset, list all tables.
tableIt := dataset.Tables(ctx)
for {
table, err := tableIt.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Error().Err(err).Str("dataset_id", dataset.DatasetID).Msg("Failed to list tables")
continue
}
rc = append(rc, fmt.Sprintf("%s.%s", dataset.DatasetID, table.TableID))
}
}

return rc, nil
}
15 changes: 15 additions & 0 deletions pkg/destinations/clickhouse/tables.go
@@ -0,0 +1,15 @@
package clickhouse

import (
"errors"

"github.com/scratchdata/scratchdata/models"
)

func (b *ClickhouseServer) Columns(table string) ([]models.Column, error) {
return []models.Column{}, errors.New("not implemented")
}

func (b *ClickhouseServer) Tables() ([]string, error) {
return []string{}, errors.New("not implemented")
}
3 changes: 3 additions & 0 deletions pkg/destinations/destinations.go
Expand Up @@ -25,6 +25,9 @@ type Destination interface {
QueryJSON(query string, writer io.Writer) error
QueryCSV(query string, writer io.Writer) error

Tables() ([]string, error)
Columns(table string) ([]models.Column, error)

CreateEmptyTable(name string) error
CreateColumns(table string, filePath string) error
InsertFromNDJsonFile(table string, filePath string) error
Expand Down
15 changes: 15 additions & 0 deletions pkg/destinations/duckdb/tables.go
@@ -0,0 +1,15 @@
package duckdb

import (
"errors"

"github.com/scratchdata/scratchdata/models"
)

func (b *DuckDBServer) Columns(table string) ([]models.Column, error) {
return []models.Column{}, errors.New("not implemented")
}

func (b *DuckDBServer) Tables() ([]string, error) {
return []string{}, errors.New("not implemented")
}
15 changes: 15 additions & 0 deletions pkg/destinations/redshift/tables.go
@@ -0,0 +1,15 @@
package redshift

import (
"errors"

"github.com/scratchdata/scratchdata/models"
)

func (b *RedshiftServer) Columns(table string) ([]models.Column, error) {
return []models.Column{}, errors.New("not implemented")
}

func (b *RedshiftServer) Tables() ([]string, error) {
return []string{}, errors.New("not implemented")
}

0 comments on commit 25a72e4

Please sign in to comment.