# Tests for parallel processing functionality
# Tests cover: parallel_config.R, progress_utils.R, and parallel operations
#
# Cleanup convention: every test that changes the parallel plan or creates a
# temporary directory registers its cleanup with on.exit() right away, so the
# cleanup also runs when an error aborts the test before its last line. This
# keeps a leaked multisession plan or stray files from affecting later tests.

library(testthat)
library(riemtan)
library(Matrix)

# Helper function to create test SPD matrices.
#
# Each matrix is the p x p identity plus N(0, 0.1) noise, symmetrised, with an
# extra 0.5 added to the diagonal. It is then passed through Matrix::nearPD()
# and stored in packed form via Matrix::pack().
#
# Arguments:
#   p  Matrix dimension (rows = columns).
#   n  Number of matrices to generate; n = 0 yields an empty list.
#
# Returns a list of n packed matrices. The draws use rnorm() without a fixed
# seed, so the values differ between calls.
create_test_spd <- function(p = 4, n = 10) {
  # seq_len() rather than 1:n so that n = 0 does not iterate over c(1, 0)
  lapply(seq_len(n), function(i) {
    mat <- diag(p) + matrix(rnorm(p * p, 0, 0.1), p, p)
    mat <- (mat + t(mat)) / 2
    mat <- mat + diag(p) * 0.5
    Matrix::nearPD(mat)$mat |> Matrix::pack()
  })
}

# ==============================================================================
# Test parallel_config.R functions
# ==============================================================================

test_that("set_parallel_plan() configures strategies correctly", {
  # Restore the sequential plan even if an error aborts the test early
  on.exit(reset_parallel_plan(), add = TRUE)

  # Test sequential
  set_parallel_plan("sequential")
  expect_false(is_parallel_enabled())
  expect_equal(get_n_workers(), 1)

  # Test multisession
  set_parallel_plan("multisession", workers = 2)
  expect_true(is_parallel_enabled())
  expect_equal(get_n_workers(), 2)

  # Reset
  reset_parallel_plan()
  expect_false(is_parallel_enabled())
})

test_that("set_parallel_plan() validates strategy parameter", {
  # If a call below unexpectedly succeeds, do not leave its plan active
  on.exit(reset_parallel_plan(), add = TRUE)

  expect_error(
    set_parallel_plan("invalid_strategy"),
    "Invalid strategy"
  )

  # future package validates workers >= 1
  expect_error(
    set_parallel_plan("multisession", workers = -1),
    "workers"
  )
})

test_that("is_parallel_enabled() detects parallel state correctly", {
  on.exit(reset_parallel_plan(), add = TRUE)

  # Sequential
  set_parallel_plan("sequential")
  expect_false(is_parallel_enabled())

  # Parallel
  set_parallel_plan("multisession", workers = 2)
  expect_true(is_parallel_enabled())

  # Reset
  reset_parallel_plan()
  expect_false(is_parallel_enabled())
})

test_that("should_parallelize() respects threshold and parallel state", {
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Sequential plan: never parallelize
  set_parallel_plan("sequential")
  expect_false(should_parallelize(100))
  expect_false(should_parallelize(100, threshold = 10))

  # Parallel plan: respect threshold
  set_parallel_plan("multisession", workers = 2)
  expect_true(should_parallelize(100, threshold = 10))
  expect_false(should_parallelize(5, threshold = 10))
  expect_true(should_parallelize(10, threshold = 10)) # Exactly at threshold
})

test_that("should_parallelize() validates inputs", {
  expect_error(should_parallelize(-1), "non-negative")
  expect_error(should_parallelize("100"), "numeric")
  expect_error(should_parallelize(100, threshold = -1), "positive")
  expect_error(should_parallelize(100, threshold = "10"), "numeric")
})

test_that("get_n_workers() returns correct count", {
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Sequential
  set_parallel_plan("sequential")
  expect_equal(get_n_workers(), 1)

  # Multisession with 2 workers
  set_parallel_plan("multisession", workers = 2)
  expect_equal(get_n_workers(), 2)

  # Multisession with 4 workers
  set_parallel_plan("multisession", workers = 4)
  expect_equal(get_n_workers(), 4)
})

test_that("reset_parallel_plan() resets to sequential", {
  on.exit(reset_parallel_plan(), add = TRUE)

  # Set parallel
  set_parallel_plan("multisession", workers = 4)
  expect_true(is_parallel_enabled())

  # Reset
  reset_parallel_plan()
  expect_false(is_parallel_enabled())
  expect_equal(get_n_workers(), 1)
})

# ==============================================================================
# Test progress_utils.R functions
# ==============================================================================

test_that("is_progress_available() checks progressr correctly", {
  has_progressr <- requireNamespace("progressr", quietly = TRUE)
  expect_equal(is_progress_available(), has_progressr)
})

test_that("create_progressor() handles enable parameter", {
  # Disabled: should return no-op function
  p <- create_progressor(10, enable = FALSE)
  expect_silent(p()) # Should not error

  # Enabled with progressr available
  if (requireNamespace("progressr", quietly = TRUE)) {
    p <- create_progressor(10, enable = TRUE)
    expect_silent(p()) # Should not error
  }
})

test_that("with_progress() handles enable parameter", {
  # Disabled
  result <- with_progress({
    1 + 1
  }, name = "Test", enable = FALSE)
  expect_equal(result, 2)

  # Enabled (may or may not have progressr)
  result <- with_progress({
    2 + 2
  }, name = "Test", enable = TRUE)
  expect_equal(result, 4)
})

# ==============================================================================
# Test parallel vs sequential equivalence
# ==============================================================================

test_that("Parallel and sequential compute_tangents() produce identical results", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 15)

  # Sequential
  set_parallel_plan("sequential")
  sample_seq <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_seq$compute_tangents()
  tangents_seq <- sample_seq$tangent_images

  # Parallel
  set_parallel_plan("multisession", workers = 2)
  sample_par <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_par$compute_tangents()
  tangents_par <- sample_par$tangent_images

  # Compare
  expect_equal(length(tangents_seq), length(tangents_par))
  for (i in seq_along(tangents_seq)) {
    expect_equal(
      as.matrix(tangents_seq[[i]]),
      as.matrix(tangents_par[[i]]),
      tolerance = 1e-10
    )
  }
})

test_that("Parallel and sequential compute_vecs() produce identical results", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 15)

  # Sequential
  set_parallel_plan("sequential")
  sample_seq <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_seq$compute_tangents()
  sample_seq$compute_vecs()
  vecs_seq <- sample_seq$vector_images

  # Parallel
  set_parallel_plan("multisession", workers = 2)
  sample_par <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_par$compute_tangents()
  sample_par$compute_vecs()
  vecs_par <- sample_par$vector_images

  # Compare
  expect_equal(vecs_seq, vecs_par, tolerance = 1e-10)
})

test_that("Parallel and sequential compute_fmean() produce identical results", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 15)

  # Sequential
  set_parallel_plan("sequential")
  sample_seq <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_seq$compute_fmean(tol = 0.01, max_iter = 50)
  fmean_seq <- sample_seq$frechet_mean

  # Parallel
  set_parallel_plan("multisession", workers = 2)
  sample_par <- CSample$new(conns = connectomes, metric_obj = airm)
  sample_par$compute_fmean(tol = 0.01, max_iter = 50)
  fmean_par <- sample_par$frechet_mean

  # Compare
  expect_equal(
    as.matrix(fmean_seq),
    as.matrix(fmean_par),
    tolerance = 1e-6 # Slightly looser tolerance for iterative algorithm
  )
})

test_that("Parallel and sequential relocate() produce identical results", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 15)

  # Create two reference points
  old_ref <- connectomes[[1]]
  new_ref <- connectomes[[2]]

  # Compute tangent images at old_ref
  tangents <- lapply(connectomes[3:15], function(conn) {
    airm$log(old_ref, conn)
  })

  # Sequential relocate
  set_parallel_plan("sequential")
  relocated_seq <- relocate(old_ref, new_ref, tangents, airm, progress = FALSE)

  # Parallel relocate
  set_parallel_plan("multisession", workers = 2)
  relocated_par <- relocate(old_ref, new_ref, tangents, airm, progress = FALSE)

  # Compare
  expect_equal(length(relocated_seq), length(relocated_par))
  for (i in seq_along(relocated_seq)) {
    expect_equal(
      as.matrix(relocated_seq[[i]]),
      as.matrix(relocated_par[[i]]),
      tolerance = 1e-10
    )
  }
})

# ==============================================================================
# Test ParquetBackend parallel operations
# ==============================================================================

test_that("ParquetBackend parallel loading produces correct results", {
  skip_if_not_installed("arrow")
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 12)

  # Write to Parquet (directory is removed on exit, including after an error)
  temp_dir <- tempfile()
  dir.create(temp_dir)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)
  write_connectomes_to_parquet(
    connectomes,
    output_dir = temp_dir,
    subject_ids = paste0("subj_", 1:12),
    overwrite = TRUE
  )

  # Sequential loading
  backend_seq <- create_parquet_backend(temp_dir, cache_size = 5)
  set_parallel_plan("sequential")
  matrices_seq <- backend_seq$get_all_matrices(parallel = FALSE)

  # Parallel loading
  backend_par <- create_parquet_backend(temp_dir, cache_size = 5)
  set_parallel_plan("multisession", workers = 2)
  matrices_par <- backend_par$get_all_matrices(parallel = TRUE)

  # Compare
  expect_equal(length(matrices_seq), length(matrices_par))
  for (i in seq_along(matrices_seq)) {
    expect_equal(
      as.matrix(matrices_seq[[i]]),
      as.matrix(matrices_par[[i]]),
      tolerance = 1e-10
    )
  }
})

test_that("ParquetBackend$get_matrices_parallel() works correctly", {
  skip_if_not_installed("arrow")
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  connectomes <- create_test_spd(p = 4, n = 12)

  # Write to Parquet (directory is removed on exit, including after an error)
  temp_dir <- tempfile()
  dir.create(temp_dir)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)
  write_connectomes_to_parquet(
    connectomes,
    output_dir = temp_dir,
    subject_ids = paste0("subj_", 1:12),
    overwrite = TRUE
  )

  # Create backend
  backend <- create_parquet_backend(temp_dir, cache_size = 5)

  # Load subset
  set_parallel_plan("multisession", workers = 2)
  subset <- backend$get_matrices_parallel(c(1, 3, 5, 7), progress = FALSE)

  # Verify
  expect_equal(length(subset), 4)
  expect_equal(
    as.matrix(subset[[1]]),
    as.matrix(connectomes[[1]]),
    tolerance = 1e-10,
    check.attributes = FALSE
  )
  expect_equal(
    as.matrix(subset[[2]]),
    as.matrix(connectomes[[3]]),
    tolerance = 1e-10,
    check.attributes = FALSE
  )
})

# ==============================================================================
# Test CSample$load_connectomes_batched()
# ==============================================================================

test_that("CSample$load_connectomes_batched() works with ParquetBackend", {
  skip_if_not_installed("arrow")
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 20)

  # Write to Parquet (directory is removed on exit, including after an error)
  temp_dir <- tempfile()
  dir.create(temp_dir)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)
  write_connectomes_to_parquet(
    connectomes,
    output_dir = temp_dir,
    subject_ids = paste0("subj_", 1:20),
    overwrite = TRUE
  )

  # Create sample
  backend <- create_parquet_backend(temp_dir, cache_size = 5)
  sample <- CSample$new(backend = backend, metric_obj = airm)

  # Load in batches
  set_parallel_plan("multisession", workers = 2)
  loaded <- sample$load_connectomes_batched(
    indices = 1:15,
    batch_size = 5,
    progress = FALSE
  )

  # Verify
  expect_equal(length(loaded), 15)
  for (i in 1:15) {
    expect_equal(
      as.matrix(loaded[[i]]),
      as.matrix(connectomes[[i]]),
      tolerance = 1e-10,
      check.attributes = FALSE
    )
  }
})

test_that("CSample$load_connectomes_batched() validates indices", {
  skip_if_not_installed("arrow")
  skip_if_not_installed("Matrix")

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 10)

  # Write to Parquet (directory is removed on exit, including after an error)
  temp_dir <- tempfile()
  dir.create(temp_dir)
  on.exit(unlink(temp_dir, recursive = TRUE), add = TRUE)
  write_connectomes_to_parquet(
    connectomes,
    output_dir = temp_dir,
    subject_ids = paste0("subj_", 1:10),
    overwrite = TRUE
  )

  # Create sample
  backend <- create_parquet_backend(temp_dir)
  sample <- CSample$new(backend = backend, metric_obj = airm)

  # Invalid indices
  expect_error(
    sample$load_connectomes_batched(indices = c(1, 15), batch_size = 5),
    "must be in range"
  )
  expect_error(
    sample$load_connectomes_batched(indices = c(0, 5), batch_size = 5),
    "must be in range"
  )
})

# ==============================================================================
# Test auto-detection logic
# ==============================================================================

test_that("Auto-detection prevents parallelization for small datasets", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create small dataset (n = 5, below default threshold of 10)
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 5)

  # Enable parallel plan
  set_parallel_plan("multisession", workers = 2)

  # Create sample and compute
  sample <- CSample$new(conns = connectomes, metric_obj = airm)

  # Should use sequential due to auto-detection
  # (We can't directly test this, but we can verify it doesn't error)
  expect_silent(sample$compute_tangents())
  expect_silent(sample$compute_vecs())
})

test_that("Auto-detection enables parallelization for large datasets", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create large dataset (n = 15, above default threshold of 10)
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 15)

  # Enable parallel plan
  set_parallel_plan("multisession", workers = 2)
  expect_true(is_parallel_enabled())

  # Create sample and compute
  sample <- CSample$new(conns = connectomes, metric_obj = airm)

  # Should use parallel processing
  expect_silent(sample$compute_tangents())
  expect_silent(sample$compute_vecs())

  # Results should be correct
  expect_equal(nrow(sample$vector_images), 15)
})

# ==============================================================================
# Test progress parameter pass-through
# ==============================================================================

test_that("Progress parameter is accepted by all methods", {
  skip_if_not_installed("Matrix")

  # Create test data
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 12)

  # Create sample
  sample <- CSample$new(conns = connectomes, metric_obj = airm)

  # All methods should accept progress parameter without error
  expect_silent(sample$compute_tangents(progress = FALSE))
  expect_silent(sample$compute_vecs(progress = FALSE))
  expect_silent(sample$compute_conns(progress = FALSE))
  expect_silent(sample$compute_unvecs(progress = FALSE))

  # compute_fmean produces convergence messages (expected behavior), so just
  # suppress them
  suppressMessages(sample$compute_fmean(progress = FALSE))

  expect_silent(sample$change_ref_pt(connectomes[[2]], progress = FALSE))
})

# ==============================================================================
# Test edge cases
# ==============================================================================

test_that("Parallel processing handles single matrix correctly", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create single matrix
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 1)

  # Enable parallel
  set_parallel_plan("multisession", workers = 2)

  # Create sample (should handle n=1 gracefully)
  sample <- CSample$new(conns = connectomes, metric_obj = airm)

  # Should work without error (will use sequential due to auto-detection)
  expect_silent(sample$compute_tangents())
  expect_equal(length(sample$tangent_images), 1)
})

test_that("Parallel processing handles exact threshold correctly", {
  skip_if_not_installed("Matrix")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Create dataset exactly at threshold (n = 10)
  data(airm)
  connectomes <- create_test_spd(p = 4, n = 10)

  # Enable parallel
  set_parallel_plan("multisession", workers = 2)

  # Create sample
  sample <- CSample$new(conns = connectomes, metric_obj = airm)

  # Should enable parallelization (n >= threshold)
  expect_silent(sample$compute_tangents())
  expect_equal(length(sample$tangent_images), 10)
})

# ==============================================================================
# Test strategy fallback
# ==============================================================================

test_that("multicore strategy falls back to multisession on Windows", {
  skip_if(.Platform$OS.type != "windows")
  # Reset the plan on exit, including after an error
  on.exit(reset_parallel_plan(), add = TRUE)

  # Request multicore on Windows
  expect_warning(
    set_parallel_plan("multicore", workers = 2),
    "not available on Windows"
  )

  # Should still enable parallel processing (via multisession)
  expect_true(is_parallel_enabled())
})

# ==============================================================================
# Cleanup
# ==============================================================================

# Ensure parallel plan is reset after all tests
reset_parallel_plan()