[controller] Do not delete true backup version during Repush #945

Merged: 11 commits, May 17, 2024
Changes from all commits

@@ -46,6 +46,7 @@ public class PushJobSetting implements Serializable {
   public boolean isSourceKafka;
   public String kafkaInputBrokerUrl;
   public String kafkaInputTopic;
+  public int repushSourceVersion;
   public long rewindTimeInSecondsOverride;
   public boolean kafkaInputCombinerEnabled;
   public boolean kafkaInputBuildNewDictEnabled;

@@ -539,6 +539,7 @@ String getSourceTopicNameForKafkaInput(final String userProvidedStoreName, final
             + pushJobSetting.repushInfoResponse.getError());
   }
   int version = pushJobSetting.repushInfoResponse.getRepushInfo().getVersion().getNumber();
+  pushJobSetting.repushSourceVersion = version;
   return Version.composeKafkaTopic(userProvidedStoreName, version);
 }

@@ -2206,7 +2207,8 @@ void createNewStoreVersion(
         setting.livenessHeartbeatEnabled,
         setting.rewindTimeInSecondsOverride,
         setting.deferVersionSwap,
-        setting.targetedRegions));
+        setting.targetedRegions,
+        pushJobSetting.repushSourceVersion));
     if (versionCreationResponse.isError()) {
       if (ErrorType.CONCURRENT_BATCH_PUSH.equals(versionCreationResponse.getErrorType())) {
         LOGGER.error("Unable to run this job since another batch push is running. See the error message for details.");
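
Taken together, the two VenicePushJob hunks above thread one value end to end: getSourceTopicNameForKafkaInput records which store version the repush reads from, and createNewStoreVersion forwards it to the controller when requesting the new version. A condensed, self-contained sketch of that flow (hypothetical class; the field name, the "repush_source_version" key, and the -1 default come from the diff, everything else is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Condensed sketch -- not the real VenicePushJob.
    final class RepushFlowSketch {
      int repushSourceVersion = -1; // default: this push is not a repush

      // Mirrors getSourceTopicNameForKafkaInput: remember the source version
      // while resolving the Kafka Input Format topic, e.g. "my_store_v3".
      String resolveSourceTopic(String storeName, int versionFromRepushInfo) {
        this.repushSourceVersion = versionFromRepushInfo;
        return storeName + "_v" + versionFromRepushInfo;
      }

      // Mirrors createNewStoreVersion: forward the version so the controller
      // knows not to delete it as a backup while the repush still reads it.
      Map<String, String> buildVersionCreationParams() {
        Map<String, String> params = new HashMap<>();
        params.put("repush_source_version", Integer.toString(repushSourceVersion));
        return params;
      }

      public static void main(String[] args) {
        RepushFlowSketch job = new RepushFlowSketch();
        System.out.println(job.resolveSourceTopic("my_store", 3)); // my_store_v3
        System.out.println(job.buildVersionCreationParams());      // {repush_source_version=3}
      }
    }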

@@ -918,7 +918,8 @@ private void configureControllerClientMock(
         anyBoolean(),
         anyLong(),
         anyBoolean(),
-        any())).thenReturn(versionCreationResponse);
+        any(),
+        anyInt())).thenReturn(versionCreationResponse);
     JobStatusQueryResponse jobStatusQueryResponse = createJobStatusQueryResponseMock(executionStatus);
     when(controllerClient.queryOverallJobStatus(anyString(), any(), any())).thenReturn(jobStatusQueryResponse);

@@ -832,7 +832,8 @@ private VersionCreationResponse mockVersionCreationResponse(ControllerClient cli
         anyBoolean(),
         anyLong(),
         anyBoolean(),
-        any());
+        any(),
+        anyInt());
   }

   return versionCreationResponse;

@@ -276,6 +276,9 @@ private ConfigKeys() {
   public static final String CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS =
       "controller.backup.version.default.retention.ms";

+  public static final String CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS =
+      "controller.backup.version.deletion.sleep.ms";
+
   /**
    * The following config is to control whether to enable backup version cleanup based on retention policy or not at cluster level.
    */
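
The new key sits next to the existing backup-version retention config and, going by its name, controls how long the backup-version cleanup service sleeps between deletion passes. A minimal sketch of consuming such a key; plain java.util.Properties stands in for Venice's config plumbing, and the one-minute default is an assumption for illustration, not from the PR:

    import java.util.Properties;

    public final class BackupCleanupConfigDemo {
      // Key from the diff; the fallback default below is illustrative.
      static final String CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS =
          "controller.backup.version.deletion.sleep.ms";

      public static void main(String[] args) {
        Properties controllerProps = new Properties();
        controllerProps.setProperty(CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS, "300000");

        long sleepMs = Long.parseLong(
            controllerProps.getProperty(CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS, "60000"));
        System.out.println("Backup-version cleanup sleeps " + sleepMs + " ms between passes");
      }
    }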

@@ -139,6 +139,8 @@ public class ControllerApiConstants {

   public static final String DEFER_VERSION_SWAP = "defer_version_swap";

+  public static final String REPUSH_SOURCE_VERSION = "repush_source_version";
+
   public static final String REPLICATION_METADATA_VERSION_ID = "replication_metadata_version_id";

   public static final String PARTITION_DETAIL_ENABLED = "partition_detail_enabled";

@@ -50,6 +50,7 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGIONS_FILTER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REMOTE_KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_VERSION_ID;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPUSH_SOURCE_VERSION;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS_OVERRIDE;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.SCHEMA_COMPAT_TYPE;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.SCHEMA_ID;

@@ -305,7 +306,8 @@ public VersionCreationResponse requestTopicForWrites(
         batchJobHeartbeatEnabled,
         rewindTimeInSecondsOverride,
         false,
-        null);
+        null,
+        -1);
   }

   public VersionCreationResponse requestTopicForWrites(

@@ -336,29 +338,33 @@ public VersionCreationResponse requestTopicForWrites(
         batchJobHeartbeatEnabled,
         rewindTimeInSecondsOverride,
         deferVersionSwap,
-        null);
+        null,
+        -1);
   }

   /**
    * Request a topic for the VeniceWriter to write into. A new VPJ push, or a Samza bulk processing job should both use
    * this method. The push job ID needs to be unique for this push. Multiple requests with the same pushJobId are
    * idempotent and will return the same topic.
-   * @param storeName Name of the store being written to.
-   * @param storeSize Estimated size of push in bytes, used to determine partitioning
-   * @param pushJobId Unique identifier for this job
-   * @param sendStartOfPush Whether controller should send START_OF_PUSH message to the newly created topic,
-   *                        while adding a new version. This is currently used in Samza batch load, a.k.a. grandfather
-   * @param sorted Whether the push is going to contain sorted data (in each partition) or not
-   * @param wcEnabled Whether write compute is enabled for this push job or not
-   * @param partitioners partitioner class names in a string seperated by comma
-   * @param compressionDictionary Base64 encoded dictionary to be used to perform dictionary compression
-   * @param sourceGridFabric An identifier of the data center which is used in native replication to determine
-   *                         the Kafka URL
-   * @param batchJobHeartbeatEnabled whether batch push job enables the heartbeat
+   *
+   * @param storeName                   Name of the store being written to.
+   * @param storeSize                   Estimated size of push in bytes, used to determine partitioning
+   * @param pushJobId                   Unique identifier for this job
+   * @param sendStartOfPush             Whether controller should send START_OF_PUSH message to the newly created topic,
+   *                                    while adding a new version. This is currently used in Samza batch load, a.k.a.
+   *                                    grandfather
+   * @param sorted                      Whether the push is going to contain sorted data (in each partition) or not
+   * @param wcEnabled                   Whether write compute is enabled for this push job or not
+   * @param partitioners                partitioner class names in a string seperated by comma
+   * @param compressionDictionary       Base64 encoded dictionary to be used to perform dictionary compression
+   * @param sourceGridFabric            An identifier of the data center which is used in native replication to
+   *                                    determine the Kafka URL
+   * @param batchJobHeartbeatEnabled    whether batch push job enables the heartbeat
    * @param rewindTimeInSecondsOverride if a valid value is specified (>=0) for hybrid store, this param will override
-   *                                    the default store-level rewindTimeInSeconds config.
-   * @param deferVersionSwap whether to defer version swap after the push is done
-   * @param targetedRegions the list of regions that is separated by comma for targeted region push.
+   *                                    the default store-level rewindTimeInSeconds config.
+   * @param deferVersionSwap            whether to defer version swap after the push is done
+   * @param targetedRegions             the list of regions that is separated by comma for targeted region push.
+   * @param repushSourceVersion
    * @return VersionCreationResponse includes topic and partitioning
    */
   public VersionCreationResponse requestTopicForWrites(

@@ -375,7 +381,8 @@ public VersionCreationResponse requestTopicForWrites(
       boolean batchJobHeartbeatEnabled,
       long rewindTimeInSecondsOverride,
       boolean deferVersionSwap,
-      String targetedRegions) {
+      String targetedRegions,
+      int repushSourceVersion) {
     QueryParams params = newParams().add(NAME, storeName)
         // TODO: Store size is not used anymore. Remove it after the next round of controller deployment.
         .add(STORE_SIZE, Long.toString(storeSize))

@@ -389,7 +396,8 @@ public VersionCreationResponse requestTopicForWrites(
         .add(SOURCE_GRID_FABRIC, sourceGridFabric)
         .add(BATCH_JOB_HEARTBEAT_ENABLED, batchJobHeartbeatEnabled)
         .add(REWIND_TIME_IN_SECONDS_OVERRIDE, rewindTimeInSecondsOverride)
-        .add(DEFER_VERSION_SWAP, deferVersionSwap);
+        .add(DEFER_VERSION_SWAP, deferVersionSwap)
+        .add(REPUSH_SOURCE_VERSION, repushSourceVersion);
     if (StringUtils.isNotEmpty(targetedRegions)) {
       params.add(TARGETED_REGIONS, targetedRegions);
     }
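
Note that both pre-existing overloads now pass -1, establishing a sentinel convention: -1 means "this push is not a repush", while any real version number tells the controller which version is still being read and, per the PR title, must not be deleted as a backup. A self-contained illustration of that convention (plain Java; this is not Venice's actual cleanup code):

    public final class RepushSourceVersionDemo {
      static final int NOT_A_REPUSH = -1;

      // A backup version is protected exactly when an in-flight repush reads from it.
      static boolean isProtectedBackup(int candidateVersion, int repushSourceVersion) {
        return repushSourceVersion != NOT_A_REPUSH && candidateVersion == repushSourceVersion;
      }

      public static void main(String[] args) {
        System.out.println(isProtectedBackup(3, 3));  // true: version 3 feeds a repush
        System.out.println(isProtectedBackup(3, -1)); // false: plain batch push, nothing protected
      }
    }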

@@ -567,6 +567,16 @@ public void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryV
     throw new UnsupportedOperationException();
   }

+  @Override
+  public void setRepushSourceVersion(int version) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getRepushSourceVersion() {
+    return this.delegate.getRepushSourceVersion();
+  }
+
   @Override
   public int getRmdVersionId() {
     return this.delegate.getRmdVersionId();
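
This hunk lands in a read-only wrapper (the delegate field and UnsupportedOperationException follow Venice's read-only store pattern): every new mutator must throw and every new accessor must delegate, otherwise the wrapper would leak writes. The pattern, reduced to its essentials with illustrative names:

    // Minimal sketch of the read-only wrapper pattern this hunk follows:
    // mutators throw, accessors delegate. Names are illustrative, not Venice's.
    public final class ReadOnlyWrapperDemo {
      interface Counter {
        void set(int value);
        int get();
      }

      static final class SimpleCounter implements Counter {
        private int value;
        public void set(int value) { this.value = value; }
        public int get() { return value; }
      }

      static final class ReadOnlyCounter implements Counter {
        private final Counter delegate;
        ReadOnlyCounter(Counter delegate) { this.delegate = delegate; }
        public void set(int value) { throw new UnsupportedOperationException(); }
        public int get() { return delegate.get(); }
      }

      public static void main(String[] args) {
        Counter counter = new SimpleCounter();
        counter.set(42);
        Counter readOnly = new ReadOnlyCounter(counter);
        System.out.println(readOnly.get()); // 42
        try {
          readOnly.set(7);
        } catch (UnsupportedOperationException expected) {
          System.out.println("writes through the read-only view are rejected");
        }
      }
    }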

@@ -214,6 +214,10 @@ default void setTimestampMetadataVersionId(int replicationMetadataVersionId) {

   Version cloneVersion();

+  void setRepushSourceVersion(int version);
+
+  int getRepushSourceVersion();
+
   @JsonIgnore
   int getRmdVersionId();

@@ -358,6 +358,16 @@ public void setDataRecoveryVersionConfig(DataRecoveryVersionConfig dataRecoveryV
     }
   }

+  @Override
+  public void setRepushSourceVersion(int version) {
+    this.storeVersion.repushSourceVersion = version;
+  }
+
+  @Override
+  public int getRepushSourceVersion() {
+    return this.storeVersion.repushSourceVersion;
+  }
+
   @Override
   public int getRmdVersionId() {
     return this.storeVersion.timestampMetadataVersionId;

@@ -444,6 +454,7 @@ public Version cloneVersion() {
     clonedVersion.setActiveActiveReplicationEnabled(isActiveActiveReplicationEnabled());
     clonedVersion.setRmdVersionId(getRmdVersionId());
     clonedVersion.setVersionSwapDeferred(isVersionSwapDeferred());
+    clonedVersion.setRepushSourceVersion(getRepushSourceVersion());
     clonedVersion.setViewConfigs(getViewConfigs());
     clonedVersion.setBlobTransferEnabled(isBlobTransferEnabled());
     return clonedVersion;
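
The cloneVersion() line is easy to overlook but load-bearing: version objects get cloned when store metadata is updated, and any property missing from the clone silently resets to its default. A regression-style check along these lines would catch that; the VersionImpl constructor shape (store name, version number, push job id) is an assumption for illustration, while the getter and setter names come from the diff:

    import com.linkedin.venice.meta.Version;
    import com.linkedin.venice.meta.VersionImpl;

    public final class CloneVersionCheck {
      public static void main(String[] args) {
        // Constructor arguments are assumed for illustration.
        Version version = new VersionImpl("test_store", 4, "repush-job-1");
        version.setRepushSourceVersion(3);

        Version cloned = version.cloneVersion();
        if (cloned.getRepushSourceVersion() != 3) {
          throw new AssertionError("repushSourceVersion was lost during cloneVersion()");
        }
        System.out.println("cloneVersion() preserves repushSourceVersion");
      }
    }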

@@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
    *
    * TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
    */
-  ADMIN_OPERATION(77, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
+  ADMIN_OPERATION(78, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

   /**
    * Single chunk of a large multi-chunk value. Just a bunch of bytes.

@@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
   /**
    * Value schema for metadata system store.
    */
-  METADATA_SYSTEM_SCHEMA_STORE(20, StoreMetaValue.class),
+  METADATA_SYSTEM_SCHEMA_STORE(21, StoreMetaValue.class),

   /**
    * Key schema for push status system store.
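
These two bumps (AdminOperation 77 to 78, metadata system store value schema 20 to 21) follow Venice's convention of registering a new schema revision whenever a serialized protocol gains a field; presumably both schemas picked up the new repushSourceVersion field in schema files not shown in this view. A quick sanity check that the registered versions moved, with enum names from the diff and the getCurrentProtocolVersion() accessor assumed for illustration:

    import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;

    public final class ProtocolVersionCheck {
      public static void main(String[] args) {
        // Expected after this PR: 78 and 21 respectively.
        System.out.println(AvroProtocolDefinition.ADMIN_OPERATION.getCurrentProtocolVersion());
        System.out.println(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion());
      }
    }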