1
1
use std:: sync:: { Arc , LazyLock } ;
2
- use std:: { num :: NonZeroUsize , time:: Duration } ;
2
+ use std:: time:: Duration ;
3
3
4
4
use async_trait:: async_trait;
5
5
use bytes:: { Buf , BufMut } ;
6
6
use bytesize:: ByteSize ;
7
7
use memory_accounting:: { MemoryBounds , MemoryBoundsBuilder , UsageExpr } ;
8
8
use metrics:: { Counter , Gauge , Histogram } ;
9
9
use saluki_config:: GenericConfiguration ;
10
- use saluki_context:: { ContextResolver , ContextResolverBuilder } ;
11
10
use saluki_core:: {
12
11
components:: { sources:: * , ComponentContext } ,
13
12
observability:: ComponentMetricsExt as _,
@@ -20,7 +19,7 @@ use saluki_core::{
20
19
} ,
21
20
} ;
22
21
use saluki_env:: WorkloadProvider ;
23
- use saluki_error:: { generic_error , GenericError } ;
22
+ use saluki_error:: { ErrorContext as _ , GenericError } ;
24
23
use saluki_event:: metric:: { MetricMetadata , MetricOrigin } ;
25
24
use saluki_event:: { metric:: Metric , DataType , Event } ;
26
25
use saluki_io:: {
@@ -51,9 +50,13 @@ use self::framer::{get_framer, DsdFramer};
51
50
52
51
mod filters;
53
52
use self :: filters:: { is_event_allowed, is_metric_allowed, is_service_check_allowed, EnablePayloadsFilter , Filter } ;
53
+
54
54
mod origin;
55
55
use self :: origin:: { origin_from_metric_packet, DogStatsDOriginTagResolver , OriginEnrichmentConfiguration } ;
56
56
57
+ mod resolver;
58
+ use self :: resolver:: ContextResolvers ;
59
+
57
60
#[ derive( Debug , Snafu ) ]
58
61
#[ snafu( context( suffix( false ) ) ) ]
59
62
enum Error {
@@ -322,16 +325,8 @@ impl SourceBuilder for DogStatsDConfiguration {
322
325
. workload_provider
323
326
. clone ( )
324
327
. map ( |provider| DogStatsDOriginTagResolver :: new ( self . origin_enrichment . clone ( ) , provider) ) ;
325
- let context_string_interner_size = NonZeroUsize :: new ( self . context_string_interner_bytes . as_u64 ( ) as usize )
326
- . ok_or_else ( || generic_error ! ( "context_string_interner_size must be greater than 0" ) ) ?;
327
- let context_resolver = ContextResolverBuilder :: from_name ( "dogstatsd" )
328
- . expect ( "resolver name is not empty" )
329
- . with_interner_capacity_bytes ( context_string_interner_size)
330
- . with_idle_context_expiration ( Duration :: from_secs ( 30 ) )
331
- . with_expiration_interval ( Duration :: from_secs ( 1 ) )
332
- . with_heap_allocations ( self . allow_context_heap_allocations )
333
- . with_origin_tags_resolver ( maybe_origin_tags_resolver)
334
- . build ( ) ;
328
+ let context_resolvers = ContextResolvers :: new ( self , maybe_origin_tags_resolver)
329
+ . error_context ( "Failed to create context resolvers." ) ?;
335
330
336
331
let codec_config = DogstatsdCodecConfiguration :: default ( )
337
332
. with_timestamps ( self . no_aggregation_pipeline_support )
@@ -351,7 +346,7 @@ impl SourceBuilder for DogStatsDConfiguration {
351
346
FixedSizeVec :: with_capacity ( get_adjusted_buffer_size ( self . buffer_size ) )
352
347
} ) ,
353
348
codec,
354
- context_resolver ,
349
+ context_resolvers ,
355
350
pre_filters : Arc :: new ( vec ! [ Box :: new( enable_payloads_filter) ] ) ,
356
351
} ) )
357
352
}
@@ -394,7 +389,7 @@ pub struct DogStatsD {
394
389
listeners : Vec < Listener > ,
395
390
io_buffer_pool : FixedSizeObjectPool < BytesBuffer > ,
396
391
codec : DogstatsdCodec ,
397
- context_resolver : ContextResolver ,
392
+ context_resolvers : ContextResolvers ,
398
393
pre_filters : Arc < Vec < Box < dyn Filter + Send + Sync > > > ,
399
394
}
400
395
@@ -403,7 +398,7 @@ struct ListenerContext {
403
398
listener : Listener ,
404
399
io_buffer_pool : FixedSizeObjectPool < BytesBuffer > ,
405
400
codec : DogstatsdCodec ,
406
- context_resolver : ContextResolver ,
401
+ context_resolvers : ContextResolvers ,
407
402
}
408
403
409
404
struct HandlerContext {
@@ -412,7 +407,7 @@ struct HandlerContext {
412
407
codec : DogstatsdCodec ,
413
408
io_buffer_pool : FixedSizeObjectPool < BytesBuffer > ,
414
409
metrics : Metrics ,
415
- context_resolver : ContextResolver ,
410
+ context_resolvers : ContextResolvers ,
416
411
}
417
412
418
413
struct Metrics {
@@ -575,7 +570,7 @@ impl Source for DogStatsD {
575
570
listener,
576
571
io_buffer_pool : self . io_buffer_pool . clone ( ) ,
577
572
codec : self . codec . clone ( ) ,
578
- context_resolver : self . context_resolver . clone ( ) ,
573
+ context_resolvers : self . context_resolvers . clone ( ) ,
579
574
} ;
580
575
581
576
spawn_traced ( process_listener (
@@ -620,7 +615,7 @@ async fn process_listener(
620
615
mut listener,
621
616
io_buffer_pool,
622
617
codec,
623
- context_resolver ,
618
+ context_resolvers ,
624
619
} = listener_context;
625
620
tokio:: pin!( shutdown_handle) ;
626
621
@@ -645,7 +640,7 @@ async fn process_listener(
645
640
codec: codec. clone( ) ,
646
641
io_buffer_pool: io_buffer_pool. clone( ) ,
647
642
metrics: build_metrics( & listen_addr, source_context. component_context( ) ) ,
648
- context_resolver : context_resolver . clone( ) ,
643
+ context_resolvers : context_resolvers . clone( ) ,
649
644
} ;
650
645
spawn_traced( process_stream( stream, source_context. clone( ) , handler_context, stream_shutdown_coordinator. register( ) , filters. clone( ) ) ) ;
651
646
}
@@ -686,7 +681,7 @@ async fn drive_stream(
686
681
codec,
687
682
io_buffer_pool,
688
683
metrics,
689
- mut context_resolver ,
684
+ mut context_resolvers ,
690
685
} = handler_context;
691
686
692
687
debug ! ( %listen_addr, "Stream handler started." ) ;
@@ -767,7 +762,7 @@ async fn drive_stream(
767
762
match frames. next( ) {
768
763
Some ( Ok ( frame) ) => {
769
764
trace!( %listen_addr, %peer_addr, ?frame, "Decoded frame." ) ;
770
- match handle_frame( & frame[ ..] , & codec, & mut context_resolver , & metrics, & peer_addr, filters. clone( ) ) {
765
+ match handle_frame( & frame[ ..] , & codec, & mut context_resolvers , & metrics, & peer_addr, filters. clone( ) ) {
771
766
Ok ( Some ( event) ) => {
772
767
if let Some ( ( event, event_buffer) ) = event_buffer_manager. try_push( event) . await {
773
768
debug!( %listen_addr, %peer_addr, "Event buffer is full. Forwarding events." ) ;
@@ -847,7 +842,7 @@ async fn drive_stream(
847
842
}
848
843
849
844
fn handle_frame (
850
- frame : & [ u8 ] , codec : & DogstatsdCodec , context_resolver : & mut ContextResolver , source_metrics : & Metrics ,
845
+ frame : & [ u8 ] , codec : & DogstatsdCodec , context_resolvers : & mut ContextResolvers , source_metrics : & Metrics ,
851
846
peer_addr : & ConnectionAddress , filters : Arc < Vec < Box < dyn Filter + Send + Sync > > > ,
852
847
) -> Result < Option < Event > , ParseError > {
853
848
let parsed = match codec. decode_packet ( frame) {
@@ -875,7 +870,7 @@ fn handle_frame(
875
870
return Ok ( None ) ;
876
871
}
877
872
878
- match handle_metric_packet ( metric_packet, context_resolver , peer_addr) {
873
+ match handle_metric_packet ( metric_packet, context_resolvers , peer_addr) {
879
874
Some ( metric) => {
880
875
source_metrics. metrics_received ( ) . increment ( events_len) ;
881
876
Event :: Metric ( metric)
@@ -915,14 +910,21 @@ fn handle_frame(
915
910
}
916
911
917
912
fn handle_metric_packet (
918
- packet : MetricPacket , context_resolver : & mut ContextResolver , peer_addr : & ConnectionAddress ,
913
+ packet : MetricPacket , context_resolvers : & mut ContextResolvers , peer_addr : & ConnectionAddress ,
919
914
) -> Option < Metric > {
920
915
// Capture the origin from the packet, including any process ID information if we have it.
921
916
let mut origin = origin_from_metric_packet ( & packet) ;
922
917
if let ConnectionAddress :: ProcessLike ( Some ( creds) ) = & peer_addr {
923
918
origin. set_process_id ( creds. pid as u32 ) ;
924
919
}
925
920
921
+ // Choose the right context resolver based on whether or not this metric is pre-aggregated.
922
+ let context_resolver = if packet. timestamp . is_some ( ) {
923
+ context_resolvers. no_agg ( )
924
+ } else {
925
+ context_resolvers. primary ( )
926
+ } ;
927
+
926
928
// Try to resolve the context for this metric.
927
929
match context_resolver. resolve ( packet. metric_name , packet. tags . clone ( ) , Some ( origin) ) {
928
930
Some ( context) => {
@@ -1012,7 +1014,7 @@ mod tests {
1012
1014
net:: ConnectionAddress ,
1013
1015
} ;
1014
1016
1015
- use super :: handle_metric_packet;
1017
+ use super :: { handle_metric_packet, ContextResolvers } ;
1016
1018
1017
1019
#[ test]
1018
1020
fn no_metrics_when_interner_full_allocations_disallowed ( ) {
@@ -1024,7 +1026,8 @@ mod tests {
1024
1026
// We set our metric name to be longer than 31 bytes (the inlining limit) to ensure this.
1025
1027
1026
1028
let codec = DogstatsdCodec :: from_configuration ( DogstatsdCodecConfiguration :: default ( ) ) ;
1027
- let mut context_resolver = ContextResolverBuilder :: for_tests ( ) . with_heap_allocations ( false ) . build ( ) ;
1029
+ let context_resolver = ContextResolverBuilder :: for_tests ( ) . with_heap_allocations ( false ) . build ( ) ;
1030
+ let mut context_resolvers = ContextResolvers :: manual ( context_resolver. clone ( ) , context_resolver) ;
1028
1031
let peer_addr = ConnectionAddress :: from ( "1.1.1.1:1234" . parse :: < SocketAddr > ( ) . unwrap ( ) ) ;
1029
1032
1030
1033
let input = "big_metric_name_that_cant_possibly_be_inlined:1|c|#tag1:value1,tag2:value2,tag3:value3" ;
@@ -1033,7 +1036,7 @@ mod tests {
1033
1036
panic ! ( "Failed to parse packet." ) ;
1034
1037
} ;
1035
1038
1036
- let maybe_metric = handle_metric_packet ( packet, & mut context_resolver , & peer_addr) ;
1039
+ let maybe_metric = handle_metric_packet ( packet, & mut context_resolvers , & peer_addr) ;
1037
1040
assert ! ( maybe_metric. is_none( ) ) ;
1038
1041
}
1039
1042
}
0 commit comments