Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect conflict during named blob put operation before upload or stitch operation. #2704

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ default Future<String> convert(RestRequest restRequest, String input, BlobInfo b
return convert(restRequest, input, callback);
}

/**
* Detect if an ID in the request can cause a conflict during conversion.
* @param restRequest {@link RestRequest} representing the request.
* @param callback the {@link Callback} to invoke once the converted ID is available. Can be null.
* @return a {@link Future} that will eventually contain a boolean indicating whether the ID can cause a conflict.
*/
Future<Boolean> detectConflict(RestRequest restRequest, Callback<Boolean> callback);

/**
* Converts an ID.
* @param restRequest {@link RestRequest} representing the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.frontend;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.commons.Callback;
import com.github.ambry.commons.CallbackUtils;
Expand Down Expand Up @@ -66,7 +67,7 @@ public IdConverter getIdConverter() {
return new AmbryIdConverter(idSigningService, namedBlobDb, frontendMetrics);
}

private static class AmbryIdConverter implements IdConverter {
static class AmbryIdConverter implements IdConverter {
private boolean isOpen = true;
private final IdSigningService idSigningService;
private final NamedBlobDb namedBlobDb;
Expand Down Expand Up @@ -116,37 +117,97 @@ public Future<String> convert(RestRequest restRequest, String input, BlobInfo bl
convertedId = "/" + signIdIfRequired(restRequest, input);
} else {
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo),
(id, e) -> completeConversion(id, e, future, callback));
(id, e) -> completeConverterOperation(id, e, future, callback,
frontendMetrics.idConversionDownstreamCallbackTimeInMs));
}
} catch (Exception e) {
exception = e;
} finally {
frontendMetrics.idConverterProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs);
if (convertedId != null || exception != null) {
completeConversion(convertedId, exception, future, callback);
completeConverterOperation(convertedId, exception, future, callback, frontendMetrics.idConversionDownstreamCallbackTimeInMs);
}
}
return future;
}

@Override
public Future<Boolean> detectConflict(RestRequest restRequest, Callback<Boolean> callback) {
final CompletableFuture<Boolean> future = new CompletableFuture<>();
Exception exception = null;
frontendMetrics.idConverterDetectConflictRate.mark();
long startTimeInMs = System.currentTimeMillis();
try {
if (!isOpen) {
exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable);
} else {
CallbackUtils.callCallbackAfter(detectConflict(restRequest),
(conflictStatus, e) -> completeConverterOperation(conflictStatus, e, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs));
}
} catch (Exception e) {
exception = e;
} finally {
frontendMetrics.deleteCallbackProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs);
if (exception != null) {
completeConverterOperation(false, exception, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs);
}
}
return future;
}

/**
* Detects if the ID in the request can cause a conflict.
* @param restRequest @link RestRequest} representing the request.
* @return the {@link CompletionStage} that will be completed with the conflict status.
* @throws RestServiceException if the operation cannot be performed.
*/
private CompletionStage<Boolean> detectConflict(RestRequest restRequest) throws RestServiceException {
final CompletionStage<Boolean> detectionFuture;
frontendMetrics.idConverterDetectConflictRate.mark();
if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we verify the Dataset behavior?
Does it allow update with the same version number if upsert is not set?

.matchesOperation(Operations.NAMED_BLOB) && !RestUtils.isUpsertForNamedBlob(restRequest.getArgs())) {
// A named blob put without upsert is allowed only if the named blob does not exist.
// If this is a named blob put request without upsert, then we check if a mapping for the name already exists.
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());

detectionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), GetOption.Include_All).thenApply(namedBlobRecord -> {
LOGGER.info("A mapping for the namedBlob {} already exists. Original BlobId: {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about if the named blob is deleted?
If the named blob is deleted, we should allow upload even without the upsert option.
If it's the case, we need check the db record status.

namedBlobRecord.getBlobName(), namedBlobRecord.getBlobId());
return true;
}).exceptionally(e -> {
if (e.getCause() instanceof RestServiceException
&& ((RestServiceException) e.getCause()).getErrorCode() == RestServiceErrorCode.NotFound) {
return false;
} else {
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq.. would it be better if we complete the detectionFuture with exception instead of throwing run time exception?

}
});
} else {
detectionFuture = CompletableFuture.completedFuture(false);
}
return detectionFuture;
}

/**
* Completes the conversion by setting the future and invoking the callback.
* @param conversionResult the conversion result.
* Completes the operation of this converter by setting the future and invoking the callback.
* @param result the conversion result.
* @param exception any exception that occurred as a part of the conversion.
* @param completableFuture the {@link CompletableFuture} that must be set.
* @param callback the {@link Callback} that needs to be invoked. Can be null.
* @param downstreamCallbackTimeInMs {@link Histogram} to update the time taken for downstream callback.
*/
private <T> void completeConversion(T conversionResult, Exception exception, CompletableFuture<T> completableFuture,
Callback<T> callback) {
private <T> void completeConverterOperation(T result, Exception exception, CompletableFuture<T> completableFuture,
Callback<T> callback, Histogram downstreamCallbackTimeInMs) {
if (exception == null) {
completableFuture.complete(conversionResult);
completableFuture.complete(result);
} else {
completableFuture.completeExceptionally(exception);
}
if (callback != null) {
long startTime = System.currentTimeMillis();
callback.onCompletion(conversionResult, exception);
frontendMetrics.idConversionDownstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime);
callback.onCompletion(result, exception);
downstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class FrontendMetrics {
public final AsyncOperationTracker.Metrics putReadStitchRequestMetrics;
public final AsyncOperationTracker.Metrics putRouterStitchBlobMetrics;
public final AsyncOperationTracker.Metrics putRouterPutBlobMetrics;
public final AsyncOperationTracker.Metrics putIdConflictDetectionMetrics;
public final AsyncOperationTracker.Metrics putIdConversionMetrics;
public final AsyncOperationTracker.Metrics putBlobRouterMetrics;
public final AsyncOperationTracker.Metrics putBlobSecurityProcessResponseMetrics;
Expand Down Expand Up @@ -171,6 +172,7 @@ public class FrontendMetrics {
public final Meter securityServiceProcessResponseRate;
// AmbryIdConverter
public final Meter idConverterRequestRate;
public final Meter idConverterDetectConflictRate;
// Latencies
// FrontendRestRequestService
// POST
Expand Down Expand Up @@ -224,6 +226,7 @@ public class FrontendMetrics {
// AmbryIdConverter
public final Histogram idConverterProcessingTimeInMs;
public final Histogram idConversionDownstreamCallbackTimeInMs;
public final Histogram idConversionDetectConflictDownstreamCallbackTimeInMs;

// GetPeersHandler
public final Histogram getPeersProcessingTimeInMs;
Expand Down Expand Up @@ -442,6 +445,8 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putRouterStitchBlob", metricRegistry);
putRouterPutBlobMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putRouterPutBlob", metricRegistry);
putIdConflictDetectionMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "idConflictDetection", metricRegistry);
putIdConversionMetrics =
new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "putIdConversion", metricRegistry);
putBlobRouterMetrics = new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "router", metricRegistry);
Expand Down Expand Up @@ -547,6 +552,7 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
metricRegistry.meter(MetricRegistry.name(AmbrySecurityService.class, "ProcessResponseRate"));
// AmbryIdConverter
idConverterRequestRate = metricRegistry.meter(MetricRegistry.name(AmbryIdConverterFactory.class, "RequestRate"));
idConverterDetectConflictRate = metricRegistry.meter(MetricRegistry.name(AmbryIdConverterFactory.class, "DetectConflictRate"));

// Latencies
// FrontendRestRequestService
Expand Down Expand Up @@ -638,6 +644,8 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "ProcessingTimeInMs"));
idConversionDownstreamCallbackTimeInMs =
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "DownstreamCallbackTimeInMs"));
idConversionDetectConflictDownstreamCallbackTimeInMs =
metricRegistry.histogram(MetricRegistry.name(AmbryIdConverterFactory.class, "DetectConflictCallbackTimeInMs"));
// GetPeersHandler
getPeersProcessingTimeInMs =
metricRegistry.histogram(MetricRegistry.name(GetPeersHandler.class, "ProcessingTimeInMs"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,24 +215,34 @@ private Callback<Void> securityProcessRequestCallback() {
*/
private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> {
if (RestUtils.isNamedBlobStitchRequest(restRequest)) {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
RetainingAsyncWritableChannel channel =
new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes);
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
idConverter.detectConflict(restRequest, detectConflictCallback(blobInfo));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this will result in checking for conflicts for regular named blob uploads as well, i.e. not just stitch requests? Wondering if it may increase latency of regular uploads due to additional call to mysql db. Would it be better to check for conflicts only for stitch requests?

}, uri, LOGGER, deleteDatasetCallback);
}

/**
* After ensuring that there is no conflict in the named blob stitch upload request, read the request body from the
* rest request and call {@link Router#stitchBlob} to stitch the chunks.
* @param blobInfo the {@link BlobInfo} to make the router call with.
* @return a {@link Callback} to be used with conflict detection.
*/
private Callback<Boolean> detectConflictCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putIdConflictDetectionMetrics, conflictDetected -> {
if (conflictDetected) {
throw new RestServiceException("Blob with the same name already exists", RestServiceErrorCode.Conflict);
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest);
frontendMetrics.addDatasetVersionRate.mark();
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
if (RestUtils.isNamedBlobStitchRequest(restRequest)) {
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes);
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo));
} else {
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, LOGGER, deleteDatasetCallback);
}
Expand All @@ -259,9 +269,11 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
*/
private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChannel channel, BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putReadStitchRequestMetrics,
bytesRead -> router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(),
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)), null,
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)),
bytesRead -> {
router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(),
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)), null,
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
},
uri, LOGGER, deleteDatasetCallback);
}

Expand Down