Skip to content

Commit

Permalink
Moving routing table version from IndexRouting stream to manifest
Browse files Browse the repository at this point in the history
Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
  • Loading branch information
Himshikha Gupta committed May 14, 2024
1 parent ee70dca commit 7aeecc8
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField INDICES_FIELD = new ParseField("indices");
private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid");
private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed");
private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version");
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");

private static long term(Object[] fields) {
Expand Down Expand Up @@ -99,8 +100,12 @@ private static String globalMetadataFileName(Object[] fields) {
return (String) fields[11];
}

private static long routingTableVersion(Object[] fields) {
return (long) fields[12];
}

private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[12];
return (List<UploadedIndexMetadata>) fields[13];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
Expand Down Expand Up @@ -154,6 +159,7 @@ private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
indices(fields),
previousClusterUUID(fields),
clusterUUIDCommitted(fields),
routingTableVersion(fields),
indicesRouting(fields)
)
);
Expand Down Expand Up @@ -187,6 +193,7 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD);
}
if (codec_version >= CODEC_V2) {
parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD);
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
Expand All @@ -207,6 +214,7 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
private final boolean committed;
private final String previousClusterUUID;
private final boolean clusterUUIDCommitted;
private final long routingTableVersion;
private final List<UploadedIndexMetadata> indicesRouting;

public List<UploadedIndexMetadata> getIndices() {
Expand Down Expand Up @@ -257,6 +265,10 @@ public String getGlobalMetadataFileName() {
return globalMetadataFileName;
}

public long getRoutingTableVersion() {
return routingTableVersion;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}
Expand All @@ -276,7 +288,7 @@ public ClusterMetadataManifest(
boolean clusterUUIDCommitted
) {
this(clusterTerm, version, clusterUUID, stateUUID, opensearchVersion, nodeId, committed, codecVersion,
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, new ArrayList<>());
globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted, -1, new ArrayList<>());
}

public ClusterMetadataManifest(
Expand All @@ -292,6 +304,7 @@ public ClusterMetadataManifest(
List<UploadedIndexMetadata> indices,
String previousClusterUUID,
boolean clusterUUIDCommitted,
long routingTableVersion,
List<UploadedIndexMetadata> indicesRouting
) {
this.clusterTerm = clusterTerm;
Expand All @@ -306,6 +319,7 @@ public ClusterMetadataManifest(
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.routingTableVersion = routingTableVersion;
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
}

Expand All @@ -323,14 +337,17 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.routingTableVersion = in.readLong();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.routingTableVersion = -1;
this.indicesRouting = null;
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
}
}
Expand Down Expand Up @@ -368,6 +385,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
if (onOrAfterCodecVersion(CODEC_V2)) {
builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion());
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
Expand Down Expand Up @@ -398,6 +416,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(globalMetadataFileName);
}
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeLong(routingTableVersion);
out.writeCollection(indicesRouting);
}
}
Expand All @@ -423,6 +442,7 @@ public boolean equals(Object o) {
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(routingTableVersion, that.routingTableVersion)
&& Objects.equals(indicesRouting, that.indicesRouting);
}

Expand All @@ -441,6 +461,7 @@ public int hashCode() {
committed,
previousClusterUUID,
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}
Expand Down Expand Up @@ -481,13 +502,19 @@ public static class Builder {
private String previousClusterUUID;
private boolean committed;
private boolean clusterUUIDCommitted;
private long routingTableVersion;
private List<UploadedIndexMetadata> indicesRouting;

public Builder indices(List<UploadedIndexMetadata> indices) {
this.indices = indices;
return this;
}

public Builder routingTableVersion(long routingTableVersion) {
this.routingTableVersion = routingTableVersion;
return this;
}

public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
this.indicesRouting = indicesRouting;
return this;
Expand Down Expand Up @@ -573,6 +600,7 @@ public Builder(ClusterMetadataManifest manifest) {
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
this.routingTableVersion = manifest.routingTableVersion;
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
}

Expand All @@ -590,6 +618,7 @@ public ClusterMetadataManifest build() {
indices,
previousClusterUUID,
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BufferedChecksumStreamInput;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

Expand All @@ -31,52 +25,27 @@
*/
public class IndexRoutingTableHeader {

private final long routingTableVersion;

private final String indexName;

private final Version nodeVersion;

public static final String INDEX_ROUTING_HEADER_CODEC = "index_routing_header_codec";

public static final int INITIAL_VERSION = 1;

public static final int CURRENT_VERSION = INITIAL_VERSION;
private final String indexName;

public IndexRoutingTableHeader(long routingTableVersion, String indexName, Version nodeVersion) {
this.routingTableVersion = routingTableVersion;
public IndexRoutingTableHeader(String indexName) {
this.indexName = indexName;
this.nodeVersion = nodeVersion;
}

/**
* Returns the bytes reference for the {@link IndexRoutingTableHeader}
* @throws IOException
*/
public void write(StreamOutput out) throws IOException {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION);
// Write version
out.writeLong(routingTableVersion);
out.writeInt(nodeVersion.id);
out.writeString(indexName);

out.flush();
}

/**
* Reads the contents on the byte array into the corresponding {@link IndexRoutingTableHeader}
*
* @param in
* @return IndexRoutingTableHeader
* @throws IOException
*/
public static IndexRoutingTableHeader read(BufferedChecksumStreamInput in) throws IOException {
public static IndexRoutingTableHeader read(StreamInput in) throws IOException {
try {
readHeaderVersion(in);
final long version = in.readLong();
final int nodeVersion = in.readInt();
final String name = in.readString();
assert version >= 0 : "Version must be non-negative [" + version + "]";
return new IndexRoutingTableHeader(version, name, Version.fromId(nodeVersion));
readHeaderVersion(in);
final String name = in.readString();
return new IndexRoutingTableHeader(name);
} catch (EOFException e) {
throw new IOException("index routing header truncated", e);
}
Expand All @@ -92,15 +61,19 @@ static int readHeaderVersion(final StreamInput in) throws IOException {
return version;
}

public long getRoutingTableVersion() {
return routingTableVersion;
/**
* Returns the bytes reference for the {@link IndexRoutingTableHeader}
*
* @throws IOException
*/
public void write(StreamOutput out) throws IOException {
CodecUtil.writeHeader(new OutputStreamDataOutput(out), INDEX_ROUTING_HEADER_CODEC, CURRENT_VERSION);
out.writeString(indexName);
out.flush();
}

public String getIndexName() {
return indexName;
}

public Version getNodeVersion() {
return nodeVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.gateway.remote.routingtable;

import org.opensearch.Version;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.stream.BufferedChecksumStreamOutput;
Expand Down Expand Up @@ -58,15 +57,15 @@ public class IndexRoutingTableInputStream extends InputStream {
private final BytesStreamOutput bytesStreamOutput;
private final BufferedChecksumStreamOutput out;

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion) throws IOException {
this(indexRoutingTable, version, nodeVersion, BUFFER_SIZE);
public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable) throws IOException {
this(indexRoutingTable, BUFFER_SIZE);
}

public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, long version, Version nodeVersion, int size)
public IndexRoutingTableInputStream(IndexRoutingTable indexRoutingTable, int size)
throws IOException {
this.buf = new byte[size];
this.shardIter = indexRoutingTable.iterator();
this.indexRoutingTableHeader = new IndexRoutingTableHeader(version, indexRoutingTable.getIndex().getName(), nodeVersion);
this.indexRoutingTableHeader = new IndexRoutingTableHeader(indexRoutingTable.getIndex().getName());
this.bytesStreamOutput = new BytesStreamOutput();
this.out = new BufferedChecksumStreamOutput(bytesStreamOutput);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,35 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() {
}
}

public void testClusterMetadataManifestXContentV2() throws IOException {
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
ClusterMetadataManifest originalManifest = new ClusterMetadataManifest(
1L,
1L,
"test-cluster-uuid",
"test-state-uuid",
Version.CURRENT,
"test-node-id",
false,
ClusterMetadataManifest.CODEC_V2,
"test-metadata",
Collections.singletonList(uploadedIndexMetadata),
"prev-cluster-uuid",
true,
1L,
Collections.singletonList(uploadedIndexMetadata)
);
final XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();

try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser);
assertEquals(originalManifest, fromXContentManifest);
}
}

private List<UploadedIndexMetadata> randomUploadedIndexMetadataList() {
final int size = randomIntBetween(1, 10);
final List<UploadedIndexMetadata> uploadedIndexMetadataList = new ArrayList<>(size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,22 @@

package org.opensearch.gateway.remote.routingtable;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.InputStreamDataInput;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

public class IndexRoutingTableHeaderTests extends OpenSearchTestCase {

public void testWrite() throws IOException {
public void testIndexRoutingTableHeader() throws IOException {
IndexRoutingTableHeader header = new IndexRoutingTableHeader("dummyIndex");
BytesStreamOutput out = new BytesStreamOutput();
IndexRoutingTableHeader header = new IndexRoutingTableHeader(1, "dummyIndex", Version.V_3_0_0);
header.write(out);

BytesStreamInput in = new BytesStreamInput(out.bytes().toBytesRef().bytes);
CodecUtil.checkHeader(new InputStreamDataInput(in),IndexRoutingTableHeader.INDEX_ROUTING_HEADER_CODEC, IndexRoutingTableHeader.INITIAL_VERSION, IndexRoutingTableHeader.CURRENT_VERSION );
assertEquals(1, in.readLong());
assertEquals(Version.V_3_0_0.id, in.readInt());
assertEquals("dummyIndex", in.readString());
IndexRoutingTableHeader headerRead = IndexRoutingTableHeader.read(in);
assertEquals("dummyIndex", headerRead.getIndexName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;

public class IndexRoutingTableInputStreamTests extends ReplicationTrackerTestCase {
public class IndexRoutingTableInputStreamTests extends OpenSearchTestCase {

public void testRoutingTableInputStream() throws IOException {
public void testRoutingTableInputStream(){
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
Expand All @@ -44,14 +44,14 @@ public void testRoutingTableInputStream() throws IOException {

initialRoutingTable.getIndicesRouting().values().forEach(indexShardRoutingTables -> {
try {
logger.info("IndexShardRoutingTables: {}", indexShardRoutingTables);
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables,
initialRoutingTable.version(), Version.CURRENT);
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexShardRoutingTables);

IndexRoutingTableInputStreamReader reader = new IndexRoutingTableInputStreamReader(indexRoutingStream);
Map<String, IndexShardRoutingTable> indexShardRoutingTableMap = reader.read();

logger.info("indexShardRoutingTableMap: {}", indexShardRoutingTableMap);
assertEquals(1, indexShardRoutingTableMap.size());
assertNotNull(indexShardRoutingTableMap.get("test"));
assertEquals(2,indexShardRoutingTableMap.get("test").shards().size());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 7aeecc8

Please sign in to comment.