library(testthat) library(dplyr, warn.conflicts = FALSE) ### CDM object DBI drivers ------ test_cdm_from_con <- function(con, cdm_schema, write_schema) { expect_error(cdm_from_con(con, cdm_schema = cdm_schema, cdm_name = "test"), "write_schema") cdm <- cdm_from_con(con, cdm_schema = cdm_schema, cdm_name = "test", write_schema = write_schema) expect_s3_class(cdm, "cdm_reference") expect_error(assert_tables(cdm, "person"), NA) expect_warning(version(cdm)) expect_true(cdmVersion(cdm) %in% c("5.3", "5.4")) expect_s3_class(snapshot(cdm), "data.frame") expect_true("concept" %in% names(cdm)) expect_s3_class(dplyr::collect(head(cdm$concept)), "data.frame") cdm <- cdm_from_con(con, cdm_schema = cdm_schema, write_schema = write_schema) expect_s3_class(cdm, "cdm_reference") # expect_error(assert_write_schema(cdm), NA) expect_true("concept" %in% names(cdm)) expect_s3_class(collect(head(cdm$concept)), "data.frame") expect_equal(dbms(cdm), dbms(attr(cdm, "dbcon"))) # check cdm_reference attribute expect_true("cdm_reference" %in% names(attributes(cdm[["person"]]))) x <- unclass(cdm) expect_false("cdm_reference" %in% names(attributes(x[["person"]]))) x[["person"]] <- cdm[["person"]] %>% compute() expect_true("cdm_reference" %in% names(attributes(x[["person"]]))) cdm[["person"]] <- cdm[["person"]] %>% compute() x <- unclass(cdm) expect_false("cdm_reference" %in% names(attributes(x[["person"]]))) # simple join df <- dplyr::inner_join(cdm$person, cdm$observation_period, by = "person_id") %>% head(2) %>% dplyr::collect() expect_s3_class(df, "data.frame") } for (dbtype in dbToTest) { test_that(glue::glue("{dbtype} - cdm_from_con"), { if (!(dbtype %in% ciTestDbs)) skip_on_ci() if (dbtype != "duckdb") skip_on_cran() else skip_if_not_installed("duckdb") con <- get_connection(dbtype, DatabaseConnector = testUsingDatabaseConnector) cdm_schema <- get_cdm_schema(dbtype) write_schema <- get_write_schema(dbtype) skip_if(any(write_schema == "") || any(cdm_schema == "") || is.null(con)) test_cdm_from_con(con, cdm_schema = cdm_schema, write_schema = write_schema) disconnect(con) }) } test_that("Uppercase tables are stored as lowercase in cdm", { skip_if_not_installed("duckdb") skip_if_not(eunomia_is_available()) # create a test cdm with upppercase table names con <- DBI::dbConnect(duckdb::duckdb(eunomia_dir())) for (name in list_tables(con, "main")) { DBI::dbExecute(con, glue::glue("ALTER TABLE {name} RENAME TO {name}2;")) DBI::dbExecute(con, glue::glue("ALTER TABLE {name}2 RENAME TO {toupper(name)};")) } expect_true(all(list_tables(con, "main") == toupper(list_tables(con, "main")))) # check that names in cdm are lowercase cdm <- cdm_from_con(con = con, cdm_name = "eunomia", cdm_schema = "main", write_schema = "main") expect_true(all(names(cdm) == tolower(names(cdm)))) DBI::dbDisconnect(con, shutdown = TRUE) }) # TODO add this test back when we have an example cdm with achilles tables # test_that("adding achilles", { # skip_if_not(eunomia_is_available()) # skip_if_not_installed("duckdb") # # con <- DBI::dbConnect(duckdb::duckdb(eunomia_dir())) # # expect_error(cdm_from_con(con = con, # cdm_schema = "main", # write_schema = "main", # achilles_schema = "main")) # # DBI::dbWriteTable( # conn = con, # name = "achilles_analysis", # value = tibble( # analysis_id = 1L, analysis_name = "1", stratum_1_name = "1", # stratum_2_name = "1", stratum_3_name = "1", stratum_4_name = "1", # stratum_5_name = "1", is_default = 1L, category = "1"), # overwrite = TRUE # ) # # DBI::dbWriteTable( # conn = con, # name = "achilles_results", # value = tibble(analysis_id = 1L, # stratum_1 = "a", # stratum_2 = "b", # stratum_3 = "1", # stratum_4 = "5", # stratum_5 = "u", # count_value = 1500L), # overwrite = TRUE # ) # # DBI::dbWriteTable( # conn = con, # name = "achilles_results_dist", # value = tibble( # analysis_id = 1L, count_value = 5L, stratum_1 = "1", stratum_2 = "1", # stratum_3 = "1", stratum_4 = "1", stratum_5 = "1", min_value = 1, max_value = 1, # avg_value = 1, stdev_value = 1, median_value = 1, p10_value = 1, # p25_value = 1, p75_value = 1, p90_value = 1), # overwrite = TRUE # ) # # cdm <- cdm_from_con(con = con, # cdm_schema = "main", # write_schema = "main", # achilles_schema = "main") # # expect_true(cdm$achilles_analysis %>% dplyr::pull("analysis_name") == 1) # expect_true(cdm$achilles_results %>% dplyr::pull("stratum_1") == "a") # expect_true(cdm$achilles_results_dist %>% dplyr::pull("count_value") == 5) # # # we should also be able to add achilles tables manually if in db # cdm <- cdm_from_con( # con = con, cdm_name = "eunomia", cdm_schema = "main", write_schema = "main" # ) # cdm$achilles_analysis <- dplyr::tbl(con, "achilles_analysis") # # but should not work if tables are not in db (as cdm is db side) # expect_error( # cdm$achilles_analysis <- dplyr::tibble( # analysis_id = 1, analysis_name = 1, stratum_1_name = 1, # stratum_2_name = 1, stratum_3_name = 1, stratum_4_name = 1, # stratum_5_name = 1, is_default = 1, category = 1 # ) # ) # # if local tables, insert table would take care of this # # achilles_analysis_tibble <- dplyr::tibble( # analysis_id = 1, analysis_name = 1, stratum_1_name = 1, # stratum_2_name = 1, stratum_3_name = 1, stratum_4_name = 1, # stratum_5_name = 1, is_default = 1, category = 1 # ) # cdm <- omopgenerics::insertTable(cdm = cdm, # table = achilles_analysis_tibble, # name = "achilles_analysis", # overwrite = TRUE) # # cdm$achilles_analysis <- dplyr::tbl(con, "achilles_analysis") # # # but should not work if tables are not in db (as cdm is db side) # expect_error( # cdm$achilles_analysis <- dplyr::tibble( # analysis_id = 1, analysis_name = 1, stratum_1_name = 1, # stratum_2_name = 1, stratum_3_name = 1, stratum_4_name = 1, # stratum_5_name = 1, is_default = 1, category = 1 # )) # # # if local tables, insert table would take care of this # achilles_analysis_tibble <- dplyr::tibble( # analysis_id = 1, analysis_name = 1, stratum_1_name = 1, # stratum_2_name = 1, stratum_3_name = 1, stratum_4_name = 1, # stratum_5_name = 1, is_default = 1, category = 1 # ) # # cdm <- omopgenerics::insertTable(cdm = cdm, # table = achilles_analysis_tibble, # name = "achilles_analysis", # overwrite = TRUE) # # DBI::dbDisconnect(con, shutdown = TRUE) # }) test_that("adding cohort tables to the cdm", { skip_if_not(eunomia_is_available()) skip_if_not_installed("duckdb") con <- DBI::dbConnect(duckdb::duckdb(), dbdir = eunomia_dir()) cohorts <- data.frame( cohortId = c(1, 2, 3), cohortName = c("X", "A", "B"), type = c("target", "event", "event") ) cohort_table <- dplyr::tribble( ~cohort_definition_id, ~subject_id, ~cohort_start_date, ~cohort_end_date, 1L, 5L, as.Date("2020-01-01"), as.Date("2020-01-01"), 2L, 5L, as.Date("2020-01-10"), as.Date("2020-03-10") ) dplyr::copy_to(dest = con, df = cohort_table, name = "test_cohort_table", overwrite = TRUE) expect_equal(unname(sapply(DBI::dbReadTable(con, "test_cohort_table"), class)), c("integer", "integer", "Date", "Date")) expect_error(cdmFromCon(con, cdmSchema = "main", writeSchema = c(schema = "main"), cohortTables = "test_cohort_table", .softValidation = FALSE)) # error because cohorts out of obs expect_no_error(cdmFromCon(con, cdmSchema = "main", writeSchema = c(schema = "main"), cohortTables = "test_cohort_table", .softValidation = TRUE)) # passes without validation DBI::dbDisconnect(con, shutdown = TRUE) }) test_that("write_schema argument specification and cdm_disconnect works", { skip_if_not_installed("duckdb") con <- DBI::dbConnect(duckdb::duckdb(), eunomia_dir()) cdm <- cdm_from_con(con, "main", "main", write_prefix = "tmp_") expect_equal(attr(cdm, "write_schema"), c(schema = "main", prefix = "tmp_")) cdm_disconnect(cdm) }) test_that("schema specification with . works", { skip("manual test") con <- DBI::dbConnect(odbc::odbc(), SERVER = Sys.getenv("SNOWFLAKE_SERVER"), UID = Sys.getenv("SNOWFLAKE_USER"), PWD = Sys.getenv("SNOWFLAKE_PASSWORD"), DATABASE = Sys.getenv("SNOWFLAKE_DATABASE"), WAREHOUSE = Sys.getenv("SNOWFLAKE_WAREHOUSE"), DRIVER = Sys.getenv("SNOWFLAKE_DRIVER")) cdm_schema <- Sys.getenv("SNOWFLAKE_CDM_SCHEMA") write_schema <- Sys.getenv("SNOWFLAKE_SCRATCH_SCHEMA") cdm <- cdm_from_con(con, cdm_schema, write_schema, write_prefix = "tmp_", cdm_name = "test") write_schema_split <- stringr::str_split(write_schema, "\\.")[[1]] %>% purrr::set_names("catalog", "schema") %>% c(., prefix = "tmp_") cdm_schema_split <- stringr::str_split(cdm_schema, "\\.")[[1]] %>% purrr::set_names("catalog", "schema") expect_equal(attr(cdm, "write_schema"), write_schema_split) expect_equal(attr(cdm, "cdm_schema"), cdm_schema_split) DBI::dbDisconnect(con) })