# Helpers for testing. #region General test helpers. # Helper for extracting the message associated with errors and warnings. as_text <- function(expression) { # Capture message. message <- tryCatch(expression, # Capture warning message. warning = function(w) w$message, # Capture error message. error = function(e) e$message ) return(message) } # Select a cluster type, with some variability. pick_cluster_type <- function(types) { # Decide what type of cluster to create. if (.Platform$OS.type == "unix") { # Randomly pick a cluster type. cluster_type <- sample(types, 1) } else { # Fix the cluster type to "psock" on Windows. cluster_type <- "psock" } return(cluster_type) } # Select backend type. pick_backend_type <- function() { # Randomly pick a backend type. backend_type <- sample(c("sync", "async"), 1) return(backend_type) } # Define the test task to use. test_task <- function(x, y, z, sleep = 0) { # Sleep a bit or not. Sys.sleep(sleep) # Compute something. output <- (x + y) / z # Return the result. return(output) } # Check if a task is running on an asynchronous backend, or context. task_is_running <- function(backend) { # If a context is passed. if ("Context" %in% class(backend)) { # Get the status via the context. status <- backend$backend$task_state$task_is_running } else { # Get the status from the backend directly. status <- backend$task_state$task_is_running } return(status) } # Check if the body of a decorated tasks contains a specific pattern. body_contains <- function(task, pattern, position = 2) { # Get the body of the decorated task at a specific position. extraction <- as.list(body(task))[position] # Check if the body at position contains the pattern. contains <- grepl(pattern, extraction) return(contains) } # Block the main session until an asynchronous task finishes execution. block_until_async_task_finished <- function(backend) { # Block the main session until the task is finished. while (task_is_running(backend)) { # Sleep a bit. Sys.sleep(0.001) } # Remain silent. invisible(NULL) } #endregion #region Tests sets applicable to all backends types. # Set of tests for unimplemented backend service methods. tests_set_for_unimplemented_service_methods <- function(service) { # Expect an error when calling the `start` method. expect_error(service$start(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `stop` method. expect_error(service$stop(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `clear` method. expect_error(service$clear(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `peek` method. expect_error(service$peek(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `export` method. expect_error(service$export(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `evaluate` method. expect_error(service$evaluate(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `sapply` method. expect_error(service$sapply(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `lapply` method. expect_error(service$lapply(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `apply` method. expect_error(service$apply(), as_text(Exception$method_not_implemented())) # Expect an error when calling the `get_output` method. expect_error(service$get_output(), as_text(Exception$method_not_implemented())) } # Set of tests for exporting to the backend (i.e,. regardless of type). tests_set_for_backend_exporting <- function(service) { # Create a variable in a different environment. env <- new.env() env$test_variable <- rnorm(1) # Export the variable from the specific environment to the backend. service$export("test_variable", env) # Expect that the variable is on the backend. expect_true(all(service$peek() == "test_variable")) # Expect the cluster to hold the correct value for the exported variable. expect_true(all(service$evaluate(test_variable) == env$test_variable)) # Assign a variable to the current environment. assign("test_variable", rnorm(1), envir = environment()) # Export the variable using the current environment (i.e., parent of `export`). service$export("test_variable") # Expect that the variable is on the backend. expect_true(all(service$peek() == "test_variable")) # Expect the cluster to hold the correct value for the exported variable. expect_true(all(service$evaluate(test_variable) == get("test_variable", envir = environment()))) } # Set of tests for starting and stopping backends. tests_set_for_backend_states <- function(backend, specification) { # Expect an error if an attempt is made to start a cluster while one is already active. expect_error(backend$start(specification), as_text(Exception$cluster_active())) # Stop the backend. backend$stop() # Expect that stopping the cluster marks it as inactive. expect_false(backend$active) # Expect the cluster field has been cleared. expect_null(backend$cluster) # Start a new cluster on the same backend instance. backend$start(specification) # Expect the cluster is active. expect_true(backend$active) # Stop the cluster. backend$stop() # Expect that trying to stop a cluster that is not active throws an error. expect_error(backend$stop(), as_text(Exception$cluster_not_active())) } #endregion #region Tests sets for synchronous backends. # Set of tests for the synchronous backend task execution via a specified operation. tests_set_for_synchronous_backend_task_execution <- function(operation, service, expected_output) { # Run the task in parallel via the requested operation (e.g., `sapply`). eval(operation) # Expect the that output is correct. expect_equal(service$get_output(), expected_output) # Expect that subsequent calls to `get_output` return `NULL`. expect_null(service$get_output()) # Remain silent. invisible(NULL) } # Set of tests for synchronous backend operations. tests_set_for_synchronous_backend_operations <- function(service, specification, task) { # Start the cluster on the backend. service$start(specification) # Always stop on exit. on.exit({ # Stop the backend. service$stop() }) # Expect that the cluster is empty upon creation. expect_true(all(sapply(service$peek(), length) == 0)) # Tests for exporting to the backend. tests_set_for_backend_exporting(service) # Clear the backend. service$clear() # Expect that clearing the cluster leaves it empty. expect_true(all(sapply(service$peek(), length) == 0)) # Select task arguments. x <- sample(1:100, 100) y <- sample(1:100, 1) z <- sample(1:100, 1) sleep = sample(c(0, 0.001, 0.002), 1) # Created the expect output. expected_output <- task(x, y, z) # Define the `sapply` operation. operation <- bquote(service$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `sapply` operation. tests_set_for_synchronous_backend_task_execution(operation, service, expected_output) # Expect that errors in the task executed via `sapply` are propagated. expect_error( service$sapply(1:10, function(x) stop("Task error.")), "Task error." ) # Created the expect output. expected_output <- as.list(expected_output) # Define the `lapply` operation. operation <- bquote(service$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `lapply` operation. tests_set_for_synchronous_backend_task_execution(operation, service, expected_output) # Expect that errors in the task executed via `lapply` are propagated. expect_error( service$lapply(1:10, function(x) stop("Task error.")), "Task error." ) # Redefine `x` as a matrix for the `apply` operation. x <- matrix(rnorm(100^2), nrow = 100, ncol = 100) # Compute the expected output for the `apply` operation applied over rows. expected_output <- base::apply(x, 1, task, y = y, z = z) # Define the `apply` operation over rows. operation <- bquote(service$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over rows. tests_set_for_synchronous_backend_task_execution(operation, service, expected_output) # Compute the expected output for the `apply` operation applied over columns. expected_output <- base::apply(x, 2, task, y = y, z = z) # Define the `apply` operation over columns. operation <- bquote(service$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over columns. tests_set_for_synchronous_backend_task_execution(operation, service, expected_output) # Redefine a smaller `x` matrix for the `apply` operation applied element-wise. x <- matrix(rnorm(10^2), nrow = 10, ncol = 10) # Compute the expected output for the `apply` operation applied element-wise. expected_output <- base::apply(x, c(1, 2), task, y = y, z = z) # Define the `apply` operation element-wise. operation <- bquote(service$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation element-wise. tests_set_for_synchronous_backend_task_execution(operation, service, expected_output) # Expect error when margins higher than the array dimensions are provided. expect_error( service$apply(x, 3, task, y = y, z = z), as_text(Exception$array_margins_not_compatible(3, dim(x))), fixed = TRUE ) # Expect error when duplicate margins are provided. expect_error( service$apply(x, c(1, 1), task, y = y, z = z), as_text(Exception$array_margins_not_compatible(c(1, 1), dim(x))), fixed = TRUE ) # Expect errors when the margins are not compatible with the array dimensions. expect_error( service$apply(x, c(1, 2, 3, 1), task, y = y, z = z), as_text(Exception$array_margins_not_compatible(c(1, 2, 3, 1), dim(x))), fixed = TRUE ) # Expect that errors in the task executed via `apply` are propagated. expect_error( service$apply(matrix(1:100, 10, 10), 1, function(x) stop("Task error.")), "Task error." ) # Expect that the cluster is empty after performing operations on it. expect_true(all(sapply(service$peek(), length) == 0)) # Remain silent. invisible(NULL) } #endregion #region Tests sets for asynchronous backends. # Set of tests for the asynchronous backend task execution via a specified operation. tests_set_for_asynchronous_backend_task_execution <- function(operation, service, expected_output) { # Run the task in parallel. eval(operation) # Expect the that output is correct. expect_equal(service$get_output(wait = TRUE), expected_output) # Expect that subsequent calls to `get_output` will throw an error. expect_error(service$get_output(), as_text(Exception$async_task_not_started())) # Run the task in parallel, with a bit of overhead. eval(operation) # Expect that trying to run a task while another is running fails. expect_error(eval(operation), as_text(Exception$async_task_running())) # Expect the that output is correct. expect_equal(service$get_output(wait = TRUE), expected_output) # Run the task in parallel, with a bit of overhead. eval(operation) # Expect that trying to get the output of a task that is still running fails. expect_error(service$get_output(), as_text(Exception$async_task_running())) # Block the main thread until the task is finished. block_until_async_task_finished(service) # Expect that trying to run a task without reading the previous output fails. expect_error(eval(operation), as_text(Exception$async_task_completed())) # Expect the that output is correct. expect_equal(service$get_output(), expected_output) } # Set of tests for synchronous backend operations. tests_set_for_asynchronous_backend_operations <- function(service, specification, task) { # Start the cluster on the backend. service$start(specification) # Always stop on exit. on.exit({ # Stop the backend. service$stop() }) # Expect that the cluster is empty upon creation. expect_true(all(sapply(service$peek(), length) == 0)) # Tests for the `export` operation. tests_set_for_backend_exporting(service) # Clear the backend. service$clear() # Expect that clearing the cluster leaves it empty. expect_true(all(sapply(service$peek(), length) == 0)) # Expect error waiting to fetch the output when no task is running. expect_error(service$get_output(wait = TRUE), as_text(Exception$async_task_not_started())) # Select task arguments. x <- sample(1:100, 100) y <- sample(1:100, 1) z <- sample(1:100, 1) sleep = sample(c(0, 0.001, 0.002), 1) # Compute the expected output for the `sapply` operation. expected_output <- task(x, y, z) # Define the `sapply` operation. operation <- bquote(service$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `sapply` operation. tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output) # Expect that errors in the task executed via `sapply` are propagated. expect_error( { service$sapply(1:10, function(x) stop("Task error.")) service$get_output(wait = TRUE) }, "Task error." ) # Compute the expected output for the `lapply` operation. expected_output <- as.list(expected_output) # Define the `lapply` operation. operation <- bquote(service$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `lapply` operation. tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output) # Expect that errors in the task executed via `lapply` are propagated. expect_error( { service$lapply(1:10, function(x) stop("Task error.")) service$get_output(wait = TRUE) }, "Task error." ) # Redefine `x` as a matrix for the `apply` operation. x <- matrix(rnorm(100^2), nrow = 100, ncol = 100) # Compute the expected output for the `apply` operation applied over rows. expected_output <- base::apply(x, 1, task, y = y, z = z) # Define the `apply` operation over rows. operation <- bquote(service$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over rows. tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output) # Compute the expected output for the `apply` operation applied over columns. expected_output <- base::apply(x, 2, task, y = y, z = z) # Define the `apply` operation over columns. operation <- bquote(service$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over columns. tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output) # Redefine a smaller `x` matrix for the `apply` operation applied element-wise. x <- matrix(rnorm(10^2), nrow = 10, ncol = 10) # Compute the expected output for the `apply` operation applied element-wise. expected_output <- base::apply(x, c(1, 2), task, y = y, z = z) # Define the `apply` operation element-wise. operation <- bquote(service$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation element-wise. tests_set_for_asynchronous_backend_task_execution(operation, service, expected_output) # Expect error when margins higher than the array dimensions are provided. expect_error( service$apply(x, 3, task, y = y, z = z), as_text(Exception$array_margins_not_compatible(3, dim(x))), fixed = TRUE ) # Expect error when duplicate margins are provided. expect_error( service$apply(x, c(1, 1), task, y = y, z = z), as_text(Exception$array_margins_not_compatible(c(1, 1), dim(x))), fixed = TRUE ) # Expect errors when the margins are not compatible with the array dimensions. expect_error( service$apply(x, c(1, 2, 3, 1), task, y = y, z = z), as_text(Exception$array_margins_not_compatible(c(1, 2, 3, 1), dim(x))), fixed = TRUE ) # Expect that errors in the task executed via `apply` are propagated. expect_error( { service$apply(matrix(1:100, 10, 10), 1, function(x) stop("Task error.")) service$get_output(wait = TRUE) }, "Task error." ) # Expect that the cluster is empty after performing operations on it. expect_true(all(sapply(service$peek(), length) == 0)) # Remain silent. invisible(NULL) } #endregion #region Tests sets for progress tracking. # Set of tests for executing tasks in a progress tracking context with output. tests_set_for_task_execution_with_progress_tracking <- function(operation, context, expected_output) { # Clear the progress output on exit. on.exit({ # Clear the output. context$progress_bar_output <- NULL }) # Create a bar factory. bar_factory <- BarFactory$new() # Get a basic bar instance. bar <- bar_factory$get("basic") # Register the bar with the context object. context$set_bar(bar) # Configure the bar. context$configure_bar( style = 3 ) # Run the task in parallel. eval(operation) # Expect that the task output is correct. expect_equal(context$get_output(wait = TRUE), expected_output) # Expect the progress bar was shown correctly. expect_true(any(grepl("=\\| 100%", context$progress_bar_output))) # Get a modern bar instance. bar <- bar_factory$get("modern") # Register the bar with the same context object. context$set_bar(bar) # Configure the bar. context$configure_bar( show_after = 0, format = ":bar| :percent", clear = FALSE, force = TRUE ) # Run the task in parallel. eval(operation) # Expect that the task output is correct. expect_equal(context$get_output(wait = TRUE), expected_output) # Expect the progress bar was shown correctly. expect_true(any(grepl("=\\| 100%", context$progress_bar_output))) } # Set of tests for progress tracking context. tests_set_for_progress_tracking_context <- function(context, task) { # Check the type. Helper$check_object_type(context, "ProgressTrackingContextTester") # Select task arguments. x <- sample(1:100, 100) y <- sample(1:100, 1) z <- sample(1:100, 1) sleep <- sample(c(0, 0.001, 0.002), 1) # Create the expected output for the `sapply` operation. expected_output <- task(x, y, z) # Create the `sapply` operation. operation <- bquote(context$sapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `sapply` operation in a progress tracking context. tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) # Create the expected output for the `lapply` operation. expected_output <- as.list(expected_output) # Create the `lapply` operation. operation <- bquote(context$lapply(.(x), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `lapply` operation in a progress tracking context. tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) # Redefine `x` as a matrix for the `apply` operation. x <- matrix(rnorm(100^2), nrow = 100, ncol = 100) # Compute the expected output for the `apply` operation applied over rows. expected_output <- base::apply(x, 1, task, y = y, z = z) # Define the `apply` operation over rows. operation <- bquote(context$apply(.(x), 1, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over rows in a progress tracking context. tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) # Compute the expected output for the `apply` operation applied over columns. expected_output <- base::apply(x, 2, task, y = y, z = z) # Define the `apply` operation over columns. operation <- bquote(context$apply(.(x), 2, .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over columns in a progress tracking context. tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) # Redefine a smaller `x` matrix for the `apply` operation applied element-wise. x <- matrix(rnorm(10^2), nrow = 10, ncol = 10) # Compute the expected output for the `apply` operation applied element-wise. expected_output <- base::apply(x, c(1, 2), task, y = y, z = z) # Define the `apply` operation element-wise. operation <- bquote(context$apply(.(x), c(1, 2), .(task), y = .(y), z = .(z), sleep = .(sleep))) # Tests for the `apply` operation over all elements in a progress tracking context. tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) } # Set of tests for executing tasks that throw errors in a progress tracking context. tests_set_for_task_execution_with_progress_tracking_and_error <- function(operation, context, expected_error) { # Clear the progress output on exit. on.exit({ # Clear the output. context$progress_bar_output <- NULL }) # Create a bar factory. bar_factory <- BarFactory$new() # Get a basic bar instance. bar <- bar_factory$get("basic") # Register the bar with the context object. context$set_bar(bar) # Configure the bar. context$configure_bar( style = 3 ) # Run the task that throws an error in parallel. eval(operation) # Expect that the task interrupted the progress bar with an error message. expect_error(context$get_output(wait = TRUE), expected_error) # Get a modern bar instance. bar <- bar_factory$get("modern") # Register the bar with the same context object. context$set_bar(bar) # Configure the bar. context$configure_bar( show_after = 0, format = ":bar| :percent", clear = FALSE, force = TRUE ) # Run the task in parallel. eval(operation) # Expect that the task interrupted the progress bar with an error message. expect_error(context$get_output(wait = TRUE), expected_error) } # Set of tests for progress bar interruptions in a tracking context. tests_set_for_progress_tracking_context_with_error <- function(context) { # Check the type. Helper$check_object_type(context, "ProgressTrackingContextTester") # Clean-up. on.exit({ # Set default values for package options. set_default_options() }) # Reduce waiting time between progress bar updates. set_option("progress_wait", 0.01) # Define the task. task <- function(x, error_x = 1, sleep = 0) { # Sleep a bit or not. Sys.sleep(sleep) # Randomly sample when to throw an error. if (any(x == error_x)) { # Throw an error. stop("Intentional task error.") } # Compute something. output <- x + 1 # Return the result. return(output) } # Construct the expected error message. expected_error <- as_text(task(x = 1, error_x = 1)) # Select task arguments. x <- sample(1:100, 100) error_x <- sample(length(x), 1) sleep <- sample(c(0, 0.001, 0.002), 1) # Create the `sapply` operation. operation <- bquote(context$sapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep))) # Tests for the `sapply` operation in a progress tracking context with error in the task. tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) # Create the `lapply` operation. operation <- bquote(context$lapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep))) # Tests for the `lapply` operation in a progress tracking context with error in the task. tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) # Redefine `x` as a matrix for the `apply` operation. x <- matrix(rnorm(100^2), nrow = 100, ncol = 100) # Sample new error `x`. error_x <- sample(x, 1) # Define the `apply` operation over rows. operation <- bquote(context$apply(.(x), 1, .(task), error_x = .(error_x), sleep = .(sleep))) # Tests for the `apply` operation over rows in a progress tracking context with error in the task. tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) # Define the `apply` operation over columns. operation <- bquote(context$apply(.(x), 2, .(task), error_x = .(error_x), sleep = .(sleep))) # Tests for the `apply` operation over columns in a progress tracking context with error in the task. tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) # Redefine a smaller `x` matrix for the `apply` operation applied element-wise. x <- matrix(rnorm(10^2), nrow = 10, ncol = 10) # Sample new error `x`. error_x <- sample(x, 1) # Define the `apply` operation element-wise. operation <- bquote(context$apply(.(x), c(1, 2), .(task), error_x = .(error_x), sleep = .(sleep))) # Tests for the `apply` operation over all elements in a progress tracking context with error in the task. tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) } #endregion #region Tests sets for the user API. # Set of tests for creating backends via the user API. tests_set_for_backend_creation_via_user_api <- function(cluster_type, backend_type) { # Create a backend. backend <- start_backend( cores = 2, cluster_type = cluster_type, backend_type = backend_type ) # Expect the backend to be active. expect_true(backend$active) # Expect the backend to be of the correct type. expect_equal( Helper$get_class_name(backend), Helper$get_class_name(BackendFactory$new()$get(backend_type)) ) # Stop the backend stop_backend(backend) # Expect the backend to be inactive. expect_false(backend$active) } # Set of tests for task execution via the user API. tests_set_for_user_api_task_execution <- function(parallel, sequential, expected_output) { # Clean-up. on.exit({ # Set default values for package options. set_default_options() }) # Select a cluster type. cluster_type <- pick_cluster_type(Specification$new()$types) # Disable progress tracking. set_option("progress_track", FALSE) # Create a synchronous backend. backend <- start_backend( cores = 2, cluster_type = cluster_type, backend_type = "sync" ) # Expect the output of the task ran in parallel to be correct. expect_equal(eval(parallel), expected_output) # Enable progress tracking. set_option("progress_track", TRUE) # Expect warning for requesting progress tracking with incompatible backend. expect_warning( eval(parallel), as_text(Warning$progress_not_supported_for_backend(backend)) ) # Stop the synchronous backend. stop_backend(backend) # Create an asynchronous backend. backend <- start_backend( cores = 2, cluster_type = cluster_type, backend_type = "async" ) # Expect the output to be correct. expect_equal(eval(parallel), expected_output) # Disable progress tracking. set_option("progress_track", FALSE) # Expect the output to be correct. expect_equal(eval(parallel), expected_output) # Stop the asynchronous backend. stop_backend(backend) # Expect the task to produce correct output when ran sequentially. expect_equal(eval(sequential), expected_output) } # Set of tests for progress tracking via the user API. tests_set_for_user_api_progress_tracking <- function(operation) { # Pick a cluster type. cluster_type <- pick_cluster_type(Specification$new()$types) # Create an asynchronous backend. backend <- start_backend( cores = 2, cluster_type = cluster_type, backend_type = "async" ) # Clean-up on exit. on.exit({ # Stop the backend. stop_backend(backend) # Restore the default options. set_default_options() }) # Configure modern bar. configure_bar( type = "modern", force = TRUE, clear = FALSE ) # Redirect output accordingly. if (.Platform$OS.type == "windows") { # Redirect output to `nul`. sink("nul", type = "output") } else { # Redirect output to `/dev/null`. sink("/dev/null", type = "output") } # Run the task and capture the progress bar output. output <- capture.output({ eval(operation) }, type = "message") # Remove output redirection. sink(NULL) # Expect the progress bar to be shown correctly. expect_true(grepl("tasks \\[100%\\]", paste0(output, collapse = ""), perl = TRUE)) # Configure the basic bar. configure_bar( type = "basic", style = 3 ) # Run the task and capture the progress bar output. output <- capture.output({ eval(operation) }, type = "output") # Expect the progress bar to be shown correctly. expect_true(grepl("=\\| 100%", paste0(output, collapse = ""), perl = TRUE)) # Disable progress tracking. set_option("progress_track", FALSE) # Run the task and capture the output without the progress bar. output <- capture.output({ eval(operation) }, type = "output") # Expect the progress bar to be missing from the output. expect_false(grepl("=\\| 100%", paste0(output, collapse = ""), perl = TRUE)) } #endregion #region Helper `R6` classes for testing. # Helper for testing private methods of `Specification` class. SpecificationTester <- R6::R6Class("SpecificationTester", inherit = Specification, private = list( # Overwrite the private `.get_available_cores()` method. .get_available_cores = function() { return(self$available_cores) } ), public = list( available_cores = NULL ), active = list( # Expose the private `.determine_usable_cores` method. usable_cores = function() { # Compute and return the number of the usable cores. return(private$.determine_usable_cores(self$available_cores)) } ) ) # Helper for testing method implementations of `BackendService` interface. BackendServiceImplementation <- R6::R6Class("BackendServiceImplementation", inherit = BackendService, public = list( # Allow instantiation. initialize = function() {} ) ) # Helper for testing method implementations of `Backend` class. BackendImplementation <- R6::R6Class("BackendImplementation", inherit = Backend, public = list( # Allow instantiation. initialize = function() {} ) ) # Helper for testing the `ProgressTrackingContext` class. ProgressTrackingContextTester <- R6::R6Class("ProgressTrackingContextTester", inherit = ProgressTrackingContext, private = list( # Wrapper for executing task operations with progress output capturing. .execute_and_capture_progress = function(operation) { # Create a text connection. connection <- textConnection("output", open = "w", local = TRUE) # Capture the output. sink(connection, type = "output") sink(connection, type = "message") # Close the connection and reset the sink on exit. on.exit({ # Reset the sink. sink(NULL, type = "message") sink(NULL, type = "output") # Close the connection. close(connection) }) # Execute the task. eval(operation) # Store the progress bar output on the instance. self$progress_bar_output <- output } ), public = list( # The progress bar output used for testing. progress_bar_output = NULL, # Implementation for the `sapply` method capturing the progress output. sapply = function(x, fun, ...) { # Define the operation. operation <- bquote( do.call( super$sapply, c(list(.(x), .(fun)), .(list(...))) ) ) # Execute the task via the operation and capture the progress output. private$.execute_and_capture_progress(operation) }, # Implementation for the `lapply` method capturing the progress output. lapply = function(x, fun, ...) { # Define the operation. operation <- bquote( do.call( super$lapply, c(list(.(x), .(fun)), .(list(...))) ) ) # Execute the task via the operation and capture the progress output. private$.execute_and_capture_progress(operation) }, # Implementation for the `apply` method capturing the progress output. apply = function(x, margin, fun, ...) { # Define the operation. operation <- bquote( do.call( super$apply, c(list(.(x), .(margin), .(fun)), .(list(...))) ) ) # Execute the task via the operation and capture the progress output. private$.execute_and_capture_progress(operation) }, # Wrapper to expose `.make_log` for testing. make_log = function() { private$.make_log() }, # Wrapper to expose the `.decorate` for testing. decorate = function(task, log) { private$.decorate(task, log) } ), active = list( # Expose the bar configuration. bar_config = function() { return(private$.bar_config) } ) ) # Helper for testing method implementations of `Bar` class. BarImplementation <- R6::R6Class("BarImplementation", inherit = Bar, public = list( # Allow instantiation. initialize = function() {} ) ) #endregion