# --- tf_warp / tf_align ------------------------------------------------------- quiet_expected_registration_warnings <- function(expr) { expected_patterns <- c( "evaluations were `NA`", "All .* evaluations on `arg =", "Returning irregular \\.", "Returning regular data on the reduced grid\\.", "argument-wise `mean` were `NA`", "Attempting - for different argument values", "interpolate = FALSE.*no values present", "`NA`s created\\.", "collapsing to unique 'x' values" ) withCallingHandlers( expr, warning = function(w) { msg <- conditionMessage(w) is_expected <- any(vapply( expected_patterns, \(pat) grepl(pat, msg, perl = TRUE), logical(1) )) if (is_expected) { invokeRestart("muffleWarning") } stop(w) } ) } test_that("tf_warp and tf_align are inverses", { withr::local_seed(1234) # Two representative domains: one non-standard, one wide min_t <- c(-2, 1) max_t <- c(3, 10) walk2(min_t, max_t, function(min_t, max_t) { t <- seq(min_t, max_t, length.out = 101) f0 <- tfd(sin((t - min_t) / (max_t - min_t) * 2 * pi), t) x <- rep(f0, 10) w <- tf_rgp(10, t) |> exp() |> tf_integrate(definite = FALSE) w_min <- tf_fmin(w) w_max <- tf_fmax(w) w <- (w - w_min) / (w_max - w_min) * (max_t - min_t) + min_t # Warp: class, length, domain unreg <- tf_warp(x, w) expect_s3_class(unreg, "tfd_reg") expect_length(unreg, length(x)) expect_identical(tf_domain(unreg), tf_domain(x)) # keep_new_arg = TRUE returns tfd_irreg expect_s3_class(tf_warp(x, w, keep_new_arg = TRUE), "tfd_irreg") expect_s3_class(tf_align(x, w, keep_new_arg = TRUE), "tfd_irreg") # Both inverse directions expect_equal(tf_align(tf_warp(x, w), w), x, tolerance = 0.1) expect_equal(tf_warp(tf_align(x, w), w), x, tolerance = 0.1) }) }) test_that("tf_warp/tf_align identity roundtrip", { t <- seq(-1, 2, length.out = 50) f0 <- tfd(cos(t), t) x <- rep(f0, 5) w <- rep(tfd(t, arg = t), 5) # Identity warp: both directions exact expect_identical(tf_align(tf_warp(x, w), w), x) expect_identical(tf_warp(tf_align(x, w), w), x) # Also with keep_new_arg expect_identical(tf_align(tf_warp(x, w, keep_new_arg = TRUE), w), x) expect_identical(tf_warp(tf_align(x, w, keep_new_arg = TRUE), w), x) }) test_that("tf_warp and tf_align accept tfb warps", { t <- seq(-10, 10, length.out = 101) x <- tfd( t(sapply(c(-0.2, 0, 0.15), \(s) sin((t - s) * 0.3))), arg = t ) warp_tfd <- rep(tfd(t, arg = t), length(x)) warp_tfb <- suppressMessages(tfb(warp_tfd, k = 9)) expect_no_error(x_unreg <- tf_warp(x, warp_tfb)) expect_no_error(x_reg <- tf_align(x_unreg, warp_tfb)) expect_s3_class(x_unreg, "tfd") expect_s3_class(x_reg, "tfd") }) test_that("tf_warp/tf_align reset keep_new_arg for tfb and warn", { t <- seq(0, 1, length.out = 51) x_tfb <- suppressMessages(tfb( matrix(sin(2 * pi * t), nrow = 1), arg = t, k = 9 )) warp <- rep(tfd(t, arg = t), length(x_tfb)) expect_warning( warped <- tf_warp(x_tfb, warp, keep_new_arg = TRUE), "reset to FALSE" ) expect_s3_class(warped, "tfb") expect_warning( unwarped <- tf_align(x_tfb, warp, keep_new_arg = TRUE), "reset to FALSE" ) expect_s3_class(unwarped, "tfb") }) test_that("tf_align handles irregular grids with non-domain-preserving warps", { arg_x <- list( seq(0, 1, length.out = 41), seq(0, 1, length.out = 73) ) x <- tfd( list( sin(2 * pi * arg_x[[1]]), cos(2 * pi * arg_x[[2]]) ), arg = arg_x ) warp_arg <- seq(0, 1, length.out = 101) warp_tfd <- tfd( rbind( warp_arg + 0.15, warp_arg - 0.12 ), arg = warp_arg ) warp_tfb <- suppressMessages(tfb(warp_tfd, k = 9)) ret_irreg <- tf_align(x, warp_tfd, keep_new_arg = TRUE) expect_s3_class(ret_irreg, "tfd_irreg") expect_length(ret_irreg, length(x)) expect_identical(tf_domain(ret_irreg), tf_domain(x)) ret_irreg_tfb <- tf_align(x, warp_tfb, keep_new_arg = TRUE) expect_s3_class(ret_irreg_tfb, "tfd_irreg") expect_length(ret_irreg_tfb, length(x)) expect_identical(tf_domain(ret_irreg_tfb), tf_domain(x)) ret_grid <- quiet_expected_registration_warnings( tf_align(x, warp_tfd, keep_new_arg = FALSE) ) expect_length(ret_grid, length(x)) expect_identical(tf_domain(ret_grid), tf_domain(x)) }) # --- tf_registration class ---------------------------------------------------- test_that("tf_register returns tf_registration object", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift") ) expect_s3_class(reg, "tf_registration") expect_length(reg, length(x)) # Accessors work expect_s3_class(tf_aligned(reg), "tfd") expect_s3_class(tf_inv_warps(reg), "tfd") expect_s3_class(tf_template(reg), "tfd") expect_length(tf_aligned(reg), length(x)) expect_length(tf_inv_warps(reg), length(x)) expect_length(tf_template(reg), 1) # Direct slot access expect_identical(reg$registered, tf_aligned(reg)) expect_identical(reg$inv_warps, tf_inv_warps(reg)) expect_identical(reg$template, tf_template(reg)) expect_true(is.call(reg$call)) expect_identical(reg$x, x) }) test_that("tf_register store_x = FALSE omits original data", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift", store_x = FALSE) ) expect_s3_class(reg, "tf_registration") expect_null(reg$x) }) test_that("tf_registration print works", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift") ) expect_no_error(print(reg)) }) test_that("tf_registration summary works", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift") ) s <- summary(reg) expect_s3_class(s, "summary.tf_registration") expect_true(is.call(s$call)) expect_equal(s$n, 3) expect_true(is.finite(s$amp_var_reduction)) # Warp deviation quantiles (relative to domain length) expect_length(s$inv_warp_dev_quantiles, 7) expect_true(all(is.finite(s$inv_warp_dev_quantiles))) expect_true(all(s$inv_warp_dev_quantiles >= 0)) # Domain loss quantiles expect_length(s$domain_loss_quantiles, 7) expect_true(all(is.finite(s$domain_loss_quantiles))) # Warp slope stats expect_length(s$inv_warp_slope_range, 2) expect_true(s$inv_warp_slope_range["min"] > 0) # monotone warps expect_length(s$inv_warp_min_slopes, 7) expect_length(s$inv_warp_max_slopes, 7) expect_true(s$has_original) expect_no_error(print(s)) # summary with store_x = FALSE reg2 <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift", store_x = FALSE) ) s2 <- summary(reg2) expect_true(is.na(s2$amp_var_reduction)) expect_false(s2$has_original) }) test_that("tf_registration plot works", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift") ) pdf(tempfile(fileext = ".pdf")) expect_no_error(plot(reg)) dev.off() # Also with store_x = FALSE (2 panels instead of 3) reg2 <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift", store_x = FALSE) ) pdf(tempfile(fileext = ".pdf")) expect_no_error(plot(reg2)) dev.off() }) test_that("tf_registration subsetting works", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) reg <- quiet_expected_registration_warnings( tf_register(x, method = "affine", type = "shift") ) sub <- reg[1:2] expect_s3_class(sub, "tf_registration") expect_length(sub, 2) expect_length(tf_aligned(sub), 2) expect_length(tf_inv_warps(sub), 2) expect_length(tf_template(sub), 1) # template unchanged expect_length(sub$x, 2) }) # --- tf_estimate_warps -------------------------------------------------------- test_that("tf_estimate_warps works for SRVF and CC methods", { skip_if_not_installed("fdasrvf") withr::local_seed(1234) n_curves <- 3 t <- seq(0, 2 * pi, length.out = 61) data <- sapply(seq_len(n_curves), \(i) { phase_shift <- runif(1, -pi / 2, pi / 2) amplitude <- runif(1, 0.8, 1.2) amplitude * sin(t + phase_shift) + rnorm(length(t), 0, 0.1) }) check_warp <- function(warp, x, label = "") { domain <- tf_domain(x) expect_s3_class(warp, "tfd") expect_length(warp, length(x)) expect_identical(tf_domain(warp), domain) expect_true(all(tf_fmin(warp) >= domain[1]), label = label) expect_true(all(tf_fmax(warp) <= domain[2]), label = label) } x <- tfd(t(data)) # SRVF: default (Karcher mean) and explicit template check_warp(tf_estimate_warps(x), x, "SRVF default") check_warp(tf_estimate_warps(x, template = mean(x)), x, "SRVF template") # CC: default and non-default crit (tests ... passthrough) check_warp( quiet_expected_registration_warnings(tf_estimate_warps( x, method = "cc", max_iter = 1, iterlim = 1 )), x, "CC smoke" ) # tfd_irreg: SRVF/CC are currently unsupported expect_error( tf_estimate_warps(tfd(t(data)) |> tf_sparsify(.2), method = "srvf"), "only `affine` and `landmark` registration are currently supported" ) expect_error( tf_estimate_warps(tfd(t(data)) |> tf_sparsify(.2), method = "cc"), "only `affine` and `landmark` registration are currently supported" ) }) test_that("tf_estimate_warps returns tfd (not tf_registration)", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) warp <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", type = "shift") ) expect_s3_class(warp, "tfd") expect_false(inherits(warp, "tf_registration")) expect_true(!is.null(attr(warp, "template"))) }) test_that("tf_estimate_warps supports affine registration for irregular tfd", { withr::local_seed(99) t <- seq(0, 1, length.out = 101) shifts <- c(-0.09, 0, 0.08) x_vals <- lapply( shifts, \(s) sin(2 * pi * (t + s)) ) arg_list <- list( sort(c(seq(0, 1, length.out = 45), runif(8))), seq(0, 1, length.out = 63), seq(0, 1, length.out = 87) ) x_irreg <- tfd( lapply(seq_along(arg_list), \(i) { approx(t, x_vals[[i]], xout = arg_list[[i]], rule = 2)$y }), arg = arg_list ) template <- tfd( sin(2 * pi * seq(0, 1, length.out = 51)), arg = seq(0, 1, length.out = 51) ) warp <- quiet_expected_registration_warnings(tf_estimate_warps( x_irreg, method = "affine", type = "shift", template = template, max_iter = 2L )) expect_s3_class(warp, "tfd_irreg") expect_length(warp, length(x_irreg)) expect_identical(tf_domain(warp), tf_domain(x_irreg)) expect_identical(tf_arg(warp), tf_arg(x_irreg)) warp_vals <- tf_evaluations(warp) for (i in seq_along(warp_vals)) { expect_true(all(diff(warp_vals[[i]]) > 0)) expect_equal(warp_vals[[i]][1], tf_domain(x_irreg)[1], tolerance = 0.2) expect_equal( warp_vals[[i]][length(warp_vals[[i]])], tf_domain(x_irreg)[2], tolerance = 0.2 ) } }) test_that("tf_estimate_warps supports landmark registration for irregular tfd", { withr::local_seed(321) peaks <- c(0.3, 0.5, 0.7) arg_list <- lapply(c(51, 71, 93), \(n) sort(c(0, 1, runif(n - 2)))) x_irreg <- tfd( lapply(seq_along(peaks), \(i) dnorm(arg_list[[i]], peaks[i], 0.08)), arg = arg_list ) landmarks <- matrix(peaks, ncol = 1) warp <- tf_estimate_warps( x_irreg, method = "landmark", landmarks = landmarks ) expect_s3_class(warp, "tfd_irreg") expect_length(warp, length(x_irreg)) expect_identical(tf_arg(warp), tf_arg(x_irreg)) expect_identical(tf_domain(warp), tf_domain(x_irreg)) target_peak <- mean(peaks) warp_at_target <- as.numeric(tf_evaluate(warp, arg = target_peak)) expect_equal(warp_at_target, peaks, tolerance = 0.02) }) test_that("tf_estimate_warps accepts CC warp basis controls", { withr::local_seed(123) t <- seq(0, 1, length.out = 61) x <- tfd( t(sapply(c(-0.07, 0, 0.06), \(s) sin(2 * pi * (t + s)))), arg = t ) w_default <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "cc", max_iter = 1, iterlim = 1, nbasis = 6L, lambda = 0, crit = 2 ) ) expect_s3_class(w_default, "tfd") expect_length(w_default, length(x)) expect_identical(tf_domain(w_default), tf_domain(x)) expect_no_error( w_tuned <- quiet_expected_registration_warnings(tf_estimate_warps( x, method = "cc", max_iter = 1, iterlim = 1, crit = 1, nbasis = 8L, lambda = 1e-3 )) ) expect_s3_class(w_tuned, "tfd") expect_length(w_tuned, length(x)) expect_identical(tf_domain(w_tuned), tf_domain(x)) expect_error( tf_estimate_warps(x, method = "cc", nbasis = 1L), ">= 2" ) expect_error( tf_estimate_warps(x, method = "cc", lambda = -1), ">= 0" ) }) test_that("tf_estimate_warps dispatches correctly for tfb subclasses", { withr::local_seed(1234) t <- seq(0, 2 * pi, length.out = 100) data <- sapply(seq_len(5), \(i) { sin(t + runif(1, -0.5, 0.5)) }) # Use cheap affine method (not SRVF/CC) to test class dispatch x_tfb <- suppressMessages(tfb(t(data), k = 20)) warp_tfb <- quiet_expected_registration_warnings( tf_estimate_warps(x_tfb, method = "affine", type = "shift") ) expect_s3_class(warp_tfb, "tfd") expect_length(warp_tfb, length(x_tfb)) expect_identical(tf_domain(warp_tfb), tf_domain(x_tfb)) x_fpc <- tfb_fpc(t(data), pve = .95) warp_fpc <- quiet_expected_registration_warnings( tf_estimate_warps(x_fpc, method = "affine", type = "shift") ) expect_s3_class(warp_fpc, "tfd") expect_length(warp_fpc, length(x_fpc)) expect_identical(tf_domain(warp_fpc), tf_domain(x_fpc)) }) test_that("tf_estimate_warps SRVF/CC works for tfb subclasses", { skip_if_not_installed("fdasrvf") withr::local_seed(1234) t <- seq(0, 2 * pi, length.out = 41) data <- sapply(seq_len(3), \(i) sin(t + runif(1, -0.4, 0.4))) x_tfb <- suppressMessages(tfb(t(data), k = 12)) x_fpc <- tfb_fpc(t(data), pve = .95) warp_srvf <- tf_estimate_warps(x_tfb, method = "srvf") expect_s3_class(warp_srvf, "tfd") expect_length(warp_srvf, length(x_tfb)) expect_identical(tf_domain(warp_srvf), tf_domain(x_tfb)) warp_cc <- quiet_expected_registration_warnings( tf_estimate_warps(x_fpc, method = "cc", max_iter = 1, iterlim = 1) ) expect_s3_class(warp_cc, "tfd") expect_length(warp_cc, length(x_fpc)) expect_identical(tf_domain(warp_cc), tf_domain(x_fpc)) }) test_that("tf_estimate_warps validates SRVF/CC templates", { t <- seq(0, 1, length.out = 51) x <- tfd(t(cbind(sin(2 * pi * t), cos(2 * pi * t))), arg = t) template_bad_len <- tfd( t(cbind(sin(2 * pi * t), cos(2 * pi * t), sin(4 * pi * t))), arg = t ) expect_error( tf_estimate_warps(x, method = "srvf", template = template_bad_len), "length 1 or the same length" ) expect_error( tf_estimate_warps(x, method = "cc", template = template_bad_len), "length 1 or the same length" ) template_bad_domain <- tfd( sin(pi * seq(0, 2, length.out = 51)), arg = seq(0, 2, length.out = 51) ) expect_error( tf_estimate_warps(x, method = "srvf", template = template_bad_domain), "same domain" ) expect_error( tf_estimate_warps(x, method = "cc", template = template_bad_domain), "same domain" ) t2 <- seq(0, 1, length.out = 61) template_bad_grid <- tfd(sin(2 * pi * t2), arg = t2) expect_error( tf_estimate_warps(x, method = "srvf", template = template_bad_grid), "same grid" ) expect_error( tf_estimate_warps(x, method = "cc", template = template_bad_grid), "same grid" ) }) test_that("tf_estimate_warps SRVF/CC reject unknown method-specific arguments", { skip_if_not_installed("fdasrvf") t <- seq(0, 1, length.out = 51) x <- tfd(t(cbind(sin(2 * pi * t), sin(2 * pi * (t + 0.05)))), arg = t) expect_error( tf_estimate_warps(x, method = "srvf", unknown_arg = 1), "unused argument|unused arguments|formal argument" ) expect_error( tf_estimate_warps(x, method = "cc", unknown_arg = 1), "unused argument|unused arguments|formal argument" ) }) test_that("tf_estimate_warps CC explicit template path is exercised", { withr::local_seed(4321) t <- seq(0, 1, length.out = 61) x <- tfd( t(sapply(c(-0.08, 0, 0.08), \(s) sin(2 * pi * (t + s)))), arg = t ) template <- mean(x) w_default <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "cc", max_iter = 1, iterlim = 1) ) w_template <- quiet_expected_registration_warnings(tf_estimate_warps( x, method = "cc", template = template, max_iter = 1, iterlim = 1 )) expect_equal( as.matrix(w_default), as.matrix(w_template), tolerance = 0.15 ) }) # --- Landmark detection ------------------------------------------------------- test_that("tf_landmarks_extrema finds correct extrema and zero crossings", { # Single-period sin: quantitative accuracy with smooth = TRUE (default path) t1 <- seq(0, 2 * pi, length.out = 101) phase_shifts <- c(0, 0.5, -0.3) x1 <- tfd(t(cbind(sin(t1), sin(t1 + 0.5), sin(t1 - 0.3))), arg = t1) expected_max <- pi / 2 - phase_shifts maxima <- tf_landmarks_extrema(x1, "max") expect_true(is.matrix(maxima)) expect_equal(dim(maxima), c(3, 1)) expect_equal(as.numeric(maxima), expected_max, tolerance = 0.1) expected_min <- 3 * pi / 2 - phase_shifts minima <- tf_landmarks_extrema(x1, "min") expect_equal(dim(minima), c(3, 1)) expect_equal(as.numeric(minima), expected_min, tolerance = 0.1) both <- tf_landmarks_extrema(x1, "both") expect_equal(dim(both), c(3, 2)) expect_equal(both[, 1], maxima[, 1]) expect_equal(both[, 2], minima[, 1]) expect_equal(attr(both, "feature_types"), c("max", "min")) # 2-period sin: structural checks + all types t2 <- seq(0, 2, length.out = 201) template <- sin(2 * pi * t2) x2 <- tfd( t(sapply(c(0, 0.03, -0.03), \(s) { approx(t2, template, xout = t2 + s, rule = 2)$y })), arg = t2 ) lm_all <- tf_landmarks_extrema(x2, "all") types <- attr(lm_all, "feature_types") expect_true(ncol(lm_all) >= 5) expect_true("max" %in% types) expect_true("min" %in% types) expect_true("zero" %in% types) # Each row sorted for (i in seq_len(nrow(lm_all))) { non_na <- lm_all[i, !is.na(lm_all[i, ])] expect_true(all(diff(non_na) > 0)) } }) test_that("tf_landmarks_extrema drops boundary features", { t <- seq(0, 1, length.out = 101) vals <- pnorm(t, 0.2, 0.03) + 0.3 * dnorm(t, 0.7, 0.08) - 0.5 x <- tfd( t(sapply(c(0, 0.02, -0.02), \(s) { approx(t, vals, xout = t + s, rule = 2)$y })), arg = t ) lm <- tf_landmarks_extrema(x, "both") expect_equal(ncol(lm), 1) expect_equal(attr(lm, "feature_types"), "max") expect_true(all(abs(lm[, 1] - 0.7) < 0.05, na.rm = TRUE)) }) test_that("tf_landmarks_extrema detects landmarks on irregular tfd", { set.seed(123) peak_locs <- c(0.4, 0.5, 0.6, 0.45) arg_list <- lapply(1:4, \(i) sort(c(0, 1, runif(80)))) vals_list <- lapply(seq_along(peak_locs), \(i) { dnorm(arg_list[[i]], mean = peak_locs[i], sd = 0.1) }) x_irr <- tfd(vals_list, arg = arg_list) expect_true(inherits(x_irr, "tfd_irreg")) lm <- tf_landmarks_extrema(x_irr, "max") expect_true(is.matrix(lm)) expect_equal(nrow(lm), 4) expect_equal(ncol(lm), 1) expect_equal(as.numeric(lm), peak_locs, tolerance = 0.05) lm_both <- tf_landmarks_extrema(x_irr, "both") expect_equal(ncol(lm_both), 1) expect_equal(attr(lm_both, "feature_types"), "max") # Full chain: detection -> registration -> align x_reg <- tfd(x_irr, arg = seq(0, 1, length.out = 101)) warp <- tf_estimate_warps(x_reg, method = "landmark", landmarks = lm) aligned <- tf_align(x_reg, warp) aligned_peaks <- tf_where(aligned, value == max(value), "first") expect_equal(aligned_peaks, rep(mean(peak_locs), 4), tolerance = 0.03) }) test_that("tf_landmarks_extrema threshold parameter filters rare features", { set.seed(101) t <- seq(0, 1, length.out = 201) make_curve <- function(main_peak, add_secondary = FALSE) { y <- dnorm(t, main_peak, 0.06) if (add_secondary) { y <- y + 0.8 * dnorm(t, 0.8, 0.04) } y } vals <- rbind( make_curve(0.38, add_secondary = TRUE), make_curve(0.40), make_curve(0.42), make_curve(0.39), make_curve(0.41) ) x <- tfd(vals, arg = t) # threshold = 0.5 -> only main peak lm_strict <- tf_landmarks_extrema(x, "max", threshold = 0.5) expect_true(is.matrix(lm_strict)) expect_equal(ncol(lm_strict), 1) expect_true(all(abs(lm_strict[, 1] - 0.4) < 0.1, na.rm = TRUE)) # threshold = 0.2 -> both peaks, warns about missing landmarks expect_warning( lm_loose <- tf_landmarks_extrema(x, "max", threshold = 0.2), "missing landmark" ) expect_true(is.matrix(lm_loose)) expect_equal(ncol(lm_loose), 2) expect_true(all(abs(lm_loose[, 1] - 0.4) < 0.1, na.rm = TRUE)) expect_true(any(abs(lm_loose[, 2] - 0.8) < 0.1, na.rm = TRUE)) }) test_that("tf_landmarks_extrema warns when no stable landmarks found", { set.seed(202) t <- seq(0, 1, length.out = 101) x_const <- tfd(rbind(rep(1, 101), rep(2, 101), rep(3, 101)), arg = t) expect_warning( lm <- tf_landmarks_extrema(x_const, "max"), "No stable landmarks" ) expect_true(is.matrix(lm)) expect_equal(ncol(lm), 0) expect_equal(nrow(lm), 3) expect_equal(attr(lm, "feature_types"), character(0)) expect_warning( lm_both <- tf_landmarks_extrema(x_const, "both"), "No stable landmarks" ) expect_equal(ncol(lm_both), 0) expect_equal(nrow(lm_both), 3) }) # --- Landmark registration ---------------------------------------------------- test_that("landmark registration aligns to default and custom template", { t <- seq(0, 1, length.out = 101) peak_locs <- c(0.3, 0.5, 0.7) x <- tfd( t(sapply(peak_locs, \(s) dnorm(t, mean = s, sd = 0.1))), arg = t ) landmarks <- matrix(peak_locs, ncol = 1) # Default template (mean of landmarks = 0.5) warp <- tf_estimate_warps(x, method = "landmark", landmarks = landmarks) expect_s3_class(warp, "tfd") expect_length(warp, length(x)) expect_identical(tf_domain(warp), tf_domain(x)) template_landmark <- mean(peak_locs) warp_at_template <- as.numeric(warp[, template_landmark]) expect_equal(warp_at_template, peak_locs, tolerance = 1e-10) # Custom template custom_template <- 0.4 warp2 <- tf_estimate_warps( x, method = "landmark", landmarks = landmarks, template_landmarks = custom_template ) warp_at_custom <- as.numeric(warp2[, custom_template]) expect_equal(warp_at_custom, peak_locs, tolerance = 1e-10) # After aligning with custom template, peaks aligned near template x_aligned <- tf_align(x, warp2) aligned_peaks <- tf_where(x_aligned, value == max(value), "first") expect_equal(aligned_peaks, rep(custom_template, 3), tolerance = 0.02) }) test_that("register_landmark handles NA landmarks correctly", { set.seed(42) t <- seq(0, 1, length.out = 101) peak_locs <- c(0.4, 0.45, 0.5, 0.55, 0.6) x <- tfd(t(sapply(peak_locs, \(p) dnorm(t, p, 0.08))), arg = t) lm_mat <- matrix(NA_real_, nrow = 5, ncol = 2) lm_mat[, 1] <- peak_locs lm_mat[1:3, 2] <- c(0.75, 0.78, 0.80) warp <- tf_estimate_warps(x, method = "landmark", landmarks = lm_mat) expect_s3_class(warp, "tfd") expect_length(warp, 5) # All warps strictly increasing with correct endpoints warp_evals <- tf_evaluations(warp) domain <- tf_domain(x) for (i in seq_len(5)) { expect_true( all(diff(warp_evals[[i]]) > 0), info = paste("Warp", i, "is not strictly increasing") ) vals <- warp_evals[[i]] expect_equal(vals[1], domain[1]) expect_equal(vals[length(vals)], domain[2]) } }) test_that("tf_estimate_warps landmark method validates input", { t <- seq(0, 1, length.out = 51) x <- tfd(t(cbind(sin(t * pi), sin(t * pi + 0.1))), arg = t) # Wrong number of rows expect_error( tf_estimate_warps( x, method = "landmark", landmarks = matrix(c(0.3, 0.5, 0.7), ncol = 1) ), "rows" ) # Non-increasing landmarks expect_error( tf_estimate_warps( x, method = "landmark", landmarks = matrix(c(0.5, 0.3, 0.4, 0.2), ncol = 2, byrow = TRUE) ), "strictly increasing" ) # Landmarks outside domain expect_error( tf_estimate_warps( x, method = "landmark", landmarks = matrix(c(-0.1, 1.1), ncol = 1) ), "strictly inside the domain" ) # Template landmarks wrong length expect_error( tf_estimate_warps( x, method = "landmark", landmarks = matrix(c(0.3, 0.5), ncol = 1), template_landmarks = c(0.4, 0.5) ), "len" ) }) # --- Affine registration ------------------------------------------------------ test_that("affine registration produces linear warps for all types", { # Shift data on [0, 2*pi] t1 <- seq(0, 2 * pi, length.out = 101) template1 <- tfd(sin(t1), arg = t1) x_shift <- tfd( t(sapply(c(-0.3, -0.1, 0, 0.1, 0.3), \(s) sin(t1 + s))), arg = t1 ) # Scale data on [0, 1] t2 <- seq(0, 1, length.out = 101) x_scale <- tfd( t(cbind(sin(t2 * pi), sin(t2 * pi * 0.9), sin(t2 * pi * 1.1))), arg = t2 ) # Shift_scale data on [0, 2*pi] x_ss <- tfd( t(cbind(sin(t1), sin(t1 * 0.95 + 0.2), sin(t1 * 1.05 - 0.1))), arg = t1 ) check_linear <- function(warp, type) { warp_evals <- tf_evaluations(warp) for (i in seq_along(warp_evals)) { expect_true( all(diff(warp_evals[[i]]) > 0), label = paste(type, "function", i, "monotonicity") ) second_diff <- diff(diff(warp_evals[[i]])) expect_lt( max(abs(second_diff)), 1e-10, label = paste(type, "function", i, "linearity") ) } } check_linear( quiet_expected_registration_warnings( tf_estimate_warps( x_shift, method = "affine", template = template1, type = "shift" ) ), "shift" ) check_linear( quiet_expected_registration_warnings( tf_estimate_warps(x_scale, method = "affine", type = "scale") ), "scale" ) check_linear( quiet_expected_registration_warnings( tf_estimate_warps(x_ss, method = "affine", type = "shift_scale") ), "shift_scale" ) # Scale centering: warp(midpoint) ~ midpoint warp_scale <- quiet_expected_registration_warnings( tf_estimate_warps(x_scale, method = "affine", type = "scale") ) center <- 0.5 warp_at_center <- as.numeric(warp_scale[, center]) expect_equal(warp_at_center, rep(center, 3), tolerance = 0.1) }) test_that("affine shift produces warps that recover shift direction", { t <- seq(0, 2 * pi, length.out = 101) template <- tfd(sin(t), arg = t) x_shifted_left <- tfd(sin(t - 0.3), arg = t) x_shifted_right <- tfd(sin(t + 0.3), arg = t) warp_left <- quiet_expected_registration_warnings( tf_estimate_warps( x_shifted_left, method = "affine", template = template, type = "shift" ) ) mean_shift_left <- mean(tf_evaluations(warp_left)[[1]] - t) warp_right <- quiet_expected_registration_warnings( tf_estimate_warps( x_shifted_right, method = "affine", template = template, type = "shift" ) ) mean_shift_right <- mean(tf_evaluations(warp_right)[[1]] - t) expect_true(mean_shift_left > mean_shift_right) }) test_that("affine registration aligns functions to template", { set.seed(4321) arg <- seq(0, 1, length.out = 101) template <- tfd(sin(2 * pi * arg), arg = arg) template_vec <- tf_evaluations(template)[[1]] # Shift: shifted functions true_shifts <- c(-0.2, -0.1, 0.1, 0.15) x_shift <- do.call( c, lapply(true_shifts, \(s) { tfd(sin(2 * pi * (arg - s)), arg = arg) }) ) # Scale: scaled functions scale_factors <- c(0.7, 0.85, 1.15, 1.3) x_scale <- do.call( c, lapply(scale_factors, \(a) { inv_arg <- arg / a + 0.5 * (1 - 1 / a) tfd(sin(2 * pi * inv_arg), arg = arg) }) ) # Shift_scale: combined-transform functions params <- list( c(a = 0.9, b = 0.05), c(a = 1.1, b = -0.05), c(a = 0.95, b = 0.02) ) x_ss <- do.call( c, lapply(params, \(p) { tfd(sin(2 * pi * ((arg - p["b"]) / p["a"])), arg = arg) }) ) check_alignment <- function(x, type) { warps <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", template = template, type = type) ) x_reg <- quiet_expected_registration_warnings(tf_align(x, warps)) # Domain preserved (replaces former standalone smoke test) expect_identical( tf_domain(x_reg), tf_domain(x), label = paste(type, "domain preservation") ) for (i in seq_along(x_reg)) { reg_on_grid <- tf_evaluate(x_reg[i], arg = arg)[[1]] valid <- !is.na(reg_on_grid) expect_gt( sum(valid), 20, label = paste(type, "function", i, "valid points") ) max_diff <- max(abs(reg_on_grid[valid] - template_vec[valid])) expect_lt( max_diff, 0.01, label = paste(type, "function", i, "alignment") ) } } check_alignment(x_shift, "shift") check_alignment(x_scale, "scale") check_alignment(x_ss, "shift_scale") }) test_that("affine registered functions may have NA at boundaries", { t <- seq(0, 2 * pi, length.out = 101) true_shifts <- c(-0.8, 0.8) x <- tfd(t(sapply(true_shifts, \(s) sin(t + s))), arg = t) template <- tfd(sin(t), arg = t) warp <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", template = template, type = "shift") ) x_aligned <- quiet_expected_registration_warnings(tf_align(x, warp)) evals <- tf_evaluations(x_aligned) expect_length(evals, length(true_shifts)) # Warp values should extend outside domain for large shifts warp_evals <- tf_evaluations(warp) warp_ranges <- vapply(warp_evals, range, numeric(2)) expect_true( any(warp_ranges[1, ] < tf_domain(x)[1]) || any(warp_ranges[2, ] > tf_domain(x)[2]) ) }) test_that("tf_estimate_warps affine method validates input", { t <- seq(0, 1, length.out = 51) x <- tfd(t(cbind(sin(t * pi), sin(t * pi + 0.1))), arg = t) expect_error( tf_estimate_warps(x, method = "affine", template = x), "length 1" ) x2 <- tfd( sin(seq(0, 2, length.out = 51) * pi), arg = seq(0, 2, length.out = 51) ) expect_error( tf_estimate_warps(x, method = "affine", template = x2), "same domain" ) expect_error( tf_estimate_warps(x, method = "affine", shift_range = c(0.5)) ) expect_error( tf_estimate_warps(x, method = "affine", shift_range = c(0.5, 0.3)) ) expect_error( tf_estimate_warps(x, method = "affine", scale_range = c(-1, 2)) ) expect_error( tf_estimate_warps(x, method = "affine", scale_range = c(0.5, NA)) ) }) test_that("tf_estimate_warps affine method respects custom bounds", { arg <- seq(0, 1, length.out = 101) template <- tfd(sin(2 * pi * arg), arg = arg) # Function with shift of 0.4 x <- tfd(sin(2 * pi * (arg - 0.4)), arg = arg) # Default bounds find the shift warp_default <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "affine", template = template, type = "shift" ) ) b_default <- tf_evaluations(warp_default)[[1]][51] - 0.5 expect_equal(b_default, 0.4, tolerance = 0.01) # Restrictive bounds hit boundary warp_restricted <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "affine", template = template, type = "shift", shift_range = c(-0.1, 0.1) ) ) b_restricted <- tf_evaluations(warp_restricted)[[1]][51] - 0.5 expect_equal(b_restricted, 0.1, tolerance = 0.01) # Custom scale bounds x_scaled <- tfd(sin(2 * pi * (arg / 0.6 + 0.5 * (1 - 1 / 0.6))), arg = arg) warp_scale <- quiet_expected_registration_warnings( tf_estimate_warps( x_scaled, method = "affine", template = template, type = "scale", scale_range = c(0.4, 0.8) ) ) a_scale <- tf_evaluations(warp_scale)[[1]][101] - tf_evaluations(warp_scale)[[1]][1] expect_equal(a_scale, 0.6, tolerance = 0.01) }) # --- Procrustes Iteration (max_iter / tol) ------------------------------------ test_that("tf_estimate_warps rejects invalid max_iter and tol", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) expect_error( tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 0L) ) expect_error(tf_estimate_warps( x, method = "affine", type = "shift", max_iter = -1L )) expect_error( tf_estimate_warps(x, method = "affine", type = "shift", tol = -1) ) }) test_that("template forces single-pass regardless of max_iter", { withr::local_seed(123) t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) template <- tfd(matrix(sin(t), nrow = 1), arg = t) # Without template: explicit max_iter=3 matches default w_default <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", type = "shift") ) w_explicit <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 3L) ) expect_equal(as.matrix(w_default), as.matrix(w_explicit)) # With template: max_iter=1 and max_iter=5 are identical w1 <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "affine", type = "shift", template = template, max_iter = 1L ) ) w5 <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "affine", type = "shift", template = template, max_iter = 5L ) ) expect_equal(as.matrix(w1), as.matrix(w5)) }) test_that("tf_estimate_warps Procrustes iteration runs and improves for affine", { withr::local_seed(42) t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) w1 <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 1L) ) w3 <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "affine", type = "shift", max_iter = 3L) ) expect_s3_class(w3, "tfd_reg") expect_length(w3, 3) # Procrustes iteration should not increase residual variance. # Interpolate to a common regular grid to avoid irregular-grid arithmetic warnings. aligned_1 <- quiet_expected_registration_warnings(tf_align(x, w1)) aligned_3 <- quiet_expected_registration_warnings(tf_align(x, w3)) arg <- tf_arg(x) aligned_1_mat <- quiet_expected_registration_warnings( as.matrix(tf_interpolate(aligned_1, arg = arg)) ) aligned_3_mat <- quiet_expected_registration_warnings( as.matrix(tf_interpolate(aligned_3, arg = arg)) ) center_1 <- matrix( colMeans(aligned_1_mat, na.rm = TRUE), nrow = nrow(aligned_1_mat), ncol = ncol(aligned_1_mat), byrow = TRUE ) center_3 <- matrix( colMeans(aligned_3_mat, na.rm = TRUE), nrow = nrow(aligned_3_mat), ncol = ncol(aligned_3_mat), byrow = TRUE ) var_1 <- mean((aligned_1_mat - center_1)^2, na.rm = TRUE) var_3 <- mean((aligned_3_mat - center_3)^2, na.rm = TRUE) expect_lte(var_3, var_1 + 1e-6) }) test_that("tf_estimate_warps Procrustes iteration handles irregular affine updates", { t <- seq(0, 2 * pi, length.out = 101) x <- tfd(t(sapply(c(-0.3, 0, 0.3), \(s) sin(t + s))), arg = t) expect_no_error( w <- quiet_expected_registration_warnings( tf_estimate_warps( x, method = "affine", type = "shift", max_iter = 2L, shift_range = c(0.05, 0.2) ) ) ) expect_s3_class(w, "tfd_reg") expect_length(w, length(x)) }) test_that("tf_estimate_warps Procrustes iteration runs for CC", { skip_on_cran() withr::local_seed(42) t <- seq(0, 1, length.out = 61) x <- tfd( t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))), arg = t ) w <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "cc", max_iter = 2L, iterlim = 5) ) expect_s3_class(w, "tfd_reg") expect_length(w, 3) }) test_that("tf_estimate_warps CC outer loop improves the CC criterion", { skip_on_cran() withr::local_seed(42) t <- seq(0, 1, length.out = 61) x <- tfd( t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))), arg = t ) w_2 <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "cc", max_iter = 2L, iterlim = 5) ) w_5 <- quiet_expected_registration_warnings( tf_estimate_warps(x, method = "cc", max_iter = 5L, iterlim = 5) ) aligned_2 <- suppressWarnings(tf_align(x, w_2)) aligned_5 <- suppressWarnings(tf_align(x, w_5)) obj_2 <- tf:::registration_objective_cc( aligned = aligned_2, template = attr(w_2, "template") ) obj_5 <- tf:::registration_objective_cc( aligned = aligned_5, template = attr(w_5, "template") ) expect_lte(obj_5, obj_2 + 1e-8) }) test_that("CC registration handles pinch warps with boundary ties", { skip_on_cran() pinch_small <- pinch[1:10] reg <- quiet_expected_registration_warnings( tf_register( pinch_small, method = "cc", max_iter = 10L, nbasis = 10L ) ) expect_s3_class(reg, "tf_registration") expect_no_error(tf_invert(tf_inv_warps(reg))) expect_true(all(vapply( tf_evaluations(tf_inv_warps(reg)), \(x) all(diff(x) > 0), logical(1) ))) }) test_that("SRVF with template=NULL gives same result regardless of max_iter", { skip_if_not_installed("fdasrvf") withr::local_seed(42) t <- seq(0, 1, length.out = 101) x <- tfd( t(sapply(c(-0.05, 0, 0.05), \(s) sin(2 * pi * (t + s)))), arg = t ) w1 <- tf_estimate_warps(x, method = "srvf", max_iter = 1L) w5 <- tf_estimate_warps(x, method = "srvf", max_iter = 5L) expect_equal(as.matrix(w1), as.matrix(w5)) })