Merge pull request #153 from daku10/gcs-import-wildcard
Add GCS wildcard import support
goccy committed May 4, 2023
2 parents 500650e + 5205ff7 commit fc6db7c
Showing 2 changed files with 194 additions and 6 deletions.
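
In practice, this change lets a load job's GCS source URI carry a single "*" wildcard. A minimal sketch of the client-side call it enables, assuming the cloud.google.com/go/bigquery client; the endpoint, bucket, dataset, and table names here are illustrative and mirror the test added below:

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/option"
)

func main() {
    ctx := context.Background()
    // The endpoint is an assumption: point it at wherever the emulator listens.
    client, err := bigquery.NewClient(ctx, "test",
        option.WithEndpoint("http://127.0.0.1:9050"),
        option.WithoutAuthentication(),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // One "*" per URI: it expands to every object in the bucket whose name
    // starts with "path/to/" and ends with ".json".
    gcsRef := bigquery.NewGCSReference("gs://test-bucket/path/to/*.json")
    gcsRef.SourceFormat = bigquery.JSON
    loader := client.Dataset("dataset1").Table("table_a").LoaderFrom(gcsRef)
    job, err := loader.Run(ctx)
    if err != nil {
        log.Fatal(err)
    }
    status, err := job.Wait(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if err := status.Err(); err != nil {
        log.Fatal(err)
    }
}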
server/handler.go: 45 changes (39 additions, 6 deletions)
@@ -23,6 +23,7 @@ import (
     "github.com/goccy/go-json"
     "go.uber.org/zap"
     bigqueryv2 "google.golang.org/api/bigquery/v2"
+    "google.golang.org/api/iterator"
     "google.golang.org/api/option"

     "github.com/goccy/bigquery-emulator/internal/connection"
@@ -1079,12 +1080,44 @@ func (h *jobsInsertHandler) importFromGCS(ctx context.Context, r *jobsInsertRequ
         }
         bucketName := paths[0]
         objectPath := strings.Join(paths[1:], "/")
-        reader, err := client.Bucket(bucketName).Object(objectPath).NewReader(ctx)
-        if err != nil {
-            return nil, fmt.Errorf("failed to get gcs object reader for %s: %w", uri, err)
-        }
-        if err := h.importFromGCSObject(ctx, r, reader); err != nil {
-            return nil, err
+        switch strings.Count(objectPath, "*") {
+        case 0:
+            reader, err := client.Bucket(bucketName).Object(objectPath).NewReader(ctx)
+            if err != nil {
+                return nil, fmt.Errorf("failed to get gcs object reader for %s: %w", uri, err)
+            }
+            if err := h.importFromGCSObject(ctx, r, reader); err != nil {
+                return nil, err
+            }
+        case 1:
+            splitPath := strings.Split(objectPath, "*")
+            prefix := splitPath[0]
+            suffix := splitPath[1]
+            query := &storage.Query{
+                Prefix: prefix,
+            }
+            query.SetAttrSelection([]string{"Name"})
+            it := client.Bucket(bucketName).Objects(ctx, query)
+            for {
+                attrs, err := it.Next()
+                if err == iterator.Done {
+                    break
+                }
+                if err != nil {
+                    return nil, fmt.Errorf("failed to list gcs object for %s: %w", uri, err)
+                }
+                if strings.HasSuffix(attrs.Name, suffix) {
+                    reader, err := client.Bucket(bucketName).Object(attrs.Name).NewReader(ctx)
+                    if err != nil {
+                        return nil, fmt.Errorf("failed to get gcs object reader for %s: %w", uri, err)
+                    }
+                    if err := h.importFromGCSObject(ctx, r, reader); err != nil {
+                        return nil, err
+                    }
+                }
+            }
+        default:
+            return nil, fmt.Errorf("the number of wildcards in gcs uri must be 0 or 1")
         }
     }
     endTime := time.Now()
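For reference, the case 1 branch above expands a single "*" by listing the bucket with everything before the wildcard as a prefix, then keeping only object names that end with everything after it. A minimal standalone sketch of that matching rule; matchesWildcard is a hypothetical helper, not part of this commit:

package main

import (
    "fmt"
    "strings"
)

// matchesWildcard restates the case 1 rule: split the object path once on
// "*" into a prefix and a suffix, then accept any object name that starts
// with the prefix and ends with the suffix.
func matchesWildcard(objectPath, name string) bool {
    parts := strings.SplitN(objectPath, "*", 2)
    return strings.HasPrefix(name, parts[0]) && strings.HasSuffix(name, parts[1])
}

func main() {
    // Prefix listing in GCS is flat, so the wildcard crosses "/" boundaries.
    fmt.Println(matchesWildcard("path/to/*.json", "path/to/data.json"))       // true
    fmt.Println(matchesWildcard("path/to/*.json", "path/to/under/data.json")) // true
    fmt.Println(matchesWildcard("path/to/*.json", "path/to/data.csv"))        // false
}

One design note on the listing itself: query.SetAttrSelection([]string{"Name"}) asks GCS to return only object names, so the suffix filter runs without fetching full metadata for every candidate object.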
server/server_test.go: 155 changes (155 additions, 0 deletions)
@@ -1597,6 +1597,161 @@ func TestImportFromGCS(t *testing.T) {
     }
 }

+func TestImportWithWildcardFromGCS(t *testing.T) {
+    const (
+        projectID  = "test"
+        datasetID  = "dataset1"
+        tableID    = "table_a"
+        publicHost = "127.0.0.1"
+        bucketName = "test-bucket"
+        sourceName = "path/to/*.json"
+    )
+
+    var (
+        targetSourceFiles = []string{
+            "path/to/data.json",
+            "path/to/under/data.json",
+        }
+        nonTargetSourceFiles = []string{
+            "path/not/data.json",
+            "path/to/data.csv",
+        }
+    )
+
+    ctx := context.Background()
+    bqServer, err := server.New(server.TempStorage)
+    if err != nil {
+        t.Fatal(err)
+    }
+    project := types.NewProject(
+        projectID,
+        types.NewDataset(
+            datasetID,
+            types.NewTable(
+                tableID,
+                []*types.Column{
+                    types.NewColumn("id", types.INT64),
+                    types.NewColumn("value", types.INT64),
+                },
+                nil,
+            ),
+        ),
+    )
+    if err := bqServer.Load(server.StructSource(project)); err != nil {
+        t.Fatal(err)
+    }
+
+    testServer := bqServer.TestServer()
+    files := make([]string, len(targetSourceFiles)+len(nonTargetSourceFiles))
+    copy(files, targetSourceFiles)
+    copy(files[len(targetSourceFiles):], nonTargetSourceFiles)
+    var initialObjects []fakestorage.Object
+    for i, file := range files {
+        var buf bytes.Buffer
+        enc := json.NewEncoder(&buf)
+        for j := 0; j < 3; j++ {
+            if err := enc.Encode(map[string]interface{}{
+                "id":    i*10 + j + 1,
+                "value": (i+1)*10 + j + 1,
+            }); err != nil {
+                t.Fatal(err)
+            }
+        }
+        initialObjects = append(initialObjects, fakestorage.Object{
+            ObjectAttrs: fakestorage.ObjectAttrs{
+                BucketName: bucketName,
+                Name:       file,
+                Size:       int64(len(buf.Bytes())),
+            },
+            Content: buf.Bytes(),
+        })
+    }
+
+    storageServer, err := fakestorage.NewServerWithOptions(fakestorage.Options{
+        InitialObjects: initialObjects,
+        PublicHost:     publicHost,
+        Scheme:         "http",
+    })
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    storageServerURL := storageServer.URL()
+    u, err := url.Parse(storageServerURL)
+    if err != nil {
+        t.Fatal(err)
+    }
+    storageEmulatorHost := fmt.Sprintf("http://%s:%s", publicHost, u.Port())
+    t.Setenv("STORAGE_EMULATOR_HOST", storageEmulatorHost)
+
+    defer func() {
+        testServer.Close()
+        bqServer.Stop(ctx)
+        storageServer.Stop()
+    }()
+
+    client, err := bigquery.NewClient(
+        ctx,
+        projectID,
+        option.WithEndpoint(testServer.URL),
+        option.WithoutAuthentication(),
+    )
+    if err != nil {
+        t.Fatal(err)
+    }
+    defer client.Close()
+
+    gcsSourceURL := fmt.Sprintf("gs://%s/%s", bucketName, sourceName)
+    gcsRef := bigquery.NewGCSReference(gcsSourceURL)
+    gcsRef.SourceFormat = bigquery.JSON
+    gcsRef.AutoDetect = true
+    loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
+    loader.WriteDisposition = bigquery.WriteTruncate
+    job, err := loader.Run(ctx)
+    if err != nil {
+        t.Fatal(err)
+    }
+    status, err := job.Wait(ctx)
+    if err != nil {
+        t.Fatal(err)
+    }
+    if status.Err() != nil {
+        t.Fatal(status.Err())
+    }
+
+    query := client.Query(fmt.Sprintf("SELECT * FROM %s.%s", datasetID, tableID))
+    it, err := query.Read(ctx)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    type row struct {
+        ID    int64
+        Value int64
+    }
+    var rows []*row
+    for {
+        var r row
+        if err := it.Next(&r); err != nil {
+            if err == iterator.Done {
+                break
+            }
+            t.Fatal(err)
+        }
+        rows = append(rows, &r)
+    }
+    if diff := cmp.Diff([]*row{
+        {ID: 1, Value: 11},
+        {ID: 2, Value: 12},
+        {ID: 3, Value: 13},
+        {ID: 11, Value: 21},
+        {ID: 12, Value: 22},
+        {ID: 13, Value: 23},
+    }, rows); diff != "" {
+        t.Errorf("(-want +got):\n%s", diff)
+    }
+}
+
 func TestExportToGCS(t *testing.T) {
     const (
         projectID = "test"
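A quick check of the expected rows in TestImportWithWildcardFromGCS above: the two matching objects come first in files (i = 0 and i = 1), and each holds three records with id = i*10 + j + 1 and value = (i+1)*10 + j + 1, which yields exactly the six rows asserted via cmp.Diff (ids 1-3 and 11-13). The other two objects never load: path/not/data.json does not start with the prefix path/to/, and path/to/data.csv does not end with the suffix .json.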
