diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3be4b35c9eb..22986843acf 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -538,6 +538,7 @@ add_library( src/io/parquet/experimental/hybrid_scan_impl.cpp src/io/parquet/experimental/hybrid_scan_preprocess.cu src/io/parquet/experimental/page_index_filter.cu + src/io/parquet/experimental/page_index_filter_utils.cu src/io/parquet/page_data.cu src/io/parquet/chunk_dict.cu src/io/parquet/page_enc.cu diff --git a/cpp/examples/hybrid_scan_io/common_utils.cpp b/cpp/examples/hybrid_scan_io/common_utils.cpp index 6d70b1b8044..d7421656abb 100644 --- a/cpp/examples/hybrid_scan_io/common_utils.cpp +++ b/cpp/examples/hybrid_scan_io/common_utils.cpp @@ -70,13 +70,24 @@ void check_tables_equal(cudf::table_view const& lhs_table, cudf::filtered_join join_obj( lhs_table, cudf::null_equality::EQUAL, cudf::set_as_build_table::RIGHT, stream); auto const indices = join_obj.anti_join(rhs_table, stream); - // No exception thrown, check indices - auto const valid = indices->size() == 0; - std::cout << "Tables identical: " << std::boolalpha << valid << "\n\n"; + auto const tables_equal = indices->size() == 0; + if (tables_equal) { + std::cout << "Tables identical: " << std::boolalpha << tables_equal << "\n\n"; + } else { + // Helper to write parquet data for inspection + auto const write_parquet = + [](cudf::table_view table, std::string filepath, rmm::cuda_stream_view stream) { + auto sink_info = cudf::io::sink_info(filepath); + auto opts = cudf::io::parquet_writer_options::builder(sink_info, table).build(); + cudf::io::write_parquet(opts, stream); + }; + write_parquet(lhs_table, "lhs_table.parquet", stream); + write_parquet(rhs_table, "rhs_table.parquet", stream); + throw std::logic_error("Tables identical: false\n\n"); + } } catch (std::exception& e) { - std::cerr << e.what() << std::endl << std::endl; - throw std::runtime_error("Tables identical: false\n\n"); + std::cout << e.what() << std::endl; } } diff --git 
a/cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu b/cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu index 6abf03222b9..9b5bcd8ca0e 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu +++ b/cpp/src/io/parquet/experimental/hybrid_scan_chunking.cu @@ -32,8 +32,8 @@ using parquet::detail::pass_intermediate_data; void hybrid_scan_reader_impl::handle_chunking( read_mode mode, - std::vector column_chunk_buffers, - cudf::host_span const> data_page_mask) + std::vector&& column_chunk_buffers, + cudf::host_span data_page_mask) { // if this is our first time in here, setup the first pass. if (!_pass_itm_data) { @@ -77,7 +77,8 @@ void hybrid_scan_reader_impl::handle_chunking( setup_next_subpass(mode); } -void hybrid_scan_reader_impl::setup_next_pass(std::vector column_chunk_buffers) +void hybrid_scan_reader_impl::setup_next_pass( + std::vector&& column_chunk_buffers) { auto const num_passes = _file_itm_data.num_passes(); CUDF_EXPECTS(num_passes == 1, diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp index 5a61d08de1c..4eefa293449 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_helpers.hpp @@ -270,17 +270,21 @@ class aggregate_reader_metadata : public aggregate_reader_metadata_base { * Compute a vector of boolean vectors indicating which data pages need to be decoded to * construct each input column based on the row mask, one vector per column * + * @tparam ColumnView Type of the row mask column view - cudf::mutable_column_view for filter + * columns and cudf::column_view for payload columns + * * @param row_mask Boolean column indicating which rows need to be read after page-pruning * @param row_group_indices Input row groups indices * @param input_columns Input column information * @param row_mask_offset Offset into the row mask column for the current pass * @param stream CUDA stream used for device 
memory operations and kernel launches * - * @return A vector of boolean vectors indicating which data pages need to be decoded to produce - * the output table based on the input row mask, one per input column + * @return Boolean vector indicating which data pages need to be decoded to produce + * the output table based on the input row mask across all input columns */ - [[nodiscard]] std::vector> compute_data_page_mask( - cudf::column_view row_mask, + template + [[nodiscard]] cudf::detail::host_vector compute_data_page_mask( + ColumnView const& row_mask, cudf::host_span const> row_group_indices, cudf::host_span input_columns, cudf::size_type row_mask_offset, diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp b/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp index 6c665c45c3c..8e06535b9f0 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp @@ -444,7 +444,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_filter_columns( (mask_data_pages == use_data_page_mask::YES) ? _extended_metadata->compute_data_page_mask( row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream) - : std::vector>{}; + : cudf::detail::make_empty_host_vector(0, stream); prepare_data( read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask); @@ -474,7 +474,7 @@ table_with_metadata hybrid_scan_reader_impl::materialize_payload_columns( (mask_data_pages == use_data_page_mask::YES) ? _extended_metadata->compute_data_page_mask( row_mask, row_group_indices, _input_columns, _rows_processed_so_far, stream) - : std::vector>{}; + : cudf::detail::make_empty_host_vector(0, stream); prepare_data( read_mode::READ_ALL, row_group_indices, std::move(column_chunk_buffers), data_page_mask); @@ -513,7 +513,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_filter_columns( (mask_data_pages == use_data_page_mask::YES) ? 
_extended_metadata->compute_data_page_mask( row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream) - : std::vector>{}; + : cudf::detail::make_empty_host_vector(0, _stream); prepare_data( read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask); @@ -564,7 +564,7 @@ void hybrid_scan_reader_impl::setup_chunking_for_payload_columns( (mask_data_pages == use_data_page_mask::YES) ? _extended_metadata->compute_data_page_mask( row_mask, row_group_indices, _input_columns, _rows_processed_so_far, _stream) - : std::vector>{}; + : cudf::detail::make_empty_host_vector(0, _stream); prepare_data( read_mode::CHUNKED_READ, row_group_indices, std::move(column_chunk_buffers), data_page_mask); @@ -645,7 +645,7 @@ void hybrid_scan_reader_impl::prepare_data( read_mode mode, cudf::host_span const> row_group_indices, std::vector&& column_chunk_buffers, - cudf::host_span const> data_page_mask) + cudf::host_span data_page_mask) { // if we have not preprocessed at the whole-file level, do that now if (not _file_preprocessed) { @@ -857,8 +857,7 @@ table_with_metadata hybrid_scan_reader_impl::finalize_output( } } -void hybrid_scan_reader_impl::set_pass_page_mask( - cudf::host_span const> data_page_mask) +void hybrid_scan_reader_impl::set_pass_page_mask(cudf::host_span data_page_mask) { auto const& pass = _pass_itm_data; auto const& chunks = pass->chunks; @@ -872,13 +871,11 @@ void hybrid_scan_reader_impl::set_pass_page_mask( return; } + size_t num_inserted_data_pages = 0; std::for_each( thrust::counting_iterator(0), thrust::counting_iterator(_input_columns.size()), [&](auto col_idx) { - auto const& col_page_mask = data_page_mask[col_idx]; - size_t num_inserted_data_pages = 0; - for (size_t chunk_idx = col_idx; chunk_idx < chunks.size(); chunk_idx += num_columns) { // Insert a true value for each dictionary page if (chunks[chunk_idx].num_dict_pages > 0) { _pass_page_mask.push_back(true); } @@ -888,21 +885,17 @@ void 
hybrid_scan_reader_impl::set_pass_page_mask( // Make sure we have enough page mask for this column chunk CUDF_EXPECTS( - col_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk, + data_page_mask.size() >= num_inserted_data_pages + num_data_pages_this_col_chunk, "Encountered invalid data page mask size"); // Insert page mask for this column chunk _pass_page_mask.insert( _pass_page_mask.end(), - col_page_mask.begin() + num_inserted_data_pages, - col_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk); - + data_page_mask.begin() + num_inserted_data_pages, + data_page_mask.begin() + num_inserted_data_pages + num_data_pages_this_col_chunk); // Update the number of inserted data pages num_inserted_data_pages += num_data_pages_this_col_chunk; } - // Make sure we inserted exactly the number of data pages for this column - CUDF_EXPECTS(num_inserted_data_pages == col_page_mask.size(), - "Encountered mismatch in number of data pages and page mask size"); }); // Make sure we inserted exactly the number of pages for this pass diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp b/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp index 820fcdbec62..15540c1851a 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp +++ b/cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp @@ -255,7 +255,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { * * @param data_page_mask Input data page mask from page-pruning step */ - void set_pass_page_mask(cudf::host_span const> data_page_mask); + void set_pass_page_mask(cudf::host_span data_page_mask); /** * @brief Select the columns to be read based on the read mode @@ -285,11 +285,12 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { * @param mode Value indicating if the data sources are read all at once or chunk by chunk * @param row_group_indices Row group indices to read * @param column_chunk_buffers Device buffers containing 
column chunk data + * @param data_page_mask Input data page mask from page-pruning step */ void prepare_data(read_mode mode, cudf::host_span const> row_group_indices, std::vector&& column_chunk_buffers, - cudf::host_span const> data_page_mask); + cudf::host_span data_page_mask); /** * @brief Create descriptors for filter column chunks and decode dictionary page headers @@ -330,8 +331,8 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { * @param data_page_mask Input data page mask from page-pruning step for the current pass */ void handle_chunking(read_mode mode, - std::vector column_chunk_buffers, - cudf::host_span const> data_page_mask); + std::vector&& column_chunk_buffers, + cudf::host_span data_page_mask); /** * @brief Setup step for the next input read pass. @@ -341,7 +342,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { * * @param column_chunk_buffers Device buffers containing column chunk data */ - void setup_next_pass(std::vector column_chunk_buffers); + void setup_next_pass(std::vector&& column_chunk_buffers); /** * @brief Setup pointers to columns chunks to be processed for this pass. @@ -357,7 +358,7 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl { * * @param column_chunk_buffers Device buffers containing column chunk data */ - void setup_compressed_data(std::vector column_chunk_buffers); + void setup_compressed_data(std::vector&& column_chunk_buffers); /** * @brief Reset the internal state of the reader. 
diff --git a/cpp/src/io/parquet/experimental/hybrid_scan_preprocess.cu b/cpp/src/io/parquet/experimental/hybrid_scan_preprocess.cu index ecc78870673..c5ac7b0bd88 100644 --- a/cpp/src/io/parquet/experimental/hybrid_scan_preprocess.cu +++ b/cpp/src/io/parquet/experimental/hybrid_scan_preprocess.cu @@ -172,7 +172,7 @@ bool hybrid_scan_reader_impl::setup_column_chunks() } void hybrid_scan_reader_impl::setup_compressed_data( - std::vector column_chunk_buffers) + std::vector&& column_chunk_buffers) { auto& pass = *_pass_itm_data; diff --git a/cpp/src/io/parquet/experimental/page_index_filter.cu b/cpp/src/io/parquet/experimental/page_index_filter.cu index c8890fae7af..51b15cf69a4 100644 --- a/cpp/src/io/parquet/experimental/page_index_filter.cu +++ b/cpp/src/io/parquet/experimental/page_index_filter.cu @@ -4,8 +4,8 @@ */ #include "hybrid_scan_helpers.hpp" -#include "io/parquet/reader_impl_helpers.hpp" #include "io/parquet/stats_filter_helpers.hpp" +#include "page_index_filter_utils.hpp" #include #include @@ -13,15 +13,15 @@ #include #include #include +#include #include +#include #include #include -#include #include #include #include #include -#include #include #include #include @@ -32,14 +32,11 @@ #include #include -#include #include #include -#include #include #include -#include namespace cudf::io::parquet::experimental::detail { @@ -50,189 +47,8 @@ using string_index_pair = parquet::detail::string_index_pair; namespace { /** - * @brief Make a device vector where each row contains the index of the page it belongs to - */ -[[nodiscard]] rmm::device_uvector make_page_indices_async( - cudf::host_span page_row_counts, - cudf::host_span page_row_offsets, - cudf::size_type total_rows, - rmm::cuda_stream_view stream) -{ - auto mr = cudf::get_current_device_resource_ref(); - - // Copy page-level row counts and offsets to device - auto row_counts = cudf::detail::make_device_uvector_async(page_row_counts, stream, mr); - auto row_offsets = 
cudf::detail::make_device_uvector_async(page_row_offsets, stream, mr); - - // Make a zeroed device vector to store page indices of each row - auto page_indices = - cudf::detail::make_zeroed_device_uvector_async(total_rows, stream, mr); - - // Scatter page indices across the their first row's index - thrust::scatter_if(rmm::exec_policy_nosync(stream), - thrust::counting_iterator(0), - thrust::counting_iterator(row_counts.size()), - row_offsets.begin(), - row_counts.begin(), - page_indices.begin()); - - // Inclusive scan with maximum to replace zeros with the (increasing) page index it belongs to. - // Page indices are scattered at their first row's index. - thrust::inclusive_scan(rmm::exec_policy_nosync(stream), - page_indices.begin(), - page_indices.end(), - page_indices.begin(), - cuda::maximum()); - return page_indices; -} - -/** - * @brief Compute page row counts and page row offsets and column chunk page (count) offsets for a - * given column schema index - */ -[[nodiscard]] auto make_page_row_counts_and_offsets( - cudf::host_span per_file_metadata, - cudf::host_span const> row_group_indices, - size_type schema_idx, - rmm::cuda_stream_view stream) -{ - // Compute total number of row groups - auto const total_row_groups = - std::accumulate(row_group_indices.begin(), - row_group_indices.end(), - size_t{0}, - [](auto sum, auto const& rg_indices) { return sum + rg_indices.size(); }); - - // Vector to store how many rows are present in each page - set initial capacity to two data pages - // per row group - auto page_row_counts = - cudf::detail::make_empty_host_vector(2 * total_row_groups, stream); - // Vector to store the cumulative number of rows in each page - - set initial capacity to two data - // pages per row group - auto page_row_offsets = - cudf::detail::make_empty_host_vector((2 * total_row_groups) + 1, stream); - // Vector to store the cumulative number of pages in each column chunk - auto col_chunk_page_offsets = - 
cudf::detail::make_empty_host_vector(total_row_groups + 1, stream); - - page_row_offsets.push_back(0); - col_chunk_page_offsets.push_back(0); - - // For all data sources - std::for_each( - thrust::counting_iterator(0), - thrust::counting_iterator(row_group_indices.size()), - [&](auto src_idx) { - auto const& rg_indices = row_group_indices[src_idx]; - // For all column chunks in this data source - std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) { - auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; - // Find the column chunk with the given schema index - auto colchunk_iter = std::find_if( - row_group.columns.begin(), row_group.columns.end(), [schema_idx](ColumnChunk const& col) { - return col.schema_idx == schema_idx; - }); - - CUDF_EXPECTS(colchunk_iter != row_group.columns.end(), - "Column chunk with schema index " + std::to_string(schema_idx) + - " not found in row group", - std::invalid_argument); - - // Compute page row counts and offsets if this column chunk has column and offset indexes - if (colchunk_iter->offset_index.has_value()) { - CUDF_EXPECTS(colchunk_iter->column_index.has_value(), - "Both offset and column indexes must be present"); - // Get the offset and column indexes of the column chunk - auto const& offset_index = colchunk_iter->offset_index.value(); - auto const& column_index = colchunk_iter->column_index.value(); - - // Number of pages in this column chunk - auto const row_group_num_pages = offset_index.page_locations.size(); - - CUDF_EXPECTS(column_index.min_values.size() == column_index.max_values.size(), - "page min and max values should be of same size"); - CUDF_EXPECTS(column_index.min_values.size() == row_group_num_pages, - "mismatch between size of min/max page values and the size of page " - "locations"); - // Update the cumulative number of pages in this column chunk - col_chunk_page_offsets.push_back(col_chunk_page_offsets.back() + row_group_num_pages); - - // For all pages in this column 
chunk, update page row counts and offsets. - std::for_each( - thrust::counting_iterator(0), - thrust::counting_iterator(row_group_num_pages), - [&](auto const page_idx) { - int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index; - // For the last page, this is simply the total number of rows in the column chunk - int64_t const last_row_idx = - (page_idx < row_group_num_pages - 1) - ? offset_index.page_locations[page_idx + 1].first_row_index - : row_group.num_rows; - - // Update the page row counts and offsets - page_row_counts.push_back(last_row_idx - first_row_idx); - page_row_offsets.push_back(page_row_offsets.back() + page_row_counts.back()); - }); - } - }); - }); - - return std::tuple{ - std::move(page_row_counts), std::move(page_row_offsets), std::move(col_chunk_page_offsets)}; -} - -/** - * @brief Compute if the page index is present in all parquet data sources for all columns - */ -[[nodiscard]] bool compute_has_page_index( - cudf::host_span file_metadatas, - cudf::host_span const> row_group_indices, - cudf::host_span column_schema_indices) -{ - // For all output columns, check all parquet data sources - return std::all_of( - column_schema_indices.begin(), column_schema_indices.end(), [&](auto const schema_idx) { - // For all parquet data sources - return std::all_of( - thrust::counting_iterator(0), - thrust::counting_iterator(row_group_indices.size()), - [&](auto const src_index) { - // For all row groups in this parquet data source - auto const& rg_indices = row_group_indices[src_index]; - return std::all_of(rg_indices.begin(), rg_indices.end(), [&](auto const& rg_index) { - auto const& row_group = file_metadatas[src_index].row_groups[rg_index]; - auto col = std::find_if( - row_group.columns.begin(), - row_group.columns.end(), - [schema_idx](ColumnChunk const& col) { return col.schema_idx == schema_idx; }); - // Check if the offset_index and column_index are present - return col != 
file_metadatas[src_index].row_groups[rg_index].columns.end() and - col->offset_index.has_value() and col->column_index.has_value(); - }); - }); - }); -} - -/** - * @brief Construct a vector of all required data pages from the page row counts - */ -[[nodiscard]] auto all_required_data_pages( - cudf::host_span const> page_row_counts) -{ - std::vector> all_required_data_pages; - all_required_data_pages.reserve(page_row_counts.size()); - std::transform( - page_row_counts.begin(), - page_row_counts.end(), - std::back_inserter(all_required_data_pages), - [&](auto const& col_page_counts) { return std::vector(col_page_counts.size(), true); }); - - return all_required_data_pages; -}; - -/** - * @brief Converts page-level statistics of a column to 3 device columns - min, max and is_null - * values. Each column has number of rows equal to the total rows in all row groups. + * @brief Converts page-level statistics of a column to 2 device columns - min, max values. Each + * column has number of rows equal to the total rows in all row groups. */ struct page_stats_caster : public stats_caster_base { cudf::size_type total_rows; @@ -488,7 +304,8 @@ struct page_stats_caster : public stats_caster_base { } else { // Compute column chunk level page count offsets, and page level row counts and row offsets. auto const [page_row_counts, page_row_offsets, col_chunk_page_offsets] = - make_page_row_counts_and_offsets(per_file_metadata, row_group_indices, schema_idx, stream); + compute_page_row_counts_and_offsets( + per_file_metadata, row_group_indices, schema_idx, stream); CUDF_EXPECTS( page_row_offsets.back() == total_rows, @@ -514,7 +331,7 @@ struct page_stats_caster : public stats_caster_base { std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) { auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; // Find colchunk_iter in row_group.columns. 
Guaranteed to be found as already verified - // in make_page_row_counts_and_offsets() + // in compute_page_row_counts_and_offsets() auto colchunk_iter = std::find_if( row_group.columns.begin(), row_group.columns.end(), @@ -667,6 +484,189 @@ struct page_stats_caster : public stats_caster_base { } }; +/** + * @brief Functor to build the NEXT Fenwick tree level from the current level data + * + * @param tree_level_ptrs Pointers to the start of Fenwick tree level data + * @param current_level Current tree level + * @param current_level_size Size of the current tree level + * @param next_level_size Size of the next tree level + */ +struct build_next_fenwick_tree_level_functor { + bool** tree_level_ptrs; + cudf::size_type current_level; + cudf::size_type current_level_size; + cudf::size_type next_level_size; + + /** + * @brief Builds the next Fenwick tree level from the current level data + * by ORing the two children of the current level. + * + * @param next_level_idx Next tree level element index + */ + __device__ void operator()(cudf::size_type next_level_idx) const noexcept + { + auto const current_level_ptr = tree_level_ptrs[current_level]; + auto next_level_ptr = tree_level_ptrs[current_level + 1]; + + // Handle the odd-sized remaining element if current_level_size is odd + if (current_level_size % 2 and next_level_idx == next_level_size - 1) { + next_level_ptr[next_level_idx] = current_level_ptr[current_level_size - 1]; + } else { + next_level_ptr[next_level_idx] = + current_level_ptr[(next_level_idx * 2)] or current_level_ptr[(next_level_idx * 2) + 1]; + } + } +}; + +/** + * @brief Functor to binary search a `true` value in the Fenwick tree in range [start, end) + * + * @param tree_level_ptrs Pointers to the start of Fenwick tree level data + * @param page_offsets Pointer to page offsets describing each search range i as [page_offsets[i], + * page_offsets[i+1)) + * @param num_ranges Number of search ranges + */ +struct search_fenwick_tree_functor { + bool** 
tree_level_ptrs; + cudf::size_type const* page_offsets; + cudf::size_type num_ranges; + + /** + * @brief Enum class to represent which range boundary to align + */ + enum class alignment : uint8_t { + START = 0, + END = 1, + }; + + /** + * @brief Checks if a value is a power of two + * + * @param value Value to check + * @return Boolean indicating if the value is a power of two + */ + __device__ bool inline constexpr is_power_of_two(cudf::size_type value) const noexcept + { + return (value & (value - 1)) == 0; + } + + /** + * @brief Finds the smallest power of two in the range [start, end); 0 otherwise + * + * @param start Range start + * @param end Range end + * @return Smallest power of two in the range [start, end); 0 otherwise + */ + __device__ cudf::size_type inline constexpr smallest_power_of_two_in_range( + cudf::size_type start, cudf::size_type end) const noexcept + { + start--; + start |= start >> 1; + start |= start >> 2; + start |= start >> 4; + start |= start >> 8; + start |= start >> 16; + auto const result = start + 1; + return result < end ? result : 0; + } + + /** + * @brief Finds the largest power of two in the range (start, end]; 0 otherwise + * + * @param start Range start + * @param end Range end + * @return Largest power of two in the range (start, end]; 0 otherwise + */ + __device__ size_type inline constexpr largest_power_of_two_in_range(size_type start, + size_type end) const noexcept + { + auto constexpr nbits = cudf::detail::size_in_bits() - 1; + auto const result = size_type{1} << (nbits - cuda::std::countl_zero(end)); + return result > 
start ? result : 0; + } + + /** + * @brief Aligns a range boundary to the next power-of-two block + * + * @tparam Alignment The boundary (start or end) to align + * @param start Range start + * @param end Range end + * @return A pair of the tree level and block size + */ + template + __device__ + cuda::std::pair inline constexpr align_range_boundary( + cudf::size_type start, cudf::size_type end) const noexcept + { + if constexpr (Alignment == alignment::START) { + if (start == 0 or is_power_of_two(start)) { + auto const block_size = largest_power_of_two_in_range(start, end); + auto const tree_level = cuda::std::countr_zero(block_size); + return cuda::std::pair{tree_level, block_size}; + } else { + auto const tree_level = cuda::std::countr_zero(start); + return cuda::std::pair{tree_level, size_type{1} << tree_level}; + } + } else { + auto block_size = end & -end; + if (start > 0 and is_power_of_two(end)) { + block_size = end - smallest_power_of_two_in_range(start, end); + } + return cuda::std::pair{cuda::std::countr_zero(block_size), block_size}; + } + } + + /** + * @brief Searches the Fenwick tree to find a `true` value in range [start, end) + * + * @param range_idx Index of the range to search + * @return Boolean indicating if a `true` value is found in the range + */ + __device__ bool operator()(cudf::size_type range_idx) const noexcept + { + // Retrieve start and end for the current range [start, end) + size_type start = page_offsets[range_idx]; + size_type end = page_offsets[range_idx + 1]; + + // Return early if the range is empty or invalid + if (start >= end or range_idx >= num_ranges) { return false; } + + // Binary search decomposition loop + while (start < end) { + // Find the largest power-of-two block that begins at `start` and aligns it up + auto const [start_tree_level, start_block_size] = + align_range_boundary(start, end); + + // Find the largest power-of-two block that aligns `end` down. 
+ auto const [end_tree_level, end_block_size] = + align_range_boundary(start, end); + + // Check the `start` side alignment block: [start, start + start_block_size) and if it's the + // larger block + if (start + start_block_size <= end and start_block_size >= end_block_size) { + auto const tree_level = start_tree_level; + auto const mask_index = start >> tree_level; + if (tree_level_ptrs[tree_level][mask_index]) { return true; } + start += start_block_size; + } + // Otherwise, check the `end` side alignment block: [end - end_block_size, end) + else if (end - end_block_size >= start) { + auto const tree_level = end_tree_level; + auto const mask_index = (end - end_block_size) >> tree_level; + if (tree_level_ptrs[tree_level][mask_index]) { return true; } + end -= end_block_size; + } + // Fallback for small, unaligned ranges. e.g., [11, 13) + else { + if (tree_level_ptrs[0][start]) { return true; } + start++; + } + } + return false; + } +}; + } // namespace std::unique_ptr aggregate_reader_metadata::build_row_mask_with_page_index_stats( @@ -683,8 +683,7 @@ std::unique_ptr aggregate_reader_metadata::build_row_mask_with_pag if (row_group_indices.empty()) { return cudf::make_empty_column(cudf::type_id::BOOL8); } // Check if we have page index for all columns in all row groups - auto const has_page_index = - compute_has_page_index(per_file_metadata, row_group_indices, output_column_schemas); + auto const has_page_index = compute_has_page_index(per_file_metadata, row_group_indices); // Return if page index is not present CUDF_EXPECTS(has_page_index, @@ -786,8 +785,9 @@ std::unique_ptr aggregate_reader_metadata::build_row_mask_with_pag page_stats_table, stats_expr.get_stats_expr().get(), stream, mr); } -std::vector> aggregate_reader_metadata::compute_data_page_mask( - cudf::column_view row_mask, +template +cudf::detail::host_vector aggregate_reader_metadata::compute_data_page_mask( + ColumnView const& row_mask, cudf::host_span const> row_group_indices, cudf::host_span 
input_columns, cudf::size_type row_mask_offset, @@ -798,7 +798,29 @@ std::vector> aggregate_reader_metadata::compute_data_page_mask CUDF_EXPECTS(row_mask.type().id() == cudf::type_id::BOOL8, "Input row bitmask should be of type BOOL8"); - auto const num_columns = input_columns.size(); + auto const total_rows = total_rows_in_row_groups(row_group_indices); + + // Return an empty vector if all rows are invalid or all rows are required + if (row_mask.null_count(row_mask_offset, row_mask_offset + total_rows, stream) == total_rows or + thrust::all_of(rmm::exec_policy(stream), + row_mask.template begin() + row_mask_offset, + row_mask.template begin() + row_mask_offset + total_rows, + cuda::std::identity{})) { + return cudf::detail::make_empty_host_vector(0, stream); + } + + CUDF_EXPECTS(row_mask_offset + total_rows <= row_mask.size(), + "Mismatch in total rows in input row mask and row groups", + std::invalid_argument); + + auto const has_page_index = compute_has_page_index(per_file_metadata, row_group_indices); + + // Return early if page index is not present + if (not has_page_index) { + CUDF_LOG_WARN("Encountered missing Parquet page index for one or more output columns"); + return cudf::detail::make_empty_host_vector( + 0, stream); // An empty data page mask indicates all pages are required + } // Collect column schema indices from the input columns. 
auto column_schema_indices = std::vector(input_columns.size()); @@ -806,170 +828,160 @@ std::vector> aggregate_reader_metadata::compute_data_page_mask input_columns.begin(), input_columns.end(), column_schema_indices.begin(), [](auto const& col) { return col.schema_idx; }); - auto const has_page_index = - compute_has_page_index(per_file_metadata, row_group_indices, column_schema_indices); - // Return early if page index is not present - if (not has_page_index) { - CUDF_LOG_WARN("Encountered missing Parquet page index for one or more output columns"); - return {}; // An empty data page mask indicates all pages are required + // Compute page row offsets and column chunk page offsets for each column + auto const num_columns = input_columns.size(); + std::vector page_row_offsets; + std::vector col_page_offsets; + col_page_offsets.reserve(num_columns + 1); + col_page_offsets.push_back(0); + + size_type max_page_size = 0; + + if (num_columns <= 2) { + std::for_each( + column_schema_indices.begin(), column_schema_indices.end(), [&](auto const schema_idx) { + auto [col_page_row_offsets, col_max_page_size] = + compute_page_row_offsets(per_file_metadata, row_group_indices, schema_idx); + page_row_offsets.insert( + page_row_offsets.end(), col_page_row_offsets.begin(), col_page_row_offsets.end()); + max_page_size = std::max(max_page_size, col_max_page_size); + col_page_offsets.emplace_back(page_row_offsets.size()); + }); + } else { + auto constexpr num_threads = 2; + std::vector, size_type>>>> + page_row_offset_tasks{}; + page_row_offset_tasks.reserve(num_threads); + auto const cols_per_thread = cudf::util::div_rounding_up_unsafe(num_columns, num_threads); + + // Submit page row offset compute tasks + std::transform( + thrust::counting_iterator(0), + thrust::counting_iterator(num_threads), + std::back_inserter(page_row_offset_tasks), + [&](auto const tid) { + return cudf::detail::host_worker_pool().submit_task([&, tid = tid]() { + auto const start_col = std::min(tid * 
cols_per_thread, num_columns); + auto const end_col = std::min(start_col + cols_per_thread, num_columns); + std::vector, size_type>> thread_page_row_offsets{}; + thread_page_row_offsets.reserve(end_col - start_col); + std::transform(thrust::counting_iterator(start_col), + thrust::counting_iterator(end_col), + std::back_inserter(thread_page_row_offsets), + [&](auto const col_idx) { + return compute_page_row_offsets( + per_file_metadata, row_group_indices, column_schema_indices[col_idx]); + }); + return thread_page_row_offsets; + }); + }); + + std::for_each(page_row_offset_tasks.begin(), page_row_offset_tasks.end(), [&](auto& task) { + auto const& thread_page_row_offsets = task.get(); + for (auto& [col_page_row_offsets, col_max_page_size] : thread_page_row_offsets) { + page_row_offsets.insert( + page_row_offsets.end(), col_page_row_offsets.begin(), col_page_row_offsets.end()); + max_page_size = std::max(max_page_size, col_max_page_size); + col_page_offsets.emplace_back(page_row_offsets.size()); + } + }); } - // Compute page row counts, offsets, and column chunk page offsets for each column - std::vector> page_row_counts; - std::vector> page_row_offsets; - std::vector> col_chunk_page_offsets; - page_row_counts.reserve(num_columns); - page_row_offsets.reserve(num_columns); - col_chunk_page_offsets.reserve(num_columns); - - if (num_columns == 1) { - auto const schema_idx = column_schema_indices.front(); - auto [counts, offsets, chunk_offsets] = - make_page_row_counts_and_offsets(per_file_metadata, row_group_indices, schema_idx, stream); - page_row_counts.emplace_back(std::move(counts)); - page_row_offsets.emplace_back(std::move(offsets)); + // Make sure all row_mask elements contain valid values even if they are nulls + if constexpr (cuda::std::is_same_v) { + if (row_mask.nullable() and row_mask.null_count() > 0) { + thrust::for_each(rmm::exec_policy_nosync(stream), + thrust::counting_iterator(row_mask_offset), + thrust::counting_iterator(row_mask_offset + total_rows), 
+ [row_mask = row_mask.template begin(), + null_mask = row_mask.null_mask()] __device__(auto const row_idx) { + if (not bit_is_set(null_mask, row_idx)) { row_mask[row_idx] = true; } + }); + } } else { - std::vector, - cudf::detail::host_vector, - cudf::detail::host_vector>>> - page_row_counts_and_offsets_tasks; - page_row_counts_and_offsets_tasks.reserve(num_columns); - - auto streams = cudf::detail::fork_streams(stream, num_columns); - - std::for_each(thrust::counting_iterator(0), - thrust::counting_iterator(num_columns), - [&](auto const col_idx) { - page_row_counts_and_offsets_tasks.emplace_back( - cudf::detail::host_worker_pool().submit_task([&, col_idx = col_idx] { - return make_page_row_counts_and_offsets(per_file_metadata, - row_group_indices, - column_schema_indices[col_idx], - streams[col_idx]); - })); - }); - - // Collect results from all tasks - std::for_each(page_row_counts_and_offsets_tasks.begin(), - page_row_counts_and_offsets_tasks.end(), - [&](auto& task) { - auto [counts, offsets, chunk_offsets] = std::move(task).get(); - page_row_counts.emplace_back(std::move(counts)); - page_row_offsets.emplace_back(std::move(offsets)); - col_chunk_page_offsets.emplace_back(std::move(chunk_offsets)); - }); + CUDF_EXPECTS(not row_mask.nullable() or row_mask.null_count() == 0, + "Row mask must not contain nulls for payload columns"); } - auto const total_rows = page_row_offsets.back().back(); - - CUDF_EXPECTS(row_mask_offset + total_rows <= row_mask.size(), - "Mismatch in total rows in input row mask and row groups", - std::invalid_argument); + auto const mr = cudf::get_current_device_resource_ref(); - // Return if all rows are required or all are invalid. 
- if (row_mask.null_count(row_mask_offset, row_mask_offset + total_rows) == total_rows or - thrust::all_of(rmm::exec_policy(stream), - row_mask.begin() + row_mask_offset, - row_mask.begin() + row_mask_offset + total_rows, - cuda::std::identity{})) { - return all_required_data_pages(page_row_counts); - } + // Compute fenwick tree level offsets and total size (level 1 and higher) + auto const tree_level_offsets = compute_fenwick_tree_level_offsets(total_rows, max_page_size); + auto const num_levels = static_cast(tree_level_offsets.size()); + // Buffer to store Fenwick tree levels (level 1 and higher) data + auto tree_levels_data = rmm::device_uvector(tree_level_offsets.back(), stream, mr); - // Vector to hold data page mask for each column - auto data_page_mask = std::vector>(num_columns); - - // Total number of surviving pages across all columns - std::atomic total_surviving_pages{0}; - - // Tasks to compute data page mask for each column - std::vector> data_page_mask_tasks; - data_page_mask_tasks.reserve(num_columns); - - // Host row mask validity and first bit offset - auto const [host_row_mask_validity, first_bit_offset] = [&] { - if (row_mask.nullable()) { - auto const first_word_idx = word_index(row_mask_offset); - auto const last_word_idx = word_index(row_mask_offset + total_rows); - auto const num_words = last_word_idx - first_word_idx + 1; - auto const max_words = num_bitmask_words(row_mask.size()) - first_word_idx - 1; - CUDF_EXPECTS(num_words <= max_words, - "Encountered unexpected number of bitmask words to copy from the row mask"); - return std::pair{ - cudf::detail::make_host_vector( - device_span(row_mask.null_mask() + first_word_idx, num_words), - stream), - intra_word_index(row_mask_offset)}; - } else { - // Empty vector if row mask is not nullable - return std::pair{cudf::detail::make_host_vector(0, stream), 0}; - } - }(); - - // Iterator for row mask validity - auto is_row_valid = cudf::detail::make_counting_transform_iterator( - first_bit_offset, 
- [is_nullable = row_mask.nullable(), nullmask = host_row_mask_validity.data()](auto bit_index) { - // Always valid if row mask is not nullable or check if the corresponding bit is set - return not is_nullable or bit_is_set(nullmask, bit_index); + // Pointers to each Fenwick tree level data + auto host_tree_level_ptrs = cudf::detail::make_host_vector(num_levels, stream); + // Zeroth level is just the row mask itself + host_tree_level_ptrs[0] = const_cast(row_mask.template begin()) + row_mask_offset; + std::for_each( + thrust::counting_iterator(1), thrust::counting_iterator(num_levels), [&](auto const level_idx) { + host_tree_level_ptrs[level_idx] = tree_levels_data.data() + tree_level_offsets[level_idx - 1]; }); - // Host row mask data - auto const is_row_required = cudf::detail::make_host_vector( - device_span(row_mask.data() + row_mask_offset, total_rows), stream); + auto fenwick_tree_level_ptrs = + cudf::detail::make_device_uvector_async(host_tree_level_ptrs, stream, mr); - // For all columns, look up which pages contain at least one required row. i.e. - // !validity_it[row_idx] or is_row_required[row_idx] satisfies, and add its byte range to the - // output list of byte ranges for the column. 
+ // Build Fenwick tree levels (zeroth level is just the row mask itself) + auto current_level_size = total_rows; std::for_each( - thrust::counting_iterator(0), - thrust::counting_iterator(num_columns), - [&](auto const col_idx) { - data_page_mask_tasks.emplace_back( - cudf::detail::host_worker_pool().submit_task([&, col_idx = col_idx] { - // Construct a row indices mapping based on page row counts and offsets - auto const total_pages_this_column = page_row_counts[col_idx].size(); - auto valid_pages_this_column = std::vector(total_pages_this_column, false); - // Number of final filtered pages for this column - size_t num_surviving_pages_this_column = 0; - // For all rows - for (auto row_idx = 0; row_idx < total_rows; ++row_idx) { - // If this row is required or invalid, add its page index to the output list. - if (not is_row_valid[row_idx] or is_row_required[row_idx]) { - // binary search to find the page index this row_idx belongs to and set the - // page index to true page_indices - auto const& offsets = page_row_offsets[col_idx]; - auto const page_itr = std::upper_bound(offsets.cbegin(), offsets.cend(), row_idx); - CUDF_EXPECTS(page_itr != offsets.cbegin() and page_itr != offsets.cend(), - "Invalid page index"); - auto const page_idx = std::distance(offsets.cbegin(), page_itr) - 1; - valid_pages_this_column[page_idx] = true; - num_surviving_pages_this_column++; - // Move row_idx to the last row of this page - row_idx = offsets[page_idx + 1] - 1; - } - } - - total_surviving_pages.fetch_add(num_surviving_pages_this_column); - data_page_mask[col_idx] = std::move(valid_pages_this_column); - })); + thrust::counting_iterator(0), thrust::counting_iterator(num_levels - 1), [&](auto const level) { + auto const next_level_size = cudf::util::div_rounding_up_unsafe(current_level_size, 2); + thrust::for_each( + rmm::exec_policy_nosync(stream), + thrust::counting_iterator(0), + thrust::counting_iterator(next_level_size), + build_next_fenwick_tree_level_functor{ + 
fenwick_tree_level_ptrs.data(), level, current_level_size, next_level_size}); + current_level_size = next_level_size; }); - std::for_each( - data_page_mask_tasks.begin(), data_page_mask_tasks.end(), [](auto& task) { task.get(); }); - - // Total number of input pages across all columns - auto const total_pages = std::accumulate( - page_row_counts.cbegin(), - page_row_counts.cend(), - size_t{0}, - [](auto sum, auto const& page_row_counts) { return sum + page_row_counts.size(); }); - - CUDF_EXPECTS( - total_surviving_pages <= total_pages, - "Number of surviving pages must be less than or equal to the total number of input pages"); - + // Search the Fenwick tree to see if there's a surviving row in each page's row range + auto const num_ranges = static_cast(page_row_offsets.size() - 1); + rmm::device_uvector device_data_page_mask(num_ranges, stream, mr); + auto page_offsets = cudf::detail::make_device_uvector_async(page_row_offsets, stream, mr); + thrust::transform( + rmm::exec_policy_nosync(stream), + thrust::counting_iterator(0), + thrust::counting_iterator(num_ranges), + device_data_page_mask.begin(), + search_fenwick_tree_functor{fenwick_tree_level_ptrs.data(), page_offsets.data(), num_ranges}); + + // Copy over search results to host + auto host_results = cudf::detail::make_host_vector_async(device_data_page_mask, stream); + auto const total_pages = page_row_offsets.size() - num_columns; + auto data_page_mask = cudf::detail::make_empty_host_vector(total_pages, stream); + auto host_results_iter = host_results.begin(); + stream.synchronize(); + // Discard results for invalid ranges. i.e. 
ranges starting at the last page of a column and + // ending at the first page of the next column + std::for_each(thrust::counting_iterator(0), + thrust::counting_iterator(num_columns), + [&](auto col_idx) { + auto const col_num_pages = + col_page_offsets[col_idx + 1] - col_page_offsets[col_idx] - 1; + data_page_mask.insert( + data_page_mask.end(), host_results_iter, host_results_iter + col_num_pages); + host_results_iter += col_num_pages + 1; + }); return data_page_mask; } +// Instantiate the templates with ColumnView as cudf::column_view and cudf::mutable_column_view +template cudf::detail::host_vector aggregate_reader_metadata::compute_data_page_mask< + cudf::column_view>(cudf::column_view const& row_mask, + cudf::host_span const> row_group_indices, + cudf::host_span input_columns, + cudf::size_type row_mask_offset, + rmm::cuda_stream_view stream) const; + +template cudf::detail::host_vector aggregate_reader_metadata::compute_data_page_mask< + cudf::mutable_column_view>(cudf::mutable_column_view const& row_mask, + cudf::host_span const> row_group_indices, + cudf::host_span input_columns, + cudf::size_type row_mask_offset, + rmm::cuda_stream_view stream) const; + } // namespace cudf::io::parquet::experimental::detail diff --git a/cpp/src/io/parquet/experimental/page_index_filter_utils.cu b/cpp/src/io/parquet/experimental/page_index_filter_utils.cu new file mode 100644 index 00000000000..ff4c4d2ae58 --- /dev/null +++ b/cpp/src/io/parquet/experimental/page_index_filter_utils.cu @@ -0,0 +1,253 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include "page_index_filter_utils.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include + +namespace cudf::io::parquet::experimental::detail { + +namespace { + +/** + * @brief Find the offset of the column chunk with the given schema index in the row group + * + * @param row_group Row group + * @param schema_idx Schema index + * @return Offset of the column chunk iterator + */ +[[nodiscard]] auto find_colchunk_iter_offset(RowGroup const& row_group, size_type schema_idx) +{ + auto const& colchunk_iter = + std::find_if(row_group.columns.begin(), row_group.columns.end(), [schema_idx](auto const& col) { + return col.schema_idx == schema_idx; + }); + CUDF_EXPECTS( + colchunk_iter != row_group.columns.end(), + "Column chunk with schema index " + std::to_string(schema_idx) + " not found in row group", + std::invalid_argument); + return std::distance(row_group.columns.begin(), colchunk_iter); +} + +} // namespace + +bool compute_has_page_index(cudf::host_span file_metadatas, + cudf::host_span const> row_group_indices) +{ + // For all parquet data sources + return std::all_of( + thrust::counting_iterator(0), + thrust::counting_iterator(row_group_indices.size()), + [&](auto const src_index) { + // For all row groups in this parquet data source + auto const& rg_indices = row_group_indices[src_index]; + return std::all_of(rg_indices.begin(), rg_indices.end(), [&](auto const& rg_index) { + auto const& row_group = file_metadatas[src_index].row_groups[rg_index]; + return std::any_of( + row_group.columns.begin(), row_group.columns.end(), [&](auto const& col) { + return col.offset_index.has_value() and col.column_index.has_value(); + }); + }); + }); +} + +std::tuple, + cudf::detail::host_vector, + cudf::detail::host_vector> +compute_page_row_counts_and_offsets(cudf::host_span per_file_metadata, + cudf::host_span const> row_group_indices, + size_type schema_idx, + 
rmm::cuda_stream_view stream) +{ + // Compute total number of row groups + auto const total_row_groups = + std::accumulate(row_group_indices.begin(), + row_group_indices.end(), + size_t{0}, + [](auto sum, auto const& rg_indices) { return sum + rg_indices.size(); }); + + // Vector to store how many rows are present in each page - set initial capacity to two data pages + // per row group + auto page_row_counts = + cudf::detail::make_empty_host_vector(2 * total_row_groups, stream); + // Vector to store the cumulative number of rows in each page - - set initial capacity to two data + // pages per row group + auto page_row_offsets = + cudf::detail::make_empty_host_vector((2 * total_row_groups) + 1, stream); + // Vector to store the cumulative number of pages in each column chunk + auto col_chunk_page_offsets = + cudf::detail::make_empty_host_vector(total_row_groups + 1, stream); + + page_row_offsets.push_back(0); + col_chunk_page_offsets.push_back(0); + + // For all data sources + std::for_each( + thrust::counting_iterator(0), + thrust::counting_iterator(row_group_indices.size()), + [&](auto src_idx) { + // For all column chunks in this data source + auto const& rg_indices = row_group_indices[src_idx]; + std::optional colchunk_iter_offset{}; + std::for_each(rg_indices.cbegin(), rg_indices.cend(), [&](auto rg_idx) { + auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; + if (not colchunk_iter_offset.has_value() or + row_group.columns[colchunk_iter_offset.value()].schema_idx != schema_idx) { + colchunk_iter_offset = find_colchunk_iter_offset(row_group, schema_idx); + } + auto const& colchunk_iter = row_group.columns.begin() + colchunk_iter_offset.value(); + + // Compute page row counts and offsets if this column chunk has column and offset indexes + if (colchunk_iter->offset_index.has_value()) { + // Get the offset index of the column chunk + auto const& offset_index = colchunk_iter->offset_index.value(); + auto const row_group_num_pages = 
offset_index.page_locations.size(); + + col_chunk_page_offsets.push_back(col_chunk_page_offsets.back() + row_group_num_pages); + + // For all pages in this column chunk, update page row counts and offsets. + std::for_each( + thrust::counting_iterator(0), + thrust::counting_iterator(row_group_num_pages), + [&](auto const page_idx) { + int64_t const first_row_idx = offset_index.page_locations[page_idx].first_row_index; + // For the last page, this is simply the total number of rows in the column chunk + int64_t const last_row_idx = + (page_idx < row_group_num_pages - 1) + ? offset_index.page_locations[page_idx + 1].first_row_index + : row_group.num_rows; + + // Update the page row counts and offsets + page_row_counts.push_back(last_row_idx - first_row_idx); + page_row_offsets.push_back(page_row_offsets.back() + page_row_counts.back()); + }); + } + }); + }); + + return { + std::move(page_row_counts), std::move(page_row_offsets), std::move(col_chunk_page_offsets)}; +} + +std::pair, size_type> compute_page_row_offsets( + cudf::host_span per_file_metadata, + cudf::host_span const> row_group_indices, + cudf::size_type schema_idx) +{ + // Compute total number of row groups + auto const total_row_groups = + std::accumulate(row_group_indices.begin(), + row_group_indices.end(), + size_t{0}, + [](auto sum, auto const& rg_indices) { return sum + rg_indices.size(); }); + + std::vector page_row_offsets; + page_row_offsets.push_back(0); + size_type max_page_size = 0; + + std::for_each(thrust::counting_iterator(0), + thrust::counting_iterator(row_group_indices.size()), + [&](auto const src_idx) { + // For all row groups in this source + auto const& rg_indices = row_group_indices[src_idx]; + std::optional colchunk_iter_offset{}; + std::for_each(rg_indices.begin(), rg_indices.end(), [&](auto const& rg_idx) { + auto const& row_group = per_file_metadata[src_idx].row_groups[rg_idx]; + // Find the column chunk with the given schema index + if (not colchunk_iter_offset.has_value() or + 
row_group.columns[colchunk_iter_offset.value()].schema_idx != schema_idx) { + colchunk_iter_offset = find_colchunk_iter_offset(row_group, schema_idx); + } + auto const& colchunk_iter = + row_group.columns.begin() + colchunk_iter_offset.value(); + auto const& offset_index = colchunk_iter->offset_index.value(); + auto const row_group_num_pages = offset_index.page_locations.size(); + std::for_each(thrust::counting_iterator(0), + thrust::counting_iterator(row_group_num_pages), + [&](auto const page_idx) { + int64_t const first_row_idx = + offset_index.page_locations[page_idx].first_row_index; + int64_t const last_row_idx = + (page_idx < row_group_num_pages - 1) + ? offset_index.page_locations[page_idx + 1].first_row_index + : row_group.num_rows; + auto const page_size = last_row_idx - first_row_idx; + max_page_size = std::max(max_page_size, page_size); + page_row_offsets.push_back(page_row_offsets.back() + page_size); + }); + }); + }); + + return {std::move(page_row_offsets), max_page_size}; +} + +rmm::device_uvector make_page_indices_async( + cudf::host_span page_row_counts, + cudf::host_span page_row_offsets, + cudf::size_type total_rows, + rmm::cuda_stream_view stream) +{ + auto mr = cudf::get_current_device_resource_ref(); + + // Copy page-level row counts and offsets to device + auto row_counts = cudf::detail::make_device_uvector_async(page_row_counts, stream, mr); + auto row_offsets = cudf::detail::make_device_uvector_async(page_row_offsets, stream, mr); + + // Make a zeroed device vector to store page indices of each row + auto page_indices = + cudf::detail::make_zeroed_device_uvector_async(total_rows, stream, mr); + + // Scatter page indices across the their first row's index + thrust::scatter_if(rmm::exec_policy_nosync(stream), + thrust::counting_iterator(0), + thrust::counting_iterator(row_counts.size()), + row_offsets.begin(), + row_counts.begin(), + page_indices.begin()); + + // Inclusive scan with maximum to replace zeros with the (increasing) page index 
it belongs to. + // Page indices are scattered at their first row's index. + thrust::inclusive_scan(rmm::exec_policy_nosync(stream), + page_indices.begin(), + page_indices.end(), + page_indices.begin(), + cuda::maximum()); + return page_indices; +} + +std::vector compute_fenwick_tree_level_offsets(cudf::size_type level0_size, + cudf::size_type max_page_size) +{ + std::vector tree_level_offsets; + tree_level_offsets.push_back(0); + + cudf::size_type current_level_size = cudf::util::div_rounding_up_unsafe(level0_size, 2); + cudf::size_type current_level = 1; + + while (current_level_size > 0) { + auto const block_size = 1 << current_level; + if (std::cmp_greater(block_size, max_page_size)) { break; } + tree_level_offsets.push_back(tree_level_offsets.back() + current_level_size); + current_level_size = + current_level_size == 1 ? 0 : cudf::util::div_rounding_up_unsafe(current_level_size, 2); + current_level++; + } + return tree_level_offsets; +} + +} // namespace cudf::io::parquet::experimental::detail diff --git a/cpp/src/io/parquet/experimental/page_index_filter_utils.hpp b/cpp/src/io/parquet/experimental/page_index_filter_utils.hpp new file mode 100644 index 00000000000..873cdfd8ec7 --- /dev/null +++ b/cpp/src/io/parquet/experimental/page_index_filter_utils.hpp @@ -0,0 +1,92 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "io/parquet/reader_impl_helpers.hpp" + +#include +#include +#include +#include + +#include +#include + +namespace cudf::io::parquet::experimental::detail { + +using metadata_base = parquet::detail::metadata; + +/** + * @brief Compute if the page index is present in all parquet data sources for all columns + * + * @param file_metadatas Span of parquet footer metadata + * @param row_group_indices Span of input row group indices + * @return Boolean indicating if the page index is present in all parquet data sources for all + * columns + */ +[[nodiscard]] bool compute_has_page_index( + cudf::host_span file_metadatas, + cudf::host_span const> row_group_indices); + +/** + * @brief Compute page row counts and page row offsets and column chunk page (count) offsets for a + * given column schema index + * + * @param per_file_metadata Span of parquet footer metadata + * @param row_group_indices Span of input row group indices + * @param schema_idx Column's schema index + * @param stream CUDA stream + * @return Tuple of page row counts, page row offsets, and column chunk page (count) offsets + */ +[[nodiscard]] std::tuple, + cudf::detail::host_vector, + cudf::detail::host_vector> +compute_page_row_counts_and_offsets(cudf::host_span per_file_metadata, + cudf::host_span const> row_group_indices, + size_type schema_idx, + rmm::cuda_stream_view stream); + +/** + * @brief Compute page row offsets for a given column schema index + * + * @param per_file_metadata Span of parquet footer metadata + * @param row_group_indices Span of input row group indices + * @param schema_idx Column's schema index + * @return Pair of page row offsets and the size of the largest page in this + * column + */ +[[nodiscard]] std::pair, size_type> compute_page_row_offsets( + cudf::host_span per_file_metadata, + cudf::host_span const> row_group_indices, + size_type schema_idx); + +/** + * @brief Make a device vector where each row 
contains the index of the page it belongs to + * + * @param page_row_counts Span of page row counts + * @param page_row_offsets Span of page row offsets + * @param total_rows Total number of rows + * @param stream CUDA stream + * @return Device vector where each row contains the index of the page it belongs to + */ +[[nodiscard]] rmm::device_uvector make_page_indices_async( + cudf::host_span page_row_counts, + cudf::host_span page_row_offsets, + cudf::size_type total_rows, + rmm::cuda_stream_view stream); + +/** + * @brief Computes the offsets of the Fenwick tree levels (level 1 and higher) until the tree level + * block size becomes larger than the maximum page (search range) size + * + * @param level0_size Size of the zeroth tree level (the row mask) + * @param max_page_size Maximum page (search range) size + * @return Fenwick tree level offsets + */ +[[nodiscard]] std::vector compute_fenwick_tree_level_offsets( + cudf::size_type level0_size, cudf::size_type max_page_size); + +} // namespace cudf::io::parquet::experimental::detail