[POSSIBLE BUG] - Publisher freezes when publishing without any delay #104

Open
sujit-baniya opened this issue Dec 25, 2023 · 4 comments

Comments

@sujit-baniya

sujit-baniya commented Dec 25, 2023

I'm using BadgerDB as the persistent DB. For testing, I publish around 500K messages with persistent delivery mode (amqp.Persistent) back to back, without any delay between them, and the publisher freezes.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
	"time"
	
	amqp "github.com/oarkflow/amqp/amqp091"
	
	grabbit "github.com/oarkflow/amqp"
)

func OnPubReattempting(name string, retry int) bool {
	log.Printf("callback_redo: {%s} retry count {%d}", name, retry)
	return true // true: keep retrying
}

// OnNotifyPublish CallbackNotifyPublish
func OnNotifyPublish(confirm amqp.Confirmation, ch *grabbit.Channel) {
	log.Printf("callback: publish confirmed status [%v] from queue [%s]\n", confirm.Ack, ch.Queue())
}

// OnNotifyReturn CallbackNotifyReturn
func OnNotifyReturn(_ amqp.Return, ch *grabbit.Channel) {
	log.Printf("callback: publish returned from queue [%s]\n", ch.Queue())
}

func PublishMsg(publisher *grabbit.Publisher, start, end int) {
	message := amqp.Publishing{DeliveryMode: amqp.Persistent} // with Persistent, publishing runs for some messages and then freezes
	// message := amqp.Publishing{}
	message.Headers = map[string]any{
		"next-queue": "I'm loving it",
	}
	data := make([]byte, 0, 64)
	buff := bytes.NewBuffer(data)
	
	for i := start; i < end; i++ {
		// <-time.After(1 * time.Millisecond)
		buff.Reset()
		buff.WriteString(fmt.Sprintf("test number %04d", i))
		message.Body = buff.Bytes()
		log.Println("going to send:", buff.String())
		
		if err := publisher.Publish(message); err != nil {
			log.Println("publishing failed with: ", err)
		}
	}
}

func main() {
	ctxMaster, ctxCancel := context.WithCancel(context.TODO())
	conn := grabbit.NewConnection("amqp://guest:guest@localhost:5672", amqp.Config{}, grabbit.WithConnectionCtx(ctxMaster))
	pubOpt := grabbit.DefaultPublisherOptions()
	pubOpt.WithKey("workload").WithContext(ctxMaster).WithConfirmationsCount(20)
	
	topos := make([]*grabbit.TopologyOptions, 0, 8)
	topos = append(topos, &grabbit.TopologyOptions{
		Name:          "workload",
		IsDestination: true,
		Durable:       true,
		Declare:       true,
	})
	publisher := grabbit.NewPublisher(conn, pubOpt,
		grabbit.WithChannelCtx(ctxMaster),
		grabbit.WithChannelTopology(topos),
		grabbit.OnChannelRecovering(OnPubReattempting),
		grabbit.OnPublishSuccess(OnNotifyPublish),
		grabbit.OnPublishFailure(OnNotifyReturn),
	)
	if !publisher.AwaitAvailable(30*time.Second, 1*time.Second) {
		log.Println("publisher not ready yet")
		ctxCancel()
		return
	}
	
	PublishMsg(publisher, 0, 500000)
}

Screenshot

It seems the channel is closed before all messages have been published.

@valinurovam
Owner

It looks like the issue is here:

log.Printf("callback: publish confirmed status [%v] from queue [%s]\n", confirm.Ack, ch.Queue())

The Channel.Queue method takes a lock:

func (ch *Channel) Queue() string {
	ch.baseChan.mu.RLock()
	defer ch.baseChan.mu.RUnlock()
	
	return ch.queue
}

If you replace ch.Queue() with a plain string, for example "queueName", everything works.
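
A minimal sketch of that workaround, assuming the queue name is known up front and does not change: the callback captures the name once as a plain string instead of calling ch.Queue() on every confirmation. The `confirmation` type and `newConfirmLogger` helper are hypothetical stand-ins for illustration, not part of the library.

```go
package main

import "fmt"

// confirmation is a hypothetical stand-in for amqp.Confirmation.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// newConfirmLogger (hypothetical helper) returns a function that formats the
// confirmation log line using a queue name captured once, so no locked
// accessor is called per confirmation.
func newConfirmLogger(queueName string) func(confirmation) string {
	return func(c confirmation) string {
		return fmt.Sprintf("callback: publish confirmed status [%v] from queue [%s]", c.Ack, queueName)
	}
}

func main() {
	logConfirm := newConfirmLogger("workload")
	fmt.Println(logConfirm(confirmation{DeliveryTag: 1, Ack: true}))
}
```

This only avoids the lock in the callback; it does not explain why the lock blocks in the first place.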

@valinurovam
Owner

It looks like the lock/unlock in ch.Queue is unnecessary and costs some performance.
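
If the queue name can still change at runtime (for example a server-assigned name after recovery), some synchronization is still needed even without the mutex. One possible lock-free alternative is sketched below using `atomic.Pointer[string]` (Go 1.19+). The `queueName` type is hypothetical and is not how the library stores the name.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queueName is a hypothetical holder for a queue name that can be read
// and replaced concurrently without taking a mutex.
type queueName struct {
	v atomic.Pointer[string]
}

// Store publishes a new name; readers see either the old or the new value.
func (q *queueName) Store(name string) {
	q.v.Store(&name)
}

// Load returns the current name, or "" if none has been stored yet.
func (q *queueName) Load() string {
	if p := q.v.Load(); p != nil {
		return *p
	}
	return ""
}

func main() {
	var q queueName
	fmt.Printf("%q\n", q.Load()) // nothing stored yet
	q.Store("workload")
	fmt.Println(q.Load())
}
```

A reader never blocks here, so a callback that reads the name cannot get stuck behind a writer.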

@sujit-baniya
Author

@valinurovam Thank you for the response.

So you mean that if I remove the lock, it might work?

Question: is the lock necessary there? I don't see any operations that might cause a deadlock.

@valinurovam
Owner

It looks like there is a deadlock somewhere. Instrumenting the method like this shows where it stops:

// Queue returns the active (as indicated by [IsDestination] option in topology options) queue name.
// Useful for finding the server assigned name.
func (ch *Channel) Queue() string {
	fmt.Println("try to get lock... ")
	ch.baseChan.mu.RLock()
	fmt.Println(" locked.")
	defer func() {
		fmt.Println(" unlocked.\n")
		ch.baseChan.mu.RUnlock()
	}()

	return ch.queue
}

Logs

try to get lock... 
2023/12/29 19:11:40 going to send: test number 1623
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
try to get lock... 
2023/12/29 19:11:40 going to send: test number 1624
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
2023/12/29 19:11:40 going to send: test number 1625
2023/12/29 19:11:40 going to send: test number 1626
2023/12/29 19:11:40 going to send: test number 1627
2023/12/29 19:11:40 going to send: test number 1628
try to get lock... 
2023/12/29 19:11:40 going to send: test number 1629
 locked.
 unlocked.

callback: publish confirmed status [true] from queue [workload]
try to get lock... 
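
The log ends at "try to get lock..." with no matching " locked.", so the final RLock call never returned. One possible mechanism, which is an assumption and not confirmed in this thread: per the sync.RWMutex documentation, once a writer is waiting in Lock, new RLock calls block until that writer is done, which rules out nested read locking. If the callback runs while the same mutex is already read-locked and a writer queues up in between, the nested RLock would hang. The sketch below shows the effect in isolation, using TryRLock (Go 1.18+) so that it reports the refusal instead of hanging; `secondReadLockRefused` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// secondReadLockRefused reports whether a nested read-lock attempt is refused
// once a writer is queued behind the first reader.
func secondReadLockRefused() bool {
	var mu sync.RWMutex

	mu.RLock() // outer reader (hypothetically: code that invokes a callback)

	writerDone := make(chan struct{})
	go func() {
		defer close(writerDone)
		mu.Lock() // writer waits for the outer reader to release
		mu.Unlock()
	}()

	// Poll until the writer is queued; from then on TryRLock fails.
	// A plain RLock here would block forever, because the writer is
	// waiting for the outer reader and the outer reader is waiting here.
	refused := false
	for i := 0; i < 1000 && !refused; i++ {
		if mu.TryRLock() {
			mu.RUnlock()
			time.Sleep(time.Millisecond)
		} else {
			refused = true
		}
	}

	mu.RUnlock() // release the outer reader so the writer can proceed
	<-writerDone
	return refused
}

func main() {
	fmt.Println("nested read lock refused while a writer waits:", secondReadLockRefused())
}
```

Whether this is what happens in the publisher depends on which goroutine holds `mu` when the callback fires, which the logs above do not show.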

