Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding comfylite3 #31

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

davidroman0O
Copy link

Adding the comfylite3 wrapper of the famous go-sqlite3 driver which compensate the lack of goroutine support by giving the illusion of it.

Most other libraries that re-implement the entire sqlite3 driver won't support the latest features, like the recent JSON datatype, sometimes you just want it all!

No dependency added, very similar to the mysql implementation, eventually someone else would want to add a specific sqlite implementation for a pure golang version since go-sqlite3 might require CGO!

At least, it gives more options! Using sqlite3 with watermill for small side projects is very cool, especially when using an adapter pattern to switch between a real mysql to sqlite3 with the same implementation!

Nothing fancy in the code! 馃槃

@davidroman0O
Copy link
Author

davidroman0O commented Apr 19, 2024

Also here an example

package main

import (
	"context"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
	"github.com/ThreeDotsLabs/watermill/message"
         "github.com/davidroman0O/comfylite3"
)

func main() {

	db, err := comfylite3.Comfy(
		comfylite3.WithMemory(),
	)
	if err != nil {
		panic(err)
	}

	logger := watermill.NewStdLogger(false, false)

	subscriber, err := sql.NewSubscriber(
		db,
		sql.SubscriberConfig{
			SchemaAdapter:    sql.DefaultSQLite3Schema{},
			OffsetsAdapter:   sql.DefaultSQLite3OffsetsAdapter{},
			InitializeSchema: true,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	messages, err := subscriber.Subscribe(context.Background(), "example_topic")
	if err != nil {
		panic(err)
	}

	go process(messages)

	publisher, err := sql.NewPublisher(
		db,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultSQLite3Schema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	publishMessages(publisher)
}

func publishMessages(publisher message.Publisher) {
	for {
		msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

		if err := publisher.Publish("example_topic", msg); err != nil {
			panic(err)
		}

		time.Sleep(time.Second)
	}
}

func process(messages <-chan *message.Message) {
	for msg := range messages {
		log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

		// we need to Acknowledge that we received and processed the message,
		// otherwise, it will be resent over and over again.
		msg.Ack()
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
1 participant