Skip to content

Commit

Permalink
Merge pull request #2 from quadtrix/1-checkforexpiry
Browse files Browse the repository at this point in the history
queue locking redesigned
  • Loading branch information
quadtrix committed Aug 8, 2023
2 parents 7442fc8 + d245b21 commit 39b4654
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 27 deletions.
132 changes: 111 additions & 21 deletions basicqueue.go
@@ -1,4 +1,4 @@
//Package basicqueue implements very simple unicast and broadcast message queueing between goroutines
// Package basicqueue implements very simple unicast and broadcast message queueing between goroutines
package basicqueue

import (
Expand Down Expand Up @@ -80,6 +80,7 @@ type BasicQueue struct {
AESengine aesengine.AESEngine
isJsonQueue bool
locked bool
lockedBy string
maxQueueDepth int
}

Expand Down Expand Up @@ -209,20 +210,28 @@ func NewEncryptedJsonQueue(slog *servicelogger.Logger, qtype BasicQueueType, qna
}

func (bq *BasicQueue) AddMessage(identifier string, messagetext string) (err error) {
if bq.locked {
bq.slog.LogError(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", "Queue is locked")
return errors.New("queue is locked")
err = bq.waitForUnlock("AddMessage", 5*time.Second)
if err != nil {
return err
}
err = bq.setLock("AddMessage")
if err != nil {
bq.slog.LogError(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Failed to acquire lock: %s", err.Error()))
return err
}
if !bq.producerExists(identifier) {
bq.slog.LogWarn(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s, not a registered producer", identifier))
bq.unsetLock("AddMessage")
return errors.New("not a registered producer")
}
if bq.isJsonQueue {
bq.slog.LogWarn(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s. This is a JSON queue. Use method AddJsonMessage instead", identifier))
bq.unsetLock("AddMessage")
return errors.New("incorrect method, use AddJsonMessage for JSON queues")
}
if bq.maxQueueDepth > -1 && bq.messages.msgCount == bq.maxQueueDepth {
bq.slog.LogError(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s, queue is full", identifier))
bq.unsetLock("AddMessage")
return errors.New("queue is full")
}
var exptime time.Time
Expand All @@ -249,24 +258,33 @@ func (bq *BasicQueue) AddMessage(identifier string, messagetext string) (err err
bq.messages.messages = append(bq.messages.messages, msg)
bq.messages.msgCount++
bq.slog.LogTrace(fmt.Sprintf("AddMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Adding message at position %d with ID %s: %s", bq.messages.msgCount-1, msg.messageID, msg.message))
bq.unsetLock("AddMessage")
return nil
}

func (bq *BasicQueue) AddJsonMessage(identifier string, source string, destination string, msgtype string, messagetext string) (err error) {
if bq.locked {
bq.slog.LogError(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", "Queue is locked")
return errors.New("queue is locked")
err = bq.waitForUnlock("AddJsonMessage", 5*time.Second)
if err != nil {
return err
}
err = bq.setLock("AddJsonMessage")
if err != nil {
bq.slog.LogError(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Failed to acquire lock: %s", err.Error()))
return err
}
if !bq.producerExists(identifier) {
bq.slog.LogWarn(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s (%s), not a registered producer", source, identifier))
bq.unsetLock("AddJsonMessage")
return errors.New("not a registered producer")
}
if !bq.isJsonQueue {
bq.slog.LogWarn(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s (%s). This is not a JSON queue. Use method AddMessage instead", source, identifier))
bq.unsetLock("AddJsonMessage")
return errors.New("incorrect method, use AddMessage")
}
if bq.maxQueueDepth > -1 && bq.messages.msgCount == bq.maxQueueDepth {
bq.slog.LogError(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting message from %s (%s). Queue is full", source, identifier))
bq.unsetLock("AddJsonMessage")
return errors.New("queue is full")
}
jms := JSonQueueMessage{
Expand Down Expand Up @@ -294,6 +312,7 @@ func (bq *BasicQueue) AddJsonMessage(identifier string, source string, destinati
}
marshaled, err := json.Marshal(jms)
if err != nil {
bq.unsetLock("AddJsonMessage")
return err
}
msg := QueueMessage{
Expand All @@ -306,6 +325,7 @@ func (bq *BasicQueue) AddJsonMessage(identifier string, source string, destinati
bq.messages.messages = append(bq.messages.messages, msg)
bq.messages.msgCount++
bq.slog.LogTrace(fmt.Sprintf("AddJsonMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Adding message from %s to %s (type %s) at position %d with ID %s", jms.Source, jms.Destination, jms.MessageType, bq.messages.msgCount-1, msg.messageID))
bq.unsetLock("AddJsonMessage")
return nil
}

Expand Down Expand Up @@ -343,6 +363,16 @@ func (bq *BasicQueue) PollWithHistory(identifier string, messageIDHistory []stri
}

func (bq *BasicQueue) removeMessage(index int) {
err := bq.waitForUnlock("removeMessage", 5*time.Second)
if err != nil {
bq.slog.LogError(fmt.Sprintf("removeMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Unlock timeout: %s", err.Error()))
return
}
err = bq.setLock("removeMessage")
if err != nil {
bq.slog.LogError(fmt.Sprintf("removeMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Failed to acquire lock: %s", err.Error()))
return
}
if bq.messages.msgCount > index {
bq.slog.LogTrace(fmt.Sprintf("removeMessage.%s", bq.qname), "basicqueue", fmt.Sprintf("Removing message %s (%d)", bq.messages.messages[index].messageID, index))
if index == 0 {
Expand All @@ -352,6 +382,7 @@ func (bq *BasicQueue) removeMessage(index int) {
bq.messages.messages = []QueueMessage{}
}
bq.messages.msgCount = len(bq.messages.messages)
bq.unsetLock("removeMessage")
return
}
if index == bq.messages.msgCount-1 {
Expand All @@ -361,6 +392,7 @@ func (bq *BasicQueue) removeMessage(index int) {
bq.messages.messages = append(bq.messages.messages[:index-1], bq.messages.messages[index+1:]...)
}
bq.messages.msgCount = len(bq.messages.messages)
bq.unsetLock("removeMessage")
}
}

Expand All @@ -374,21 +406,79 @@ func (bq *BasicQueue) loopExpiryCheck() {
}
}

func (bq *BasicQueue) waitForUnlock(caller string, timeout time.Duration) error {
timer_start := time.Now()
if !bq.locked {
return nil
} else {
bq.slog.LogTrace(fmt.Sprintf("waitForUnlock.%s", bq.qname), "basicqueue", fmt.Sprintf("Queue is locked (owner: %s), started wait, timeout: %ds", bq.lockedBy, (int)(timeout.Seconds())))

for bq.locked {
if time.Since(timer_start) > timeout {
bq.slog.LogError(fmt.Sprintf("%s(waitForUnlock)", caller), "basicqueue", "Timed out waiting for queue to unlock")
return errors.New("timed out waiting for queue to unlock")
}
time.Sleep(time.Second)
}
}
bq.slog.LogTrace(fmt.Sprintf("waitForUnlock.%s", bq.qname), "basicqueue", fmt.Sprintf("Lock was released in : %dms", (int)(time.Since(timer_start).Milliseconds())))
return nil
}

func (bq *BasicQueue) setLock(caller string) error {
if bq.locked {
return fmt.Errorf("queue is already locked, owned by %s", bq.lockedBy)
}
bq.locked = true
bq.lockedBy = caller
bq.slog.LogTrace(fmt.Sprintf("%s.%s", caller, bq.qname), "basicqueue", fmt.Sprintf("Locked queue by %s", caller))
return nil
}

func (bq *BasicQueue) unsetLock(caller string) error {
if !bq.locked {
return fmt.Errorf("queue is not locked")
}
if bq.lockedBy != caller {
return fmt.Errorf("not lock owner")
}
bq.locked = false
bq.lockedBy = ""
bq.slog.LogTrace(fmt.Sprintf("%s.%s", caller, bq.qname), "basicqueue", "Unlocked queue")
return nil
}

func (bq *BasicQueue) checkForExpiry() {
err := bq.waitForUnlock("checkForExpiry", 5*time.Second)
if err != nil {
bq.slog.LogError(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Queue is still locked: %s", err.Error()))
return
}
if bq.messages.msgCount > 0 {
bq.locked = true
err = bq.setLock("checkForExpiry")
if err != nil {
bq.slog.LogError(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Failed to acquire lock: %s", err.Error()))
return
}
//bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Starting message expiration check for queue %s", bq.qname))
for i := bq.messages.msgCount - 1; i >= 0; i-- {
if bq.messages.messages[i].expires {
//bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Message %s expires at %s", bq.messages.messages[i].messageID, bq.messages.messages[i].expiration.Format("2006-01-02 15:04:05")))
if time.Now().After(bq.messages.messages[i].expiration) {
// Expired message, pop it from the queue
//bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Message %s (%d) has expired. Removing it from queue %s", bq.messages.messages[i].messageID, i, bq.qname))
bq.removeMessage(i)
if bq.messages.msgCount <= i {
bq.slog.LogError(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", "queue lock not respected, queue is altered while locked")
} else {
if bq.messages.messages[i].expires {
//bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Message %s expires at %s", bq.messages.messages[i].messageID, bq.messages.messages[i].expiration.Format("2006-01-02 15:04:05")))
if time.Now().After(bq.messages.messages[i].expiration) {
// Expired message, pop it from the queue
//bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", fmt.Sprintf("Message %s (%d) has expired. Removing it from queue %s", bq.messages.messages[i].messageID, i, bq.qname))
bq.unsetLock("checkForExpiry")
bq.removeMessage(i)
bq.setLock("checkForExpiry")
}
}
}
}
bq.locked = false
bq.unsetLock("checkForExpiry")
return
}
bq.slog.LogTrace(fmt.Sprintf("checkForExpiry.%s", bq.qname), "basicqueue", "No messages to check")
}
Expand Down Expand Up @@ -471,7 +561,7 @@ func (bq *BasicQueue) readSpecificJsonMessage(index int) (jqm JSonQueueMessage,
}
return jqm, err
}
return jqm, errors.New(fmt.Sprintf("message index out of bounds [%d] with size [%d]", index, bq.messages.msgCount))
return jqm, fmt.Errorf("message index out of bounds [%d] with size [%d]", index, bq.messages.msgCount)
}

func (bq *BasicQueue) readSpecificMessage(index int) (msgtext string, msgid string, err error) {
Expand All @@ -495,7 +585,7 @@ func (bq *BasicQueue) readSpecificMessage(index int) (msgtext string, msgid stri
}
return msgtext, msgid, nil
}
return "", "", errors.New(fmt.Sprintf("basicqueue.readSpecificMessage.%s: queue index out of bounds %d>%d", bq.qname, index, bq.messages.msgCount))
return "", "", fmt.Errorf("basicqueue.readSpecificMessage.%s: queue index out of bounds %d>%d", bq.qname, index, bq.messages.msgCount)
}

func (bq *BasicQueue) ReadSpecificJsonMessage(messageid string) (jqm JSonQueueMessage, err error) {
Expand All @@ -516,7 +606,7 @@ func (bq *BasicQueue) Read(identifier string) (msg string, err error) {
bq.slog.LogWarn(fmt.Sprintf("Read.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting read from %s, not a registered consumer", identifier))
}
if bq.messages.msgCount == 0 {
return "", errors.New(fmt.Sprintf("[Read.%s] nl.quadtrix.delta.basicqueue no messages in queue", bq.qname))
return "", fmt.Errorf("[Read.%s] nl.quadtrix.delta.basicqueue no messages in queue", bq.qname)
}
return bq.readFirstMessage(), nil
}
Expand All @@ -543,22 +633,22 @@ func (bq *BasicQueue) ReadJsonWithHistory(identifier string, messageIDHistory []
return bq.readSpecificJsonMessage(index)
}
}
return jqm, errors.New("No unread messages in queue")
return jqm, errors.New("no unread messages in queue")
}

func (bq *BasicQueue) ReadWithHistory(identifier string, messageIDHistory []string) (msg string, msgid string, err error) {
if !bq.consumerExists(identifier) {
bq.slog.LogWarn(fmt.Sprintf("ReadWithHistory.%s", bq.qname), "basicqueue", fmt.Sprintf("Rejecting read from %s, not a registered consumer", identifier))
}
if bq.messages.msgCount == 0 {
return "", "", errors.New(fmt.Sprintf("[ReadWithHistory.%s] nl.quadtrix.delta.basicqueue no messages in queue", bq.qname))
return "", "", fmt.Errorf("[ReadWithHistory.%s] nl.quadtrix.delta.basicqueue no messages in queue", bq.qname)
}
for index, message := range bq.messages.messages {
if !bq.isInHistory(message.messageID, messageIDHistory) {
return bq.readSpecificMessage(index)
}
}
return "", "", errors.New(fmt.Sprintf("basicqueue.ReadWithHistory.%s: No unread messages in queue", bq.qname))
return "", "", fmt.Errorf("basicqueue.ReadWithHistory.%s: No unread messages in queue", bq.qname)
}

func (bq BasicQueue) QStats(identifier string) (msgcount int, numproducers int, numconsumers int, encrypted bool, err error) {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -4,6 +4,6 @@ go 1.20

require (
github.com/google/uuid v1.3.0
github.com/quadtrix/aesengine v0.0.0-20220907055548-598d44c87ea8
github.com/quadtrix/servicelogger v0.0.0-20221003131534-7ba9cadd9093
github.com/quadtrix/aesengine v0.0.0-20230802091001-7827ba49b67d
github.com/quadtrix/servicelogger v1.0.1-rc2
)
8 changes: 4 additions & 4 deletions go.sum
@@ -1,6 +1,6 @@
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/quadtrix/aesengine v0.0.0-20220907055548-598d44c87ea8 h1:odhv8xDihZS1YTDq9c+IlSxxCJx590A/d8rVv0ENZvs=
github.com/quadtrix/aesengine v0.0.0-20220907055548-598d44c87ea8/go.mod h1:b5iN1GjSIoU+ztXQGk/K1BQR08pI2F4xdRHsMiMd6WY=
github.com/quadtrix/servicelogger v0.0.0-20221003131534-7ba9cadd9093 h1:4U0RPrNTS3LkVD6m1xz9pmQphv2HRisXLN/12I4pazg=
github.com/quadtrix/servicelogger v0.0.0-20221003131534-7ba9cadd9093/go.mod h1:ITv9L1MAFwu6E+sv6rrAGTzoKD3NRX7Qtcbui9g5QiM=
github.com/quadtrix/aesengine v0.0.0-20230802091001-7827ba49b67d h1:xNo0macBVLMWCAAPRblR4Ai3Xz8iAoNf522Y32pkXCE=
github.com/quadtrix/aesengine v0.0.0-20230802091001-7827ba49b67d/go.mod h1:MTYiXwLJz46KqVSTWTCCw2o19FVwQqjhNm6G9TwEbMI=
github.com/quadtrix/servicelogger v1.0.1-rc2 h1:aLULdGhkjshe7CNs/3ZFiqGEYHUSRx9vyo0LtmUZcKA=
github.com/quadtrix/servicelogger v1.0.1-rc2/go.mod h1:Flopvk6JG16uUTEUMkHxNOaM9Yi0xyqAL/bl4Yf9Odc=

0 comments on commit 39b4654

Please sign in to comment.