chore: collect less sample_events for reporting #4615
Conversation
```go
func (groupingColumns GroupingColumns) generateHash() string {
	data := groupingColumns.WorkspaceID + groupingColumns.SourceDefinitionID + groupingColumns.SourceCategory + groupingColumns.SourceID + groupingColumns.DestinationDefinitionID + groupingColumns.DestinationID + groupingColumns.SourceTaskRunID + groupingColumns.SourceJobID + groupingColumns.SourceJobRunID + groupingColumns.TransformationID + groupingColumns.TransformationVersionID + groupingColumns.TrackingPlanID + strconv.Itoa(groupingColumns.TrackingPlanVersion) + groupingColumns.InPU + groupingColumns.PU + groupingColumns.Status + strconv.FormatBool(groupingColumns.TerminalState) + strconv.FormatBool(groupingColumns.InitialState) + strconv.Itoa(groupingColumns.StatusCode) + groupingColumns.EventName + groupingColumns.EventType + groupingColumns.ErrorType
	hash := md5.Sum([]byte(data))
```

I would suggest using a hashing algorithm like murmur3, since it is faster and more efficient than md5 and only slightly worse with collisions.
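As a sketch of the reviewer's point: cryptographic strength is not needed for a dedup key, only speed and a reasonable distribution. murmur3 requires a third-party package, so this illustration uses the standard library's FNV-1a hash instead; `hashKey` is a hypothetical helper, not code from the PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey returns a hex digest of the grouping key using FNV-1a, a fast
// non-cryptographic hash from the standard library. This stands in for
// the suggested murmur3, which lives in a third-party package.
func hashKey(data string) string {
	h := fnv.New64a()
	h.Write([]byte(data)) // hash.Hash writes never return an error
	return fmt.Sprintf("%016x", h.Sum64())
}

func main() {
	// Deterministic: the same grouping key always hashes to the same value.
	fmt.Println(hashKey("ws1:src1:dest1") == hashKey("ws1:src1:dest1"))
	// Distinct keys hash to distinct values (with overwhelming probability).
	fmt.Println(hashKey("ws1:src1:dest1") == hashKey("ws2:src1:dest1"))
}
```

A quick benchmark against `md5.Sum` on representative key lengths would confirm whether the switch is worth it here.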
```go
func (es *EventSampler) IsSampleEventCollected(groupingColumns string) bool {
	return exists
}
```

[minor] Suggested rename:

```go
func (es *EventSampler) Exists(key string) bool {
```
```go
func (es *EventSampler) MarkSampleEventAsCollected(groupingColumns string) {
```

[minor] Suggested rename:

```go
func (es *EventSampler) Mark(groupingColumns string) {
```
```go
func (es *EventSampler) StartResetLoop() error {
	go func() {
		for {
			time.Sleep(es.resetDuration.Load())
			es.mu.Lock()
			es.collectedSamples = make(map[string]bool)
			es.mu.Unlock()
		}
	}()
	return nil
}
```
A couple of thoughts here:

No lifecycle control

The goroutine spins in the background and runs forever. This can create goroutine and memory leaks.
Ideally you want the following for every exported method:
- the ability to stop the execution of the loop, usually by passing a ctx
- the ability to wait until the goroutine stops
- the ability to interrupt the sleep

One way this can be achieved:
```go
func (es *EventSampler) RunCleanup(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
			es.mu.Lock()
			es.collectedSamples = make(map[string]bool)
			es.mu.Unlock()
		}
	}
}
```
Memory allocations

The plan is to reset the map every time, but this can cause a lot of memory to be dereferenced and GCed. Moreover, most of the keys we delete are going to be added again, causing more memory allocations.
It might be better to store a time.Time instead of a boolean and do two things:
- during the existence check, check whether the key has expired
- keep the goroutine, but only to clean up keys that have been expired for a longer time
Or, even better, you can use the existing cachettl implementation, which cleans up keys asynchronously during lookup.
When it comes to performance, it is always better to run a benchmark to verify. Given the cardinality of keys we are planning to handle, it is worth considering.
This PR is considered stale. It has been open for 20 days with no further activity, so it will be closed in 7 days. To avoid this, please remove the stale label or add a comment to the PR.
Description

Current Behaviour

We collect `sample_event` and `sample_response`. On the Reporting Service, we have hourly aggregates and we pick the last `sample_event` for each group. We are collecting too many sample_events on Rudder Server.

New Behaviour

Collect sample_events only once every `eventSamplingDuration` seconds (typically < 1 h). Analysis of the sample_events we are collecting is documented in this Linear ticket.
Linear Ticket
https://linear.app/rudderstack/issue/OBS-461/collect-less-sample-events-on-rudder-server
completes OBS-461
Security