library(mirai) nanotest <- function(x) invisible(isTRUE(x) || {print(x); stop("the above was returned instead of TRUE")}) nanotestn <- function(x) invisible(is.null(x) || {print(x); stop("the above was returned instead of NULL")}) nanotestz <- function(x) invisible(x == 0L || {print(x); stop("the above was returned instead of 0L")}) nanotesto <- function(x) invisible(x == 1L || {print(x); stop("the above was returned instead of 1L")}) nanotesti <- function(a, b) invisible(identical(a, b) || {print(a); print(b); stop("the above arguments were not identical")}) nanotestp <- function(x) invisible(is.character(capture.output(print(x))) || stop("print output of expression cannot be captured as a character value")) nanotesterr <- function(x, e = "") { x <- tryCatch(x, error = identity) inherits(x, "error") && grepl(e, x[["message"]], fixed = TRUE) || stop("expected error message containing '", e, "' was not generated") invisible(TRUE) } connection <- !is_error_value(collect_mirai(mirai(TRUE, .timeout = 2000L))) # core tests nanotest(is.list(status())) nanotestz(status()[["connections"]]) nanotestz(status()[["daemons"]]) nanotestz(daemons(0L)) nanotesterr(daemons(url = "URL"), "Invalid argument") nanotesterr(daemons(-1), "zero or greater") nanotesterr(daemons(n = 0, url = "ws://localhost:0"), "1 or greater") nanotesterr(daemons(raw(0L)), "must be numeric") nanotesterr(daemons(n = 1, maxtasks = "100"), "'...' arguments") nanotesterr(dispatcher(client = "URL"), "at least one") nanotesterr(daemon("URL"), "Invalid argument") nanotest(is.character(mlc <- launch_remote("ws://[::1]:5555"))) nanotest(inherits(mlc, "miraiLaunchCmd")) nanotestp(mlc) nanotesterr(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = "invalid")), "must be an element") nanotesterr(launch_remote(c("tcp://localhost:5555", "tcp://localhost:6666", "tcp://localhost:7777"), remote = remote_config(command = "echo", args = list(c("test", "."), c("test", ".")))), "must be of length 1 or the same length") nanotesterr(launch_local(1L), "requires daemons to be set") nanotest(is.character(host_url())) nanotest(startsWith(host_url(ws = TRUE, tls = TRUE), "wss")) nanotest(startsWith(host_url(tls = TRUE), "tls")) nanotest(grepl("5555", host_url(port = 5555), fixed = TRUE)) nanotest(is.list(ssh_config("ssh://remotehost"))) nanotest(is.list(ssh_config("ssh://remotehost", tunnel = TRUE, host = "tls+tcp://127.0.0.1:5555"))) nanotesterr(ssh_config("ssh://remotehost", tunnel = TRUE), "'host' must be specified") nanotest(is_mirai_interrupt(r <- mirai:::mk_interrupt_error())) nanotestp(r) nanotest(is_mirai_error(r <- `class<-`("Error in: testing\n", c("miraiError", "errorValue", "try-error")))) nanotestp(r) nanotestn(r$stack.trace) nanotest(mirai:::.DollarNames.miraiError(NULL, "s") == "stack.trace") nanotest(mirai:::is.promising.mirai()) nanotestn(nextstream()) nanotestn(nextget("pid")) # mirai and daemons tests connection && { Sys.sleep(1L) n <- function() m m <- mirai({ Sys.sleep(0.1) q <- m + n() + 2L q / m }, m = 2L, .args = environment(), .timeout = 2000L) nanotest(identical(call_mirai(m), m)) nanotest(is_error_value(m$data) || m$data == 3L) Sys.sleep(1L) `lang obj` <- quote(m + n + 2L) args <- c(m = 2L, n = 4L) m <- mirai(.expr = `lang obj`, .args = args, .timeout = 2000L) nanotest(is_error_value(call_mirai_(m)$data) || m$data == 8L) nanotestn(stop_mirai(m)) Sys.sleep(1L) nanotesto(d <- daemons(1L, dispatcher = FALSE, seed = 1546L)) nanotestp(d) me <- mirai(mirai::mirai(), .timeout = 2000L) nanotest(is_mirai_error(call_mirai(me)$data) || is_error_value(me$data)) nanotest(!is_mirai_interrupt(me$data)) nanotest(is_error_value(me[["data"]])) nanotestp(me) nanotestp(me$data) df <- data.frame(a = 1, b = 2) dm <- mirai(as.matrix(df), .args = list(df = df), .timeout = 2000L) nanotest(is_mirai(call_mirai(dm))) nanotest(!unresolved(dm)) nanotest(is_error_value(dm$data) || is.matrix(dm$data)) nanotest(is.integer(status()[["connections"]])) nanotest(is.character(status()[["daemons"]])) nanotestz(daemons(0L)) Sys.sleep(1L) nanotesto(daemons(1L, dispatcher = FALSE, idletime = 500L, timerstart = 1L, cleanup = FALSE, output = TRUE, .compute = "new")) nanotest(is.character(nextget("urls", .compute = "new"))) nanotest(is.integer(nextstream(.compute = "new"))) Sys.sleep(1.5) nanotestn(everywhere({}, as.environment(df), .compute = "new")) mn <- mirai("test1", .compute = "new") mp <- mirai(b + 1, .compute = "new") Sys.sleep(1L) nanotest(unresolved(mn$data) || mn$data == "test1") nanotest(unresolved(mp$data) || mp$data == 3) Sys.sleep(1L) nanotest(is.integer(status(.compute = "new")[["connections"]])) nanotestz(daemons(0L, .compute = "new")) } # additional daemons tests connection && .Platform[["OS.type"]] != "windows" && { Sys.sleep(1L) nanotest(daemons(url = value <- local_url(), dispatcher = FALSE) == value) nanotesti(status()$daemons, nextget("urls")) nanotestz(daemons(0L)) Sys.sleep(1L) nanotest(is.character(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript")))) nanotest(is.character(launch_remote("tcp://localhost:5555", remote = ssh_config(remotes = c("ssh://remotehost", "ssh://remotenode"), tunnel = TRUE, command = "echo")))) nanotestn(launch_local(local_url(), .compute = "test")) Sys.sleep(1L) nanotest(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE, remote = remote_config(quote = TRUE)) != value) nanotestz(daemons(0L)) Sys.sleep(1L) m <- with(daemons(1, dispatcher = FALSE, .compute = "ml"), { if (is.null(tryCatch(mirai_map(list(1, "a", 2), sum, .compute = "ml")[.stop], error = function(e) NULL))) mirai_map(1:3, rnorm, .args = list(mean = 20, 2), .compute = "ml")[.progress] }) nanotest(!is_mirai_map(m) && is.list(m) && length(m) == 3L && all(as.logical(lapply(m, is.numeric)))) Sys.sleep(1L) nanotestp(mp <- mirai_map(list(x = "a"), function(...) do(...), do = function(x, y) sprintf("%s%s", x, y), .args = list("b"))) nanotesti(collect_mirai(mp)[["x"]], "ab") nanotesti(call_mirai(mp)[["x"]][["data"]], "ab") nanotest(all(mirai_map(list(1:3, 3:1), sum, .args = list(3L))[.flat] == 7L)) nanotest(all(mirai_map(list(c(a = 1, b = 1, c = 1), 3), sum)[.flat] == 3)) } # parallel cluster tests library(parallel) nanotestn(tryCatch(mirai::register_cluster(), error = function(e) NULL)) connection && { Sys.sleep(1L) cluster <- make_cluster(1) nanotest(inherits(cluster, "miraiCluster")) nanotest(inherits(cluster, "cluster")) nanotest(length(cluster) == 1L) nanotest(inherits(cluster[[1]], "miraiNode")) nanotestp(cluster[[1]]) nanotest(is.list(cluster[1])) nanotest(is.character(launch_remote(cluster))) nanotest(is.character(launch_remote(cluster[[1L]]))) nanotest(is.list(status(cluster))) clusterSetRNGStream(cluster, 123) j <- clusterEvalQ(cluster, expr = .GlobalEnv[[".Random.seed"]]) a <- parSapply(cluster, 1:4, runif) setDefaultCluster(cluster) res <- parLapply(X = 1:10, fun = rnorm) nanotest(is.list(res) && length(res) == 10L) nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) res <- parLapplyLB(X = 1:10, fun = rnorm) nanotest(is.list(res) && length(res) == 10L) nanotest(is.double(res[[1L]]) && length(res[[1L]]) == 1L) nanotest(is.double(res[[10L]]) && length(res[[10L]]) == 10L) nanotesti(parSapply(NULL, 1:4, factorial), c(1, 2, 6, 24)) nanotesti(parSapplyLB(NULL, 1:8, factorial), c(1, 2, 6, 24, 120, 720, 5040, 40320)) df <- data.frame(a = c(1, 2, 3), b = c(6, 7, 8)) nanotesti(parApply(cluster, df, 2, sum), `names<-`(c(6, 21), c("a", "b"))) nanotesti(parCapply(cluster, df, sum), `names<-`(c(6, 21), c("a", "b"))) nanotesti(parRapply(cluster, df, sum), `names<-`(c(7, 9, 11), c("1", "2", "3"))) res <- clusterEvalQ(expr = .GlobalEnv[[".Random.seed"]][[1L]]) nanotest(is.integer(res[[1L]])) nanotesterr(clusterEvalQ(cluster, elephant()), "Error in elephant(): could not find function \"elephant\"") nanotestn(stop_cluster(cluster)) Sys.sleep(1L) nanotest(inherits(cl <- make_cluster(1), "miraiCluster")) nanotest(attr(cl, "id") != attr(cluster, "id")) clusterSetRNGStream(cl, 123) k <- clusterEvalQ(cl, expr = .GlobalEnv[[".Random.seed"]]) b <- parSapply(cl, 1:4, runif) nanotesti(j, k) nanotesti(a, b) nanotesti(clusterApply(cl, 1:2, get("+"), 3), list(4, 5)) xx <- 1 clusterExport(cl, "xx", environment()) nanotesti(clusterCall(cl, function(y) xx + y, 2), list(3)) nanotesti(clusterMap(cl, function(x, y) seq_len(x) + y, c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10)), list(a = 11, b = c(1, 2), c = c(-9, -8, -7))) nanotesti(parSapply(cl, 1:20, get("+"), 3), as.double(4:23)) nanotestn(stopCluster(cl)) nanotesterr(parLapply(cluster, 1:10, runif), "cluster is no longer active") Sys.sleep(1L) nanotestp(cl <- make_cluster(url = local_url())) nanotestn(stopCluster(cl)) Sys.sleep(1L) nanotestp(cl <- make_cluster(n = 1, url = local_url(), remote = remote_config())) nanotestn(stopCluster(cl)) } # advanced daemons and dispatcher tests connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "true" && { Sys.sleep(1L) nanotesto(daemons(url = local_url(), dispatcher = TRUE)) nanotest(grepl("://", launch_remote(1L), fixed = TRUE)) nanotestn(launch_local(nextget("urls"))) Sys.sleep(1L) requireNamespace("promises", quietly = TRUE) && { nanotest(promises::is.promise(p1 <- promises::as.promise(mirai("completed")))) nanotest(promises::is.promise(p2 <- promises::`%...>%`(mirai("completed"), identity()))) nanotest(promises::is.promise(p3 <- promises::as.promise(call_mirai(mirai("completed"))))) nanotestz(mirai_map(0:1, function(x) x, .promise = identity)[][[1L]]) nanotest(is_mirai_map(mp <- mirai_map(matrix(1:4, nrow = 2L), function(x, y) x + y, .promise = list(identity)))) nanotest(all(mp[.flat] == c(4L, 6L))) nanotest(is.null(names(mp[]))) nanotest(is_error_value(mirai_map(1, function(x) stop(x), .promise = list(identity, identity))[][[1L]])) } Sys.sleep(1L) nanotestz(daemons(NULL)) nanotesto(daemons(url = "ws://:0", token = TRUE)) nanotestz(daemons(0L)) nanotestz(with(daemons(url = "tcp://:0", token = TRUE), {8L - 9L + 1L})) nanotest(daemons(n = 2, "ws://:0") == 2L) nanotest(is.integer(nextget("pid"))) nanotest(length(nextget("urls")) == 2L) Sys.sleep(1L) status <- status()[["daemons"]] nanotest(is.matrix(status)) nanotest(is.character(dn1 <- dimnames(status)[[1L]])) nanotest(is.character(parse1 <- nanonext::parse_url(dn1[1L]))) nanotest(is.character(parse2 <- nanonext::parse_url(dn1[2L]))) nanotest((port <- as.integer(parse1[["port"]])) > 0L) nanotest(as.integer(parse2[["port"]]) == port) nanotest(parse1[["path"]] == "/1") nanotest(parse2[["path"]] == "/2") nanotestz(sum(status[, "online"])) nanotestz(sum(status[, "instance"])) nanotestz(sum(status[, "assigned"])) nanotestz(sum(status[, "complete"])) nanotest(is.character(saisei(i = 1L))) nanotestn(saisei(i = 0L)) nanotest(is.character(saisei(i = 1L, force = TRUE))) nanotestn(saisei(i = 10L)) nanotestz(daemons(0)) nanotest(daemons(n = 2, "tcp://127.0.0.1:45555") == 2L) Sys.sleep(1L) nanotestn(launch_local(nextget("urls", .compute = "default")[1L], maxtasks = 1L)) nanotestn(launch_local(2, maxtasks = 1L)) Sys.sleep(2L) tstatus <- status()[["daemons"]] nanotest(is.matrix(tstatus)) nanotest(is.character(tdn1 <- dimnames(tstatus)[[1L]])) nanotest(is.character(tparse1 <- nanonext::parse_url(tdn1[1L]))) nanotest(is.character(tparse2 <- nanonext::parse_url(tdn1[2L]))) nanotest(tparse1[["port"]] == "45555") nanotest(tparse2[["port"]] == "45556") nanotest(sum(tstatus[, "online"]) == 2L) nanotest(sum(tstatus[, "instance"]) == 2L) nanotestz(sum(tstatus[, "assigned"])) nanotestz(sum(tstatus[, "complete"])) nanotest(is.double(res <- mirai_map(c(1,1), rnorm)[.flat])) nanotest(res[1L] != res[2L]) nanotestz(daemons(0)) nanotesto(daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test")) nanotest(is.list(serialization(list(function(x) serialize(x, NULL), unserialize), "tst_cls"))) nanotestn(launch_local(1L)) Sys.sleep(1L) nanotest(grepl("CERTIFICATE", launch_remote(1L), fixed = TRUE)) q <- quote(list2env(list(b = 2), envir = .GlobalEnv)) cfg <- serial_config("custom", function(x) serialize(x, NULL), unserialize) nanotestn(everywhere(q, .serial = cfg)) m <- mirai(b, .timeout = 1000) nanotest(m[] == 2L || is_error_value(m[])) nanotestn(saisei(1)) nanotesterr(launch_local(0:1), "out of bounds") nanotesterr(launch_remote(1:2), "out of bounds") nanotest(!length(serialization(NULL))) option <- 15L Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") nanotesto(daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) Sys.unsetenv("R_DEFAULT_PACKAGES") Sys.sleep(1L) mq <- mirai("daemon", .timeout = 1000) nanotest(call_mirai(mq)$data == "daemon" || is_error_value(mq$data)) mq <- mirai(Sys.sleep(1.5), .timeout = 500) nanotest(is.matrix(status()[["daemons"]])) Sys.sleep(2L) nanotestz(daemons(0)) Sys.sleep(1L) test_tls <- function(cert) { file <- tempfile() on.exit(unlink(file)) cat(cert[["server"]], file = file) daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 1L && daemons(0L) == 0L } nanotest(test_tls(nanonext::write_cert(cn = "127.0.0.1"))) } Sys.sleep(1L)