test_that("an alias function is defined for each member function with the correct body", { skip_if(Sys.getenv("R_CODECOV_ENV") == "GITHUB_ACTION") pip <- Pipeline$new("pipe") funs2check <- sapply(names(pip), \(x) is.function(pip[[x]])) |> Filter(f = isTRUE) |> names() |> setdiff("initialize") for (fun in funs2check) { alias_fun <- paste0("pipe_", fun) expect_true(exists(alias_fun), info = fun) } }) describe("pipe_new", { test_that("returns a pipeline object", { expect_true(methods::is(pipe_new("pipe"), "Pipeline")) }) test_that("pipeline name must be a non-empty string", { expect_no_error(pipe_new("foo")) expect_error( pipe_new(name = 1), "name must be a string" ) expect_error( pipe_new(name = ""), "name must not be empty" ) }) test_that("data is added as first step to pipeline", { pip <- pipe_new("pipe1", data = 1) expect_equal(pipe_get_step_names(pip), "data") out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(pipe_get_out(pip, "data"), 1) }) test_that("the logger can be customized", { my_logger <- \(level, msg, ...) { message("My Logger: ", msg) } pip <- pipe_new("pipe", logger = my_logger) out <- capture.output( pipe_run(pip), type = "message" ) expect_equal( out, c( "My Logger: Start run of 'pipe' pipeline:", "My Logger: Step 1/1 data", "My Logger: Finished execution of steps.", "My Logger: Done." ) ) }) test_that("bad definition of the custom logger is signalled", { expected_error_msg <- paste( "logger function must have the following signature:", "function(level, msg, ...)" ) logger_with_missing_level_arg <- \(msg, ...) { message("My Logger: ", msg) } expect_error( pipe_new("pipe1", logger = logger_with_missing_level_arg), expected_error_msg, fixed = TRUE ) logger_with_missing_msg_arg <- \(level, ...) { message("My Logger: ", ...) } expect_error( pipe_new("pipe1", logger = logger_with_missing_msg_arg), expected_error_msg, fixed = TRUE ) logger_with_missing_dots <- \(msg, level) { message("My Logger: ", msg) } expect_error( pipe_new("pipe1", logger = logger_with_missing_dots), expected_error_msg, fixed = TRUE ) logger_with_additional_arg <- \(level, msg, foo, ...) { message("My Logger: ", msg) } expect_error( pipe_new("pipe1", logger = logger_with_additional_arg), expected_error_msg, fixed = TRUE ) }) }) describe("pipe_add", { test_that("step must be non-empty string", { pip <- pipe_new("pipe1") foo <- \(a = 0) a expect_error(pipe_add(pip, "", foo)) expect_error(pipe_add(pip, c("a", "b"), foo)) }) test_that("fun must be passed as a function or string", { pip <- pipe_new("pipe") expect_error( pipe_add(pip, "step1", fun = 1), "is.function(fun) || is_string(fun) is not TRUE", fixed = TRUE ) }) test_that("params must be a list", { pip <- pipe_new("pipe") expect_error( pipe_add(pip, "step1", fun = \() 1, params = 1), "is.list(params)", fixed = TRUE ) }) test_that("description must be (possibly empty) string", { pip <- pipe_new("pipe") expect_error( pipe_add(pip, "step1", fun = \() 1, description = 1), "is_string(description)", fixed = TRUE ) expect_no_error( pipe_add(pip, "step1", fun = \() 1, description = "") ) }) test_that("group must be non-empty string", { pip <- pipe_new("pipe") expect_error( pipe_add(pip, "step1", fun = \() 1, group = 1), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) expect_error( pipe_add(pip, "step1", fun = \() 1, group = ""), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) }) test_that("keepOut must be logical", { pip <- pipe_new("pipe") expect_error( pipe_add(pip, "step1", fun = \() 1, keepOut = 1), "is.logical(keepOut) is not TRUE", fixed = TRUE ) }) test_that("duplicated step names are signaled", { pip <- pipe_new("pipe1") foo <- \(a = 0) a pipe_add(pip, "f1", foo) expect_error(pipe_add(pip, "f1", foo), "step 'f1' already exists") expect_error( pipe_add(pip, "f1", \(x) x), "step 'f1' already exists" ) }) test_that("missing dependencies are signaled", { pip <- pipe_new("pipe1") foo <- \(a = 0) a pipe_add(pip, "f1", foo) expect_error( pipe_add(pip, "f2", foo, params = list(a = ~undefined)), "dependency 'undefined' not found" ) }) test_that("step can refer to previous step by relative number", { pip <- pipe_new("pipe1") pipe_add(pip, "f1", \(a = 5) a) pipe_add(pip, "f2", \(x = ~-1) 2*x, keepOut = TRUE) out = pipe_run(pip, ) |> pipe_collect_out() expect_equal(out[["f2"]][[1]], 10) pipe_add(pip, "f3", \(x = ~-1, a = ~-2) x + a, keepOut = TRUE) out = pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f3"]][[1]], 10 + 5) }) test_that("a bad relative step referal is signalled", { pip <- pipe_new("pipe1") expect_error( pipe_add(pip, "f1", \(x = ~-10) x), paste( "step 'f1': relative dependency x=-10", "points to outside the pipeline" ), fixed = TRUE ) }) test_that("added step can use lambda functions", { data <- 9 pip <- pipe_new("pipe1", data = data) pipe_add(pip, "f1", \(data = ~data) data, keepOut = TRUE) a <- 1 pipe_add(pip, "f2", \(a, b) a + b, params = list(a = a, b = ~f1), keepOut = TRUE ) expect_equal( unlist(pipe_get_step(pip, "f1")[["depends"]]), c(data = "data") ) expect_equal( unlist(pipe_get_step(pip, "f2")[["depends"]]), c(b = "f1") ) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]][[1]], data) expect_equal(out[["f2"]][[1]], a + data) }) test_that( "supports functions with wildcard arguments", { my_mean <- \(x, na.rm = FALSE) { mean(x, na.rm = na.rm) } foo <- \(x, ...) { my_mean(x, ...) } v <- c(1, 2, NA, 3, 4) pip <- pipe_new("pipe", data = v) params <- list(x = ~data, na.rm = TRUE) pipe_add(pip, "mean", fun = foo, params = params, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["mean"]], mean(v, na.rm = TRUE)) pipe_set_params_at_step(pip, "mean", list(na.rm = FALSE)) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["mean"]], as.numeric(NA)) }) test_that("can have a variable defined outside as parameter default", { x <- 1 pip <- pipe_new("pipe") |> pipe_add("f1", \(a) a, params = list(a = x)) expect_equal(pipe_get_params_at_step(pip, "f1")$a, x) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(out[["f1"]], x) }) test_that("handles Param object args", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = new("NumericParam", "a", value = 1)) a) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(out[["f1"]], 1) }) test_that( "can have a Param object defined outside as parameter default", { x <- 1 p <- new("NumericParam", "a", value = x) pip <- pipe_new("pipe") |> pipe_add("f1", \(a) a, params = list(a = p)) expect_equal(pipe_get_params_at_step(pip, "f1")$a, p) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(out[["f1"]], x) }) test_that( "function can be passed as a string", { pip <- pipe_new("pipe") |> pipe_add("f1", fun = "mean", params = list(x = 1:5)) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(out[["f1"]], mean(1:5)) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean") }) test_that( "if passed as a function, name is derived from the function", { pip <- pipe_new("pipe") pipe_add(pip, "f1", fun = mean, params = list(x = 1:5)) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean") pip <- pipe_new("pipe") |> pipe_add("f1", fun = mean, params = list(x = 1:5)) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean") }) test_that( "lampda functions, are named 'function'", { pip <- pipe_new("pipe") |> pipe_add("f1", fun = \(x = 1) x) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function") pip <- pipe_new("pipe") |> pipe_add("f1", fun = \(x = 1) x) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function") }) }) describe("pipe_append", { test_that("pipelines can be combined even if their steps share names, unless tryAutofixNames is FALSE", { pip1 <- pipe_new("pipe1", data = 1) |> pipe_add("f1", \(a = 1) a, keepOut = TRUE) |> pipe_add("f2", \(b = ~f1) b) pip2 <- pipe_new("pipe2") |> pipe_add("f1", \(a = 10) a) |> pipe_add("f2", \(b = ~f1) b, keepOut = TRUE) expect_error( pip1 |> pipe_append(pip2, tryAutofixNames = FALSE), paste( "combined pipeline would have duplicated step names:", "'data', 'f1', 'f2'," ) ) pp <- pip1 |> pipe_append(pip2) expect_equal( pp |> pipe_length(), pip1 |> pipe_length() + pip2 |> pipe_length() ) out1 <- pip1 |> pipe_run() |> pipe_collect_out() out2 <- pip2 |> pipe_run() |> pipe_collect_out() out <- pp |> pipe_run() |> pipe_collect_out() expect_equivalent(out, c(out1, out2)) }) test_that("auto-fixes only the names that need auto-fix", { pip1 <- pipe_new("pipe1", data = 1) |> pipe_add("f1", \(a = 1) a, keepOut = TRUE) |> pipe_add("f2", \(b = ~f1) b) pip2 <- pipe_new("pipe2") |> pipe_add("f3", \(a = 10) a) |> pipe_add("f4", \(b = ~f3) b, keepOut = TRUE) pp <- pip1 |> pipe_append(pip2) expect_equal( pp |> pipe_get_step_names(), c("data", "f1", "f2", "data.pipe2", "f3", "f4") ) out1 <- pip1 |> pipe_run() |> pipe_collect_out() out2 <- pip2 |> pipe_run() |> pipe_collect_out() out <- pp |> pipe_run() |> pipe_collect_out() expect_equivalent(out, c(out1, out2)) }) test_that("the separator used for step names can be customized", { pip1 <- pipe_new("pipe1", data = 1) |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) pip2 <- pipe_new("pipe2") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) pp <- pip1 |> pipe_append(pip2, sep = "_") expect_equal( pp |> pipe_get_step_names(), c("data", "f1", "f2", "data_pipe2", "f1_pipe2", "f2_pipe2") ) }) test_that( "output of first pipeline can be set as input of appended pipeline", { pip1 <- pipe_new("pipe1", data = 1) pip1 |> pipe_add("f1", \(a = ~data) a * 2) pip2 <- pipe_new("pipe2", data = 99) |> pipe_add("f1", \(a = ~data) a * 3) |> pipe_add("f2", \(a = ~f1) a * 4) pp <- pip1 |> pipe_append(pip2, outAsIn = TRUE) depends <- pipe_get_depends(pp) expect_equal(depends[["data.pipe2"]], c(data = "f1")) out <- pp |> pipe_run() |> pipe_collect_out(all = TRUE) pipe1_out <- out[["f1"]][["f1"]] expect_equal(pipe1_out, 1 * 2) expect_equal(out[["data.pipe2"]], pipe1_out) expect_equal(out[["f1"]][["f1.pipe2"]], pipe1_out * 3) expect_equal(out[["f2"]], out[["f1"]][["f1.pipe2"]] * 4) }) test_that("if duplicated step names would be created, an error is given", { pip1 <- pipe_new("pipe1") pip1 |> pipe_add("f1", \(a = ~data) a + 1) pip1 |> pipe_add("f1.pipe2", \(a = ~data) a + 1) pip2 <- pipe_new("pipe2") pip2 |> pipe_add("f1", \(a = ~data) a + 1) expect_error( pip1 |> pipe_append(pip2), "Cannot auto-fix name clash for step 'f1' in pipeline 'pipe2'", fixed = TRUE ) }) }) describe("append_to_step_names", { test_that("postfix can be appended to step names", { pip <- pipe_new("pipe", data = 1) pipe_add(pip, "f1", \(a = ~data) a + 1) pipe_add(pip, "f2", \(a = ~data, b = ~f1) a + b) pipe_append_to_step_names(pip, "foo") expected_names <- c("data.foo", "f1.foo", "f2.foo") expect_equal(pipe_get_step_names(pip), expected_names) expected_depends <- list( data.foo = character(0), f1.foo = c(a = "data.foo"), f2.foo = c(a = "data.foo", b = "f1.foo") ) expect_equal(pipe_get_depends(pip), expected_depends) }) }) describe("pipe_collect_out", { test_that("data is set as first step but not part of output by default", { dat <- data.frame(a = 1:2, b = 1:2) pip <- pipe_new("pipe1", data = dat) expect_equal(pip$pipeline[["step"]], "data") out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out, list()) pip <- pipe_new("pipe1", data = dat) |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], dat) }) test_that("at the end, pipeline can clean output that shall not be kept", { data <- 9 pip <- pipe_new("pipe1", data = data) foo <-\(a = 1) a bar <-\(a, b) a + b a <- 5 pipe_add(pip, "f1", foo, params = list(a = a)) pipe_add( pip, "f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE ) pipe_run(pip, cleanUnkept = TRUE) expect_equal(pipe_get_out(pip, "f1"), NULL) expect_equal(pipe_get_out(pip, "f2"), a + data) }) test_that("output is collected as expected", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a, keepOut = TRUE) |> pipe_add("f2", \(a = 2, b = ~f1) a + b) |> pipe_add("f3", \(a = 3, b = ~f2) a + b, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(length(out), 2) expect_equal(names(out), c("f1", "f3")) }) test_that("output can be grouped", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |> pipe_add("f3", \(a = 1, b = 2) a / b) |> pipe_add("f4", \(a = 2, b = 2) a + b, group = "plus") out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(out[["plus"]], list(f2 = 3, f4 = 4)) expect_equal(out[["f1"]], 1) expect_equal(out[["f3"]], 1/2) }) test_that("output is ordered in the order of steps", { pip <- pipe_new("pipe") |> pipe_add("f2", \(a = 1) a, keepOut = TRUE) |> pipe_add("f1", \(b = 2) b, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(names(out), c("f2", "f1")) }) test_that( "grouped output is ordered in the order of group definitions", { pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x, group = "g2") |> pipe_add("f2", \(x = 2) x, group = "g1") |> pipe_add("f3", \(x = 3) x, group = "g2") |> pipe_add("f4", \(x = 4) x, group = "g1") out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(names(out), c("data", "g2", "g1")) }) test_that( "if just one group the output name still will take the group name", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |> pipe_add("f3", \(a = 1, b = 2) a / b, group = "my f3") |> pipe_add("f4", \(a = 2, b = 2) a + b, group = "plus") out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(names(out), c("data", "f1", "plus", "my f3")) }) describe("groupBy option", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 1, b = 2) a + b, group = "plus") |> pipe_add("f3", \(a = 1, b = 2) a / b) test_that("column to groupBy can be customized", { pipe_run_step(pip, "f1") pipe_run_step(pip, "f3") out <- pipe_collect_out(pip, groupBy = "state", all = TRUE) expect_equal( out, list( New = list(data = NULL, f2 = NULL), Done = list(f1 = 1, f3 = 0.5) ) ) }) test_that("signals bad groupBy input", { expect_error( pipe_collect_out(pip, groupBy = c("not", "a", "string")), "groupBy must be a single string" ) expect_error( pipe_collect_out(pip, groupBy = "foo"), "groupBy column does not exist" ) expect_error( pipe_collect_out(pip, groupBy = "time"), "groupBy column must be character" ) }) }) }) describe("pipe_discard_steps", { test_that("pipeline steps can be discarded by pattern", { pip <- pipe_new("pipe1") |> pipe_add("calc", \(a = 1) a) |> pipe_add("plot1", \(x = ~calc) x) |> pipe_add("plot2", \(x = ~plot1) x) out <- capture.output( pipe_discard_steps(pip, "plot"), type = "message" ) expect_equal( out, c( "step 'plot2' was removed", "step 'plot1' was removed" ) ) expect_equal(pip$pipeline[["step"]], c("data", "calc")) }) test_that("if no pipeline step matches pattern, pipeline remains unchanged", { pip <- pipe_new("pipe1") |> pipe_add("calc", \(a = 1) a) |> pipe_add("plot1", \(x = ~calc) x) |> pipe_add("plot2", \(x = ~plot1) x) steps_before = pip$pipeline[["step"]] expect_silent(pipe_discard_steps(pip, "bla")) expect_equal(pip$pipeline[["step"]], steps_before) }) test_that("if step has downstream dependencies, an error is given", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) b) expect_error( pipe_discard_steps(pip, "f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2'" ) ) pipe_add(pip, "f3", \(x = ~f1) x) expect_error( pipe_discard_steps(pip, "f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f3'" ) ) }) }) describe("pipe_get_data", { test_that( "data can be retrieved", { p <- pipe_new("pipe", data = 1:2) expect_equal(p |> pipe_get_data(), 1:2) p |> pipe_set_data(3:4) expect_equal(p |> pipe_get_data(), 3:4) }) test_that( "signals missing data", { p <- pipe_new("pipe", data = 1:2) p |> pipe_pop_step() # remove data step expect_error( p |> pipe_get_data(), "no data step defined" ) }) }) describe("pipe_get_depends", { test_that( "dependencies can be retrieved and are named after the steps", { pip <- pipe_new("pipe", data = 1) pipe_add(pip, "f1", \(a = ~data) a + 1) pipe_add(pip, "f2", \(b = ~f1) b + 1) depends <- pipe_get_depends(pip) expected_depends <- list( data = character(0), f1 = c(a = "data"), f2 = c(b = "f1") ) expect_equal(depends, expected_depends) expect_equal(names(depends), pipe_get_step_names(pip)) }) }) describe("pipe_get_depends_down", { test_that("dependencies can be determined recursively for given step", { pip <- pipe_new("pipe") pipe_add(pip, "f1", \(a = 1) a) pipe_add(pip, "f2", \(a = ~f1) a) pipe_add(pip, "f3", \(a = ~f1, b = ~f2) a + b) pipe_add(pip, "f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c) expect_equal(pipe_get_depends_down(pip, "f3"), c("f4")) expect_equal(pipe_get_depends_down(pip, "f2"), c("f3", "f4")) expect_equal(pipe_get_depends_down(pip, "f1"), c("f2", "f3", "f4")) }) test_that("if no dependencies an empty character vector is returned", { pip <- pipe_new("pipe") pipe_add(pip, "f1", \(a = 1) a) expect_equal(pipe_get_depends_down(pip, "f1"), character(0)) }) test_that("step must exist", { pip <- pipe_new("pipe") expect_error( pipe_get_depends_down(pip, "f1"), "step 'f1' does not exist" ) }) test_that( "works with complex dependencies as created by data splits", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = 2) b) |> pipe_add("f3", \(x = ~f1, y = ~f2) x + y) pipe_set_data_split(pip, dataList, toStep = "f2") expect_equal( pipe_get_depends_down(pip, "f1.1"), c("f2.1", "f3") ) expect_equal( pipe_get_depends_down(pip, "f1.2"), c("f2.2", "f3") ) expect_equal(pipe_get_depends_down(pip, "f2.1"), "f3") expect_equal(pipe_get_depends_down(pip, "f2.2"), "f3") }) }) describe("pipe_get_depends_up", { test_that("dependencies can be determined recursively for given step", { pip <- pipe_new("pipe") pipe_add(pip, "f1", \(a = 1) a) pipe_add(pip, "f2", \(a = ~f1) a) pipe_add(pip, "f3", \(a = ~f1, b = ~f2) a + b) pipe_add(pip, "f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c) expect_equal(pipe_get_depends_up(pip, "f2"), c("f1")) expect_equal(pipe_get_depends_up(pip, "f3"), c("f1", "f2")) expect_equal(pipe_get_depends_up(pip, "f4"), c("f1", "f2", "f3")) }) test_that("if no dependencies an empty character vector is returned", { pip <- pipe_new("pipe") pipe_add(pip, "f1", \(a = 1) a) expect_equal(pipe_get_depends_up(pip, "f1"), character(0)) }) test_that("step must exist", { pip <- pipe_new("pipe") expect_error( pipe_get_depends_up(pip, "f1"), "step 'f1' does not exist" ) }) test_that("works with complex dependencies as created by data splits", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = 2) b) |> pipe_add("f3", \(x = ~f1, y = ~f2) x + y) pipe_set_data_split(pip, dataList, toStep = "f2") expect_equal(pipe_get_depends_up(pip, "f2.1"), c("f1.1")) expect_equal(pipe_get_depends_up(pip, "f2.2"), c("f1.2")) expect_equivalent( pipe_get_depends_up(pip, "f3"), c("f1.1", "f2.1", "f1.2", "f2.2") ) }) }) describe("pipe_get_graph", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = ~data) a) res <- pipe_get_graph(pip) test_that("returns a node table with the expected columns", { tab <- res$nodes expect_true(is.data.frame(tab)) expectedColumns <- c("id", "label", "group", "shape", "color", "title") expect_equal(colnames(tab), expectedColumns) }) test_that("the node table contains all steps", { tab <- res$nodes expect_equal(tab$label, pipe_get_step_names(pip)) }) test_that("returns an edges table with the expected columns", { tab <- res$edges expect_true(is.data.frame(tab)) expectedColumns <- c("from", "to", "arrows") expect_equal(colnames(tab), expectedColumns) }) test_that("can be printed created for certain groups", { pip <- pipe_new("pipe") pipe_add(pip, "step2", \(a = ~data) a + 1, group = "add") pipe_add(pip, "step3", \(a = ~step2) 2 * a, group = "mult") pipe_add(pip, "step4", \(a = ~step2, b = ~step3) a + b, group = "add") pipe_add(pip, "step5", \(a = ~data, b = ~step4) a * b, group = "mult") res.add <- pipe_get_graph(pip, groups = "add") expect_equal(res.add$nodes$label, c("step2", "step4")) res.mult <- pipe_get_graph(pip, groups = "mult") expect_equal(res.mult$nodes$label, c("step3", "step5")) }) }) describe("pipe_get_out", { test_that("output at given step can be retrieved", { data <- airquality pip <- pipe_new("pipe", data = data) |> pipe_add("model", \(data = ~data) { lm(Ozone ~ Wind, data = data) }, ) pipe_run(pip) expect_equal(pipe_get_out(pip, "data"), data) expect_equivalent( pipe_get_out(pip, "model"), lm(Ozone ~ Wind, data = data) ) }) test_that("step of requested output must exist", { pip <- pipe_new("pipe") pipe_run(pip) expect_error(pipe_get_out(pip, "foo"), "step 'foo' does not exist") }) }) describe("pipe_get_params", { test_that("parameters can be retrieved", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a, keepOut = TRUE) |> pipe_add("f2", \(a, b = ~f1) a + b, params = list(a = 8), keepOut = TRUE ) |> pipe_add("f3", \(a = ~f2, b = 3) a + b, keepOut = TRUE) p <- pipe_get_params(pip) expect_equal( p, list(f1 = list(a = 1), f2 = list(a = 8), f3 = list(b = 3)) ) }) test_that("empty pipeline gives empty list of parameters", { pip <- pipe_new("pipe1") expect_equivalent(pipe_get_params(pip), list()) pipe_add(pip, "f1", \() 1) expect_equivalent(pipe_get_params(pip), list()) }) test_that("hidden parameters are filtered out by default", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, .hidden = 2) a) p <- pipe_get_params(pip) expect_equal(p, list(f1 = list(a = 1))) p <- pipe_get_params(pip, ignoreHidden = FALSE) expect_equal(p, list(f1 = list(a = 1, .hidden = 2))) }) test_that("works with Param objects", { pip <- pipe_new("pipe1") |> pipe_add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } ) |> pipe_add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) par <- pipe_get_params(pip) expect_true(all(par$f1 |> sapply(is, "NumericParam"))) expect_equal(par$f1$x@value, 1) expect_equal(par$f1$y@value, 2) expect_true(all(par$f2 |> sapply(is, "StringParam"))) expect_equal(par$f2$s1@value, "Hello") expect_equal(par$f2$s2@value, "World") }) }) describe("pipe_get_params_at_step", { test_that("list of step parameters can be retrieved", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) |> pipe_add("f2", \(x = 1, y = 2) x + y) expect_equal( pipe_get_params_at_step(pip, "f1"), list(a = 1, b = 2) ) expect_equal( pipe_get_params_at_step(pip, "f2"), list(x = 1, y = 2) ) }) test_that("if no parameters empty list is returned", { pip <- pipe_new("pipe1") |> pipe_add("f1", \() 1) expect_equal( pipe_get_params_at_step(pip, "f1"), list() ) }) test_that( "hidden parameters are not returned, unless explicitly requested", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, .b = 2) a + .b) expect_equal( pipe_get_params_at_step(pip, "f1"), list(a = 1) ) expect_equal( pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE), list(a = 1, .b = 2) ) }) test_that("bound parameters are never returned", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = ~data) a + b) expect_equal( pipe_get_params_at_step(pip, "f1"), list(a = 1) ) expect_equal( pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE), list(a = 1) ) }) test_that("step must exist", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = ~data) a + b) expect_error( pipe_get_params_at_step(pip, "foo"), "step 'foo' does not exist" ) }) test_that("works with Param objects", { pip <- pipe_new("pipe1") |> pipe_add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } ) |> pipe_add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) par1 <- pipe_get_params_at_step(pip, "f1") expect_true(all(par1 |> sapply(is, "NumericParam"))) expect_equal(par1$x@value, 1) expect_equal(par1$y@value, 2) par2 <- pipe_get_params_at_step(pip, "f2") expect_true(all(par2 |> sapply(is, "StringParam"))) expect_equal(par2$s1@value, "Hello") expect_equal(par2$s2@value, "World") }) }) describe("pipe_get_params_unique", { test_that("parameters can be retrieved uniquely and if occuring multiple times, the 1st default value is used", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 2, b = 3) a + b) |> pipe_add("f3", \(a = 4, b = 5, c = 6) a + b) p <- pipe_get_params_unique(pip) expect_equivalent(p, list(a = 1, b = 3, c = 6)) }) test_that("empty pipeline gives empty list", { pip <- pipe_new("pipe") expect_equivalent(pipe_get_params_unique(pip), list()) }) test_that("pipeline with no parameters gives empty list", { pip <- pipe_new("pipe") |> pipe_add("f1", \() 1) expect_equivalent(pipe_get_params_unique(pip), list()) }) test_that("works with Param objects", { pip <- pipe_new("pipe1") |> pipe_add( "f1", \( a = 0, x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { a * (x + y) } ) |> pipe_add( "f2", \( a = new("NumericParam", "a", value = 0), x = new("NumericParam", "x", value = 1), y = 2, z = new("NumericParam", "y", value = 3) ) { a * (x + y + z) } ) par <- pipe_get_params_unique(pip) expect_equal(names(par), c("a", "x", "y", "z")) expect_equal(par$a, 0) expect_equal(par$x@value, 1) expect_equal(par$y@value, 2) expect_equal(par$z@value, 3) }) }) describe("pipe_get_params_unique_json", { test_that("the elements are not named", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a, keepOut = TRUE) p <- pipe_get_params_unique_json(pip) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(names(pl), NULL) pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |> pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pipe_get_params_unique_json(pip) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(names(pl), NULL) }) test_that("standard parameters are returned as name-value pairs", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 1, b = 2) a) |> pipe_add("f3", \(a = 1, b = 2, c = list(a = 1, b = 2)) a) p <- pipe_get_params_unique_json(pip) expect_true(methods::is(p, "json")) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal( pl, list( list(name = "a", value = 1), list(name = "b", value = 2), list(name = "c", value = list(a = 1, b = 2)) ) ) }) test_that("Param objects are returned with full information", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |> pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pipe_get_params_unique_json(pip) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal( pl, list( list( value = "some x", name = "x", advanced = FALSE, label = "my x", description = "", source = "internal", domain = "", class = "StringParam" ), list( value = "some y", name = "y", advanced = FALSE, label = "my y", description = "", source = "internal", domain = "", class = "StringParam" ) ) ) }) test_that("the name of the arg is set to the name of the Param object", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = new("StringParam", "my x", "some x")) x) |> pipe_add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pipe_get_params_unique_json(pip) pl <- jsonlite::fromJSON(p) expect_true(pl[["label"]][[1]] == "my x") expect_false(pl[["name"]][[1]] == "my x") hasArgName <- pl[["name"]][[1]] == "x" expect_true(pl[["label"]][[2]] == "my y") expect_false(pl[["name"]][[2]] == "my y") hasArgName <- pl[["name"]][[2]] == "y" }) test_that("works with mixed, that is, standard and Param objects", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = 1) x) |> pipe_add("f2", \(s = new("StringParam", "my s", "some s")) s) p <- pipe_get_params_unique_json(pip) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(pl[[1]], list(name = "x", value = 1L)) expect_equal( pl[[2]], list( value = "some s", name = "s", advanced = FALSE, label = "my s", description = "", source = "internal", domain = "", class = "StringParam" ) ) }) }) describe("pipe_get_step", { test_that("single steps can be retrieved", { pip <- pipe_new("pipe1") |> pipe_add("f1", identity, params = list(x = 1)) expect_equal(pipe_get_step(pip, "data"), pip$pipeline[1, ]) expect_equal(pipe_get_step(pip, "f1"), pip$pipeline[2, ]) expect_error(pipe_get_step(pip, "foo"), "step 'foo' does not exist") }) test_that("dependencies are recorded as expected", { pip <- pipe_new("pipe1", data = 9) foo <-\(a = 0) a bar <-\(a = 1, b = 2) a + b pipe_add(pip, "f1", foo) pipe_add(pip, "f2", bar, params = list(a = ~data, b = ~f1)) expect_true(length(unlist(pipe_get_step(pip, "f1")[["depends"]])) == 0) expect_equal( unlist(pipe_get_step(pip, "f2")[["depends"]]), c("a" = "data", b = "f1") ) }) }) describe("pipe_get_step_names", { test_that("step names be retrieved", { pip <- pipe_new("pipe1") |> pipe_add("f1", \() {}) |> pipe_add("f2", \(x = 1) x) expect_equal(pipe_get_step_names(pip), pip$pipeline[["step"]]) }) }) describe("pipe_get_step_number", { test_that("get_step_number", { test_that("get_step_number works as expected", { pip <- expect_no_error(pipe_new("pipe")) pipe_add(pip, "f1", \(a = 1) a) pipe_add(pip, "f2", \(a = 1) a) pipe_get_step_number(pip, "f1") |> expect_equal(2) pipe_get_step_number(pip, "f2") |> expect_equal(3) }) test_that("signals non-existent step", { pip <- expect_no_error(pipe_new("pipe")) pipe_add(pip, "f1", \(a = 1) a) expect_error( pipe_get_step_number(pip, "non-existent"), "step 'non-existent' does not exist" ) }) }) }) describe("pipe_has_step", { test_that("it can be checked if pipeline has a step", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_true(pipe_has_step(pip, "f1")) expect_false(pipe_has_step(pip, "f2")) }) }) describe("pipe_insert_after", { test_that("can insert a step after another step", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) pipe_insert_after( pip, "f1", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pipe_get_step_names(pip), c("data", "f1", "f3", "f2") ) }) test_that("will not insert a step if the step already exists", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_after(pip, "f1", step = "f2"), "step 'f2' already exists" ) }) test_that( "will not insert a step if the reference step does not exist", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_after(pip, "non-existent", step = "f3"), "step 'non-existent' does not exist" ) }) test_that( "will not insert a step with bad parameter dependencies", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_after(pip, "f1", step = "f3", \(x = ~f2) x), "step 'f3': dependency 'f2' not found" ) }) test_that("will work if insert happens at last position", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) pipe_insert_after( pip, "f2", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pipe_get_step_names(pip), c("data", "f1", "f2", "f3") ) }) }) describe("pipe_insert_before", { test_that("can insert a step after another step", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) pipe_insert_before( pip, "f2", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pipe_get_step_names(pip), c("data", "f1", "f3", "f2") ) }) test_that("will not insert a step if the step already exists", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_before(pip, "f1", step = "f2"), "step 'f2' already exists" ) }) test_that( "will not allow step to be inserted at first position", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) expect_error( pipe_insert_before(pip, "data", step = "f2"), "cannot insert before first step" ) }) test_that("will not insert a step if the reference step does not exist", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_before(pip, "non-existent", step = "f3"), "step 'non-existent' does not exist" ) }) test_that( "will not insert a step with bad parameter dependencies", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a + 1) |> pipe_add("f2", \(a = ~f1) a + 1) expect_error( pipe_insert_before(pip, "f2", step = "f3", \(x = ~f2) x), "step 'f3': dependency 'f2' not found" ) }) }) describe("pipe_length", { test_that("returns the number of steps", { pip <- pipe_new("pipe") expect_equal(pipe_length(pip), 1) pipe_add(pip, "f1", \(a = 1) a) expect_equal(pipe_length(pip), 2) pipe_remove_step(pip, "f1") expect_equal(pipe_length(pip), 1) }) }) describe("pipe_lock_step", { test_that("sets state to 'locked'", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) pipe_lock_step(pip, "f1") expect_equal(pipe_get_step(pip, "f1")[["state"]], "Locked") pip }) }) describe("pipe_print", { test_that("pipeline can be printed", { pip <- pipe_new("pipe1", data = 9) expect_output(pipe_print(pip)) }) test_that("missing function is signaled", { pip <- pipe_new("pipe1") expect_error( pipe_add(pip, "f1", "non-existing-function"), "object 'non-existing-function' of mode 'function' was not found" ) }) test_that("if verbose is TRUE, all columns are printed", { op <- options(width = 1000L) on.exit(options(op)) pip <- pipe_new("pipe1", data = 9) out <- capture.output(pipe_print(pip, verbose = TRUE)) header <- out[1] |> trimws() |> strsplit("\\s+") |> unlist() expected_header <- colnames(pip$pipeline) expect_equal(header, expected_header) }) }) describe("pipe_pop_step", { test_that("last pipeline step can be popped", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) pip_copy = pipe_clone(pip) pipe_add(pip, "f2", \(b = 2) b) expect_equal(pipe_length(pip), 3) expect_equal(pip_copy$length(), 2) res = pipe_pop_step(pip) expect_equal(res, "f2") expect_equal(pipe_length(pip), 2) expect_equal(pip, pip_copy) }) }) describe("pipe_pop_steps_after", { test_that("all steps after a given step can be removed", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) |> pipe_add("f2", \(x = ~f1) x) |> pipe_add("f3", \(x = ~f2) x) steps = pipe_pop_steps_after(pip, "f1") expect_equal(steps, c("f2", "f3")) hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]]) expect_true(hasAllStepsRemoved) }) test_that("if given step does not exist, an error is signalled", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) expect_error( pipe_pop_steps_after(pip, "bad_step"), "step 'bad_step' does not exist" ) }) test_that("if given step is the last step, nothing gets removed", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) |> pipe_add("f2", \(x = ~f1) x) length_before = pipe_length(pip) res = pipe_pop_steps_after(pip, "f2") expect_equal(res, character(0)) hasNothingRemoved = pipe_length(pip) == length_before expect_true(hasNothingRemoved) }) }) describe("pipe_pop_steps_from", { test_that("all steps from a given step can be removed", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) |> pipe_add("f2", \(x = ~f1) x) |> pipe_add("f3", \(x = ~f2) x) steps = pipe_pop_steps_from(pip, "f2") expect_equal(steps, c("f2", "f3")) hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]]) expect_true(hasAllStepsRemoved) }) test_that("if given step does not exist, an error is signalled", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) expect_error( pipe_pop_steps_from(pip, "bad_step"), "step 'bad_step' does not exist" ) }) test_that("if given step is the last step, one step removed", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(x = 1) x) |> pipe_add("f2", \(x = ~f1) x) length_before = pipe_length(pip) res = pipe_pop_steps_from(pip, "f2") expect_equal(res, "f2") hasOneStepRemoved = pipe_length(pip) == length_before - 1 expect_true(hasOneStepRemoved) }) }) describe("pipe_remove_step", { test_that("pipeline step can be removed", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 1) b) pipe_remove_step(pip, "f1") expect_equal(pipe_get_step_names(pip), c("data", "f2")) }) test_that("step must exist", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_error( pipe_remove_step(pip, "non-existent-step"), "step 'non-existent-step' does not exist" ) }) test_that("if step has downstream dependencies, an error is given", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) b) expect_error( pipe_remove_step(pip, "f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2'" ) ) pipe_add(pip, "f3", \(x = ~f1) x) expect_error( pipe_remove_step(pip, "f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f3'" ) ) }) test_that( "if error, only the direct downstream dependencies are reported", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) b) |> pipe_add("f3", \(c = ~f2) c) |> pipe_add("f4", \(d = ~f1) d) expect_error( pipe_remove_step(pip, "f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f4'" ) ) }) test_that( "step can be removed together with is downstream dependencies", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) b) |> pipe_add("f3", \(c = ~f2) c) |> pipe_add("f4", \(d = ~f1) d) |> pipe_add("f5", \(x = ~data) x) out <- utils::capture.output( pipe_remove_step(pip, "f1", recursive = TRUE), type = "message" ) expect_equal(pipe_get_step_names(pip), c("data", "f5")) expect_equal( out, paste( "Removing step 'f1' and its downstream dependencies:", "'f2', 'f3', 'f4'" ) ) }) }) describe("pipe_remove_step", { test_that("pipeline step can be renamed", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) pipe_rename_step(pip, from = "f1", to = "first") pipe_get_step_names(pip) |> expect_equal(c("data", "first", "f2")) }) test_that("signals name clash", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) expect_error( pipe_rename_step(pip, from = "f1", to = "f2"), "step 'f2' already exists" ) }) test_that("renames dependencies as well", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) b) |> pipe_add("f3", \(a = ~f1, b = ~f2) a + b) pipe_rename_step(pip, from = "f1", to = "first") expect_equal( pipe_get_depends(pip), list( data = character(0), first = character(0), f2 = c(b = "first"), f3 = c(a = "first", b = "f2") ) ) }) }) describe("pipe_replace_step", { test_that("pipeline steps can be replaced", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) |> pipe_add("f3", \(c = ~f2) c, keepOut = TRUE) out = unname(unlist(pipe_run(pip) |> pipe_collect_out())) expect_equal(out, 2) pipe_replace_step(pip, "f2", \(z = 4) z) out = unname(unlist(pipe_run(pip) |> pipe_collect_out())) expect_equal(out, 4) }) test_that("fun must be passed as a function or string", { pip <- pipe_new("pipe") |> pipe_add("step1", \(a = 1) a) expect_error( pipe_replace_step(pip, "step1", fun = 1), "is.function(fun) || is_string(fun) is not TRUE", fixed = TRUE ) }) test_that("params must be a list", { pip <- pipe_new("pipe") |> pipe_add("step1", \(a = 1) a) expect_error( pipe_replace_step(pip, "step1", fun =\() 1, params = 1), "is.list(params)", fixed = TRUE ) }) test_that("description must be (possibly empty) string", { pip <- pipe_new("pipe") |> pipe_add("step1", \(a = 1) a) expect_error( pipe_replace_step( pip, "step1", fun =\() 1, description = 1 ), "is_string(description)", fixed = TRUE ) expect_no_error( pipe_replace_step( pip, "step1", fun =\() 1, description = "" ) ) }) test_that("group must be non-empty string", { pip <- pipe_new("pipe") |> pipe_add("step1", \(a = 1) a) expect_error( pipe_replace_step(pip, "step1", fun =\() 1, group = 1), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) expect_error( pipe_replace_step(pip, "step1", fun =\() 1, group = ""), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) }) test_that("keepOut must be logical", { pip <- pipe_new("pipe") |> pipe_add("step1", \(a = 1) a) expect_error( pipe_replace_step(pip, "step1", fun =\() 1, keepOut = 1), "is.logical(keepOut) is not TRUE", fixed = TRUE ) }) test_that("the replacing function can be passed as a string", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = 3) x, keepOut = TRUE) out = unname(unlist(pipe_run(pip) |> pipe_collect_out())) expect_equal(out, 3) .my_func <-\(x = 3) { 2 * x } assign(".my_func", .my_func, envir = globalenv()) on.exit(rm(".my_func", envir = globalenv())) pipe_replace_step(pip, "f1", fun = ".my_func", keepOut = TRUE) out = unname(unlist(pipe_run(pip) |> pipe_collect_out())) expect_equal(out, 6) }) test_that( "when replacing function, default parameters can be overridden", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = 1:3) x, keepOut = TRUE) .my_func <-\(x = 3) { 2 * x } assign(".my_func", .my_func, envir = globalenv()) on.exit(rm(".my_func", envir = globalenv())) pipe_replace_step( pip, "f1", fun = ".my_func", params = list(x = 10), keepOut = TRUE ) out = unname(unlist(pipe_run(pip) |> pipe_collect_out())) expect_equal(out, 20) }) test_that("the pipeline step that is being replaced must exist", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) |> pipe_add("f3", \(c = ~f2) c, keepOut = TRUE) expect_error(pipe_replace_step(pip, "non-existent", \(z = 4) z)) }) test_that( "if replacing a pipeline step, dependencies are verified correctly", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = 2) b) |> pipe_add("f3", \(c = ~f2) c, keepOut = TRUE) expect_error( pipe_replace_step(pip, "f2", \(z = ~foo) z), "dependency 'foo' not found up to step 'f1'" ) expect_error( pipe_replace_step(pip, "f2", \(z = ~f2) z), "dependency 'f2' not found up to step 'f1'" ) expect_error( pipe_replace_step(pip, "f2", \(z = ~f3) z), "dependency 'f3' not found up to step 'f1'" ) }) test_that( "states are updated correctly", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 2) a) |> pipe_add("f3", \(a = ~f2) a, keepOut = TRUE) |> pipe_add("f4", \(a = ~f3) a, keepOut = TRUE) pipe_run(pip) pipe_replace_step(pip, "f2", \(a = 2) 2* a) expect_equal(pipe_get_step(pip, "f1")$state, "Done") expect_equal(pipe_get_step(pip, "f2")$state, "New") expect_equal(pipe_get_step(pip, "f3")$state, "Outdated") expect_equal(pipe_get_step(pip, "f4")$state, "Outdated") }) it("can have a variable defined outside as parameter default", { x <- 3 pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) pip |> pipe_replace_step("f1", fun = \(a) a, params = list(a = x)) out <- pipe_run(pip) |> pipe_get_out("f1") expect_equal(out, x) }) it("handles Param object args", { pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) pip |> pipe_replace_step( "f1", fun = \(a = new("NumericParam", "a", value = 3)) a ) out <- pipe_run(pip) |> pipe_get_out("f1") expect_equal(out, 3) }) it("can have a Param object defined outside as parameter default", { x <- 3 pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) p <- new("NumericParam", "a", value = x) pip |> pipe_replace_step("f1", fun = \(a) a, params = list(a = p)) pip <- pipe_new("pipe") |> pipe_add("f1", \(a) a, params = list(a = p)) expect_equal(pipe_get_params_at_step(pip, "f1")$a, p) out <- pipe_run(pip) |> pipe_get_out("f1") expect_equal(out, x) }) it("function can be passed as a string", { pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) pip |> pipe_replace_step("f1", fun = "mean", params = list(x = 1:5)) out <- pipe_run(pip) |> pipe_get_out("f1") expect_equal(out, mean(1:5)) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean") }) it("if passed as a function, name is derived from the function", { pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) pip |> pipe_replace_step("f1", fun = mean, params = list(x = 1:5)) out <- pipe_run(pip) |> pipe_get_out("f1") expect_equal(out, mean(1:5)) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "mean") }) it("lampda functions, are named 'function'", { pip <- pipe_new("pipe") |> pipe_add("f1", \(x = 1) x) pip |> pipe_replace_step("f1", fun = \(x = 1) x) expect_equal(pipe_get_step(pip, "f1")[["funcName"]], "function") }) }) describe("pipe_reset", { test_that( "after reset pipeline is the same as before the run", { p <- pipe_new("pipe", data = 1:2) p |> pipe_add("f1", \(x = 1) x) p |> pipe_add("f2", \(y = 1) y) p |> pipe_run() expect_equal( p |> pipe_collect_out(all = TRUE), list(data = 1:2, f1 = 1, f2 = 1) ) expect_true(all(p$pipeline[["state"]] == "Done")) p |> pipe_reset() expect_equal( p |> pipe_collect_out(all = TRUE), list(data = NULL, f1 = NULL, f2 = NULL) ) expect_true(all(p$pipeline[["state"]] == "New")) }) }) describe("pipe_run", { test_that("empty pipeline can be run", { expect_no_error(pipe_new("pipe1") |> pipe_run()) }) test_that("returns the pipeline object", { pip <- pipe_new("pipe1") |> pipe_run() expect_equal(pip$name, "pipe1") }) test_that("if function result is a list, all names are preserved", { # Result list length == 1 - the critical case resultList = list(foo = 1) pip <- pipe_new("pipe") |> pipe_add("f1", \() resultList, keepOut = TRUE) out = pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], resultList) # Result list length > 1 resultList = list(foo = 1, bar = 2) pip <- pipe_new("pipe") |> pipe_add("f1", \() resultList, keepOut = TRUE) out = pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], resultList) }) test_that("pipeline execution is correct", { data <- 9 pip <- pipe_new("pipe1", data = data) foo <-\(a = 1) a bar <-\(a, b) a + b a <- 5 pipe_add(pip, "f1", foo, params = list(a = a), keepOut = TRUE) pipe_add( pip, "f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE ) pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], a) expect_equal(pip$pipeline[["out"]][[3]], a + data) }) test_that("pipeline execution can cope with void functions", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) {}, keepOut = TRUE) |> pipe_add("f2", \(b = 2) b, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out, list(f1 = NULL, f2 = 2)) }) test_that( "if pipeline execution fails, the error message is returned as error", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(b = ~f1) stop("something went wrong")) expect_error(pipe_run(pip), "something went wrong") }) test_that( "can handle 'NULL' results", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], 1) pipe_set_data(pip, NULL) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], NULL) }) test_that( "can be run recursively to dynamically create and run pipelines", { pip <- pipe_new("pipe", data = 1) |> pipe_add( "f1", fun =\(data = 10) { pip <- pipe_new("2nd pipe", data = data) |> pipe_add("step1", \(x = ~data) x) |> pipe_add("step2", \(x = ~step1) { print(x) 2 * x }, keepOut = TRUE) } ) pip2 <- pipe_run(pip, recursive = TRUE) expect_equal(pip2 |> pipe_get_step_names(), c("data", "step1", "step2")) out <- pip2 |> pipe_collect_out() expect_equal(out[["step2"]], 20) }) test_that("will not re-run steps that are already done unless forced", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(y = ~f1) y + 1) pipe_run(pip) expect_equal(pipe_get_step(pip, "f1")$state, "Done") expect_equal(pipe_get_step(pip, "f2")$state, "Done") expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) # Set parameter, which outdates step and run again pipe_run(pip, force = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) }) test_that("will never re-run locked steps", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(y = ~f1) y + 1) pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) pipe_lock_step(pip, "f1") pipe_lock_step(pip, "f2") # Set parameter, which outdates step and run again pipe_run(pip, force = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) }) test_that("can clean unkept steps", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(y = ~f1) y + 1) pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pipe_run(pip, cleanUnkept = TRUE) expect_true(all(sapply(pip$pipeline[["out"]], is.null))) pipe_set_keep_out(pip, "f1", TRUE) pipe_run(pip, cleanUnkept = TRUE) expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL)) }) test_that("logs warning without interrupting the run", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(x = ~f1) { warning("something might be wrong") x }) |> pipe_add("f3", \(x = ~f2) x) log <- utils::capture.output( expect_warning(pipe_run(pip), "something might be wrong") ) Filter(log, f =\(x) x |> startsWith("WARN")) |> grepl(pattern = "something might be wrong") |> expect_true() wasRunTillEnd <- pipe_get_out(pip, "f3") == 2 expect_true(wasRunTillEnd) }) test_that("logs error and stops at failed step", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(x = ~f1) { stop("something went wrong") x }) |> pipe_add("f3", \(x = ~f2) x) log <- utils::capture.output( expect_error(pipe_run(pip), "something went wrong") ) Filter(log, f =\(x) x |> startsWith("ERROR")) |> grepl(pattern = "something went wrong") |> expect_true() wasRunTillEnd <- isTRUE(pipe_get_out(pip, "f3") == 2) expect_false(wasRunTillEnd) }) test_that("can show progress", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(x = ~f1) x) m <- mockery::mock() pipe_run(pip, progress = m) args <- mockery::mock_args(m) expect_equal(length(m), pipe_length(pip)) expect_equal(args[[1]][[1]], 1) expect_equal(args[[1]][["detail"]], "data") expect_equal(args[[2]][[1]], 2) expect_equal(args[[2]][["detail"]], "f1") expect_equal(args[[3]][[1]], 3) expect_equal(args[[3]][["detail"]], "f2") }) test_that("works with Param objects", { pip <- pipe_new("pipe1") |> pipe_add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } ) |> pipe_add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) pipe_run(pip) pipe_get_out(pip, "f1") |> expect_equal(3) pipe_get_out(pip, "f2") |> expect_equal("Hello World") }) }) describe("pipe_run_step", { test_that("pipeline can be run at given step", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) expect_no_error(pipe_run_step(pip, "A")) }) test_that("upstream steps are by default run with given step", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step(pip, "B") expect_equal(pipe_get_out(pip, "A"), 1) expect_equal(pipe_get_out(pip, "B"), c(1, 2)) expect_true(is.null(pipe_get_out(pip, "C"))) }) test_that("runs upstream steps in correct order", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step(pip, "C") expect_equal(pipe_get_out(pip, "C"), 1:3) }) test_that("runs downstream steps in correct order", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step(pip, "A", downstream = TRUE) expect_equal(pipe_get_out(pip, "C"), 1:3) }) test_that("pipeline can be run at given step excluding all upstream dependencies", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step(pip, "B", upstream = FALSE) expect_true(is.null(pipe_get_out(pip, "A"))) expect_equal(pipe_get_out(pip, "B"), 2) expect_true(is.null(pipe_get_out(pip, "C"))) }) test_that("pipeline can be run at given step excluding upstream but including downstream dependencies", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step( pip, "B", upstream = FALSE, downstream = TRUE ) expect_true(is.null(pipe_get_out(pip, "A"))) expect_equal(pipe_get_out(pip, "B"), 2) expect_equal(pipe_get_out(pip, "C"), c(2, 3)) }) test_that("pipeline can be run at given step including up- and downstream dependencies", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) pipe_run_step( pip, "B", upstream = TRUE, downstream = TRUE ) expect_equal(pipe_get_out(pip, "A"), 1) expect_equal(pipe_get_out(pip, "B"), c(1, 2)) expect_equal(pipe_get_out(pip, "C"), c(1, 2, 3)) }) test_that("if not marked as keepOut, output of run steps can be cleaned", { pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) pipe_run_step(pip, "A", cleanUnkept = TRUE) expect_true(is.null(pipe_get_out(pip, "A"))) pipe_set_keep_out(pip, "A", TRUE) |> pipe_run_step("A", cleanUnkept = TRUE) expect_false(is.null(pipe_get_out(pip, "A"))) }) test_that("up- and downstream steps are marked in log", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) pip <- pipe_new("pipe") |> pipe_add("A", \(a = 1) a) |> pipe_add("B", \(b = ~A) c(b, 2)) |> pipe_add("C", \(c = ~B) c(c, 3)) logOut <- utils::capture.output( pipe_run_step(pip, "B", upstream = TRUE, downstream = TRUE) ) contains <-\(x, pattern) { grepl(pattern = pattern, x = x, fixed = TRUE) } expect_true(logOut[2] |> contains("Step 1/3 A (upstream)")) expect_true(logOut[3] |> contains("Step 2/3 B")) expect_true(logOut[4] |> contains("Step 3/3 C (downstream)")) }) test_that( "updates the timestamp of the run steps", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) before <- pip$pipeline[["time"]] Sys.sleep(1) pipe_run_step(pip, "f1", upstream = FALSE) after <- pip$pipeline[["time"]] expect_equal(before[1], after[1]) expect_true(before[2] < after[2]) }) test_that( "updates the state of the run steps", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) before <- pip$pipeline[["state"]] pipe_run_step(pip, "f1", upstream = FALSE) after <- pip$pipeline[["state"]] expect_equal(before, c("New", "New")) expect_equal(after, c("New", "Done")) }) test_that("will never re-run locked step", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(y = ~f1) y + 1) pipe_run(pip) expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pipe_lock_step(pip, "f1") pipe_run_step(pip, "f1", downstream = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 1) }) test_that("can clean unkept steps", { pip <- pipe_new("pipe", data = 1) |> pipe_add("f1", \(x = 2) x) |> pipe_add("f2", \(y = ~f1) y + 1) pipe_run_step(pip, "f1", downstream = TRUE, cleanUnkept = TRUE) expect_true(all(sapply(pip$pipeline[["out"]], is.null))) pipe_set_keep_out(pip, "f1", TRUE) pipe_run_step(pip, "f1", downstream = TRUE, cleanUnkept = TRUE) expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL)) }) }) describe("pipe_set_data", { test_that("data can be set later after pipeline definition", { dat <- data.frame(a = 1:2, b = 1:2) pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], NULL) pipe_set_data(pip, dat) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["f1"]], dat) }) test_that("if data is set, all dependent steps are set to outdated", { dat <- data.frame(a = 1:2, b = 1:2) pip <- pipe_new("pipe1") |> pipe_add("f1", \(x = ~data) x, keepOut = TRUE) |> pipe_add("f2", \(x = ~f1) x, keepOut = TRUE) pipe_run(pip) expect_equal(pipe_get_step(pip, "f1")$state, "Done") expect_equal(pipe_get_step(pip, "f2")$state, "Done") pipe_set_data(pip, dat) expect_equal(pipe_get_step(pip, "f1")$state, "Outdated") expect_equal(pipe_get_step(pip, "f2")$state, "Outdated") }) }) describe("pipe_set_data_split", { test_that("the new steps have the names of the list attached", { dataList <- list(A = 1, B = 2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) pipe_set_data_split(pip, dataList) pipe_get_step_names(pip) |> expect_equal(c("data.A", "f1.A", "data.B", "f1.B")) }) test_that("the separator used in the creation of the new steps can be customized", { dataList <- list(A = 1, B = 2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) pipe_set_data_split(pip, dataList, sep = "_") pipe_get_step_names(pip) |> expect_equal(c("data_A", "f1_A", "data_B", "f1_B")) }) test_that("simple split pipeline computes results as expected", { dataList <- list(A = 1, B = 2, C = 3) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = ~data) { b + a }, keepOut = TRUE) pipe_set_data_split(pip, dataList) out <- pipe_run(pip) |> pipe_collect_out() expect_equivalent( unlist(out), unlist(lapply(dataList, \(x) x + 1)) ) }) test_that( "split pipeline by default overrides output groups according to split", { dataList <- list(A = 1, B = 2) pip <- pipe_new("pipe") |> pipe_add("f0", \(a = 1) a, group = "id") |> pipe_add("f1", \(a = 1) a, group = "id") |> pipe_add("f2", \(a = 2) a) pipe_set_data_split(pip, dataList) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal(names(out), names(dataList)) }) test_that("the grouping override can be omitted", { dataList <- list(A = 1, B = 2) pip <- pipe_new("pipe") |> pipe_add("f0", \(a = 1) a, group = "id") |> pipe_add("f1", \(a = 1) a, group = "id") |> pipe_add("f2", \(a = 2) a) pipe_set_data_split(pip, dataList, groupBySplit = FALSE) out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal( names(out), c("data.A", "id.A", "f2.A", "data.B", "id.B", "f2.B") ) }) test_that("the separator used in the creation of the groups can be customized", { dataList <- list(A = 1, B = 2) pip <- pipe_new("pipe") |> pipe_add("f0", \(a = 1) a, group = "id") |> pipe_add("f1", \(a = 1) a, group = "id") |> pipe_add("f2", \(a = 2) a) pipe_set_data_split(pip, dataList, groupBySplit = FALSE, sep = "_") out <- pipe_run(pip) |> pipe_collect_out(all = TRUE) expect_equal( names(out), c("data_A", "id_A", "f2_A", "data_B", "id_B", "f2_B") ) }) test_that("split pipeline works for list of data frames", { dat <- data.frame(x = 1:2, y = 1:2, z = 1:2) dataList <- list(A = dat, B = dat, C = dat) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) |> pipe_add("f3", \(a = ~f1, b = ~data) b[, 2:3], keepOut = TRUE) pipe_set_data_split(pip, dataList) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["A"]], c(f2.A = list(dat), f3.A = list(dat[, 2:3]))) expect_equal(out[["B"]], c(f2.B = list(dat), f3.B = list(dat[, 2:3]))) expect_equal(out[["C"]], c(f2.C = list(dat), f3.C = list(dat[, 2:3]))) }) test_that("if unnamed list of data frames, they are named with numbers", { dat <- data.frame(x = 1:2, y = 1:2, z = 1:2) dataList <- list(dat, dat) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) pipe_set_data_split(pip, dataList) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["1"]], dat) expect_equal(out[["2"]], dat) }) test_that( "depends are updated correctly, if data split on subset of pipeline", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) |> pipe_add("f3", \(x = ~f1, y = ~f2) list(x, y), keepOut = TRUE) |> pipe_add("f4", \(x = ~f3) x[[1]], keepOut = TRUE) pipe_set_data_split(pip, dataList, toStep = "f2") ee = expect_equivalent pp = pip$pipeline depends <- pipe_get_depends(pip) expect_equal(depends[["f2.1"]], c(a = "f1.1", b = "data.1")) expect_equal(depends[["f2.2"]], c(a = "f1.2", b = "data.2")) # Pipeline was not split for f3, which therefore has parameters that # each depend on two steps expect_equal( depends[["f3"]], list(x = c("f1.1", "f1.2"), y = c("f2.1", "f2.2")) ) # Pipeline was not split for f4, so just depdends on f3 ee(depends[["f4"]], c(x = "f3")) out <- pipe_run(pip) |> pipe_collect_out() expect_equal(out[["1"]], dat1) expect_equal(out[["2"]], dat2) expected_f3_res = list( list("f1.1" = 1, "f1.2" = 1), list("f2.1" = dat1, "f2.2" = dat2) ) expect_equal(out[["f3"]], expected_f3_res) expect_equal(out[["f4"]], expected_f3_res[[1]]) }) test_that("split data set can be created dynamically", { data = data.frame(a = 1:10, group = c("a", "b")) pip <- pipe_new("pipe", data = data) |> pipe_add("split_data_step", \(.self = NULL, data = ~data) { splitData = split(data, data[, "group"]) .self$remove_step("split_data_step") .self$set_data_split(splitData) .self$name = paste(.self$name, "after data split") .self } ) |> pipe_add("f1", \(data = ~data) { data }, keepOut = TRUE) pipe_set_params(pip, list(.self = pip)) out <- pipe_run(pip, recursive = TRUE) |> pipe_collect_out() expect_equivalent(out, split(data, data[, "group"])) }) }) describe("pipe_set_keep_out", { test_that("keep-out state can be set", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(a = 1) a) out <- pipe_run(pip) |> pipe_collect_out() expect_false("f1" %in% names(out)) out <- pipe_set_keep_out(pip, "f1", keepOut = TRUE) |> pipe_collect_out() expect_true("f1" %in% names(out)) out <- pipe_set_keep_out(pip, "f1", keepOut = FALSE) |> pipe_collect_out() expect_false("f1" %in% names(out)) }) test_that("step must be a string and exist", { pip <- pipe_new("pipe1") pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(a = 1) a) expect_error( pipe_set_keep_out(pip, 1), "is_string(step)", fixed = TRUE ) expect_error( pipe_set_keep_out(pip, "f2"), "step 'f2' does not exist", fixed = TRUE ) }) test_that("state must be logical", { pip <- pipe_new("pipe1", data = 0) |> pipe_add("f1", \(a = 1) a) expect_error( pipe_set_keep_out(pip, "f1", keepOut = 1), "is.logical(keepOut)", fixed = TRUE ) }) }) describe("pipe_set_params", { test_that("parameters can be set commonly on existing pipeline", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 2, b = 3) a) |> pipe_add("f3", \(a = 4, b = 5) a) before <- pipe_get_params(pip) after <- pipe_set_params(pip, list(a = 9, b = 99)) |> pipe_get_params() expect_equal(after, list( f1 = list(a = 9), f2 = list(a = 9, b = 99), f3 = list(a = 9, b = 99) )) }) test_that( "parameters depending on other steps are protected from being overwritten", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = 2, b = ~f1) a) |> pipe_add("f3", \(a = ~f2, b = 5) a) before <- pipe_get_params(pip) expect_equal( before, list( f1 = list(a = 1), f2 = list(a = 2), f3 = list(b = 5) ) ) after <- pipe_set_params(pip, list(a = 9, b = 99)) |> pipe_get_params() expect_equal( after, list( f1 = list(a = 9), f2 = list(a = 9), f3 = list(b = 99) ) ) }) test_that("an error is given if params argument is not a list", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_error( pipe_set_params(pip, c(a = 9)), "params must be a list", fixed = TRUE ) }) test_that("trying to set undefined parameters is signaled with a warning", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_warning( pipe_set_params(pip, list(a = 9, b = 9, c = 9)), "Trying to set parameters not defined in the pipeline: b, c", fixed = TRUE ) }) test_that("warning for undefined parameters can be omitted", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_no_warning( pipe_set_params(pip, list(a = 9, b = 9, c = 9), warnUndefined = FALSE ) ) }) test_that( "after setting a single parameter the params entry is still a list", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1) a) expect_equal(pip$pipeline[["params"]][[2]], list(a = 1)) pipe_set_params(pip, list(a = 9)) expect_equal(pip$pipeline[["params"]][[2]], list(a = 9)) }) test_that( "hidden parameters can be set as well", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, .b = 2) a) pipe_set_params(pip, list(a = 9, .b = 10)) pp <- pipe_get_params(pip, ignoreHidden = FALSE) expect_equal(pp, list(f1 = list(a = 9, .b = 10))) }) test_that( "trying to set locked parameters is ignored until they are unlocked", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) |> pipe_add("f2", \(a = 1, b = 2) a + b) pipe_lock_step(pip, "f1") expect_message( pipe_set_params(pip, list(a = 9, b = 99)), "skipping setting parameters a, b at locked step 'f1'" ) pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 1, b = 2)) pipe_get_params_at_step(pip, "f2") |> expect_equal(list(a = 9, b = 99)) pipe_unlock_step(pip, "f1") pipe_set_params(pip, list(a = 9, b = 99)) pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 9, b = 99)) }) }) describe("pipe_set_params_at_step", { test_that("parameters can be set at given step", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) |> pipe_add("f2", \(x = 1) x) expect_equal(pipe_get_params_at_step(pip, "f1"), list(a = 1, b = 2)) pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99)) expect_equal(pipe_get_params_at_step(pip, "f1"), list(a = 9, b = 99)) pipe_set_params_at_step(pip, "f2", list(x = 9)) expect_equal(pipe_get_params_at_step(pip, "f2"), list(x = 9)) }) test_that("step must be passed as a string and params as a list", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) expect_error( pipe_set_params_at_step(pip, 1, list(a = 9, b = 99)), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( pipe_set_params_at_step(pip, "f1", params = c(a = 9, b = 99)), "is.list(params) is not TRUE", fixed = TRUE ) }) test_that("hidden parameters can be set as well", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, .b = 2) a + b) pipe_set_params_at_step(pip, "f1", list(a = 9, .b = 99)) expect_equal( pipe_get_params_at_step(pip, "f1", ignoreHidden = FALSE), list(a = 9, .b = 99) ) }) test_that("trying to set undefined parameter signals an error", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) expect_error( pipe_set_params_at_step(pip, "f1", list(a = 9, z = 99)), "Unable to set parameter(s) z at step f1 - candidates are a, b", fixed = TRUE ) }) test_that("trying to set locked parameter is ignored until it is unlocked", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) pipe_lock_step(pip, "f1") expect_message( pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99)), "skipping setting parameters a, b at locked step 'f1'" ) pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 1, b = 2)) pipe_unlock_step(pip, "f1") pipe_set_params_at_step(pip, "f1", list(a = 9, b = 99)) pipe_get_params_at_step(pip, "f1") |> expect_equal(list(a = 9, b = 99)) }) test_that("setting values for bound parameters is not allowed", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = 1, b = 2) a + b) |> pipe_add("f2", \(x = 1, y = ~f1) x + y) expect_error( pipe_set_params_at_step(pip, "f2", list(x = 9, y = 99)), "Unable to set parameter(s) y at step f2 - candidates are x", fixed = TRUE ) }) test_that( "states of affected steps are updated once the pipeline was run", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1) a) |> pipe_add("f3", \(a = ~f2) a) |> pipe_add("f4", \(a = ~data) a) pipe_set_params_at_step(pip, "f1", params = list(a = 2)) expect_true(all(pip$pipeline[["state"]] == "New")) pipe_run(pip) pipe_set_params_at_step(pip, "f1", params = list(a = 3)) expect_equal(pipe_get_step(pip, "data")$state, "Done") expect_equal(pipe_get_step(pip, "f1")$state, "Outdated") expect_equal(pipe_get_step(pip, "f2")$state, "Outdated") expect_equal(pipe_get_step(pip, "f3")$state, "Outdated") expect_equal(pipe_get_step(pip, "f4")$state, "Done") pipe_run(pip) expect_true(all(pip$pipeline[["state"]] == "Done")) }) test_that("parameters can be set to NULL", { pip <- pipe_new("pipe1") |> pipe_add("f1", \(a = NULL, b = 1) a) pipe_set_params_at_step(pip, "f1", list(a = 1, b = NULL)) expect_equal( pipe_get_params_at_step(pip, "f1"), list(a = 1, b = NULL) ) }) test_that("preserves Param objects", { pip <- pipe_new("pipe1") |> pipe_add("f1", \( a = 1, b = new("NumericParam", "num", 2)) a + b ) pipe_set_params_at_step(pip, "f1", list(a = 3, b = 4)) params <- pipe_get_params_at_step(pip, "f1") expect_equal(params$a, 3) expect_true(params$b |> is("NumericParam")) expect_equal(params$b@value, 4) }) }) describe("pipe_split", { test_that("pipeline split of initial pipeline gives the expected result", { pip <- pipe_new("pipe") res <- pipe_split(pip) expect_true(is.list(res)) expect_equal(res[[1]]$name, "pipe1") expect_equal(res[[1]]$pipeline, pip$pipeline) }) test_that("pipeline with two indepdendent groups is split correctly", { pip <- pipe_new("pipe") pipe_add(pip, "f1", \(a = ~data) a) pipe_add(pip, "f2", \(a = 1) a) pipe_add(pip, "f3", \(a = ~f2) a) pipe_add(pip, "f4", \(a = ~f1) a) pipe_run(pip) res <- pipe_split(pip) pip1 <- res[[1]] pip2 <- res[[2]] expect_equal(pip1$name, "pipe1") expect_equal(pip2$name, "pipe2") expect_equal(pip1$get_step_names(), c("f2", "f3")) expect_equal(pip2$get_step_names(), c("data", "f1", "f4")) expect_equal( pip1$collect_out(all = TRUE), pipe_collect_out(pip, all = TRUE)[c("f2", "f3")] ) expect_equal( pip2$collect_out(all = TRUE), pipe_collect_out(pip, all = TRUE)[c("data", "f1", "f4")] ) }) test_that( "split is done correctly for complete data split", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) |> pipe_add("f2", \(a = ~f1, b = 2) b) |> pipe_add("f3", \(x = ~f1, y = ~f2) x + y) pipe_set_data_split(pip, dataList) res <- pipe_split(pip) steps <- lapply(res, \(x) x$get_step_names()) expect_equal( steps, list( "data.1", c("f1.1", "f2.1", "f3.1"), "data.2", c("f1.2", "f2.2", "f3.2") ) ) }) }) describe("pipe_unlock_step", { test_that("sets state to 'unlocked' if it was locked before", { pip <- pipe_new("pipe") |> pipe_add("f1", \(a = 1) a) pipe_lock_step(pip, "f1") expect_equal(pipe_get_step(pip, "f1")[["state"]], "Locked") pipe_unlock_step(pip, "data") expect_equal(pipe_get_step(pip, "data")[["state"]], "New") pipe_unlock_step(pip, "f1") expect_equal(pipe_get_step(pip, "f1")[["state"]], "Unlocked") pip }) })