Skip to content

Commit

Permalink
Detect conflict during named blob put operation before upload or stit…
Browse files Browse the repository at this point in the history
…ch operation.
  • Loading branch information
github-actions committed Feb 3, 2024
1 parent bc148a7 commit 5653cfd
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ default Future<String> convert(RestRequest restRequest, String input, BlobInfo b
return convert(restRequest, input, callback);
}

/**
* Detect if an ID in the request can cause a conflict during conversion.
* @param restRequest {@link RestRequest} representing the request.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
* @return a {@link Future} that will eventually contain a boolean indicating whether the ID can cause a conflict.
*/
Future<Boolean> detectConflict(RestRequest restRequest, Callback<Boolean> callback);

/**
* Converts an ID.
* @param restRequest {@link RestRequest} representing the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.frontend;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.commons.Callback;
import com.github.ambry.commons.CallbackUtils;
Expand Down Expand Up @@ -64,7 +65,7 @@ public IdConverter getIdConverter() {
return new AmbryIdConverter(idSigningService, namedBlobDb, frontendMetrics);
}

private static class AmbryIdConverter implements IdConverter {
static class AmbryIdConverter implements IdConverter {
private boolean isOpen = true;
private final IdSigningService idSigningService;
private final NamedBlobDb namedBlobDb;
Expand Down Expand Up @@ -112,37 +113,97 @@ public Future<String> convert(RestRequest restRequest, String input, BlobInfo bl
convertedId = "/" + signIdIfRequired(restRequest, input);
} else {
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo),
(id, e) -> completeConversion(id, e, future, callback));
(id, e) -> completeConverterOperation(id, e, future, callback,
frontendMetrics.idConversionDownstreamCallbackTimeInMs));
}
} catch (Exception e) {
exception = e;
} finally {
frontendMetrics.idConverterProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs);
if (convertedId != null || exception != null) {
completeConversion(convertedId, exception, future, callback);
completeConverterOperation(convertedId, exception, future, callback, frontendMetrics.idConversionDownstreamCallbackTimeInMs);
}
}
return future;
}

@Override
public Future<Boolean> detectConflict(RestRequest restRequest, Callback<Boolean> callback) {
final CompletableFuture<Boolean> future = new CompletableFuture<>();
Exception exception = null;
frontendMetrics.idConverterDetectConflictRate.mark();
long startTimeInMs = System.currentTimeMillis();
try {
if (!isOpen) {
exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable);
} else {
CallbackUtils.callCallbackAfter(detectConflict(restRequest),
(conflictStatus, e) -> completeConverterOperation(conflictStatus, e, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs));
}
} catch (Exception e) {
exception = e;
} finally {
frontendMetrics.deleteCallbackProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs);
if (exception != null) {
completeConverterOperation(false, exception, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs);
}
}
return future;
}

/**
* Detects if the ID in the request can cause a conflict.
* @param restRequest @link RestRequest} representing the request.
* @return the {@link CompletionStage} that will be completed with the conflict status.
* @throws RestServiceException if the operation cannot be performed.
*/
private CompletionStage<Boolean> detectConflict(RestRequest restRequest) throws RestServiceException {
final CompletionStage<Boolean> detectionFuture;
frontendMetrics.idConverterDetectConflictRate.mark();
if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest)
.matchesOperation(Operations.NAMED_BLOB) && !RestUtils.isUpsertForNamedBlob(restRequest.getArgs())) {
// A named blob put without upsert is allowed only if the named blob does not exist.
// If this is a named blob put request without upsert, then we check if a mapping for the name already exists.
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());

detectionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), GetOption.Include_All).thenApply(namedBlobRecord -> {
LOGGER.info("A mapping for the namedBlob {} already exists. Original BlobId: {}",
namedBlobRecord.getBlobName(), namedBlobRecord.getBlobId());
return true;
}).exceptionally(e -> {
if (e.getCause() instanceof RestServiceException
&& ((RestServiceException) e.getCause()).getErrorCode() == RestServiceErrorCode.NotFound) {
return false;
} else {
throw new RuntimeException(e);
}
});
} else {
detectionFuture = CompletableFuture.completedFuture(false);
}
return detectionFuture;
}

/**
* Completes the conversion by setting the future and invoking the callback.
* @param conversionResult the conversion result.
* Completes the operation of this converter by setting the future and invoking the callback.
* @param result the conversion result.
* @param exception any exception that occurred as a part of the conversion.
* @param completableFuture the {@link CompletableFuture} that must be set.
* @param callback the {@link Callback} that needs to be invoked. Can be null.
* @param downstreamCallbackTimeInMs {@link Histogram} to update the time taken for downstream callback.
*/
private <T> void completeConversion(T conversionResult, Exception exception, CompletableFuture<T> completableFuture,
Callback<T> callback) {
private <T> void completeConverterOperation(T result, Exception exception, CompletableFuture<T> completableFuture,
Callback<T> callback, Histogram downstreamCallbackTimeInMs) {
if (exception == null) {
completableFuture.complete(conversionResult);
completableFuture.complete(result);
} else {
completableFuture.completeExceptionally(exception);
}
if (callback != null) {
long startTime = System.currentTimeMillis();
callback.onCompletion(conversionResult, exception);
frontendMetrics.idConversionDownstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime);
callback.onCompletion(result, exception);
downstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class FrontendMetrics {
public final AsyncOperationTracker.Metrics putReadStitchRequestMetrics;
public final AsyncOperationTracker.Metrics putRouterStitchBlobMetrics;
public final AsyncOperationTracker.Metrics putRouterPutBlobMetrics;
public final AsyncOperationTracker.Metrics putIdConflictDetectionMetrics;
public final AsyncOperationTracker.Metrics putIdConversionMetrics;
public final AsyncOperationTracker.Metrics putBlobRouterMetrics;
public final AsyncOperationTracker.Metrics putBlobSecurityProcessResponseMetrics;
Expand Down Expand Up @@ -164,6 +165,7 @@ public class FrontendMetrics {
public final Meter securityServiceProcessResponseRate;
// AmbryIdConverter
public final Meter idConverterRequestRate;
public final Meter idConverterDetectConflictRate;
// Latencies
// FrontendRestRequestService
// POST
Expand Down Expand Up @@ -216,6 +218,7 @@ public class FrontendMetrics {
// AmbryIdConverter
public final Histogram idConverterProcessingTimeInMs;
public final Histogram idConversionDownstreamCallbackTimeInMs;
public final Histogram idConversionDetectConflictDownstreamCallbackTimeInMs;

// GetPeersHandler
public final Histogram getPeersProcessingTimeInMs;
Expand Down Expand Up @@ -433,6 +436,8 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putRouterStitchBlob", metricRegistry);
putRouterPutBlobMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putRouterPutBlob", metricRegistry);
putIdConflictDetectionMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "idConflictDetection", metricRegistry);
putIdConversionMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putIdConversion", metricRegistry);
putBlobRouterMetrics = new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "router", metricRegistry);
Expand Down Expand Up @@ -531,6 +536,7 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
metricRegistry.meter(MetricRegistry.name(AmbrySecurityService.class, "ProcessResponseRate"));
// AmbryIdConverter
idConverterRequestRate = metricRegistry.meter(MetricRegistry.name(AmbryIdConverterFactory.class, "RequestRate"));
idConverterDetectConflictRate = metricRegistry.meter(MetricRegistry.name(AmbryIdConverterFactory.class, "DetectConflictRate"));

// Latencies
// FrontendRestRequestService
Expand Down Expand Up @@ -620,6 +626,8 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "ProcessingTimeInMs"));
idConversionDownstreamCallbackTimeInMs =
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "DownstreamCallbackTimeInMs"));
idConversionDetectConflictDownstreamCallbackTimeInMs =
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "DetectConflictCallbackTimeInMs"));
// GetPeersHandler
getPeersProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(GetPeersHandler.class, "ProcessingTimeInMs"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,24 +208,34 @@ private Callback<Void> securityProcessRequestCallback() {
*/
private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> {
if (RestUtils.isNamedBlobStitchRequest(restRequest)) {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
RetainingAsyncWritableChannel channel =
new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes);
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
idConverter.detectConflict(restRequest, detectConflictCallback(blobInfo));
}, uri, LOGGER, deleteDatasetCallback);
}

/**
* After ensuring that there is no conflict in the named blob stitch upload request, read the request body from the
* rest request and call {@link Router#stitchBlob} to stitch the chunks.
* @param blobInfo the {@link BlobInfo} to make the router call with.
* @return a {@link Callback} to be used with conflict detection.
*/
private Callback<Boolean> detectConflictCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putIdConflictDetectionMetrics, conflictDetected -> {
if (conflictDetected) {
throw new RestServiceException("Blob with the same name already exists", RestServiceErrorCode.Conflict);
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
if (RestUtils.isNamedBlobStitchRequest(restRequest)) {
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes);
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, LOGGER, deleteDatasetCallback);
}
Expand All @@ -252,9 +262,11 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
*/
private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChannel channel, BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putReadStitchRequestMetrics,
bytesRead -> router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(),
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)),
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)),
bytesRead -> {
router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(),
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)),
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
},
uri, LOGGER, deleteDatasetCallback);
}

Expand Down

0 comments on commit 5653cfd

Please sign in to comment.