library(mirai) library(parallel) nanotest <- function(x) invisible(x || stop("is not TRUE when expected to be TRUE")) nanotestn <- function(x) invisible(is.null(x) || stop("is not NULL when expected to be NULL")) nanotestz <- function(x) invisible(x == 0L || stop("does not equal 0L as expected")) nanotesto <- function(x) invisible(x == 1L || stop("does not equal 1L as expected")) nanotesti <- function(a, b) invisible(identical(a, b) || stop("the arguments are not identical as expected")) nanotestp <- function(x) invisible(is.character(capture.output(print(x))) || stop("print output of expression cannot be captured as a character value")) nanotesterr <- function(x, e = "") invisible(grepl(e, tryCatch(x, error = identity)[["message"]], fixed = TRUE) || stop("expected error message '", e, "' not generated")) connection <- !is_error_value(call_mirai(mirai(TRUE, .timeout = 2000L))[["data"]]) # core tests nanotest(is.list(status())) nanotestz(status()[["connections"]]) nanotestz(status()[["daemons"]]) nanotestz(daemons(0L)) nanotesterr(daemons(url = "URL"), "Invalid argument") nanotesterr(daemons(-1), "zero or greater") nanotesterr(daemons(n = 0, url = "ws://localhost:0"), "1 or greater") nanotesterr(daemons(raw(0L)), "must be numeric") nanotesterr(daemons(n = 1, maxtasks = "100"), "'...' arguments") nanotesterr(dispatcher(client = "URL"), "at least one") nanotesterr(daemon("URL"), "Invalid argument") nanotest(is.character(mlc <- launch_remote("ws://[::1]:5555"))) nanotest(inherits(mlc, "miraiLaunchCmd")) nanotestp(mlc) nanotesterr(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = "invalid")), "must be an element") nanotesterr(launch_remote(c("tcp://localhost:5555", "tcp://localhost:6666", "tcp://localhost:7777"), remote = remote_config(command = "echo", args = list(c("test", "."), c("test", ".")))), "must be of length 1 or the same length") nanotesterr(launch_local(1L), "requires daemons to be set") nanotestn(everywhere(mirai::serialization())) nanotest(is.list(serialization())) nanotesterr(serialization(list(NULL)), "must be a list of 2 functions or NULL") nanotest(is.character(host_url())) nanotest(substr(host_url(ws = TRUE, tls = TRUE), 1L, 3L) == "wss") nanotest(substr(host_url(tls = TRUE), 1L, 3L) == "tls") nanotest(grepl("5555", host_url(port = 5555), fixed = TRUE)) nanotest(is.list(ssh_config("ssh://remotehost"))) nanotesterr(ssh_config("ssh://remotehost", tunnel = TRUE), "must be called in the correct context") nanotest(is_mirai_interrupt(r <- mirai:::mk_interrupt_error())) nanotestp(r) nanotestn(nextstream()) nanotestn(nextget("pid")) Sys.sleep(2.5) # mirai tests if (connection) { n <- 3L m <- mirai({ Sys.sleep(0.1) q <- m + n + 1L q / m }, m = 2L, .args = list(n), .timeout = 2000L) nanotest(identical(call_mirai(m), m)) nanotest(is_error_value(m$data) || m$data == 3L) Sys.sleep(2.5) `lang obj` <- quote(m + n + 2L) args <- c(m = 2L, n = 4L) m <- mirai(.expr = `lang obj`, .args = args, .timeout = 2000L) nanotest(is_error_value(call_mirai_(m)$data) || m$data == 8L) Sys.sleep(2.5) } # daemons tests if (connection) { nanotesto(daemons(1L, dispatcher = FALSE)) me <- mirai(mirai::mirai(), .timeout = 2000L) nanotest(is_mirai_error(call_mirai(me)$data) || is_error_value(me$data)) nanotest(!is_mirai_interrupt(me$data)) nanotest(is_error_value(me[["data"]])) nanotestp(me) nanotestp(me$data) df <- data.frame(a = 1, b = 2) dm <- mirai(as.matrix(df), .args = list(df), .timeout = 2000L) nanotest(is_mirai(call_mirai(dm))) nanotest(!unresolved(dm)) nanotest(is_error_value(dm$data) || is.matrix(dm$data)) nanotest(is.integer(status()[["connections"]])) nanotest(is.character(status()[["daemons"]])) nanotestz(daemons(0L)) Sys.sleep(1L) nanotesto(daemons(1L, dispatcher = FALSE, idletime = 500L, timerstart = 1L, cleanup = FALSE, output = TRUE, seed = 1546, .compute = "new")) nanotest(is.character(nextget("urls", .compute = "new"))) nanotest(is.integer(nextstream(.compute = "new"))) Sys.sleep(1.5) nanotestn(everywhere(list2env(df, envir = .GlobalEnv), .args = list(df), .compute = "new")) mn <- mirai("test1", .compute = "new") mp <- mirai(b + 1, .compute = "new") Sys.sleep(1L) nanotest(unresolved(mn$data) || mn$data == "test1") nanotest(unresolved(mp$data) || mp$data == 3) Sys.sleep(1L) nanotestz(status(.compute = "new")[["connections"]]) nanotestz(daemons(0L, .compute = "new")) Sys.sleep(1L) } # additional daemons tests if (connection && .Platform[["OS.type"]] != "windows") { nanotest(is.character(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript")))) nanotest(is.character(launch_remote("tcp://localhost:5555", remote = ssh_config(remotes = c("ssh://remotehost", "ssh://remotenode"), tunnel = TRUE, command = "echo")))) nanotest(daemons(url = value <- local_url(), dispatcher = FALSE) == value) nanotest(grepl("://", launch_remote(status()$daemons), fixed = TRUE)) nanotestn(launch_local(nextget("urls"))) if (requireNamespace("promises", quietly = TRUE)) { nanotest(promises::is.promise(p1 <- promises::as.promise(mirai("completed")))) nanotest(promises::is.promise(p2 <- promises::`%...>%`(mirai("completed"), identity()))) } Sys.sleep(1L) nanotestz(daemons(NULL)) Sys.sleep(1L) nanotestn(launch_local(local_url(), .compute = "test")) Sys.sleep(1L) nanotest(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE, remote = remote_config()) != value) nanotestz(daemons(0L)) Sys.sleep(1L) } # parallel cluster tests if (connection) { cluster <- make_cluster(1) nanotest(inherits(cluster, "miraiCluster")) nanotest(inherits(cluster, "cluster")) nanotest(length(cluster) == 1L) nanotest(inherits(cluster[[1]], "miraiNode")) nanotestp(cluster[[1]]) nanotestp(cluster[1]) nanotest(is.character(launch_remote(cluster))) nanotest(is.character(launch_remote(cluster[[1L]]))) nanotest(is.list(status(cluster))) clusterSetRNGStream(cluster, 123) j <- clusterEvalQ(cluster, expr = .GlobalEnv[[".Random.seed"]]) a <- parSapply(cluster, 1:4, runif) setDefaultCluster(cluster) res <- parLapply(X = 1:10, fun = rnorm) nanotest(is.list(res) && length(res) == 10L) nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) res <- parLapplyLB(X = 1:10, fun = rnorm) nanotest(is.list(res) && length(res) == 10L) nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) nanotesti(parSapply(NULL, 1:4, factorial), c(1, 2, 6, 24)) nanotesti(parSapplyLB(NULL, 1:8, factorial), c(1, 2, 6, 24, 120, 720, 5040, 40320)) df <- data.frame(a = c(1, 2, 3), b = c(6, 7, 8)) nanotesti(parApply(cluster, df, 2, sum), `names<-`(c(6, 21), c("a", "b"))) nanotesti(parCapply(cluster, df, sum), `names<-`(c(6, 21), c("a", "b"))) nanotesti(parRapply(cluster, df, sum), `names<-`(c(7, 9, 11), c("1", "2", "3"))) res <- clusterEvalQ(expr = .GlobalEnv[[".Random.seed"]][[1L]]) nanotest(is.integer(res[[1L]])) nanotesterr(clusterEvalQ(cluster, elephant()), "Error in elephant(): could not find function \"elephant\"") nanotestn(stop_cluster(cluster)) Sys.sleep(1L) nanotest(inherits(cl <- make_cluster(1), "miraiCluster")) nanotest(attr(cl, "id") != attr(cluster, "id")) clusterSetRNGStream(cl, 123) k <- clusterEvalQ(cl, expr = .GlobalEnv[[".Random.seed"]]) b <- parSapply(cl, 1:4, runif) nanotesti(j, k) nanotesti(a, b) nanotesti(clusterApply(cl, 1:2, get("+"), 3), list(4, 5)) xx <- 1 clusterExport(cl, "xx", environment()) nanotesti(clusterCall(cl, function(y) xx + y, 2), list(3)) nanotesti(clusterMap(cl, function(x, y) seq_len(x) + y, c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10)), list (a = 11, b = c(1, 2), c = c(-9, -8, -7))) nanotesti(parSapply(cl, 1:20, get("+"), 3), as.double(4:23)) nanotestn(stopCluster(cl)) nanotesterr(parLapply(cluster, 1:10, runif), "cluster is no longer active") Sys.sleep(1L) } # additional parallel cluster tests if (connection && .Platform[["OS.type"]] != "windows") { nanotestp(cl <- make_cluster(url = local_url())) nanotestn(stopCluster(cl)) Sys.sleep(1L) nanotestp(cl <- make_cluster(n = 1, url = local_url(), remote = remote_config())) nanotestn(stopCluster(cl)) Sys.sleep(1L) } # advanced daemons and dispatcher tests if (connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "true") { nanotesto(daemons(url = "ws://:0", token = TRUE)) nanotestz(daemons(0L)) Sys.sleep(1L) nanotesto(daemons(url = "tcp://:0", token = TRUE)) nanotestz(daemons(0L)) Sys.sleep(1L) nanotest(daemons(n = 2, "ws://:0") == 2L) nanotest(is.integer(nextget("pid"))) nanotest(length(nextget("urls")) == 2L) Sys.sleep(1L) status <- status()[["daemons"]] nanotest(is.matrix(status)) nanotest(is.character(dn1 <- dimnames(status)[[1L]])) nanotest(is.character(parse1 <- nanonext::parse_url(dn1[1L]))) nanotest(is.character(parse2 <- nanonext::parse_url(dn1[2L]))) nanotest((port <- as.integer(parse1[["port"]])) > 0L) nanotest(as.integer(parse2[["port"]]) == port) nanotest(parse1[["path"]] == "/1") nanotest(parse2[["path"]] == "/2") nanotestz(sum(status[, "online"])) nanotestz(sum(status[, "instance"])) nanotestz(sum(status[, "assigned"])) nanotestz(sum(status[, "complete"])) nanotest(is.character(saisei(i = 1L))) nanotestn(saisei(i = 0L)) nanotest(is.character(saisei(i = 1L, force = TRUE))) nanotestn(saisei(i = 10L)) Sys.sleep(1L) nanotestz(daemons(0)) Sys.sleep(1L) nanotest(daemons(n = 2, "tcp://127.0.0.1:45555") == 2L) Sys.sleep(1L) nanotestn(launch_local(nextget("urls", .compute = "default")[1L], maxtasks = 1L)) Sys.sleep(1L) tstatus <- status()[["daemons"]] nanotest(is.matrix(tstatus)) nanotest(is.character(tdn1 <- dimnames(tstatus)[[1L]])) nanotest(is.character(tparse1 <- nanonext::parse_url(tdn1[1L]))) nanotest(is.character(tparse2 <- nanonext::parse_url(tdn1[2L]))) nanotest(tparse1[["port"]] == "45555") nanotest(tparse2[["port"]] == "45556") nanotesto(sum(tstatus[, "online"])) nanotesto(sum(tstatus[, "instance"])) nanotestz(sum(tstatus[, "assigned"])) nanotestz(sum(tstatus[, "complete"])) nanotestz(daemons(0)) Sys.sleep(1L) nanotest(is.list(serialization(list(function(x) serialize(x, NULL), unserialize)))) nanotest(is.function(serialization()[[1L]])) nanotesto(daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test")) nanotestn(launch_local(1L)) Sys.sleep(1L) nanotest(grepl("CERTIFICATE", launch_remote(1L), fixed = TRUE)) q <- quote(list2env(list(b = 2), envir = .GlobalEnv)) nanotestn(everywhere(q)) m <- mirai(b, .timeout = 1000) nanotest(call_mirai(m)$data == 2L || is_error_value(m$data)) nanotestn(saisei(1)) nanotesterr(launch_local(0:1), "out of bounds") nanotesterr(launch_remote(1:2), "out of bounds") nanotestz(daemons(0L)) nanotestn(unlist(serialization(NULL))) Sys.sleep(1L) option <- 15L nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) Sys.sleep(1L) mq <- mirai("daemon", .timeout = 1000) nanotest(call_mirai(mq)$data == "daemon" || is_error_value(mq$data)) mq <- mirai(Sys.sleep(1.5), .timeout = 500) nanotest(is.matrix(status()[["daemons"]])) Sys.sleep(2L) nanotestz(daemons(0)) Sys.sleep(1L) test_tls <- function(cert) { file <- tempfile() on.exit(unlink(file)) cat(cert[["server"]], file = file) daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 1L && daemons(0L) == 0L } nanotest(test_tls(nanonext::write_cert(cn = "127.0.0.1"))) Sys.sleep(1L) } Sys.sleep(1L)