# Pipeline describe("initialize", { test_that("returns a pipeline object", { expect_true(methods::is(Pipeline$new("pipe"), "Pipeline")) }) test_that("pipeline name must be a non-empty string", { expect_no_error(Pipeline$new("foo")) expect_error( Pipeline$new(name = 1), "name must be a string" ) expect_error( Pipeline$new(name = ""), "name must not be empty" ) }) test_that("data is added as first step to pipeline", { pip <- Pipeline$new("pipe1", data = 1) expect_equal(pip$get_step_names(), "data") out <- pip$run()$collect_out(all = TRUE) expect_equal(pip$get_out("data"), 1) }) test_that("the logger can be customized", { my_logger <-\(level, msg, ...) { message("My Logger: ", msg) } pip <- Pipeline$new("pipe", logger = my_logger) out <- capture.output( pip$run(), type = "message" ) expect_equal( out, c( "My Logger: Start run of 'pipe' pipeline:", "My Logger: Step 1/1 data", "My Logger: Finished execution of steps.", "My Logger: Done." ) ) }) test_that("bad definition of the custom logger is signalled", { expected_error_msg <- paste( "logger function must have the following signature:", "function(level, msg, ...)" ) logger_with_missing_level_arg <-\(msg, ...) { message("My Logger: ", msg) } expect_error( Pipeline$new("pipe1", logger = logger_with_missing_level_arg), expected_error_msg, fixed = TRUE ) logger_with_missing_msg_arg <-\(level, ...) { message("My Logger: ", ...) } expect_error( Pipeline$new("pipe1", logger = logger_with_missing_msg_arg), expected_error_msg, fixed = TRUE ) logger_with_missing_dots <-\(msg, level) { message("My Logger: ", msg) } expect_error( Pipeline$new("pipe1", logger = logger_with_missing_dots), expected_error_msg, fixed = TRUE ) logger_with_additional_arg <-\(level, msg, foo, ...) { message("My Logger: ", msg) } expect_error( Pipeline$new("pipe1", logger = logger_with_additional_arg), expected_error_msg, fixed = TRUE ) }) }) describe("add", { test_that("step must be non-empty string", { pip <- Pipeline$new("pipe1") foo <-\(a = 0) a expect_error(pip$add("", foo)) expect_error(pip$add(c("a", "b"), foo)) }) test_that("fun must be passed as a function or string", { pip <- Pipeline$new("pipe") expect_error( pip$add("step1", fun = 1), "is.function(fun) || is_string(fun) is not TRUE", fixed = TRUE ) }) test_that("params must be a list", { pip <- Pipeline$new("pipe") expect_error( pip$add("step1", fun =\() 1, params = 1), "is.list(params)", fixed = TRUE ) }) test_that("description must be (possibly empty) string", { pip <- Pipeline$new("pipe") expect_error( pip$add("step1", fun =\() 1, description = 1), "is_string(description)", fixed = TRUE ) expect_no_error(pip$add("step1", fun =\() 1, description = "")) }) test_that("group must be non-empty string", { pip <- Pipeline$new("pipe") expect_error( pip$add("step1", fun =\() 1, group = 1), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) expect_error( pip$add("step1", fun =\() 1, group = ""), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) }) test_that("keepOut must be logical", { pip <- Pipeline$new("pipe") expect_error( pip$add("step1", fun =\() 1, keepOut = 1), "is.logical(keepOut) is not TRUE", fixed = TRUE ) }) test_that("duplicated step names are signaled", { pip <- Pipeline$new("pipe1") foo <-\(a = 0) a pip$add("f1", foo) expect_error(pip$add("f1", foo), "step 'f1' already exists") expect_error(pip$add("f1", \(x) x), "step 'f1' already exists") }) test_that("missing dependencies are signaled", { pip <- Pipeline$new("pipe1") foo <-\(a = 0) a pip$add("f1", foo) expect_error( pip$add("f2", foo, params = list(a = ~undefined)), "dependency 'undefined' not found" ) }) test_that("step can refer to previous step by relative number", { pip <- Pipeline$new("pipe1") pip$add("f1", \(a = 5) a) pip$add("f2", \(x = ~-1) 2*x, keepOut = TRUE) out = pip$run()$collect_out() expect_equal(out[["f2"]][[1]], 10) pip$add("f3", \(x = ~-1, a = ~-2) x + a, keepOut = TRUE) out = pip$run()$collect_out() expect_equal(out[["f3"]][[1]], 10 + 5) }) test_that("a bad relative step referal is signalled", { pip <- Pipeline$new("pipe1") expect_error( pip$add("f1", \(x = ~-10) x), paste( "step 'f1': relative dependency x=-10", "points to outside the pipeline" ), fixed = TRUE ) }) test_that("added step can use lambda functions", { data <- 9 pip <- Pipeline$new("pipe1", data = data) pip$add("f1", \(data = ~data) data, keepOut = TRUE) a <- 1 pip$add("f2", \(a, b) a + b, params = list(a = a, b = ~f1), keepOut = TRUE ) expect_equal(unlist(pip$get_step("f1")[["depends"]]), c(data = "data")) expect_equal(unlist(pip$get_step("f2")[["depends"]]), c(b = "f1")) out <- pip$run()$collect_out() expect_equal(out[["f1"]][[1]], data) expect_equal(out[["f2"]][[1]], a + data) }) test_that( "supports functions with wildcard arguments", { my_mean <-\(x, na.rm = FALSE) { mean(x, na.rm = na.rm) } foo <-\(x, ...) { my_mean(x, ...) } v <- c(1, 2, NA, 3, 4) pip <- Pipeline$new("pipe", data = v) params <- list(x = ~data, na.rm = TRUE) pip$add("mean", fun = foo, params = params, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(out[["mean"]], mean(v, na.rm = TRUE)) pip$set_params_at_step("mean", list(na.rm = FALSE)) out <- pip$run()$collect_out() expect_equal(out[["mean"]], as.numeric(NA)) }) test_that("can have a variable defined outside as parameter default", { x <- 1 pip <- Pipeline$new("pipe")$ add("f1", \(a) a, params = list(a = x)) expect_equal(pip$get_params_at_step("f1")$a, x) out <- pip$run()$collect_out(all = TRUE) expect_equal(out[["f1"]], x) }) test_that("handles Param object args", { pip <- Pipeline$new("pipe")$ add("f1", \(a = new("NumericParam", "a", value = 1)) a) out <- pip$run()$collect_out(all = TRUE) expect_equal(out[["f1"]], 1) }) test_that( "can have a Param object defined outside as parameter default", { x <- 1 p <- new("NumericParam", "a", value = x) pip <- Pipeline$new("pipe")$ add("f1", \(a) a, params = list(a = p)) expect_equal(pip$get_params_at_step("f1")$a, p) out <- pip$run()$collect_out(all = TRUE) expect_equal(out[["f1"]], x) }) test_that( "function can be passed as a string", { pip <- Pipeline$new("pipe")$ add("f1", fun = "mean", params = list(x = 1:5)) out <- pip$run()$collect_out(all = TRUE) expect_equal(out[["f1"]], mean(1:5)) expect_equal(pip$get_step("f1")[["funcName"]], "mean") }) test_that( "if passed as a function, name is derived from the function", { pip <- Pipeline$new("pipe") pip$add("f1", fun = mean, params = list(x = 1:5)) expect_equal(pip$get_step("f1")[["funcName"]], "mean") pip <- Pipeline$new("pipe")$ add("f1", fun = mean, params = list(x = 1:5)) expect_equal(pip$get_step("f1")[["funcName"]], "mean") }) test_that( "lampda functions, are named 'function'", { pip <- Pipeline$new("pipe")$add("f1", fun = \(x = 1) x) expect_equal(pip$get_step("f1")[["funcName"]], "function") pip <- Pipeline$new("pipe")$ add("f1", fun = \(x = 1) x) expect_equal(pip$get_step("f1")[["funcName"]], "function") }) }) describe("append", { test_that("pipelines can be combined even if their steps share names, unless tryAutofixNames is FALSE", { pip1 <- Pipeline$new("pipe1", data = 1)$ add("f1", \(a = 1) a, keepOut = TRUE)$ add("f2", \(b = ~f1) b) pip2 <- Pipeline$new("pipe2")$ add("f1", \(a = 10) a)$ add("f2", \(b = ~f1) b, keepOut = TRUE) expect_error( pip1$append(pip2, tryAutofixNames = FALSE), paste( "combined pipeline would have duplicated step names:", "'data', 'f1', 'f2'," ) ) pp <- pip1$append(pip2) expect_equal(pp$length(), pip1$length() + pip2$length()) out1 <- pip1$run()$collect_out() out2 <- pip2$run()$collect_out() out <- pp$run()$collect_out() expect_equivalent(out, c(out1, out2)) }) test_that("auto-fixes only the names that need auto-fix", { pip1 <- Pipeline$new("pipe1", data = 1)$ add("f1", \(a = 1) a, keepOut = TRUE)$ add("f2", \(b = ~f1) b) pip2 <- Pipeline$new("pipe2")$ add("f3", \(a = 10) a)$ add("f4", \(b = ~f3) b, keepOut = TRUE) pp <- pip1$append(pip2) expect_equal( pp$get_step_names(), c("data", "f1", "f2", "data.pipe2", "f3", "f4") ) out1 <- pip1$run()$collect_out() out2 <- pip2$run()$collect_out() out <- pp$run()$collect_out() expect_equivalent(out, c(out1, out2)) }) test_that("the separator used for step names can be customized", { pip1 <- Pipeline$new("pipe1", data = 1)$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b) pip2 <- Pipeline$new("pipe2")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b) pp <- pip1$append(pip2, sep = "_") expect_equal( pp$get_step_names(), c("data", "f1", "f2", "data_pipe2", "f1_pipe2", "f2_pipe2") ) }) test_that( "output of first pipeline can be set as input of appended pipeline", { pip1 <- Pipeline$new("pipe1", data = 1) pip1$add("f1", \(a = ~data) a * 2) pip2 <- Pipeline$new("pipe2", data = 99) pip2$add("f1", \(a = ~data) a * 3) pip2$add("f2", \(a = ~f1) a * 4) pp <- pip1$append(pip2, outAsIn = TRUE) depends <- pp$get_depends() expect_equal(depends[["data.pipe2"]], c(data = "f1")) out <- pp$run()$collect_out(all = TRUE) pipe1_out <- out[["f1"]][["f1"]] expect_equal(pipe1_out, 1 * 2) expect_equal(out[["data.pipe2"]], pipe1_out) expect_equal(out[["f1"]][["f1.pipe2"]], pipe1_out * 3) expect_equal(out[["f2"]], out[["f1"]][["f1.pipe2"]] * 4) }) test_that("if duplicated step names would be created, an error is given", { pip1 <- Pipeline$new("pipe1") pip1$add("f1", \(a = ~data) a + 1) pip1$add("f1.pipe2", \(a = ~data) a + 1) pip2 <- Pipeline$new("pipe2") pip2$add("f1", \(a = ~data) a + 1) expect_error( pip1$append(pip2), "Cannot auto-fix name clash for step 'f1' in pipeline 'pipe2'", fixed = TRUE ) }) }) describe("append_to_step_names", { test_that("postfix can be appended to step names", { pip <- Pipeline$new("pipe", data = 1) pip$add("f1", \(a = ~data) a + 1) pip$add("f2", \(a = ~data, b = ~f1) a + b) pip$append_to_step_names("foo") expected_names <- c("data.foo", "f1.foo", "f2.foo") expect_equal(pip$get_step_names(), expected_names) expected_depends <- list( data.foo = character(0), f1.foo = c(a = "data.foo"), f2.foo = c(a = "data.foo", b = "f1.foo") ) expect_equal(pip$get_depends(), expected_depends) }) }) describe("collect_out", { test_that("data is set as first step but not part of output by default", { dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1", data = dat) expect_equal(pip$pipeline[["step"]], "data") out <- pip$run()$collect_out() expect_equal(out, list()) pip <- Pipeline$new("pipe1", data = dat)$ add("f1", \(x = ~data) x, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(out[["f1"]], dat) }) test_that("at the end, pipeline can clean output that shall not be kept", { data <- 9 pip <- Pipeline$new("pipe1", data = data) foo <-\(a = 1) a bar <-\(a, b) a + b a <- 5 pip$add("f1", foo, params = list(a = a)) pip$add("f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE) pip$run(cleanUnkept = TRUE) expect_equal(pip$get_out("f1"), NULL) expect_equal(pip$get_out("f2"), a + data) }) test_that("output is collected as expected", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a, keepOut = TRUE)$ add("f2", \(a = 2, b = ~f1) a + b)$ add("f3", \(a = 3, b = ~f2) a + b, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(length(out), 2) expect_equal(names(out), c("f1", "f3")) }) test_that("output can be grouped", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = 1, b = 2) a + b, group = "plus")$ add("f3", \(a = 1, b = 2) a / b)$ add("f4", \(a = 2, b = 2) a + b, group = "plus") out <- pip$run()$collect_out(all = TRUE) expect_equal(out[["plus"]], list(f2 = 3, f4 = 4)) expect_equal(out[["f1"]], 1) expect_equal(out[["f3"]], 1/2) }) test_that("output is ordered in the order of steps", { pip <- Pipeline$new("pipe")$ add("f2", \(a = 1) a, keepOut = TRUE)$ add("f1", \(b = 2) b, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(names(out), c("f2", "f1")) }) test_that( "grouped output is ordered in the order of group definitions", { pip <- Pipeline$new("pipe")$ add("f1", \(x = 1) x, group = "g2")$ add("f2", \(x = 2) x, group = "g1")$ add("f3", \(x = 3) x, group = "g2")$ add("f4", \(x = 4) x, group = "g1") out <- pip$run()$collect_out(all = TRUE) expect_equal(names(out), c("data", "g2", "g1")) }) test_that( "if just one group the output name still will take the group name", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = 1, b = 2) a + b, group = "plus")$ add("f3", \(a = 1, b = 2) a / b, group = "my f3")$ add("f4", \(a = 2, b = 2) a + b, group = "plus") out <- pip$run()$collect_out(all = TRUE) expect_equal(names(out), c("data", "f1", "plus", "my f3")) }) describe("groupBy option", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = 1, b = 2) a + b, group = "plus")$ add("f3", \(a = 1, b = 2) a / b) test_that("column to groupBy can be customized", { pip$run_step("f1") pip$run_step("f3") out <- pip$collect_out(groupBy = "state", all = TRUE) expect_equal( out, list( New = list(data = NULL, f2 = NULL), Done = list(f1 = 1, f3 = 0.5) ) ) }) test_that("signals bad groupBy input", { expect_error( pip$collect_out(groupBy = c("not", "a", "string")), "groupBy must be a single string" ) expect_error( pip$collect_out(groupBy = "foo"), "groupBy column does not exist" ) expect_error( pip$collect_out(groupBy = "time"), "groupBy column must be character" ) }) }) }) describe("discard_steps", { test_that("pipeline steps can be discarded by pattern", { pip <- Pipeline$new("pipe1")$ add("calc", \(a = 1) a)$ add("plot1", \(x = ~calc) x)$ add("plot2", \(x = ~plot1) x) out <- capture.output( pip$discard_steps("plot"), type = "message" ) expect_equal( out, c( "step 'plot2' was removed", "step 'plot1' was removed" ) ) expect_equal(pip$pipeline[["step"]], c("data", "calc")) }) test_that("if no pipeline step matches pattern, pipeline remains unchanged", { pip <- Pipeline$new("pipe1")$ add("calc", \(a = 1) a)$ add("plot1", \(x = ~calc) x)$ add("plot2", \(x = ~plot1) x) steps_before = pip$pipeline[["step"]] expect_silent(pip$discard_steps("bla")) expect_equal(pip$pipeline[["step"]], steps_before) }) test_that("if step has downstream dependencies, an error is given", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) b) expect_error( pip$discard_steps("f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2'" ) ) pip$add("f3", \(x = ~f1) x) expect_error( pip$discard_steps("f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f3'" ) ) }) }) describe("get_data", { test_that( "data can be retrieved", { p <- Pipeline$new("pipe", data = 1:2) expect_equal(p$get_data(), 1:2) p$set_data(3:4) expect_equal(p$get_data(), 3:4) }) test_that( "signals missing data", { p <- Pipeline$new("pipe", data = 1:2) p$pop_step() # remove data step expect_error( p$get_data(), "no data step defined" ) }) }) describe("get_depends", { test_that( "dependencies can be retrieved and are named after the steps", { pip <- Pipeline$new("pipe", data = 1) pip$add("f1", \(a = ~data) a + 1) pip$add("f2", \(b = ~f1) b + 1) depends <- pip$get_depends() expected_depends <- list( data = character(0), f1 = c(a = "data"), f2 = c(b = "f1") ) expect_equal(depends, expected_depends) expect_equal(names(depends), pip$get_step_names()) }) }) describe("get_depends_down", { test_that("dependencies can be determined recursively for given step", { pip <- Pipeline$new("pipe") pip$add("f1", \(a = 1) a) pip$add("f2", \(a = ~f1) a) pip$add("f3", \(a = ~f1, b = ~f2) a + b) pip$add("f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c) expect_equal(pip$get_depends_down("f3"), c("f4")) expect_equal(pip$get_depends_down("f2"), c("f3", "f4")) expect_equal(pip$get_depends_down("f1"), c("f2", "f3", "f4")) }) test_that("if no dependencies an empty character vector is returned", { pip <- Pipeline$new("pipe") pip$add("f1", \(a = 1) a) expect_equal(pip$get_depends_down("f1"), character(0)) }) test_that("step must exist", { pip <- Pipeline$new("pipe") expect_error( pip$get_depends_down("f1"), "step 'f1' does not exist" ) }) test_that( "works with complex dependencies as created by data splits", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = 2) b)$ add("f3", \(x = ~f1, y = ~f2) x + y) pip$set_data_split(dataList, toStep = "f2") expect_equal( pip$get_depends_down("f1.1"), c("f2.1", "f3") ) expect_equal( pip$get_depends_down("f1.2"), c("f2.2", "f3") ) expect_equal(pip$get_depends_down("f2.1"), "f3") expect_equal(pip$get_depends_down("f2.2"), "f3") }) }) describe("get_depends_up", { test_that("dependencies can be determined recursively for given step", { pip <- Pipeline$new("pipe") pip$add("f1", \(a = 1) a) pip$add("f2", \(a = ~f1) a) pip$add("f3", \(a = ~f1, b = ~f2) a + b) pip$add("f4", \(a = ~f1, b = ~f2, c = ~f3) a + b + c) expect_equal(pip$get_depends_up("f2"), c("f1")) expect_equal(pip$get_depends_up("f3"), c("f1", "f2")) expect_equal(pip$get_depends_up("f4"), c("f1", "f2", "f3")) }) test_that("if no dependencies an empty character vector is returned", { pip <- Pipeline$new("pipe") pip$add("f1", \(a = 1) a) expect_equal(pip$get_depends_up("f1"), character(0)) }) test_that("step must exist", { pip <- Pipeline$new("pipe") expect_error( pip$get_depends_up("f1"), "step 'f1' does not exist" ) }) test_that("works with complex dependencies as created by data splits", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = 2) b)$ add("f3", \(x = ~f1, y = ~f2) x + y) pip$set_data_split(dataList, toStep = "f2") expect_equal(pip$get_depends_up("f2.1"), c("f1.1")) expect_equal(pip$get_depends_up("f2.2"), c("f1.2")) expect_equivalent( pip$get_depends_up("f3"), c("f1.1", "f2.1", "f1.2", "f2.2") ) }) }) describe("get_graph", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = ~data) a) res <- pip$get_graph() test_that("returns a node table with the expected columns", { tab <- res$nodes expect_true(is.data.frame(tab)) expectedColumns <- c("id", "label", "group", "shape", "color", "title") expect_equal(colnames(tab), expectedColumns) }) test_that("the node table contains all steps", { tab <- res$nodes expect_equal(tab$label, pip$get_step_names()) }) test_that("returns an edges table with the expected columns", { tab <- res$edges expect_true(is.data.frame(tab)) expectedColumns <- c("from", "to", "arrows") expect_equal(colnames(tab), expectedColumns) }) test_that("can be printed created for certain groups", { pip <- Pipeline$new("pipe") pip$add("step2", \(a = ~data) a + 1, group = "add") pip$add("step3", \(a = ~step2) 2 * a, group = "mult") pip$add("step4", \(a = ~step2, b = ~step3) a + b, group = "add") pip$add("step5", \(a = ~data, b = ~step4) a * b, group = "mult") res.add <- pip$get_graph(groups = "add") expect_equal(res.add$nodes$label, c("step2", "step4")) res.mult <- pip$get_graph(groups = "mult") expect_equal(res.mult$nodes$label, c("step3", "step5")) }) }) describe("get_out", { test_that("output at given step can be retrieved", { data <- airquality pip <- Pipeline$new("pipe", data = data)$ add("model", \(data = ~data) { lm(Ozone ~ Wind, data = data) }, ) pip$run() expect_equal(pip$get_out("data"), data) expect_equivalent( pip$get_out("model"), lm(Ozone ~ Wind, data = data) ) }) test_that("step of requested output must exist", { pip <- Pipeline$new("pipe") pip$run() expect_error(pip$get_out("foo"), "step 'foo' does not exist") }) }) describe("get_params", { test_that("parameters can be retrieved", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a, keepOut = TRUE)$ add("f2", \(a, b = ~f1) a + b, params = list(a = 8), keepOut = TRUE )$ add("f3", \(a = ~f2, b = 3) a + b, keepOut = TRUE) p <- pip$get_params() expect_equal( p, list(f1 = list(a = 1), f2 = list(a = 8), f3 = list(b = 3)) ) }) test_that("empty pipeline gives empty list of parameters", { pip <- Pipeline$new("pipe1") expect_equivalent(pip$get_params(), list()) pip$add("f1", \() 1) expect_equivalent(pip$get_params(), list()) }) test_that("hidden parameters are filtered out by default", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, .hidden = 2) a) p <- pip$get_params() expect_equal(p, list(f1 = list(a = 1))) p <- pip$get_params(ignoreHidden = FALSE) expect_equal(p, list(f1 = list(a = 1, .hidden = 2))) }) test_that("works with Param objects", { pip <- Pipeline$new("pipe1")$ add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } )$ add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) par <- pip$get_params() expect_true(all(par$f1 |> sapply(is, "NumericParam"))) expect_equal(par$f1$x@value, 1) expect_equal(par$f1$y@value, 2) expect_true(all(par$f2 |> sapply(is, "StringParam"))) expect_equal(par$f2$s1@value, "Hello") expect_equal(par$f2$s2@value, "World") }) }) describe("get_params_at_step", { test_that("list of step parameters can be retrieved", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b)$ add("f2", \(x = 1, y = 2) x + y) expect_equal( pip$get_params_at_step("f1"), list(a = 1, b = 2) ) expect_equal( pip$get_params_at_step("f2"), list(x = 1, y = 2) ) }) test_that("if no parameters empty list is returned", { pip <- Pipeline$new("pipe1")$ add("f1", \() 1) expect_equal( pip$get_params_at_step("f1"), list() ) }) test_that( "hidden parameters are not returned, unless explicitly requested", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, .b = 2) a + .b) expect_equal( pip$get_params_at_step("f1"), list(a = 1) ) expect_equal( pip$get_params_at_step("f1", ignoreHidden = FALSE), list(a = 1, .b = 2) ) }) test_that("bound parameters are never returned", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = ~data) a + b) expect_equal( pip$get_params_at_step("f1"), list(a = 1) ) expect_equal( pip$get_params_at_step("f1", ignoreHidden = FALSE), list(a = 1) ) }) test_that("step must exist", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = ~data) a + b) expect_error( pip$get_params_at_step("foo"), "step 'foo' does not exist" ) }) test_that("works with Param objects", { pip <- Pipeline$new("pipe1")$ add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } )$ add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) par1 <- pip$get_params_at_step("f1") expect_true(all(par1 |> sapply(is, "NumericParam"))) expect_equal(par1$x@value, 1) expect_equal(par1$y@value, 2) par2 <- pip$get_params_at_step("f2") expect_true(all(par2 |> sapply(is, "StringParam"))) expect_equal(par2$s1@value, "Hello") expect_equal(par2$s2@value, "World") }) }) describe("get_params_unique", { test_that("parameters can be retrieved uniquely and if occuring multiple times, the 1st default value is used", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(a = 2, b = 3) a + b)$ add("f3", \(a = 4, b = 5, c = 6) a + b) p <- pip$get_params_unique() expect_equivalent(p, list(a = 1, b = 3, c = 6)) }) test_that("empty pipeline gives empty list", { pip <- Pipeline$new("pipe") expect_equivalent(pip$get_params_unique(), list()) }) test_that("pipeline with no parameters gives empty list", { pip <- Pipeline$new("pipe")$ add("f1", \() 1) expect_equivalent(pip$get_params_unique(), list()) }) test_that("works with Param objects", { pip <- Pipeline$new("pipe1")$ add( "f1", \( a = 0, x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { a * (x + y) } )$ add( "f2", \( a = new("NumericParam", "a", value = 0), x = new("NumericParam", "x", value = 1), y = 2, z = new("NumericParam", "y", value = 3) ) { a * (x + y + z) } ) par <- pip$get_params_unique() expect_equal(names(par), c("a", "x", "y", "z")) expect_equal(par$a, 0) expect_equal(par$x@value, 1) expect_equal(par$y@value, 2) expect_equal(par$z@value, 3) }) }) describe("get_params_unique_json", { test_that("the elements are not named", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a, keepOut = TRUE) p <- pip$get_params_unique_json() pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(names(pl), NULL) pip <- Pipeline$new("pipe1")$ add("f1", \(x = new("StringParam", "my x", "some x")) x)$ add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pip$get_params_unique_json() pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(names(pl), NULL) }) test_that("standard parameters are returned as name-value pairs", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(a = 1, b = 2) a)$ add("f3", \(a = 1, b = 2, c = list(a = 1, b = 2)) a) p <- pip$get_params_unique_json() expect_true(methods::is(p, "json")) pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal( pl, list( list(name = "a", value = 1), list(name = "b", value = 2), list(name = "c", value = list(a = 1, b = 2)) ) ) }) test_that("Param objects are returned with full information", { pip <- Pipeline$new("pipe1")$ add("f1", \(x = new("StringParam", "my x", "some x")) x)$ add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pip$get_params_unique_json() pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal( pl, list( list( value = "some x", name = "x", advanced = FALSE, label = "my x", description = "", source = "internal", domain = "", class = "StringParam" ), list( value = "some y", name = "y", advanced = FALSE, label = "my y", description = "", source = "internal", domain = "", class = "StringParam" ) ) ) }) test_that("the name of the arg is set to the name of the Param object", { pip <- Pipeline$new("pipe1")$ add("f1", \(x = new("StringParam", "my x", "some x")) x)$ add("f2", \(y = new("StringParam", "my y", "some y")) y) p <- pip$get_params_unique_json() pl <- jsonlite::fromJSON(p) expect_true(pl[["label"]][[1]] == "my x") expect_false(pl[["name"]][[1]] == "my x") hasArgName <- pl[["name"]][[1]] == "x" expect_true(pl[["label"]][[2]] == "my y") expect_false(pl[["name"]][[2]] == "my y") hasArgName <- pl[["name"]][[2]] == "y" }) test_that("works with mixed, that is, standard and Param objects", { pip <- Pipeline$new("pipe1")$ add("f1", \(x = 1) x)$ add("f2", \(s = new("StringParam", "my s", "some s")) s) p <- pip$get_params_unique_json() pl <- jsonlite::fromJSON(p, simplifyVector = FALSE) expect_equal(pl[[1]], list(name = "x", value = 1L)) expect_equal( pl[[2]], list( value = "some s", name = "s", advanced = FALSE, label = "my s", description = "", source = "internal", domain = "", class = "StringParam" ) ) }) }) describe("get_step", { test_that("single steps can be retrieved", { pip <- Pipeline$new("pipe1")$ add("f1", identity, params = list(x = 1)) expect_equal(pip$get_step("data"), pip$pipeline[1, ]) expect_equal(pip$get_step("f1"), pip$pipeline[2, ]) expect_error(pip$get_step("foo"), "step 'foo' does not exist") }) test_that("dependencies are recorded as expected", { pip <- Pipeline$new("pipe1", data = 9) foo <-\(a = 0) a bar <-\(a = 1, b = 2) a + b pip$add("f1", foo) pip$add("f2", bar, params = list(a = ~data, b = ~f1)) expect_true(length(unlist(pip$get_step("f1")[["depends"]])) == 0) expect_equal( unlist(pip$get_step("f2")[["depends"]]), c("a" = "data", b = "f1") ) }) }) describe("get_step_names", { test_that("step names be retrieved", { pip <- Pipeline$new("pipe1")$ add("f1", \() {})$ add("f2", \(x = 1) x) expect_equal(pip$get_step_names(), pip$pipeline[["step"]]) }) }) describe("get_step_number", { test_that("get_step_number", { expect_true(is.function(Pipeline$new("pipe")$get_step_number)) test_that("get_step_number works as expected", { pip <- expect_no_error(Pipeline$new("pipe")) pip$add("f1", \(a = 1) a) pip$add("f2", \(a = 1) a) pip$get_step_number("f1") |> expect_equal(2) pip$get_step_number("f2") |> expect_equal(3) }) test_that("signals non-existent step", { pip <- expect_no_error(Pipeline$new("pipe")) pip$add("f1", \(a = 1) a) expect_error( pip$get_step_number("non-existent"), "step 'non-existent' does not exist" ) }) }) }) describe("has_step", { test_that("it can be checked if pipeline has a step", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_true(pip$has_step("f1")) expect_false(pip$has_step("f2")) }) }) describe("insert_after", { test_that("can insert a step after another step", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) pip$insert_after( "f1", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pip$get_step_names(), c("data", "f1", "f3", "f2") ) }) test_that("will not insert a step if the step already exists", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_after("f1", step = "f2"), "step 'f2' already exists" ) }) test_that( "will not insert a step if the reference step does not exist", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_after("non-existent", step = "f3"), "step 'non-existent' does not exist" ) }) test_that( "will not insert a step with bad parameter dependencies", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_after("f1", step = "f3", \(x = ~f2) x), "step 'f3': dependency 'f2' not found" ) }) test_that("will work if insert happens at last position", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) pip$insert_after( "f2", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pip$get_step_names(), c("data", "f1", "f2", "f3") ) }) }) describe("insert_before", { test_that("can insert a step after another step", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) pip$insert_before( "f2", step = "f3", fun =\(a = ~f1) a + 1 ) expect_equal( pip$get_step_names(), c("data", "f1", "f3", "f2") ) }) test_that("will not insert a step if the step already exists", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_before("f1", step = "f2"), "step 'f2' already exists" ) }) test_that( "will not allow step to be inserted at first position", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1) expect_error( pip$insert_before("data", step = "f2"), "cannot insert before first step" ) }) test_that("will not insert a step if the reference step does not exist", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_before("non-existent", step = "f3"), "step 'non-existent' does not exist" ) }) test_that( "will not insert a step with bad parameter dependencies", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a + 1)$ add("f2", \(a = ~f1) a + 1) expect_error( pip$insert_before("f2", step = "f3", \(x = ~f2) x), "step 'f3': dependency 'f2' not found" ) }) }) describe("length", { test_that("returns the number of steps", { pip <- Pipeline$new("pipe") expect_equal(pip$length(), 1) pip$add("f1", \(a = 1) a) expect_equal(pip$length(), 2) pip$remove_step("f1") expect_equal(pip$length(), 1) }) }) describe("lock_step", { test_that("sets state to 'locked'", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) pip$lock_step("f1") expect_equal(pip$get_step("f1")[["state"]], "Locked") pip }) }) describe("print", { test_that("pipeline can be printed", { pip <- Pipeline$new("pipe1", data = 9) expect_output(pip$print()) }) test_that("missing function is signaled", { pip <- Pipeline$new("pipe1") expect_error( pip$add("f1", "non-existing-function"), "object 'non-existing-function' of mode 'function' was not found" ) }) test_that("if verbose is TRUE, all columns are printed", { op <- options(width = 1000L) on.exit(options(op)) pip <- Pipeline$new("pipe1", data = 9) out <- capture.output(pip$print(verbose = TRUE)) header <- out[1] |> trimws() |> strsplit("\\s+") |> unlist() expected_header <- colnames(pip$pipeline) expect_equal(header, expected_header) }) }) describe("pop_step", { test_that("last pipeline step can be popped", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) pip_copy = pip$clone() pip$add("f2", \(b = 2) b) expect_equal(pip$length(), 3) expect_equal(pip_copy$length(), 2) res = pip$pop_step() expect_equal(res, "f2") expect_equal(pip$length(), 2) expect_equal(pip, pip_copy) }) }) describe("pop_steps_after", { test_that("all steps after a given step can be removed", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x)$ add("f2", \(x = ~f1) x)$ add("f3", \(x = ~f2) x) steps = pip$pop_steps_after("f1") expect_equal(steps, c("f2", "f3")) hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]]) expect_true(hasAllStepsRemoved) }) test_that("if given step does not exist, an error is signalled", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x) expect_error( pip$pop_steps_after("bad_step"), "step 'bad_step' does not exist" ) }) test_that("if given step is the last step, nothing gets removed", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x)$ add("f2", \(x = ~f1) x) length_before = pip$length() res = pip$pop_steps_after("f2") expect_equal(res, character(0)) hasNothingRemoved = pip$length() == length_before expect_true(hasNothingRemoved) }) }) describe("pop_steps_from", { test_that("all steps from a given step can be removed", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x)$ add("f2", \(x = ~f1) x)$ add("f3", \(x = ~f2) x) steps = pip$pop_steps_from("f2") expect_equal(steps, c("f2", "f3")) hasAllStepsRemoved = !any(steps %in% pip$pipeline[["name"]]) expect_true(hasAllStepsRemoved) }) test_that("if given step does not exist, an error is signalled", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x) expect_error( pip$pop_steps_from("bad_step"), "step 'bad_step' does not exist" ) }) test_that("if given step is the last step, one step removed", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(x = 1) x)$ add("f2", \(x = ~f1) x) length_before = pip$length() res = pip$pop_steps_from("f2") expect_equal(res, "f2") hasOneStepRemoved = pip$length() == length_before - 1 expect_true(hasOneStepRemoved) }) }) describe("remove_step", { test_that("pipeline step can be removed", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 1) b) pip$remove_step("f1") expect_equal(pip$get_step_names(), c("data", "f2")) }) test_that("step must exist", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_error( pip$remove_step("non-existent-step"), "step 'non-existent-step' does not exist" ) }) test_that("if step has downstream dependencies, an error is given", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) b) expect_error( pip$remove_step("f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2'" ) ) pip$add("f3", \(x = ~f1) x) expect_error( pip$remove_step("f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f3'" ) ) }) test_that( "if error, only the direct downstream dependencies are reported", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) b)$ add("f3", \(c = ~f2) c)$ add("f4", \(d = ~f1) d) expect_error( pip$remove_step("f1"), paste( "cannot remove step 'f1' because the following", "steps depend on it: 'f2', 'f4'" ) ) }) test_that( "step can be removed together with is downstream dependencies", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) b)$ add("f3", \(c = ~f2) c)$ add("f4", \(d = ~f1) d)$ add("f5", \(x = ~data) x) out <- utils::capture.output( pip$remove_step("f1", recursive = TRUE), type = "message" ) expect_equal(pip$get_step_names(), c("data", "f5")) expect_equal( out, paste( "Removing step 'f1' and its downstream dependencies:", "'f2', 'f3', 'f4'" ) ) }) }) describe("remove_step", { f <- Pipeline$new("pipe")$remove_step test_that("pipeline step can be renamed", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b) pip$rename_step(from = "f1", to = "first") pip$get_step_names() |> expect_equal(c("data", "first", "f2")) }) test_that("signals name clash", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b) expect_error( pip$rename_step(from = "f1", to = "f2"), "step 'f2' already exists" ) }) test_that("renames dependencies as well", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) b)$ add("f3", \(a = ~f1, b = ~f2) a + b) pip$rename_step(from = "f1", to = "first") expect_equal( pip$get_depends(), list( data = character(0), first = character(0), f2 = c(b = "first"), f3 = c(a = "first", b = "f2") ) ) }) }) describe("replace_step", { test_that("pipeline steps can be replaced", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b)$ add("f3", \(c = ~f2) c, keepOut = TRUE) out = unname(unlist(pip$run()$collect_out())) expect_equal(out, 2) pip$replace_step("f2", \(z = 4) z) out = unname(unlist(pip$run()$collect_out())) expect_equal(out, 4) }) test_that("fun must be passed as a function or string", { pip <- Pipeline$new("pipe")$ add("step1", \(a = 1) a) expect_error( pip$replace_step("step1", fun = 1), "is.function(fun) || is_string(fun) is not TRUE", fixed = TRUE ) }) test_that("params must be a list", { pip <- Pipeline$new("pipe")$ add("step1", \(a = 1) a) expect_error( pip$replace_step("step1", fun =\() 1, params = 1), "is.list(params)", fixed = TRUE ) }) test_that("description must be (possibly empty) string", { pip <- Pipeline$new("pipe")$ add("step1", \(a = 1) a) expect_error( pip$replace_step("step1", fun =\() 1, description = 1), "is_string(description)", fixed = TRUE ) expect_no_error( pip$replace_step("step1", fun =\() 1, description = "") ) }) test_that("group must be non-empty string", { pip <- Pipeline$new("pipe")$ add("step1", \(a = 1) a) expect_error( pip$replace_step("step1", fun =\() 1, group = 1), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) expect_error( pip$replace_step("step1", fun =\() 1, group = ""), "is_string(group) && nzchar(group) is not TRUE", fixed = TRUE ) }) test_that("keepOut must be logical", { pip <- Pipeline$new("pipe")$ add("step1", \(a = 1) a) expect_error( pip$replace_step("step1", fun =\() 1, keepOut = 1), "is.logical(keepOut) is not TRUE", fixed = TRUE ) }) test_that("the replacing function can be passed as a string", { pip <- Pipeline$new("pipe1")$ add("f1", \(x = 3) x, keepOut = TRUE) out = unname(unlist(pip$run()$collect_out())) expect_equal(out, 3) .my_func <-\(x = 3) { 2 * x } assign(".my_func", .my_func, envir = globalenv()) on.exit(rm(".my_func", envir = globalenv())) pip$replace_step("f1", fun = ".my_func", keepOut = TRUE) out = unname(unlist(pip$run()$collect_out())) expect_equal(out, 6) }) test_that( "when replacing function, default parameters can be overridden", { pip <- Pipeline$new("pipe1")$ add("f1", \(x = 1:3) x, keepOut = TRUE) .my_func <-\(x = 3) { 2 * x } assign(".my_func", .my_func, envir = globalenv()) on.exit(rm(".my_func", envir = globalenv())) pip$replace_step( "f1", fun = ".my_func", params = list(x = 10), keepOut = TRUE ) out = unname(unlist(pip$run()$collect_out())) expect_equal(out, 20) }) test_that("the pipeline step that is being replaced must exist", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b)$ add("f3", \(c = ~f2) c, keepOut = TRUE) expect_error(pip$replace_step("non-existent", \(z = 4) z)) }) test_that( "if replacing a pipeline step, dependencies are verified correctly", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = 2) b)$ add("f3", \(c = ~f2) c, keepOut = TRUE) expect_error( pip$replace_step("f2", \(z = ~foo) z), "dependency 'foo' not found up to step 'f1'" ) expect_error( pip$replace_step("f2", \(z = ~f2) z), "dependency 'f2' not found up to step 'f1'" ) expect_error( pip$replace_step("f2", \(z = ~f3) z), "dependency 'f3' not found up to step 'f1'" ) }) test_that( "states are updated correctly", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(a = 2) a)$ add("f3", \(a = ~f2) a, keepOut = TRUE)$ add("f4", \(a = ~f3) a, keepOut = TRUE) pip$run() pip$replace_step("f2", \(a = 2) 2* a) expect_equal(pip$get_step("f1")$state, "Done") expect_equal(pip$get_step("f2")$state, "New") expect_equal(pip$get_step("f3")$state, "Outdated") expect_equal(pip$get_step("f4")$state, "Outdated") }) it("can have a variable defined outside as parameter default", { x <- 3 pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) pip$replace_step("f1", fun = \(a) a, params = list(a = x)) out <- pip$run()$get_out("f1") expect_equal(out, x) }) it("handles Param object args", { pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) pip$replace_step( "f1", fun = \(a = new("NumericParam", "a", value = 3)) a ) out <- pip$run()$get_out("f1") expect_equal(out, 3) }) it("can have a Param object defined outside as parameter default", { x <- 3 pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) p <- new("NumericParam", "a", value = x) pip$replace_step("f1", fun = \(a) a, params = list(a = p)) pip <- Pipeline$new("pipe")$ add("f1", \(a) a, params = list(a = p)) expect_equal(pip$get_params_at_step("f1")$a, p) out <- pip$run()$get_out("f1") expect_equal(out, x) }) it("function can be passed as a string", { pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) pip$replace_step("f1", fun = "mean", params = list(x = 1:5)) out <- pip$run()$get_out("f1") expect_equal(out, mean(1:5)) expect_equal(pip$get_step("f1")[["funcName"]], "mean") }) it("if passed as a function, name is derived from the function", { pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) pip$replace_step("f1", fun = mean, params = list(x = 1:5)) out <- pip$run()$get_out("f1") expect_equal(out, mean(1:5)) expect_equal(pip$get_step("f1")[["funcName"]], "mean") }) it("lampda functions, are named 'function'", { pip <- Pipeline$new("pipe")$add("f1", \(x = 1) x) pip$replace_step("f1", fun = \(x = 1) x) expect_equal(pip$get_step("f1")[["funcName"]], "function") }) }) describe("reset", { test_that( "after reset pipeline is the same as before the run", { p <- Pipeline$new("pipe", data = 1:2) p$add("f1", \(x = 1) x) p$add("f2", \(y = 1) y) p$run() expect_equal( p$collect_out(all = TRUE), list(data = 1:2, f1 = 1, f2 = 1) ) expect_true(all(p$pipeline[["state"]] == "Done")) p$reset() expect_equal( p$collect_out(all = TRUE), list(data = NULL, f1 = NULL, f2 = NULL) ) expect_true(all(p$pipeline[["state"]] == "New")) }) }) describe("run", { test_that("empty pipeline can be run", { expect_no_error(Pipeline$new("pipe1")$run()) }) test_that("returns the pipeline object", { pip <- Pipeline$new("pipe1")$run() expect_equal(pip$name, "pipe1") }) test_that("if function result is a list, all names are preserved", { # Result list length == 1 - the critical case resultList = list(foo = 1) pip <- Pipeline$new("pipe")$ add("f1", \() resultList, keepOut = TRUE) out = pip$run()$collect_out() expect_equal(out[["f1"]], resultList) # Result list length > 1 resultList = list(foo = 1, bar = 2) pip <- Pipeline$new("pipe")$ add("f1", \() resultList, keepOut = TRUE) out = pip$run()$collect_out() expect_equal(out[["f1"]], resultList) }) test_that("pipeline execution is correct", { data <- 9 pip <- Pipeline$new("pipe1", data = data) foo <-\(a = 1) a bar <-\(a, b) a + b a <- 5 pip$add("f1", foo, params = list(a = a), keepOut = TRUE) pip$add("f2", bar, params = list(a = ~data, b = ~f1), keepOut = TRUE) pip$run() expect_equal(pip$pipeline[["out"]][[2]], a) expect_equal(pip$pipeline[["out"]][[3]], a + data) }) test_that("pipeline execution can cope with void functions", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) {}, keepOut = TRUE)$ add("f2", \(b = 2) b, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(out, list(f1 = NULL, f2 = 2)) }) test_that( "if pipeline execution fails, the error message is returned as error", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(b = ~f1) stop("something went wrong")) expect_error(pip$run(), "something went wrong") }) test_that( "can handle 'NULL' results", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = ~data) x, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(out[["f1"]], 1) pip$set_data(NULL) out <- pip$run()$collect_out() expect_equal(out[["f1"]], NULL) }) test_that( "can be run recursively to dynamically create and run pipelines", { pip <- Pipeline$new("pipe", data = 1)$ add( "f1", fun =\(data = 10) { pip <- Pipeline$new("2nd pipe", data = data)$ add("step1", \(x = ~data) x)$ add("step2", \(x = ~step1) { print(x) 2 * x }, keepOut = TRUE) } ) pip2 <- pip$run(recursive = TRUE) expect_equal(pip2$get_step_names(), c("data", "step1", "step2")) out <- pip2$collect_out() expect_equal(out[["step2"]], 20) }) test_that("will not re-run steps that are already done unless forced", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(y = ~f1) y + 1) pip$run() expect_equal(pip$get_step("f1")$state, "Done") expect_equal(pip$get_step("f2")$state, "Done") expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pip$run() expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) # Set parameter, which outdates step and run again pip$run(force = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) }) test_that("will never re-run locked steps", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(y = ~f1) y + 1) pip$run() expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pip$run() expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) pip$lock_step("f1") pip$lock_step("f2") # Set parameter, which outdates step and run again pip$run(force = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 0) }) test_that("can clean unkept steps", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(y = ~f1) y + 1) pip$run() expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$run(cleanUnkept = TRUE) expect_true(all(sapply(pip$pipeline[["out"]], is.null))) pip$set_keep_out("f1", TRUE) pip$run(cleanUnkept = TRUE) expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL)) }) test_that("logs warning without interrupting the run", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(x = ~f1) { warning("something might be wrong") x })$ add("f3", \(x = ~f2) x) log <- utils::capture.output( expect_warning(pip$run(), "something might be wrong") ) Filter(log, f =\(x) x |> startsWith("WARN")) |> grepl(pattern = "something might be wrong") |> expect_true() wasRunTillEnd <- pip$get_out("f3") == 2 expect_true(wasRunTillEnd) }) test_that("logs error and stops at failed step", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(x = ~f1) { stop("something went wrong") x })$ add("f3", \(x = ~f2) x) log <- utils::capture.output( expect_error(pip$run(), "something went wrong") ) Filter(log, f =\(x) x |> startsWith("ERROR")) |> grepl(pattern = "something went wrong") |> expect_true() wasRunTillEnd <- isTRUE(pip$get_out("f3") == 2) expect_false(wasRunTillEnd) }) test_that("can show progress", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(x = ~f1) x) m <- mockery::mock() pip$run(progress = m) args <- mockery::mock_args(m) expect_equal(length(m), pip$length()) expect_equal(args[[1]][[1]], 1) expect_equal(args[[1]][["detail"]], "data") expect_equal(args[[2]][[1]], 2) expect_equal(args[[2]][["detail"]], "f1") expect_equal(args[[3]][[1]], 3) expect_equal(args[[3]][["detail"]], "f2") }) test_that("works with Param objects", { pip <- Pipeline$new("pipe1")$ add( "f1", \( x = new("NumericParam", "x", value = 1), y = new("NumericParam", "y", value = 2) ) { x + y } )$ add( "f2", \( s1 = new("StringParam", "s1", "Hello"), s2 = new("StringParam", "s2", "World") ) { paste(s1, s2) } ) pip$run() pip$get_out("f1") |> expect_equal(3) pip$get_out("f2") |> expect_equal("Hello World") }) }) describe("run_step", { test_that("pipeline can be run at given step", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a) expect_no_error(pip$run_step("A")) }) test_that("upstream steps are by default run with given step", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step("B") expect_equal(pip$get_out("A"), 1) expect_equal(pip$get_out("B"), c(1, 2)) expect_true(is.null(pip$get_out("C"))) }) test_that("runs upstream steps in correct order", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step("C") expect_equal(pip$get_out("C"), 1:3) }) test_that("runs downstream steps in correct order", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step("A", downstream = TRUE) expect_equal(pip$get_out("C"), 1:3) }) test_that("pipeline can be run at given step excluding all upstream dependencies", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step("B", upstream = FALSE) expect_true(is.null(pip$get_out("A"))) expect_equal(pip$get_out("B"), 2) expect_true(is.null(pip$get_out("C"))) }) test_that("pipeline can be run at given step excluding upstream but including downstream dependencies", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step( "B", upstream = FALSE, downstream = TRUE ) expect_true(is.null(pip$get_out("A"))) expect_equal(pip$get_out("B"), 2) expect_equal(pip$get_out("C"), c(2, 3)) }) test_that("pipeline can be run at given step including up- and downstream dependencies", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) pip$run_step( "B", upstream = TRUE, downstream = TRUE ) expect_equal(pip$get_out("A"), 1) expect_equal(pip$get_out("B"), c(1, 2)) expect_equal(pip$get_out("C"), c(1, 2, 3)) }) test_that("if not marked as keepOut, output of run steps can be cleaned", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a) pip$run_step("A", cleanUnkept = TRUE) expect_true(is.null(pip$get_out("A"))) pip$set_keep_out("A", TRUE)$run_step("A", cleanUnkept = TRUE) expect_false(is.null(pip$get_out("A"))) }) test_that("up- and downstream steps are marked in log", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) c(b, 2))$ add("C", \(c = ~B) c(c, 3)) logOut <- utils::capture.output( pip$run_step("B", upstream = TRUE, downstream = TRUE) ) contains <-\(x, pattern) { grepl(pattern = pattern, x = x, fixed = TRUE) } expect_true(logOut[2] |> contains("Step 1/3 A (upstream)")) expect_true(logOut[3] |> contains("Step 2/3 B")) expect_true(logOut[4] |> contains("Step 3/3 C (downstream)")) }) test_that( "updates the timestamp of the run steps", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = ~data) x, keepOut = TRUE) before <- pip$pipeline[["time"]] Sys.sleep(1) pip$run_step("f1", upstream = FALSE) after <- pip$pipeline[["time"]] expect_equal(before[1], after[1]) expect_true(before[2] < after[2]) }) test_that( "updates the state of the run steps", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = ~data) x, keepOut = TRUE) before <- pip$pipeline[["state"]] pip$run_step("f1", upstream = FALSE) after <- pip$pipeline[["state"]] expect_equal(before, c("New", "New")) expect_equal(after, c("New", "Done")) }) test_that("will never re-run locked step", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(y = ~f1) y + 1) pip$run() expect_equal(pip$pipeline[["out"]][[2]], 2) expect_equal(pip$pipeline[["out"]][[3]], 3) pip$pipeline[2, "out"] <- 0 pip$pipeline[3, "out"] <- 0 pip$lock_step("f1") pip$run_step("f1", downstream = TRUE) expect_equal(pip$pipeline[["out"]][[2]], 0) expect_equal(pip$pipeline[["out"]][[3]], 1) }) test_that("can clean unkept steps", { pip <- Pipeline$new("pipe", data = 1)$ add("f1", \(x = 2) x)$ add("f2", \(y = ~f1) y + 1) pip$run_step("f1", downstream = TRUE, cleanUnkept = TRUE) expect_true(all(sapply(pip$pipeline[["out"]], is.null))) pip$set_keep_out("f1", TRUE) pip$run_step("f1", downstream = TRUE, cleanUnkept = TRUE) expect_equal(pip$pipeline[["out"]], list(NULL, 2, NULL)) }) }) describe("set_data", { test_that("data can be set later after pipeline definition", { dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1")$ add("f1", \(x = ~data) x, keepOut = TRUE) out <- pip$run()$collect_out() expect_equal(out[["f1"]], NULL) pip$set_data(dat) out <- pip$run()$collect_out() expect_equal(out[["f1"]], dat) }) test_that("if data is set, all dependent steps are set to outdated", { dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1")$ add("f1", \(x = ~data) x, keepOut = TRUE)$ add("f2", \(x = ~f1) x, keepOut = TRUE) pip$run() expect_equal(pip$get_step("f1")$state, "Done") expect_equal(pip$get_step("f2")$state, "Done") pip$set_data(dat) expect_equal(pip$get_step("f1")$state, "Outdated") expect_equal(pip$get_step("f2")$state, "Outdated") }) }) describe("set_data_split", { test_that("the new steps have the names of the list attached", { dataList <- list(A = 1, B = 2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) pip$set_data_split(dataList) pip$get_step_names() |> expect_equal(c("data.A", "f1.A", "data.B", "f1.B")) }) test_that("the separator used in the creation of the new steps can be customized", { dataList <- list(A = 1, B = 2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) pip$set_data_split(dataList, sep = "_") pip$get_step_names() |> expect_equal(c("data_A", "f1_A", "data_B", "f1_B")) }) test_that("simple split pipeline computes results as expected", { dataList <- list(A = 1, B = 2, C = 3) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = ~data) { b + a }, keepOut = TRUE) pip$set_data_split(dataList) out <- pip$run()$collect_out() expect_equivalent( unlist(out), unlist(lapply(dataList, \(x) x + 1)) ) }) test_that( "split pipeline by default overrides output groups according to split", { dataList <- list(A = 1, B = 2) pip <- Pipeline$new("pipe")$ add("f0", \(a = 1) a, group = "id")$ add("f1", \(a = 1) a, group = "id")$ add("f2", \(a = 2) a) pip$set_data_split(dataList) out <- pip$run()$collect_out(all = TRUE) expect_equal(names(out), names(dataList)) }) test_that("the grouping override can be omitted", { dataList <- list(A = 1, B = 2) pip <- Pipeline$new("pipe")$ add("f0", \(a = 1) a, group = "id")$ add("f1", \(a = 1) a, group = "id")$ add("f2", \(a = 2) a) pip$set_data_split(dataList, groupBySplit = FALSE) out <- pip$run()$collect_out(all = TRUE) expect_equal( names(out), c("data.A", "id.A", "f2.A", "data.B", "id.B", "f2.B") ) }) test_that("the separator used in the creation of the groups can be customized", { dataList <- list(A = 1, B = 2) pip <- Pipeline$new("pipe")$ add("f0", \(a = 1) a, group = "id")$ add("f1", \(a = 1) a, group = "id")$ add("f2", \(a = 2) a) pip$set_data_split(dataList, groupBySplit = FALSE, sep = "_") out <- pip$run()$collect_out(all = TRUE) expect_equal( names(out), c("data_A", "id_A", "f2_A", "data_B", "id_B", "f2_B") ) }) test_that("split pipeline works for list of data frames", { dat <- data.frame(x = 1:2, y = 1:2, z = 1:2) dataList <- list(A = dat, B = dat, C = dat) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)$ add("f3", \(a = ~f1, b = ~data) b[, 2:3], keepOut = TRUE) pip$set_data_split(dataList) out <- pip$run()$collect_out() expect_equal(out[["A"]], c(f2.A = list(dat), f3.A = list(dat[, 2:3]))) expect_equal(out[["B"]], c(f2.B = list(dat), f3.B = list(dat[, 2:3]))) expect_equal(out[["C"]], c(f2.C = list(dat), f3.C = list(dat[, 2:3]))) }) test_that("if unnamed list of data frames, they are named with numbers", { dat <- data.frame(x = 1:2, y = 1:2, z = 1:2) dataList <- list(dat, dat) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE) pip$set_data_split(dataList) out <- pip$run()$collect_out() expect_equal(out[["1"]], dat) expect_equal(out[["2"]], dat) }) test_that( "depends are updated correctly, if data split on subset of pipeline", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = ~data) b, keepOut = TRUE)$ add("f3", \(x = ~f1, y = ~f2) list(x, y), keepOut = TRUE)$ add("f4", \(x = ~f3) x[[1]], keepOut = TRUE) pip$set_data_split(dataList, toStep = "f2") ee = expect_equivalent pp = pip$pipeline depends <- pip$get_depends() expect_equal(depends[["f2.1"]], c(a = "f1.1", b = "data.1")) expect_equal(depends[["f2.2"]], c(a = "f1.2", b = "data.2")) # Pipeline was not split for f3, which therefore has parameters that # each depend on two steps expect_equal( depends[["f3"]], list(x = c("f1.1", "f1.2"), y = c("f2.1", "f2.2")) ) # Pipeline was not split for f4, so just depdends on f3 ee(depends[["f4"]], c(x = "f3")) out <- pip$run()$collect_out() expect_equal(out[["1"]], dat1) expect_equal(out[["2"]], dat2) expected_f3_res = list( list("f1.1" = 1, "f1.2" = 1), list("f2.1" = dat1, "f2.2" = dat2) ) expect_equal(out[["f3"]], expected_f3_res) expect_equal(out[["f4"]], expected_f3_res[[1]]) }) test_that("split data set can be created dynamically", { data = data.frame(a = 1:10, group = c("a", "b")) pip <- Pipeline$new("pipe", data = data)$ add("split_data_step", \(.self = NULL, data = ~data) { splitData = split(data, data[, "group"]) .self$remove_step("split_data_step") .self$set_data_split(splitData) .self$name = paste(.self$name, "after data split") .self } )$ add("f1", \(data = ~data) { data }, keepOut = TRUE) pip$set_params(list(.self = pip)) out <- pip$run(recursive = TRUE)$collect_out() expect_equivalent(out, split(data, data[, "group"])) }) }) describe("set_keep_out", { test_that("keep-out state can be set", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(a = 1) a) out <- pip$run()$collect_out() expect_false("f1" %in% names(out)) out <- pip$set_keep_out("f1", keepOut = TRUE)$collect_out() expect_true("f1" %in% names(out)) out <- pip$set_keep_out("f1", keepOut = FALSE)$collect_out() expect_false("f1" %in% names(out)) }) test_that("step must be a string and exist", { pip <- Pipeline$new("pipe1") pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(a = 1) a) expect_error( pip$set_keep_out(1), "is_string(step)", fixed = TRUE ) expect_error( pip$set_keep_out("f2"), "step 'f2' does not exist", fixed = TRUE ) }) test_that("state must be logical", { pip <- Pipeline$new("pipe1", data = 0)$ add("f1", \(a = 1) a) expect_error( pip$set_keep_out("f1", keepOut = 1), "is.logical(keepOut)", fixed = TRUE ) }) }) describe("set_params", { test_that("parameters can be set commonly on existing pipeline", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(a = 2, b = 3) a)$ add("f3", \(a = 4, b = 5) a) before <- pip$get_params() after <- pip$set_params(list(a = 9, b = 99))$get_params() expect_equal(after, list( f1 = list(a = 9), f2 = list(a = 9, b = 99), f3 = list(a = 9, b = 99) )) }) test_that( "parameters depending on other steps are protected from being overwritten", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \(a = 2, b = ~f1) a)$ add("f3", \(a = ~f2, b = 5) a) before <- pip$get_params() expect_equal( before, list( f1 = list(a = 1), f2 = list(a = 2), f3 = list(b = 5) ) ) after <- pip$set_params(list(a = 9, b = 99))$get_params() expect_equal( after, list( f1 = list(a = 9), f2 = list(a = 9), f3 = list(b = 99) ) ) }) test_that("an error is given if params argument is not a list", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_error( pip$set_params(c(a = 9)), "params must be a list", fixed = TRUE ) }) test_that("trying to set undefined parameters is signaled with a warning", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_warning( pip$set_params(list(a = 9, b = 9, c = 9)), "Trying to set parameters not defined in the pipeline: b, c", fixed = TRUE ) }) test_that("warning for undefined parameters can be omitted", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_no_warning( pip$set_params(list(a = 9, b = 9, c = 9), warnUndefined = FALSE ) ) }) test_that( "after setting a single parameter the params entry is still a list", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a) expect_equal(pip$pipeline[["params"]][[2]], list(a = 1)) pip$set_params(list(a = 9)) expect_equal(pip$pipeline[["params"]][[2]], list(a = 9)) }) test_that( "hidden parameters can be set as well", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, .b = 2) a) pip$set_params(list(a = 9, .b = 10)) pp <- pip$get_params(ignoreHidden = FALSE) expect_equal(pp, list(f1 = list(a = 9, .b = 10))) }) test_that( "trying to set locked parameters is ignored until they are unlocked", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b)$ add("f2", \(a = 1, b = 2) a + b) pip$lock_step("f1") expect_message( pip$set_params(list(a = 9, b = 99)), "skipping setting parameters a, b at locked step 'f1'" ) pip$get_params_at_step("f1") |> expect_equal(list(a = 1, b = 2)) pip$get_params_at_step("f2") |> expect_equal(list(a = 9, b = 99)) pip$unlock_step("f1") pip$set_params(list(a = 9, b = 99)) pip$get_params_at_step("f1") |> expect_equal(list(a = 9, b = 99)) }) }) describe("set_params_at_step", { test_that("parameters can be set at given step", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b)$ add("f2", \(x = 1) x) expect_equal(pip$get_params_at_step("f1"), list(a = 1, b = 2)) pip$set_params_at_step("f1", list(a = 9, b = 99)) expect_equal(pip$get_params_at_step("f1"), list(a = 9, b = 99)) pip$set_params_at_step("f2", list(x = 9)) expect_equal(pip$get_params_at_step("f2"), list(x = 9)) }) test_that("step must be passed as a string and params as a list", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b) expect_error( pip$set_params_at_step(1, list(a = 9, b = 99)), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( pip$set_params_at_step("f1", params = c(a = 9, b = 99)), "is.list(params) is not TRUE", fixed = TRUE ) }) test_that("hidden parameters can be set as well", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, .b = 2) a + b) pip$set_params_at_step("f1", list(a = 9, .b = 99)) expect_equal( pip$get_params_at_step("f1", ignoreHidden = FALSE), list(a = 9, .b = 99) ) }) test_that("trying to set undefined parameter signals an error", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b) expect_error( pip$set_params_at_step("f1", list(a = 9, z = 99)), "Unable to set parameter(s) z at step f1 - candidates are a, b", fixed = TRUE ) }) test_that("trying to set locked parameter is ignored until it is unlocked", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b) pip$lock_step("f1") expect_message( pip$set_params_at_step("f1", list(a = 9, b = 99)), "skipping setting parameters a, b at locked step 'f1'" ) pip$get_params_at_step("f1") |> expect_equal(list(a = 1, b = 2)) pip$unlock_step("f1") pip$set_params_at_step("f1", list(a = 9, b = 99)) pip$get_params_at_step("f1") |> expect_equal(list(a = 9, b = 99)) }) test_that("setting values for bound parameters is not allowed", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1, b = 2) a + b)$ add("f2", \(x = 1, y = ~f1) x + y) expect_error( pip$set_params_at_step("f2", list(x = 9, y = 99)), "Unable to set parameter(s) y at step f2 - candidates are x", fixed = TRUE ) }) test_that( "states of affected steps are updated once the pipeline was run", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1) a)$ add("f3", \(a = ~f2) a)$ add("f4", \(a = ~data) a) pip$set_params_at_step("f1", params = list(a = 2)) expect_true(all(pip$pipeline[["state"]] == "New")) pip$run() pip$set_params_at_step("f1", params = list(a = 3)) expect_equal(pip$get_step("data")$state, "Done") expect_equal(pip$get_step("f1")$state, "Outdated") expect_equal(pip$get_step("f2")$state, "Outdated") expect_equal(pip$get_step("f3")$state, "Outdated") expect_equal(pip$get_step("f4")$state, "Done") pip$run() expect_true(all(pip$pipeline[["state"]] == "Done")) }) test_that("parameters can be set to NULL", { pip <- Pipeline$new("pipe1")$ add("f1", \(a = NULL, b = 1) a) pip$set_params_at_step("f1", list(a = 1, b = NULL)) expect_equal( pip$get_params_at_step("f1"), list(a = 1, b = NULL) ) }) test_that("preserves Param objects", { pip <- Pipeline$new("pipe1")$ add("f1", \( a = 1, b = new("NumericParam", "num", 2)) a + b ) pip$set_params_at_step("f1", list(a = 3, b = 4)) params <- pip$get_params_at_step("f1") expect_equal(params$a, 3) expect_true(params$b |> is("NumericParam")) expect_equal(params$b@value, 4) }) }) describe("split", { test_that("pipeline split of initial pipeline gives the expected result", { pip <- Pipeline$new("pipe") res <- pip$split() expect_true(is.list(res)) expect_equal(res[[1]]$name, "pipe1") expect_equal(res[[1]]$pipeline, pip$pipeline) }) test_that("pipeline with two indepdendent groups is split correctly", { pip <- Pipeline$new("pipe") pip$add("f1", \(a = ~data) a) pip$add("f2", \(a = 1) a) pip$add("f3", \(a = ~f2) a) pip$add("f4", \(a = ~f1) a) pip$run() res <- pip$split() pip1 <- res[[1]] pip2 <- res[[2]] expect_equal(pip1$name, "pipe1") expect_equal(pip2$name, "pipe2") expect_equal(pip1$get_step_names(), c("f2", "f3")) expect_equal(pip2$get_step_names(), c("data", "f1", "f4")) expect_equal( pip1$collect_out(all = TRUE), pip$collect_out(all = TRUE)[c("f2", "f3")] ) expect_equal( pip2$collect_out(all = TRUE), pip$collect_out(all = TRUE)[c("data", "f1", "f4")] ) }) test_that( "split is done correctly for complete data split", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = 2) b)$ add("f3", \(x = ~f1, y = ~f2) x + y) pip$set_data_split(dataList) res <- pip$split() steps <- lapply(res, \(x) x$get_step_names()) expect_equal( steps, list( "data.1", c("f1.1", "f2.1", "f3.1"), "data.2", c("f1.2", "f2.2", "f3.2") ) ) }) }) describe("unlock_step", { test_that("sets state to 'unlocked' if it was locked before", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) pip$lock_step("f1") expect_equal(pip$get_step("f1")[["state"]], "Locked") pip$unlock_step("data") expect_equal(pip$get_step("data")[["state"]], "New") pip$unlock_step("f1") expect_equal(pip$get_step("f1")[["state"]], "Unlocked") pip }) }) describe("pipeline logging", { expect_no_error(set_log_layout("json")) on.exit(set_log_layout("text")) test_that("pipeline logging is done in json format", { dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1", data = dat) log <- utils::capture.output(pip$run()) isValidJSON <- sapply(log, jsonlite::validate) expect_true(all(isValidJSON)) }) test_that("each step is logged with its name", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1")$ add("f1", \(x = ~data) x) log <- utils::capture.output(pip$run()) step1 = jsonlite::fromJSON(log[2]) step2 = jsonlite::fromJSON(log[3]) expect_equal(step1[["message"]], "Step 1/2 data") expect_equal(step2[["message"]], "Step 2/2 f1") }) test_that( "upon warning during run, both context and warn msg are logged", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \() warning("this is a warning")) expect_warning(log <- utils::capture.output(pip$run())) log_fields = lapply(head(log, -1), jsonlite::fromJSON) warnings = Filter( log_fields, f =\(x) x[["level"]] == "warn" )[[1]] expect_equal(warnings[["message"]], "this is a warning") expect_equal(warnings[["context"]], "Step 3 ('f2')") }) test_that( "upon error during run, both context and error msg are logged", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) dat <- data.frame(a = 1:2, b = 1:2) pip <- Pipeline$new("pipe1")$ add("f1", \(a = 1) a)$ add("f2", \() stop("this is an error")) log <- utils::capture.output({ tryCatch(pip$run(), error = identity) }) log_fields = lapply(head(log, -1), jsonlite::fromJSON) last = tail(log_fields, 1)[[1]] expect_equal(last[["message"]], "this is an error") expect_equal(last[["context"]], "Step 3 ('f2')") }) }) # --------------- # private methods # --------------- describe("private methods", { # Helper function to access private fields get_private <-\(x) { x[[".__enclos_env__"]]$private } pip <- Pipeline$new("pipe") expect_true(is.environment(get_private(pip))) describe(".clean_out_not_kept", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.clean_out_not_kept test_that("cleans all output if nothing is marked to be kept", { pip <- Pipeline$new("pipe1", data = 1)$ add("A", \(a = 2) a) fclean <- get_private(pip)$.clean_out_not_kept fexe <- get_private(pip)$.run_step fexe(step = "data") fexe(step = "A") expect_equal(pip$get_out("data"), 1) expect_equal(pip$get_out("A"), 2) fclean() hasCleanedData <- is.null(pip$get_out("data")) expect_true(hasCleanedData) hasCleanedA <- is.null(pip$get_out("A")) expect_true(hasCleanedA) }) test_that("does not clean if marked to be kept", { pip <- Pipeline$new("pipe1", data = 1)$ add("A", \(a = 2) a, keepOut = TRUE) fclean <- get_private(pip)$.clean_out_not_kept fexe <- get_private(pip)$.run_step fexe(step = "data") fexe(step = "A") expect_equal(pip$get_out("data"), 1) expect_equal(pip$get_out("A"), 2) fclean() hasCleanedData <- is.null(pip$get_out("data")) expect_true(hasCleanedData) hasCleanedA <- is.null(pip$get_out("A")) expect_false(hasCleanedA) }) }) describe(".create_edge_table", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.create_edge_table test_that("returns a data frame with the expected columns", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_edge_table res <- f() expect_true(is.data.frame(res)) expected_columns <- c("from", "to", "arrows") expect_equal(colnames(res), expected_columns) }) test_that("if no dependencies are defined, edge table is empty", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_edge_table res <- f() hasEmptyEdgeTable <- nrow(res) == 0 expect_true(hasEmptyEdgeTable) pip$add("f1", \(a = 1) a) res <- f() hasEmptyEdgeTable <- nrow(res) == 0 expect_true(hasEmptyEdgeTable) pip$add("f2", \(a = ~f1) a) res <- f() hasEmptyEdgeTable <- nrow(res) == 0 expect_false(hasEmptyEdgeTable) }) test_that("adds one edge for each dependency", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_edge_table # step1 is data pip$add("step2", \(a = 1) a) pip$add("step3", \(a = ~step2) a) pip$add("step4", \(a = ~step2, b = ~step3) a) pip$add("step5", \(a = ~data, b = ~step4) a) res <- f() expect_equal(nrow(res), 5) expect_equivalent( res["step3", c("from", "to")], # from step2 to step3 data.frame(from = 2, to = 3) ) expect_equivalent( res["step4.a", c("from", "to")], # from step2 to step4 data.frame(from = 2, to = 4) ) expect_equivalent( res["step4.b", c("from", "to")], # from step3 to step4 data.frame(from = 3, to = 4) ) expect_equivalent( res["step5.a", c("from", "to")], # from data to step5 data.frame(from = 1, to = 5) ) expect_equivalent( res["step5.b", c("from", "to")], # from data to step5 data.frame(from = 4, to = 5) ) }) test_that("can be created for certain groups", { pip <- Pipeline$new("pipe") # step1 is data pip$add("step2", \(a = ~data) a + 1, group = "add") pip$add("step3", \(a = ~step2) 2 * a, group = "mult") pip$add("step4", \(a = ~step2, b = ~step3) a + b, group = "add") pip$add("step5", \(a = ~data, b = ~step4) a * b, group = "mult") f <- get_private(pip)$.create_edge_table res.all <- f() res.add <- f(groups = "add") expect_equal( res.add, res.all[c("step2", "step4.a", "step4.b"), ] ) res.mult <- f(groups = "mult") expect_equal( res.mult, res.all[c("step5.a", "step3", "step5.b"), ] ) expect_equal( f(groups = c("data", "add", "mult")), f() ) }) test_that("signals bad group specification", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_edge_table expect_error( f(groups = "foo"), "all(groups %in% self$pipeline[[\"group\"]]) is not TRUE", fixed = TRUE ) expect_error( f(groups = 1), "is.character(groups) is not TRUE", fixed = TRUE ) }) test_that("can be created for parameters with multiple dependencies", { pip <- Pipeline$new("pipe") # step1 is data pip$add("step2", \(x = ~data) x + 1) pip$add("step3", \(y = ~step2) sum(unlist(y))) pip$set_data_split( list(data.frame(x = 1:2), data.frame(x = 3:4)), toStep = "step2" ) # Step 3 depends on both step2.1 and step2.2 expect_equal( pip$get_depends()$step3$y, c("step2.1", "step2.2") ) f <- get_private(pip)$.create_edge_table res <- f() expectedRes <- data.frame( from = 1:4, to = c(2, 5, 4, 5), arrows = "to" ) expect_equivalent(res, expectedRes) expect_equal( rownames(res), c("step2.1", "step3.y1", "step2.2", "step3.y2") ) }) }) describe(".create_node_table", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.create_node_table test_that("returns a data frame with the expected columns", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_node_table res <- f() expect_true(is.data.frame(res)) expected_columns <- c( "id", "label", "group", "shape", "color", "title" ) expect_equal(colnames(res), expected_columns) }) test_that("adds one node for each step", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_node_table res <- f() expect_equal(nrow(res), pip$length()) pip$add("f2", \(a = 1) a) res <- f() expect_equal(nrow(res), pip$length()) pip$add("f3", \(a = ~f2) a) res <- f() expect_equal(nrow(res), pip$length()) }) test_that("description is shown in the title", { pip <- Pipeline$new("pipe")$ add("step2", \(a = 1) a, description = "my 2nd step") f <- get_private(pip)$.create_node_table res <- f() expect_equal(res[2, "title"], "

my 2nd step

") }) test_that("can be created for certain groups", { pip <- Pipeline$new("pipe") # step1 is data pip$add("step2", \(a = ~data) a + 1, group = "add") pip$add("step3", \(a = ~step2) 2 * a, group = "mult") pip$add("step4", \(a = ~step2, b = ~step3) a + b, group = "add") pip$add("step5", \(a = ~data, b = ~step4) a * b, group = "mult") f <- get_private(pip)$.create_node_table res.all <- f() res.add <- f(groups = "add") expect_equal(res.add, res.all[c(2, 4), ]) res.mult <- f(groups = "mult") expect_equal(res.mult, res.all[c(3, 5), ]) expect_equal( f(groups = c("data", "add", "mult")), f() ) }) test_that("signals bad group specification", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.create_node_table expect_error( f(groups = "foo"), "all(groups %in% self$pipeline[[\"group\"]]) is not TRUE", fixed = TRUE ) expect_error( f(groups = 1), "is.character(groups) is not TRUE", fixed = TRUE ) }) }) describe(".derive_dependencies", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.derive_dependencies test_that("if no params are defined, dependencies are empty", { expect_equal(f(params = list(), step = "step"), character(0)) expect_equal(f(params = list(a = 1), step = "step"), character(0)) }) test_that("signals bad input", { expect_error( f(params = list(), step = 1), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( f(params = list(), step = "step", toStep = 2), "is_string(toStep) is not TRUE", fixed = TRUE ) }) test_that("extracts all dependencies defined via step name", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.derive_dependencies pip$add("f1", \(a = 1) a) pip$add("f2", \(b = 2) b) expect_equal( f(params = list(x = ~f1), step = "step3"), c(x = "f1") ) expect_equal( f(params = list(x = ~f1, y = ~f2), step = "step3"), c(x = "f1", y = "f2") ) }) test_that("extracts all dependencies defined relative", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.derive_dependencies pip$add("f1", \(a = 1) a) pip$add("f2", \(b = 2) b) expect_equal( f(params = list(x = ~-1), step = "step3"), c(x = "f2") ) expect_equal( f(params = list(x = ~-1, y = ~-2), step = "step3"), c(x = "f2", y = "f1") ) }) test_that("extracts all dependencies if defined both ways", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.derive_dependencies pip$add("f1", \(a = 1) a) pip$add("f2", \(b = 2) b) expect_equal( f(params = list(x = ~-1, y = ~f2), step = "step3"), c(x = "f2", y = "f2") ) }) }) describe(".extract_dependent_out", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.extract_dependent_out test_that("if no depends, NULL is returned", { expect_true(is.null(f(depends = list(), out = list()))) }) test_that("signals badly typed input", { expect_error( f(1:2, list()), "is.character(depends) || is.list(depends) is not TRUE", fixed = TRUE ) expect_error( f(list(), 1), "is.list(out) is not TRUE", fixed = TRUE ) }) test_that("signals if dependency is not found in out", { expect_no_error( f(depends = "foo", out = list("foo" = 1)) ) expect_error( f(depends = "foo", out = list()), "all(unlist(depends) %in% names(out)) is not TRUE", fixed = TRUE ) # Dependencies can be lists of dependencies depends <- list(x = c("a", "b")) out <- list(a = 1) expect_error(f(depends, out)) }) test_that("output is extracted as expected", { out <- list(a = 1, b = 2, c = 3, d = 4) expect_equal( f(c(x = "a"), out), list(x = 1) ) expect_equal( f(c(x = "a", y = "c"), out), list(x = 1, y = 3) ) expect_equal( f(list(x = c("a", "c"), y = c("b", "d")), out), list( x = list(a = 1, c = 3), y = list(b = 2, d = 4) ) ) }) }) describe(".extract_depends_from_param_list", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.extract_depends_from_param_list test_that("if no params are defined, dependencies are empty", { expect_equal(f(params = list()), character(0)) }) test_that("params must be a list or NULL", { expect_equal(f(params = NULL), character(0)) expect_error( f(params = c(a = 1)), "is.list(params) is not TRUE", fixed = TRUE ) }) test_that("if no dependencies are defined, nothing is extracted", { expect_equal(f(params = list(a = 1)), character(0)) expect_equal(f(params = list(a = 1, b = 2)), character(0)) }) test_that("if dependencies are defined, they are extracted", { expect_equal(f(params = list(a = ~x)), c(a = "x")) expect_equal( f(params = list(a = ~x, b = ~-1)), c(a = "x", b = "-1") ) expect_equal( f(params = list(a = ~x, b = ~-1, c = 1)), c(a = "x", b = "-1") ) }) }) describe(".get_depends_grouped", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.get_depends_grouped test_that("grouped dependencies are obtained correctly", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.get_depends_grouped expect_equal(f(), list("data")) pip$add("f1", \(a = ~data) a) expect_equal(f(), list(c("data", "f1"))) pip$add("f2", \(a = 1) a) pip$add("f3", \(a = ~f2) a) pip$add("f4", \(a = ~f1) a) expect_equal( f(), list( c("f2", "f3"), c("data", "f1", "f4") ) ) }) test_that("grouped dependencies are obtained correctly for complete data split", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = 2) b)$ add("f3", \(x = ~f1, y = ~f2) x + y) f <- get_private(pip)$.get_depends_grouped pip$set_data_split(dataList, toStep = "f3") expect_equal( f(), list( "data.1", c("f1.1", "f2.1", "f3.1"), "data.2", c("f1.2", "f2.2", "f3.2") ) ) }) test_that("grouped dependencies are obtained correctly for partial data split", { dat1 <- data.frame(x = 1:2) dat2 <- data.frame(y = 1:2) dataList <- list(dat1, dat2) pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a)$ add("f2", \(a = ~f1, b = 2) b)$ add("f3", \(x = ~f1, y = ~f2) x + y) f <- get_private(pip)$.get_depends_grouped pip$set_data_split(dataList, toStep = "f2") expect_equal( f(), list( "data.1", "data.2", c("f1.1", "f2.1", "f1.2", "f2.2", "f3") ) ) }) test_that( "if all steps somehow are dependent on each other, just one group is returned", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.get_depends_grouped pip$add("f1", \(a = ~data) a) pip$add("f2", \(a = 1) a) pip$add("f3", \(a = ~f2) a) pip$add("f4", \(a = ~f1, b = ~f3) a) expect_equal( unlist(f()), pip$get_step_names() ) }) }) describe(".get_downstream_depends", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.get_downstream_depends test_that("badly typed inputs are signalled", { expect_error( f(step = 1), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( f(step = "foo", depends = c(a = 1)), "is.character(depends) || is.list(depends) is not TRUE", fixed = TRUE ) expect_error( f(step = "foo", depends = list(), recursive = 1), "is.logical(recursive)", fixed = TRUE ) }) test_that("if no dependencies, empty character vector is returned", { expect_equal(f(step = "foo", depends = character()), character(0)) expect_equal(f(step = "foo", depends = list()), character(0)) }) test_that( "if no depends, recursive should give same as non-recursiv call", { expect_equal( f(step = "foo", depends = list()), f(step = "foo", depends = list(), recursive = FALSE) ) }) test_that("dependencies by default are determined recursively", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f1"), f3 = c(a = "f2") ) expect_equal(f("f3", depends), character(0)) expect_equal(f("f2", depends), c("f3")) expect_equal(f("f1", depends), c("f2", "f3")) expect_equal(f("f0", depends), c("f1", "f2", "f3")) }) test_that("returned dependencies are unique", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f0", b = "f1"), f3 = c(a = "f0", b = "f1", c = "f2") ) expect_equal(f("f3", depends), character(0)) expect_equal(f("f2", depends), c("f3")) expect_equal(f("f1", depends), c("f2", "f3")) expect_equal(f("f0", depends), c("f1", "f2", "f3")) }) test_that("dependencies can be determined non-recursively", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f1"), f3 = c(a = "f2") ) expect_equal(f("f3", depends, recursive = FALSE), character(0)) expect_equal(f("f2", depends, recursive = FALSE), c("f3")) expect_equal(f("f1", depends, recursive = FALSE), c("f2")) expect_equal(f("f0", depends, recursive = FALSE), c("f1")) }) test_that("works with multiple dependencies given in sublist", { depends <- list( f2 = c(a = "f1", b = "data"), f3 = list( x = c("f0", "f1"), y = c("f1", "f2") ) ) expect_equal(f("f1", depends), c("f2", "f3")) }) }) describe(".get_last_step", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.get_last_step expect_equal(f(), "data") pip$add("f1", \(a = 1) a) expect_equal(f(), "f1") pip$add("f2", \(a = 1) a) expect_equal(f(), "f2") pip$pop_step() expect_equal(f(), "f1") }) describe(".get_upstream_depends", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.get_upstream_depends test_that("badly typed inputs are signalled", { expect_error( f(step = 1), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( f(step = "foo", depends = c(a = 1)), "is.character(depends) || is.list(depends) is not TRUE", fixed = TRUE ) expect_error( f(step = "foo", depends = list(), recursive = 1), "is.logical(recursive)", fixed = TRUE ) }) test_that("if no dependencies, empty character vector is returned", { expect_equal(f(step = "foo", depends = character()), character(0)) expect_equal(f(step = "foo", depends = list()), character(0)) }) test_that("dependencies by default are determined recursively", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f1"), f3 = c(a = "f2") ) expect_equal(f("f0", depends), character(0)) expect_equal(f("f1", depends), c("f0")) expect_equal(f("f2", depends), c("f1", "f0")) expect_equal(f("f3", depends), c("f2", "f1", "f0")) }) test_that("returned dependencies are unique", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f0", b = "f1"), f3 = c(a = "f0", b = "f1", c = "f2") ) expect_equal(f("f0", depends), character(0)) expect_equal(f("f1", depends), c("f0")) expect_equal(f("f2", depends), c("f0", "f1")) expect_equal(f("f3", depends), c("f0", "f1", "f2")) }) test_that("dependencies can be determined non-recursively", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f1"), f3 = c(a = "f2") ) expect_equal(f("f0", depends, recursive = FALSE), character(0)) expect_equal(f("f1", depends, recursive = FALSE), c("f0")) expect_equal(f("f2", depends, recursive = FALSE), c("f1")) expect_equal(f("f3", depends, recursive = FALSE), c("f2")) }) test_that("works with multiple dependencies given in sublist", { depends <- list( f1 = c(a = "f0"), f2 = c(a = "f1", b = "data"), f3 = list( x = c("f1"), y = c("f1", "f2") ), f4 = list(x = c("f3", "data")) ) expect_equal(f("f2", depends), c("f1", "data", "f0")) expect_equal(f("f3", depends), c("f1", "f2", "f0", "data")) expect_equal(f("f4", depends), c("f3", "data", "f1", "f2", "f0")) }) }) describe(".init_function_and_params", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.prepare_and_verify_params test_that("fun must be a function", { expect_error( f(fun = 1), "is.function(fun) is not TRUE", fixed = TRUE ) }) test_that("funcName must be a string", { expect_error( f(fun = identity, funcName = 1), "is_string(funcName) is not TRUE", fixed = TRUE ) }) test_that("params must be a list", { expect_error( f(fun = identity, funcName = "identity", params = 1), "is.list(params) is not TRUE", fixed = TRUE ) }) test_that("returns all function args in the parameter list", { foo <-\(a = 1, b = 2, c = 3) 1 res <- f(foo, "foo") expect_equal(res, list(a = 1, b = 2, c = 3)) }) test_that("params given in the list override the function arg", { foo <-\(a = 1, b = 2, c = 3) 1 res <- f(foo, "foo", params = list(a = 9, b = 99)) expect_equal(res, list(a = 9, b = 99, c = 3)) }) test_that("signals undefined function args unless they are defined in the parameter list", { foo <-\(a, b = 2, c = 3) 1 expect_error( f(foo, "foo"), "'a' parameter(s) must have default values", fixed = TRUE ) res <- f(foo, "foo", params = list(a = 9)) expect_equal(res, list(a = 9, b = 2, c = 3)) }) test_that("signals if parameter is not defined in function args, unless function is defined with ...", { foo <-\(a) 1 expect_error( f(foo, "foo", params = list(a = 1, b = 2, c = 3)), "'b', 'c' are no function parameters of 'foo'", fixed = TRUE ) foo <-\(a, ...) 1 res <- f(foo, "foo", params = list(a = 1, b = 2, c = 3)) expect_equal(res, list(a = 1, b = 2, c = 3)) }) }) describe(".relative_dependency_to_index", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.relative_dependency_to_index test_that("signals bad input", { expect_error( f(relative_dep = "-1"), "is_number(relative_dep) is not TRUE", fixed = TRUE ) expect_error( f(relative_dep = 1), "relative_dep < 0", fixed = TRUE ) expect_error( f(-1, dependencyName = NULL), "is_string(dependencyName) is not TRUE", fixed = TRUE ) expect_error( f(-1, "dep-name", startIndex = "2"), "is_number(startIndex) is not TRUE", fixed = TRUE ) expect_error( f(-1, "dep-name", startIndex = -2), "startIndex > 0 is not TRUE", fixed = TRUE ) expect_error( f(-1, "dep-name", 2, step = 3), "is_string(step) is not TRUE", fixed = TRUE ) expect_no_error( f(-1, "dep-name", 2, step = "foo"), ) }) test_that("signals if relative dependency exceeds pipeline", { expect_error( f( relative_dep = -10, dependencyName = "dep-name", startIndex = 1, step = "step-name" ), paste( "step 'step-name': relative dependency dep-name=-10", "points to outside the pipeline" ), fixed = TRUE ) expect_error(f(-1, "dep-name", startIndex = 1, "step-name")) expect_error(f(-2, "dep-name", startIndex = 2, "step-name")) }) test_that("returns correct index for relative dependency", { expect_equal( f( relative_dep = -1, dependencyName = "dep-name", startIndex = 3, step = "step-name" ), 2 ) expect_equal( f( relative_dep = -2, dependencyName = "dep-name", startIndex = 3, step = "step-name" ), 1 ) }) }) describe(".run_step", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.run_step test_that("returns the result of the function at the given step", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = 2) b) f <- get_private(pip)$.run_step expect_equal(f(step = "A"), 1) expect_equal(f(step = "B"), 2) }) test_that("stores the result at the given step", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = 2) b) f <- get_private(pip)$.run_step expect_equal(f(step = "A"), 1) expect_equal(pip$get_out("A"), 1) expect_equal(f(step = "B"), 2) expect_equal(pip$get_out("B"), 2) }) test_that("uses output of dependent steps", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) b) f <- get_private(pip)$.run_step expect_equal(f(step = "B"), NULL) # Now set output for step 'A' i <- match("A", pip$get_step_names()) pip$pipeline[i, "out"] <- 2 expect_equal(f(step = "B"), 2) }) test_that("accepts Param object args", { pip <- Pipeline$new("pipe")$ add("A", \(a = new("NumericParam", "a", value = 1)) a) f <- get_private(pip)$.run_step expect_no_error(f("A")) }) test_that("if error, the failing step is given in the log", { lgr::unsuspend_logging() on.exit(lgr::suspend_logging()) pip <- Pipeline$new("pipe")$ add("A", \(a = 1) stop("something went wrong")) f <- get_private(pip)$.run_step log <- capture.output(expect_error(f("A"))) hasInfo <- grepl( log[1], pattern = "something went wrong {\"context\":\"Step 2 ('A')\"}", fixed = TRUE ) expect_true(hasInfo) }) test_that( "updates the state to 'done' if step was run successfully otherwise to 'failed'", { pip <- Pipeline$new("pipe")$ add("ok", \(x = 1) x)$ add("error", \(x = 2) stop("ups"))$ add("warning", \(x = 3) warning("hm")) f <- get_private(pip)$.run_step expect_true(all(pip$pipeline[["state"]] == "New")) f(step = "ok") pip$get_step("ok")$state expect_equal(pip$get_step("ok")$state, "Done") expect_error(f(step = "error")) expect_equal(pip$get_step("error")$state, "Failed") expect_warning(f(step = "warning")) expect_equal(pip$get_step("warning")$state, "Done") }) test_that("will re-compute if locked step does not keep output", { pip <- Pipeline$new("pipe")$ add("A", \(a = 1) a)$ add("B", \(b = ~A) b, keepOut = FALSE) f <- get_private(pip)$.run_step expect_equal(f(step = "A"), 1) expect_equal(f(step = "B"), 1) pip$set_params_at_step("A", list(a = 2)) pip$lock_step("B") get_private(pip)$.clean_out_not_kept() expect_true(is.null(pip$get_out("B"))) expect_equal(f(step = "A"), 2) expect_equal(f(step = "B"), 2) }) }) describe(".set_at_step", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.set_at_step test_that("field must be a string and exist", { pip <- Pipeline$new("pipe") expect_error( f("data", field = 1), "is_string(field) is not TRUE", fixed = TRUE ) expect_error( f("data", field = "foo"), "field %in% names(self$pipeline) is not TRUE", fixed = TRUE ) }) test_that("sets the value without change data class of the fields", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) f <- get_private(pip)$.set_at_step before <- sapply(pip$pipeline, data.class) f("f1", field = "step", value = "f2") f("f2", field = "fun", value = mean) f("f2", field = "funcName", value = "my fun") f("f2", field = "params", value = list(a = 1, b = 2)) f("f2", field = "out", value = 1) f("f2", field = "keepOut", value = TRUE) f("f2", field = "group", value = "my group") f("f2", field = "description", value = "my description") f("f2", field = "time", value = Sys.time()) f("f2", field = "state", value = "new-state") after <- sapply(pip$pipeline, data.class) expect_equal(before, after) }) test_that("signals class mismatch unless field is of class list", { pip <- Pipeline$new("pipe")$ add("f1", \(a = 1) a) f <- get_private(pip)$.set_at_step ee <- expect_error ee(f("f1", field = "step", value = 1)) ee(f("f2", field = "fun", value = "mean")) ee(f("f2", field = "funcName", value = list("my fun"))) ee(f("f2", field = "params", value = c(a = 1, b = 2))) ee(f("f2", field = "keepOut", value = 1)) ee(f("f2", field = "group", value = list("my group"))) ee(f("f2", field = "description", value = list("my description"))) ee(f("f2", field = "time", value = as.character(Sys.time()))) ee(f("f2", field = "state", value = list("new-state"))) }) }) describe(".update_states_downstream", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.update_states_downstream test_that("state must be a string", { pip <- Pipeline$new("pipe") expect_error( f("data", state = 1), "is_string(state) is not TRUE", fixed = TRUE ) }) test_that("states are updated according to dependencies", { pip <- Pipeline$new("pipe") f <- get_private(pip)$.update_states_downstream pip$add("f1", \(a = ~data) a) pip$add("f2", \(a = ~f1) a) pip$add("f3", \(a = 1) a) pip$add("f4", \(a = ~f2) a) expect_true(all(pip$pipeline[["state"]] == "New")) f("f1", state = "new-state") states <- pip$pipeline[["state"]] |> stats::setNames(pip$get_step_names()) expect_equal( states, c( data = "New", f1 = "New", f2 = "new-state", f3 = "New", f4 = "new-state" ) ) f("data", state = "another-state") states <- pip$pipeline[["state"]] |> stats::setNames(pip$get_step_names()) expect_equal( states, c( data = "New", f1 = "another-state", f2 = "another-state", f3 = "New", f4 = "another-state" ) ) }) }) describe(".verify_dependency", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.verify_dependency test_that("signals badly typed input", { expect_error( f(dep = 1), "is_string(dep) is not TRUE", fixed = TRUE ) expect_error( f(dep = "dep", step = 1), "is_string(step) is not TRUE", fixed = TRUE ) expect_error( f(dep = "dep", step = "step", toStep = 1), "is_string(toStep) is not TRUE", fixed = TRUE ) }) test_that("signals non existing toStep", { expect_error( f("dep", "step", toStep = "non-existent"), "step 'non-existent' does not exist", fixed = TRUE ) }) test_that("verifies valid depdendencies", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.verify_dependency pip$add("f1", \(a = 1) a) pip$add("f2", \(a = 1) a) expect_true(f(dep = "f1", "new-step")) expect_true(f(dep = "f2", "new-step")) }) test_that("signals if dependency is not defined", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.verify_dependency pip$add("f1", \(a = 1) a) pip$add("f2", \(a = 1) a) expect_error( f(dep = "f3", "new-step"), "dependency 'f3' not found", fixed = TRUE ) expect_error( f(dep = "f2", "new-step", toStep = "f1"), "dependency 'f2' not found up to step 'f1'", fixed = TRUE ) }) }) describe(".verify_from_to", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.verify_from_to eefix <-\(...) expect_error(..., fixed = TRUE) it("signals badly typed args", { pip <- expect_no_error(Pipeline$new("pipe")) pip$add("f1", \(a = 1) a) f <- get_private(pip)$.verify_from_to eefix( f(from = "f1", to = 2), "is_number(from) is not TRUE" ) eefix( f(from = 1, to = "f1"), "is_number(to) is not TRUE" ) }) it("returns true if to <= from", { pip <- expect_no_error(Pipeline$new("pipe")) pip$add("f1", \(a = 1) a) f <- get_private(pip)$.verify_from_to expect_true(f(from = 1, to = 2)) }) it("signals if from > to", { eefix( f(from = 2, to = 1), "from <= to is not TRUE" ) }) it("signals if to > pipeline length", { pip <- expect_no_error(Pipeline$new("pipe")) pip$add("f1", \(a = 1) a) f <- get_private(pip)$.verify_from_to eefix( f(from = 1, to = pip$length() + 1), "'to' must not be larger than pipeline length" ) }) }) describe(".verify_fun_params", { pip <- expect_no_error(Pipeline$new("pipe")) f <- get_private(pip)$.verify_fun_params test_that("returns TRUE if function has no args", { fun <-\() 1 expect_true(f(fun, "funcName")) }) test_that("returns TRUE if args are defined with default values", { fun <-\(x = 1, y = 2) x + y expect_true(f(fun, "funcName")) }) test_that("if not defined with default values, params can be passed in addition", { fun <-\(x, y) x + y expect_true(f(fun, "funcName", params = list(x = 1, y = 2))) }) test_that("signals parameters with no default values", { fun <-\(x, y, z = 1) x + y expect_error( f(fun, "funcName"), "'x', 'y' parameter(s) must have default values", fixed = TRUE ) fun <-\(x, y = 1, z) x + y expect_error( f(fun, "funcName"), "'x', 'z' parameter(s) must have default values", fixed = TRUE ) }) test_that("supports ...", { fun <-\(x, ...) x expect_true(f(fun, "funcName", params = list(x = 1))) }) test_that("signals undefined parameters", { fun <-\(x = 1, y = 1) x + y params <- list(x = 1, undef1 = 1, undef2 = 2) expect_error( f(fun, "funcName", params = params), "'undef1', 'undef2' are no function parameters of 'funcName'", fixed = TRUE ) }) test_that( "supports additional parameters if function is defined with ...", { fun <-\(x, ...) x params <- list(x = 1, add1 = 1, add2 = 2) expect_true(f(fun, "funcName", params = params)) }) test_that("signals badly typed input", { expect_error( f("mean"), "is.function(fun) is not TRUE", fixed = TRUE ) expect_error( f(mean, funcName = 1), "is_string(funcName) is not TRUE", fixed = TRUE ) expect_error( f(mean, funcName = "mean", params = 1), "is.list(params) is not TRUE", fixed = TRUE ) }) }) })