skip_if_no_redis()

# start workers ----------------------------------------------------------------

test_that("constructing a rush manager works", {
  config = redux::redis_config()
  r = redux::hiredis(config)
  r$FLUSHDB()
  # create the instance before registering cleanup; registering on.exit()
  # first would make the handler error on an undefined `rush` if rsh() fails
  rush = rsh(network_id = "test-rush", config = config)
  on.exit({
    rush$reset()
  })

  expect_class(rush, "Rush")
  expect_equal(rush$network_id, "test-rush")
  expect_class(rush$config, "redis_config")
  expect_class(rush$connector, "redis_api")
})

test_that("workers are started", {
  rush = start_rush()
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  expect_data_table(rush$worker_info, nrows = 0)

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)

  walk(rush$processes_mirai, function(process) expect_class(process, "mirai"))

  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_info$state, "running")
})

test_that("packages are available on the worker", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1,
    packages = "uuid"
  )
  rush$wait_for_workers(1, timeout = 5)
  expect_equal(rush$n_workers, 1)

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
  expect_equal(rush$n_finished_tasks, 1)
})

test_that("wait for workers works with worker ids", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )

  expect_error(rush$wait_for_workers(timeout = 1), class = "Mlr3ErrorConfig", regexp = "Either")

  rush$wait_for_workers(worker_ids = worker_ids, timeout = 5)
  expect_equal(rush$n_running_workers, 1)

  # worker id does not exist so we expect a timeout
  expect_error(rush$wait_for_workers(worker_ids = "x", timeout = 1), class = "Mlr3ErrorTimeout")
})

test_that("wait for workers works with n", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )

  rush$wait_for_workers(n = 1, timeout = 5)
  expect_equal(rush$n_running_workers, 1)

  expect_error(rush$wait_for_workers(n = 2, timeout = 1), class = "Mlr3ErrorTimeout")
})

test_that("wait for workers works with both n and worker ids", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )

  rush$wait_for_workers(n = 1, worker_ids = worker_ids, timeout = 5)
  expect_equal(rush$n_running_workers, 1)

  expect_error(
    rush$wait_for_workers(n = 2, worker_ids = worker_ids, timeout = 1),
    class = "Mlr3ErrorConfig",
    regexp = "Number of workers to wait for"
  )

  expect_error(rush$wait_for_workers(n = 1, worker_ids = "x", timeout = 1), class = "Mlr3ErrorTimeout")

  rush$wait_for_workers(n = 1, worker_ids = c(worker_ids, "x"), timeout = 1)
  expect_equal(rush$n_running_workers, 1)
})

# local workers ----------------------------------------------------------------

test_that("local workers are started", {
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
    walk(rush$processes_processx, function(process) process$kill())
  })

  expect_data_table(rush$worker_info, nrows = 0)

  worker_ids = rush$start_local_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)

  walk(rush$processes_processx, function(process) expect_class(process, "process"))

  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 1)
  expect_names(names(worker_info), must.include = c("worker_id", "pid", "hostname", "heartbeat", "state"))
  expect_integer(worker_info$pid, unique = TRUE)
  expect_set_equal(worker_ids, worker_info$worker_id)
  expect_set_equal(rush$worker_ids, worker_ids)
  expect_set_equal(rush$worker_info$state, "running")
})
test_that("additional local workers are started", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    walk(rush$processes_processx, function(process) process$kill())
  })

  worker_ids = rush$start_local_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)
  expect_equal(rush$n_workers, 1)

  worker_ids_2 = rush$start_local_workers(
    worker_loop = wl_queue,
    n_workers = 1
  )
  rush$wait_for_workers(2, timeout = 5)

  expect_length(rush$processes_processx, 2)
  walk(rush$processes_processx, function(process) expect_class(process, "process"))

  worker_info = rush$worker_info
  expect_data_table(worker_info, nrows = 2)
  expect_set_equal(c(worker_ids, worker_ids_2), worker_info$worker_id)
  expect_integer(worker_info$pid, unique = TRUE)
  expect_set_equal(rush$worker_info$state, "running")
})

# start workers with script ----------------------------------------------------

test_that("heartbeat process is started", {
  skip_if_not_installed("callr")

  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })

  expect_data_table(rush$worker_info, nrows = 0)

  script = rush$worker_script(
    worker_loop = wl_queue,
    heartbeat_period = 3,
    heartbeat_expire = 9
  )
  px = start_script_worker(script)
  on.exit(
    {
      px$kill()
    },
    add = TRUE
  )

  rush$wait_for_workers(1, timeout = 5)

  worker_info = rush$worker_info
  expect_logical(worker_info$heartbeat)
})

# terminate workers ------------------------------------------------------------

test_that("a worker is terminated", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]
  worker_id_2 = rush$running_worker_ids[2]

  # worker 1
  rush$stop_workers(worker_ids = worker_id_1, type = "terminate")
  Sys.sleep(3)
  expect_false(unresolved(rush$processes_mirai[[worker_id_1]]))
  expect_true(unresolved(rush$processes_mirai[[worker_id_2]]))
  expect_equal(rush$running_worker_ids, worker_id_2)
  expect_equal(worker_id_1, rush$terminated_worker_ids)

  # worker 2
  rush$stop_workers(worker_ids = worker_id_2, type = "terminate")
  Sys.sleep(3)
  expect_false(unresolved(rush$processes_mirai[[worker_id_2]]))
  expect_false(unresolved(rush$processes_mirai[[worker_id_1]]))
  expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
  expect_null(rush$running_worker_ids)
})

test_that("reset workers works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_default,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(1)

  rush$reset(workers = TRUE)
  expect_data_table(rush$worker_info, nrows = 0)
  expect_null(rush$running_worker_ids)
  expect_null(rush$terminated_worker_ids)
  expect_null(rush$tasks)
  expect_null(rush$finished_tasks)
})

test_that("reset data works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_default,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(1)

  expect_character(rush$finished_tasks)
  keys = rush$finished_tasks

  rush$reset(workers = FALSE)
  expect_true(all(keys %nin% rush$finished_tasks))
  expect_equal(rush$n_running_workers, 1)
  expect_data_table(rush$worker_info, nrows = 1)
})

# kill workers -----------------------------------------------------------------

test_that("a worker is killed", {
  rush = start_rush()
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_default,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]
  worker_id_2 = rush$running_worker_ids[2]
  worker_info = rush$worker_info
  expect_true(all(tools::pskill(worker_info$pid, signal = 0L)))

  # worker 1
  rush$stop_workers(worker_ids = worker_id_1, type = "kill")
  Sys.sleep(1)
  expect_equal(worker_id_1, rush$terminated_worker_ids)
  expect_equal(rush$running_worker_ids, worker_id_2)
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
  expect_false(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))

  # worker 2
  rush$stop_workers(worker_ids = worker_id_2, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_1]]$data))
  expect_true(mirai::is_error_value(rush$processes_mirai[[worker_id_2]]$data))
})

test_that("a local worker is killed", {
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
    walk(rush$processes_processx, function(process) process$kill())
  })

  worker_ids = rush$start_local_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  worker_id_1 = rush$running_worker_ids[1]
  worker_id_2 = rush$running_worker_ids[2]
  rush$worker_info

  # worker 1
  rush$stop_workers(worker_ids = worker_id_1, type = "kill")
  Sys.sleep(1)
  expect_equal(worker_id_1, rush$terminated_worker_ids)
  expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
  expect_true(rush$processes_processx[[worker_id_2]]$is_alive())

  # worker 2
  rush$stop_workers(worker_ids = worker_id_2, type = "kill")
  Sys.sleep(1)
  expect_set_equal(c(worker_id_1, worker_id_2), rush$terminated_worker_ids)
  expect_false(rush$processes_processx[[worker_id_1]]$is_alive())
  expect_false(rush$processes_processx[[worker_id_2]]$is_alive())
})

test_that("worker is killed with a heartbeat process", {
  skip_if_not_installed("callr")

  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })

  expect_data_table(rush$worker_info, nrows = 0)

  script = rush$worker_script(
    worker_loop = wl_queue,
    heartbeat_period = 3,
    heartbeat_expire = 9
  )
  px = start_script_worker(script)
  rush$wait_for_workers(1, timeout = 5)
  on.exit(
    {
      px$kill()
    },
    add = TRUE
  )

  worker_info = rush$worker_info
  expect_logical(worker_info$heartbeat)

  # signal 0L returns TRUE if the process is still alive
  expect_true(tools::pskill(worker_info$pid, signal = 0L))

  rush$stop_workers(type = "kill")
  Sys.sleep(1)
  expect_false(tools::pskill(worker_info$pid, signal = 0L))
  expect_true(rush$worker_info$state == "terminated")
  expect_equal(rush$terminated_worker_ids, worker_info$worker_id)
})

# pushing tasks to the queue ---------------------------------------------------

test_that("pushing a task to the queue works", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  expect_equal(rush$n_queued_tasks, 1)

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 1)
  expect_data_table(rush$fetch_tasks(), nrows = 1)
})

test_that("pushing multiple tasks to the queue works", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  expect_equal(rush$n_queued_tasks, 10)

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 10)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 10)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 10)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_character(rush$finished_tasks, len = 10)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 10)
  expect_data_table(rush$fetch_tasks(), nrows = 10)
})

test_that("pushing a task with extras to the queue works", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  xss = list(list(x1 = 1, x2 = 2))
  timestamp = Sys.time()
  extra = list(list(timestamp = timestamp))
  keys = rush$push_tasks(xss, extra)
  expect_equal(rush$n_queued_tasks, 1)

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 1)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_string(rush$tasks)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_set_equal(rush$finished_tasks, keys)
  expect_null(rush$failed_tasks)

  # check status
  # (comment repaired; the original read "check fetchingtest-Rush.R:280:3'",
  # a file-location string fused into the comment during extraction)
  expect_false(rush$is_failed_task(keys))
})

test_that("pushing multiple tasks with extras to the queue works", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  timestamp = Sys.time()
  extra = list(list(timestamp = timestamp))
  keys = rush$push_tasks(xss, extra)
  expect_equal(rush$n_queued_tasks, 10)

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)
  rush$wait_for_tasks(keys)

  # check task count
  expect_equal(rush$n_tasks, 10)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 10)
  expect_equal(rush$n_failed_tasks, 0)

  # check keys in sets
  expect_character(rush$tasks, len = 10)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_set_equal(rush$finished_tasks, keys)
  expect_null(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 0)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "timestamp", "keys"))
  expect_data_table(data, nrows = 10)
  expect_equal(data$timestamp[[1]], timestamp[[1]])
  expect_data_table(rush$fetch_tasks(), nrows = 10)

  # status checks
  expect_false(any(rush$is_running_task(keys)))
  expect_false(any(rush$is_failed_task(keys)))
})

test_that("fetching tasks with vector parameters works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  xss = list(
    list(x1 = c(1, 2, 3), x2 = c(4, 5, 6)),
    list(x1 = c(7, 8, 9), x2 = c(10, 11, 12))
  )
  yss = list(list(y = 1), list(y = 2))
  keys = rush$push_finished_tasks(xss, yss)

  data = rush$fetch_finished_tasks()
  expect_data_table(data, nrows = 2)
  expect_equal(data$x1[[1]], c(1, 2, 3))
  expect_equal(data$x1[[2]], c(7, 8, 9))
  expect_data_table(rush$fetch_tasks(), nrows = 2)
})

test_that("empty queue works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  # worker never consumes tasks, so pushed tasks stay queued
  worker_loop_sleep = function(rush) {
    while (TRUE) {
      Sys.sleep(1)
    }
  }

  worker_ids = rush$start_workers(
    worker_loop = worker_loop_sleep,
    n_workers = 1
  )

  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  Sys.sleep(1)
  rush$empty_queue()
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)

  xss = list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))
  keys = rush$push_tasks(xss)
  Sys.sleep(1)
  rush$empty_queue()
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 3)
})

# segfault detection -----------------------------------------------------------

test_that("segfaults on mirai workers are detected", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_segfault,
    n_workers = 2
  )
  rush$wait_for_workers(1, timeout = 5)

  # wait until a lost worker is detected but timeout after 10 seconds
  start_time = Sys.time()
  lost_workers = character()
  while (start_time + 10 > Sys.time()) {
    lost_workers = c(lost_workers, rush$detect_lost_workers())
    if (length(lost_workers) == 2) break
  }

  expect_character(rush$terminated_worker_ids, len = 2)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
  data = rush$fetch_failed_tasks()
  expect_set_equal(data$message, "Worker has crashed or was killed")
})

test_that("segfaults on processx workers are detected", {
  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
    walk(rush$processes_processx, function(process) process$kill())
  })

  worker_ids = rush$start_local_workers(
    worker_loop = wl_segfault,
    n_workers = 2
  )
  rush$wait_for_workers(2, timeout = 5)

  # wait until a lost worker is detected but timeout after 10 seconds
  start_time = Sys.time()
  lost_workers = character()
  while (start_time + 10 > Sys.time()) {
    lost_workers = c(lost_workers, rush$detect_lost_workers())
    if (length(lost_workers) == 2) break
  }

  expect_character(rush$terminated_worker_ids, len = 2)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
  data = rush$fetch_failed_tasks()
  expect_set_equal(data$message, "Worker has crashed or was killed")
})

test_that("a segfault on a single worker is detected via heartbeat", {
  skip_if_not_installed("callr")

  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })

  expect_data_table(rush$worker_info, nrows = 0)

  script = rush$worker_script(
    worker_loop = wl_segfault,
    heartbeat_period = 1,
    heartbeat_expire = 2
  )
  px = start_script_worker(script)
  rush$wait_for_workers(1, timeout = 10)
  on.exit(
    {
      px$kill()
    },
    add = TRUE
  )

  expect_null(rush$terminated_worker_ids)

  # wait until a lost worker is detected but timeout after 10 seconds
  start_time = Sys.time()
  while (start_time + 10 > Sys.time()) {
    lost_workers = rush$detect_lost_workers()
    if (length(lost_workers)) break
  }

  expect_character(rush$terminated_worker_ids, len = 1)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 1)
  data = rush$fetch_failed_tasks()
  expect_equal(data$message, "Worker has crashed or was killed")
})

test_that("segfaults on multiple workers are detected via the heartbeat", {
  skip_if_not_installed("callr")

  config = redis_configuration()
  rush = rsh(config = config)
  on.exit({
    rush$reset()
  })

  expect_data_table(rush$worker_info, nrows = 0)

  script = rush$worker_script(
    worker_loop = wl_segfault,
    heartbeat_period = 1,
    heartbeat_expire = 2
  )
  px_1 = start_script_worker(script)
  px_2 = start_script_worker(script)
  rush$wait_for_workers(2, timeout = 10)
  on.exit(
    {
      px_1$kill()
      px_2$kill()
    },
    add = TRUE
  )

  expect_null(rush$terminated_worker_ids)

  # wait until all lost workers are detected but timeout after 10 seconds
  start_time = Sys.time()
  lost_workers = character()
  while (start_time + 10 > Sys.time()) {
    lost_workers = c(lost_workers, rush$detect_lost_workers())
    if (length(lost_workers) == 2) break
  }

  expect_character(rush$terminated_worker_ids, len = 2)
  expect_set_equal(rush$terminated_worker_ids, lost_workers)
  expect_data_table(rush$fetch_failed_tasks(), nrows = 2)
  data = rush$fetch_failed_tasks()
  expect_set_equal(data$message, "Worker has crashed or was killed")
})

test_that("wait for tasks works when a task gets lost", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_segfault,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
  keys = rush$push_tasks(xss)
  expect_class(rush$wait_for_tasks(keys, detect_lost_workers = TRUE), "Rush")
})

# logging ----------------------------------------------------------------------

test_that("saving lgr logs works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(5)

  xss = list(list(x1 = 2, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)
  Sys.sleep(5)

  log = rush$read_log()
  expect_data_table(log, min.rows = 1L)
  expect_names(names(log), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))

  log = rush$read_log(time_difference = TRUE)
  expect_data_table(log, min.rows = 1L)
  expect_names(names(log), must.include = c("time_difference"))
  expect_class(log$time_difference, "difftime")

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2), list(x1 = 1, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)
  Sys.sleep(5)

  log = rush$read_log()
  expect_data_table(log, min.rows = 2L)
  expect_names(names(log), must.include = c("worker_id", "timestamp", "logger", "caller", "msg"))
})

test_that("saving logs with redis appender works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1
  )
  rush$wait_for_workers(1, timeout = 5)

  log = rush$read_log()
  expect_data_table(log, min.rows = 1)
  expect_names(colnames(log), identical.to = c("worker_id", "level", "timestamp", "logger", "caller", "msg"))
})

test_that("error and output logs work", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  message_log = tempdir()
  output_log = tempdir()

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 1,
    lgr_thresholds = c("mlr3/rush" = "debug"),
    lgr_buffer_size = 1,
    message_log = message_log,
    output_log = output_log
  )
  rush$wait_for_workers(1)

  expect_match(
    readLines(file.path(message_log, sprintf("message_%s.log", worker_ids[1])))[1],
    "Debug message logging on worker"
  )
  expect_match(
    readLines(file.path(output_log, sprintf("output_%s.log", worker_ids[1])))[1],
    "Debug output logging on worker"
  )
})

# misc--------------------------------------------------------------------------

test_that("caching results works", {
  rush = start_rush(n_workers = 2)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_queue,
    n_workers = 2,
    lgr_thresholds = c("mlr3/rush" = "debug")
  )
  rush$wait_for_workers(2, timeout = 5)

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_data_table(get_private(rush)$.cached_tasks, nrows = 10)

  # second fetch must come from the cache and return the same rows
  expect_data_table(rush$fetch_finished_tasks(), nrows = 10)
  expect_data_table(get_private(rush)$.cached_tasks, nrows = 10)

  xss = replicate(10, list(list(x1 = 1, x2 = 2)))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys)

  expect_data_table(rush$fetch_finished_tasks(), nrows = 20)
  expect_data_table(get_private(rush)$.cached_tasks, nrows = 20)
})

test_that("reconnecting rush instance works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })
  # add = TRUE is required: a plain on.exit() here would replace the handler
  # above and skip rush$reset() / mirai::daemons(0) entirely
  on.exit(
    {
      file.remove("rush.rds")
    },
    add = TRUE
  )

  saveRDS(rush, file = "rush.rds")
  rush = readRDS("rush.rds")
  expect_error(rush$print(), "Context is not connected")
  rush$reconnect()
  expect_r6(rush, "Rush")
})

test_that("large objects limit works", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  old_max_object_size = getOption("rush.max_object_size")
  # add = TRUE is required so the option restore does not replace the
  # reset/daemons cleanup handler registered above
  on.exit(options(rush.max_object_size = old_max_object_size), add = TRUE)
  options(rush.max_object_size = 1)

  large_vector = runif(1e6)
  worker_loop = function(rush, large_vector) {
    rush$push_running_tasks(list(list(x1 = 1, x2 = length(large_vector))))
  }

  expect_error(
    rush$start_workers(
      worker_loop = worker_loop,
      large_vector = large_vector,
      n_workers = 1
    ),
    class = "Mlr3ErrorConfig"
  )

  rush_plan(n_workers = 1, large_objects_path = tempdir())
  rush$start_workers(
    worker_loop = worker_loop,
    large_vector = large_vector,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)
  Sys.sleep(1)
  expect_equal(rush$fetch_tasks()$x2, 1e6)
})

test_that("worker_loop environment is stripped before serialization", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  # large_data (~8 MB) lives in the same local environment as worker_loop.
  # without crate(), R's closure mechanism would serialize it along with the function.
  local({
    large_data = runif(1e6)
    worker_loop = function(rush) {
      rush$push_running_tasks(list(list(x1 = 1, x2 = 2)))
    }
    rush$.__enclos_env__$private$.push_worker_config(worker_loop = worker_loop)
    r = redux::hiredis(rush$config)
    bin = r$GET(sprintf("%s:start_args", rush$network_id))
    expect_lt(as.numeric(object.size(bin)), 1e6)
  })
})

test_that("simple errors are pushed as failed tasks", {
  rush = start_rush(n_workers = 1)
  on.exit({
    rush$reset()
    mirai::daemons(0)
  })

  worker_ids = rush$start_workers(
    worker_loop = wl_fail,
    n_workers = 1
  )
  rush$wait_for_workers(1, timeout = 5)

  xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
  keys = rush$push_tasks(xss)
  rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
  Sys.sleep(2)

  # check task count
  expect_equal(rush$n_tasks, 2)
  expect_equal(rush$n_queued_tasks, 0)
  expect_equal(rush$n_running_tasks, 0)
  expect_equal(rush$n_finished_tasks, 1)
  expect_equal(rush$n_failed_tasks, 1)

  # check keys in sets
  expect_character(rush$tasks, len = 2)
  expect_null(rush$queued_tasks)
  expect_null(rush$running_tasks)
  expect_string(rush$finished_tasks)
  expect_string(rush$failed_tasks)

  # check fetching
  expect_data_table(rush$fetch_queued_tasks(), nrows = 0)
  expect_data_table(rush$fetch_running_tasks(), nrows = 0)
  expect_data_table(rush$fetch_tasks(), nrows = 2)
  data = rush$fetch_finished_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "y", "keys"))
  expect_data_table(data, nrows = 1)
  data = rush$fetch_failed_tasks()
  expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys"))
  expect_data_table(data, nrows = 1)
})