drake_context("hpc") test_with_dir("example template files", { expect_true(is.character(drake_hpc_template_files())) expect_true(length(drake_hpc_template_files()) > 0) expect_false(file.exists("slurm_batchtools.tmpl")) expect_silent(drake_hpc_template_file("slurm_batchtools.tmpl")) expect_true(file.exists("slurm_batchtools.tmpl")) }) test_with_dir("safe_jobs()", { skip_on_cran() # CRAN gets essential tests only (check time limits). expect_equal(safe_jobs(1:3), 1) expect_true(is.numeric(safe_jobs(1))) }) test_with_dir("check_jobs()", { skip_on_cran() # CRAN gets essential tests only (check time limits). expect_error(check_jobs(NULL), regexp = "length") expect_error(check_jobs(-1), regexp = "jobs > 0") expect_error(check_jobs(c(1, 1)), regexp = "of length 1") }) test_with_dir("check_parallelism()", { skip_on_cran() # CRAN gets essential tests only (check time limits). expect_error(check_parallelism(character(0), 1), regexp = "length") expect_error(check_parallelism(-1, 1), regexp = "character") expect_error( check_parallelism(letters[1:2], 1), regexp = "of length 1") expect_warning( make(drake_plan(x = 1), parallelism = "loop", jobs = 2), regexp = "should not be" ) }) test_with_dir("parallel imports", { skip_on_cran() config <- dbug() config$settings$jobs_preprocess <- 2 process_imports(config) process_imports_parLapply(config) expect_true("a" %in% cached(targets_only = FALSE)) clean(cache = config$cache) process_imports_mclapply(config) expect_true("a" %in% cached(targets_only = FALSE)) }) test_with_dir("lightly_parallelize_atomic() is correct", { skip_on_cran() # CRAN gets essential tests only (check time limits). with_seed(seed = 2017, code = { x <- sample(LETTERS[1:3], size = 1e3, replace = TRUE) append <- function(x) { paste0(x, "_text") } out0 <- lightly_parallelize(X = x, FUN = append, jobs = 2) out1 <- lightly_parallelize_atomic(X = x, FUN = append, jobs = 2) out2 <- lapply(X = x, FUN = append) expect_identical(out0, out1) expect_identical(out0, out2) y <- gsub("_text", "", unlist(out1)) expect_identical(x, y) }) }) test_with_dir("checksum functionality", { skip_on_cran() # CRAN gets essential tests only (check time limits). config <- dbug() config$settings$parallelism <- "loop" config$settings$jobs <- 1 config$cache <- decorate_storr(storr::storr_environment()) testrun(config) checksum <- get_checksum(target = "combined", value = 1, config = config) bad <- "askldfklhjsdfkj" expect_false(grepl("NA", checksum)) expect_true( is_good_checksum( target = "combined", value = 1, checksum = checksum, config = config)) expect_false( is_good_checksum( target = "combined", value = 1, checksum = bad, config = config)) expect_silent( wait_checksum( target = "combined", value = 1, checksum = checksum, config = config, timeout = 0.1 ) ) expect_error( wait_checksum( target = "combined", value = 1, checksum = bad, config = config, timeout = 0.1 ) ) checksum <- get_outfile_checksum( target = "combined", value = 1, config = config ) expect_silent( wait_outfile_checksum( target = "combined", value = 1, checksum = checksum, config = config, timeout = 0.1 ) ) expect_error( wait_outfile_checksum( target = "combined", value = 1, checksum = bad, config = config, timeout = 0.1 ) ) }) test_with_dir("direct users to GitHub issue #675", { skip_on_cran() skip_if_not_installed("future") plan <- drake_plan( # If base R is patched, mclapply may not always give this warning. x = warning("all scheduled cores encountered errors in user code") ) cache <- storr::storr_environment() regexp <- "workaround" expect_warning( make(plan, envir = globalenv(), session_info = FALSE, cache = cache), regexp = regexp ) clean(cache = cache) expect_warning( make( plan, parallelism = "future", envir = globalenv(), session_info = FALSE, cache = cache ), regexp = regexp ) }) test_with_dir("same, but with error (#675)", { skip_on_cran() plan <- drake_plan( # If base R is patched, mclapply may not always give this warning. x = { warning("all scheduled cores encountered errors in user code") stop() } ) cache <- storr::storr_environment() regexp <- "workaround" expect_error( expect_warning( make(plan, envir = globalenv(), session_info = FALSE, cache = cache) ) ) }) test_with_dir("drake_pmap", { skip_on_cran() # Basic functionality: example from purrr::pmap x <- list(1, 10, 100) y <- list(1, 2, 3) z <- list(5, 50, 500) ans <- list(x[[1]] + y[[1]] + z[[1]], x[[2]] + y[[2]] + z[[2]], x[[3]] + y[[3]] + z[[3]]) expect_identical(ans, drake_pmap(list(x, y, z), sum)) # Catches inputs of wrong type expect_error(drake_pmap("not a list", sum)) expect_error(drake_pmap(list(), "not a function")) # Handles empty list expect_identical(list(), drake_pmap(list(), sum)) # Passes dots to function x[2] <- NA ans[[2]] <- sum(x[[2]], y[[2]], z[[2]], na.rm = TRUE) expect_identical(ans, drake_pmap(list(x, y, z), sum, na.rm = TRUE)) # Catches unequally-lengthed sublists x[[2]] <- NULL expect_error(drake_pmap(list(x, y, z), sum)) }) test_with_dir("deprecate custom scheduler functions", { skip_on_cran() plan <- drake_plan(x = file.create("x")) expect_warning( config <- drake_config(plan, parallelism = identity), regexp = "deprecated" ) expect_warning( make(plan, parallelism = identity), regexp = "deprecated" ) expect_true(file.exists("x")) expect_true(config$cache$exists("x")) }) test_with_dir("caching arg and column", { skip_on_cran() plan <- drake_plan( x = 1, y = target(x, caching = "main"), z = target(y, caching = "worker") ) config <- drake_config(plan, caching = "main") expect_equal(hpc_caching("x", config), "main") expect_equal(hpc_caching("y", config), "main") expect_equal(hpc_caching("z", config), "worker") config <- drake_config(plan, caching = "worker") expect_equal(hpc_caching("x", config), "worker") expect_equal(hpc_caching("y", config), "main") expect_equal(hpc_caching("z", config), "worker") p2 <- drake_plan(x = 1, y = 2) config <- drake_config(p2, caching = "main") expect_equal(hpc_caching("x", config), "main") expect_equal(hpc_caching("y", config), "main") config <- drake_config(p2, caching = "worker") expect_equal(hpc_caching("x", config), "worker") expect_equal(hpc_caching("y", config), "worker") }) test_with_dir("custom caching column and clustermq", { skip_on_cran() skip_if_not_installed("clustermq", minimum_version = "0.9.1") skip_on_os("windows") if ("package:clustermq" %in% search()) { detach("package:clustermq", unload = TRUE) # nolint } options(clustermq.scheduler = "multicore") plan <- drake_plan( x = 1, y = target(x, caching = "main"), z = target(y, caching = "worker") ) make(plan, parallelism = "clustermq", jobs = 1) expect_true(all(plan$target %in% cached())) if ("package:clustermq" %in% search()) { detach("package:clustermq", unload = TRUE) # nolint } }) test_with_dir("custom caching column and future", { skip_on_cran() skip_if_not_installed("future") skip_on_os("windows") future::plan(future::multicore) plan <- drake_plan( x = 1, y = target(x, caching = "main"), z = target(y, caching = "worker") ) make(plan, parallelism = "future", jobs = 1) expect_true(all(plan$target %in% cached())) }) test_with_dir("illegal hpc backend (#1222)", { plan <- drake_plan(x = 1) config <- drake_config(plan) config$settings$parallelism <- 123 expect_warning( make_impl(config), regexp = "Illegal drake backend" ) })