Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Routing Table] Initial commit for index routing table manifest #13577

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a66aaaf
Initial commit for index routing table manifest
Bukhtawar Apr 17, 2024
ad480ee
Changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
2ad70c4
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
acc172e
Revert unintentional changes for IndexRoutingTableHeader
Bukhtawar Apr 19, 2024
f65b102
Changes for IndexRoutingTableInputStream
Bukhtawar Apr 20, 2024
441f520
Fixing IndexRoutingTableInputStream and moving checksum to end to file
May 7, 2024
ee70dca
Add read flow for IndexRoutingTable
Arpit-Bandejiya May 8, 2024
7aeecc8
Moving routing table version from IndexRouting stream to manifest
May 13, 2024
7b2bc79
Refactor reader and add failure test
Arpit-Bandejiya May 14, 2024
88d266f
Fix GatewayMetaStatePersistedStateTests
Arpit-Bandejiya May 14, 2024
4c97869
Moving codec to version 2 for compatibility with manifest parser
May 14, 2024
418d9fd
Removing buffer logic
May 28, 2024
0400f40
Merge branch 'main' into remote_routing_input_stream
May 28, 2024
7f33ba2
Move BufferedChecksum streams to libs/core
May 29, 2024
42efb38
Spotless fix
May 29, 2024
9b50f8a
Refactor RemoteIndexRoutingTable read
Arpit-Bandejiya May 29, 2024
a7f7cab
Add Manifest Tests and spotless fix
Arpit-Bandejiya May 29, 2024
71e151f
Fix remoteClusterServiceTests
Arpit-Bandejiya May 30, 2024
a9ab9c1
Remote store objects to implement writeable interface
May 31, 2024
4167eeb
addressing pr comments
Jun 3, 2024
b3c18d8
test fixes
Jun 3, 2024
bbebce3
addressing PR comments
Jun 4, 2024
24fe063
Add package-info
Jun 6, 2024
c8ebbc0
Merge branch 'main' into remote_routing_input_stream
Jun 6, 2024
01019da
Fixing javadoc failure
Jun 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
himshikha marked this conversation as resolved.
Show resolved Hide resolved
package org.opensearch.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.util.BitUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.opensearch.common.annotation.PublicApi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {

public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
public static final int CODEC_V2 = 2; // In Codec V2 we introduce index routing-metadata in manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
Expand All @@ -48,6 +49,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid");
private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed");
private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version");
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");

private static long term(Object[] fields) {
return (long) fields[0];
Expand Down Expand Up @@ -97,6 +100,14 @@ private static String globalMetadataFileName(Object[] fields) {
return (String) fields[11];
}

private static long routingTableVersion(Object[] fields) {
return (long) fields[12];
}

private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[13];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
Expand Down Expand Up @@ -133,11 +144,32 @@ private static String globalMetadataFileName(Object[] fields) {
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V1;
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V2 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> new ClusterMetadataManifest(
term(fields),
version(fields),
clusterUUID(fields),
stateUUID(fields),
opensearchVersion(fields),
nodeId(fields),
committed(fields),
codecVersion(fields),
globalMetadataFileName(fields),
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields),
routingTableVersion(fields),
indicesRouting(fields)
)
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
Expand All @@ -160,6 +192,14 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD);
parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
}
if (codec_version >= CODEC_V2) {
parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD);
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_ROUTING_FIELD
);
}
}

private final int codecVersion;
Expand All @@ -174,6 +214,8 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
private final boolean committed;
private final String previousClusterUUID;
private final boolean clusterUUIDCommitted;
private final long routingTableVersion;
private final List<UploadedIndexMetadata> indicesRouting;

public List<UploadedIndexMetadata> getIndices() {
return indices;
Expand Down Expand Up @@ -223,6 +265,14 @@ public String getGlobalMetadataFileName() {
return globalMetadataFileName;
}

public long getRoutingTableVersion() {
return routingTableVersion;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -236,6 +286,40 @@ public ClusterMetadataManifest(
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted
) {
this(
clusterTerm,
version,
clusterUUID,
stateUUID,
opensearchVersion,
nodeId,
committed,
codecVersion,
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted,
-1,
new ArrayList<>()
);
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
String clusterUUID,
String stateUUID,
Version opensearchVersion,
String nodeId,
boolean committed,
int codecVersion,
String globalMetadataFileName,
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted,
long routingTableVersion,
List<UploadedIndexMetadata> indicesRouting
) {
this.clusterTerm = clusterTerm;
this.stateVersion = version;
Expand All @@ -249,6 +333,8 @@ public ClusterMetadataManifest(
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.routingTableVersion = routingTableVersion;
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
}

public ClusterMetadataManifest(StreamInput in) throws IOException {
Expand All @@ -262,12 +348,21 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
this.previousClusterUUID = in.readString();
this.clusterUUIDCommitted = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.routingTableVersion = in.readLong();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.routingTableVersion = -1;
this.indicesRouting = null;
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
}
}

Expand All @@ -291,7 +386,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
Expand All @@ -301,6 +398,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
if (onOrAfterCodecVersion(CODEC_V2)) {
builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion());
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder;
}

Expand All @@ -320,6 +429,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
}
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeLong(routingTableVersion);
out.writeCollection(indicesRouting);
}
}

@Override
Expand All @@ -342,7 +455,9 @@ public boolean equals(Object o) {
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion);
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(routingTableVersion, that.routingTableVersion)
&& Objects.equals(indicesRouting, that.indicesRouting);
}

@Override
Expand All @@ -359,7 +474,9 @@ public int hashCode() {
nodeId,
committed,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}

Expand All @@ -376,6 +493,10 @@ public static ClusterMetadataManifest fromXContentV0(XContentParser parser) thro
return PARSER_V0.parse(parser, null);
}

public static ClusterMetadataManifest fromXContentV1(XContentParser parser) throws IOException {
return PARSER_V1.parse(parser, null);
}

public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return CURRENT_PARSER.parse(parser, null);
}
Expand All @@ -399,12 +520,24 @@ public static class Builder {
private String previousClusterUUID;
private boolean committed;
private boolean clusterUUIDCommitted;
private long routingTableVersion;
private List<UploadedIndexMetadata> indicesRouting;

public Builder indices(List<UploadedIndexMetadata> indices) {
this.indices = indices;
return this;
}

public Builder routingTableVersion(long routingTableVersion) {
this.routingTableVersion = routingTableVersion;
return this;
}

public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
this.indicesRouting = indicesRouting;
return this;
}

public Builder codecVersion(int codecVersion) {
this.codecVersion = codecVersion;
return this;
Expand Down Expand Up @@ -454,6 +587,10 @@ public List<UploadedIndexMetadata> getIndices() {
return indices;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public Builder previousClusterUUID(String previousClusterUUID) {
this.previousClusterUUID = previousClusterUUID;
return this;
Expand All @@ -466,6 +603,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {

public Builder() {
indices = new ArrayList<>();
indicesRouting = new ArrayList<>();
}

public Builder(ClusterMetadataManifest manifest) {
Expand All @@ -481,6 +619,8 @@ public Builder(ClusterMetadataManifest manifest) {
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
this.routingTableVersion = manifest.routingTableVersion;
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
}

public ClusterMetadataManifest build() {
Expand All @@ -496,7 +636,9 @@ public ClusterMetadataManifest build() {
globalMetadataFileName,
indices,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}

Expand Down Expand Up @@ -571,11 +713,9 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
.endObject();
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ public class RemoteClusterStateService implements Closeable {
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0);

/**
* Manifest format compatible with codec v1, where we introduced codec versions/global metadata.
* Manifest format compatible with older codec v1, where codec versions/global metadata was introduced.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT_V1 =
new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1);

/**
* Manifest format compatible with codec v2, where we introduced routing table metadata in manifest.
*/
public static final ChecksumBlobStoreFormat<ClusterMetadataManifest> CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>(
"cluster-metadata-manifest",
Expand Down Expand Up @@ -172,7 +178,7 @@ public class RemoteClusterStateService implements Closeable {
private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemotePersistenceStats remoteStateStats;
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1;

// ToXContent Params with gateway mode.
Expand Down Expand Up @@ -1175,6 +1181,8 @@ private ChecksumBlobStoreFormat<ClusterMetadataManifest> getClusterMetadataManif
long codecVersion = getManifestCodecVersion(fileName);
if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) {
return CLUSTER_METADATA_MANIFEST_FORMAT;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V1) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V1;
} else if (codecVersion == ClusterMetadataManifest.CODEC_V0) {
return CLUSTER_METADATA_MANIFEST_FORMAT_V0;
}
Expand Down