Is there a way to get a list of the keys associated with a processor instance? #446

Open
oliviabarnett opened this issue Feb 9, 2024 · 2 comments

oliviabarnett commented Feb 9, 2024

goka newbie here --

I have a working goka setup that batches the actions arriving under a given key and submits the batch once it holds more than 10 actions. I also want to add a timer that runs in an external loop and flushes each batch every 30 seconds. I think I can make this happen with a control message like this:

func (uq *userActionQueue) userActionsProcessing(ctx goka.Context, msg any) {
	if err != nil {
		return
	}

	batch := uq.getOrCreateBatch(ctx)

	// If a control message is identified, submit the batch
	controlMsg, ok := msg.(*ControlMessage)
	if ok && controlMsg.Action == TimedSubmitBatchAction {
		uq.submitBatch(ctx.Context(), batch)
		batch.Clear()
		ctx.SetValue(batch)
		return
	}

	event, ok := msg.(*UserActionsEvent)
	if !ok {
		return
	}
	
	// Store item, check length of batch and potentially submit
	...

I want to do something like this for the timer:

	go func() {
		ticker := time.NewTicker(30 * time.Second) // flush interval: every 30 seconds
		defer ticker.Stop()
		keys := []string{"?"}
		for range ticker.C {
			for _, key := range keys {
				submitBatchEvent := ControlMessage{Action: TimedSubmitBatchAction}
				err := uq.userActionEmitter.EmitSync(key, submitBatchEvent)
				if err != nil {
					return
				}
			}

		}
	}()

but obviously, this requires me to have a list of the keys for which this processor instance is responsible. I know I can get information about the keys from a view using the group table topic, but won't that give me all of the keys across all processors? Clearly the processor knows which keys it is responsible for because it is able to load up that local state after a crash, so I am wondering if I can get access to this list somehow? But I could also be misunderstanding how this works.

frairon (Contributor) commented Feb 12, 2024

Hey Olivia,

welcome to goka :). Since goka is quite tightly coupled to Kafka, some background may help. A Kafka topic divides its data into partitions. When goka starts a new Processor, it acts as a Kafka consumer group member, i.e. all members of the group split the partitions among themselves and Kafka takes care of that distribution. In a goka View, however, we just create one consumer per partition that dumps all messages to local disk so they can be used for whatever is necessary.
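To make the key-to-partition relationship concrete, here is a minimal sketch of hash-based partitioning. It is an illustration only: the function name `partitionFor` and the choice of FNV-1a are assumptions made for this example, and the partitioner actually configured in a given setup may behave differently. The point is that the partition is a pure function of the key and the partition count, so a given key always lands on the same partition, whichever instance currently owns it.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of numPartitions partitions.
// Hypothetical helper for illustration: it hashes the key with FNV-1a
// and takes the result modulo the partition count.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash writes never return an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // the int32 conversion can be negative; fold it back into range
	}
	return p
}

func main() {
	// The same key always yields the same partition for a fixed partition count.
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 4))
	}
}
```

Because the mapping depends only on the key, the sender never needs to know which processor instance is responsible for it.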

So you're right, we can't know which processor instance is responsible for which key. But if you iterate over a View, all keys will be included. If those keys are then sent via an Emitter to the Processor, each message will end up in the correct partition, and the processor instance responsible for that partition will process it.
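A rough sketch of that approach follows. To keep it runnable without a Kafka cluster, it does not import goka. The `keyIterator` and `syncEmitter` interfaces are hypothetical stand-ins, modeled on the subset of a View iterator and an Emitter that the loop needs, and `sliceIterator` and `recordingEmitter` are in-memory fakes. `ControlMessage` and `TimedSubmitBatchAction` are taken from the question, with the field type assumed to be a string.

```go
package main

import (
	"errors"
	"fmt"
)

// ControlMessage mirrors the type from the question; the string field type is an assumption.
type ControlMessage struct{ Action string }

const TimedSubmitBatchAction = "timed-submit-batch"

// keyIterator is a hypothetical stand-in for the part of a View iterator used here.
type keyIterator interface {
	Next() bool
	Key() string
	Err() error
	Release()
}

// syncEmitter is a hypothetical stand-in for an emitter with a blocking emit.
type syncEmitter interface {
	EmitSync(key string, msg interface{}) error
}

// emitFlushForAllKeys sends one control message per key in the iterator and
// returns how many were sent. It stops at the first emit error.
func emitFlushForAllKeys(it keyIterator, em syncEmitter) (int, error) {
	defer it.Release()
	sent := 0
	for it.Next() {
		msg := &ControlMessage{Action: TimedSubmitBatchAction}
		if err := em.EmitSync(it.Key(), msg); err != nil {
			return sent, fmt.Errorf("emit control message for key %q: %w", it.Key(), err)
		}
		sent++
	}
	return sent, it.Err()
}

// sliceIterator is an in-memory fake that walks a fixed list of keys.
type sliceIterator struct {
	keys []string
	pos  int
}

func (s *sliceIterator) Next() bool {
	if s.pos >= len(s.keys) {
		return false
	}
	s.pos++
	return true
}
func (s *sliceIterator) Key() string { return s.keys[s.pos-1] }
func (s *sliceIterator) Err() error  { return nil }
func (s *sliceIterator) Release()    {}

// recordingEmitter is an in-memory fake that records keys and can fail on one of them.
type recordingEmitter struct {
	keys   []string
	failOn string
}

func (r *recordingEmitter) EmitSync(key string, msg interface{}) error {
	if r.failOn != "" && key == r.failOn {
		return errors.New("broker unavailable")
	}
	r.keys = append(r.keys, key)
	return nil
}

func main() {
	it := &sliceIterator{keys: []string{"user-1", "user-2"}}
	em := &recordingEmitter{}
	n, err := emitFlushForAllKeys(it, em)
	fmt.Println(n, err, em.keys)
}
```

In a real setup this function would be called from the ticker loop in the question, with the iterator obtained from a View over the group table and the emitter writing to the processor's input topic.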

What you're implementing is essentially a count-based windowing mechanism, right? Just note that goka currently does not support this very well, so you have to fall back to iteration-based solutions via views etc. The problem with those is, first, that they do not scale very well (e.g. iterating over a couple of million entries just takes too long) and, second, that they introduce all kinds of race conditions, duplications etc. But maybe it's fine for your use case, just wanted to point that out.

Hope that helps, sorry for the delay!

oliviabarnett (Author) commented Feb 12, 2024

Hey! Thanks for the response. Ok, got it -- this makes sense. The duplicate message issue is something we are trying to design around in a reliable and scalable way. For now, I think I can find a workaround by having the timer live in a singleton instance, but this is definitely something I would want to use if support for it ever becomes available through goka!
