test_that("packages are actually loaded", { skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") # Needs sge_batchtools.tmpl (in current directory). tar_destroy() on.exit(tar_destroy(), add = TRUE) tar_script({ future::plan( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl" ) tar_target( x, tibble(x = "x"), packages = "tibble" ) }) tar_make_future() expect_equal(tar_read(x), tibble::tibble(x = "x")) }) test_that("nontrivial globals with custom environment", { skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) tar_destroy() on.exit(tar_destroy(), add = TRUE) tar_script({ future::plan( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl" ) envir <- new.env(parent = globalenv()) evalq({ f <- function(x) { g(x) + 1L } g <- function(x) { x + 1L } }, envir = envir) tar_option_set(envir = envir) list( tar_target(x, 1), tar_target(y, f(x)) ) }) tar_make_future() expect_equal(tar_read(y), 3L) }) test_that("nontrivial globals with global environment", { skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) tar_destroy() on.exit(tar_destroy(), add = TRUE) tar_script({ future::plan( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl" ) f <- function(x) { g(x) + 1L } g <- function(x) { x + 1L } list( tar_target(x, 1), tar_target(y, f(x)) ) }) tar_make_future() expect_equal(tar_read(y), 3L) }) test_that("branching plan on SGE", { # Needs sge_batchtools.tmpl (in current directory). skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) unlink("_targets", recursive = TRUE) on.exit(unlink("_targets", recursive = TRUE), add = TRUE) skip_if_not_installed("future") skip_if_not_installed("future.batchtools") on.exit(future::plan(future::sequential), add = TRUE) future::plan( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl" ) pipeline <- pipeline_map() out <- future_init(pipeline, workers = 4L) out$run() skipped <- names(out$scheduler$progress$skipped$envir) expect_equal(skipped, character(0)) out2 <- future_init(pipeline_map(), workers = 2L) out2$run() completed <- names(out2$scheduler$progress$completed$envir) expect_equal(completed, character(0)) value <- function(name) { target_read_value(pipeline_get_target(pipeline, name))$object } expect_equal(value("data0"), 2L) expect_equal(value("data1"), seq_len(3L)) expect_equal(value("data2"), seq_len(3L) + 3L) branches <- target_get_children(pipeline_get_target(pipeline, "map1")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 2L) } branches <- target_get_children(pipeline_get_target(pipeline, "map2")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 2L * index + 3L) } branches <- target_get_children(pipeline_get_target(pipeline, "map3")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 3L) } branches <- target_get_children(pipeline_get_target(pipeline, "map4")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 3L * index + 5L) } branches <- target_get_children(pipeline_get_target(pipeline, "map5")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 2L * index + 5L) } branches <- target_get_children(pipeline_get_target(pipeline, "map6")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 15L) } }) test_that("Same with worker-side storage", { # Needs sge_batchtools.tmpl (in current directory). skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) unlink("_targets", recursive = TRUE) on.exit(unlink("_targets", recursive = TRUE), add = TRUE) skip_if_not_installed("future") skip_if_not_installed("future.batchtools") on.exit(future::plan(future::sequential), add = TRUE) future::plan( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl" ) pipeline <- pipeline_map(storage = "worker") out <- future_init(pipeline, workers = 4L) out$run() skipped <- names(out$scheduler$progress$skipped$envir) expect_equal(skipped, character(0)) value <- function(name) { target_read_value(pipeline_get_target(pipeline, name))$object } expect_equal(value("data0"), 2L) expect_equal(value("data1"), seq_len(3L)) expect_equal(value("data2"), seq_len(3L) + 3L) branches <- target_get_children(pipeline_get_target(pipeline, "map1")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 2L) } branches <- target_get_children(pipeline_get_target(pipeline, "map2")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 2L * index + 3L) } branches <- target_get_children(pipeline_get_target(pipeline, "map3")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 3L) } branches <- target_get_children(pipeline_get_target(pipeline, "map4")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 3L * index + 5L) } branches <- target_get_children(pipeline_get_target(pipeline, "map5")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), 2L * index + 5L) } branches <- target_get_children(pipeline_get_target(pipeline, "map6")) for (index in seq_along(branches)) { expect_equal(value(branches[index]), index + 15L) } }) test_that("future.batchtools structured resources", { # Needs sge_batchtools.tmpl (in current directory). skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) unlink("_targets", recursive = TRUE) on.exit(unlink("_targets", recursive = TRUE), add = TRUE) skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_script({ list( tar_target( x, Sys.sleep(10), resources = tar_resources( future = tar_resources_future( plan = future::tweak( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl", resources = list(slots = 2) ) ) ) ) ) }) tar_make_future() expect_true(tar_exist_objects("x")) }) test_that("future.batchtools unstructured resources", { # Needs sge_batchtools.tmpl (in current directory). skip_on_cran() skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_runtime$fun <- "tar_make_future" on.exit(tar_runtime$fun <- NULL) unlink("_targets", recursive = TRUE) on.exit(unlink("_targets", recursive = TRUE), add = TRUE) skip_if_not_installed("future") skip_if_not_installed("future.batchtools") tar_script({ suppressWarnings( list( tar_target( x, Sys.sleep(10), resources = list( plan = future::tweak( future.batchtools::batchtools_sge, template = "sge_batchtools.tmpl", resources = list(slots = 2) ) ) ) ) ) }) suppressWarnings(tar_make_future()) expect_true(tar_exist_objects("x")) })