Skip to content

Commit

Permalink
Merge pull request #4 from FindHotel/pipe-with-sns-topic
Browse files Browse the repository at this point in the history
Add aws_sns_topic parameter to snowflake_pipe resource
  • Loading branch information
velppa committed Jul 30, 2020
2 parents 1c3997a + a8f01d4 commit a43a6d7
Show file tree
Hide file tree
Showing 15 changed files with 89 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -160,6 +160,7 @@ These resources do not enforce exclusive attachment of a grant, it is the user's
| NAME | TYPE | DESCRIPTION | OPTIONAL | REQUIRED | COMPUTED | DEFAULT |
|----------------------|--------|-----------------------------------------------------------------------------------------------------------------|----------|-----------|----------|---------|
| auto_ingest | bool | Specifies a auto_ingest param for the pipe. | true | false | false | false |
| aws_sns_topic | string | Specifies AWS SNS Topic that SQS managed by Snowflake will subscribe to. | true | false | false | <nil> |
| comment | string | Specifies a comment for the pipe. | true | false | false | <nil> |
| copy_statement | string | Specifies the copy statement for the pipe. | false | true | false | <nil> |
| database | string | The database in which to create the pipe. | false | true | false | <nil> |
Expand Down
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.13.3
0.13.2-fh2
11 changes: 11 additions & 0 deletions pkg/resources/pipe.go
Expand Up @@ -52,6 +52,13 @@ var pipeSchema = map[string]*schema.Schema{
return false
},
},
"aws_sns_topic": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Required: false,
ForceNew: true,
Description: "Specifies AWS SNS Topic that SQS managed by Snowflake will subscribe to.",
},
"auto_ingest": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -146,6 +153,10 @@ func CreatePipe(data *schema.ResourceData, meta interface{}) error {
builder.WithCopyStatement(v.(string))
}

if v, ok := data.GetOk("aws_sns_topic"); ok {
builder.WithAWSSNSTopic(v.(string))
}

if v, ok := data.GetOk("comment"); ok {
builder.WithComment(v.(string))
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/resources/pipe_test.go
Expand Up @@ -41,6 +41,30 @@ func TestPipeCreate(t *testing.T) {
})
}

func TestPipeCreateWithAWSSNSTopic(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "test_pipe",
"database": "test_db",
"schema": "test_schema",
"comment": "great comment",
"aws_sns_topic": "topic_arn",
}
d := schema.TestResourceDataRaw(t, resources.Pipe().Schema, in)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`^CREATE PIPE "test_db"."test_schema"."test_pipe" AWS_SNS_TOPIC = 'topic_arn' COMMENT = 'great comment'$`,
).WillReturnResult(sqlmock.NewResult(1, 1))

expectReadPipe(mock)
err := resources.CreatePipe(d, db)
r.NoError(err)
})
}

func expectReadPipe(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{
"created_on", "name", "database_name", "schema_name", "definition", "owner", "notification_channel", "comment"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/resource_monitor_acceptance_test.go
Expand Up @@ -2,13 +2,17 @@ package resources_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccResourceMonitor(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccResourceMonitor")
}
// TODO test more attributes
name := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/role_acceptance_test.go
Expand Up @@ -2,13 +2,17 @@ package resources_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccRole(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccRole")
}
prefix := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
prefix2 := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/role_grants_acceptance_test.go
Expand Up @@ -3,6 +3,7 @@ package resources_test
import (
"fmt"
"log"
"os"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -89,6 +90,9 @@ func testCheckRolesAndUsers(path string, roles, users []string) func(state *terr
}

func TestAccGrantRole(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccGrantRole")
}
role1 := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
role2 := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
role3 := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/schema_acceptance_test.go
Expand Up @@ -2,13 +2,17 @@ package resources_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccSchema(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccSchema")
}
accName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

resource.Test(t, resource.TestCase{
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/stage_acceptance_test.go
Expand Up @@ -2,13 +2,17 @@ package resources_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccStage(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccStage")
}
accName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

resource.Test(t, resource.TestCase{
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/stage_test.go
Expand Up @@ -55,5 +55,5 @@ func expectReadStageShow(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{
"created_on", "name", "database_name", "schema_name", "url", "has_credentials", "has_encryption_key", "owner", "comment", "region", "type", "cloud", "notification_channel", "storage_integration"},
).AddRow("2019-12-23 17:20:50.088 +0000", "test_stage", "test_db", "test_schema", "s3://load/test/", "N", "Y", "test", "great comment", "us-east-1", "EXTERNAL", "AWS", "NULL", "NULL")
mock.ExpectQuery(`^SHOW STAGES LIKE 'test_stage' IN DATABASE "test_db"$`).WillReturnRows(rows)
mock.ExpectQuery(`^SHOW STAGES LIKE 'test_stage' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows)
}
4 changes: 4 additions & 0 deletions pkg/resources/task_acceptance_test.go
Expand Up @@ -3,6 +3,7 @@ package resources_test
import (
"bytes"
"fmt"
"os"
"testing"
"text/template"

Expand Down Expand Up @@ -157,6 +158,9 @@ var (
)

func Test_AccTask(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping Test_AccTask")
}
resource.Test(t, resource.TestCase{
Providers: providers(),
Steps: []resource.TestStep{
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/user_acceptance_test.go
Expand Up @@ -3,6 +3,7 @@ package resources_test
import (
"fmt"
"log"
"os"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -31,6 +32,9 @@ func checkBool(path, attr string, value bool) func(*terraform.State) error {
}

func TestAccUser(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccUser")
}
r := require.New(t)
prefix := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
prefix2 := randomdata.Email()
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/view_acceptance_test.go
Expand Up @@ -2,13 +2,17 @@ package resources_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccView(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccView")
}
accName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)

resource.Test(t, resource.TestCase{
Expand Down
7 changes: 7 additions & 0 deletions pkg/resources/view_grant_acceptance_test.go
Expand Up @@ -12,6 +12,9 @@ import (
)

func TestAccViewGrantBasic(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_DATABASE_TESTS"); ok {
t.Skip("Skipping TestAccViewGrantBasic")
}
viewName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
databaseName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
Expand Down Expand Up @@ -55,6 +58,10 @@ func TestAccViewGrantShares(t *testing.T) {
}

func TestAccFutureViewGrantChange(t *testing.T) {
if _, ok := os.LookupEnv("SKIP_SHARE_TESTS"); ok {
t.Skip("Skipping TestAccFutureViewGrantChange")
}

viewName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
databaseName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
roleName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
Expand Down
13 changes: 12 additions & 1 deletion pkg/snowflake/pipe.go
Expand Up @@ -12,6 +12,7 @@ type PipeBuilder struct {
name string
db string
schema string
awsSNSTopic string
autoIngest bool
comment string
copyStatement string
Expand Down Expand Up @@ -50,12 +51,18 @@ func (pb *PipeBuilder) WithComment(c string) *PipeBuilder {
return pb
}

// WithURL adds a URL to the PipeBuilder
// WithCopyStatement adds a copy statement to the PipeBuilder
func (pb *PipeBuilder) WithCopyStatement(s string) *PipeBuilder {
pb.copyStatement = s
return pb
}

// WithAWSSNSTopic adds an AWS SNS Topic ARN to the PipeBuilder
func (pb *PipeBuilder) WithAWSSNSTopic(arn string) *PipeBuilder {
pb.awsSNSTopic = arn
return pb
}

// Pipe returns a pointer to a Builder that abstracts the DDL operations for a pipe.
//
// Supported DDL operations are:
Expand Down Expand Up @@ -84,6 +91,10 @@ func (pb *PipeBuilder) Create() string {
q.WriteString(` AUTO_INGEST = TRUE`)
}

if pb.awsSNSTopic != "" {
q.WriteString(fmt.Sprintf(` AWS_SNS_TOPIC = '%v'`, EscapeString(pb.awsSNSTopic)))
}

if pb.comment != "" {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(pb.comment)))
}
Expand Down

0 comments on commit a43a6d7

Please sign in to comment.