
implement pause and resume data ingest #134

Closed

Conversation

breadchris
Contributor

Closes #131

Currently, a new queue is created for every table that has items to process. With SQS, this won't work properly unless a new SQS queue is actually deployed for every table. An alternative approach that avoids the overhead of deploying SQS queues would be to attach the table name to each queued item: dequeue the item, check whether it belongs to a paused table, process it if it doesn't, and otherwise put it back on the queue. Further discussion would be needed to address the potential concerns with this approach.
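For concreteness, here is a minimal sketch of what that single-queue alternative could look like. This is not the PR's code; Message, Queue, isPaused, and process are hypothetical names used only for illustration.

package ingest

import (
    "context"
    "time"
)

// Message carries the table name alongside the payload so a worker can
// decide whether the destination table is currently paused.
type Message struct {
    Table string
    Body  []byte
}

type Queue interface {
    Dequeue(ctx context.Context) (Message, error)
    Enqueue(ctx context.Context, m Message) error
}

// worker processes items from a single shared queue, re-queuing anything
// whose table is paused instead of processing it.
func worker(ctx context.Context, q Queue, isPaused func(table string) bool, process func(Message) error) {
    for ctx.Err() == nil {
        msg, err := q.Dequeue(ctx)
        if err != nil {
            time.Sleep(time.Second) // back off on an empty queue or a transient error
            continue
        }
        if isPaused(msg.Table) {
            _ = q.Enqueue(ctx, msg) // paused: put it back for later
            continue
        }
        _ = process(msg)
    }
}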

Example flow:

hacked scratchdata % curl "http://localhost:8420/api/data/query?api_key=local&query=select%20*%20from%20events"
Catalog Error: Table with name events does not exist!
Did you mean "pg_views"?

hacked scratchdata % curl -X POST "http://localhost:8420/api/data/insert/events?api_key=local" --json '{"user": "alice", "event": "touch"}'
ok

hacked scratchdata % curl "http://localhost:8420/api/data/query?api_key=local&query=select%20*%20from%20events"
[
        {"__row_id":1771673417511530496,"user":"alice","event":"touch"}
]

hacked scratchdata % curl -X POST "http://localhost:8420/api/data/pause/events?api_key=local&paused=true"
ok

hacked scratchdata % curl -X POST "http://localhost:8420/api/data/insert/events?api_key=local" --json '{"user": "bob", "event": "touch"}'
ok

hacked scratchdata % curl "http://localhost:8420/api/data/query?api_key=local&query=select%20*%20from%20events"
[
        {"__row_id":1771673417511530496,"user":"alice","event":"touch"}
]

hacked scratchdata % curl -X POST "http://localhost:8420/api/data/pause/events?api_key=local&paused=false"
ok

hacked scratchdata % curl "http://localhost:8420/api/data/query?api_key=local&query=select%20*%20from%20events"
[
        {"__row_id":1771673417511530496,"user":"alice","event":"touch"},
        {"__row_id":1771673478022754304,"user":"bob","event":"touch"}
]

} else {
    message, err := w.messageToStruct(item)
    select {
    case <-ctx.Done():
Contributor

I actually intentionally don't use a channel to dequeue messages, and here is why. (tell me if there is a logic error.) Suppose this sequence of events:

  1. queue.Dequeue() fetches a message from SQS
  2. We attempt to send that message to the channel, but it blocks because the worker is busy
  3. We ctrl-c the program

In this case, I think we'd lose a message: ctx.Done() will trigger, this worker will return, and the program will terminate. Meanwhile, we've already fetched a message from SQS, but it hasn't been consumed by anyone.

(No need to think about this now, just something to talk through!)
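A minimal illustration of that sequence (hypothetical names, not the PR's code), assuming the dequeue step has already removed the message from SQS:

package ingest

import "context"

// pump hands dequeued messages to a worker over a channel. If the send
// blocks and ctx is cancelled (ctrl-c), we return and the already-fetched
// message m is never handled by anyone.
func pump(ctx context.Context, msgs chan<- string, dequeueAndDelete func() string) {
    for {
        m := dequeueAndDelete() // 1. message fetched (and deleted) from SQS
        select {
        case msgs <- m: // 2. blocks while the worker is busy
        case <-ctx.Done(): // 3. shutdown: m is dropped on the floor
            return
        }
    }
}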

Contributor Author

Yes, this is a failure scenario, and it's actually something I noticed in the code. The message should only be deleted from the queue if it is handled successfully: https://chat.openai.com/share/1b1b2c58-6ee5-4b3f-b0f3-67260eb71780

The queue delete needs to be moved out of this function and called after successful processing: https://github.com/scratchdata/scratchdata/blob/main/pkg/storage/queue/sqs/sqs.go#L75
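A rough sketch of that split, with hypothetical names (the real interface in pkg/storage/queue may look different): Dequeue only receives, and the caller acknowledges with Delete once processing has succeeded.

package queue

import "context"

type Item struct {
    Body          []byte
    ReceiptHandle string // needed later to delete the message from SQS
}

type Queue interface {
    Dequeue(ctx context.Context) (Item, error)
    Delete(ctx context.Context, receiptHandle string) error
}

// handle deletes the message only after successful processing; on failure the
// message is left in the queue and becomes visible again after the visibility
// timeout, so it can be retried.
func handle(ctx context.Context, q Queue, process func(Item) error) error {
    item, err := q.Dequeue(ctx)
    if err != nil {
        return err
    }
    if err := process(item); err != nil {
        return err // not deleted, so SQS will redeliver it
    }
    return q.Delete(ctx, item.ReceiptHandle)
}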

Contributor

That seems like a lot of work just so we can use a channel, doesn't it? To me, the question is whether we want consumers of the queue to have to contend with a Delete() operation in addition to Dequeue().

Other things to think about: what visibility timeout should we set before SQS automatically re-queues an unacknowledged message? If we end up using Redis as a queue in the future (i.e., BLPOP), then consuming and deleting are the same action.
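For reference, that re-queue delay is the SQS visibility timeout, and it can be set per receive call. A hedged sketch assuming the aws-sdk-go-v2 client (the actual sqs.go may configure this differently):

package queue

import (
    "context"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// receive fetches one message with long polling; if the message is not
// deleted within VisibilityTimeout seconds, SQS makes it visible again.
func receive(ctx context.Context, client *sqs.Client, queueURL string) (*sqs.ReceiveMessageOutput, error) {
    return client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: 1,
        WaitTimeSeconds:     20,  // long polling
        VisibilityTimeout:   300, // seconds before an unacknowledged message reappears
    })
}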

Contributor Author

Even in the case of Redis, you would want to use BRPOPLPUSH to guarantee processing: https://chat.openai.com/share/bcac4373-b4dc-4672-9f7e-c81ed984b209. I think the only way to implement this is to have an initial "acknowledgement" that the item has been observed, and then an explicit "I'm done", for example by removing it from the queue. In the failure case, the item needs to go somewhere so it can be "re-acknowledged". Optimizing for durability can be a rabbit hole; it's probably best to sketch out the requirements and pair them with tech decisions that match.
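For illustration, here is a minimal sketch of that reliable-queue pattern with go-redis (the key names and client setup are assumptions, not anything in this repo): BRPOPLPUSH atomically moves the item to a processing list, which acts as the initial acknowledgement, and LREM removes it only after successful processing.

package queue

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

// consumeOne pops one item into a processing list, processes it, and only
// then removes it. On failure the item stays on ingest:processing so a
// reaper can push it back to ingest:pending to be "re-acknowledged".
func consumeOne(ctx context.Context, rdb *redis.Client, process func(string) error) error {
    item, err := rdb.BRPopLPush(ctx, "ingest:pending", "ingest:processing", 5*time.Second).Result()
    if err != nil {
        return err // redis.Nil when nothing arrived within the timeout
    }
    if err := process(item); err != nil {
        return err // leave it on ingest:processing for recovery
    }
    return rdb.LRem(ctx, "ingest:processing", 1, item).Err()
}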

@poundifdef
Contributor

Quick thought: I wonder if we could use dead-letter queues to implement a pause functionality by toggling where new messages are sent.

@breadchris
Contributor Author

If you reject a message every time it belongs to a paused table, it will go to the back of the queue. A DLQ would be helpful if we want to keep track of paused queues that are putting pressure on the queue.

@poundifdef
Contributor

> If you reject a message every time it belongs to a paused table, it will go to the back of the queue. A DLQ would be helpful if we want to keep track of paused queues that are putting pressure on the queue.

I worry about chattiness and starvation here: re-queuing items over and over again, and tying up workers with the busywork of re-sending paused items.

I'm wondering if SQS is the right queue to use after all.

@breadchris
Contributor Author

This is another YC company (also Go, also OSS): https://hatchet.run/. They currently use RabbitMQ for their queue.

@breadchris
Contributor Author

From our conversation, we concluded that existing throughput can be handled by an SQL queue until we "feel the pain". When we hit that point, Redis could be an option to explore.
