source("incl/start.R") options(future.debug = FALSE) message("*** cluster() ...") message("Library paths: ", paste(sQuote(.libPaths()), collapse = ", ")) message("Package path: ", sQuote(system.file(package = "future"))) types <- "PSOCK" ## Speed up CRAN checks: Skip on CRAN Windows 32-bit if (isWin32) types <- NULL if (supportsMulticore() && !on_solaris) types <- c(types, "FORK") ## WORKAROUND: covr::package_coverage() -> merge_coverage() -> ... produces ## "Error in readRDS(x) : error reading from connection" for type = "FORK". ## Is this related to mcparallel() comments in help("package_coverage")? ## /HB 2017-05-20 if (covr_testing) types <- setdiff(types, "FORK") ## WORKAROUND: FORK:ed processing gives really odd type="FORK" results on ## macOS when running on GitHub Actions. /HB 2020-06-07 ## This error is also appearing on CRANs' 'r-release-macos-arm64' and ## 'M1mac' checks. /HB 2021-08-11 if (on_macos) types <- setdiff(types, "FORK") pid <- Sys.getpid() message("Main PID (original): ", pid) cl <- NULL for (type in types) { message(sprintf("Test set #1 with cluster type %s ...", sQuote(type))) message("Main PID (original): ", pid) for (cores in 1:availCores) { message(sprintf("Testing with %d cores on type = %s ...", cores, sQuote(type))) options(mc.cores = cores) ## Set up a cluster with nodes (explicitly) cl <- parallel::makeCluster(cores, type = type, timeout = 60) print(cl) plan(cluster, workers = cl) ## Assert that the worker's global environment is "empty" f <- future(ls(envir=globalenv(), all.names=TRUE)) v <- value(f) v <- grep("^[.][.][.]future[.]", v, invert = TRUE, value = TRUE) if (length(v) > 0) { stop(sprintf("Stray variables in the global environment of %s: %s", class(f)[1], paste(sQuote(v), collapse = ", "))) } ## No global variables f <- try(cluster({ 42L }, workers = cl), silent = FALSE) print(f) stopifnot(inherits(f, "ClusterFuture")) print(resolved(f)) y <- value(f) print(y) stopifnot(y == 42L) ## No global variables f <- try(cluster({ 42L }, workers = cl), silent = FALSE) print(f) stopifnot(inherits(f, "ClusterFuture")) print(resolved(f)) y <- value(f) print(y) stopifnot(y == 42L) ## A global variable a <- 0 f <- try(future({ b <- 3 c <- 2 a * b * c })) print(f) ## A cluster future is evaluated in a separate ## R session process. Changing the value of a global ## variable should not affect the result of the ## future. a <- 7 ## Make sure globals are frozen v <- value(f) print(v) stopifnot(v == 0) if (!"covr" %in% loadedNamespaces()) { message("*** cluster() with globals and blocking") fs <- list() for (ii in 1:3) { message(sprintf(" - Creating cluster future #%d ...", ii)) fs[[ii]] <- future({ ii }, label = sprintf("future-%d", ii)) } message(sprintf(" - Resolving %d cluster futures", length(fs))) v <- sapply(fs, FUN = value) stopifnot(all(v == 1:3)) } message("*** cluster() and errors") f <- future({ stop("Whoops!") 1 }) print(f) v <- value(f, signal = FALSE) print(v) stopifnot(inherits(v, "simpleError")) res <- tryCatch(value(f), error = identity) print(res) stopifnot(inherits(res, "error")) ## Error is repeated res <- tryCatch(value(f), error = identity) print(res) stopifnot(inherits(res, "error")) ## Custom error class f <- future({ stop(structure(list(message = "boom"), class = c("MyError", "error", "condition"))) }) print(f) v <- value(f, signal = FALSE) print(v) stopifnot(inherits(v, "error"), inherits(v, "MyError")) ## Make sure error is signaled res <- tryCatch(value(f), error = identity) stopifnot(inherits(res, "error"), inherits(res, "MyError")) message("*** cluster() - too large globals ...") ooptsT <- options(future.globals.maxSize = object.size(1:1014)) limit <- getOption("future.globals.maxSize") cat(sprintf("Max total size of globals: %g bytes\n", limit)) ## A large object a <- 1:1014 yTruth <- sum(a) size <- object.size(a) cat(sprintf("a: %g bytes\n", size)) f <- future({ sum(a) }) print(f) rm(list = "a") v <- value(f) print(v) stopifnot(v == yTruth) ## A too large object a <- 1:1015 yTruth <- sum(a) size <- object.size(a) cat(sprintf("a: %g bytes\n", size)) res <- tryCatch(f <- future({ sum(a) }), error = identity) rm(list = "a") stopifnot(inherits(res, "error")) ## Undo options changed in this test options(ooptsT) message("*** cluster() - too large globals ... DONE") message("*** cluster() - installed libraries ...") f <- try(cluster({ list( libPaths = .libPaths() ) }, workers = cl), silent = FALSE) print(f) stopifnot(inherits(f, "ClusterFuture")) v <- value(f) message(paste(capture.output(str(v)), collapse = "\n")) message("*** cluster() - installed packages ... DONE") message("*** cluster() - assert covr workaround ...") f <- try(cluster({ future:::hpaste(1:100) }, workers = cl), silent = FALSE) print(f) stopifnot(inherits(f, "ClusterFuture")) v <- value(f) message(v) stopifnot(v == hpaste(1:100)) message("*** cluster() - assert covr workaround ... DONE") ## Assert that the worker's global environment is "empty" f <- future(ls(envir=globalenv(), all.names=TRUE)) v <- value(f) v <- grep("^[.][.][.]future[.]", v, invert = TRUE, value = TRUE) if (length(v) > 0) { stop(sprintf("Stray variables in the global environment of %s: %s", class(f)[1], paste(sQuote(v), collapse = ", "))) } ## Sanity checks pid2 <- Sys.getpid() message("Main PID (original): ", pid) message("Main PID: ", pid2) stopifnot(pid2 == pid) ## Cleanup parallel::stopCluster(cl) message(sprintf("Testing with %d cores on type = %s ... DONE", cores, sQuote(type))) } ## for (cores ...) message("*** cluster() - exceptions ...") res <- tryCatch(cluster(42L, workers = NA), error = identity) print(res) stopifnot(inherits(res, "error")) message("*** cluster() - exceptions ... DONE") message("*** cluster() - assert registry behavior ...") ## Explicitly created clusters are *not* added to the registry cl <- parallel::makeCluster(1L, type = type, timeout = 60) plan(cluster, workers = cl) clR <- ClusterRegistry("get") stopifnot(is.null(clR)) ## ... and therefore changing plans shouldn't change anything plan(sequential) clR <- ClusterRegistry("get") stopifnot(is.null(clR)) ## Cleanup print(cl) str(cl) parallel::stopCluster(cl) message("*** cluster() - assert registry behavior ... DONE") ## Sanity checks pid2 <- Sys.getpid() message("Main PID (original): ", pid) message("Main PID: ", pid2) stopifnot(pid2 == pid) ## Cleanup plan(sequential) cl1 <- parallel::makeCluster(1L, type = type, timeout = 60) plan(cluster, workers = cl1) f1 <- future(1) ## Cleanup print(cl1) str(cl1) parallel::stopCluster(cl1) message(sprintf("Test set #1 with cluster type %s ... DONE", sQuote(type))) } ## for (type ...) library("parallel") for (type in types) { if (on_solaris) next message(sprintf("Test set #2 with cluster type %s ...", sQuote(type))) message("*** cluster() - setDefaultCluster() ...") cl <- makeCluster(1L, type = type, timeout = 60) print(cl) setDefaultCluster(cl) ## FIXME: Make plan(cluster, workers = NULL) work such that ## setDefaultCluster() is actually tested. plan(cluster) pid <- Sys.getpid() message(pid) a %<-% Sys.getpid() message(a) setDefaultCluster(NULL) ## Cleanup print(cl) str(cl) parallel::stopCluster(cl) message("*** cluster() - setDefaultCluster() ... DONE") ## Sanity checks pid2 <- Sys.getpid() message("Main PID (original): ", pid) message("Main PID: ", pid2) stopifnot(pid2 == pid) ## Cleanup plan(sequential) message(sprintf("Test set #2 with cluster type %s ... DONE", sQuote(type))) } ## for (type ...) ## Sanity checks pid2 <- Sys.getpid() message("Main PID (original): ", pid) message("Main PID: ", pid2) stopifnot(pid2 == pid) message("*** cluster() - exception when re-creating workers ...") ## https://github.com/HenrikBengtsson/future/issues/261 plan(cluster, workers = "localhost") f <- future(1) plan(cluster, workers = "localhost", .skip = FALSE) res <- tryCatch({ value(f) }, FutureError = function(ex) { message(conditionMessage(ex)) ex }) stopifnot(inherits(res, "FutureError")) message("*** cluster() - exception when re-creating workers ... DONE") message("*** cluster() ... DONE") ## Sanity checks pid2 <- Sys.getpid() message("Main PID (original): ", pid) message("Main PID: ", pid2) stopifnot(pid2 == pid) source("incl/end.R")