-
Notifications
You must be signed in to change notification settings - Fork 274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Detect conflict during named blob put operation before upload or stitch operation. #2704
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
*/ | ||
package com.github.ambry.frontend; | ||
|
||
import com.codahale.metrics.Histogram; | ||
import com.codahale.metrics.MetricRegistry; | ||
import com.github.ambry.commons.Callback; | ||
import com.github.ambry.commons.CallbackUtils; | ||
|
@@ -66,7 +67,7 @@ public IdConverter getIdConverter() { | |
return new AmbryIdConverter(idSigningService, namedBlobDb, frontendMetrics); | ||
} | ||
|
||
private static class AmbryIdConverter implements IdConverter { | ||
static class AmbryIdConverter implements IdConverter { | ||
private boolean isOpen = true; | ||
private final IdSigningService idSigningService; | ||
private final NamedBlobDb namedBlobDb; | ||
|
@@ -116,37 +117,97 @@ public Future<String> convert(RestRequest restRequest, String input, BlobInfo bl | |
convertedId = "/" + signIdIfRequired(restRequest, input); | ||
} else { | ||
CallbackUtils.callCallbackAfter(convertId(input, restRequest, blobInfo), | ||
(id, e) -> completeConversion(id, e, future, callback)); | ||
(id, e) -> completeConverterOperation(id, e, future, callback, | ||
frontendMetrics.idConversionDownstreamCallbackTimeInMs)); | ||
} | ||
} catch (Exception e) { | ||
exception = e; | ||
} finally { | ||
frontendMetrics.idConverterProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs); | ||
if (convertedId != null || exception != null) { | ||
completeConversion(convertedId, exception, future, callback); | ||
completeConverterOperation(convertedId, exception, future, callback, frontendMetrics.idConversionDownstreamCallbackTimeInMs); | ||
} | ||
} | ||
return future; | ||
} | ||
|
||
@Override | ||
public Future<Boolean> detectConflict(RestRequest restRequest, Callback<Boolean> callback) { | ||
final CompletableFuture<Boolean> future = new CompletableFuture<>(); | ||
Exception exception = null; | ||
frontendMetrics.idConverterDetectConflictRate.mark(); | ||
long startTimeInMs = System.currentTimeMillis(); | ||
try { | ||
if (!isOpen) { | ||
exception = new RestServiceException("IdConverter is closed", RestServiceErrorCode.ServiceUnavailable); | ||
} else { | ||
CallbackUtils.callCallbackAfter(detectConflict(restRequest), | ||
(conflictStatus, e) -> completeConverterOperation(conflictStatus, e, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs)); | ||
} | ||
} catch (Exception e) { | ||
exception = e; | ||
} finally { | ||
frontendMetrics.deleteCallbackProcessingTimeInMs.update(System.currentTimeMillis() - startTimeInMs); | ||
if (exception != null) { | ||
completeConverterOperation(false, exception, future, callback, frontendMetrics.idConversionDetectConflictDownstreamCallbackTimeInMs); | ||
} | ||
} | ||
return future; | ||
} | ||
|
||
/** | ||
* Detects if the ID in the request can cause a conflict. | ||
* @param restRequest @link RestRequest} representing the request. | ||
* @return the {@link CompletionStage} that will be completed with the conflict status. | ||
* @throws RestServiceException if the operation cannot be performed. | ||
*/ | ||
private CompletionStage<Boolean> detectConflict(RestRequest restRequest) throws RestServiceException { | ||
final CompletionStage<Boolean> detectionFuture; | ||
frontendMetrics.idConverterDetectConflictRate.mark(); | ||
if (restRequest.getRestMethod() == RestMethod.PUT && RestUtils.getRequestPath(restRequest) | ||
.matchesOperation(Operations.NAMED_BLOB) && !RestUtils.isUpsertForNamedBlob(restRequest.getArgs())) { | ||
// A named blob put without upsert is allowed only if the named blob does not exist. | ||
// If this is a named blob put request without upsert, then we check if a mapping for the name already exists. | ||
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs()); | ||
|
||
detectionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(), | ||
namedBlobPath.getBlobName(), GetOption.Include_All).thenApply(namedBlobRecord -> { | ||
LOGGER.info("A mapping for the namedBlob {} already exists. Original BlobId: {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about if the named blob is deleted? |
||
namedBlobRecord.getBlobName(), namedBlobRecord.getBlobId()); | ||
return true; | ||
}).exceptionally(e -> { | ||
if (e.getCause() instanceof RestServiceException | ||
&& ((RestServiceException) e.getCause()).getErrorCode() == RestServiceErrorCode.NotFound) { | ||
return false; | ||
} else { | ||
throw new RuntimeException(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. qq.. would it be better if we complete the |
||
} | ||
}); | ||
} else { | ||
detectionFuture = CompletableFuture.completedFuture(false); | ||
} | ||
return detectionFuture; | ||
} | ||
|
||
/** | ||
* Completes the conversion by setting the future and invoking the callback. | ||
* @param conversionResult the conversion result. | ||
* Completes the operation of this converter by setting the future and invoking the callback. | ||
* @param result the conversion result. | ||
* @param exception any exception that occurred as a part of the conversion. | ||
* @param completableFuture the {@link CompletableFuture} that must be set. | ||
* @param callback the {@link Callback} that needs to be invoked. Can be null. | ||
* @param downstreamCallbackTimeInMs {@link Histogram} to update the time taken for downstream callback. | ||
*/ | ||
private <T> void completeConversion(T conversionResult, Exception exception, CompletableFuture<T> completableFuture, | ||
Callback<T> callback) { | ||
private <T> void completeConverterOperation(T result, Exception exception, CompletableFuture<T> completableFuture, | ||
Callback<T> callback, Histogram downstreamCallbackTimeInMs) { | ||
if (exception == null) { | ||
completableFuture.complete(conversionResult); | ||
completableFuture.complete(result); | ||
} else { | ||
completableFuture.completeExceptionally(exception); | ||
} | ||
if (callback != null) { | ||
long startTime = System.currentTimeMillis(); | ||
callback.onCompletion(conversionResult, exception); | ||
frontendMetrics.idConversionDownstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime); | ||
callback.onCompletion(result, exception); | ||
downstreamCallbackTimeInMs.update(System.currentTimeMillis() - startTime); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -215,24 +215,34 @@ private Callback<Void> securityProcessRequestCallback() { | |
*/ | ||
private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) { | ||
return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> { | ||
if (RestUtils.isNamedBlobStitchRequest(restRequest)) { | ||
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) { | ||
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest); | ||
frontendMetrics.addDatasetVersionRate.mark(); | ||
addDatasetVersion(blobInfo.getBlobProperties(), restRequest); | ||
} | ||
RetainingAsyncWritableChannel channel = | ||
new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes); | ||
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo)); | ||
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) { | ||
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest); | ||
frontendMetrics.addDatasetVersionRate.mark(); | ||
addDatasetVersion(blobInfo.getBlobProperties(), restRequest); | ||
} | ||
idConverter.detectConflict(restRequest, detectConflictCallback(blobInfo)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, this will result in checking for conflicts for regular named blob uploads as well, i.e. not just |
||
}, uri, LOGGER, deleteDatasetCallback); | ||
} | ||
|
||
/** | ||
* After ensuring that there is no conflict in the named blob stitch upload request, read the request body from the | ||
* rest request and call {@link Router#stitchBlob} to stitch the chunks. | ||
* @param blobInfo the {@link BlobInfo} to make the router call with. | ||
* @return a {@link Callback} to be used with conflict detection. | ||
*/ | ||
private Callback<Boolean> detectConflictCallback(BlobInfo blobInfo) { | ||
return buildCallback(frontendMetrics.putIdConflictDetectionMetrics, conflictDetected -> { | ||
if (conflictDetected) { | ||
throw new RestServiceException("Blob with the same name already exists", RestServiceErrorCode.Conflict); | ||
} else { | ||
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) { | ||
accountAndContainerInjector.injectDatasetForNamedBlob(restRequest); | ||
frontendMetrics.addDatasetVersionRate.mark(); | ||
addDatasetVersion(blobInfo.getBlobProperties(), restRequest); | ||
if (RestUtils.isNamedBlobStitchRequest(restRequest)) { | ||
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel(frontendConfig.maxJsonRequestSizeBytes); | ||
restRequest.readInto(channel, fetchStitchRequestBodyCallback(channel, blobInfo)); | ||
} else { | ||
PutBlobOptions options = getPutBlobOptionsFromRequest(); | ||
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options, | ||
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); | ||
} | ||
PutBlobOptions options = getPutBlobOptionsFromRequest(); | ||
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options, | ||
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); | ||
} | ||
}, uri, LOGGER, deleteDatasetCallback); | ||
} | ||
|
@@ -259,9 +269,11 @@ private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) { | |
*/ | ||
private Callback<Long> fetchStitchRequestBodyCallback(RetainingAsyncWritableChannel channel, BlobInfo blobInfo) { | ||
return buildCallback(frontendMetrics.putReadStitchRequestMetrics, | ||
bytesRead -> router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), | ||
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)), null, | ||
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)), | ||
bytesRead -> { | ||
router.stitchBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), | ||
getChunksToStitch(blobInfo.getBlobProperties(), readJsonFromChannel(channel)), null, | ||
routerStitchBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true)); | ||
}, | ||
uri, LOGGER, deleteDatasetCallback); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we verify the Dataset behavior?
Does it allow update with the same version number if upsert is not set?