This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Fix: Upgrade to R Batch SDK 2018-12-01.8.0 (#354)
* Added resource files

* Added resource files

* Removed comments

* Fixed resource files documentation

* Added check on job state

* Fixed jobState
brnleehng committed Jun 19, 2019
1 parent 93f3fbc commit 96bfc22
Showing 7 changed files with 57 additions and 51 deletions.
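
The change that threads through most of these files is the resource-file constructor in rAzureBatch: `createResourceFile` now takes `filePath` and `httpUrl` (or `storageContainerUrl` plus `blobPrefix` for whole-container downloads) instead of the old `fileName` and `url` pair. A minimal migration sketch, assuming the rAzureBatch build that accompanies Batch SDK 2018-12-01.8.0 is installed; the account, container, and SAS placeholders are illustrative only:

```r
# Old signature (before this upgrade):
# rAzureBatch::createResourceFile(url = fileUrl, fileName = "2010.csv")

# New signature: download a single blob to a path on the node.
fileUrl <- "https://<accountname>.blob.core.windows.net/<container>/2010.csv"
singleFile <- rAzureBatch::createResourceFile(
  httpUrl = fileUrl,
  filePath = "2010.csv"
)

# New signature: download every blob under a prefix from a container URL.
resultsFiles <- rAzureBatch::createResourceFile(
  storageContainerUrl = "https://<accountname>.blob.core.windows.net/<container>?<sas>",
  blobPrefix = "results"
)
```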
27 changes: 13 additions & 14 deletions R/batch-api.R
@@ -19,7 +19,7 @@ BatchUtilities <- R6::R6Class(

accountName <- storageClient$authentication$name

resourceFiles <- NULL
resourceFiles <- args$resourceFiles
if (!is.null(argsList)) {
envFile <- paste0(taskId, ".rds")
saveRDS(argsList, file = envFile)
@@ -37,8 +37,18 @@ BatchUtilities <- R6::R6Class(
envFile,
readToken,
config$endpointSuffix)
resourceFiles <-
list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))

environmentResourceFile <-
rAzureBatch::createResourceFile(filePath = envFile, httpUrl = envFileUrl)

if (is.null(resourceFiles))
{
resourceFiles <-
list(environmentResourceFile)
}
else {
resourceFiles <- append(resourceFiles, environmentResourceFile)
}
}

# Only use the download command if cloudCombine is enabled
@@ -52,17 +62,6 @@

if (!is.null(cloudCombine)) {
assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
containerSettings$imageName <- "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1"

copyCommand <- sprintf(
"%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include results/*.rds --endpoint %s",
accountName,
jobId,
"$AZ_BATCH_TASK_WORKING_DIR",
config$endpointSuffix
)

commands <- c(paste("blobxfer", copyCommand))
}

exitConditions <- NULL
2 changes: 1 addition & 1 deletion R/cluster.R
@@ -123,7 +123,7 @@ makeCluster <-

# install docker
containerConfiguration <- list(
type = "docker"
type = "dockerCompatible"
)

dockerImage <- "rocker/tidyverse:latest"
32 changes: 24 additions & 8 deletions R/doAzureParallel.R
@@ -474,12 +474,12 @@ setHttpTraffic <- function(value = FALSE) {
storageEndpointSuffix = config$endpointSuffix)

requiredJobResourceFiles <- list(
rAzureBatch::createResourceFile(url = workerScriptUrl, fileName = "worker.R"),
rAzureBatch::createResourceFile(url = mergerScriptUrl, fileName = "merger.R"),
rAzureBatch::createResourceFile(url = installGithubScriptUrl, fileName = "install_github.R"),
rAzureBatch::createResourceFile(url = installCranScriptUrl, fileName = "install_cran.R"),
rAzureBatch::createResourceFile(url = installBioConductorScriptUrl, fileName = "install_bioconductor.R"),
rAzureBatch::createResourceFile(url = jobCommonFileUrl, fileName = jobFileName)
rAzureBatch::createResourceFile(filePath = "worker.R", httpUrl = workerScriptUrl),
rAzureBatch::createResourceFile(filePath = "merger.R", httpUrl = mergerScriptUrl),
rAzureBatch::createResourceFile(filePath = "install_github.R", httpUrl = installGithubScriptUrl),
rAzureBatch::createResourceFile(filePath = "install_cran.R", httpUrl = installCranScriptUrl),
rAzureBatch::createResourceFile(filePath = "install_bioconductor.R", httpUrl = installBioConductorScriptUrl),
rAzureBatch::createResourceFile(filePath = jobFileName, httpUrl = jobCommonFileUrl)
)

resourceFiles <-
@@ -669,6 +669,21 @@ setHttpTraffic <- function(value = FALSE) {
)
)

mergeReadSasToken <- storageClient$generateSasToken("rl", "c", id)
mergeResourceFileUrl <-
rAzureBatch::createBlobUrl(
storageAccount = storageClient$authentication$name,
containerName = id,
sasToken = mergeReadSasToken,
storageEndpointSuffix = config$endpointSuffix
)

mergeResources <-
list(
rAzureBatch::createResourceFile(
storageContainerUrl = mergeResourceFileUrl,
blobPrefix = "results"))

BatchUtilitiesOperations$addTask(
jobId = id,
taskId = "merge",
@@ -684,7 +699,8 @@
dependsOn = taskDependencies,
cloudCombine = cloudCombine,
outputFiles = append(obj$options$azure$outputFiles, mergeOutput),
containerImage = data$containerImage
containerImage = data$containerImage,
resourceFiles = mergeResources
)

cat(". . .")
@@ -726,7 +742,7 @@ setHttpTraffic <- function(value = FALSE) {
}

if (!identical(function(a, ...) c(a, list(...)),
obj$combineInfo$fun, ignore.environment = TRUE)){
obj$combineInfo$fun, ignore.environment = TRUE)) {
tryCatch({
accumulator <- foreach::makeAccum(it)
accumulator(results, as.numeric(names(results)))
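
The block above also replaces the blobxfer download command that was removed from R/batch-api.R: the merge task now receives earlier results through a container-scoped resource file instead of a copy command. A condensed sketch of the new staging logic, with `storageClient`, `config`, and `id` coming from the surrounding function as in the diff:

```r
# Read/list SAS on the job's container, turned into a container URL, then a
# resource file that pulls every blob under the "results" prefix onto the node.
mergeReadSasToken <- storageClient$generateSasToken("rl", "c", id)

mergeResourceFileUrl <- rAzureBatch::createBlobUrl(
  storageAccount = storageClient$authentication$name,
  containerName = id,
  sasToken = mergeReadSasToken,
  storageEndpointSuffix = config$endpointSuffix
)

mergeResources <- list(
  rAzureBatch::createResourceFile(
    storageContainerUrl = mergeResourceFileUrl,
    blobPrefix = "results"
  )
)
```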
23 changes: 7 additions & 16 deletions R/utility-job.R
@@ -472,19 +472,14 @@ waitForTasksToComplete <-

flush.console()

validationFlag <-
(taskCounts$validationStatus == "Validated" &&
totalTasks <= 200000) ||
totalTasks > 200000

if (taskCounts$failed > 0 &&
errorHandling == "stop" &&
validationFlag) {
errorHandling == "stop") {
cat("\n")

select <- "id, executionInfo"
filter <- "executionInfo/result eq 'failure'"
failedTasks <-
batchClient$taskOperations$list(jobId, select = select)
batchClient$taskOperations$list(jobId, select = select, filter = filter)

tasksFailureWarningLabel <-
sprintf(
@@ -498,14 +493,9 @@
)

for (i in 1:length(failedTasks$value)) {
if (!is.null(failedTasks$value[[i]]$executionInfo$result) &&
grepl(failedTasks$value[[i]]$executionInfo$result,
"failure",
ignore.case = TRUE)) {
tasksFailureWarningLabel <-
paste0(tasksFailureWarningLabel,
sprintf("%s\n", failedTasks$value[[i]]$id))
}
}

warning(sprintf(tasksFailureWarningLabel,
@@ -533,9 +523,10 @@
jobId)
}

if (taskCounts$completed >= totalTasks &&
(taskCounts$validationStatus == "Validated" ||
totalTasks >= 200000)) {
jobInfo <- getJob(jobId, verbose = FALSE)
if (taskCounts$completed >= totalTasks ||
jobInfo$jobState == "completed" ||
jobInfo$jobState == "terminating") {
cat("\n")
break
}
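
Two behaviors change in this file: failed tasks are now filtered server-side with an OData filter instead of a client-side grepl() loop, and the wait loop exits on the job's own state rather than the task-count validation flag (the 200,000-task special case is gone). A condensed sketch of the new checks, with `batchClient`, `jobId`, `taskCounts`, and `totalTasks` coming from the surrounding function as in the diff:

```r
# Failed tasks come back already filtered by the service.
failedTasks <- batchClient$taskOperations$list(
  jobId,
  select = "id, executionInfo",
  filter = "executionInfo/result eq 'failure'"
)

# The loop now exits when the job itself reports a terminal state.
jobInfo <- getJob(jobId, verbose = FALSE)
if (taskCounts$completed >= totalTasks ||
    jobInfo$jobState %in% c("completed", "terminating")) {
  break
}
```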
8 changes: 4 additions & 4 deletions docs/71-distributing-data.md
@@ -39,12 +39,12 @@ Here's an example that uses data stored in a public location on Azure Blob Storage
# define where to download data from
resource_files = list(
rAzureBatch::createResourceFile(
url = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
fileName = "2010.csv"
httpUrl = "https://<accountname>.blob.core.windows.net/<container>/2010.csv",
filePath = "2010.csv"
),
rAzureBatch::createResourceFile(
url = "https://<accountname>.blob.core.windows.net/<container>/2011.csv",
fileName = "2011.csv"
httpUrl = "https://<accountname>.blob.core.windows.net/<container>/2011.csv",
filePath = "2011.csv"
)
)

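
The documentation example stops at the resource-file list. As a hedged usage sketch (not part of this diff), the `resourceFiles` argument to `makeCluster` and the `cluster.json` settings file are assumptions based on the package's resource-files documentation:

```r
# Assumed usage, not part of this commit: pass the list when creating the
# cluster so every node downloads the files before any tasks run.
cluster <- doAzureParallel::makeCluster("cluster.json", resourceFiles = resource_files)
doAzureParallel::registerDoAzureParallel(cluster)
```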
12 changes: 6 additions & 6 deletions samples/resource_files/resource_files_example.R
@@ -34,12 +34,12 @@ doAzureParallel::setCredentials("credentials.json")
# Using the NYC taxi datasets, http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml
azureStorageUrl <- "http://playdatastore.blob.core.windows.net/nyc-taxi-dataset"
resource_files <- list(
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), fileName = "yellow_tripdata_2016-1.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), fileName = "yellow_tripdata_2016-2.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), fileName = "yellow_tripdata_2016-3.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), fileName = "yellow_tripdata_2016-4.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), fileName = "yellow_tripdata_2016-5.csv"),
rAzureBatch::createResourceFile(url = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), fileName = "yellow_tripdata_2016-6.csv")
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-1.csv"), filePath = "yellow_tripdata_2016-1.csv"),
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-2.csv"), filePath = "yellow_tripdata_2016-2.csv"),
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-3.csv"), filePath = "yellow_tripdata_2016-3.csv"),
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-4.csv"), filePath = "yellow_tripdata_2016-4.csv"),
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-5.csv"), filePath = "yellow_tripdata_2016-5.csv"),
rAzureBatch::createResourceFile(httpUrl = paste0(azureStorageUrl, "/yellow_tripdata_2016-6.csv"), filePath = "yellow_tripdata_2016-6.csv")
)

# add the parameter 'resourceFiles' to download files to nodes
4 changes: 2 additions & 2 deletions samples/sas_resource_files/sas_resources_files_example.R
@@ -56,8 +56,8 @@ csvFileUrl2 <- rAzureBatch::createBlobUrl(storageAccount = storageAccountName,
# Create a list of files to download to the cluster using read-only permissions
# Place the files in a directory called 'data'
resource_files = list(
rAzureBatch::createResourceFile(url = csvFileUrl1, fileName = "data/1989.csv"),
rAzureBatch::createResourceFile(url = csvFileUrl2, fileName = "data/1990.csv")
rAzureBatch::createResourceFile(httpUrl = csvFileUrl1, filePath = "data/1989.csv"),
rAzureBatch::createResourceFile(httpUrl = csvFileUrl2, filePath = "data/1990.csv")
)

# Create the cluster
