How to fill up local cache with messages from a custom offset on start up? #425

Open
onkarbanerjee opened this issue Jul 6, 2023 · 1 comment

Comments

@onkarbanerjee

We are using goka to join event streams, and it has been running for over two years now. So every time the pod is restarted, the goka processor starts backfilling with two-year-old data. This causes huge memory requirements for the service using the goka processor, and we do not need the two-year-old data anyway. How can I configure the goka processor to start backfilling from a custom offset or timestamp?

@frairon
Contributor

frairon commented Jul 24, 2023

Hi @onkarbanerjee,

sorry for the late reply! Currently there is no way to configure a goka processor to start backfilling from anywhere other than the beginning. This is intentional, to guarantee the integrity of the data. If you want to remove old entries from the processor table, the processor itself should delete them using ctx.Delete(). But how to do that is a different story :).

Usually, a processor is not meant to recover (backfill) all data from its table-topic. Instead, the table is persisted on disk and only recovers the changes since the last run. If you have only one instance, there wouldn't be anything to recover after a restart.
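To make the difference concrete, here is a simplified model (not goka's actual recovery code) of how much has to be replayed after a restart. It assumes the local storage remembers the offset of the last message it applied; `recoveryPlan` and the offsets used in `main` are hypothetical.

```go
package main

import "fmt"

// recoveryPlan returns the first offset to replay from the table topic and
// how many messages must be read to catch up. stored is the hypothetical
// offset of the last message already applied to the local on-disk table
// (hasStored is false when the disk is empty, e.g. a fresh /tmp); oldest is
// the first offset still in the topic and hwm the next offset to be written.
func recoveryPlan(stored int64, hasStored bool, oldest, hwm int64) (start, count int64) {
	start = oldest // empty local storage: replay everything still retained
	if hasStored && stored+1 > oldest {
		start = stored + 1 // persisted storage: replay only what is new
	}
	return start, hwm - start
}

func main() {
	// Cold start: nothing on disk, the whole retained topic is replayed.
	fmt.Println(recoveryPlan(0, false, 0, 1_000_000))
	// Warm start: the local table survived the restart.
	fmt.Println(recoveryPlan(999_989, true, 0, 1_000_000))
}
```

This prints `0 1000000` and then `999990 10`: with an empty disk the whole retained topic is replayed, which matches the backfill described in the question, while with a persisted disk only the ten newest messages are read.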
I'm guessing you're running on Kubernetes, so consider the following points:

  • use statefulsets with persistent disks instead of deployments
  • configure a storage directory (the default directory for goka is /tmp, which is not persisted)
  • do not use memory-based storage

How to configure a storage directory

When you initialize your processor, you have to configure the storage layer one way or another. Otherwise the data ends up in /tmp/goka, which is the wrong place in almost all cases. To get the default storage with only the folder changed, do:

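	// imports: "log", "github.com/lovoo/goka", "github.com/lovoo/goka/storage"
	p, err := goka.NewProcessor(
		[]string{"localhost:9092"}, // brokers
		goka.DefineGroup(
			"group",
			/* edges ...*/
		),
		// keep the local table in a directory that survives restarts
		goka.WithStorageBuilder(storage.DefaultBuilder("/path/to/persisted/directory")),
	)
	if err != nil {
		log.Fatalf("creating processor: %v", err)
	}
	// start the processor afterwards, e.g. with p.Run(ctx)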

As said, this is required in most cases for initializing processors or views and the documentation is probably lacking that info a bit :)

Let us know if that fixes the issue!
