# minitest - a minimal testing framework v0.0.2 -------------------------------- test_library <- function(package) library(package = package, character.only = TRUE) test_true <- function(x) invisible(isTRUE(x) || {print(x); stop("the above was returned instead of TRUE")}) test_null <- function(x) invisible(is.null(x) || {print(x); stop("the above was returned instead of NULL")}) test_zero <- function(x) invisible(x == 0L || {print(x); stop("the above was returned instead of 0L")}) test_type <- function(type, x) invisible(typeof(x) == type || {stop("object of type '", typeof(x), "' was returned instead of '", type, "'")}) test_class <- function(class, x) invisible(inherits(x, class) || {stop("object of class '", paste(class(x), collapse = ", "), "' was returned instead of '", class, "'")}) test_equal <- function(a, b) invisible(a == b || {print(a); print(b); stop("the above expressions were not equal")}) test_identical <- function(a, b) invisible(identical(a, b) || {print(a); print(b); stop("the above expressions were not identical")}) test_print <- function(x) invisible(is.character(capture.output(print(x))) || stop("print output of expression cannot be captured as a character value")) test_error <- function(x, containing = "") invisible(inherits(x <- tryCatch(x, error = identity), "error") && grepl(containing, x[["message"]], fixed = TRUE) || stop("Expected error message containing: ", containing, "\nActual error message: ", x[["message"]])) # ------------------------------------------------------------------------------ test_library("mirai") connection <- !is_error_value(collect_mirai(mirai(TRUE, .timeout = 2000L))) # core tests test_type("list", status()) test_zero(status()[["connections"]]) test_zero(status()[["daemons"]]) test_zero(daemons(0L)) test_error(mirai(), "missing expression, perhaps wrap in {}?") test_error(mirai(a, 1), "all '...' arguments must be named") test_error(mirai(a, .args = list(1)), "all items in '.args' must be named") test_error(mirai_map(1:2, "a function"), "must be of type function, not character") test_error(daemons(url = "URL"), "Invalid argument") test_error(daemons(-1), "zero or greater") test_error(daemons(n = 0, url = "ws://localhost:0"), "1 or greater") test_error(daemons(raw(0L)), "must be numeric") test_error(daemons(1, dispatcher = "p"), "must be one of") test_error(daemons(url = local_url(), dispatcher = "t"), "must be one of") test_error(dispatcher(client = "URL"), "at least one") test_error(daemon("URL"), "Invalid argument") test_error(launch_local(1L), "daemons must be set") test_type("character", host_url()) test_true(startsWith(host_url(ws = TRUE, tls = TRUE), "wss")) test_true(startsWith(host_url(tls = TRUE), "tls")) test_true(grepl("5555", host_url(port = 5555), fixed = TRUE)) test_type("list", ssh_config("ssh://remotehost")) test_type("list", ssh_config("ssh://remotehost", tunnel = TRUE, host = "tls+tcp://127.0.0.1:5555")) test_error(ssh_config("ssh://remotehost", tunnel = TRUE), "'host' must be specified") test_true(is_mirai_interrupt(r <- mirai:::mk_interrupt_error())) test_print(r) test_true(is_mirai_error(r <- `class<-`("Error in: testing\n", c("miraiError", "errorValue", "try-error")))) test_print(r) test_null(r$stack.trace) test_equal(mirai:::.DollarNames.miraiError(NULL, "s"), "stack.trace") test_true(mirai:::is.promising.mirai()) test_error(everywhere({}), "not found") test_null(nextstream()) test_null(nextget("pid")) # mirai and daemons tests connection && { Sys.sleep(1L) n <- function() m m <- mirai({ Sys.sleep(0.1) q <- m + n() + 2L q / m }, m = 2L, .args = environment(), .timeout = 2000L) test_identical(call_mirai(m), m) if (!is_error_value(m$data)) test_equal(m$data, 3L) Sys.sleep(1L) `lang obj` <- quote(m + n + 2L) args <- c(m = 2L, n = 4L) m <- mirai(.expr = `lang obj`, .args = args, .timeout = 2000L) if (!is_error_value(call_mirai_(m)$data)) test_equal(m$data, 8L) test_null(stop_mirai(m)) Sys.sleep(1L) test_equal(1L, d <- daemons(1L, dispatcher = FALSE, asyncdial = FALSE, seed = 1546L)) test_print(d) me <- mirai(mirai::mirai(), .timeout = 2000L)[] if (!is_mirai_error(me)) test_true(is_error_value(me)) if (is_mirai_error(me)) test_type("list", me$stack.trace) if (is_mirai_error(me)) test_true(length(me$stack.trace) >= 2L) if (is_mirai_error(me)) test_true(all(as.logical(lapply(me$stack.trace, is.character)))) test_true(!is_mirai_interrupt(me)) test_class("errorValue", me) test_print(me) df <- data.frame(a = 1, b = 2) dm <- mirai(as.matrix(df), .args = list(df = df), .timeout = 2000L) test_true(is_mirai(call_mirai(dm))) test_true(!unresolved(dm)) if (!is_error_value(dm$data)) test_class("matrix", dm$data) test_print(dm) test_type("integer", status()[["connections"]]) test_type("character", status()[["daemons"]]) test_type("character", mlc <- launch_remote("ws://[::1]:5555")) test_class("miraiLaunchCmd", mlc) test_print(mlc) test_error(launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = "invalid")), "must be an element") test_error(launch_remote(c("tcp://localhost:5555", "tcp://localhost:6666", "tcp://localhost:7777"), remote = remote_config(command = "echo", args = list(c("test", "."), c("test", ".")))), "must be of length 1 or the same length") test_zero(daemons(0L)) Sys.sleep(1L) test_equal(1L, daemons(1L, dispatcher = FALSE, idletime = 500L, timerstart = 1L, cleanup = FALSE, output = TRUE, .compute = "new")) test_type("character", nextget("urls", .compute = "new")) test_type("integer", nextstream(.compute = "new")) Sys.sleep(1.5) test_null(everywhere({}, as.environment(df), .compute = "new")) mn <- mirai("test1", .compute = "new") mp <- mirai(b + 1, .compute = "new") Sys.sleep(1L) if (!unresolved(mn$data)) test_equal(mn$data, "test1") if (!unresolved(mp$data)) test_equal(mp$data, 3) Sys.sleep(1L) test_type("integer", status(.compute = "new")[["connections"]]) test_zero(daemons(0L, .compute = "new")) } # additional daemons tests connection && .Platform[["OS.type"]] != "windows" && { Sys.sleep(1L) test_zero(daemons(url = value <- local_url(), dispatcher = "none")) test_identical(status()$daemons, value) test_identical(nextget("urls"), value) test_type("character", launch_remote("ws://[::1]:5555", remote = remote_config(command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript"))) test_type("character", launch_remote("tcp://localhost:5555", remote = ssh_config(remotes = c("ssh://remotehost", "ssh://remotenode"), tunnel = TRUE, command = "echo"))) test_zero(daemons(0L)) Sys.sleep(1L) test_zero(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE, remote = remote_config(quote = TRUE))) test_true(status()$daemons != value) test_zero(daemons(0L)) Sys.sleep(1L) m <- with(daemons(1, dispatcher = "none", .compute = "ml"), { if (is.null(tryCatch(mirai_map(list(1, "a", 2), sum, .compute = "ml")[.stop], error = function(e) NULL))) mirai_map(1:3, rnorm, .args = list(mean = 20, 2), .compute = "ml")[] }) test_true(!is_mirai_map(m)) test_type("list", m) test_equal(length(m), 3L) test_true(all(as.logical(lapply(m, is.numeric)))) Sys.sleep(1L) test_print(suppressWarnings(mp <- mirai_map(list(x = "a"), function(...) do(...), do = function(x, y) sprintf("%s%s", x, y), .args = list("b")))) test_identical(collect_mirai(mp)[["x"]], "ab") test_identical(call_mirai(mp)[["x"]][["data"]], "ab") test_true(all(mirai_map(data.frame(1:3, 3:1), sum, .args = list(3L))[.flat] == 7L)) test_true(all(mirai_map(list(c(a = 1, b = 1, c = 1), 3), sum)[.flat] == 3)) } # parallel cluster tests library(parallel) test_null(tryCatch(mirai::register_cluster(), error = function(e) NULL)) connection && { Sys.sleep(1L) cluster <- make_cluster(1) test_class("miraiCluster", cluster) test_class("cluster", cluster) test_equal(length(cluster), 1L) test_class("miraiNode", cluster[[1]]) test_print(cluster[[1]]) test_type("list", cluster[1]) test_type("character", launch_remote(cluster)) test_type("character", launch_remote(cluster[[1L]])) test_type("list", status(cluster)) clusterSetRNGStream(cluster, 123) j <- clusterEvalQ(cluster, expr = .GlobalEnv[[".Random.seed"]]) a <- parSapply(cluster, 1:4, runif) setDefaultCluster(cluster) res <- parLapply(X = 1:10, fun = rnorm) test_type("list", res) test_equal(length(res), 10L) test_type("double", res[[1L]]) test_equal(length(res[[1L]]), 1L) test_type("double", res[[10L]]) test_equal(length(res[[10L]]), 10L) res <- parLapplyLB(X = 1:10, fun = rnorm) test_type("list", res) test_equal(length(res), 10L) test_type("double", res[[1L]]) test_equal(length(res[[1L]]), 1L) test_type("double", res[[10L]]) test_equal(length(res[[10L]]), 10L) test_identical(parSapply(NULL, 1:4, factorial), c(1, 2, 6, 24)) test_identical(parSapplyLB(NULL, 1:8, factorial), c(1, 2, 6, 24, 120, 720, 5040, 40320)) df <- data.frame(a = c(1, 2, 3), b = c(6, 7, 8)) test_identical(parApply(cluster, df, 2, sum), `names<-`(c(6, 21), c("a", "b"))) test_identical(parCapply(cluster, df, sum), `names<-`(c(6, 21), c("a", "b"))) test_identical(parRapply(cluster, df, sum), `names<-`(c(7, 9, 11), c("1", "2", "3"))) res <- clusterEvalQ(expr = .GlobalEnv[[".Random.seed"]][[1L]]) test_type("integer", res[[1L]]) test_error(clusterEvalQ(cluster, elephant()), "Error in elephant(): could not find function \"elephant\"") test_null(stop_cluster(cluster)) Sys.sleep(1L) test_class("miraiCluster", cl <- make_cluster(1)) test_true(attr(cl, "id") != attr(cluster, "id")) clusterSetRNGStream(cl, 123) k <- clusterEvalQ(cl, expr = .GlobalEnv[[".Random.seed"]]) b <- parSapply(cl, 1:4, runif) test_identical(j, k) test_identical(a, b) test_identical(clusterApply(cl, 1:2, get("+"), 3), list(4, 5)) xx <- 1 clusterExport(cl, "xx", environment()) test_identical(clusterCall(cl, function(y) xx + y, 2), list(3)) test_identical(clusterMap(cl, function(x, y) seq_len(x) + y, c(a = 1, b = 2, c = 3), c(A = 10, B = 0, C = -10)), list(a = 11, b = c(1, 2), c = c(-9, -8, -7))) test_identical(parSapply(cl, 1:20, get("+"), 3), as.double(4:23)) test_null(stopCluster(cl)) test_error(parLapply(cluster, 1:10, runif), "cluster is no longer active") Sys.sleep(1L) test_print(cl <- make_cluster(url = local_url())) test_null(stopCluster(cl)) Sys.sleep(1L) test_print(cl <- make_cluster(n = 1, url = local_url(), remote = remote_config())) test_null(stopCluster(cl)) } # advanced daemons and dispatcher tests connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "true" && { Sys.sleep(1L) test_equal(1L, daemons(url = local_url(), dispatcher = "process", notused = "wrongtype")) test_true(grepl("://", launch_remote(1L), fixed = TRUE)) test_null(launch_local(nextget("urls"))) Sys.sleep(1L) requireNamespace("promises", quietly = TRUE) && { test_true(promises::is.promise(p1 <- promises::as.promise(mirai("completed")))) test_true(promises::is.promise(p2 <- promises::`%...>%`(mirai("completed"), identity()))) test_true(promises::is.promise(p3 <- promises::as.promise(call_mirai(mirai("completed"))))) test_zero(mirai_map(0:1, function(x) x, .promise = identity)[][[1L]]) test_true(is_mirai_map(mp <- mirai_map(matrix(1:4, nrow = 2L), function(x, y) x + y, .promise = list(identity)))) test_true(all(mp[.flat, .stop] == c(4L, 6L))) test_null(names(mp[])) test_class("errorValue", mirai_map(1, function(x) stop(x), .promise = list(identity, identity))[][[1L]]) } Sys.sleep(1L) test_zero(daemons(NULL)) test_equal(1L, daemons(url = "ws://:0", correctype = 0L, token = TRUE)) test_zero(daemons(0L)) test_zero(with(daemons(url = "tcp://:0", correcttype = c(1, 0), token = TRUE), {8L - 9L + 1L})) test_equal(daemons(n = 2, "ws://:0"), 2L) test_type("integer", nextget("pid")) test_equal(length(nextget("urls")), 2L) Sys.sleep(1L) status <- status()[["daemons"]] test_class("matrix", status) test_type("character", dn1 <- dimnames(status)[[1L]]) test_type("character", parse1 <- nanonext::parse_url(dn1[1L])) test_type("character", parse2 <- nanonext::parse_url(dn1[2L])) test_true((port <- as.integer(parse1[["port"]])) > 0L) test_equal(as.integer(parse2[["port"]]), port) test_equal(parse1[["path"]], "/1") test_equal(parse2[["path"]], "/2") test_zero(sum(status[, "online"])) test_zero(sum(status[, "instance"])) test_zero(sum(status[, "assigned"])) test_zero(sum(status[, "complete"])) test_type("character", saisei(i = 1L)) test_null(saisei(i = 0L)) test_type("character", saisei(i = 1L, force = TRUE)) test_null(saisei(i = 10L)) test_zero(daemons(0)) test_equal(daemons(url = c("tcp://127.0.0.1:45555", "tcp://127.0.0.1:0"), correcttype = NA), 2L) Sys.sleep(1L) test_null(launch_local(nextget("urls", .compute = "default")[1L], maxtasks = 1L)) test_null(launch_local(2, maxtasks = 1L)) Sys.sleep(2L) tstatus <- status()[["daemons"]] test_class("matrix", tstatus) test_type("character", tdn1 <- dimnames(tstatus)[[1L]]) test_type("character", tparse1 <- nanonext::parse_url(tdn1[1L])) test_type("character", tparse2 <- nanonext::parse_url(tdn1[2L])) test_true(tparse1[["port"]] != tparse2[["port"]]) test_equal(sum(tstatus[, "online"]), 2L) test_equal(sum(tstatus[, "instance"]), 2L) test_zero(sum(tstatus[, "assigned"])) test_zero(sum(tstatus[, "complete"])) test_type("list", res <- mirai_map(c(1,1), rnorm)[.progress]) test_true(res[[1L]] != res[[2L]]) test_zero(daemons(0)) test_equal(1L, daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test")) test_null(launch_local(1L)) Sys.sleep(1L) test_true(grepl("CERTIFICATE", launch_remote(1L), fixed = TRUE)) q <- quote(list2env(list(b = 2), envir = .GlobalEnv)) cfg <- serial_config("custom", function(x) serialize(x, NULL), unserialize) test_null(everywhere(q, .serial = cfg)) m <- mirai(b, .timeout = 1000) if (!is_error_value(m[])) test_equal(m[], 2L) test_null(saisei(1)) test_error(launch_local(0:1), "out of bounds") test_error(launch_remote(1:2), "out of bounds") option <- 15L Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") test_equal(1L, daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) Sys.unsetenv("R_DEFAULT_PACKAGES") Sys.sleep(1L) mq <- mirai(runif(1L), .timeout = 1000) test_true(is.numeric(mq[])) mq <- mirai(Sys.sleep(1.5), .timeout = 500) test_class("matrix", status()[["daemons"]]) Sys.sleep(2L) test_zero(daemons(0)) Sys.sleep(1L) test_tls <- function(cert) { file <- tempfile() on.exit(unlink(file)) cat(cert[["server"]], file = file) daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 1L && daemons(0L) == 0L } test_true(test_tls(nanonext::write_cert(cn = "127.0.0.1"))) } # threaded dispatcher tests connection && Sys.getenv("NOT_CRAN") == "true" && { test_equal(daemons(2, dispatcher = "thread"), 2L) test_equal(nextget("n"), 2L) test_true(startsWith(nextget("urls")[[1L]], mirai:::.urlscheme)) test_class("matrix", status()$daemons) test_true(mirai(TRUE)[]) test_zero(daemons(0)) test_equal(daemons(2, url = host_url(), dispatcher = "thread"), 2L) test_equal(length(urls <- nextget("urls")), 2L) test_identical(as.integer(nanonext::parse_url(urls[[2L]])[["port"]]), as.integer(nanonext::parse_url(urls[[1L]])[["port"]]) + 1L) test_zero(daemons(0)) test_equal(daemons(2, url = host_url(ws = TRUE), dispatcher = "thread"), 2L) test_equal(length(urls <- nextget("urls")), 2L) test_true(endsWith(urls[[1L]], "/1")) test_true(endsWith(urls[[2L]], "/2")) test_zero(daemons(0)) } Sys.sleep(1L)