Skip to content

Commit

Permalink
[GOBBLIN-2064] Use proper GSON builder class for GaaSObservabilityPro…
Browse files Browse the repository at this point in the history
…ducer and cleanup fully qualified classnames in tests (#3945)

Remove full classnames and use imports for mock classes, use proper gson builder
  • Loading branch information
Will-Lo committed May 8, 2024
1 parent 7a11e72 commit c1a09f0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void testProcessMessageForFailedFlow() throws IOException, ReflectiveOper
Thread.currentThread().interrupt();
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();

ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
Expand Down Expand Up @@ -279,7 +279,7 @@ public void testProcessMessageForSkippedFlow() throws IOException, ReflectiveOpe
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();

ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
Expand Down Expand Up @@ -339,9 +339,9 @@ public void testProcessingRetriedForApparentlyTransientErrors() throws IOExcepti
Config conf = ConfigFactory.empty().withValue(
KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + RETRY_MULTIPLIER, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(shouldThrowFakeExceptionInParseJobStatusToggle, conf,
new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
new NoopGaaSJobObservabilityEventProducer());

jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
this::convertMessageAndMetadataToDecodableKafkaRecord);
Expand Down Expand Up @@ -397,7 +397,7 @@ public void testProcessMessageForCancelledAndKilledEvent() throws IOException, R
Thread.currentThread().interrupt();
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
Expand Down Expand Up @@ -456,7 +456,7 @@ public void testProcessMessageForFlowPendingResume() throws IOException, Reflect
Thread.currentThread().interrupt();
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(), new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
Expand Down Expand Up @@ -507,7 +507,7 @@ public void testProcessProgressingMessageWhenNoPreviousStatus() throws IOExcepti
}

MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
new org.apache.gobblin.service.monitoring.NoopGaaSJobObservabilityEventProducer());
new NoopGaaSJobObservabilityEventProducer());
jobStatusMonitor.buildMetricsContextAndMetrics();
Iterator<DecodeableKafkaRecord> recordIterator = Iterators.transform(
this.kafkaTestHelper.getIteratorForTopic(TOPIC),
Expand Down Expand Up @@ -537,8 +537,8 @@ public void testJobMonitorCreatesGaaSObservabilityEvent() throws IOException, Re
Thread.currentThread().interrupt();
}
MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(
ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, false);
MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
issueRepository, false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
Expand Down Expand Up @@ -584,8 +584,8 @@ public void testObservabilityEventSingleEmission() throws IOException, Reflectiv
Thread.currentThread().interrupt();
}
MultiContextIssueRepository issueRepository = new InMemoryMultiContextIssueRepository();
org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer mockEventProducer = new org.apache.gobblin.service.monitoring.MockGaaSJobObservabilityEventProducer(
ConfigUtils.configToState(ConfigFactory.empty()), issueRepository, false);
MockGaaSJobObservabilityEventProducer mockEventProducer = new MockGaaSJobObservabilityEventProducer(ConfigUtils.configToState(ConfigFactory.empty()),
issueRepository, false);
MockKafkaAvroJobStatusMonitor jobStatusMonitor = createMockKafkaAvroJobStatusMonitor(new AtomicBoolean(false), ConfigFactory.empty(),
mockEventProducer);
jobStatusMonitor.buildMetricsContextAndMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private GaaSJobObservabilityEvent createGaaSObservabilityEvent(final State jobSt
.setEffectiveUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
.setDatasetsMetrics(datasetMetrics)
.setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null))
.setJobProperties(GsonUtils.GSON_WITH_DATE_HANDLING.newBuilder().create().toJson(jobProperties))
.setJobProperties(GsonUtils.GSON_WITH_DATE_HANDLING.toJson(jobProperties))
.setSourceNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))
.setDestinationNode(jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, ""))
.setFlowEdgeId(!edgeId.isEmpty() ? edgeId : fullFlowEdge)
Expand Down

0 comments on commit c1a09f0

Please sign in to comment.