# =========================================================================== # Integration tests for executeSpec / executeSpecs using Eunomia # =========================================================================== # Skip all tests if Eunomia or DatabaseConnector are not available skip_if_not_installed("Eunomia") skip_if_not_installed("DatabaseConnector") # --------------------------------------------------------------------------- # Helper: get a connection to the Eunomia CDM with cohorts created # --------------------------------------------------------------------------- get_eunomia_conn <- function() { connDetails <- Eunomia::getEunomiaConnectionDetails() # Create cohorts first (requires connectionDetails) Eunomia::createCohorts( connectionDetails = connDetails, cdmDatabaseSchema = "main", cohortDatabaseSchema = "main", cohortTable = "cohort" ) conn <- DatabaseConnector::connect(connDetails) conn } make_condition_spec <- function(aggregated = TRUE) { plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = TRUE, type = "start"), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main", aggregated = aggregated ) specs } # =========================================================================== # executeSpec — aggregated (last statement is SELECT) # =========================================================================== test_that("executeSpec executes aggregated spec and returns data.frame", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) specs <- make_condition_spec(aggregated = TRUE) result <- executeSpec( connection = conn, spec = specs[[1]], targetDialect = "sqlite" ) expect_s3_class(result, "data.frame") expect_true(nrow(result) >= 0) expected_cols <- c( "COHORT_DEFINITION_ID", "COVARIATE_ID", "COVARIATE_NAME", "CONCEPT_ID", "ANALYSIS_ID", "SUM_VALUE", "MEAN_VALUE" ) expect_true(all(expected_cols %in% toupper(names(result)))) }) # =========================================================================== # executeSpec — non-aggregated (raw patient-level rows) # =========================================================================== test_that("executeSpec executes non-aggregated spec and returns data.frame", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) specs <- make_condition_spec(aggregated = FALSE) result <- executeSpec( connection = conn, spec = specs[[1]], targetDialect = "sqlite" ) expect_s3_class(result, "data.frame") expect_true(nrow(result) >= 0) }) # =========================================================================== # executeSpec — targetDialect defaults from connection # =========================================================================== test_that("executeSpec infers targetDialect from connection", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) specs <- make_condition_spec(aggregated = TRUE) # Don't pass targetDialect — should auto-detect from connection result <- executeSpec( connection = conn, spec = specs[[1]] ) expect_s3_class(result, "data.frame") }) # =========================================================================== # executeSpec — empty SQL error # =========================================================================== test_that("executeSpec errors on empty rendered SQL", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) # Build a spec with empty SQL spec <- structure( list( analysisId = 999L, analysisName = "empty", sql = "", aggregated = TRUE ), class = "singleNodeSpec" ) expect_error( executeSpec(conn, spec, targetDialect = "sqlite"), "sql is empty" ) }) # =========================================================================== # executeSpecs — batch execution (stopOnError = TRUE) # =========================================================================== test_that("executeSpecs executes multiple specs with stopOnError = TRUE", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) # Create specs with two windows → two specs plan <- planAnalysis( analysisWindows = defineAnalysisWindows( startDays = c(-365, -30), endDays = c(-1, -1) ), useBaseFeatures = list( condition_occurrence = list(include = TRUE, type = "start"), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main", aggregated = TRUE ) results <- executeSpecs( connection = conn, specs = specs, targetDialect = "sqlite" ) expect_type(results, "list") expect_length(results, length(specs)) for (r in results) { expect_s3_class(r, "data.frame") } }) # =========================================================================== # executeSpecs — batch execution (stopOnError = FALSE) # =========================================================================== test_that("executeSpecs continues on error with stopOnError = FALSE", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) good_specs <- make_condition_spec(aggregated = TRUE) # Create a bad spec that will fail during execution bad_spec <- structure( list( analysisId = 9999L, analysisName = "bad_spec", sql = "SELECT 1 FROM nonexistent_table_xyz;", aggregated = TRUE, cohortDatabaseSchema = "main", cohortTable = "cohort", tableDatabaseSchema = "main", domainTable = "nonexistent_table_xyz", conceptIdCol = "concept_id", dateCol = "start_date", dateColEnd = NULL, personIdCol = "person_id", startDay = -365L, endDay = -1L, cohortId = 1L, overlap = FALSE, atc = FALSE, conceptSet = FALSE, vocabularyDatabaseSchema = "main" ), class = "singleNodeSpec" ) mixed_specs <- structure( list(good_specs[[1]], bad_spec), class = "singleNodeSettingList" ) results <- executeSpecs( connection = conn, specs = mixed_specs, targetDialect = "sqlite", stopOnError = FALSE ) expect_length(results, 2) # First spec should succeed expect_s3_class(results[[1]], "data.frame") # Second spec should have error attribute expect_s3_class(results[[2]], "data.frame") expect_true(!is.null(attr(results[[2]], "error"))) expect_equal(nrow(results[[2]]), 0L) }) # =========================================================================== # executeSpecs — cleanTempTables # =========================================================================== test_that("executeSpecs with cleanTempTables = TRUE does not error", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) specs <- make_condition_spec(aggregated = TRUE) results <- executeSpecs( connection = conn, specs = specs, targetDialect = "sqlite", cleanTempTables = TRUE ) expect_type(results, "list") expect_length(results, 1) expect_s3_class(results[[1]], "data.frame") }) # =========================================================================== # executeSpecs — empty list # =========================================================================== test_that("executeSpecs handles empty spec list", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) empty_specs <- structure(list(), class = "singleNodeSettingList") results <- executeSpecs( connection = conn, specs = empty_specs, targetDialect = "sqlite" ) expect_type(results, "list") expect_length(results, 0) }) # =========================================================================== # executeSpecs — targetDialect defaults from connection # =========================================================================== test_that("executeSpecs infers targetDialect from connection", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) specs <- make_condition_spec(aggregated = TRUE) results <- executeSpecs( connection = conn, specs = specs ) expect_type(results, "list") expect_length(results, 1) }) # =========================================================================== # executeSpec — multiple domain types # =========================================================================== test_that("executeSpec works with visit_occurrence domain", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = TRUE, type = "start"), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) result <- executeSpec(conn, specs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") expect_true(nrow(result) >= 0) }) test_that("executeSpec works with procedure_occurrence", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = TRUE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) result <- executeSpec(conn, specs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") }) test_that("executeSpec works with measurement domain", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = TRUE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) result <- executeSpec(conn, specs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") }) # =========================================================================== # executeSpec — drug_exposure domain # =========================================================================== test_that("executeSpec works with drug_exposure domain", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = TRUE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) result <- executeSpec(conn, specs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") expect_true(nrow(result) > 0) }) # =========================================================================== # executeSpecs — multi-domain batch # =========================================================================== test_that("executeSpecs handles multi-domain specs", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = TRUE, type = "start"), drug_exposure = list(include = TRUE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = TRUE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) results <- executeSpecs(conn, specs, targetDialect = "sqlite") expect_length(results, length(specs)) for (r in results) { expect_s3_class(r, "data.frame") } }) # =========================================================================== # executeSpec — concept set temp table creation # =========================================================================== test_that("executeSpec creates concept set temp table for concept set specs", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list(include = FALSE), useConceptSetFeatures = list( include = TRUE, type = "binary", conceptSets = list( gi = list( items = list( list(concept = list(CONCEPT_ID = 4112343L), includeDescendants = FALSE) ), tables = c("condition_occurrence") ) ) ) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) result <- executeSpec(conn, specs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") }) # =========================================================================== # executeSpec — cohort covariate (single SELECT path) # =========================================================================== test_that("executeSpec handles cohort covariate single-SELECT spec", { conn <- get_eunomia_conn() on.exit(DatabaseConnector::disconnect(conn), add = TRUE) plan <- planAnalysis( analysisWindows = defineAnalysisWindows(startDays = -365, endDays = -1), useBaseFeatures = list( condition_occurrence = list(include = FALSE), drug_exposure = list(include = FALSE), condition_era = list(include = FALSE), drug_era = list(include = FALSE), procedure_occurrence = list(include = FALSE), observation = list(include = FALSE), device_exposure = list(include = FALSE), visit_occurrence = list(include = FALSE), measurement = list(include = FALSE) ), useCohortFeatures = list( include = TRUE, type = "start", cohortIds = c(1L), cohortNames = c("Celecoxib"), cohortTable = "cohort", covariateSchema = "main" ), useConceptSetFeatures = list(include = FALSE) ) specs <- singleNodeSetting( plan = plan, cohortId = 1L, cohortDatabaseSchema = "main", cohortTable = "cohort", cdmDatabaseSchema = "main", vocabularyDatabaseSchema = "main" ) # Filter to cohort source specs only cohortSpecs <- Filter(function(s) identical(s$source, "cohort"), specs) expect_true(length(cohortSpecs) >= 1L) result <- executeSpec(conn, cohortSpecs[[1]], targetDialect = "sqlite") expect_s3_class(result, "data.frame") # Cohort covariate should produce at most 1 row expect_true(nrow(result) <= 1L) })