From 4c8089f807ee6876e1dd6533c3fd9be87fbf1bd7 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Wed, 7 Jan 2026 10:53:04 +0800 Subject: [PATCH 1/5] refactor: extract interfaces from FileBatchReader to PrefetchFileBatchReader --- include/paimon/reader/file_batch_reader.h | 43 ------ .../reader/prefetch_file_batch_reader.h | 73 ++++++++++ src/paimon/CMakeLists.txt | 4 +- .../apply_bitmap_index_batch_reader_test.cpp | 4 +- .../reader/delegating_prefetch_reader.h | 26 +--- ...pp => prefetch_file_batch_reader_impl.cpp} | 82 ++++++----- ...er.h => prefetch_file_batch_reader_impl.h} | 26 ++-- ... prefetch_file_batch_reader_impl_test.cpp} | 131 +++++++++--------- ...pply_deletion_vector_batch_reader_test.cpp | 4 +- .../io/complete_row_tracking_fields_reader.h | 17 --- .../core/operation/abstract_split_read.cpp | 6 +- .../format/avro/avro_file_batch_reader.h | 21 --- .../format/blob/blob_file_batch_reader.cpp | 5 - .../format/blob/blob_file_batch_reader.h | 18 --- .../blob/blob_file_batch_reader_test.cpp | 8 -- .../format/lance/lance_file_batch_reader.h | 18 --- .../parquet/parquet_file_batch_reader.h | 4 +- .../testing/mock/mock_file_batch_reader.h | 6 +- 18 files changed, 219 insertions(+), 277 deletions(-) create mode 100644 include/paimon/reader/prefetch_file_batch_reader.h rename src/paimon/common/reader/{prefetch_file_batch_reader.cpp => prefetch_file_batch_reader_impl.cpp} (87%) rename src/paimon/common/reader/{prefetch_file_batch_reader.h => prefetch_file_batch_reader_impl.h} (86%) rename src/paimon/common/reader/{prefetch_file_batch_reader_test.cpp => prefetch_file_batch_reader_impl_test.cpp} (87%) diff --git a/include/paimon/reader/file_batch_reader.h b/include/paimon/reader/file_batch_reader.h index 88e0335d..21c780a1 100644 --- a/include/paimon/reader/file_batch_reader.h +++ b/include/paimon/reader/file_batch_reader.h @@ -46,55 +46,12 @@ class PAIMON_EXPORT FileBatchReader : public BatchReader { using BatchReader::NextBatch; using BatchReader::NextBatchWithBitmap; - /// Seeks to a specific row in the file. - /// @param row_number The row number to seek to. - /// @return The status of the operation. - virtual Status SeekToRow(uint64_t row_number) = 0; - /// Get the row number of the first row in the previously read batch. virtual uint64_t GetPreviousBatchFirstRowNumber() const = 0; /// Get the number of rows in the file. virtual uint64_t GetNumberOfRows() const = 0; - /// Retrieves the row number of the next row to be read. - /// This method indicates the current read position within the file. - /// @return The row number of the next row to read. - virtual uint64_t GetNextRowToRead() const = 0; - - /// Generates a list of row ranges to be read in batches. - /// Each range specifies the start and end row numbers for a batch, - /// allowing for efficient batch processing. - /// - /// The underlying format layer (e.g., parquet) is responsible for determining - /// the most effective way to split the data. This could be by row groups, stripes, - /// or other internal data structures. The key principle is to split the data - /// into contiguous, seekable ranges to minimize read amplification. - /// - /// For example: - /// - A parquet format could split by RowGroup directly, ensuring each range aligns - /// with a single RowGroup. - /// - /// The smallest splittable unit must be seekable to its start position, and the - /// splitting strategy should aim to avoid read amplification. - /// - /// @param need_prefetch A pointer to a boolean. The format layer sets this to indicate whether - /// prefetching is beneficial for the current scenario, to avoid performance regression in - /// certain cases. - /// @return A vector of pairs, where each pair represents a range with a start and end row - /// number. - virtual Result>> GenReadRanges( - bool* need_prefetch) const = 0; - - /// Sets the specific row ranges as a hint to be read from format file. - /// - /// If the specific file format does not support explicit range-based reads, implementations may - /// gracefully ignore this hint and provide an empty (no-op) implementation. - /// - /// @param read_ranges A vector of pairs, where each pair defines a half-open interval - /// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive. - virtual Status SetReadRanges(const std::vector>& read_ranges) = 0; - /// Get whether or not support read precisely while bitmap pushed down. virtual bool SupportPreciseBitmapSelection() const = 0; }; diff --git a/include/paimon/reader/prefetch_file_batch_reader.h b/include/paimon/reader/prefetch_file_batch_reader.h new file mode 100644 index 00000000..facd80c2 --- /dev/null +++ b/include/paimon/reader/prefetch_file_batch_reader.h @@ -0,0 +1,73 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/reader/file_batch_reader.h" + +namespace paimon { + +class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader { + public: + /// Seeks to a specific row in the file. + /// @param row_number The row number to seek to. + /// @return The status of the operation. + virtual Status SeekToRow(uint64_t row_number) = 0; + + /// Retrieves the row number of the next row to be read. + /// This method indicates the current read position within the file. + /// @return The row number of the next row to read. + virtual uint64_t GetNextRowToRead() const = 0; + + /// Generates a list of row ranges to be read in batches. + /// Each range specifies the start and end row numbers for a batch, + /// allowing for efficient batch processing. + /// + /// The underlying format layer (e.g., parquet) is responsible for determining + /// the most effective way to split the data. This could be by row groups, stripes, + /// or other internal data structures. The key principle is to split the data + /// into contiguous, seekable ranges to minimize read amplification. + /// + /// For example: + /// - A parquet format could split by RowGroup directly, ensuring each range aligns + /// with a single RowGroup. + /// + /// The smallest splittable unit must be seekable to its start position, and the + /// splitting strategy should aim to avoid read amplification. + /// + /// @param need_prefetch A pointer to a boolean. The format layer sets this to indicate whether + /// prefetching is beneficial for the current scenario, to avoid performance regression in + /// certain cases. + /// @return A vector of pairs, where each pair represents a range with a start and end row + /// number. + virtual Result>> GenReadRanges( + bool* need_prefetch) const = 0; + + /// Sets the specific row ranges as a hint to be read from format file. + /// + /// If the specific file format does not support explicit range-based reads, implementations may + /// gracefully ignore this hint and provide an empty (no-op) implementation. + /// + /// @param read_ranges A vector of pairs, where each pair defines a half-open interval + /// `[start_row, end_row)`. The `start_row` is inclusive, and the `end_row` is exclusive. + virtual Status SetReadRanges(const std::vector>& read_ranges) = 0; +}; + +} // namespace paimon diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 82d516c0..4a11f181 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -86,7 +86,7 @@ set(PAIMON_COMMON_SRCS common/reader/batch_reader.cpp common/reader/concat_batch_reader.cpp common/reader/predicate_batch_reader.cpp - common/reader/prefetch_file_batch_reader.cpp + common/reader/prefetch_file_batch_reader_impl.cpp common/reader/reader_utils.cpp common/reader/complete_row_kind_batch_reader.cpp common/reader/data_evolution_file_reader.cpp @@ -351,7 +351,7 @@ if(PAIMON_BUILD_TESTS) common/predicate/predicate_validator_test.cpp common/reader/concat_batch_reader_test.cpp common/reader/predicate_batch_reader_test.cpp - common/reader/prefetch_file_batch_reader_test.cpp + common/reader/prefetch_file_batch_reader_impl_test.cpp common/reader/reader_utils_test.cpp common/reader/complete_row_kind_batch_reader_test.cpp common/reader/data_evolution_file_reader_test.cpp diff --git a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp index 6b621671..0ca7ed56 100644 --- a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp +++ b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp @@ -26,7 +26,7 @@ #include "fmt/format.h" #include "fmt/ranges.h" #include "gtest/gtest.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/executor.h" #include "paimon/memory/memory_pool.h" @@ -88,7 +88,7 @@ class ApplyBitmapIndexBatchReaderTest : public ::testing::Test, if (enable_prefetch) { MockFormatReaderBuilder reader_builder(data, target_type_, batch_size); ASSERT_OK_AND_ASSIGN(file_batch_reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count, batch_size, prefetch_batch_count * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h b/src/paimon/common/reader/delegating_prefetch_reader.h index 78b61d4b..99dd5c62 100644 --- a/src/paimon/common/reader/delegating_prefetch_reader.h +++ b/src/paimon/common/reader/delegating_prefetch_reader.h @@ -22,14 +22,14 @@ #include "arrow/c/bridge.h" #include "arrow/type.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/reader/file_batch_reader.h" namespace paimon { class DelegatingPrefetchReader : public FileBatchReader { public: - explicit DelegatingPrefetchReader(std::unique_ptr prefetch_reader) + explicit DelegatingPrefetchReader(std::unique_ptr prefetch_reader) : prefetch_reader_(std::move(prefetch_reader)) {} Result NextBatch() override { @@ -48,38 +48,24 @@ class DelegatingPrefetchReader : public FileBatchReader { Result> GetFileSchema() const override { return GetReader()->GetFileSchema(); } + Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override { return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap); } - Status SeekToRow(uint64_t row_number) override { - assert(false); - return Status::NotImplemented("not support seek to row for delegate reader"); - } uint64_t GetPreviousBatchFirstRowNumber() const override { return GetReader()->GetPreviousBatchFirstRowNumber(); } + uint64_t GetNumberOfRows() const override { return GetReader()->GetNumberOfRows(); } - uint64_t GetNextRowToRead() const override { - return GetReader()->GetNextRowToRead(); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("gen read ranges not implemented"); - } void Close() override { return prefetch_reader_->Close(); } - Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("not support set read ranges for delegate reader"); - } + bool SupportPreciseBitmapSelection() const override { return GetReader()->SupportPreciseBitmapSelection(); } @@ -94,7 +80,7 @@ class DelegatingPrefetchReader : public FileBatchReader { } } - std::unique_ptr prefetch_reader_; + std::unique_ptr prefetch_reader_; }; } // namespace paimon diff --git a/src/paimon/common/reader/prefetch_file_batch_reader.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp similarity index 87% rename from src/paimon/common/reader/prefetch_file_batch_reader.cpp rename to src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp index 0dcb3228..2d2d337e 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include #include @@ -38,7 +38,7 @@ class Schema; namespace paimon { -Result> PrefetchFileBatchReader::Create( +Result> PrefetchFileBatchReaderImpl::Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy, @@ -63,7 +63,6 @@ Result> PrefetchFileBatchReader::Create } std::vector>>> futures; - std::vector> readers; for (uint32_t i = 0; i < prefetch_max_parallel_num; i++) { futures.push_back(Via( executor.get(), @@ -72,20 +71,27 @@ Result> PrefetchFileBatchReader::Create return reader_builder->Build(std::move(input_stream)); })); } + std::vector> readers; for (auto& file_batch_reader : CollectAll(futures)) { if (!file_batch_reader.ok()) { return file_batch_reader.status(); } - readers.emplace_back(std::move(file_batch_reader).value()); + std::shared_ptr reader = std::move(file_batch_reader).value(); + auto prefetch_file_batch_reader = + std::dynamic_pointer_cast(reader); + if (prefetch_file_batch_reader == nullptr) { + return Status::Invalid( + "failed to cast to prefetch file batch reader. file format not support prefetch"); + } + readers.emplace_back(prefetch_file_batch_reader); } if (prefetch_batch_count < readers.size()) { prefetch_batch_count = readers.size(); } uint32_t prefetch_queue_capacity = prefetch_batch_count / readers.size(); - auto reader = std::unique_ptr( - new PrefetchFileBatchReader(std::move(readers), batch_size, prefetch_queue_capacity, - enable_adaptive_prefetch_strategy, executor)); + auto reader = std::unique_ptr(new PrefetchFileBatchReaderImpl( + readers, batch_size, prefetch_queue_capacity, enable_adaptive_prefetch_strategy, executor)); if (initialize_read_ranges) { // normally initialize read ranges should be false, as set read schema will refresh read // ranges, and set read schema will always be called before read. @@ -94,8 +100,8 @@ Result> PrefetchFileBatchReader::Create return reader; } -PrefetchFileBatchReader::PrefetchFileBatchReader( - std::vector>&& readers, int32_t batch_size, +PrefetchFileBatchReaderImpl::PrefetchFileBatchReaderImpl( + const std::vector>& readers, int32_t batch_size, uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy, const std::shared_ptr& executor) : readers_(std::move(readers)), @@ -111,11 +117,11 @@ PrefetchFileBatchReader::PrefetchFileBatchReader( parallel_num_ = readers_.size(); } -PrefetchFileBatchReader::~PrefetchFileBatchReader() { +PrefetchFileBatchReaderImpl::~PrefetchFileBatchReaderImpl() { (void)CleanUp(); } -Status PrefetchFileBatchReader::SetReadSchema( +Status PrefetchFileBatchReaderImpl::SetReadSchema( ::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) { PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, @@ -129,7 +135,7 @@ Status PrefetchFileBatchReader::SetReadSchema( return RefreshReadRanges(); } -Status PrefetchFileBatchReader::RefreshReadRanges() { +Status PrefetchFileBatchReaderImpl::RefreshReadRanges() { PAIMON_RETURN_NOT_OK(CleanUp()); bool need_prefetch; PAIMON_ASSIGN_OR_RAISE(auto read_ranges, readers_[0]->GenReadRanges(&need_prefetch)); @@ -150,7 +156,7 @@ Status PrefetchFileBatchReader::RefreshReadRanges() { return Status::OK(); } -std::vector> PrefetchFileBatchReader::FilterReadRanges( +std::vector> PrefetchFileBatchReaderImpl::FilterReadRanges( const std::vector>& read_ranges, const std::optional& selection_bitmap) { if (!selection_bitmap) { @@ -165,7 +171,7 @@ std::vector> PrefetchFileBatchReader::FilterReadRa return result; } -Status PrefetchFileBatchReader::SetReadRanges( +Status PrefetchFileBatchReaderImpl::SetReadRanges( const std::vector>& read_ranges) { // push down read ranges for reducing IO amplification read_ranges_in_group_ = DispatchReadRanges(read_ranges, readers_.size()); @@ -194,7 +200,8 @@ Status PrefetchFileBatchReader::SetReadRanges( return Status::OK(); } -std::vector>> PrefetchFileBatchReader::DispatchReadRanges( +std::vector>> +PrefetchFileBatchReaderImpl::DispatchReadRanges( const std::vector>& read_ranges, size_t group_count) { std::vector>> read_ranges_in_group; read_ranges_in_group.resize(group_count); @@ -204,7 +211,7 @@ std::vector>> PrefetchFileBatchReader: return read_ranges_in_group; } -Status PrefetchFileBatchReader::CleanUp() { +Status PrefetchFileBatchReaderImpl::CleanUp() { auto clean_prefetch_queue = [this]() { for (auto& prefetch_queue : prefetch_queues_) { while (true) { @@ -248,7 +255,7 @@ Status PrefetchFileBatchReader::CleanUp() { return Status::OK(); } -void PrefetchFileBatchReader::Workloop() { +void PrefetchFileBatchReaderImpl::Workloop() { std::vector> futures; futures.resize(readers_.size()); while (true) { @@ -312,14 +319,14 @@ void PrefetchFileBatchReader::Workloop() { Wait(futures); } -void PrefetchFileBatchReader::ReadBatch(size_t reader_idx) { +void PrefetchFileBatchReaderImpl::ReadBatch(size_t reader_idx) { Status status = DoReadBatch(reader_idx); if (!status.ok()) { SetReadStatus(status); } } -std::optional> PrefetchFileBatchReader::GetCurrentReadRange( +std::optional> PrefetchFileBatchReaderImpl::GetCurrentReadRange( size_t reader_idx) const { const auto& read_ranges = read_ranges_in_group_[reader_idx]; const auto& current_pos = readers_pos_[reader_idx]; @@ -333,7 +340,7 @@ std::optional> PrefetchFileBatchReader::GetCurrent return std::nullopt; } -Status PrefetchFileBatchReader::EnsureReaderPosition( +Status PrefetchFileBatchReaderImpl::EnsureReaderPosition( size_t reader_idx, const std::pair& current_read_range) const { uint64_t pos = std::max(readers_pos_[reader_idx]->load(), current_read_range.first); if (readers_[reader_idx]->GetNextRowToRead() != pos) { @@ -342,9 +349,9 @@ Status PrefetchFileBatchReader::EnsureReaderPosition( return Status::OK(); } -Status PrefetchFileBatchReader::HandleReadResult(size_t reader_idx, - const std::pair& read_range, - ReadBatchWithBitmap&& read_batch_with_bitmap) { +Status PrefetchFileBatchReaderImpl::HandleReadResult( + size_t reader_idx, const std::pair& read_range, + ReadBatchWithBitmap&& read_batch_with_bitmap) { uint64_t first_row_number = readers_[reader_idx]->GetPreviousBatchFirstRowNumber(); auto& prefetch_queue = prefetch_queues_[reader_idx]; if (!BatchReader::IsEofBatch(read_batch_with_bitmap)) { @@ -383,7 +390,7 @@ Status PrefetchFileBatchReader::HandleReadResult(size_t reader_idx, return Status::OK(); } -Status PrefetchFileBatchReader::DoReadBatch(size_t reader_idx) { +Status PrefetchFileBatchReaderImpl::DoReadBatch(size_t reader_idx) { PAIMON_RETURN_NOT_OK(GetReadStatus()); if (is_shutdown_) { return Status::OK(); @@ -414,13 +421,13 @@ Status PrefetchFileBatchReader::DoReadBatch(size_t reader_idx) { return HandleReadResult(reader_idx, read_range, std::move(read_batch_with_bitmap)); } -Result PrefetchFileBatchReader::NextBatchWithBitmap() { +Result PrefetchFileBatchReaderImpl::NextBatchWithBitmap() { if (!read_ranges_freshed_) { return Status::Invalid("prefetch reader read ranges are not initialized"); } if (!background_thread_) { background_thread_ = - std::make_unique(&PrefetchFileBatchReader::Workloop, this); + std::make_unique(&PrefetchFileBatchReaderImpl::Workloop, this); } while (true) { PAIMON_RETURN_NOT_OK(GetReadStatus()); @@ -489,51 +496,52 @@ Result PrefetchFileBatchReader::NextBatchWithB } } -Status PrefetchFileBatchReader::SeekToRow(uint64_t row_number) { +Status PrefetchFileBatchReaderImpl::SeekToRow(uint64_t row_number) { return Status::NotImplemented("not support seek to row for prefetch reader"); } -std::shared_ptr PrefetchFileBatchReader::GetReaderMetrics() const { +std::shared_ptr PrefetchFileBatchReaderImpl::GetReaderMetrics() const { return MetricsImpl::CollectReadMetrics(readers_); } -Result> PrefetchFileBatchReader::GetFileSchema() const { +Result> PrefetchFileBatchReaderImpl::GetFileSchema() const { assert(!readers_.empty()); return readers_[0]->GetFileSchema(); } -uint64_t PrefetchFileBatchReader::GetPreviousBatchFirstRowNumber() const { +uint64_t PrefetchFileBatchReaderImpl::GetPreviousBatchFirstRowNumber() const { return previous_batch_first_row_num_; } -uint64_t PrefetchFileBatchReader::GetNumberOfRows() const { +uint64_t PrefetchFileBatchReaderImpl::GetNumberOfRows() const { assert(!readers_.empty()); return readers_[0]->GetNumberOfRows(); } -uint64_t PrefetchFileBatchReader::GetNextRowToRead() const { +uint64_t PrefetchFileBatchReaderImpl::GetNextRowToRead() const { assert(false); return -1; } -void PrefetchFileBatchReader::SetReadStatus(const Status& status) { +void PrefetchFileBatchReaderImpl::SetReadStatus(const Status& status) { std::unique_lock lock(rw_mutex_); read_status_ = status; } -Status PrefetchFileBatchReader::GetReadStatus() const { +Status PrefetchFileBatchReaderImpl::GetReadStatus() const { std::shared_lock lock(rw_mutex_); return read_status_; } -bool PrefetchFileBatchReader::IsEofRange(const std::pair& read_range) const { +bool PrefetchFileBatchReaderImpl::IsEofRange( + const std::pair& read_range) const { return read_range.first >= GetNumberOfRows(); } -std::pair PrefetchFileBatchReader::EofRange() const { +std::pair PrefetchFileBatchReaderImpl::EofRange() const { return {GetNumberOfRows(), GetNumberOfRows() + 1}; } -void PrefetchFileBatchReader::Close() { +void PrefetchFileBatchReaderImpl::Close() { (void)CleanUp(); for (const auto& reader : readers_) { reader->Close(); diff --git a/src/paimon/common/reader/prefetch_file_batch_reader.h b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h similarity index 86% rename from src/paimon/common/reader/prefetch_file_batch_reader.h rename to src/paimon/common/reader/prefetch_file_batch_reader_impl.h index c4132d8e..1ede5cab 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader.h +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl.h @@ -35,7 +35,7 @@ #include "arrow/c/abi.h" #include "paimon/common/utils/threadsafe_queue.h" #include "paimon/reader/batch_reader.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" #include "paimon/status.h" #include "paimon/utils/roaring_bitmap32.h" @@ -50,21 +50,21 @@ class Executor; class Predicate; class Metrics; -class PrefetchFileBatchReader : public FileBatchReader { +class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader { public: - static Result> Create( + static Result> Create( const std::string& data_file_path, const ReaderBuilder* reader_builder, const std::shared_ptr& fs, uint32_t prefetch_max_parallel_num, int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy, const std::shared_ptr& executor, bool initialize_read_ranges); - ~PrefetchFileBatchReader() override; + ~PrefetchFileBatchReaderImpl() override; - Result NextBatch() override { + Result NextBatch() override { return Status::Invalid( "paimon inner reader PrefetchFileBatchReader should use NextBatchWithBitmap"); } - Result NextBatchWithBitmap() override; + Result NextBatchWithBitmap() override; std::shared_ptr GetReaderMetrics() const override; @@ -90,7 +90,7 @@ class PrefetchFileBatchReader : public FileBatchReader { Status RefreshReadRanges(); - inline FileBatchReader* GetFirstReader() const { + inline PrefetchFileBatchReader* GetFirstReader() const { return readers_[0].get(); } @@ -105,10 +105,10 @@ class PrefetchFileBatchReader : public FileBatchReader { uint64_t previous_batch_first_row_num; }; - PrefetchFileBatchReader(std::vector>&& readers, - int32_t batch_size, uint32_t prefetch_queue_capacity, - bool enable_adaptive_prefetch_strategy, - const std::shared_ptr& executor); + PrefetchFileBatchReaderImpl( + const std::vector>& readers, int32_t batch_size, + uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy, + const std::shared_ptr& executor); Status CleanUp(); void Workloop(); @@ -130,10 +130,10 @@ class PrefetchFileBatchReader : public FileBatchReader { Status EnsureReaderPosition(size_t reader_idx, const std::pair& read_range) const; Status HandleReadResult(size_t reader_idx, const std::pair& read_range, - ReadBatchWithBitmap&& read_batch_with_bitmap); + FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap); private: - std::vector> readers_; + std::vector> readers_; // The meaning of readers_pos_ is: all data before this pos has been filtered out or effectively // consumed, and the data after this pos may need to be read in the next round of reading. std::vector>> readers_pos_; diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_test.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp similarity index 87% rename from src/paimon/common/reader/prefetch_file_batch_reader_test.cpp rename to src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp index a05f5046..1f100835 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_test.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include @@ -37,8 +37,8 @@ namespace paimon::test { -class PrefetchFileBatchReaderTest : public ::testing::Test, - public ::testing::WithParamInterface { +class PrefetchFileBatchReaderImplTest : public ::testing::Test, + public ::testing::WithParamInterface { public: void SetUp() override { fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()), @@ -81,8 +81,10 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, ASSERT_TRUE(arrow::ExportSchema(schema, &c_schema).ok()); ASSERT_OK_AND_ASSIGN( std::unique_ptr file_format, - FileFormatFactory::Get(file_format_str, {{"parquet.write.max-row-group-length", - std::to_string(row_index_stride)}})); + FileFormatFactory::Get( + file_format_str, + {{"parquet.write.max-row-group-length", std::to_string(row_index_stride)}, + {"orc.row.index.stride", std::to_string(row_index_stride)}})); ASSERT_OK_AND_ASSIGN(auto writer_builder, file_format->CreateWriterBuilder(&c_schema, 1024)); @@ -107,7 +109,7 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, ASSERT_OK(out->Close()); } - std::unique_ptr PreparePrefetchReader( + std::unique_ptr PreparePrefetchReader( const std::string& file_format_str, const arrow::Schema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap, int32_t batch_size, @@ -116,8 +118,8 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, FileFormatFactory::Get(file_format_str, {})); EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size)); EXPECT_OK_AND_ASSIGN( - std::unique_ptr reader, - PrefetchFileBatchReader::Create( + std::unique_ptr reader, + PrefetchFileBatchReaderImpl::Create( PathUtil::JoinPath(dir_->Str(), "file." + file_format->Identifier()), reader_builder.get(), local_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, @@ -130,9 +132,9 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, return reader; } - bool HasValue( - const std::vector>>& - prefetch_queues) { + bool HasValue(const std::vector< + std::unique_ptr>>& + prefetch_queues) { for (const auto& queue : prefetch_queues) { if (!queue->empty()) { return true; @@ -166,19 +168,19 @@ class PrefetchFileBatchReaderTest : public ::testing::Test, }; std::vector GetTestValues() { - return {"parquet"}; + return {"parquet", "orc"}; } -INSTANTIATE_TEST_SUITE_P(FileFormat, PrefetchFileBatchReaderTest, +INSTANTIATE_TEST_SUITE_P(FileFormat, PrefetchFileBatchReaderImplTest, ::testing::ValuesIn(GetTestValues())); -TEST_F(PrefetchFileBatchReaderTest, TestSimple) { +TEST_F(PrefetchFileBatchReaderImplTest, TestSimple) { auto data_array = PrepareArray(101); int32_t batch_size = 10; for (auto prefetch_max_parallel_num : {1, 2, 3, 5, 8, 10}) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( - auto reader, PrefetchFileBatchReader::Create( + auto reader, PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -193,7 +195,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestSimple) { } } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLimits) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 12; @@ -201,7 +203,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -221,7 +223,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLimits) { ASSERT_TRUE(read_metrics); } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithoutInitializeReadRanges) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 12; @@ -229,7 +231,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); @@ -239,24 +241,24 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithoutInitializeReadRanges) { reader->Close(); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithoutBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithoutBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, std::nullopt); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, std::nullopt); ASSERT_EQ(filtered_ranges, read_ranges); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithAllZeroBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithAllZeroBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; auto bitmap = RoaringBitmap32::From({}); - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, bitmap); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap); ASSERT_TRUE(filtered_ranges.empty()); } -TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, FilterReadRangesWithBitmap) { auto data_array = PrepareArray(10000); std::set valid_row_ids; for (int32_t i = 1000; i < 2000; i++) { @@ -270,25 +272,25 @@ TEST_F(PrefetchFileBatchReaderTest, FilterReadRangesWithBitmap) { std::vector> read_ranges = { {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}, {7000, 8000}, {8000, 9000}, {9000, 10000}}; - auto filtered_ranges = PrefetchFileBatchReader::FilterReadRanges(read_ranges, bitmap); + auto filtered_ranges = PrefetchFileBatchReaderImpl::FilterReadRanges(read_ranges, bitmap); std::vector> expected_filtered_ranges = { {1000, 2000}, {3000, 4000}, {4000, 5000}, {5000, 6000}, {6000, 7000}}; ASSERT_EQ(expected_filtered_ranges, filtered_ranges); } -TEST_F(PrefetchFileBatchReaderTest, DispatchReadRangesEmpty) { +TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRangesEmpty) { std::vector> read_ranges; - auto read_ranges_in_group = PrefetchFileBatchReader::DispatchReadRanges(read_ranges, 3); + auto read_ranges_in_group = PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3); ASSERT_EQ(read_ranges_in_group.size(), 3); ASSERT_TRUE(read_ranges_in_group[0].empty()); ASSERT_TRUE(read_ranges_in_group[1].empty()); ASSERT_TRUE(read_ranges_in_group[2].empty()); } -TEST_F(PrefetchFileBatchReaderTest, DispatchReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, DispatchReadRanges) { std::vector> read_ranges = { {0, 10000}, {10000, 20000}, {20000, 30000}, {30000, 40000}}; - auto read_ranges_in_group = PrefetchFileBatchReader::DispatchReadRanges(read_ranges, 3); + auto read_ranges_in_group = PrefetchFileBatchReaderImpl::DispatchReadRanges(read_ranges, 3); std::vector> expected_group_0 = {{0, 10000}, {30000, 40000}}; ASSERT_EQ(read_ranges_in_group[0], expected_group_0); std::vector> expected_group_1 = {{10000, 20000}}; @@ -297,18 +299,18 @@ TEST_F(PrefetchFileBatchReaderTest, DispatchReadRanges) { ASSERT_EQ(read_ranges_in_group[2], expected_group_2); } -TEST_F(PrefetchFileBatchReaderTest, RefreshReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, RefreshReadRanges) { auto data_array = PrepareArray(101); int32_t batch_size = 30; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); ASSERT_OK(prefetch_reader->RefreshReadRanges()); std::vector> read_ranges_0 = {{0, 30}, {90, 101}}; auto mock_reader_0 = dynamic_cast(prefetch_reader->readers_[0].get()); @@ -321,18 +323,18 @@ TEST_F(PrefetchFileBatchReaderTest, RefreshReadRanges) { ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2); } -TEST_F(PrefetchFileBatchReaderTest, SetReadRanges) { +TEST_F(PrefetchFileBatchReaderImplTest, SetReadRanges) { auto data_array = PrepareArray(400); int32_t batch_size = 30; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/false)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); ASSERT_FALSE(prefetch_reader->need_prefetch_); prefetch_reader->need_prefetch_ = true; std::vector> ranges = { @@ -357,14 +359,14 @@ TEST_F(PrefetchFileBatchReaderTest, SetReadRanges) { ASSERT_EQ(mock_reader_2->GetReadRanges(), read_ranges_2); } -TEST_F(PrefetchFileBatchReaderTest, TestReadWithLargeBatchSize) { +TEST_F(PrefetchFileBatchReaderImplTest, TestReadWithLargeBatchSize) { auto data_array = PrepareArray(101); int32_t batch_size = 150; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -377,18 +379,18 @@ TEST_F(PrefetchFileBatchReaderTest, TestReadWithLargeBatchSize) { ASSERT_TRUE(result_array->Equals(expected_array)); } -TEST_F(PrefetchFileBatchReaderTest, TestPartialReaderSuccessRead) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPartialReaderSuccessRead) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); for (int32_t i = 0; i < prefetch_max_parallel_num; i++) { dynamic_cast(prefetch_reader->readers_[i].get()) ->EnableRandomizeBatchSize(false); @@ -421,19 +423,19 @@ TEST_F(PrefetchFileBatchReaderTest, TestPartialReaderSuccessRead) { ReaderUtils::ReleaseReadBatch(std::move(batch_with_bitmap.first)); } -TEST_F(PrefetchFileBatchReaderTest, TestAllReaderFailedWithIOError) { +TEST_F(PrefetchFileBatchReaderImplTest, TestAllReaderFailedWithIOError) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); - auto prefetch_reader = dynamic_cast(reader.get()); + auto prefetch_reader = dynamic_cast(reader.get()); for (int32_t i = 0; i < prefetch_max_parallel_num; i++) { dynamic_cast(prefetch_reader->readers_[i].get()) ->SetNextBatchStatus(Status::IOError("mock error")); @@ -455,14 +457,14 @@ TEST_F(PrefetchFileBatchReaderTest, TestAllReaderFailedWithIOError) { ASSERT_TRUE(batch_result2.status().IsIOError()); } -TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithEmptyData) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithEmptyData) { auto data_array = PrepareArray(0); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -474,14 +476,14 @@ TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithEmptyData) { ASSERT_FALSE(result_array); } -TEST_F(PrefetchFileBatchReaderTest, TestCallNextBatchAfterReadingEof) { +TEST_F(PrefetchFileBatchReaderImplTest, TestCallNextBatchAfterReadingEof) { auto data_array = PrepareArray(10); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 6; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); @@ -498,54 +500,54 @@ TEST_F(PrefetchFileBatchReaderTest, TestCallNextBatchAfterReadingEof) { ASSERT_TRUE(BatchReader::IsEofBatch(batch_with_bitmap)); } -TEST_F(PrefetchFileBatchReaderTest, TestCreateReaderWithoutNextBatch) { +TEST_F(PrefetchFileBatchReaderImplTest, TestCreateReaderWithoutNextBatch) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); ASSERT_OK_AND_ASSIGN( auto reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } -TEST_F(PrefetchFileBatchReaderTest, TestInvalidCase) { +TEST_F(PrefetchFileBatchReaderImplTest, TestInvalidCase) { auto data_array = PrepareArray(101); int32_t batch_size = 10; int32_t prefetch_max_parallel_num = 3; std::string data_file_path = ""; MockFormatReaderBuilder reader_builder(data_array, data_type_, batch_size); { - ASSERT_NOK(PrefetchFileBatchReader::Create(data_file_path, &reader_builder, mock_fs_, - /*prefetch_max_parallel_num=*/0, batch_size, 2, - /*enable_adaptive_prefetch_strategy=*/false, - executor_, - /*initialize_read_ranges=*/true)); + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( + data_file_path, &reader_builder, mock_fs_, + /*prefetch_max_parallel_num=*/0, batch_size, 2, + /*enable_adaptive_prefetch_strategy=*/false, executor_, + /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, /*batch_size=*/-1, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, /*executor=*/nullptr, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, /*reader_builder=*/nullptr, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, /*initialize_read_ranges=*/true)); } { - ASSERT_NOK(PrefetchFileBatchReader::Create( + ASSERT_NOK(PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, /*fs=*/nullptr, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -553,7 +555,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestInvalidCase) { } { ASSERT_OK_AND_ASSIGN( - auto reader, PrefetchFileBatchReader::Create( + auto reader, PrefetchFileBatchReaderImpl::Create( data_file_path, &reader_builder, mock_fs_, prefetch_max_parallel_num, batch_size, prefetch_max_parallel_num * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, @@ -566,7 +568,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestInvalidCase) { /// There are three stripes: [0,30), [30,60), [60,90). After predicate pushdown, the stripe /// [30,60) will be filtered out. /// The read range is [0,30), [30,60), [60,90). So, expected results is [0,30), [60,90) -TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) { +TEST_P(PrefetchFileBatchReaderImplTest, TestPrefetchWithPredicatePushdownWithCompleteFiltering) { auto file_format = GetParam(); auto data_array = PrepareArray(90); PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/30); @@ -598,7 +600,8 @@ TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithPredicatePushdownWithComplet /// There are three stripes: [0,30), [30,60), [60,90). Each stripe has 3 row groups. /// After predicate pushdown, the row group [0, 20), [70, 90) will be remained. /// The read range is [0,30), [30,60), [60,90). -TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) { +TEST_P(PrefetchFileBatchReaderImplTest, + TestPrefetchWithOrcPredicatePushdownWithRowGroupGranularity) { auto file_format = GetParam(); auto data_array = PrepareArray(90); PrepareTestData(file_format, data_array, /*stripe_row_count=*/30, /*row_index_stride=*/10); @@ -629,7 +632,7 @@ TEST_P(PrefetchFileBatchReaderTest, TestPrefetchWithOrcPredicatePushdownWithRowG ASSERT_TRUE(CheckEqual(expected_array, result_array)); } -TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithBitmap) { +TEST_F(PrefetchFileBatchReaderImplTest, TestPrefetchWithBitmap) { auto data_array = PrepareArray(10000); std::set valid_row_ids; for (int32_t i = 0; i < 5120; i++) { @@ -640,7 +643,7 @@ TEST_F(PrefetchFileBatchReaderTest, TestPrefetchWithBitmap) { MockFormatReaderBuilder reader_builder(data_array, data_type_, bitmap, /*read_batch_size=*/100); int32_t prefetch_max_parallel_num = 3; - ASSERT_OK_AND_ASSIGN(auto reader, PrefetchFileBatchReader::Create( + ASSERT_OK_AND_ASSIGN(auto reader, PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"", &reader_builder, mock_fs_, prefetch_max_parallel_num, /*batch_size=*/100, prefetch_max_parallel_num * 2, diff --git a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp index ef050711..c2db0745 100644 --- a/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp +++ b/src/paimon/core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp @@ -23,7 +23,7 @@ #include "arrow/array/array_nested.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/executor.h" #include "paimon/testing/mock/mock_file_batch_reader.h" #include "paimon/testing/mock/mock_file_system.h" @@ -78,7 +78,7 @@ class ApplyDeletionVectorBatchReaderTest : public ::testing::Test, if (enable_prefetch) { MockFormatReaderBuilder reader_builder(data, target_type_, batch_size); ASSERT_OK_AND_ASSIGN(file_batch_reader, - PrefetchFileBatchReader::Create( + PrefetchFileBatchReaderImpl::Create( /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count, batch_size, prefetch_batch_count * 2, /*enable_adaptive_prefetch_strategy=*/false, executor_, diff --git a/src/paimon/core/io/complete_row_tracking_fields_reader.h b/src/paimon/core/io/complete_row_tracking_fields_reader.h index 9fb986a8..eed2f4a9 100644 --- a/src/paimon/core/io/complete_row_tracking_fields_reader.h +++ b/src/paimon/core/io/complete_row_tracking_fields_reader.h @@ -60,10 +60,6 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { reader_->Close(); } - Status SeekToRow(uint64_t row_number) override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader does not support SeekToRow"); - } - uint64_t GetPreviousBatchFirstRowNumber() const override { return reader_->GetPreviousBatchFirstRowNumber(); } @@ -72,19 +68,6 @@ class CompleteRowTrackingFieldsBatchReader : public FileBatchReader { return reader_->GetNumberOfRows(); } - uint64_t GetNextRowToRead() const override { - return reader_->GetNextRowToRead(); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader do not support GenReadRanges"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::Invalid("CompleteRowTrackingFieldsBatchReader do not support SetReadRanges"); - } - bool SupportPreciseBitmapSelection() const override { return reader_->SupportPreciseBitmapSelection(); } diff --git a/src/paimon/core/operation/abstract_split_read.cpp b/src/paimon/core/operation/abstract_split_read.cpp index 504011f2..9a7cc001 100644 --- a/src/paimon/core/operation/abstract_split_read.cpp +++ b/src/paimon/core/operation/abstract_split_read.cpp @@ -23,7 +23,7 @@ #include "arrow/type.h" #include "paimon/common/reader/delegating_prefetch_reader.h" #include "paimon/common/reader/predicate_batch_reader.h" -#include "paimon/common/reader/prefetch_file_batch_reader.h" +#include "paimon/common/reader/prefetch_file_batch_reader_impl.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/object_utils.h" @@ -150,8 +150,8 @@ Result> AbstractSplitRead::CreateFileBatchReade // TODO(zhanyu.fyh): orc format support prefetch if (context_->EnablePrefetch() && file_format_identifier != "blob" && file_format_identifier != "orc") { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr prefetch_reader, - PrefetchFileBatchReader::Create( + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr prefetch_reader, + PrefetchFileBatchReaderImpl::Create( data_file_path, reader_builder, options_.GetFileSystem(), context_->GetPrefetchMaxParallelNum(), options_.GetReadBatchSize(), context_->GetPrefetchBatchCount(), diff --git a/src/paimon/format/avro/avro_file_batch_reader.h b/src/paimon/format/avro/avro_file_batch_reader.h index 5874910b..08645b92 100644 --- a/src/paimon/format/avro/avro_file_batch_reader.h +++ b/src/paimon/format/avro/avro_file_batch_reader.h @@ -43,11 +43,6 @@ class AvroFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override { - assert(false); - return Status::NotImplemented("not implemented"); - } - uint64_t GetPreviousBatchFirstRowNumber() const override { assert(false); return -1; @@ -58,27 +53,11 @@ class AvroFileBatchReader : public FileBatchReader { return -1; } - uint64_t GetNextRowToRead() const override { - assert(false); - return -1; - } - std::shared_ptr GetReaderMetrics() const override { assert(false); return nullptr; } - Result>> GenReadRanges( - bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("not implemented"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("not implemented"); - } - void Close() override { DoClose(); } diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp index 9e3ac39d..b1e8b672 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -278,11 +278,6 @@ Result> BlobFileBatchReader::ToArrowArray( return array; } -Status BlobFileBatchReader::SeekToRow(uint64_t row_number) { - assert(false); - return Status::NotImplemented("blob file batch reader seek to row is not supported."); -} - int32_t BlobFileBatchReader::GetIndexLength(const int8_t* bytes, int32_t offset) { return (bytes[offset + 3] << 24) | ((bytes[offset + 2] & 0xff) << 16) | ((bytes[offset + 1] & 0xff) << 8) | (bytes[offset] & 0xff); diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index a4ad6981..29ecf68e 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -96,8 +96,6 @@ class BlobFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override; - Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { @@ -108,26 +106,10 @@ class BlobFileBatchReader : public FileBatchReader { return all_blob_lengths_.size(); } - uint64_t GetNextRowToRead() const override { - if (current_pos_ < target_blob_row_indexes_.size()) { - return target_blob_row_indexes_[current_pos_]; - } - return GetNumberOfRows(); - } - std::shared_ptr GetReaderMetrics() const override { return metrics_; } - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::NotImplemented("set read ranges not implemented"); - } - - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::NotImplemented("gen read ranges not implemented"); - } - void Close() override { closed_ = true; } diff --git a/src/paimon/format/blob/blob_file_batch_reader_test.cpp b/src/paimon/format/blob/blob_file_batch_reader_test.cpp index a45ac930..d827c83e 100644 --- a/src/paimon/format/blob/blob_file_batch_reader_test.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader_test.cpp @@ -169,20 +169,16 @@ TEST_F(BlobFileBatchReaderTest, TestRowNumbers) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_EQ(3, reader->GetNumberOfRows()); ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(0, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); ASSERT_EQ(0, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(1, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(2, reader->GetNextRowToRead()); ArrowArrayRelease(batch2.first.get()); ArrowSchemaRelease(batch2.second.get()); ASSERT_OK_AND_ASSIGN(auto batch3, reader->NextBatch()); ASSERT_EQ(2, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(3, reader->GetNextRowToRead()); ArrowArrayRelease(batch3.first.get()); ArrowSchemaRelease(batch3.second.get()); ASSERT_OK_AND_ASSIGN(auto batch4, reader->NextBatch()); @@ -212,10 +208,8 @@ TEST_F(BlobFileBatchReaderTest, TestRowNumbersWithBitmap) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, roaring)); ASSERT_EQ(3, reader->GetNumberOfRows()); ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(1, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch1, reader->NextBatch()); ASSERT_EQ(1, reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(3, reader->GetNextRowToRead()); ArrowArrayRelease(batch1.first.get()); ArrowSchemaRelease(batch1.second.get()); ASSERT_OK_AND_ASSIGN(auto batch2, reader->NextBatch()); @@ -254,7 +248,6 @@ TEST_F(BlobFileBatchReaderTest, InvalidScenario) { /*batch_size=*/1, /*blob_as_descriptor=*/true, pool_)); ASSERT_NOK_WITH_MSG(reader->GetFileSchema(), "blob file has no self-describing file schema"); - ASSERT_NOK_WITH_MSG(reader->GenReadRanges({}), "gen read ranges not implemented"); ASSERT_TRUE(reader->GetReaderMetrics()); ASSERT_NOK_WITH_MSG(reader->NextBatch(), "target type is nullptr, call SetReadSchema first"); @@ -292,7 +285,6 @@ TEST_P(BlobFileBatchReaderTest, EmptyFile) { ASSERT_OK(reader->SetReadSchema(&c_schema, nullptr, std::nullopt)); ASSERT_EQ(0, reader->GetNumberOfRows()); ASSERT_EQ(std::numeric_limits::max(), reader->GetPreviousBatchFirstRowNumber()); - ASSERT_EQ(0, reader->GetNextRowToRead()); ASSERT_OK_AND_ASSIGN(auto batch, reader->NextBatch()); ASSERT_TRUE(BatchReader::IsEofBatch(batch)); } diff --git a/src/paimon/format/lance/lance_file_batch_reader.h b/src/paimon/format/lance/lance_file_batch_reader.h index 4667e4b5..ab8019ac 100644 --- a/src/paimon/format/lance/lance_file_batch_reader.h +++ b/src/paimon/format/lance/lance_file_batch_reader.h @@ -39,10 +39,6 @@ class LanceFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override { - return Status::Invalid("do not support seek to specific row in lance format"); - } - Result NextBatch() override; uint64_t GetPreviousBatchFirstRowNumber() const override { @@ -55,25 +51,11 @@ class LanceFileBatchReader : public FileBatchReader { return num_rows_; } - uint64_t GetNextRowToRead() const override { - assert(false); - return -1; - } - std::shared_ptr GetReaderMetrics() const override { // TODO(xinyu.lxy): support metrics in reader return metrics_; } - Result>> GenReadRanges( - bool* need_prefetch) const override { - return Status::Invalid("do not support generating read ranges in lance format"); - } - - Status SetReadRanges(const std::vector>& read_ranges) override { - return Status::Invalid("do not support setting read ranges in lance format"); - } - void Close() override { return DoClose(); } diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h b/src/paimon/format/parquet/parquet_file_batch_reader.h index 0011fb1a..f28ccb9c 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.h +++ b/src/paimon/format/parquet/parquet_file_batch_reader.h @@ -36,7 +36,7 @@ #include "paimon/common/metrics/metrics_impl.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/format/parquet/file_reader_wrapper.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" #include "paimon/result.h" #include "paimon/status.h" #include "parquet/arrow/reader.h" @@ -57,7 +57,7 @@ class RoaringBitmap32; namespace paimon::parquet { -class ParquetFileBatchReader : public FileBatchReader { +class ParquetFileBatchReader : public PrefetchFileBatchReader { public: static Result> Create( std::shared_ptr&& input_stream, diff --git a/src/paimon/testing/mock/mock_file_batch_reader.h b/src/paimon/testing/mock/mock_file_batch_reader.h index 2f80196b..398c0520 100644 --- a/src/paimon/testing/mock/mock_file_batch_reader.h +++ b/src/paimon/testing/mock/mock_file_batch_reader.h @@ -27,9 +27,10 @@ #include "paimon/common/reader/reader_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/date_time_utils.h" -#include "paimon/reader/file_batch_reader.h" +#include "paimon/reader/prefetch_file_batch_reader.h" namespace paimon::test { -class MockFileBatchReader : public FileBatchReader { + +class MockFileBatchReader : public PrefetchFileBatchReader { public: MockFileBatchReader(const std::shared_ptr& data, const std::shared_ptr& file_schema, @@ -181,4 +182,5 @@ class MockFileBatchReader : public FileBatchReader { bool enable_randomize_batch_size_ = true; std::vector> read_ranges_; }; + } // namespace paimon::test From cef3183bb2344363e89dab387c80fa07a9eb15bd Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Wed, 7 Jan 2026 10:59:06 +0800 Subject: [PATCH 2/5] fix --- src/paimon/format/orc/orc_file_batch_reader.cpp | 15 --------------- src/paimon/format/orc/orc_file_batch_reader.h | 13 ------------- 2 files changed, 28 deletions(-) diff --git a/src/paimon/format/orc/orc_file_batch_reader.cpp b/src/paimon/format/orc/orc_file_batch_reader.cpp index a860e57d..694ef7b5 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.cpp +++ b/src/paimon/format/orc/orc_file_batch_reader.cpp @@ -151,21 +151,6 @@ Status OrcFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, return Status::OK(); } -Status OrcFileBatchReader::SeekToRow(uint64_t row_number) { - try { - row_reader_->seekToRow(row_number); - } catch (const std::exception& e) { - return Status::Invalid( - fmt::format("orc file batch reader seek to row {} failed for file {}, with {} error", - row_number, file_name_, e.what())); - } catch (...) { - return Status::UnknownError(fmt::format( - "orc file batch reader seek to row {} failed for file {}, with unknown error", - row_number, file_name_)); - } - return Status::OK(); -} - Result OrcFileBatchReader::NextBatch() { if (has_error_) { return Status::Invalid(fmt::format( diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index 0201d02b..886621d1 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -48,8 +48,6 @@ class OrcFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SeekToRow(uint64_t row_number) override; - Status SetReadRanges(const std::vector>& read_ranges) override { assert(false); return Status::NotImplemented("set read ranges not implemented"); @@ -67,19 +65,8 @@ class OrcFileBatchReader : public FileBatchReader { return reader_->getNumberOfRows(); } - uint64_t GetNextRowToRead() const override { - assert(false); - return -1; - } - std::shared_ptr GetReaderMetrics() const override; - Result>> GenReadRanges( - bool* need_prefetch) const override { - assert(false); - return Status::NotImplemented("gen read ranges not implemented"); - } - void Close() override { metrics_ = GetReaderMetrics(); row_reader_.reset(); From 276cf858dba161ebf766268632b651ce5b51e37a Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Wed, 7 Jan 2026 11:00:27 +0800 Subject: [PATCH 3/5] fix --- src/paimon/format/orc/orc_file_batch_reader.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h index 886621d1..0f1295ae 100644 --- a/src/paimon/format/orc/orc_file_batch_reader.h +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -48,11 +48,6 @@ class OrcFileBatchReader : public FileBatchReader { Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, const std::optional& selection_bitmap) override; - Status SetReadRanges(const std::vector>& read_ranges) override { - assert(false); - return Status::NotImplemented("set read ranges not implemented"); - } - // Important: output ArrowArray is allocated on arrow_pool_ whose lifecycle holds in // OrcFileBatchReader. Therefore, we need to hold BatchReader when using output ArrowArray. Result NextBatch() override; From 1247fd1483ef67a046cb6fd9db2696f24a33cb5d Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Wed, 7 Jan 2026 11:28:16 +0800 Subject: [PATCH 4/5] fix --- .../common/reader/prefetch_file_batch_reader_impl_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp index 1f100835..2edc2d98 100644 --- a/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp +++ b/src/paimon/common/reader/prefetch_file_batch_reader_impl_test.cpp @@ -168,7 +168,7 @@ class PrefetchFileBatchReaderImplTest : public ::testing::Test, }; std::vector GetTestValues() { - return {"parquet", "orc"}; + return {"parquet"}; } INSTANTIATE_TEST_SUITE_P(FileFormat, PrefetchFileBatchReaderImplTest, From bceadf7196015ecde1eff91c4e4138a5d6f71c08 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Wed, 7 Jan 2026 16:06:47 +0800 Subject: [PATCH 5/5] fix --- include/paimon/reader/prefetch_file_batch_reader.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/include/paimon/reader/prefetch_file_batch_reader.h b/include/paimon/reader/prefetch_file_batch_reader.h index facd80c2..641e84fb 100644 --- a/include/paimon/reader/prefetch_file_batch_reader.h +++ b/include/paimon/reader/prefetch_file_batch_reader.h @@ -24,6 +24,9 @@ namespace paimon { +/// The prefetch file batch reader extends the basic FileBatchReader interface for prefetch read, +/// if a format implementation inherits from this class, it will automatically support the C++ +/// Paimon prefetch capability and integrate with the Paimon prefetch framework. class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader { public: /// Seeks to a specific row in the file.