Skip to content

Commit

Permalink
Make stopCluster() of a SOCK cluster wait a little bit for the worker…
Browse files Browse the repository at this point in the history
…s to

shut down (PR#18133).


git-svn-id: https://svn.r-project.org/R/trunk@86529 00db46b3-68df-0310-9c12-caf00c1e9a41
  • Loading branch information
kalibera committed May 10, 2024
1 parent 4ef6ec3 commit 05465fc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/library/parallel/NAMESPACE
Expand Up @@ -29,6 +29,7 @@ S3method(recvOneData, SOCKcluster)
S3method(sendData, SOCKnode)
S3method(sendData, SOCK0node)
S3method(stopCluster, default)
S3method(stopCluster, SOCKcluster)

## To support snow clusters
#S3method(closeNode, NWSnode)
Expand Down
28 changes: 26 additions & 2 deletions src/library/parallel/R/snowSOCK.R
@@ -1,7 +1,7 @@
# File src/library/parallel/R/snowSOCK.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2023 The R Core Team
# Copyright (C) 1995-2024 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -125,7 +125,8 @@ newPSOCKnode <- function(machine = "localhost", ...,
class = if(useXDR) "SOCKnode" else "SOCK0node")
}

closeNode.SOCKnode <- closeNode.SOCK0node <- function(node) close(node$con)
## Let the OS close the connection (see stopCluster).
closeNode.SOCKnode <- closeNode.SOCK0node <- function(node) {}

sendData.SOCKnode <- function(node, data) serialize(data, node$con)
sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE)
Expand Down Expand Up @@ -269,6 +270,29 @@ print.SOCKnode <- print.SOCK0node <- function(x, ...)
invisible(x)
}

stopCluster.SOCKcluster <- function(cl = NULL)
{
for (n in cl) postNode(n, "DONE")
cons <- lapply(cl, function(x) x$con)

## Wait (with a timeout) for the worker connection to be closed by the
## OS, so that the cleanup of the worker's R session has a chance to run
## before stopCluster() finishes (PR#18133).

t0 <- Sys.time()
cleanup_timeout <- 5
while(length(cons) > 0) {
done <- socketSelect(cons, write = FALSE, timeout = cleanup_timeout)
for(con in cons[done]) close(con)
cons <- cons[!done]
if (difftime(Sys.time(), t0, units="secs") > cleanup_timeout)
break
}

## Close the remaining worker connections unconditionally.
for(con in cons) close(con)
}

.workRSOCK <- function()
{
makeSOCKmaster <- function(master, port, setup_timeout, timeout, useXDR,
Expand Down

0 comments on commit 05465fc

Please sign in to comment.