skip_if_not_installed("bigmemory")
library(bigmemory)

# Reference helpers ----

# Distance from every row of `ref` to a single query point.
#
# ref        Numeric matrix, one reference point per row.
# query_row  Numeric vector with one value per column of `ref`.
# metric     One of "euclidean", "sqeuclidean" or "cosine".
#
# Returns a numeric vector with one distance per row of `ref`. The cosine
# distance is 1 - cosine similarity, so a zero-norm row or query yields NaN.
metric_distance <- function(ref, query_row, metric) {
  metric <- match.arg(metric, c("euclidean", "sqeuclidean", "cosine"))

  if (identical(metric, "cosine")) {
    ref_norm <- sqrt(rowSums(ref^2))
    query_norm <- sqrt(sum(query_row^2))
    1 - drop(ref %*% query_row) / (ref_norm * query_norm)
  } else {
    # t(t(ref) - query_row) subtracts the query from every row of `ref`
    # (plain `ref - query_row` would recycle down the columns instead).
    sq <- rowSums((t(t(ref) - query_row))^2)
    if (identical(metric, "euclidean")) sqrt(sq) else sq
  }
}

# Exhaustive k-nearest-neighbour search used as ground truth in the tests.
#
# ref           Numeric matrix of reference points (rows).
# query         Numeric matrix of query points (rows); defaults to `ref`.
# k             Number of neighbours to report per query row.
# metric        Passed to metric_distance().
# exclude_self  Drop row i as a neighbour of query i. Only honoured when
#               `query` is identical to `ref`.
#
# Returns list(index, distance), both nrow(query) x k matrices. Ties are
# broken by the lower reference row index. Requesting more neighbours than
# there are reference rows leaves NA in the surplus columns.
brute_force_knn <- function(ref, query = ref, k, metric,
                            exclude_self = identical(ref, query)) {
  n_ref <- nrow(ref)
  n_query <- nrow(query)
  index <- matrix(NA_integer_, nrow = n_query, ncol = k)
  distance <- matrix(NA_real_, nrow = n_query, ncol = k)

  # Loop-invariant: comparing the two matrices once is enough.
  skip_self <- exclude_self && identical(ref, query)
  ref_rows <- seq_len(n_ref)
  top_k <- seq_len(k)

  for (i in seq_len(n_query)) {
    dists <- metric_distance(ref, query[i, ], metric)
    if (skip_self) {
      dists[i] <- Inf
    }
    # Secondary key makes tie-breaking deterministic (lowest row wins).
    order_idx <- order(dists, ref_rows)
    keep <- order_idx[top_k]
    index[i, ] <- keep
    distance[i, ] <- dists[keep]
  }

  list(index = index, distance = distance)
}

# Exhaustive fixed-radius search used as ground truth in the tests.
#
# ref, query, metric, exclude_self  As for brute_force_knn().
# radius  Inclusive distance threshold (`dists <= radius`).
# sort    If TRUE, order each query's matches by distance, then row index.
#
# Returns a list with
#   index     integer vector of matching reference rows, all queries
#             concatenated in query order;
#   distance  numeric vector of the corresponding distances;
#   offset    numeric vector of length nrow(query) + 1 holding the 1-based
#             start of each query's slice in `index`/`distance` (the last
#             entry is one past the end);
#   n_match   integer vector with the number of matches per query.
brute_force_radius <- function(ref, query = ref, radius, metric,
                               exclude_self = identical(ref, query),
                               sort = TRUE) {
  n_query <- nrow(query)
  counts <- integer(n_query)
  matches <- vector("list", n_query)

  # Loop-invariant: comparing the two matrices once is enough.
  skip_self <- exclude_self && identical(ref, query)

  for (i in seq_len(n_query)) {
    dists <- metric_distance(ref, query[i, ], metric)
    if (skip_self) {
      dists[i] <- Inf
    }
    keep <- which(dists <= radius)
    if (sort && length(keep) > 0L) {
      ord <- order(dists[keep], keep)
      keep <- keep[ord]
    }
    matches[[i]] <- list(index = keep, distance = dists[keep])
    counts[i] <- length(keep)
  }

  offsets <- cumsum(c(1L, counts))
  flat_index <- unlist(lapply(matches, `[[`, "index"), use.names = FALSE)
  flat_distance <- unlist(lapply(matches, `[[`, "distance"), use.names = FALSE)

  list(
    # unlist() of an empty list is NULL; coerce so that a zero-row query
    # still yields typed, zero-length vectors.
    index = as.integer(flat_index),
    distance = as.numeric(flat_distance),
    offset = as.numeric(offsets),
    n_match = counts
  )
}

# Tests ----

test_that("prepared search matches brute-force across supported metrics", {
  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  query <- matrix(
    c(1, 0.2,
      0.4, 1.2),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)

  for (metric in c("euclidean", "sqeuclidean", "cosine")) {
    prepared <- knn_prepare_bigmatrix(big_ref, metric = metric)
    result <- knn_search_prepared(
      prepared,
      query = query,
      k = 2,
      exclude_self = FALSE
    )
    expected <- brute_force_knn(
      ref,
      query = query,
      k = 2,
      metric = metric,
      exclude_self = FALSE
    )

    expect_s3_class(prepared, "bigknn_prepared")
    expect_equal(result$index, expected$index)
    expect_equal(result$distance, expected$distance, tolerance = 1e-8)
    expect_identical(result$metric, metric)
  }
})

test_that("prepared streaming search matches in-memory search", {
  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )

  # Silence bigmemory's typecast warning for the duration of this test only;
  # the reference is stored as type "float" below.
  old_options <- options(bigmemory.typecast.warning = FALSE)
  on.exit(options(old_options), add = TRUE)

  prepared <- knn_prepare_bigmatrix(
    as.big.matrix(ref, type = "float"),
    metric = "sqeuclidean"
  )
  expected <- knn_search_prepared(prepared, k = 2)

  index_store <- big.matrix(nrow(ref), 2, type = "integer")
  distance_store <- big.matrix(nrow(ref), 2, type = "double")

  streamed <- knn_search_stream_prepared(
    prepared,
    xpIndex = index_store,
    xpDistance = distance_store,
    k = 2
  )

  expect_equal(bigmemory::as.matrix(streamed$index), expected$index)
  expect_equal(
    bigmemory::as.matrix(streamed$distance),
    expected$distance,
    tolerance = 1e-6
  )
})

test_that("count and radius search agree with brute-force results", {
  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)

  # Euclidean (no metric argument is passed to the package functions here).
  expected_euclid <- brute_force_radius(ref, radius = 1.1, metric = "euclidean")
  counts <- count_within_radius_bigmatrix(big_ref, radius = 1.1)
  result <- radius_bigmatrix(big_ref, radius = 1.1)

  expect_equal(counts, expected_euclid$n_match)
  expect_equal(result$index, as.integer(expected_euclid$index))
  expect_equal(result$distance, expected_euclid$distance, tolerance = 1e-8)
  expect_equal(result$offset, expected_euclid$offset)
  expect_equal(result$n_match, expected_euclid$n_match)

  # Cosine.
  expected_cosine <- brute_force_radius(ref, radius = 0.3, metric = "cosine")
  cosine_result <- radius_bigmatrix(big_ref, radius = 0.3, metric = "cosine")

  expect_equal(cosine_result$index, as.integer(expected_cosine$index))
  expect_equal(
    cosine_result$distance,
    expected_cosine$distance,
    tolerance = 1e-8
  )
  expect_equal(cosine_result$offset, expected_cosine$offset)
})

test_that("radius streaming writes flattened matches and offsets", {
  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)
  expected <- radius_bigmatrix(big_ref, radius = 1.1)

  # Destination stores are sized from the in-memory result.
  index_store <- big.matrix(length(expected$index), 1, type = "integer")
  distance_store <- big.matrix(length(expected$distance), 1, type = "double")
  offset_store <- big.matrix(length(expected$offset), 1, type = "double")

  streamed <- radius_stream_bigmatrix(
    big_ref,
    xpIndex = index_store,
    xpDistance = distance_store,
    xpOffset = offset_store,
    radius = 1.1
  )

  expect_equal(
    as.vector(bigmemory::as.matrix(streamed$index)),
    expected$index
  )
  expect_equal(
    as.vector(bigmemory::as.matrix(streamed$distance)),
    expected$distance,
    tolerance = 1e-8
  )
  expect_equal(
    as.vector(bigmemory::as.matrix(streamed$offset)),
    expected$offset
  )
  expect_equal(streamed$n_match, expected$n_match)
})

test_that("graph builders produce expected structures", {
  skip_if_not_installed("Matrix")

  # Two well-separated pairs on the x axis: rows (1, 2) and rows (3, 4).
  ref <- matrix(
    c(0, 0,
      1, 0,
      5, 0,
      6, 0),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)

  graph_edges <- knn_graph_bigmatrix(
    big_ref,
    k = 1,
    format = "edge_list",
    symmetrize = "none"
  )
  expect_equal(nrow(graph_edges), 4L)
  expect_true(all(c("from", "to", "distance") %in% names(graph_edges)))

  mutual_edges <- mutual_knn_graph_bigmatrix(big_ref, k = 1, format = "edge_list")
  expect_equal(
    as.data.frame(mutual_edges)[, c("from", "to")],
    data.frame(from = c(1L, 3L), to = c(2L, 4L))
  )

  matrix_graph <- knn_graph_bigmatrix(
    big_ref,
    k = 1,
    format = "dgCMatrix",
    symmetrize = "union",
    include_distance = FALSE
  )
  expect_s4_class(matrix_graph, "dgCMatrix")

  snn_ref <- as.big.matrix(matrix(
    c(0, 0,
      0, 1,
      1, 0,
      5, 5),
    ncol = 2,
    byrow = TRUE
  ))
  snn_edges <- snn_graph_bigmatrix(
    snn_ref,
    k = 2,
    format = "edge_list",
    weight = "count"
  )
  # expect_gt() reports the actual row count on failure.
  expect_gt(nrow(snn_edges), 0L)
  expect_true(all(snn_edges$weight > 0))
})

test_that("plans, persisted prepared caches, and sparse queries work together", {
  skip_if_not_installed("Matrix")

  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  query <- matrix(
    c(1, 0.2,
      0.4, 1.2),
    ncol = 2,
    byrow = TRUE
  )
  sparse_query <- Matrix::Matrix(query, sparse = TRUE)
  big_ref <- as.big.matrix(ref)

  plan <- knn_plan_bigmatrix(
    big_ref,
    metric = "cosine",
    memory_budget = "64KB",
    num_threads = 2L,
    progress = FALSE
  )

  cache_path <- tempfile(fileext = ".rds")
  on.exit(unlink(cache_path), add = TRUE)

  prepared <- knn_prepare_bigmatrix(big_ref, plan = plan, cache_path = cache_path)
  loaded <- knn_load_prepared(cache_path)

  # Dense query on the fresh object vs. sparse query on the reloaded one.
  expected <- knn_search_prepared(
    prepared,
    query = query,
    k = 2,
    exclude_self = FALSE
  )
  observed <- knn_search_prepared(
    loaded,
    query = sparse_query,
    k = 2,
    plan = plan,
    exclude_self = FALSE
  )

  expect_s3_class(plan, "bigknn_plan")
  expect_true(file.exists(cache_path))
  expect_true(knn_validate_prepared(prepared))
  expect_equal(observed$index, expected$index)
  expect_equal(observed$distance, expected$distance, tolerance = 1e-8)
})

test_that("coercion helpers and radius graph outputs are available", {
  skip_if_not_installed("Matrix")

  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)

  radius_result <- radius_bigmatrix(big_ref, radius = 1.1)
  radius_edges <- as_edge_list(radius_result)
  radius_triplet <- as_triplet(radius_result)
  radius_sparse <- as_sparse_matrix(radius_result)
  radius_graph <- radius_graph_bigmatrix(
    big_ref,
    radius = 1.1,
    format = "edge_list",
    symmetrize = "union"
  )

  # One edge per flattened match; expect_equal() shows both counts on failure.
  expect_equal(nrow(radius_edges), length(radius_result$index))
  expect_equal(radius_triplet$Dim, c(4L, 4L))
  expect_s4_class(radius_sparse, "dgCMatrix")
  expect_s3_class(radius_graph, "bigknn_graph_edge_list")
  expect_gt(nrow(radius_graph), 0L)
})
test_that("streamed graph jobs and resume checkpoints produce exact edge lists", {
  # Two well-separated pairs on the x axis: rows (1, 2) and rows (3, 4).
  ref <- matrix(
    c(0, 0,
      1, 0,
      5, 0,
      6, 0),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)

  expected <- as.data.frame(knn_graph_bigmatrix(
    big_ref,
    k = 1,
    format = "edge_list",
    symmetrize = "none"
  ))
  attr(expected, "bigknn_graph") <- NULL

  from_store <- big.matrix(nrow(expected), 1, type = "integer")
  to_store <- big.matrix(nrow(expected), 1, type = "integer")
  value_store <- big.matrix(nrow(expected), 1, type = "double")
  checkpoint_path <- tempfile(fileext = ".rds")
  on.exit(unlink(checkpoint_path), add = TRUE)

  # Read the three destination stores back as a plain edge-list data frame.
  # The stores are read at call time, so this reflects their current content.
  read_edge_stores <- function() {
    data.frame(
      from = as.integer(as.vector(bigmemory::as.matrix(from_store))),
      to = as.integer(as.vector(bigmemory::as.matrix(to_store))),
      distance = as.numeric(as.vector(bigmemory::as.matrix(value_store)))
    )
  }

  job <- knn_graph_stream_bigmatrix(
    big_ref,
    k = 1,
    xpFrom = from_store,
    xpTo = to_store,
    xpValue = value_store,
    checkpoint_path = checkpoint_path
  )
  observed <- read_edge_stores()

  expect_s3_class(job, "bigknn_job")
  expect_equal(observed, expected, tolerance = 1e-8, ignore_attr = TRUE)

  # Rewind the checkpoint so that it points at row 3 / edge 3 with status
  # "running", as if the job had stopped after the first two rows.
  spec <- readRDS(checkpoint_path)
  spec$status <- "running"
  spec$next_row <- 3L
  spec$next_edge <- 3L
  saveRDS(spec, checkpoint_path)

  # Wipe the stores, then restore only the first two edges; the remaining
  # entries have to be rewritten by the resumed job for the test to pass.
  from_store[, 1] <- 0L
  to_store[, 1] <- 0L
  value_store[, 1] <- 0
  from_store[1:2, 1] <- as.integer(expected$from[1:2])
  to_store[1:2, 1] <- as.integer(expected$to[1:2])
  value_store[1:2, 1] <- expected$distance[1:2]

  resumed <- resume_knn_job(checkpoint_path)
  resumed_observed <- read_edge_stores()

  expect_s3_class(resumed, "bigknn_job")
  expect_equal(resumed_observed, expected, tolerance = 1e-8, ignore_attr = TRUE)
})

test_that("streamed radius jobs and resume checkpoints produce exact flattened matches", {
  ref <- matrix(
    c(1, 0,
      0, 1,
      1, 1,
      2, 1),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)
  expected <- radius_bigmatrix(big_ref, radius = 1.1)

  index_store <- big.matrix(length(expected$index), 1, type = "integer")
  distance_store <- big.matrix(length(expected$distance), 1, type = "double")
  offset_store <- big.matrix(length(expected$offset), 1, type = "double")
  checkpoint_path <- tempfile(fileext = ".rds")
  on.exit(unlink(checkpoint_path), add = TRUE)

  # Flatten a one-column big.matrix into a plain vector.
  store_values <- function(store) {
    as.vector(bigmemory::as.matrix(store))
  }

  job <- radius_stream_job_bigmatrix(
    big_ref,
    xpIndex = index_store,
    xpDistance = distance_store,
    xpOffset = offset_store,
    radius = 1.1,
    checkpoint_path = checkpoint_path
  )

  expect_s3_class(job, "bigknn_job")
  expect_equal(store_values(index_store), expected$index)
  expect_equal(store_values(distance_store), expected$distance, tolerance = 1e-8)
  expect_equal(store_values(offset_store), expected$offset)

  # Rewind the checkpoint to row 3 of the "collect" phase with status
  # "running", as if the job had stopped after the first two rows.
  spec <- readRDS(checkpoint_path)
  spec$status <- "running"
  spec$phase <- "collect"
  spec$next_row <- 3L
  saveRDS(spec, checkpoint_path)

  # Wipe every store, then restore only the matches that belong to rows 1-2:
  # offset[3] is the 1-based start of row 3's slice, so the prefix ends one
  # element before it. The offsets themselves are left zeroed.
  index_store[, 1] <- 0L
  distance_store[, 1] <- 0
  offset_store[, 1] <- 0
  prefix_end <- expected$offset[3L] - 1L
  index_store[seq_len(prefix_end), 1] <-
    as.integer(expected$index[seq_len(prefix_end)])
  distance_store[seq_len(prefix_end), 1] <-
    expected$distance[seq_len(prefix_end)]

  resumed <- resume_knn_job(checkpoint_path)

  expect_s3_class(resumed, "bigknn_job")
  expect_equal(store_values(index_store), expected$index)
  expect_equal(store_values(distance_store), expected$distance, tolerance = 1e-8)
  expect_equal(store_values(offset_store), expected$offset)
})

test_that("recall summaries and candidate reranking agree with exact search", {
  ref <- matrix(
    c(0, 0,
      1, 0,
      0, 1,
      3, 3),
    ncol = 2,
    byrow = TRUE
  )
  big_ref <- as.big.matrix(ref)
  exact <- knn_bigmatrix(big_ref, k = 2)

  # Overwrite one entry of the exact index matrix with 4L to stand in for an
  # imperfect approximate result.
  approx_index <- exact$index
  approx_index[1, 2] <- 4L
  recall <- recall_against_exact(exact, approx_index)

  # Candidate columns: the row itself, then the exact neighbours in reverse
  # order; reranking to top_k = 2 is expected to reproduce the exact result.
  candidates <- cbind(seq_len(nrow(ref)), exact$index[, 2], exact$index[, 1])
  reranked <- rerank_candidates_bigmatrix(
    big_ref,
    query = NULL,
    candidate_index = candidates,
    top_k = 2
  )

  expect_s3_class(recall, "bigknn_recall")
  # expect_lt() reports the actual recall value on failure.
  expect_lt(recall$overall, 1)
  expect_equal(reranked$index, exact$index)
  expect_equal(reranked$distance, exact$distance, tolerance = 1e-8)
})

test_that("input validation catches impossible parameters", {
  ref <- as.big.matrix(matrix(c(1, 0, 0, 1), ncol = 2, byrow = TRUE))

  # The reference has only two rows.
  expect_error(knn_bigmatrix(ref, k = 2), "`k` exceeds")
  expect_error(radius_bigmatrix(ref, radius = -1), "`radius` must")

  prepared <- knn_prepare_bigmatrix(ref)
  expect_error(knn_search_prepared(prepared, k = 2), "`k` exceeds")

  radius_ref <- as.big.matrix(matrix(c(1, 0, 2, 0), ncol = 2, byrow = TRUE))
  expected <- radius_bigmatrix(radius_ref, radius = 2)

  # Storage types are swapped on purpose (double index store, integer
  # distance store) to trigger the destination type check.
  index_store <- big.matrix(length(expected$index), 1, type = "double")
  distance_store <- big.matrix(length(expected$distance), 1, type = "integer")
  offset_store <- big.matrix(length(expected$offset), 1, type = "double")

  expect_error(
    radius_stream_bigmatrix(
      radius_ref,
      xpIndex = index_store,
      xpDistance = distance_store,
      xpOffset = offset_store,
      radius = 2
    ),
    "`xpDistance` big.matrix must store doubles"
  )
})