R Under development (unstable) (2024-05-02 r86512 ucrt) -- "Unsuffered Consequences" Copyright (C) 2024 The R Foundation for Statistical Computing Platform: x86_64-w64-mingw32/x64 R is free software and comes with ABSOLUTELY NO WARRANTY. You are welcome to redistribute it under certain conditions. Type 'license()' or 'licence()' for distribution details. R is a collaborative project with many contributors. Type 'contributors()' for more information and 'citation()' on how to cite R or R packages in publications. Type 'demo()' for some demos, 'help()' for on-line help, or 'help.start()' for an HTML browser interface to help. Type 'q()' to quit R. > library(mirai) > library(parallel) > > nanotest <- function(x) invisible(x || stop("is not TRUE when expected to be TRUE")) > nanotestn <- function(x) invisible(is.null(x) || stop("is not NULL when expected to be NULL")) > nanotestz <- function(x) invisible(x == 0L || stop("does not equal 0L as expected")) > nanotesto <- function(x) invisible(x == 1L || stop("does not equal 1L as expected")) > nanotesti <- function(a, b) invisible(identical(a, b) || stop("the arguments are not identical as expected")) > nanotestp <- function(x) invisible(is.character(capture.output(print(x))) || stop("print output of expression cannot be captured as a character value")) > nanotesterr <- function(x, e = "") invisible(grepl(e, tryCatch(x, error = identity)[["message"]], fixed = TRUE) || stop("expected error message '", e, "' not generated")) > connection <- !is_error_value(call_mirai(mirai(TRUE, .timeout = 2000L))[["data"]]) > > # core tests > nanotest(is.list(status())) > nanotestz(status()[["connections"]]) > nanotestz(status()[["daemons"]]) > nanotestz(daemons(0L)) > nanotesterr(daemons(url = "URL"), "Invalid argument") > nanotesterr(daemons(-1), "zero or greater") > nanotesterr(daemons(n = 0, url = "ws://localhost:0"), "1 or greater") > nanotesterr(daemons(raw(0L)), "must be numeric") > nanotesterr(daemons(n = 1, maxtasks = "100"), "'...' arguments") > nanotesterr(dispatcher(client = "URL"), "at least one") > nanotesterr(daemon("URL"), "Invalid argument") > nanotest(is.character(mlc <- launch_remote("ws://[::1]:5555"))) > nanotest(inherits(mlc, "miraiLaunchCmd")) > nanotestp(mlc) > nanotesterr(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = "invalid")), "must be an element") > nanotesterr(launch_remote(c("tcp://localhost:5555", "tcp://localhost:6666", "tcp://localhost:7777"), remote = remote_config(command = "echo", args = list(c("test", "."), c("test", ".")))), "must be of length 1 or the same length") > nanotesterr(launch_local(1L), "requires daemons to be set") > nanotestn(everywhere(mirai::serialization())) > nanotest(is.list(serialization())) > nanotesterr(serialization(list(NULL)), "must be a list of 2 functions or NULL") > nanotest(is.character(host_url())) > nanotest(substr(host_url(ws = TRUE, tls = TRUE), 1L, 3L) == "wss") > nanotest(substr(host_url(tls = TRUE), 1L, 3L) == "tls") > nanotest(grepl("5555", host_url(port = 5555), fixed = TRUE)) > nanotest(is.list(ssh_config("ssh://remotehost"))) > nanotest(is.list(ssh_config("ssh://remotehost", tunnel = TRUE, host = "tls+tcp://127.0.0.1:5555"))) > nanotesterr(ssh_config("ssh://remotehost", tunnel = TRUE), "'host' must be specified") > nanotest(is_mirai_interrupt(r <- mirai:::mk_interrupt_error())) > nanotestp(r) > nanotest(is_mirai_error(r <- `class<-`("Error in: testing\n", c("miraiError", "errorValue", "try-error")))) > nanotestp(r) > nanotestn(r$stack.trace) > nanotest(mirai:::.DollarNames.miraiError(NULL, "s") == "stack.trace") > nanotest(mirai:::is.promising.mirai()) > nanotestn(nextstream()) > nanotestn(nextget("pid")) > Sys.sleep(2.5) > # mirai tests > if (connection) { + n <- function() m + m <- mirai({ + Sys.sleep(0.1) + q <- m + n() + 2L + q / m + }, m = 2L, .args = environment(), .timeout = 2000L) + nanotest(identical(call_mirai(m), m)) + nanotest(is_error_value(m$data) || m$data == 3L) + Sys.sleep(2.5) + `lang obj` <- quote(m + n + 2L) + args <- c(m = 2L, n = 4L) + m <- mirai(.expr = `lang obj`, .args = args, .timeout = 2000L) + nanotest(is_error_value(call_mirai_(m)$data) || m$data == 8L) + nanotestn(stop_mirai(m)) + Sys.sleep(2.5) + } > # daemons tests > if (connection) { + nanotesto(d <- daemons(1L, dispatcher = FALSE)) + nanotestp(d) + me <- mirai(mirai::mirai(), .timeout = 2000L) + nanotest(is_mirai_error(call_mirai(me)$data) || is_error_value(me$data)) + nanotest(!is_mirai_interrupt(me$data)) + nanotest(is_error_value(me[["data"]])) + nanotestp(me) + nanotestp(me$data) + df <- data.frame(a = 1, b = 2) + dm <- mirai(as.matrix(df), .args = list(df = df), .timeout = 2000L) + nanotest(is_mirai(call_mirai(dm))) + nanotest(!unresolved(dm)) + nanotest(is_error_value(dm$data) || is.matrix(dm$data)) + nanotest(is.integer(status()[["connections"]])) + nanotest(is.character(status()[["daemons"]])) + nanotestz(daemons(0L)) + Sys.sleep(1L) + nanotesto(daemons(1L, dispatcher = FALSE, idletime = 500L, timerstart = 1L, cleanup = FALSE, output = TRUE, seed = 1546, .compute = "new")) + nanotest(is.character(nextget("urls", .compute = "new"))) + nanotest(is.integer(nextstream(.compute = "new"))) + Sys.sleep(1.5) + nanotestn(everywhere({}, as.environment(df), .compute = "new")) + mn <- mirai("test1", .compute = "new") + mp <- mirai(b + 1, .compute = "new") + Sys.sleep(1L) + nanotest(unresolved(mn$data) || mn$data == "test1") + nanotest(unresolved(mp$data) || mp$data == 3) + Sys.sleep(1L) + nanotest(is.integer(status(.compute = "new")[["connections"]])) + nanotestz(daemons(0L, .compute = "new")) + Sys.sleep(1L) + nanotest(daemons(url = value <- local_url(), dispatcher = FALSE) == value) + nanotesti(status()$daemons, nextget("urls")) + nanotestz(daemons(0L)) + Sys.sleep(1L) + } > # additional daemons tests > if (connection && .Platform[["OS.type"]] != "windows") { + nanotest(is.character(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript")))) + nanotest(is.character(launch_remote("tcp://localhost:5555", remote = ssh_config(remotes = c("ssh://remotehost", "ssh://remotenode"), tunnel = TRUE, command = "echo")))) + nanotestn(launch_local(local_url(), .compute = "test")) + Sys.sleep(1L) + nanotest(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE, remote = remote_config()) != value) + nanotestz(daemons(0L)) + Sys.sleep(1L) + } > # parallel cluster tests > nanotestn(tryCatch(mirai::register_cluster(), error = function(e) NULL)) > if (connection) { + cluster <- make_cluster(1) + nanotest(inherits(cluster, "miraiCluster")) + nanotest(inherits(cluster, "cluster")) + nanotest(length(cluster) == 1L) + nanotest(inherits(cluster[[1]], "miraiNode")) + nanotestp(cluster[[1]]) + nanotest(is.list(cluster[1])) + nanotest(is.character(launch_remote(cluster))) + nanotest(is.character(launch_remote(cluster[[1L]]))) + nanotest(is.list(status(cluster))) + clusterSetRNGStream(cluster, 123) + j <- clusterEvalQ(cluster, expr = .GlobalEnv[[".Random.seed"]]) + a <- parSapply(cluster, 1:4, runif) + setDefaultCluster(cluster) + res <- parLapply(X = 1:10, fun = rnorm) + nanotest(is.list(res) && length(res) == 10L) + nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) + nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) + res <- parLapplyLB(X = 1:10, fun = rnorm) + nanotest(is.list(res) && length(res) == 10L) + nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) + nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) + nanotesti(parSapply(NULL, 1:4, factorial), c(1, 2, 6, 24)) + nanotesti(parSapplyLB(NULL, 1:8, factorial), c(1, 2, 6, 24, 120, 720, 5040, 40320)) + df <- data.frame(a = c(1, 2, 3), b = c(6, 7, 8)) + nanotesti(parApply(cluster, df, 2, sum), `names<-`(c(6, 21), c("a", "b"))) + nanotesti(parCapply(cluster, df, sum), `names<-`(c(6, 21), c("a", "b"))) + nanotesti(parRapply(cluster, df, sum), `names<-`(c(7, 9, 11), c("1", "2", "3"))) + res <- clusterEvalQ(expr = .GlobalEnv[[".Random.seed"]][[1L]]) + nanotest(is.integer(res[[1L]])) + nanotesterr(clusterEvalQ(cluster, elephant()), "Error in elephant(): could not find function \"elephant\"") + nanotestn(stop_cluster(cluster)) + Sys.sleep(1L) + nanotest(inherits(cl <- make_cluster(1), "miraiCluster")) + nanotest(attr(cl, "id") != attr(cluster, "id")) + clusterSetRNGStream(cl, 123) + k <- clusterEvalQ(cl, expr = .GlobalEnv[[".Random.seed"]]) + b <- parSapply(cl, 1:4, runif) + nanotesti(j, k) + nanotesti(a, b) + nanotesti(clusterApply(cl, 1:2, get("+"), 3), list(4, 5)) + xx <- 1 + clusterExport(cl, "xx", environment()) + nanotesti(clusterCall(cl, function(y) xx + y, 2), list(3)) + nanotesti(clusterMap(cl, function(x, y) seq_len(x) + y, c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10)), + list(a = 11, b = c(1, 2), c = c(-9, -8, -7))) + nanotesti(parSapply(cl, 1:20, get("+"), 3), as.double(4:23)) + nanotestn(stopCluster(cl)) + nanotesterr(parLapply(cluster, 1:10, runif), "cluster is no longer active") + Sys.sleep(1L) + nanotestp(cl <- make_cluster(url = local_url())) + nanotestn(stopCluster(cl)) + Sys.sleep(1L) + nanotestp(cl <- make_cluster(n = 1, url = local_url(), remote = remote_config())) + nanotestn(stopCluster(cl)) + Sys.sleep(1L) + } > # advanced daemons and dispatcher tests > if (connection && Sys.getenv("NOT_CRAN") == "true") { + nanotesto(daemons(url = local_url(), dispatcher = TRUE)) + nanotest(grepl("://", launch_remote(1L), fixed = TRUE)) + nanotestn(launch_local(nextget("urls"))) + if (requireNamespace("promises", quietly = TRUE)) { + nanotest(promises::is.promise(p1 <- promises::as.promise(mirai("completed")))) + nanotest(promises::is.promise(p2 <- promises::`%...>%`(mirai("completed"), identity()))) + nanotest(promises::is.promise(p3 <- promises::as.promise(call_mirai(mirai("completed"))))) + } + Sys.sleep(1L) + nanotestz(daemons(NULL)) + Sys.sleep(1L) + } > if (connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "true") { + nanotesto(daemons(url = "ws://:0", token = TRUE)) + nanotestz(daemons(0L)) + Sys.sleep(1L) + nanotestz(with(daemons(url = "tcp://:0", token = TRUE), {8L - 9L + 1L})) + Sys.sleep(1L) + nanotest(daemons(n = 2, "ws://:0") == 2L) + nanotest(is.integer(nextget("pid"))) + nanotest(length(nextget("urls")) == 2L) + Sys.sleep(1L) + status <- status()[["daemons"]] + nanotest(is.matrix(status)) + nanotest(is.character(dn1 <- dimnames(status)[[1L]])) + nanotest(is.character(parse1 <- nanonext::parse_url(dn1[1L]))) + nanotest(is.character(parse2 <- nanonext::parse_url(dn1[2L]))) + nanotest((port <- as.integer(parse1[["port"]])) > 0L) + nanotest(as.integer(parse2[["port"]]) == port) + nanotest(parse1[["path"]] == "/1") + nanotest(parse2[["path"]] == "/2") + nanotestz(sum(status[, "online"])) + nanotestz(sum(status[, "instance"])) + nanotestz(sum(status[, "assigned"])) + nanotestz(sum(status[, "complete"])) + nanotest(is.character(saisei(i = 1L))) + nanotestn(saisei(i = 0L)) + nanotest(is.character(saisei(i = 1L, force = TRUE))) + nanotestn(saisei(i = 10L)) + Sys.sleep(1L) + nanotestz(daemons(0)) + Sys.sleep(1L) + nanotest(daemons(n = 2, "tcp://127.0.0.1:45555") == 2L) + Sys.sleep(1L) + nanotestn(launch_local(nextget("urls", .compute = "default")[1L], maxtasks = 1L)) + Sys.sleep(2L) + tstatus <- status()[["daemons"]] + nanotest(is.matrix(tstatus)) + nanotest(is.character(tdn1 <- dimnames(tstatus)[[1L]])) + nanotest(is.character(tparse1 <- nanonext::parse_url(tdn1[1L]))) + nanotest(is.character(tparse2 <- nanonext::parse_url(tdn1[2L]))) + nanotest(tparse1[["port"]] == "45555") + nanotest(tparse2[["port"]] == "45556") + nanotesto(sum(tstatus[, "online"])) + nanotesto(sum(tstatus[, "instance"])) + nanotestz(sum(tstatus[, "assigned"])) + nanotestz(sum(tstatus[, "complete"])) + nanotestz(daemons(0)) + Sys.sleep(1L) + nanotest(is.list(serialization(list(function(x) serialize(x, NULL), unserialize)))) + nanotest(is.function(serialization()[[1L]])) + nanotesto(daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test")) + nanotestn(launch_local(1L)) + Sys.sleep(1L) + nanotest(grepl("CERTIFICATE", launch_remote(1L), fixed = TRUE)) + q <- quote(list2env(list(b = 2), envir = .GlobalEnv)) + nanotestn(everywhere(q)) + m <- mirai(b, .timeout = 1000) + nanotest(call_mirai(m)$data == 2L || is_error_value(m$data)) + nanotestn(saisei(1)) + nanotesterr(launch_local(0:1), "out of bounds") + nanotesterr(launch_remote(1:2), "out of bounds") + nanotestz(daemons(0L)) + nanotestn(unlist(serialization(NULL))) + Sys.sleep(1L) + option <- 15L + nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) + Sys.sleep(1L) + mq <- mirai("daemon", .timeout = 1000) + nanotest(call_mirai(mq)$data == "daemon" || is_error_value(mq$data)) + mq <- mirai(Sys.sleep(1.5), .timeout = 500) + nanotest(is.matrix(status()[["daemons"]])) + Sys.sleep(2L) + nanotestz(daemons(0)) + Sys.sleep(1L) + test_tls <- function(cert) { + file <- tempfile() + on.exit(unlink(file)) + cat(cert[["server"]], file = file) + daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 1L && daemons(0L) == 0L + } + nanotest(test_tls(nanonext::write_cert(cn = "127.0.0.1"))) + Sys.sleep(1L) + } > rm(list = ls()) > gc() used (Mb) gc trigger (Mb) max used (Mb) Ncells 406688 21.8 845302 45.2 633325 33.9 Vcells 734151 5.7 8388608 64.0 2002565 15.3 > Sys.sleep(1L) > > proc.time() user system elapsed 0.40 0.10 21.46