# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

skip_if_not_available("parquet")

pq_file <- system.file("v0.7.1.parquet", package = "arrow")

test_that("reading a known Parquet file to tibble", {
  skip_if_not_available("snappy")
  df <- read_parquet(pq_file)
  expect_true(tibble::is_tibble(df))
  expect_identical(dim(df), c(10L, 11L))
  # TODO: assert more about the contents
})

test_that("simple int column roundtrip", {
  df <- tibble::tibble(x = 1:5)
  pq_tmp_file <- tempfile()
  # You can specify the .parquet here but that's probably not necessary
  write_parquet(df, pq_tmp_file)

  df_read <- read_parquet(pq_tmp_file, mmap = FALSE)
  expect_equal(df, df_read)
  # Make sure file connection is cleaned up
  expect_error(file.remove(pq_tmp_file), NA)
  expect_false(file.exists(pq_tmp_file))
})

test_that("read_parquet() supports col_select", {
  skip_if_not_available("snappy")
  df <- read_parquet(pq_file, col_select = c(x, y, z))
  expect_named(df, c("x", "y", "z"))

  df <- read_parquet(pq_file, col_select = starts_with("c"))
  expect_named(df, c("carat", "cut", "color", "clarity"))
})

test_that("read_parquet() with raw data", {
  skip_if_not_available("snappy")
  test_raw <- readBin(pq_file, what = "raw", n = 5000)
  df <- read_parquet(test_raw)
  expect_identical(dim(df), c(10L, 11L))
})

test_that("write_parquet() handles various compression= specs", {
  skip_if_not_available("snappy")
  tab <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)

  expect_parquet_roundtrip(tab, compression = "snappy")
  expect_parquet_roundtrip(tab, compression = rep("snappy", 3L))
  expect_parquet_roundtrip(tab, compression = c(x1 = "snappy", x2 = "snappy"))
})

test_that("write_parquet() handles various compression_level= specs", {
  skip_if_not_available("gzip")
  tab <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)

  expect_parquet_roundtrip(tab, compression = "gzip", compression_level = 4)
  expect_parquet_roundtrip(tab, compression = "gzip", compression_level = rep(4L, 3L))
  expect_parquet_roundtrip(tab, compression = "gzip", compression_level = c(x1 = 5L, x2 = 3L))
})

test_that("write_parquet() handles various use_dictionary= specs", {
  tab <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)

  expect_parquet_roundtrip(tab, use_dictionary = TRUE)
  expect_parquet_roundtrip(tab, use_dictionary = c(TRUE, FALSE, TRUE))
  expect_parquet_roundtrip(tab, use_dictionary = c(x1 = TRUE, x2 = TRUE))
  expect_error(
    write_parquet(tab, tempfile(), use_dictionary = c(TRUE, FALSE)),
    "unsupported use_dictionary= specification"
  )
  expect_error(
    write_parquet(tab, tempfile(), use_dictionary = 12),
    "is.logical(use_dictionary) is not TRUE",
    fixed = TRUE
  )
})

test_that("write_parquet() handles various write_statistics= specs", {
  tab <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)

  expect_parquet_roundtrip(tab, write_statistics = TRUE)
  expect_parquet_roundtrip(tab, write_statistics = c(TRUE, FALSE, TRUE))
  expect_parquet_roundtrip(tab, write_statistics = c(x1 = TRUE, x2 = TRUE))
})

test_that("write_parquet() accepts RecordBatch too", {
  batch <- RecordBatch$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  tab <- parquet_roundtrip(batch)
  expect_equal(tab, Table$create(batch))
})

test_that("write_parquet() handles grouped_df", {
  library(dplyr, warn.conflicts = FALSE)
  df <- tibble::tibble(a = 1:4, b = 5) %>% group_by(b)
  # Since `df` is a "grouped_df", this test asserts that we get a grouped_df back
  expect_parquet_roundtrip(df, as_data_frame = TRUE)
})

test_that("write_parquet() with invalid input type", {
  bad_input <- Array$create(1:5)
  expect_snapshot_error(write_parquet(bad_input, tempfile()))
})

test_that("write_parquet() can truncate timestamps", {
  tab <- Table$create(x1 = as.POSIXct("2020/06/03 18:00:00", tz = "UTC"))
  expect_type_equal(tab$x1, timestamp("us", "UTC"))

  tf <- tempfile()
  on.exit(unlink(tf))

  write_parquet(tab, tf, coerce_timestamps = "ms", allow_truncated_timestamps = TRUE)
  new <- read_parquet(tf, as_data_frame = FALSE)
  expect_type_equal(new$x1, timestamp("ms", "UTC"))
  expect_equal(as.data.frame(tab), as.data.frame(new))
})

test_that("make_valid_parquet_version()", {
  expect_equal(
    make_valid_parquet_version("1.0"),
    ParquetVersionType$PARQUET_1_0
  )
  expect_deprecated(
    expect_equal(
      make_valid_parquet_version("2.0"),
      ParquetVersionType$PARQUET_2_0
    )
  )
  expect_equal(
    make_valid_parquet_version("2.4"),
    ParquetVersionType$PARQUET_2_4
  )
  expect_equal(
    make_valid_parquet_version("2.6"),
    ParquetVersionType$PARQUET_2_6
  )
  expect_equal(
    make_valid_parquet_version("latest"),
    ParquetVersionType$PARQUET_2_6
  )
  expect_equal(make_valid_parquet_version(1), ParquetVersionType$PARQUET_1_0)
  expect_deprecated(
    expect_equal(make_valid_parquet_version(2), ParquetVersionType$PARQUET_2_0)
  )
  expect_equal(make_valid_parquet_version(1.0), ParquetVersionType$PARQUET_1_0)
  expect_equal(make_valid_parquet_version(2.4), ParquetVersionType$PARQUET_2_4)
})

test_that("make_valid_parquet_version() input validation", {
  expect_error(
    make_valid_parquet_version("0.3.14"),
    "`version` must be one of"
  )
  expect_error(
    make_valid_parquet_version(NULL),
    "`version` must be one of"
  )
  expect_error(
    make_valid_parquet_version(c("2", "4")),
    "`version` must be one of"
  )
})

test_that("write_parquet() defaults to snappy compression", {
  skip_if_not_available("snappy")
  tmp1 <- tempfile()
  tmp2 <- tempfile()
  write_parquet(mtcars, tmp1)
  write_parquet(mtcars, tmp2, compression = "snappy")
  expect_equal(file.size(tmp1), file.size(tmp2))
})

test_that("write_parquet() does not detect compression from filename", {
  # TODO(ARROW-17221): should this be supported?
  without <- tempfile(fileext = ".parquet")
  with_gz <- tempfile(fileext = ".parquet.gz")

  write_parquet(mtcars, without)
  write_parquet(mtcars, with_gz)

  expect_equal(file.size(with_gz), file.size(without))
})

test_that("read_parquet() handles (ignores) compression in filename", {
  df <- tibble::tibble(x = 1:5)
  f <- tempfile(fileext = ".parquet.gz")
  write_parquet(df, f)
  expect_equal(read_parquet(f), df)
})

test_that("Factors are preserved when writing/reading from Parquet", {
  fct <- factor(c("a", "b"), levels = c("c", "a", "b"))
  ord <- factor(c("a", "b"), levels = c("c", "a", "b"), ordered = TRUE)
  chr <- c("a", "b")
  df <- tibble::tibble(fct = fct, ord = ord, chr = chr)

  pq_tmp_file <- tempfile()
  on.exit(unlink(pq_tmp_file))
  write_parquet(df, pq_tmp_file)
  df_read <- read_parquet(pq_tmp_file)
  expect_equal(df, df_read)
})

test_that("Lists are preserved when writing/reading from Parquet", {
  bool <- list(logical(0), NA, c(TRUE, FALSE))
  int <- list(integer(0), NA_integer_, 1:4)
  num <- list(numeric(0), NA_real_, c(1, 2))
  char <- list(character(0), NA_character_, c("itsy", "bitsy"))
  df <- tibble::tibble(bool = bool, int = int, num = num, char = char)

  pq_tmp_file <- tempfile()
  on.exit(unlink(pq_tmp_file))
  write_parquet(df, pq_tmp_file)
  df_read <- read_parquet(pq_tmp_file)
  expect_equal(df, df_read, ignore_attr = TRUE)
})

test_that("Maps are preserved when writing/reading from Parquet", {
  string_bool <- Array$create(
    list(data.frame(key = c("a", "b"), value = c(TRUE, FALSE), stringsAsFactors = FALSE)),
    type = map_of(utf8(), boolean())
  )
  int_struct <- Array$create(
    list(tibble::tibble(
      key = c(2, 4),
      value = data.frame(x = c(1, 2), y = c("a", "b"), stringsAsFactors = FALSE)
    )),
    type = map_of(int64(), struct(x = int64(), y = utf8()))
  )
  df <- arrow_table(string_bool = string_bool, int_struct = int_struct)

  pq_tmp_file <- tempfile()
  on.exit(unlink(pq_tmp_file))
  write_parquet(df, pq_tmp_file)
  df_read <- read_parquet(pq_tmp_file, as_data_frame = FALSE)
  expect_equal(df, df_read, ignore_attr = TRUE)
})

test_that("read_parquet() and write_parquet() accept connection objects", {
  skip_if_not_available("snappy")
  tf <- tempfile()
  on.exit(unlink(tf))

  # make this big enough that we might expose concurrency problems,
  # but not so big that it slows down the tests
  test_tbl <- tibble::tibble(
    x = 1:1e4,
    y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE),
    z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE)
  )

  write_parquet(test_tbl, file(tf))
  expect_identical(read_parquet(tf), test_tbl)
  expect_identical(read_parquet(file(tf)), read_parquet(tf))
})

test_that("write_parquet() to stream", {
  df <- tibble::tibble(x = 1:5)
  tf <- tempfile()
  con <- FileOutputStream$create(tf)
  on.exit(unlink(tf))
  write_parquet(df, con)
  con$close()
  expect_equal(read_parquet(tf), df)
})

test_that("write_parquet() returns its input", {
  df <- tibble::tibble(x = 1:5)
  tf <- tempfile()
  on.exit(unlink(tf))
  df_out <- write_parquet(df, tf)
  expect_equal(df, df_out)
})

test_that("write_parquet() handles version argument", {
  df <- tibble::tibble(x = 1:5)
  versions <- list("1.0", "2.4", "2.6", "latest", 1.0, 2.4, 2.6, 1L)
  purrr::walk(versions, function(x) {
    tf <- tempfile()
    on.exit(unlink(tf))
    write_parquet(df, tf, version = x)
    expect_identical(read_parquet(tf), df)
  })

  invalid_versions <- list("3.0", 3.0, 3L, "A")
  purrr::walk(invalid_versions, function(x) {
    tf <- tempfile()
    on.exit(unlink(tf))
    expect_error(write_parquet(df, tf, version = x))
  })
})

test_that("ParquetFileReader raises an error for non-RandomAccessFile source", {
skip_if_not_available("gzip") expect_error( ParquetFileReader$create(CompressedInputStream$create(pq_file)), 'file must be a "RandomAccessFile"' ) }) test_that("ParquetFileWriter raises an error for non-OutputStream sink", { sch <- schema(a = float32()) # ARROW-9946 expect_error( ParquetFileWriter$create(schema = sch, sink = tempfile()), regexp = "OutputStream" ) }) test_that("ParquetFileReader $ReadRowGroup(s) methods", { tab <- Table$create(x = 1:100) tf <- tempfile() on.exit(unlink(tf)) write_parquet(tab, tf, chunk_size = 10) reader <- ParquetFileReader$create(tf) expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:10)) expect_true(reader$ReadRowGroup(9) == Table$create(x = 91:100)) expect_error(reader$ReadRowGroup(-1), "Some index in row_group_indices") expect_error(reader$ReadRowGroup(111), "Some index in row_group_indices") expect_error(reader$ReadRowGroup(c(1, 2))) expect_error(reader$ReadRowGroup("a")) expect_true(reader$ReadRowGroups(c(0, 1)) == Table$create(x = 1:20)) expect_error(reader$ReadRowGroups(c(0, 1, -2))) # although it gives a weird error expect_error(reader$ReadRowGroups(c(0, 1, 31))) # ^^ expect_error(reader$ReadRowGroups(c("a", "b"))) ## -- with column_indices expect_true(reader$ReadRowGroup(0, 0) == Table$create(x = 1:10)) expect_error(reader$ReadRowGroup(0, 1)) expect_true(reader$ReadRowGroups(c(0, 1), 0) == Table$create(x = 1:20)) expect_error(reader$ReadRowGroups(c(0, 1), 1)) }) test_that("Error messages are shown when the compression algorithm snappy is not found", { msg <- paste0( ".*", "you will need to reinstall arrow with additional features enabled.\nSet one of these ", "environment variables before installing:", "\n\n \\* Sys\\.setenv\\(LIBARROW_MINIMAL = \"false\"\\) ", "\\(for all optional features, including 'snappy'\\)", "\n \\* Sys\\.setenv\\(ARROW_WITH_SNAPPY = \"ON\"\\) \\(for just 'snappy')\n\n", "See https://arrow.apache.org/docs/r/articles/install.html for details" ) if (codec_is_available("snappy")) { d <- read_parquet(pq_file) expect_s3_class(d, "data.frame") } else { expect_error(read_parquet(pq_file), msg) } }) test_that("Error is created when parquet reads a feather file", { expect_error( read_parquet(test_path("golden-files/data-arrow_2.0.0_lz4.feather")), "Parquet magic bytes not found in footer" ) }) test_that("ParquetFileWrite chunk_size defaults", { tab <- Table$create(x = 1:101) tf <- tempfile() on.exit(unlink(tf)) # we can alter our default cells per group withr::with_options( list( arrow.parquet_cells_per_group = 25 ), { # this will be 4 chunks write_parquet(tab, tf) reader <- ParquetFileReader$create(tf) expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:26)) expect_true(reader$ReadRowGroup(3) == Table$create(x = 79:101)) expect_error(reader$ReadRowGroup(4), "Some index in row_group_indices") } ) # but we always have no more than max_chunks (even if cells_per_group is low!) 
  # use a new tempfile so that Windows doesn't complain about the file being overwritten
  tf <- tempfile()
  on.exit(unlink(tf))
  withr::with_options(
    list(
      arrow.parquet_cells_per_group = 25,
      arrow.parquet_max_chunks = 2
    ),
    {
      # cells_per_group = 25 would give 4 chunks, but max_chunks = 2 caps this at 2
      write_parquet(tab, tf)
      reader <- ParquetFileReader$create(tf)
      expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:51))
      expect_true(reader$ReadRowGroup(1) == Table$create(x = 52:101))
      expect_error(reader$ReadRowGroup(2), "Some index in row_group_indices")
    }
  )
})

test_that("ParquetFileWrite chunk_size calculation doesn't have integer overflow issues (ARROW-14894)", {
  expect_equal(calculate_chunk_size(31869547, 108, 2.5e8, 200), 2451504)

  # we can set the target cells per group, and it rounds appropriately
  expect_equal(calculate_chunk_size(100, 1, 25), 25)
  expect_equal(calculate_chunk_size(101, 1, 25), 26)

  # but our max_chunks is respected
  expect_equal(calculate_chunk_size(101, 1, 25, 2), 51)
})

test_that("deprecated int96 timestamp unit can be specified when reading Parquet files", {
  tf <- tempfile()
  on.exit(unlink(tf))

  table <- Table$create(
    some_datetime = as.POSIXct("2001-01-01 12:34:56.789")
  )

  write_parquet(
    table,
    tf,
    use_deprecated_int96_timestamps = TRUE
  )

  props <- ParquetArrowReaderProperties$create()
  props$set_coerce_int96_timestamp_unit(TimeUnit$MILLI)
  expect_identical(props$coerce_int96_timestamp_unit(), TimeUnit$MILLI)

  result <- read_parquet(
    tf,
    as_data_frame = FALSE,
    props = props
  )

  expect_identical(result$some_datetime$type$unit(), TimeUnit$MILLI)
  expect_equal(result$some_datetime, table$some_datetime$cast(result$some_datetime$type))
})

test_that("Can read parquet with nested lists and maps", {
  # Construct the path to the parquet-testing submodule. This will search:
  #   * $ARROW_SOURCE_HOME/cpp/submodules/parquet-testing/data
  #   * ../cpp/submodules/parquet-testing/data
  # ARROW_SOURCE_HOME is set in many of our CI setups, so that will find the files;
  # the .. version should catch some (though not all) ways of running tests locally
  base_path <- Sys.getenv("ARROW_SOURCE_HOME", "..")
  # make this a full path, at the root of the filesystem if we're using ARROW_SOURCE_HOME
  if (base_path != "..") {
    base_path <- file.path("", base_path)
  }

  parquet_test_data <- file.path(base_path, "cpp", "submodules", "parquet-testing", "data")
  skip_if_not(dir.exists(parquet_test_data) | force_tests(), "Parquet test data missing")
  skip_if_not_available("snappy")

  pq <- read_parquet(paste0(parquet_test_data, "/nested_lists.snappy.parquet"), as_data_frame = FALSE)
  expect_type_equal(
    pq$a,
    list_of(field("element", list_of(field("element", list_of(field("element", utf8()))))))
  )

  pq <- read_parquet(paste0(parquet_test_data, "/nested_maps.snappy.parquet"), as_data_frame = FALSE)
  expect_true(pq$a$type == map_of(utf8(), map_of(int32(), field("value", boolean(), nullable = FALSE))))
})

test_that("Can read Parquet files from a URL", {
  skip_if_offline()
  skip_on_cran()
  skip_if_not_available("snappy")
  parquet_url <- "https://github.com/apache/arrow/blob/64f2cc7986ce672dd1a8cb268d193617a80a1653/r/inst/v0.7.1.parquet?raw=true" # nolint
  pu <- read_parquet(parquet_url)
  expect_true(tibble::is_tibble(pu))
  expect_identical(dim(pu), c(10L, 11L))
})

test_that("thrift string and container size can be specified when reading Parquet files", {
  tf <- tempfile()
  on.exit(unlink(tf))

  table <- arrow_table(example_data)
  write_parquet(table, tf)

  file <- make_readable_file(tf)
  on.exit(file$close())

  # thrift string size
  reader_props <- ParquetReaderProperties$create()
  reader_props$set_thrift_string_size_limit(1)
  expect_identical(reader_props$thrift_string_size_limit(), 1L)

  # We get an error if we set the Thrift string size limit too small
  expect_error(
    ParquetFileReader$create(file, reader_props = reader_props),
    "TProtocolException: Exceeded size limit"
  )

  # Increase the size and we can read successfully
  reader_props$set_thrift_string_size_limit(10000)
  reader <- ParquetFileReader$create(file, reader_props = reader_props)
  data <- reader$ReadTable()
  expect_identical(collect.ArrowTabular(data), example_data)

  # thrift container size
  reader_props_container <- ParquetReaderProperties$create()
  reader_props_container$set_thrift_container_size_limit(1)
  expect_identical(reader_props_container$thrift_container_size_limit(), 1L)

  expect_error(
    ParquetFileReader$create(file, reader_props = reader_props_container),
    "TProtocolException: Exceeded size limit"
  )

  reader_props_container$set_thrift_container_size_limit(100)
  reader_container <- ParquetFileReader$create(file, reader_props = reader_props_container)
  data <- reader_container$ReadTable()
  expect_identical(collect.ArrowTabular(data), example_data)
})

test_that("We can use WriteBatch on ParquetFileWriter", {
  tf <- tempfile()
  on.exit(unlink(tf))
  sink <- FileOutputStream$create(tf)
  sch <- schema(a = int32())
  props <- ParquetWriterProperties$create(column_names = names(sch))
  writer <- ParquetFileWriter$create(schema = sch, sink = sink, properties = props)

  batch <- RecordBatch$create(data.frame(a = 1:10))
  writer$WriteBatch(batch, chunk_size = 10)
  writer$WriteBatch(batch, chunk_size = 10)
  writer$WriteBatch(batch, chunk_size = 10)
  writer$Close()

  tbl <- read_parquet(tf)
  expect_equal(nrow(tbl), 30)
})

test_that("WriteBatch on ParquetFileWriter errors when called on closed sink", {
  sink <- FileOutputStream$create(tempfile())
  sch <- schema(a = int32())
  props <- ParquetWriterProperties$create(column_names = names(sch))
  writer <- ParquetFileWriter$create(
    schema = sch,
    sink = sink,
    properties = props
  )
  writer$Close()

  batch <- RecordBatch$create(data.frame(a = 1:10))
  expect_error(writer$WriteBatch(batch, chunk_size = 10), "Operation on closed file")
})