# Out-of-core test for huge Parquet files.
#
# Uses pre-existing Parquet datasets on disk (not generated by the test).
# Verifies that:
#   1. The comparison completes without OOM crash
#   2. RAM overhead stays below threshold (pipeline stays lazy)
#   3. Execution completes within a reasonable time limit
#
# To run:
#   Sys.setenv(RUN_HUGE_PARQUET_TEST = "true")
#   devtools::test(filter = "^huge-parquet")
#
# Data directories (configurable via env vars):
#   HUGE_PARQUET_DIR_A (default: D:/things_reboot/data_a)
#   HUGE_PARQUET_DIR_B (default: D:/things_reboot/data_b)
test_that("huge parquet: out-of-core comparison without OOM", {
  skip_on_cran()
  skip_on_ci()
  skip_if(
    Sys.getenv("RUN_HUGE_PARQUET_TEST") != "true",
    message = "Set RUN_HUGE_PARQUET_TEST=true to run this test"
  )
  skip_if_not_installed("arrow")
  skip_if_not_installed("duckdb")

  # ---- configuration --------------------------------------------------------
  dir_a <- Sys.getenv("HUGE_PARQUET_DIR_A", unset = "D:/things_reboot/data_a")
  dir_b <- Sys.getenv("HUGE_PARQUET_DIR_B", unset = "D:/things_reboot/data_b")

  # YAML rules file consumed by compare_datasets_from_yaml(); presumably
  # pre-generated once via write_rules_template(data_reference = ds_a,
  # path = rules_path) -- TODO confirm against package docs.
  rules_path <- "boom.yml"

  skip_if(
    !dir.exists(dir_a),
    message = sprintf("Directory not found: %s", dir_a)
  )
  skip_if(
    !dir.exists(dir_b),
    message = sprintf("Directory not found: %s", dir_b)
  )
  skip_if(
    !file.exists(rules_path),
    message = sprintf(
      "Rules file not found: %s (generate it with write_rules_template())",
      rules_path
    )
  )

  max_ram_gb <- 10   # RAM-overhead budget (GB)
  max_time_s <- 300  # wall-clock budget: 5 minutes

  # ---- open as lazy Arrow datasets ------------------------------------------
  ds_a <- arrow::open_dataset(dir_a)
  ds_b <- arrow::open_dataset(dir_b)

  # Row counts: count() stays lazy in Arrow; only the 1x1 result is collected.
  n_a <- dplyr::pull(dplyr::collect(dplyr::count(ds_a)), n)
  n_b <- dplyr::pull(dplyr::collect(dplyr::count(ds_b)), n)

  # Rough uncompressed size estimate: 8 bytes per cell (rows x cols x 8).
  raw_gb_a <- n_a * length(names(ds_a)) * 8 / 1024^3
  raw_gb_b <- n_b * length(names(ds_b)) * 8 / 1024^3

  message(sprintf(
    "Dataset A: %s rows (%d cols, ~%.1f GB raw) | Dataset B: %s rows",
    format(n_a, big.mark = ","), length(names(ds_a)), raw_gb_a,
    format(n_b, big.mark = ",")
  ))

  # Log a note when datasets are small (< 32 GB combined), but still run the
  # test to validate the lazy pipeline end-to-end.
  if (raw_gb_a + raw_gb_b < 32) {
    message(sprintf(
      "Note: datasets are small (%.1f + %.1f = %.1f GB raw). For a true OOM stress test, use datasets > 32 GB combined.",
      raw_gb_a, raw_gb_b, raw_gb_a + raw_gb_b
    ))
  }

  # ---- run datadiff ---------------------------------------------------------
  # Establish a post-GC memory baseline so the overhead measurement reflects
  # allocations made by the comparison itself.
  gc(verbose = FALSE, reset = TRUE)
  mem_before <- sum(gc()[, 2])  # MB used (Ncells + Vcells, "(Mb)" column)
  t0 <- proc.time()

  result <- compare_datasets_from_yaml(
    data_reference = ds_a,
    data_candidate = ds_b,
    key = "ID",
    path = rules_path
  )

  elapsed <- (proc.time() - t0)[["elapsed"]]
  gc(verbose = FALSE)
  mem_after <- sum(gc()[, 2])  # MB used
  mem_overhead_gb <- (mem_after - mem_before) / 1024

  message(sprintf(
    "Done in %.1f s | RAM overhead: %.2f GB | all_passed: %s",
    elapsed, mem_overhead_gb, result$all_passed
  ))

  # ---- assertions -----------------------------------------------------------
  # RAM overhead should stay low (pipeline stays lazy via Arrow -> DuckDB).
  expect_true(
    mem_overhead_gb < max_ram_gb,
    label = sprintf(
      "RAM overhead %.2f GB exceeds %d GB limit", mem_overhead_gb, max_ram_gb
    )
  )

  # Should complete within time budget.
  expect_true(
    elapsed < max_time_s,
    label = sprintf("Took %.0f s, exceeds %d s limit", elapsed, max_time_s)
  )

  # Result should be a valid datadiff output.
  expect_type(result, "list")
  expect_true("all_passed" %in% names(result))
  expect_s3_class(result$agent, "ptblank_agent")
})