Skip to content

Commit

Permalink
Remove all datanode leader node usages (#19170)
Browse files Browse the repository at this point in the history
* Remove all datanode leader node usages

* Added changelog

* Fixed rest client injection

* fix datanode isLeader in builder

* Remove leader selector in datanode proxy implementations
  • Loading branch information
todvora committed Apr 29, 2024
1 parent fdac618 commit ba211bf
Show file tree
Hide file tree
Showing 16 changed files with 43 additions and 230 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-19170.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "fixed"
message = "Remove datanode leader flag. It's neither needed nor used."

issues = ["18861"]
pulls = ["19170"]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.graylog.datanode.bootstrap.preflight.DataNodeConfigurationPeriodical;
import org.graylog.datanode.periodicals.ClusterManagerDiscovery;
import org.graylog.datanode.periodicals.MetricsCollector;
import org.graylog.datanode.periodicals.NodePingPeriodical;
import org.graylog.datanode.periodicals.OpensearchNodeHeartbeat;
Expand All @@ -34,7 +33,6 @@ protected void configure() {
periodicalBinder.addBinding().to(ClusterEventPeriodical.class);
periodicalBinder.addBinding().to(ClusterEventCleanupPeriodical.class);
periodicalBinder.addBinding().to(OpensearchNodeHeartbeat.class);
periodicalBinder.addBinding().to(ClusterManagerDiscovery.class);
// periodicalBinder.addBinding().to(UserSessionTerminationPeriodical.class);
periodicalBinder.addBinding().to(NodePingPeriodical.class);
periodicalBinder.addBinding().to(DataNodeConfigurationPeriodical.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ protected void startNodeRegistration(Injector injector) {
// always set leader to "false" on startup and let the NodePingPeriodical take care of it later
nodeService.registerServer(DataNodeDto.Builder.builder()
.setId(nodeId.getNodeId())
.setLeader(false)
.setTransportAddress(configuration.getHttpPublishUri().toString())
.setHostname(Tools.getLocalCanonicalHostname())
.setDataNodeStatus(DataNodeStatus.STARTING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.datanode.process.ProcessInformation;

public record OpensearchInfo(String nodeName, OpensearchState state, boolean isLeaderNode, String restBaseUrl,
public record OpensearchInfo(String nodeName, OpensearchState state, String restBaseUrl,
ProcessInformation process) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public interface OpensearchProcess extends ManagableProcess<OpensearchConfigurat
Optional<RestHighLevelClient> restClient();

Optional<OpenSearchClient> openSearchClient();

boolean isLeaderNode();
void setLeaderNode(boolean isManagerNode);
List<String> stdOutLogs();
List<String> stdErrLogs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class OpensearchProcessImpl implements OpensearchProcess, ProcessListener {

private final DatanodeConfiguration datanodeConfiguration;

private boolean isLeaderNode;
private OpensearchCommandLineProcess commandLineProcess;

private final Queue<String> stdout;
Expand Down Expand Up @@ -116,7 +115,7 @@ public Optional<OpenSearchClient> openSearchClient() {
}

public OpensearchInfo processInfo() {
return new OpensearchInfo(configuration.getDatanodeNodeName(), processState.getState(), isLeaderNode, getOpensearchBaseUrl().toString(), commandLineProcess != null ? commandLineProcess.processInfo() : ProcessInformation.empty());
return new OpensearchInfo(configuration.getDatanodeNodeName(), processState.getState(), getOpensearchBaseUrl().toString(), commandLineProcess != null ? commandLineProcess.processInfo() : ProcessInformation.empty());
}

@Override
Expand Down Expand Up @@ -156,15 +155,6 @@ public void addStateMachineTracer(Trace<OpensearchState, OpensearchEvent> stateM
this.processState.getTracerAggregator().addTracer((StateMachineTracer) stateMachineTracer);
}

public void setLeaderNode(boolean isLeaderNode) {
this.isLeaderNode = isLeaderNode;
}

@Override
public boolean isLeaderNode() {
return isLeaderNode;
}

public boolean isInState(OpensearchState expectedState) {
return this.processState.getState().equals(expectedState);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@
import jakarta.annotation.Nonnull;
import jakarta.inject.Inject;
import org.graylog.datanode.Configuration;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.metrics.ClusterStatMetricsCollector;
import org.graylog.datanode.metrics.NodeMetricsCollector;
import org.graylog.datanode.opensearch.OpensearchProcess;
import org.graylog.datanode.opensearch.statemachine.OpensearchState;
import org.graylog.shaded.opensearch2.org.joda.time.DateTime;
import org.graylog.shaded.opensearch2.org.joda.time.DateTimeZone;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.index.IndexResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.search.SearchResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.Request;
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.Response;
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
import org.graylog.shaded.opensearch2.org.opensearch.core.action.ActionListener;
import org.graylog.shaded.opensearch2.org.opensearch.index.query.QueryBuilders;
Expand All @@ -50,6 +52,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class MetricsCollector extends Periodical {

Expand All @@ -59,7 +62,6 @@ public class MetricsCollector extends Periodical {
private NodeMetricsCollector nodeStatMetricsCollector;
private ClusterStatMetricsCollector clusterStatMetricsCollector;
private final ObjectMapper objectMapper;
private boolean isLeader;

@Inject
public MetricsCollector(OpensearchProcess process, Configuration configuration, ObjectMapper objectMapper) {
Expand Down Expand Up @@ -110,8 +112,6 @@ public void doRun() {
process.restClient().ifPresent(client -> {
this.nodeStatMetricsCollector = new NodeMetricsCollector(client, objectMapper);
this.clusterStatMetricsCollector = new ClusterStatMetricsCollector(client, objectMapper);
this.isLeader = process.isLeaderNode();

final IndexRequest indexRequest = new IndexRequest(configuration.getMetricsStream());
Map<String, Object> metrics = new HashMap<String, Object>();
metrics.put(configuration.getMetricsTimestamp(), new DateTime(DateTimeZone.UTC));
Expand All @@ -122,17 +122,36 @@ public void doRun() {
indexRequest.source(metrics);
indexDocument(client, indexRequest);

if (isLeader) {
if (isManagerNode(process)) {
metrics = new HashMap<>(clusterStatMetricsCollector.getClusterMetrics(getPreviousMetricsForCluster(client)));
metrics.put(configuration.getMetricsTimestamp(), new DateTime(DateTimeZone.UTC));
indexRequest.source(metrics);
indexDocument(client, indexRequest);
}

});
}
}

public boolean isManagerNode(OpensearchProcess process) {
return process.restClient()
.flatMap(this::requestClusterState)
.map(r -> r.nodes().get(r.clusterManagerNode()))
.map(managerNode -> configuration.getDatanodeNodeName().equals(managerNode.name()))
.orElse(false);
}


private Optional<ClusterStateResponse> requestClusterState(RestHighLevelClient client) {
try {
final Response response = client.getLowLevelClient().performRequest(new Request("GET", "_cluster/state/"));
final ClusterStateResponse state = objectMapper.readValue(response.getEntity().getContent(), ClusterStateResponse.class);
return Optional.of(state);
} catch (IOException e) {
LOG.warn("Failed to obtain cluster state response", e);
return Optional.empty();
}
}

private void addJvmMetrics(Map<String, Object> metrics) {
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
metrics.put("dn_heap_usage", calcUsage(memoryMXBean.getHeapMemoryUsage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class NodePingPeriodical extends Periodical {
private final Supplier<URI> opensearchBaseUri;
private final Supplier<String> opensearchClusterUri;
private final Supplier<String> datanodeRestApiUri;
private final Supplier<Boolean> isLeader;
private final Configuration configuration;
private final Supplier<OpensearchState> processState;

Expand All @@ -54,7 +53,6 @@ public NodePingPeriodical(NodeService<DataNodeDto> nodeService, NodeId nodeId, C
managedOpenSearch::getOpensearchBaseUrl,
managedOpenSearch::getOpensearchClusterUrl,
managedOpenSearch::getDatanodeRestApiUrl,
managedOpenSearch::isLeaderNode,
() -> managedOpenSearch.processInfo().state()
);
}
Expand All @@ -66,15 +64,13 @@ public NodePingPeriodical(NodeService<DataNodeDto> nodeService, NodeId nodeId, C
Supplier<URI> opensearchBaseUri,
Supplier<String> opensearchClusterUri,
Supplier<String> datanodeRestApiUri,
Supplier<Boolean> isLeader,
Supplier<OpensearchState> processState
) {
this.nodeService = nodeService;
this.nodeId = nodeId;
this.opensearchBaseUri = opensearchBaseUri;
this.opensearchClusterUri = opensearchClusterUri;
this.datanodeRestApiUri = datanodeRestApiUri;
this.isLeader = isLeader;
this.configuration = configuration;
this.processState = processState;
}
Expand Down Expand Up @@ -124,7 +120,6 @@ public void initialize() {
public void doRun() {
final DataNodeDto dto = DataNodeDto.Builder.builder()
.setId(nodeId.getNodeId())
.setLeader(isLeader.get())
.setTransportAddress(opensearchBaseUri.get().toString())
.setClusterAddress(opensearchClusterUri.get())
.setDataNodeStatus(processState.get().getDataNodeStatus())
Expand All @@ -139,7 +134,6 @@ public void doRun() {
private void registerServer() {
final boolean registrationSucceeded = nodeService.registerServer(DataNodeDto.Builder.builder()
.setId(nodeId.getNodeId())
.setLeader(isLeader.get())
.setTransportAddress(opensearchBaseUri.get().toString())
.setClusterAddress(opensearchClusterUri.get())
.setHostname(configuration.getHostname())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class NodePingPeriodicalTest {

@Test
void doRun() throws NodeNotFoundException {
void doRun() {

final SimpleNodeId nodeID = new SimpleNodeId("5ca1ab1e-0000-4000-a000-000000000000");
final URI uri = URI.create("http://localhost:9200");
Expand All @@ -49,15 +49,13 @@ void doRun() throws NodeNotFoundException {
() -> uri,
() -> cluster,
() -> datanodeRestApi,
() -> true,
() -> OpensearchState.AVAILABLE
);

task.doRun();

Mockito.verify(nodeService).ping(Mockito.eq(DataNodeDto.Builder.builder()
.setId(nodeID.getNodeId())
.setLeader(true)
.setTransportAddress(uri.toString())
.setClusterAddress(cluster)
.setRestApiAddress(datanodeRestApi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ void setUp(GraylogApis apis) {

@ContainerMatrixTest
void testProxyPlaintextGet() throws ExecutionException, RetryException {
apis.datanodeProxy().waitForLeader();
final ValidatableResponse response = apis.get("/datanodes/leader/opensearch/_cat/indices", 200);
final ValidatableResponse response = apis.get("/datanodes/any/opensearch/_cat/indices", 200);
final String responseBody = response.extract().body().asString();
Assertions.assertThat(responseBody).contains(".ds-gl-datanode-metrics").contains("graylog_0").contains("gl-system-events_0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public abstract static class Builder extends NodeDto.Builder<Builder> {

@JsonCreator
public static Builder builder() {
return new AutoValue_DataNodeDto.Builder();
return new AutoValue_DataNodeDto.Builder()
.setLeader(false); // TODO: completely remove the leader property from this DTO
}

@JsonProperty("cluster_address")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.auto.value.AutoValue;
Expand All @@ -42,5 +43,4 @@ public static Builder builder() {
public abstract ServerNodeDto build();

}

}

0 comments on commit ba211bf

Please sign in to comment.