diff --git a/receiver/splunkenterprisereceiver/scraper.go b/receiver/splunkenterprisereceiver/scraper.go
index 978411b168dd0..482c4a33fb619 100644
--- a/receiver/splunkenterprisereceiver/scraper.go
+++ b/receiver/splunkenterprisereceiver/scraper.go
@@ -12,6 +12,7 @@ import (
     "io"
     "net/http"
     "strconv"
+    "sync"
     "time"
 
     "go.opentelemetry.io/collector/component"
@@ -52,36 +53,73 @@ func (s *splunkScraper) start(_ context.Context, h component.Host) (err error) {
     return nil
 }
 
+// errorListener drains the shared error channel, adding every error sent by the
+// individual metric scrape functions to a single ScrapeErrors value and emitting
+// the combined result once the channel is closed at the end of a scrape cycle.
+func errorListener(eQueue <-chan error, eOut chan<- *scrapererror.ScrapeErrors) {
+    errs := &scrapererror.ScrapeErrors{}
+
+    for err := range eQueue {
+        errs.Add(err)
+    }
+    eOut <- errs
+}
+
 // The big one: Describes how all scraping tasks should be performed. Part of the scraper interface
 func (s *splunkScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
-    errs := &scrapererror.ScrapeErrors{}
+    var wg sync.WaitGroup
+    errOut := make(chan *scrapererror.ScrapeErrors)
+    var errs *scrapererror.ScrapeErrors
     now := pcommon.NewTimestampFromTime(time.Now())
-
-    s.scrapeLicenseUsageByIndex(ctx, now, errs)
-    s.scrapeAvgExecLatencyByHost(ctx, now, errs)
-    s.scrapeSchedulerCompletionRatioByHost(ctx, now, errs)
-    s.scrapeIndexerAvgRate(ctx, now, errs)
-    s.scrapeSchedulerRunTimeByHost(ctx, now, errs)
-    s.scrapeIndexerRawWriteSecondsByHost(ctx, now, errs)
-    s.scrapeIndexerCPUSecondsByHost(ctx, now, errs)
-    s.scrapeAvgIopsByHost(ctx, now, errs)
-    s.scrapeIndexThroughput(ctx, now, errs)
-    s.scrapeIndexesTotalSize(ctx, now, errs)
-    s.scrapeIndexesEventCount(ctx, now, errs)
-    s.scrapeIndexesBucketCount(ctx, now, errs)
-    s.scrapeIndexesRawSize(ctx, now, errs)
-    s.scrapeIndexesBucketEventCount(ctx, now, errs)
-    s.scrapeIndexesBucketHotWarmCount(ctx, now, errs)
-    s.scrapeIntrospectionQueues(ctx, now, errs)
-    s.scrapeIntrospectionQueuesBytes(ctx, now, errs)
-    s.scrapeIndexerPipelineQueues(ctx, now, errs)
-    s.scrapeBucketsSearchableStatus(ctx, now, errs)
-    s.scrapeIndexesBucketCountAdHoc(ctx, now, errs)
+    metricScrapes := []func(context.Context, pcommon.Timestamp, chan error){
+        s.scrapeLicenseUsageByIndex,
+        s.scrapeIndexThroughput,
+        s.scrapeIndexesTotalSize,
+        s.scrapeIndexesEventCount,
+        s.scrapeIndexesBucketCount,
+        s.scrapeIndexesRawSize,
+        s.scrapeIndexesBucketEventCount,
+        s.scrapeIndexesBucketHotWarmCount,
+        s.scrapeIntrospectionQueues,
+        s.scrapeIntrospectionQueuesBytes,
+        s.scrapeAvgExecLatencyByHost,
+        s.scrapeIndexerPipelineQueues,
+        s.scrapeBucketsSearchableStatus,
+        s.scrapeIndexesBucketCountAdHoc,
+        s.scrapeSchedulerCompletionRatioByHost,
+        s.scrapeIndexerRawWriteSecondsByHost,
+        s.scrapeIndexerCPUSecondsByHost,
+        s.scrapeAvgIopsByHost,
+        s.scrapeSchedulerRunTimeByHost,
+        s.scrapeIndexerAvgRate,
+    }
+    errChan := make(chan error, len(metricScrapes))
+
+    go func() {
+        errorListener(errChan, errOut)
+    }()
+
+    for _, fn := range metricScrapes {
+        wg.Add(1)
+        go func(
+            fn func(ctx context.Context, now pcommon.Timestamp, errs chan error),
+            ctx context.Context,
+            now pcommon.Timestamp,
+            errs chan error) {
+            // run each scrape in its own goroutine, signalling completion via the WaitGroup
+            defer wg.Done()
+            fn(ctx, now, errs)
+        }(fn, ctx, now, errChan)
+    }
+
+    wg.Wait()
+    close(errChan)
+    errs = <-errOut
     return s.mb.Emit(), errs.Combine()
 }
 
 // Each metric has its own scrape function associated with it
-func (s *splunkScraper)
scrapeLicenseUsageByIndex(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeLicenseUsageByIndex(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkLicenseIndexUsage.Enabled || !s.splunkClient.isConfigured(typeCm) { @@ -104,20 +142,20 @@ func (s *splunkScraper) scrapeLicenseUsageByIndex(ctx context.Context, now pcomm for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -132,7 +170,7 @@ func (s *splunkScraper) scrapeLicenseUsageByIndex(ctx context.Context, now pcomm } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -147,7 +185,7 @@ func (s *splunkScraper) scrapeLicenseUsageByIndex(ctx context.Context, now pcomm case "By": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkLicenseIndexUsageDataPoint(now, int64(v), indexName) @@ -155,7 +193,7 @@ func (s *splunkScraper) scrapeLicenseUsageByIndex(ctx context.Context, now pcomm } } -func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkSchedulerAvgExecutionLatency.Enabled { @@ -178,20 +216,20 @@ func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcom for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -210,7 +248,7 @@ func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcom } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -225,7 +263,7 @@ func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcom case "latency_avg_exec": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkSchedulerAvgExecutionLatencyDataPoint(now, v, host) @@ -233,7 +271,7 @@ func (s *splunkScraper) scrapeAvgExecLatencyByHost(ctx context.Context, now pcom } } -func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before 
proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkIndexerAvgRate.Enabled { @@ -256,20 +294,20 @@ func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Ti for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -292,7 +330,7 @@ func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Ti } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -306,7 +344,7 @@ func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Ti case "indexer_avg_kbps": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexerAvgRateDataPoint(now, v, host) @@ -314,7 +352,7 @@ func (s *splunkScraper) scrapeIndexerAvgRate(ctx context.Context, now pcommon.Ti } } -func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkAggregationQueueRatio.Enabled { @@ -337,20 +375,20 @@ func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pco for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -374,7 +412,7 @@ func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pco } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } @@ -390,21 +428,21 @@ func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pco case "agg_queue_ratio": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkAggregationQueueRatioDataPoint(now, v, host) case "index_queue_ratio": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexerQueueRatioDataPoint(now, v, host) case "parse_queue_ratio": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkParseQueueRatioDataPoint(now, v, host) @@ -412,14 +450,14 @@ func (s *splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pco v, err := strconv.ParseInt(f.Value, 10, 64) ps = v if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkPipelineSetCountDataPoint(now, ps, host) case "typing_queue_ratio": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkTypingQueueRatioDataPoint(now, v, host) @@ -427,7 +465,7 @@ func (s 
*splunkScraper) scrapeIndexerPipelineQueues(ctx context.Context, now pco } } -func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkBucketsSearchableStatus.Enabled { @@ -450,20 +488,20 @@ func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now p for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -487,7 +525,7 @@ func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now p } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -507,7 +545,7 @@ func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now p v, err := strconv.ParseInt(f.Value, 10, 64) bc = v if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkBucketsSearchableStatusDataPoint(now, bc, host, searchable) @@ -515,7 +553,7 @@ func (s *splunkScraper) scrapeBucketsSearchableStatus(ctx context.Context, now p } } -func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkIndexesSize.Enabled { @@ -538,20 +576,20 @@ func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -575,7 +613,7 @@ func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -590,21 +628,21 @@ func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p case "total_size_gb": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexesSizeDataPoint(now, v, indexer) case "average_size_gb": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexesAvgSizeDataPoint(now, v, indexer) case "average_usage_perc": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexesAvgUsageDataPoint(now, v, indexer) @@ -612,7 +650,7 @@ func (s 
*splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p v, err := strconv.ParseInt(f.Value, 10, 64) bc = v if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexesMedianDataAgeDataPoint(now, bc, indexer) @@ -620,7 +658,7 @@ func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p v, err := strconv.ParseInt(f.Value, 10, 64) bc = v if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexesBucketCountDataPoint(now, bc, indexer) @@ -628,7 +666,7 @@ func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(ctx context.Context, now p } } -func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkSchedulerCompletionRatio.Enabled { @@ -651,20 +689,20 @@ func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -683,7 +721,7 @@ func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -698,7 +736,7 @@ func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context case "completion_ratio": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkSchedulerCompletionRatioDataPoint(now, v, host) @@ -706,7 +744,7 @@ func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(ctx context.Context } } -func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkIndexerRawWriteTime.Enabled { @@ -729,20 +767,20 @@ func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -761,7 +799,7 @@ func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -776,7 +814,7 @@ func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, 
case "raw_data_write_seconds": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexerRawWriteTimeDataPoint(now, v, host) @@ -784,7 +822,7 @@ func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(ctx context.Context, } } -func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkIndexerCPUTime.Enabled { @@ -807,20 +845,20 @@ func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now p for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -839,7 +877,7 @@ func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now p } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -854,7 +892,7 @@ func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now p case "service_cpu_seconds": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIndexerCPUTimeDataPoint(now, v, host) @@ -862,7 +900,7 @@ func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(ctx context.Context, now p } } -func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkIoAvgIops.Enabled { @@ -885,20 +923,20 @@ func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Tim for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -917,7 +955,7 @@ func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Tim } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -932,7 +970,7 @@ func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Tim case "iops": v, err := strconv.ParseInt(f.Value, 10, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkIoAvgIopsDataPoint(now, v, host) @@ -940,7 +978,7 @@ func (s *splunkScraper) scrapeAvgIopsByHost(ctx context.Context, now pcommon.Tim } } -func (s *splunkScraper) scrapeSchedulerRunTimeByHost(ctx context.Context, now pcommon.Timestamp, errs 
*scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeSchedulerRunTimeByHost(ctx context.Context, now pcommon.Timestamp, errs chan error) { // Because we have to utilize network resources for each KPI we should check that each metrics // is enabled before proceeding if !s.conf.MetricsBuilderConfig.Metrics.SplunkSchedulerAvgRunTime.Enabled { @@ -963,20 +1001,20 @@ func (s *splunkScraper) scrapeSchedulerRunTimeByHost(ctx context.Context, now pc for { req, err = s.splunkClient.createRequest(ctx, &sr) if err != nil { - errs.Add(err) + errs <- err return } res, err = s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } // if its a 204 the body will be empty because we are still waiting on search results err = unmarshallSearchReq(res, &sr) if err != nil { - errs.Add(err) + errs <- err } res.Body.Close() @@ -995,7 +1033,7 @@ func (s *splunkScraper) scrapeSchedulerRunTimeByHost(ctx context.Context, now pc } if time.Since(start) > s.conf.ControllerConfig.Timeout { - errs.Add(errMaxSearchWaitTimeExceeded) + errs <- errMaxSearchWaitTimeExceeded return } } @@ -1010,7 +1048,7 @@ func (s *splunkScraper) scrapeSchedulerRunTimeByHost(ctx context.Context, now pc case "run_time_avg": v, err := strconv.ParseFloat(f.Value, 64) if err != nil { - errs.Add(err) + errs <- err continue } s.mb.RecordSplunkSchedulerAvgRunTimeDataPoint(now, v, host) @@ -1040,7 +1078,7 @@ func unmarshallSearchReq(res *http.Response, sr *searchResponse) error { } // Scrape index throughput introspection endpoint -func (s *splunkScraper) scrapeIndexThroughput(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexThroughput(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkIndexerThroughput.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1052,26 +1090,26 @@ func (s *splunkScraper) scrapeIndexThroughput(ctx context.Context, now pcommon.T req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1081,7 +1119,7 @@ func (s *splunkScraper) scrapeIndexThroughput(ctx context.Context, now pcommon.T } // Scrape indexes extended total size -func (s *splunkScraper) scrapeIndexesTotalSize(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesTotalSize(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedTotalSize.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1092,26 +1130,26 @@ func (s *splunkScraper) scrapeIndexesTotalSize(ctx context.Context, now pcommon. req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1125,7 +1163,7 @@ func (s *splunkScraper) scrapeIndexesTotalSize(ctx context.Context, now pcommon. 
mb, err := strconv.ParseFloat(f.Content.TotalSize, 64) totalSize = int64(mb * 1024 * 1024) if err != nil { - errs.Add(err) + errs <- err } } @@ -1134,7 +1172,7 @@ func (s *splunkScraper) scrapeIndexesTotalSize(ctx context.Context, now pcommon. } // Scrape indexes extended total event count -func (s *splunkScraper) scrapeIndexesEventCount(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesEventCount(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedEventCount.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1146,26 +1184,26 @@ func (s *splunkScraper) scrapeIndexesEventCount(ctx context.Context, now pcommon req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1181,7 +1219,7 @@ func (s *splunkScraper) scrapeIndexesEventCount(ctx context.Context, now pcommon } // Scrape indexes extended total bucket count -func (s *splunkScraper) scrapeIndexesBucketCount(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesBucketCount(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedBucketCount.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1193,26 +1231,26 @@ func (s *splunkScraper) scrapeIndexesBucketCount(ctx context.Context, now pcommo req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1225,7 +1263,7 @@ func (s *splunkScraper) scrapeIndexesBucketCount(ctx context.Context, now pcommo if f.Content.TotalBucketCount != "" { totalBucketCount, err = strconv.ParseInt(f.Content.TotalBucketCount, 10, 64) if err != nil { - errs.Add(err) + errs <- err } } @@ -1234,7 +1272,7 @@ func (s *splunkScraper) scrapeIndexesBucketCount(ctx context.Context, now pcommo } // Scrape indexes extended raw size -func (s *splunkScraper) scrapeIndexesRawSize(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesRawSize(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedRawSize.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1246,26 +1284,26 @@ func (s *splunkScraper) scrapeIndexesRawSize(ctx context.Context, now pcommon.Ti req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1279,7 +1317,7 @@ func (s *splunkScraper) 
scrapeIndexesRawSize(ctx context.Context, now pcommon.Ti mb, err := strconv.ParseFloat(f.Content.TotalRawSize, 64) totalRawSize = int64(mb * 1024 * 1024) if err != nil { - errs.Add(err) + errs <- err } } s.mb.RecordSplunkDataIndexesExtendedRawSizeDataPoint(now, totalRawSize, name) @@ -1287,7 +1325,7 @@ func (s *splunkScraper) scrapeIndexesRawSize(ctx context.Context, now pcommon.Ti } // Scrape indexes extended bucket event count -func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedBucketEventCount.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1299,26 +1337,26 @@ func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now p req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1333,7 +1371,7 @@ func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now p bucketDir = "cold" bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Cold.EventCount, 10, 64) if err != nil { - errs.Add(err) + errs <- err } s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir) } @@ -1341,7 +1379,7 @@ func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now p bucketDir = "home" bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.EventCount, 10, 64) if err != nil { - errs.Add(err) + errs <- err } s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir) } @@ -1349,7 +1387,7 @@ func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now p bucketDir = "thawed" bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Thawed.EventCount, 10, 64) if err != nil { - errs.Add(err) + errs <- err } s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir) } @@ -1357,7 +1395,7 @@ func (s *splunkScraper) scrapeIndexesBucketEventCount(ctx context.Context, now p } // Scrape indexes extended bucket hot/warm count -func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkDataIndexesExtendedBucketHotCount.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1369,26 +1407,26 @@ func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(ctx context.Context, now req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1404,7 +1442,7 @@ func (s *splunkScraper) 
scrapeIndexesBucketHotWarmCount(ctx context.Context, now bucketHotCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.HotBucketCount, 10, 64) bucketDir = "hot" if err != nil { - errs.Add(err) + errs <- err } s.mb.RecordSplunkDataIndexesExtendedBucketHotCountDataPoint(now, bucketHotCount, name, bucketDir) } @@ -1412,7 +1450,7 @@ func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(ctx context.Context, now bucketWarmCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.WarmBucketCount, 10, 64) bucketDir = "warm" if err != nil { - errs.Add(err) + errs <- err } s.mb.RecordSplunkDataIndexesExtendedBucketWarmCountDataPoint(now, bucketWarmCount, name, bucketDir) } @@ -1420,7 +1458,7 @@ func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(ctx context.Context, now } // Scrape introspection queues -func (s *splunkScraper) scrapeIntrospectionQueues(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIntrospectionQueues(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkServerIntrospectionQueuesCurrent.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1432,26 +1470,26 @@ func (s *splunkScraper) scrapeIntrospectionQueues(ctx context.Context, now pcomm req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } @@ -1468,7 +1506,7 @@ func (s *splunkScraper) scrapeIntrospectionQueues(ctx context.Context, now pcomm } // Scrape introspection queues bytes -func (s *splunkScraper) scrapeIntrospectionQueuesBytes(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { +func (s *splunkScraper) scrapeIntrospectionQueuesBytes(ctx context.Context, now pcommon.Timestamp, errs chan error) { if !s.conf.MetricsBuilderConfig.Metrics.SplunkServerIntrospectionQueuesCurrentBytes.Enabled || !s.splunkClient.isConfigured(typeIdx) { return } @@ -1480,26 +1518,26 @@ func (s *splunkScraper) scrapeIntrospectionQueuesBytes(ctx context.Context, now req, err := s.splunkClient.createAPIRequest(ctx, ept) if err != nil { - errs.Add(err) + errs <- err return } res, err := s.splunkClient.makeRequest(req) if err != nil { - errs.Add(err) + errs <- err return } defer res.Body.Close() body, err := io.ReadAll(res.Body) if err != nil { - errs.Add(err) + errs <- err return } err = json.Unmarshal(body, &it) if err != nil { - errs.Add(err) + errs <- err return } var name string
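The heart of this change is the new scrape(): instead of calling each scrape function sequentially against a shared *scrapererror.ScrapeErrors, it fans the functions out as goroutines that report failures on a buffered error channel, while a single listener goroutine fans the errors back in. Below is a minimal, standalone sketch of that fan-out/fan-in pattern, not the receiver's actual code; the task functions and the use of errors.Join in place of scrapererror.ScrapeErrors are illustrative stand-ins.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errorListener accumulates everything sent on in and delivers the combined
// error once in is closed.
func errorListener(in <-chan error, out chan<- error) {
	var errs []error
	for err := range in {
		errs = append(errs, err)
	}
	out <- errors.Join(errs...) // nil when no errors were sent
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	tasks := []func(context.Context, chan error){
		func(_ context.Context, errs chan error) { errs <- errors.New("scrape A failed") },
		func(_ context.Context, _ chan error) { /* succeeded, nothing to report */ },
		func(_ context.Context, errs chan error) { errs <- errors.New("scrape B failed") },
	}

	errChan := make(chan error, len(tasks)) // buffered so workers rarely block
	errOut := make(chan error)
	go errorListener(errChan, errOut)

	var wg sync.WaitGroup
	for _, fn := range tasks {
		wg.Add(1)
		go func(fn func(context.Context, chan error)) {
			defer wg.Done()
			fn(ctx, errChan)
		}(fn)
	}

	wg.Wait()      // all workers have finished sending
	close(errChan) // lets the listener's range loop terminate
	fmt.Println(<-errOut)
}

Closing the channel only after wg.Wait() returns is what allows the listener's range loop to end and deliver the combined result, which is the same ordering the new scrape() relies on before calling errs.Combine().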
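Each search-based scrape (scrapeLicenseUsageByIndex, scrapeAvgExecLatencyByHost, and the rest) repeats the same shape: build a request, issue it, retry while the search job is still running, and give up once time.Since(start) exceeds the configured timeout, now pushing every error onto the channel instead of calling errs.Add. A rough, self-contained sketch of that polling shape follows, with a plain net/http client and an illustrative URL standing in for the receiver's splunkClient.

package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

var errMaxSearchWaitTimeExceeded = errors.New("maximum search wait time exceeded")

// pollSearch re-issues the request until the server returns 200 (results ready)
// or the timeout elapses. Failures are sent on errs so the caller can keep
// scraping other metrics; the boolean reports whether a usable response exists.
func pollSearch(ctx context.Context, url string, timeout time.Duration, errs chan<- error) (*http.Response, bool) {
	start := time.Now()
	for {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			errs <- err
			return nil, false
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			errs <- err
			return nil, false
		}
		if res.StatusCode == http.StatusOK {
			return res, true // caller closes the body
		}
		res.Body.Close() // e.g. 204: the search job has not finished yet

		if time.Since(start) > timeout {
			errs <- errMaxSearchWaitTimeExceeded
			return nil, false
		}
	}
}

func main() {
	errs := make(chan error, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Hypothetical search endpoint; the real receiver builds its requests through splunkClient.
	res, ok := pollSearch(ctx, "https://localhost:8089/services/search/jobs", 10*time.Second, errs)
	if !ok {
		fmt.Println("scrape error:", <-errs)
		return
	}
	defer res.Body.Close()
	fmt.Println("search results ready:", res.Status)
}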
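The introspection scrapes (scrapeIndexThroughput, scrapeIndexesTotalSize, scrapeIntrospectionQueues, and similar) follow a simpler single-request shape: create an API request, read the body, unmarshal the JSON, and record a data point per entry, again reporting failures on the channel. The sketch below shows that shape under stated assumptions; the struct fields, endpoint path, and query parameter are illustrative guesses rather than the receiver's real types.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// indexesExtended only loosely mirrors the introspection response; the JSON
// field names here are hypothetical.
type indexesExtended struct {
	Entries []struct {
		Name    string `json:"name"`
		Content struct {
			TotalEventCount int64 `json:"totalEventCount"`
		} `json:"content"`
	} `json:"entry"`
}

// scrapeEndpoint issues one GET, decodes the body, and pushes any failure
// onto the shared error channel instead of returning it.
func scrapeEndpoint(ctx context.Context, url string, errs chan<- error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		errs <- err
		return
	}
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		errs <- err
		return
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		errs <- err
		return
	}

	var it indexesExtended
	if err := json.Unmarshal(body, &it); err != nil {
		errs <- err
		return
	}
	for _, e := range it.Entries {
		fmt.Printf("index %s: %d events\n", e.Name, e.Content.TotalEventCount)
	}
}

func main() {
	errs := make(chan error, 1)
	scrapeEndpoint(context.Background(), "https://localhost:8089/services/data/indexes-extended?output_mode=json", errs)
	close(errs)
	for err := range errs {
		fmt.Println("scrape error:", err)
	}
}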