Skip to content

Commit b22ec06

Browse files
author
Pierre
committed
Main update
1 parent 283016d commit b22ec06

File tree

8 files changed

+81
-77
lines changed

8 files changed

+81
-77
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
private_key.json

client/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package client
2+
3+
import (
4+
"cloud.google.com/go/pubsub"
5+
"context"
6+
"google.golang.org/api/option"
7+
"log"
8+
)
9+
10+
func InitPubSubClient(ctx context.Context, projectID, credFile string) (*pubsub.Client, error) {
11+
client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(credFile))
12+
if err != nil {
13+
log.Println("ERROR : client.InitPubSubClient : " + err.Error())
14+
return nil, err
15+
}
16+
return client, nil
17+
}

consumer/consumer.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,25 @@ func InitBuffer(bufferSize int) chan string {
1313
return buffer
1414
}
1515

16-
func Pull(projectID, subName, topicID, credFile string, buffer chan string) {
17-
ctx := context.Background()
18-
client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(credFile))
16+
func Pull(ctx context.Context, pubsubClient *pubsub.Client, subName, topicID string, buffer chan string) error {
1917
if err != nil {
20-
log.Println("pubsub.NewClient: %v", err)
18+
log.Println("ERROR : consumer.Pull : " + err.Error())
19+
return err
2120
}
2221

23-
topic := client.Topic(topicID)
22+
topic := pubsubClient.Topic(topicID)
2423

2524
// Create topic subscription if it does not yet exist.
26-
sub := client.Subscription(subName)
25+
sub := pubsubClient.Subscription(subName)
2726
exists, err := sub.Exists(ctx)
2827
if err != nil {
29-
log.Println("Error checking for subscription: %v", err)
28+
log.Println("ERROR : consumer.Pull : " + err.Error())
29+
return err
3030
}
3131
if !exists {
32-
if _, err = client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{Topic: topic}); err != nil {
33-
log.Println("Failed to create subscription: %v", err)
32+
if _, err = pubsubClient.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{Topic: topic}); err != nil {
33+
log.Println("ERROR : consumer.Pull : " + err.Error())
34+
return err
3435
}
3536
}
3637

@@ -40,6 +41,8 @@ func Pull(projectID, subName, topicID, credFile string, buffer chan string) {
4041
msg.Ack()
4142
})
4243
if err != nil {
43-
log.Println("ERROR : pull : %v", err)
44+
log.Println("ERROR : consumer.Pull : " + err.Error())
45+
return err
4446
}
47+
return nil
4548
}

example-consumer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"github.com/PierreKieffer/pubsubToolBox/client"
45
"github.com/PierreKieffer/pubsubToolBox/consumer"
56
"log"
67
)
@@ -9,14 +10,22 @@ var exit = make(chan bool)
910

1011
func main() {
1112

13+
ctx := context.Background()
14+
15+
projectID := "ai-datalake"
16+
topicID := "testTopic"
17+
subscriberName := "ludo-test"
18+
19+
pubsubClient, err := client.InitPubSubClient(ctx, projectID, "private_key.json")
20+
1221
// Init message buffer to receive pulled messages
1322
var buffer = consumer.InitBuffer(10)
1423

1524
// Launch local buffer consumer to process messages
1625
go ProcessBuffer(buffer)
1726

1827
// Launch the pubsub consumer to pull messages
19-
go consumer.Pull("PROJECT_ID", "SUBSCRIBER_NAME", "TOPIC_NAME", "private_key.json", buffer)
28+
go consumer.Pull(ctx, pubsubClient, subscriberName, topicID, buffer)
2029

2130
<-exit
2231

example-producer.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
11
package main
22

33
import (
4+
"context"
5+
"github.com/PierreKieffer/pubsubToolBox/client"
46
"github.com/PierreKieffer/pubsubToolBox/producer"
7+
"log"
58
)
69

710
func main() {
11+
ctx := context.Background()
12+
13+
projectID := "ai-datalake"
14+
topicID := "testTopic"
15+
16+
pubsubClient, err := client.InitPubSubClient(ctx, projectID, "private_key.json")
17+
if err != nil {
18+
log.Println(err)
19+
}
820

921
message := `{"Message" : "Hello world"}`
10-
producer.Publish("PROJECT_NAME", "TOPIC_NAME", "private_key.json", message)
22+
producer.Publish(ctx, pubsubClient, topicID, message)
1123

1224
}

producer/producer.go

Lines changed: 0 additions & 32 deletions
This file was deleted.

publisher/publisher.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package publisher
2+
3+
import (
4+
"context"
5+
"google.golang.org/api/option"
6+
"log"
7+
8+
"cloud.google.com/go/pubsub"
9+
)
10+
11+
func Publish(ctx context.Context, pubsubClient *pubsub.Client, topicID, message string) error {
12+
13+
t := pubsubClient.Topic(topicID)
14+
15+
result := t.Publish(ctx, &pubsub.Message{
16+
Data: []byte(message),
17+
})
18+
19+
id, err := result.Get(ctx)
20+
if err != nil {
21+
log.Println("ERROR : publisher.Publish : " + err.Error())
22+
return err
23+
}
24+
25+
log.Println("INFO : publisher.Publish : Message published : " + id)
26+
return nil
27+
}

pubsubInterface/pubsubInterface.go

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)