R Under development (unstable) (2025-12-11 r89152 ucrt) -- "Unsuffered Consequences" Copyright (C) 2025 The R Foundation for Statistical Computing Platform: x86_64-w64-mingw32/x64 R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under certain conditions. Type 'license()' or 'licence()' for distribution details. R is a collaborative project with many contributors. Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications. Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help. Type 'q()' to quit R. > #! /usr/bin/env Rscript > ## This runs testme test script inst/testme/test-makeNodePSOCK.R > ## Don't edit - it was autogenerated by inst/testme/deploy.R > parallelly:::testme("makeNodePSOCK") Test 'makeNodePSOCK' ... > library(parallelly) > message("*** makeNodePSOCK() ...") *** makeNodePSOCK() ... > makeNodePSOCK <- parallelly:::makeNodePSOCK > message("- default arguments ...") - default arguments ... > options <- makeNodePSOCK(port = 12345, action = "options") [06:48:25.592] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92030972c0d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92030972c0d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92030972c0d.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92030972c0d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("- action = 'options' ...") - action = 'options' ... > options <- makeNodePSOCK(port = 12345, action = "options") [06:48:26.163] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9204d8d6716.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9204d8d6716.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9204d8d6716.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9204d8d6716.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("- specific worker and master ...") - specific worker and master ... > options <- makeNodePSOCK(worker = "remote.server.org", + master = "local.server.org", port = 12345, action = "options") [06:48:26.533] [local output] localMachine=FALSE && 'rshcmd' type is "ssh" => revtunnel=TRUE > print(options) $worker [1] "remote.server.org" attr(,"localhost") [1] FALSE $master [1] "local.server.org" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "'Rscript'" $homogeneous [1] FALSE $rscript_args [1] "--default-packages=datasets,utils,grDevices,graphics,stats,methods -e '#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN' -e 'options(socketOptions=\"no-delay\")' -e 'workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()'" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "sh" "sh" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd ssh "d:\\rtools45\\usr\\bin\\ssh.exe" attr(,"type") [1] "ssh" attr(,"version") [1] "OpenSSH_10.2p1, OpenSSL 3.6.0 1 Oct 2025" $user character(0) $revtunnel [1] TRUE $rshlogfile NULL $rshopts [1] "-R 12345:local.server.org:12345" $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "sequential" $local_cmd [1] "\"d:\\rtools45\\usr\\bin\\ssh.exe\" -R 12345:local.server.org:12345 remote.server.org \"'Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e '#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN' -e 'options(socketOptions=\\\"no-delay\\\")' -e 'workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()' MASTER=local.server.org PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=sequential\"" $pidfile NULL $rshcmd_label [1] "'d:\\rtools45\\usr\\bin\\ssh.exe' [type='ssh', version='OpenSSH_10.2p1, OpenSSL 3.6.0 1 Oct 2025']" $rsh_call [1] "\"d:\\rtools45\\usr\\bin\\ssh.exe\" -R 12345:local.server.org:12345 remote.server.org" $cmd [1] "'Rscript' --default-packages=datasets,utils,grDevices,graphics,stats,methods -e '#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN' -e 'options(socketOptions=\"no-delay\")' -e 'workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()' MASTER=local.server.org PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=sequential" $localMachine [1] FALSE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "remote.server.org" $arguments$master [1] "local.server.org" $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("- rscript_sh = 'cmd' ...") - rscript_sh = 'cmd' ... > options <- makeNodePSOCK(port = 12345, rscript_sh = "cmd", + action = "options") [06:48:26.555] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9203352b46.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9203352b46.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9203352b46.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.9203352b46.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "cmd" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > if (.Platform[["OS.type"]] != "windows") { + message("- rscript_sh = c('sh', 'cmd') ...") + options <- makeNodePSOCK(port = 12345, rscript_s .... [TRUNCATED] > message("- rscript_args ...") - rscript_args ... > options <- makeNodePSOCK(port = 12345, rscript_args = c("--vanilla"), + action = "options") [06:48:26.575] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--vanilla --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92077e21554.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --vanilla --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92077e21554.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92077e21554.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --vanilla --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92077e21554.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args [1] "--vanilla" $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("- rscript_envs ...") - rscript_envs ... > options <- makeNodePSOCK(port = 12345, rscript_envs = c(FOO = "bar"), + action = "options") [06:48:26.591] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.920111a67a4.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"Sys.setenv(\\\"FOO\\\"=\\\"bar\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs FOO "bar" $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.920111a67a4.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"Sys.setenv(\\\"FOO\\\"=\\\"bar\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.920111a67a4.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=datasets,utils,grDevices,graphics,stats,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.920111a67a4.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"Sys.setenv(\\\"FOO\\\"=\\\"bar\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs FOO "bar" $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "datasets" "utils" "grDevices" "graphics" "stats" "methods" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("- default_packages ...") - default_packages ... > options <- makeNodePSOCK(port = 12345, default_packages = c("stats", + "*"), action = "options") [06:48:26.612] [local output] localMachine=TRUE => revtunnel=FALSE > print(options) $worker [1] "localhost" attr(,"localhost") [1] TRUE $master [1] "localhost" $port [1] 12345 $connectTimeout [1] 120 $timeout [1] 120 $rscript [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\"" $homogeneous [1] TRUE $rscript_args [1] "--default-packages=stats,datasets,utils,grDevices,graphics,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92073e1174d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\"" $rscript_envs NULL $rscript_libs NULL $rscript_startup NULL $rscript_sh [1] "cmd" "cmd" $default_packages [1] "stats" "datasets" "utils" "grDevices" "graphics" "methods" $methods [1] TRUE $socketOptions [1] "no-delay" $useXDR [1] FALSE $outfile [1] "/dev/null" $renice [1] NA $rshcmd NULL $user character(0) $revtunnel [1] FALSE $rshlogfile NULL $rshopts character(0) $rank [1] 1 $manual [1] FALSE $dryrun [1] FALSE $quiet [1] FALSE $setup_strategy [1] "parallel" $local_cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=stats,datasets,utils,grDevices,graphics,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92073e1174d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $pidfile [1] "D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92073e1174d.pid" $rshcmd_label NULL $rsh_call NULL $cmd [1] "\"D:/RCompile/recent/R/bin/x64/Rscript\" --default-packages=stats,datasets,utils,grDevices,graphics,methods -e \"#label=test-makeNodePSOCK.R:2336:CRANWIN3:CRAN\" -e \"try(suppressWarnings(cat(Sys.getpid(),file=\\\"D:/temp/2025_12_12_06_35_17_6718/Rtmpw3OrCv/worker.rank=1.parallelly.parent=2336.92073e1174d.pid\\\")),silent=TRUE)\" -e \"options(socketOptions=\\\"no-delay\\\")\" -e \"workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()\" MASTER=localhost PORT=12345 OUT=/dev/null TIMEOUT=120 XDR=FALSE SETUPTIMEOUT=120 SETUPSTRATEGY=parallel" $localMachine [1] TRUE $make_fcn function (worker = getOption2("parallelly.localhost.hostname", "localhost"), master = NULL, port, connectTimeout = getOption2("parallelly.makeNodePSOCK.connectTimeout", 2 * 60), timeout = getOption2("parallelly.makeNodePSOCK.timeout", 30 * 24 * 60 * 60), rscript = NULL, homogeneous = NULL, rscript_args = NULL, rscript_envs = NULL, rscript_libs = NULL, rscript_startup = NULL, rscript_sh = c("auto", "cmd", "sh", "none"), default_packages = c("datasets", "utils", "grDevices", "graphics", "stats", if (methods) "methods"), methods = TRUE, socketOptions = getOption2("parallelly.makeNodePSOCK.socketOptions", "no-delay"), useXDR = getOption2("parallelly.makeNodePSOCK.useXDR", FALSE), outfile = "/dev/null", renice = NA_integer_, rshcmd = getOption2("parallelly.makeNodePSOCK.rshcmd", NULL), user = NULL, revtunnel = NA, rshlogfile = NULL, rshopts = getOption2("parallelly.makeNodePSOCK.rshopts", NULL), rank = 1L, manual = FALSE, dryrun = FALSE, quiet = FALSE, setup_strategy = getOption2("parallelly.makeNodePSOCK.setup_strategy", "parallel"), calls = getOption2("parallelly.makeNodePSOCK.calls", FALSE), action = c("launch", "options"), verbose = FALSE) { verbose <- as.logical(verbose) stop_if_not(length(verbose) == 1L, !is.na(verbose)) verbose_prefix <- "[local output] " if (inherits(worker, "makeNodePSOCKOptions")) { return(launchNodePSOCK(options = worker, verbose = verbose)) } if (missing(rscript_sh)) rscript_sh <- rscript_sh[1] rscript_sh <- match.arg(rscript_sh, several.ok = TRUE) args_org <- list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, calls = calls) localhostHostname <- getOption2("parallelly.localhost.hostname", "localhost") localMachine <- attr(worker, "localhost") if (is.logical(localMachine)) { stop_if_not(length(localMachine) == 1L, !is.na(localMachine)) } else { localMachine <- is.element(worker, c(localhostHostname, "localhost", "127.0.0.1")) if (!localMachine) { localMachine <- is_localhost(worker) if (localMachine) worker <- getOption2("parallelly.localhost.hostname", "localhost") } attr(worker, "localhost") <- localMachine } stop_if_not(is.character(rscript_sh), length(rscript_sh) >= 1L, length(rscript_sh) <= 2L, !anyNA(rscript_sh)) is_auto <- (rscript_sh == "auto") if (any(is_auto)) { if (localMachine) { rscript_sh[is_auto] <- if (.Platform[["OS.type"]] == "windows") "cmd" else "sh" } else { rscript_sh[is_auto] <- "sh" } } rscript_sh <- rep(rscript_sh, length.out = 2L) manual <- as.logical(manual) stop_if_not(length(manual) == 1L, !is.na(manual)) dryrun <- as.logical(dryrun) stop_if_not(length(dryrun) == 1L, !is.na(dryrun)) setup_strategy <- match.arg(setup_strategy, choices = c("sequential", "parallel")) quiet <- as.logical(quiet) stop_if_not(length(quiet) == 1L, !is.na(quiet)) if (identical(rshcmd, "")) rshcmd <- NULL if (!is.null(rshcmd)) { if (is.function(rshcmd)) { args <- formals(rshcmd) names <- names(args) stop_if_not(length(args) >= 2) if (!"rshopts" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `rshopts` argument: ", paste(sQuote(names), collapse = ", ")) } else if (!"worker" %in% names) { stop("Argument 'rshcmd' specifies a function that does not have an `worker` argument: ", paste(sQuote(names), collapse = ", ")) } type <- attr(rshcmd, "type") if (is.null(type)) { attr(rshcmd, "type") <- "function" } } else { rshcmd <- as.character(rshcmd) stop_if_not(length(rshcmd) >= 1L) } } if (identical(rshopts, "")) rshopts <- NULL rshopts <- as.character(rshopts) user <- as.character(user) stop_if_not(length(user) <= 1L) port <- as.integer(port) assertPort(port) if (!is.function(rshcmd) && !localMachine && (is.null(rshcmd) || all(grepl("^<[a-zA-Z-]+>$", rshcmd)))) { if (is.null(rshcmd)) { which <- NULL if (verbose) { mdebugf("%sWill search for all 'rshcmd' available\n", verbose_prefix) } } else if (all(grepl("^<[a-zA-Z-]+>$", rshcmd))) { if (verbose) { mdebugf("%sWill search for specified 'rshcmd' types: %s\n", verbose_prefix, paste(sQuote(rshcmd), collapse = ", ")) } which <- gsub("^<([a-zA-Z-]+)>$", "\\1", rshcmd) } rshcmd <- find_rshcmd(which = which, must_work = !localMachine && !manual && !dryrun) if (verbose) { s <- unlist(lapply(rshcmd, FUN = function(r) { sprintf("%s [type=%s, version=%s]", paste(sQuote(r), collapse = ", "), sQuote(attr(r, "type")), sQuote(attr(r, "version"))) })) s <- paste(sprintf("%s %d. %s", verbose_prefix, seq_along(s), s), collapse = "\n") mdebugf("%sFound the following available 'rshcmd':\n%s", verbose_prefix, s) } rshcmd <- rshcmd[[1]] stop_if_not(is.character(rshcmd), length(rshcmd) >= 1L) } else if (!is.function(rshcmd) && !is.null(rshcmd)) { basename <- tolower(basename(rshcmd[1])) if (basename %in% c("ssh", "plink")) { type <- "ssh" } else if (basename %in% c("rsh")) { type <- "rsh" } else { type <- "" } if (is.null(attr(rshcmd, "type"))) attr(rshcmd, "type") <- type if (is.null(attr(rshcmd, "version"))) attr(rshcmd, "version") <- "" } revtunnel <- as.logical(revtunnel) stop_if_not(length(revtunnel) == 1L) if (is.na(revtunnel)) { if (localMachine) { mdebugf("%slocalMachine=TRUE => revtunnel=FALSE\n", verbose_prefix) revtunnel <- FALSE } else if (identical(attr(rshcmd, "type"), "ssh")) { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=TRUE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- TRUE } else { mdebugf("%slocalMachine=FALSE && 'rshcmd' type is \"%s\" => revtunnel=FALSE\n", verbose_prefix, attr(rshcmd, "type")) revtunnel <- FALSE } } if (!is.null(rshlogfile)) { if (is.logical(rshlogfile)) { stop_if_not(!is.na(rshlogfile)) if (rshlogfile) { rshlogfile <- tempfile(pattern = "parallelly_makeClusterPSOCK_", fileext = ".log") } else { rshlogfile <- NULL } } else { rshlogfile <- as.character(rshlogfile) rshlogfile <- normalizePath(rshlogfile, mustWork = FALSE) } } if (is.null(master)) { if (localMachine || revtunnel) { master <- localhostHostname } else { master <- Sys.info()[["nodename"]] } } stop_if_not(!is.null(master)) timeout <- as.numeric(timeout) stop_if_not(length(timeout) == 1L, !is.na(timeout), is.finite(timeout), timeout >= 0) methods <- as.logical(methods) stop_if_not(length(methods) == 1L, !is.na(methods)) if (!is.null(default_packages)) { default_packages <- as.character(default_packages) stop_if_not(!anyNA(default_packages)) is_asterisk <- (default_packages == "*") if (any(is_asterisk)) { pkgs <- getOption("defaultPackages") if (length(pkgs) == 0) { default_packages[!is_asterisk] } else { pkgs <- paste(pkgs, collapse = ",") default_packages[is_asterisk] <- pkgs default_packages <- unlist(strsplit(default_packages, split = ",", fixed = TRUE)) } } default_packages <- unique(default_packages) pattern <- sprintf("^%s$", .standard_regexps()$valid_package_name) invalid <- grep(pattern, default_packages, invert = TRUE, value = TRUE) if (length(invalid) > 0) { stop(sprintf("Argument %s specifies invalid package names: %s", sQuote("default_packages"), paste(sQuote(invalid), collapse = ", "))) } } if (is.null(homogeneous)) { homogeneous <- { localMachine || (!revtunnel && is_localhost(master)) || (!is_ip_number(worker) && !is_fqdn(worker)) } } homogeneous <- as.logical(homogeneous) stop_if_not(length(homogeneous) == 1L, !is.na(homogeneous)) if (setup_strategy == "parallel") { if (getRversion() < "4.0.0" || manual || dryrun || !homogeneous || !localMachine) { setup_strategy <- "sequential" } } bin <- "Rscript" if (homogeneous) bin <- file.path(R.home("bin"), bin) if (is.null(rscript)) { rscript <- bin } else { if (!is.character(rscript)) rscript <- as.character(rscript) stop_if_not(length(rscript) >= 1L) rscript[rscript == "*"] <- bin bin <- rscript[1] if (homogeneous && !inherits(bin, "AsIs")) { bin <- Sys.which(bin) if (bin == "") bin <- normalizePath(rscript[1], mustWork = FALSE) rscript[1] <- bin } } name <- sub("[.]exe$", "", basename(bin)) is_Rscript <- (tolower(name) == "rscript") name <- bin <- NULL rscript_args <- as.character(rscript_args) if (length(rscript_startup) > 0L) { if (!is.list(rscript_startup)) rscript_startup <- list(rscript_startup) rscript_startup <- lapply(rscript_startup, FUN = function(init) { if (is.language(init)) { init <- deparse(init, width.cutoff = 500L) init <- paste(init, collapse = ";") } init <- as.character(init) if (length(init) == 0L) return(NULL) tryCatch({ parse(text = init) }, error = function(ex) { stopf("Syntax error in argument 'rscript_startup': %s", conditionMessage(ex)) }) init }) rscript_startup <- unlist(rscript_startup, use.names = FALSE) } if (!is.null(rscript_libs)) { rscript_libs <- as.character(rscript_libs) stop_if_not(!anyNA(rscript_libs)) } useXDR <- as.logical(useXDR) stop_if_not(length(useXDR) == 1L, !is.na(useXDR)) if (!is.null(socketOptions)) { stop_if_not(is.character(socketOptions), length(socketOptions) == 1L, !is.na(socketOptions), nzchar(socketOptions)) if (socketOptions == "NULL") socketOptions <- NULL } stop_if_not(is.null(outfile) || is.character(outfile)) renice <- as.integer(renice) stop_if_not(length(renice) == 1L) rank <- as.integer(rank) stop_if_not(length(rank) == 1L, !is.na(rank)) action <- match.arg(action, choices = c("launch", "options")) if (!inherits(rscript, "AsIs")) { idxs <- grep("^[[:alpha:]_][[:alnum:]_]*=.*", rscript, invert = TRUE) rscript[idxs] <- shQuote(rscript[idxs], type = rscript_sh[1]) } rscript_args_internal <- character(0L) if (localMachine && !dryrun) { res <- useWorkerPID(rscript, rank = rank, rscript_sh = rscript_sh[1], verbose = verbose) pidfile <- res$pidfile rscript_args_internal <- c(res$rscript_pid_args, rscript_args_internal) } else { pidfile <- NULL } rscript_label <- getOption2("parallelly.makeNodePSOCK.rscript_label", NULL) if (!is.null(rscript_label) && nzchar(rscript_label) && !isFALSE(as.logical(rscript_label))) { if (isTRUE(as.logical(rscript_label))) { script <- grep("[.]R$", commandArgs(), value = TRUE)[1] if (is.na(script)) script <- "UNKNOWN" rscript_label <- sprintf("%s:%s:%s:%s", script, Sys.getpid(), Sys.info()[["nodename"]], Sys.info()[["user"]]) } rscript_args_internal <- c("-e", shQuote(paste0("#label=", rscript_label), type = rscript_sh[1]), rscript_args_internal) } if (!is.null(default_packages)) { pkgs <- paste(unique(default_packages), collapse = ",") if (is_Rscript) { arg <- sprintf("--default-packages=%s", pkgs) rscript_args_internal <- c(arg, rscript_args_internal) } else { arg <- sprintf("R_DEFAULT_PACKAGES=%s", pkgs) on_MSWindows <- (rscript_sh[1] %in% c("cmd", "cmd2")) if (on_MSWindows) { rscript_args <- c(arg, rscript_args) } else { rscript <- c(arg, rscript) } } } if (!localMachine && revtunnel && getOption2("parallelly.makeNodePSOCK.port.increment", TRUE)) { rscript_port <- assertPort(port + rank - 1L) if (verbose) { mdebugf("%sRscript port: %d + %d = %d\n", verbose_prefix, port, rank - 1L, rscript_port) } } else { rscript_port <- port if (verbose) { mdebugf("%sRscript port: %d\n", verbose_prefix, rscript_port) } } if (length(socketOptions) == 1L) { code <- sprintf("options(socketOptions=\"%s\")", socketOptions) rscript_expr <- c("-e", shQuote(code, type = rscript_sh[1])) rscript_args_internal <- c(rscript_args_internal, rscript_expr) } if (length(rscript_startup) > 0L) { rscript_startup <- paste("invisible({", rscript_startup, "})", sep = "") rscript_startup <- shQuote(rscript_startup, type = rscript_sh[1]) rscript_startup <- lapply(rscript_startup, FUN = function(value) c("-e", value)) rscript_startup <- unlist(rscript_startup, use.names = FALSE) rscript_args_internal <- c(rscript_args_internal, rscript_startup) } if (length(rscript_envs) > 0L) { names <- names(rscript_envs) if (is.null(names)) { copy <- seq_along(rscript_envs) } else { copy <- which(nchar(names) == 0L) } if (length(copy) > 0L) { missing <- NULL for (idx in copy) { name <- rscript_envs[idx] if (!nzchar(name)) { stop("Argument 'rscript_envs' contains an empty non-named environment variable") } value <- Sys.getenv(name, NA_character_) if (!is.na(value)) { rscript_envs[idx] <- value names(rscript_envs)[idx] <- name } else { missing <- c(missing, name) } } if (length(missing) > 0L) { warnf("Did not pass down missing environment variables to cluster node: %s", paste(sQuote(missing), collapse = ", ")) } names <- names(rscript_envs) rscript_envs <- rscript_envs[nzchar(names)] names <- names(rscript_envs) } if (length(unset <- which(is.na(rscript_envs))) > 0L) { names <- names(rscript_envs[unset]) code <- sprintf("\"%s\"", names) code <- paste(code, collapse = ", ") code <- paste0("Sys.unsetenv(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s", sQuote(names)), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) rscript_envs <- rscript_envs[-unset] names <- names(rscript_envs) } if (length(names) > 0L) { code <- sprintf("\"%s\"=\"%s\"", names, rscript_envs) code <- paste(code, collapse = ", ") code <- paste0("Sys.setenv(", code, ")") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_envs' appears to contain invalid values: %s", paste(sprintf("%s=%s", sQuote(names), sQuote(rscript_envs)), collapse = ",")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } } if (length(rscript_libs) > 0L) { rscript_libs <- gsub("\\\\", "\\\\\\\\", rscript_libs, fixed = TRUE) code <- paste0("\"", rscript_libs, "\"") code[rscript_libs == "*"] <- ".libPaths()" code <- paste(code, collapse = ",") code <- paste0(".libPaths(c(", code, "))") tryCatch({ parse(text = code) }, error = function(ex) { stopf("Argument 'rscript_libs' appears to contain invalid values: %s", paste(sQuote(rscript_libs), collapse = ", ")) }) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(code, type = rscript_sh[1])) } if (calls) { calls <- sys.calls() calls <- calls[-length(calls)] calls <- lapply(calls, FUN = function(call) as.character(call)[1]) calls <- unlist(calls, use.names = FALSE) calls <- paste(calls, collapse = "->") calls <- gsub("[[:space:]]+", "", calls) calls <- sprintf("calls:%s", calls) calls <- shQuote(calls) calls <- sprintf("invisible(%s)", calls) rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(calls)) } if (!any(grepl("parallel:::[.](slave|work)RSOCK[(][)]", rscript_args))) { cmd <- "workRSOCK<-tryCatch(parallel:::.workRSOCK,error=function(e)parallel:::.slaveRSOCK);workRSOCK()" rscript_args_internal <- c(rscript_args_internal, "-e", shQuote(cmd, type = rscript_sh[1])) } rscript_args_org <- rscript_args idx <- which(rscript_args == "*") if (length(idx) == 0L) { rscript_args <- c(rscript_args, rscript_args_internal) } else if (length(idx) == 1L) { n <- length(rscript_args) if (idx == 1L) { rscript_args <- c(rscript_args_internal, rscript_args[-1]) } else if (idx == n) { rscript_args <- c(rscript_args[-n], rscript_args_internal) } else { rscript_args <- c(rscript_args[1:(idx - 1)], rscript_args_internal, rscript_args[(idx + 1):n]) } } else { stop(sprintf("Argument 'rscript_args' may contain at most one asterisk ('*'): %s", paste(sQuote(rscript_args), collapse = " "))) } rscript <- paste(rscript, collapse = " ") rscript_args <- paste(rscript_args, collapse = " ") envvars <- paste0("MASTER=", master, " PORT=", rscript_port, " OUT=", outfile, " TIMEOUT=", timeout, " XDR=", useXDR, " SETUPTIMEOUT=", connectTimeout, " SETUPSTRATEGY=", setup_strategy) cmd <- paste(rscript, rscript_args, envvars) if (!is.na(renice) && renice > 0L) { cmd <- sprintf("nice --adjustment=%d %s", renice, cmd) } if (!localMachine) { stop_if_not(is.function(rshcmd) || is.character(rshcmd), length(rshcmd) >= 1L) s <- sprintf("type=%s, version=%s", sQuote(attr(rshcmd, "type")), sQuote(attr(rshcmd, "version"))) if (is.function(rshcmd)) { rshcmd_label <- sprintf("%s [%s]", paste(sQuote("rshcmd-function"), collapse = ", "), s) } else { rshcmd_label <- sprintf("%s [%s]", paste(sQuote(rshcmd), collapse = ", "), s) } if (verbose) mdebugf("%sUsing 'rshcmd': %s", verbose_prefix, rshcmd_label) if (length(user) == 1L) rshopts <- c("-l", user, rshopts) if (revtunnel) { if (is_localhost(master) && .Platform[["OS.type"]] == "windows" && (isTRUE(attr(rshcmd, "OpenSSH_for_Windows")) || basename(rshcmd[1]) == "ssh")) { master <- "127.0.0.1" } rshopts <- c(sprintf("-R %d:%s:%d", rscript_port, master, port), rshopts) } if (is.character(rshlogfile)) { rshopts <- c(sprintf("-E %s", shQuote(rshlogfile)), rshopts) } rshopts <- paste(rshopts, collapse = " ") if (is.function(rshcmd)) { rsh_call <- rshcmd(rshopts = rshopts, worker = worker) } else { rsh_call <- paste(paste(shQuote(rshcmd), collapse = " "), rshopts, worker) } local_cmd <- paste(rsh_call, shQuote(cmd, type = rscript_sh[2])) } else { rshcmd_label <- NULL rsh_call <- NULL local_cmd <- cmd } stop_if_not(length(local_cmd) == 1L) options <- structure(list(worker = worker, master = master, port = port, connectTimeout = connectTimeout, timeout = timeout, rscript = rscript, homogeneous = homogeneous, rscript_args = rscript_args, rscript_envs = rscript_envs, rscript_libs = rscript_libs, rscript_startup = rscript_startup, rscript_sh = rscript_sh, default_packages = default_packages, methods = methods, socketOptions = socketOptions, useXDR = useXDR, outfile = outfile, renice = renice, rshcmd = rshcmd, user = user, revtunnel = revtunnel, rshlogfile = rshlogfile, rshopts = rshopts, rank = rank, manual = manual, dryrun = dryrun, quiet = quiet, setup_strategy = setup_strategy, local_cmd = local_cmd, pidfile = pidfile, rshcmd_label = rshcmd_label, rsh_call = rsh_call, cmd = cmd, localMachine = localMachine, make_fcn = makeNodePSOCK, arguments = args_org), class = c("makeNodePSOCKOptions", "makeNodeOptions")) if (action == "options") return(options) launchNodePSOCK(options, verbose = verbose) } $arguments $arguments$worker [1] "localhost" $arguments$master NULL $arguments$port [1] 12345 $arguments$connectTimeout [1] 120 $arguments$timeout [1] 120 $arguments$rscript NULL $arguments$homogeneous NULL $arguments$rscript_args NULL $arguments$rscript_envs NULL $arguments$rscript_libs NULL $arguments$rscript_startup NULL $arguments$rscript_sh [1] "auto" $arguments$default_packages [1] "stats" "*" $arguments$methods [1] TRUE $arguments$socketOptions [1] "no-delay" $arguments$useXDR [1] FALSE $arguments$outfile [1] "/dev/null" $arguments$renice [1] NA $arguments$rshcmd NULL $arguments$user NULL $arguments$revtunnel [1] NA $arguments$rshlogfile NULL $arguments$rshopts NULL $arguments$rank [1] 1 $arguments$manual [1] FALSE $arguments$dryrun [1] FALSE $arguments$quiet [1] FALSE $arguments$setup_strategy [1] "parallel" $arguments$calls [1] FALSE attr(,"class") [1] "makeNodePSOCKOptions" "makeNodeOptions" > stopifnot(inherits(options, "makeNodePSOCKOptions")) > message("*** makeNodePSOCK() ... DONE") *** makeNodePSOCK() ... DONE Failed to undo environment variables: - Expected environment variables: [n=215] '!ExitCode', 'ALLUSERSPROFILE', 'APPDATA', 'BIBINPUTS', 'BINDIR', 'BSTINPUTS', 'COMMONPROGRAMFILES', 'COMPUTERNAME', 'COMSPEC', 'CURL_CA_BUNDLE', 'CV_Instance001', 'CYGWIN', 'CommonProgramFiles(x86)', 'CommonProgramW6432', 'DriverData', 'HOME', 'HOMEDRIVE', 'HOMEPATH', 'JAGS_ROOT', 'JAVA_HOME', 'LANGUAGE', 'LC_COLLATE', 'LC_MONETARY', 'LC_TIME', 'LOCALAPPDATA', 'LOGONSERVER', 'LS_HOME', 'LS_LICENSE_PATH', 'MAKE', 'MAKEFLAGS', 'MAKELEVEL', 'MFLAGS', 'MSMPI_BENCHMARKS', 'MSMPI_BIN', 'MSYS2_ENV_CONV_EXCL', 'NUMBER_OF_PROCESSORS', 'OCL', 'OMP_THREAD_LIMIT', 'OS', 'PATH', 'PATHEXT', 'PROCESSOR_ARCHITECTURE', 'PROCESSOR_IDENTIFIER', 'PROCESSOR_LEVEL', 'PROCESSOR_REVISION', 'PROGRAMFILES', 'PROMPT', 'PSModulePath', 'PUBLIC', 'PWD', 'ProgramData', 'ProgramFiles(x86)', 'ProgramW6432', 'RTOOLS44_HOME', 'RTOOLS45_HOME', 'R_ARCH', 'R_BROWSER', 'R_BZIPCMD', 'R_CMD', 'R_COMPILED_BY', 'R_CRAN_WEB', 'R_CUSTOM_TOOLS_PATH', 'R_CUSTOM_TOOLS_SOFT', 'R_DOC_DIR', 'R_ENVIRON_USER', 'R_GSCMD', 'R_GZIPCMD', 'R_HOME', 'R_INCLUDE_DIR', 'R_INSTALL_TAR', 'R_LIBS', 'R_LIBS_SITE', 'R_LIBS_USER', 'R_MAX_NUM_DLLS', 'R_OSTYPE', 'R_PAPERSIZE', 'R_PAPERSIZE_USER', 'R_PARALLELLY_MAKENODEPSOCK_AUTOKILL', 'R_PARALLELLY_MAKENODEPSOCK_CONNECTTIMEOUT', 'R_PARALLELLY_MAKENODEPSOCK_RSCRIPT_LABEL', 'R_PARALLELLY_MAKENODEPSOCK_SESSIONINFO_PKGS', 'R_PARALLELLY_MAKENODEPSOCK_TIMEOUT', 'R_PARALLELLY_RANDOM_PORTS', 'R_PARALLEL_PORT', 'R_RD4PDF', 'R_RTOOLS45_PATH', 'R_SCRIPT_LEGACY', 'R_SHARE_DIR', 'R_TESTME_NAME', 'R_TESTME_PACKAGE', 'R_TESTME_PATH', 'R_TESTS', 'R_UNZIPCMD', 'R_USER', 'R_VERSION', 'R_ZIPCMD', 'SED', 'SHLVL', 'SYSTEMDRIVE', 'SYSTEMROOT', 'TAR', 'TAR_OPTIONS', 'TEMP', 'TERM', 'TETRAD_DIR', 'TEXINPUTS', 'TMP', 'TMPDIR', 'USERDOMAIN', 'USERDOMAIN_ROAMINGPROFILE', 'USERNAME', 'USERPROFILE', 'WINDIR', '_', '_R_CHECK_AUTOCONF_', '_R_CHECK_BOGUS_RETURN_', '_R_CHECK_BROWSER_NONINTERACTIVE_', '_R_CHECK_BUILD_VIGNETTES_SEPARATELY_', '_R_CHECK_CODETOOLS_PROFILE_', '_R_CHECK_CODE_ASSIGN_TO_GLOBALENV_', '_R_CHECK_CODE_ATTACH_', '_R_CHECK_CODE_CLASS_IS_STRING_', '_R_CHECK_CODE_DATA_INTO_GLOBALENV_', '_R_CHECK_CODE_USAGE_VIA_NAMESPACES_', '_R_CHECK_CODE_USAGE_WITHOUT_LOADING_', '_R_CHECK_CODE_USAGE_WITH_ONLY_BASE_ATTACHED_', '_R_CHECK_CODOC_VARIABLES_IN_USAGES_', '_R_CHECK_COMPACT_DATA2_', '_R_CHECK_COMPILATION_FLAGS_', '_R_CHECK_CONNECTIONS_LEFT_OPEN_', '_R_CHECK_CRAN_INCOMING_', '_R_CHECK_CRAN_INCOMING_ASPELL_RECHECK_MAYBE_', '_R_CHECK_CRAN_INCOMING_ASPELL_RECHECK_START_', '_R_CHECK_CRAN_INCOMING_CHECK_FILE_URIS_', '_R_CHECK_CRAN_INCOMING_CHECK_URLS_IN_PARALLEL_', '_R_CHECK_CRAN_INCOMING_NOTE_GNU_MAKE_', '_R_CHECK_CRAN_INCOMING_REMOTE_', '_R_CHECK_CRAN_INCOMING_USE_ASPELL_', '_R_CHECK_DATALIST_', '_R_CHECK_DEPRECATED_DEFUNCT_', '_R_CHECK_DOC_SIZES2_', '_R_CHECK_DOT_FIRSTLIB_', '_R_CHECK_DOT_INTERNAL_', '_R_CHECK_EXAMPLE_TIMING_THRESHOLD_', '_R_CHECK_EXECUTABLES_', '_R_CHECK_EXECUTABLES_EXCLUSIONS_', '_R_CHECK_FF_CALLS_', '_R_CHECK_FF_DUP_', '_R_CHECK_FORCE_SUGGESTS_', '_R_CHECK_FUTURE_FILE_TIMESTAMPS_', '_R_CHECK_FUTURE_FILE_TIMESTAMPS_LEEWAY_', '_R_CHECK_HAVE_MYSQL_', '_R_CHECK_HAVE_ODBC_', '_R_CHECK_HAVE_PERL_', '_R_CHECK_HAVE_POSTGRES_', '_R_CHECK_INSTALL_DEPENDS_', '_R_CHECK_INTERNALS2_', '_R_CHECK_LENGTH_1_CONDITION_', '_R_CHECK_LICENSE_', '_R_CHECK_LIMIT_CORES_', '_R_CHECK_LOG_USE_INFO_', '_R_CHECK_MATRIX_DATA_', '_R_CHECK_MBCS_CONVERSION_FAILURE_', '_R_CHECK_NATIVE_ROUTINE_REGISTRATION_', '_R_CHECK_NEWS_IN_PLAIN_TEXT_', '_R_CHECK_NO_RECOMMENDED_', '_R_CHECK_NO_STOP_ON_TEST_ERROR_', '_R_CHECK_ORPHANED_', '_R_CHECK_OVERWRITE_REGISTERED_S3_METHODS_', '_R_CHECK_PACKAGES_USED_IGNORE_UNUSED_IMPORTS_', '_R_CHECK_PACKAGES_USED_IN_TESTS_USE_SUBDIRS_', '_R_CHECK_PACKAGE_DATASETS_SUPPRESS_NOTES_', '_R_CHECK_PACKAGE_NAME_', '_R_CHECK_PKG_SIZES_', '_R_CHECK_PKG_SIZES_THRESHOLD_', '_R_CHECK_PRAGMAS_', '_R_CHECK_RD_EXAMPLES_T_AND_F_', '_R_CHECK_RD_LINE_WIDTHS_', '_R_CHECK_RD_MATH_RENDERING_', '_R_CHECK_RD_NOTE_LOST_BRACES_', '_R_CHECK_RD_VALIDATE_RD2HTML_', '_R_CHECK_REPLACING_IMPORTS_', '_R_CHECK_R_DEPENDS_', '_R_CHECK_S3_METHODS_SHOW_POSSIBLE_ISSUES_', '_R_CHECK_SCREEN_DEVICE_', '_R_CHECK_SERIALIZATION_', '_R_CHECK_SHLIB_OPENMP_FLAGS_', '_R_CHECK_SRC_MINUS_W_IMPLICIT_', '_R_CHECK_SUBDIRS_NOCASE_', '_R_CHECK_SUGGESTS_ONLY_', '_R_CHECK_SYSTEM_CLOCK_', '_R_CHECK_TESTS_NLINES_', '_R_CHECK_TEST_TIMING_', '_R_CHECK_TIMINGS_', '_R_CHECK_TOPLEVEL_FILES_', '_R_CHECK_UNDOC_USE_ALL_NAMES_', '_R_CHECK_UNSAFE_CALLS_', '_R_CHECK_URLS_SHOW_301_STATUS_', '_R_CHECK_VC_DIRS_', '_R_CHECK_VIGNETTES_NLINES_', '_R_CHECK_VIGNETTES_SKIP_RUN_MAYBE_', '_R_CHECK_VIGNETTE_TIMING_', '_R_CHECK_VIGNETTE_TITLES_', '_R_CHECK_WINDOWS_DEVICE_', '_R_CHECK_XREFS_NOTE_MISSING_PACKAGE_ANCHORS_', '_R_CHECK_XREFS_USE_ALIASES_FROM_CRAN_', '_R_CLASS_MATRIX_ARRAY_', '_R_DEPRECATED_IS_R_', '_R_S3_METHOD_LOOKUP_BASEENV_AFTER_GLOBALENV_', '_R_SHLIB_BUILD_OBJECTS_SYMBOL_TABLES_', '_R_USE_STRICT_R_HEADERS_', '__R_CHECK_DOC_FILES_NOTE_IF_ALL_INTERNAL__', 'maj.version', 'nextArg--timingsnextArg--install', 'tempdirname' - Environment variables still there: [n=0] - Environment variables missing: [n=1] 'MAKEFLAGS' Differences environment variable by environment variable: Test time: user.self=0.2s, sys.self=0.05s, elapsed=1s, user.child=NAs, sys.child=NAs Test 'tempdirname' ... success > > proc.time() user system elapsed 0.56 0.12 1.56