Skip to content

Commit

Permalink
[GOBBLIN-2053] Add header and fix prefix configs opentelemetry (#3933)
Browse files Browse the repository at this point in the history
Add header and fix prefix configs opentelemetry
  • Loading branch information
Will-Lo committed Apr 25, 2024
1 parent 77f9d38 commit df9a0c1
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -905,18 +905,18 @@ public class ConfigurationKeys {
public static final String METRICS_CUSTOM_BUILDERS = METRICS_CONFIGURATIONS_PREFIX + "reporting.custom.builders";

// Opentelemetry based metrics reporting
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED =
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemtry.metrics.enabled";
public static final String METRICS_REPORTING_OPENTELEMETRY_PREFIX = "metrics.reporting.opentelemetry.";
public static final String METRICS_REPORTING_OPENTELEMETRY_ENABLED = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "enabled";

public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.configs";
public static final String METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "configs.";
public static final Boolean DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED = false;

public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT =
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.endpoint";
public static final String METRICS_REPORTING_OPENTELEMETRY_ENDPOINT = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "endpoint";

public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS =
METRICS_CONFIGURATIONS_PREFIX + "reporting.opentelemetry.interval.millis";
// Headers to add to the OpenTelemetry HTTP Exporter, formatted as a JSON String with string keys and values
public static final String METRICS_REPORTING_OPENTELEMETRY_HEADERS = METRICS_REPORTING_OPENTELEMETRY_PREFIX + "headers";

public static final String METRICS_REPORTING_OPENTELEMETRY_INTERVAL_MILLIS = METRICS_CONFIGURATIONS_PREFIX + "interval.millis";

/**
* Rest server configuration properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static PathFilter instantiatePathFilter(Properties props) {
try {
Class<?> pathFilterClass = Class.forName(props.getProperty(PATH_FILTER_KEY));
return (PathFilter) GobblinConstructorUtils.invokeLongestConstructor(pathFilterClass,
PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(props, CONFIGURATION_KEY_PREFIX));
PropertiesUtils.extractChildProperties(props, CONFIGURATION_KEY_PREFIX));
} catch (ReflectiveOperationException exception) {
throw new RuntimeException(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.gobblin.metrics;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.google.common.base.Optional;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -32,6 +35,7 @@
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
Expand All @@ -40,20 +44,32 @@
* A metrics reporter wrapper that uses the OpenTelemetry standard to emit metrics
* Currently separated from the legacy codehale metrics as we need to maintain backwards compatibility, but eventually
* can replace the old metrics system with tighter integrations once it's stable
* Defaults to using the HTTP exporter where it expects an endpoint and optional headers in JSON string format
*/

@Slf4j
public class OpenTelemetryMetrics extends OpenTelemetryMetricsBase {

private static OpenTelemetryMetrics GLOBAL_INSTANCE;
private static final Long DEFAULT_OPENTELEMETRY_REPORTING_INTERVAL_MILLIS = 10000L;

private OpenTelemetryMetrics(State state) {
super(state);
}

@Override
protected MetricExporter initializeMetricExporter(State state) {
Preconditions.checkArgument(state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT),
"OpenTelemetry endpoint must be provided");
OtlpHttpMetricExporterBuilder httpExporterBuilder = OtlpHttpMetricExporter.builder();
httpExporterBuilder.setEndpoint(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT));

if (state.contains(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS)) {
Map<String, String> headers = parseHttpHeaders(state.getProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_HEADERS));
for (Map.Entry<String, String> header : headers.entrySet()) {
httpExporterBuilder.addHeader(header.getKey(), header.getValue());
}
}
return httpExporterBuilder.build();
}

Expand All @@ -67,8 +83,9 @@ public static OpenTelemetryMetrics getInstance(State state) {

@Override
protected void initialize(State state) {
Properties metricProps = PropertiesUtils.extractPropertiesWithPrefix(state.getProperties(), Optional.of(
ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS));
log.info("Initializing OpenTelemetry metrics");
Properties metricProps = PropertiesUtils.extractChildProperties(state.getProperties(),
ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_CONFIGS_PREFIX);
AttributesBuilder attributesBuilder = Attributes.builder();
for (String key : metricProps.stringPropertyNames()) {
attributesBuilder.put(AttributeKey.stringKey(key), metricProps.getProperty(key));
Expand All @@ -87,4 +104,15 @@ protected void initialize(State state) {

this.openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).buildAndRegisterGlobal();
}

protected static Map<String, String> parseHttpHeaders(String headersString) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(headersString, HashMap.class);
} catch (Exception e) {
String errMsg = "Failed to parse headers: " + headersString;
log.error(errMsg, e);
throw new RuntimeException(errMsg);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.metrics;

import java.util.Map;

import org.testng.Assert;
import org.testng.annotations.Test;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;


public class OpenTelemetryMetricsTest {

@Test
public void testInitializeOpenTelemetryFailsWithoutEndpoint() {
State opentelemetryState = new State();
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
Assert.assertThrows(IllegalArgumentException.class, () -> {
OpenTelemetryMetrics.getInstance(opentelemetryState);
});
}

@Test
public void testInitializeOpenTelemetrySucceedsWithEndpoint() {
State opentelemetryState = new State();
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED, "true");
opentelemetryState.setProp(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENDPOINT, "http://localhost:4317");
// Should not throw an exception
OpenTelemetryMetrics.getInstance(opentelemetryState);
Assert.assertTrue(true);
}

@Test
public void testHeadersParseCorrectly() {
Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders(
"{\"Content-Type\":\"application/x-protobuf\",\"headerTag\":\"tag1:value1,tag2:value2\"}");
Assert.assertEquals(headers.size(), 2);
Assert.assertEquals(headers.get("Content-Type"), "application/x-protobuf");
Assert.assertEquals(headers.get("headerTag"), "tag1:value1,tag2:value2");
}

@Test
void testHeadersParseNull() {
Map<String, String> headers = OpenTelemetryMetrics.parseHttpHeaders("{}");
Assert.assertEquals(headers.size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ public abstract class GaaSObservabilityEventProducer implements Closeable {
public static final String GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS_KEY = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "class.name";
public static final String DEFAULT_GAAS_OBSERVABILITY_EVENT_PRODUCER_CLASS = NoopGaaSObservabilityEventProducer.class.getName();
public static final String ISSUES_READ_FAILED_METRIC_NAME = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "getIssuesFailedCount";
public static final String GAAS_OBSERVABILITY_METRICS_PREFIX = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics.";
public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = "gaas.observability.jobStatus";
public static final String GAAS_OBSERVABILITY_GROUP_NAME = GAAS_OBSERVABILITY_METRICS_PREFIX + "groupName";
public static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME = GAAS_OBSERVABILITY_EVENT_PRODUCER_PREFIX + "metrics";
public static final String GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME = "jobStatus";

protected MetricContext metricContext;
protected State state;
Expand Down Expand Up @@ -99,17 +98,18 @@ protected OpenTelemetryMetricsBase getOpentelemetryMetrics(State state) {
private void setupMetrics(State state) {
this.opentelemetryMetrics = getOpentelemetryMetrics(state);
if (this.opentelemetryMetrics != null) {
this.jobStatusMetric = this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
this.jobStatusMetric = this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.gaugeBuilder(GAAS_OBSERVABILITY_JOB_STATUS_METRIC_NAME)
.ofLongs()
.buildObserver();
this.opentelemetryMetrics.getMeter(state.getProp(GAAS_OBSERVABILITY_GROUP_NAME))
this.opentelemetryMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
.batchCallback(() -> {
for (GaaSObservabilityEventExperimental event : this.eventCollector) {
Attributes tags = getEventAttributes(event);
int status = event.getJobStatus() != JobStatus.SUCCEEDED ? 1 : 0;
this.jobStatusMetric.record(status, tags);
}
log.info("Submitted {} job status events", this.eventCollector.size());
// Empty the list of events as they are all emitted at this point.
this.eventCollector.clear();
}, this.jobStatusMetric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public void testMockProduceMetrics() throws Exception {
// Check number of meters
Assert.assertEquals(metrics.size(), 1);
Map<String, MetricData > metricsByName = metrics.stream().collect(Collectors.toMap(metric -> metric.getName(), metricData -> metricData));
MetricData jobStatusMetric = metricsByName.get("gaas.observability.jobStatus");
MetricData jobStatusMetric = metricsByName.get("jobStatus");
// Check the attributes of the metrics
List<LongPointData> datapoints = jobStatusMetric.getLongGaugeData().getPoints().stream().collect(Collectors.toList());
Assert.assertEquals(datapoints.size(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public static Properties extractPropertiesWithPrefix(Properties properties, Opti
* @param prefix of keys to be extracted
* @return a {@link Properties} instance
*/
public static Properties extractPropertiesWithPrefixAfterRemovingPrefix(Properties properties, String prefix) {
public static Properties extractChildProperties(Properties properties, String prefix) {
Preconditions.checkNotNull(properties);
Preconditions.checkNotNull(prefix);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ public void testExtractPropertiesWithPrefixAfterRemovingPrefix() {
properties.setProperty("k2.kk", "v3");

// First prefix
Properties extractedPropertiesK1 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k1.");
Properties extractedPropertiesK1 = PropertiesUtils.extractChildProperties(properties, "k1.");
Assert.assertEquals(extractedPropertiesK1.getProperty("kk1"), "v1");
Assert.assertEquals(extractedPropertiesK1.getProperty("kk2"), "v2");
Assert.assertTrue(!extractedPropertiesK1.containsKey("k2.kk"));

// Second prefix
Properties extractedPropertiesK2 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k2");
Properties extractedPropertiesK2 = PropertiesUtils.extractChildProperties(properties, "k2");
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK2.containsKey("k1.kk2"));
Assert.assertEquals(extractedPropertiesK2.getProperty(".kk"), "v3");

// Missing prefix
Properties extractedPropertiesK3 = PropertiesUtils.extractPropertiesWithPrefixAfterRemovingPrefix(properties, "k3");
Properties extractedPropertiesK3 = PropertiesUtils.extractChildProperties(properties, "k3");
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));
Expand Down

0 comments on commit df9a0c1

Please sign in to comment.