# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. skip_on_cran() skip_on_os("windows") skip_if_not_installed("dbplyr") skip_if_not_installed("dplyr") skip_if_not_installed("arrow", "5.0.0") # Skip if parquet is not a capability as an indicator that Arrow is fully installed. skip_if_not(arrow::arrow_with_parquet(), message = "The installed Arrow is not fully featured, skipping Arrow integration tests") library(arrow, warn.conflicts = FALSE) library(dplyr, warn.conflicts = FALSE) library(duckdb) library("DBI") example_data <- dplyr::tibble( int = c(1:3, NA_integer_, 5:10), dbl = c(1:8, NA, 10) + .1, dbl2 = rep(5, 10), lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), false = logical(10), chr = letters[c(1:5, NA, 7:10)], fct = factor(letters[c(1:4, NA, NA, 7:10)]) ) test_that("to_duckdb", { ds <- InMemoryDataset$create(example_data) con <- dbConnect(duckdb()) on.exit(dbDisconnect(con, shutdown = TRUE)) dbExecute(con, "PRAGMA threads=1") expect_equal( ds %>% to_duckdb(con = con) %>% collect() %>% # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(!fct) %>% arrange(int), example_data %>% select(!fct) %>% arrange(int) ) df1 <- ds %>% select(int, lgl, dbl) %>% to_duckdb(con = con) %>% group_by(lgl) %>% summarise(sum_int = sum(int, na.rm = TRUE)) %>% collect() %>% arrange(lgl, sum_int) df2 <- example_data %>% select(int, lgl, dbl) %>% group_by(lgl) %>% summarise(sum_int = sum(int, na.rm = TRUE)) %>% arrange(lgl, sum_int) # can group_by before the to_duckdb df1 <- ds %>% select(int, lgl, dbl) %>% group_by(lgl) %>% to_duckdb(con = con) %>% summarise(sum_int = sum(int, na.rm = TRUE)) %>% collect() %>% arrange(lgl, sum_int) df2 <- example_data %>% select(int, lgl, dbl) %>% group_by(lgl) %>% summarise(sum_int = sum(int, na.rm = TRUE)) %>% arrange(lgl, sum_int) }) test_that("to_duckdb then to_arrow", { ds <- InMemoryDataset$create(example_data) ds_rt <- ds %>% to_duckdb() %>% # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(-fct) %>% to_arrow() expect_identical( collect(ds_rt), ds %>% select(-fct) %>% collect() ) # And we can continue the pipeline ds_rt <- ds %>% to_duckdb() %>% # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(-fct) %>% to_arrow() %>% filter(int > 5) expect_identical( ds_rt %>% collect() %>% arrange(int), ds %>% select(-fct) %>% filter(int > 5) %>% collect() %>% arrange(int) ) # Now check errors ds_rt <- ds %>% to_duckdb() %>% # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(-fct) # alter the class of ds_rt's connection to simulate some other database class(ds_rt$src$con) <- "some_other_connection" expect_error( to_arrow(ds_rt), "to_arrow\\(\\) currently only supports Arrow tables, Arrow datasets," ) }) test_that("to_arrow roundtrip, with dataset", { # these will continue to error until 0.3.2 is released # https://github.com/duckdb/duckdb/pull/2957 skip_if_not_installed("duckdb", minimum_version = "0.3.2") # With a multi-part dataset tf <- tempfile() new_ds <- rbind( cbind(example_data, part = 1), cbind(example_data, part = 2), cbind(mutate(example_data, dbl = dbl * 3, dbl2 = dbl2 * 3), part = 3), cbind(mutate(example_data, dbl = dbl * 4, dbl2 = dbl2 * 4), part = 4) ) write_dataset(new_ds, tf, partitioning = "part") ds <- open_dataset(tf) expect_identical( ds %>% to_duckdb() %>% select(-fct) %>% mutate(dbl_plus = dbl + 1) %>% to_arrow() %>% filter(int > 5 & part > 1) %>% collect() %>% arrange(part, int) %>% as.data.frame(), ds %>% select(-fct) %>% filter(int > 5 & part > 1) %>% mutate(dbl_plus = dbl + 1) %>% collect() %>% arrange(part, int) ) }) # test_that("to_arrow roundtrip, with dataset (without wrapping", { # # these will continue to error until 0.3.2 is released # # https://github.com/duckdb/duckdb/pull/2957 # skip_if_not_installed("duckdb", minimum_version = "0.3.2") # # With a multi-part dataset # tf <- tempfile() # new_ds <- rbind( # cbind(example_data, part = 1), # cbind(example_data, part = 2), # cbind(mutate(example_data, dbl = dbl * 3, dbl2 = dbl2 * 3), part = 3), # cbind(mutate(example_data, dbl = dbl * 4, dbl2 = dbl2 * 4), part = 4) # ) # write_dataset(new_ds, tf, partitioning = "part") # out <- ds %>% # to_duckdb() %>% # select(-fct) %>% # mutate(dbl_plus = dbl + 1) %>% # to_arrow(as_arrow_query = FALSE) # expect_r6_class(out, "RecordBatchReader") # }) # The next set of tests use an already-extant connection to test features of # persistence and querying against the table without using the `tbl` itself, so # we need to create a connection separate from the ephemeral one that is made # with arrow_duck_connection() con <- dbConnect(duckdb()) dbExecute(con, "PRAGMA threads=1") on.exit(dbDisconnect(con, shutdown = TRUE), add = TRUE) test_that("Joining, auto-cleanup enabled", { ds <- InMemoryDataset$create(example_data) table_one_name <- "my_arrow_table_1" table_one <- to_duckdb(ds, con = con, table_name = table_one_name) table_two_name <- "my_arrow_table_2" table_two <- to_duckdb(ds, con = con, table_name = table_two_name) res <- dbGetQuery( con, paste0( "SELECT * FROM ", table_one_name, " INNER JOIN ", table_two_name, " ON ", table_one_name, ".int = ", table_two_name, ".int" ) ) expect_identical(dim(res), c(9L, 14L)) # clean up cleans up the tables expect_true(all(c(table_one_name, table_two_name) %in% duckdb_list_arrow(con))) rm(table_one, table_two) gc() expect_false(any(c(table_one_name, table_two_name) %in% duckdb_list_arrow(con))) }) test_that("Joining, auto-cleanup disabled", { ds <- InMemoryDataset$create(example_data) table_three_name <- "my_arrow_table_3" table_three <- to_duckdb(ds, con = con, table_name = table_three_name, auto_disconnect = FALSE) # clean up does *not* clean these tables expect_true(table_three_name %in% duckdb_list_arrow(con)) rm(table_three) gc() # but because we aren't auto_disconnecting then we still have this table. expect_true(table_three_name %in% duckdb_list_arrow(con)) }) test_that("to_duckdb with a table", { tab <- Table$create(example_data) expect_identical( tab %>% to_duckdb() %>% group_by(int > 4) %>% summarise( int_mean = mean(int, na.rm = TRUE), dbl_mean = mean(dbl, na.rm = TRUE) ) %>% collect(), dplyr::tibble( "int > 4" = c(FALSE, NA, TRUE), int_mean = c(2, NA, 7.5), dbl_mean = c(2.1, 4.1, 7.3) ) ) }) test_that("to_duckdb passing a connection", { ds <- InMemoryDataset$create(example_data) con_separate <- dbConnect(duckdb()) # we always want to test in parallel dbExecute(con_separate, "PRAGMA threads=2") on.exit(dbDisconnect(con_separate, shutdown = TRUE), add = TRUE) # create a table to join to that we know is in our con_separate new_df <- data.frame( int = 1:10, char = letters[26:17], stringsAsFactors = FALSE ) DBI::dbWriteTable(con_separate, "separate_join_table", new_df) table_four <- ds %>% select(int, lgl, dbl) %>% to_duckdb(con = con_separate, auto_disconnect = FALSE) table_four_name <- dbplyr::remote_name(table_four) result <- DBI::dbGetQuery( con_separate, paste0( "SELECT * FROM ", table_four_name, " INNER JOIN separate_join_table ", "ON separate_join_table.int = ", table_four_name, ".int" ) ) expect_identical(dim(result), c(9L, 5L)) expect_identical(result$char, new_df[new_df$int != 4, ]$char) })