drake_context("clustermq")

# Small clustermq check with no skip_on_cran(): builds a one-target plan once
# per caching mode and expects the target to be rebuilt after each clean().
test_with_dir("clustermq parallelism for CRAN", {
  skip_if_not_installed("clustermq", minimum_version = "0.9.1")
  skip_on_os("windows")
  options(clustermq.scheduler = "multicore")
  plan <- drake_plan(x = {
    # Test relaying.
    message("message")
    warning("warning")
  })
  for (caching in c("main", "worker")) {
    clean()
    # The command for x deliberately signals a warning, so silence it here.
    suppressWarnings(
      make(plan, parallelism = "clustermq", caching = caching)
    )
    config <- drake_config(plan)
    expect_equal(justbuilt(config), "x")
  }
  # Unload clustermq if this test left it attached to the search path.
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
})

# Runs the mtcars example through the clustermq backend for both caching
# modes: a full build, a no-op rebuild, a rebuild after the command of the
# plan's second row changes, and a rebuild after target `small` is cleaned.
test_with_dir("clustermq parallelism", {
  skip_on_cran()
  skip_if_not_installed("clustermq", minimum_version = "0.9.1")
  skip_on_os("windows")
  # Start from a state where clustermq is not attached.
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
  options(clustermq.scheduler = "multicore")
  scenario <- get_testing_scenario()
  e <- eval(parse(text = scenario$envir))
  jobs <- scenario$jobs # ignoring for now, using 2 jobs
  load_mtcars_example(envir = e)
  # Every target except regression1_large is flagged hpc = TRUE.
  e$my_plan$hpc <- e$my_plan$target != "regression1_large"
  for (caching in c("main", "worker")) {
    clean(destroy = TRUE)
    config <- drake_config(e$my_plan, envir = e)
    # After destroying the cache, every target in the plan is outdated.
    expect_equal(length(outdated_impl(config)), nrow(e$my_plan))
    make(
      e$my_plan,
      parallelism = "clustermq",
      jobs = jobs,
      caching = caching,
      envir = e,
      verbose = 1L,
      garbage_collection = TRUE,
      lock_envir = TRUE
    )
    expect_equal(outdated_impl(config), character(0))
    # A second make() right away should build nothing.
    make(
      e$my_plan,
      parallelism = "clustermq",
      jobs = jobs,
      caching = caching,
      envir = e,
      verbose = 1L,
      lock_envir = TRUE
    )
    expect_equal(justbuilt(config), character(0))
    # Wrap the second command in identity(); the expectation below is that
    # only `small` gets rebuilt as a result.
    e$my_plan$command[[2]] <- as.call(
      c(quote(identity), unname(e$my_plan$command[2]))
    )
    make(
      e$my_plan,
      parallelism = "clustermq",
      jobs = jobs,
      caching = caching,
      envir = e,
      verbose = 1L,
      lock_envir = TRUE
    )
    expect_equal(justbuilt(config), "small")
    # Remove `small` from the cache and confirm it is the only rebuilt target.
    clean(small, cache = config$cache)
    make(
      e$my_plan,
      parallelism = "clustermq",
      jobs = jobs,
      caching = caching,
      envir = e,
      verbose = 1L,
      lock_envir = TRUE
    )
    expect_equal(justbuilt(config), "small")
  }
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
})

# Both targets are hpc = FALSE and make() runs with a scheduler name that does
# not exist; the test passes if make() still completes and x can be read back.
test_with_dir("No hpc targets? No workers.", {
  skip_on_cran()
  skip_if_not_installed("clustermq", minimum_version = "0.9.1")
  skip_on_os("windows")
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
  options(clustermq.scheduler = "multicore")
  cache <- storr::storr_environment()
  plan <- drake_plan(x = target(1L, hpc = FALSE), y = target(x, hpc = FALSE))
  # NOTE(review): presumably an unknown scheduler would make worker startup
  # fail, so success here implies no workers were launched -- confirm.
  drake:::with_options(
    list(clustermq.scheduler = "does_not_exist"),
    make(
      plan,
      parallelism = "clustermq",
      jobs = 2,
      session_info = FALSE,
      cache = cache
    )
  )
  expect_equal(readd(x, cache = cache), 1L)
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
})

# Builds x (hpc = FALSE) and y (hpc = TRUE) without clustermq, rewrites the
# command of x, then rebuilds through clustermq with a scheduler name that does
# not exist. The final expectation is that only x is rebuilt.
test_with_dir("All hpc targets up to date? No workers.", {
  skip_on_cran()
  skip_if_not_installed("clustermq", minimum_version = "0.9.1")
  skip_on_os("windows")
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
  options(clustermq.scheduler = "multicore")
  plan <- drake_plan(x = target(1L, hpc = FALSE), y = target(x, hpc = TRUE))
  cache <- storr::storr_environment()
  # Initial build with make()'s default parallelism.
  make(plan, session_info = FALSE, cache = cache)
  config <- drake_config(plan, cache = cache)
  expect_equal(sort(justbuilt(config)), c("x", "y"))
  expect_equal(outdated_impl(config), character(0))
  # Same value for x (2L - 1L is 1L), but a different command.
  plan <- drake_plan(
    x = target(2L - 1L, hpc = FALSE),
    y = target(x, hpc = TRUE)
  )
  config <- drake_config(plan, cache = cache)
  expect_equal(sort(outdated_impl(config)), c("x", "y"))
  drake:::with_options(
    list(clustermq.scheduler = "does_not_exist"),
    make(
      plan,
      parallelism = "clustermq",
      jobs = 2,
      session_info = FALSE,
      cache = cache
    )
  )
  expect_equal(justbuilt(config), "x")
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
})

# mtcars example where `small` and `large` are hpc = FALSE and every other
# target is hpc = TRUE; a single make() must build the whole plan.
test_with_dir("Start off with non-HPC targets, then go to HPC targets.", {
  skip_on_cran()
  skip_if_not_installed("clustermq", minimum_version = "0.9.1")
  skip_on_os("windows")
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
  options(clustermq.scheduler = "multicore")
  scenario <- get_testing_scenario()
  e <- eval(parse(text = scenario$envir))
  jobs <- scenario$jobs # ignoring for now, using 2 jobs
  load_mtcars_example(envir = e)
  e$my_plan$hpc <- !(e$my_plan$target %in% c("small", "large"))
  # Run interactively with 1 job to verify that clustermq workers
  # only get submitted after targets `large` and `small` are built.
  make(
    e$my_plan,
    parallelism = "clustermq",
    jobs = jobs,
    envir = e,
    verbose = 1L,
    garbage_collection = TRUE,
    lock_envir = TRUE
  )
  config <- drake_config(e$my_plan, envir = e)
  expect_equal(sort(justbuilt(config)), sort(e$my_plan$target))
  expect_equal(outdated_impl(config), character(0))
  if (is.element("package:clustermq", search())) {
    detach("package:clustermq", unload = TRUE) # nolint
  }
})

# cmq_report_warnings() should raise a warning when the build record it is
# given has a `warnings` entry.
test_with_dir("clustermq warnings (worker caching)", {
  build <- list(target = "x", warnings = "y")
  expect_warning(cmq_report_warnings(build))
})