diff --git a/be/src/cloud/pb_convert.cpp b/be/src/cloud/pb_convert.cpp index 096f072c2bc8ec..af0a819d6a2cbe 100644 --- a/be/src/cloud/pb_convert.cpp +++ b/be/src/cloud/pb_convert.cpp @@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in) out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows()); out->mutable_segments_file_size()->CopyFrom(in.segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -151,6 +152,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) { out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows()); out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in) out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows()); out->mutable_segments_file_size()->CopyFrom(in.segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { @@ -305,6 +308,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) { out->set_txn_expiration(in.txn_expiration()); out->set_segments_overlap_pb(in.segments_overlap_pb()); out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated()); + out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows()); out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size()); out->set_index_id(in.index_id()); if (in.has_schema_version()) { diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5f78c7d9294a30..902637aa6212c5 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0"); // modify them upon necessity DEFINE_Int32(min_file_descriptor_number, "60000"); DEFINE_mBool(disable_segment_cache, "false"); +// Enable checking segment rows consistency between rowset meta and segment footer +DEFINE_mBool(enable_segment_rows_consistency_check, "false"); +DEFINE_mBool(enable_segment_rows_check_core, "false"); +// ATTENTION: For test only. In test environment, there are no historical data, +// so all rowset meta should have segment rows info. 
+DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false"); DEFINE_String(row_cache_mem_limit, "20%"); // Cache for storage page size @@ -1472,6 +1478,21 @@ DEFINE_mInt64(string_overflow_size, "4294967295"); // std::numic_limits bool { return config >= 10; }); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index c9d9fe94ffbdca..eb2c30e227e832 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec); // modify them upon necessity DECLARE_Int32(min_file_descriptor_number); DECLARE_mBool(disable_segment_cache); +// Enable checking segment rows consistency between rowset meta and segment footer +DECLARE_mBool(enable_segment_rows_consistency_check); +DECLARE_mBool(enable_segment_rows_check_core); +// ATTENTION: For test only. In test environment, there are no historical data, +// so all rowset meta should have segment rows info. +DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta); DECLARE_String(row_cache_mem_limit); // Cache for storage page size @@ -1550,6 +1556,21 @@ DECLARE_mInt64(string_overflow_size); DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_min_thread); // The max thread num for BufferedReaderPrefetchThreadPool DECLARE_Int64(num_buffered_reader_prefetch_thread_pool_max_thread); + +DECLARE_mBool(enable_segment_prefetch_verbose_log); +// The thread num for SegmentPrefetchThreadPool +DECLARE_Int64(segment_prefetch_thread_pool_thread_num_min); +DECLARE_Int64(segment_prefetch_thread_pool_thread_num_max); + +DECLARE_mInt32(segment_file_cache_consume_rowids_batch_size); +// Enable segment file cache block prefetch for query +DECLARE_mBool(enable_query_segment_file_cache_prefetch); +// Number of blocks to prefetch ahead in segment iterator for query +DECLARE_mInt32(query_segment_file_cache_prefetch_block_size); +// Enable segment file cache block prefetch for compaction +DECLARE_mBool(enable_compaction_segment_file_cache_prefetch); +// Number of blocks to prefetch ahead in segment iterator for compaction +DECLARE_mInt32(compaction_segment_file_cache_prefetch_block_size); // The min thread num for S3FileUploadThreadPool DECLARE_Int64(num_s3_file_upload_thread_pool_min_thread); // The max thread num for S3FileUploadThreadPool @@ -1694,6 +1715,10 @@ DECLARE_mBool(read_cluster_cache_opt_verbose_log); DECLARE_mString(aws_credentials_provider_version); +// Concurrency stats dump configuration +DECLARE_mBool(enable_concurrency_stats_dump); +DECLARE_mInt32(concurrency_stats_dump_interval_ms); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 4ee923e080e7be..1989e5c347ef5b 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -48,6 +48,7 @@ #include "io/cache/file_cache_common.h" #include "io/cache/fs_file_cache_storage.h" #include "io/cache/mem_file_cache_storage.h" +#include "util/concurrency_stats.h" #include "util/runtime_profile.h" #include "util/stack_util.h" #include "util/stopwatch.hpp" @@ -809,7 +810,9 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o DCHECK(stats != nullptr); MonotonicStopWatch sw; sw.start(); + ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment(); std::lock_guard cache_lock(_mutex); + ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement(); stats->lock_wait_timer += 
sw.elapsed_time(); FileBlocks file_blocks; int64_t duration = 0; diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp index ed7fa7fd51af89..a30f6e2d36eae0 100644 --- a/be/src/io/cache/cached_remote_file_reader.cpp +++ b/be/src/io/cache/cached_remote_file_reader.cpp @@ -55,6 +55,7 @@ #include "service/backend_options.h" #include "util/bit_util.h" #include "util/brpc_client_cache.h" // BrpcClientCache +#include "util/concurrency_stats.h" #include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" @@ -283,6 +284,8 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vectoris_dryrun; DCHECK(!closed()); DCHECK(io_ctx); @@ -385,8 +388,12 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* cache_context.tablet_id = tablet_id.value_or(0); MonotonicStopWatch sw; sw.start(); + + ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment(); FileBlocksHolder holder = _cache->get_or_set(_cache_hash, align_left, align_size, cache_context); + ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement(); + stats.cache_get_or_set_timer += sw.elapsed_time(); std::vector empty_blocks; for (auto& block : holder.file_blocks) { @@ -431,23 +438,28 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* RETURN_IF_ERROR( _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx)); - for (auto& block : empty_blocks) { - if (block->state() == FileBlock::State::SKIP_CACHE) { - continue; - } - SCOPED_RAW_TIMER(&stats.local_write_timer); - char* cur_ptr = buffer.get() + block->range().left - empty_start; - size_t block_size = block->range().size(); - Status st = block->append(Slice(cur_ptr, block_size)); - if (st.ok()) { - st = block->finalize(); - } - if (!st.ok()) { - LOG_EVERY_N(WARNING, 100) << "Write data to file cache failed. err=" << st.msg(); - } else { - _insert_file_reader(block); + { + SCOPED_CONCURRENCY_COUNT( + ConcurrencyStatsManager::instance().cached_remote_reader_write_back); + for (auto& block : empty_blocks) { + if (block->state() == FileBlock::State::SKIP_CACHE) { + continue; + } + SCOPED_RAW_TIMER(&stats.local_write_timer); + char* cur_ptr = buffer.get() + block->range().left - empty_start; + size_t block_size = block->range().size(); + Status st = block->append(Slice(cur_ptr, block_size)); + if (st.ok()) { + st = block->finalize(); + } + if (!st.ok()) { + LOG_EVERY_N(WARNING, 100) + << "Write data to file cache failed. 
err=" << st.msg(); + } else { + _insert_file_reader(block); + } + stats.bytes_write_into_file_cache += block_size; } - stats.bytes_write_into_file_cache += block_size; } // copy from memory directly size_t right_offset = offset + bytes_req - 1; @@ -486,6 +498,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* static int64_t max_wait_time = 10; TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time); if (block_state != FileBlock::State::DOWNLOADED) { + SCOPED_CONCURRENCY_COUNT( + ConcurrencyStatsManager::instance().cached_remote_reader_blocking); do { SCOPED_RAW_TIMER(&stats.remote_wait_timer); SCOPED_RAW_TIMER(&stats.remote_read_timer); @@ -512,6 +526,8 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* } else { size_t file_offset = current_offset - left; SCOPED_RAW_TIMER(&stats.local_read_timer); + SCOPED_CONCURRENCY_COUNT( + ConcurrencyStatsManager::instance().cached_remote_reader_local_read); st = block->read(Slice(result.data + (current_offset - offset), read_size), file_offset); indirect_read_bytes += read_size; @@ -595,4 +611,48 @@ void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, g_skip_cache_sum << read_stats.skip_cache; } +void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) { + if (offset >= this->size() || size == 0) { + return; + } + + size = std::min(size, this->size() - offset); + + ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool(); + if (pool == nullptr) { + return; + } + + IOContext dryrun_ctx; + if (io_ctx != nullptr) { + dryrun_ctx = *io_ctx; + } + dryrun_ctx.is_dryrun = true; + dryrun_ctx.query_id = nullptr; + dryrun_ctx.file_cache_stats = nullptr; + dryrun_ctx.file_reader_stats = nullptr; + + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) + << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}", + offset, size, path().filename().native()); + std::weak_ptr weak_this = shared_from_this(); + auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() { + auto self = weak_this.lock(); + if (self == nullptr) { + return; + } + size_t bytes_read; + Slice dummy_buffer((char*)nullptr, size); + (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) + << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}", + offset, size, self->path().filename().native()); + }); + + if (!st.ok()) { + VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size + << " error=" << st.to_string(); + } +} + } // namespace doris::io diff --git a/be/src/io/cache/cached_remote_file_reader.h b/be/src/io/cache/cached_remote_file_reader.h index 939471b62ea41d..674fbcf3460262 100644 --- a/be/src/io/cache/cached_remote_file_reader.h +++ b/be/src/io/cache/cached_remote_file_reader.h @@ -37,7 +37,8 @@ namespace doris::io { struct IOContext; struct FileCacheStatistics; -class CachedRemoteFileReader final : public FileReader { +class CachedRemoteFileReader final : public FileReader, + public std::enable_shared_from_this { public: CachedRemoteFileReader(FileReaderSPtr remote_file_reader, const FileReaderOptions& opts); @@ -55,6 +56,18 @@ class CachedRemoteFileReader final : public FileReader { static std::pair s_align_size(size_t offset, size_t size, size_t length); + // Asynchronously prefetch a range of file cache blocks. 
+ // This method triggers read file cache in dryrun mode to warm up the cache + // without actually reading the data into user buffers. + // + // Parameters: + // offset: Starting offset in the file + // size: Number of bytes to prefetch + // io_ctx: IO context (can be nullptr, will create a dryrun context internally) + // + // Note: This is a best-effort operation. Errors are logged but not returned. + void prefetch_range(size_t offset, size_t size, const IOContext* io_ctx = nullptr); + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 6488a39e36bf87..a20d448fa6ddf5 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -39,6 +39,7 @@ #include "runtime/thread_context.h" #include "runtime/workload_management/io_throttle.h" #include "util/bvar_helper.h" +#include "util/concurrency_stats.h" #include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" @@ -131,6 +132,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea return Status::InternalError("init s3 client error"); } + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read); + int retry_count = 0; const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 7c28084624a66d..4774b8e2543600 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -437,6 +437,7 @@ struct OlapReaderStatistics { int64_t segment_iterator_init_timer_ns = 0; int64_t segment_iterator_init_return_column_iterators_timer_ns = 0; int64_t segment_iterator_init_index_iterators_timer_ns = 0; + int64_t segment_iterator_init_segment_prefetchers_timer_ns = 0; int64_t segment_create_column_readers_timer_ns = 0; int64_t segment_load_index_timer_ns = 0; diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index ec11c42aa7e965..50a764cc72a11c 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -222,6 +222,9 @@ Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::listquery_options().__isset.enable_segment_cache + ? 
_state->query_options().enable_segment_cache + : true; for (auto&& [tablet, version] : _tablets) { const auto tablet_id = tablet->tablet_id(); _all_read_sources[tablet_id] = _read_sources[idx]; @@ -233,7 +236,8 @@ Status ParallelScannerBuilder::_load() { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_rows; - RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats)); + RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache, + &_builder_stats)); auto segment_count = rowset->num_segments(); for (int64_t i = 0; i != segment_count; i++) { _all_segments_rows[rowset_id].emplace_back(segment_rows[i]); diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index e6b1beb7208fa3..35fbf1dff1e46e 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -29,9 +29,11 @@ #include #include "beta_rowset.h" +#include "cloud/config.h" #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" @@ -71,24 +73,97 @@ Status BetaRowset::init() { return Status::OK(); // no op } +namespace { +Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset, + std::vector* segment_rows, bool enable_segment_cache, + OlapReaderStatistics* read_stats) { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + rowset, &segment_cache_handle, enable_segment_cache, false, read_stats)); + for (const auto& segment : segment_cache_handle.get_segments()) { + segment_rows->emplace_back(segment->num_rows()); + } + return Status::OK(); +} + +Status check_segment_rows_consistency(const std::vector& rows_from_meta, + const std::vector& rows_from_footer, + int64_t tablet_id, const std::string& rowset_id) { + DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size()); + for (size_t i = 0; i < rows_from_footer.size(); i++) { + if (rows_from_footer[i] != rows_from_meta[i]) { + auto msg = fmt::format( + "segment rows mismatch between rowset meta and segment footer. " + "segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}", + i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id); + if (config::enable_segment_rows_check_core) { + CHECK(false) << msg; + } + return Status::InternalError(msg); + } + } + return Status::OK(); +} +} // namespace + Status BetaRowset::get_segment_num_rows(std::vector* segment_rows, + bool enable_segment_cache, OlapReaderStatistics* read_stats) { +#ifndef BE_TEST // `ROWSET_UNLOADING` is state for closed() called but owned by some readers. // So here `ROWSET_UNLOADING` is allowed. 
DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED); - - RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] { +#endif + RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] { auto segment_count = num_segments(); - _segments_rows.resize(segment_count); - for (int64_t i = 0; i != segment_count; ++i) { - SegmentCacheHandle segment_cache_handle; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( - std::static_pointer_cast(shared_from_this()), i, - &segment_cache_handle, false, false, read_stats)); - const auto& tmp_segments = segment_cache_handle.get_segments(); - _segments_rows[i] = tmp_segments[0]->num_rows(); + if (segment_count == 0) { + return Status::OK(); } - return Status::OK(); + + if (!_rowset_meta->get_num_segment_rows().empty()) { + if (_rowset_meta->get_num_segment_rows().size() == segment_count) { + // use segment rows in rowset meta if eligible + TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta"); + _segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(), + _rowset_meta->get_num_segment_rows().cend()); + if (config::enable_segment_rows_consistency_check) { + // verify segment rows from meta match segment footer + std::vector rows_from_footer; + auto self = std::dynamic_pointer_cast(shared_from_this()); + auto load_status = load_segment_rows_from_footer( + self, &rows_from_footer, enable_segment_cache, read_stats); + if (load_status.ok()) { + return check_segment_rows_consistency( + _segments_rows, rows_from_footer, _rowset_meta->tablet_id(), + _rowset_meta->rowset_id().to_string()); + } + } + return Status::OK(); + } else { + auto msg = fmt::format( + "[verbose] corrupted segment rows info in rowset meta. " + "segment count: {}, segment rows size: {}, tablet={}, rowset={}", + segment_count, _rowset_meta->get_num_segment_rows().size(), + _rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string()); + if (config::enable_segment_rows_check_core) { + CHECK(false) << msg; + } + LOG_EVERY_SECOND(WARNING) << msg; + } + } + if (config::fail_when_segment_rows_not_in_rowset_meta) { + CHECK(false) << "[verbose] segment rows info not found in rowset meta. 
tablet=" + << _rowset_meta->tablet_id() + << ", rowset=" << _rowset_meta->rowset_id().to_string() + << ", version=" << _rowset_meta->version() + << ", debug_string=" << _rowset_meta->debug_string() + << ", stack=" << Status::InternalError("error"); + } + // otherwise, read it from segment footer + TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer"); + auto self = std::dynamic_pointer_cast(shared_from_this()); + return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache, + read_stats); })); segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend()); return Status::OK(); diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index f0cd5c966cdb8e..d5fa6ce4677998 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -91,7 +91,7 @@ class BetaRowset final : public Rowset { Status show_nested_index_file(rapidjson::Value* rowset_value, rapidjson::Document::AllocatorType& allocator); - Status get_segment_num_rows(std::vector* segment_rows, + Status get_segment_num_rows(std::vector* segment_rows, bool enable_segment_cache, OlapReaderStatistics* read_stats); protected: diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index bf2d8cd1cb78a0..bc935b3027b133 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -256,7 +256,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context if (_read_context->record_rowids && _read_context->rowid_conversion) { // init segment rowid map for rowid conversion std::vector segment_rows; - RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, _stats)); + RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats)); RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), segment_rows)); } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 3221c836b83e96..f1390bf008abe6 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -31,6 +31,7 @@ #include // IWYU pragma: no_include +#include "common/cast_set.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" @@ -769,6 +770,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { _num_segment += cast_set(rowset->num_segments()); // append key_bounds to current rowset RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); + rowset->get_num_segment_rows(&_segment_num_rows); _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); // TODO update zonemap @@ -948,6 +950,7 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch int64_t total_data_size = 0; int64_t total_index_size = 0; std::vector segments_encoded_key_bounds; + std::vector segment_rows; { std::lock_guard lock(_segid_statistics_map_mutex); for (const auto& itr : _segid_statistics_map) { @@ -955,14 +958,23 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch total_data_size += itr.second.data_size; total_index_size += itr.second.index_size; segments_encoded_key_bounds.push_back(itr.second.key_bounds); + // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load + 
segment_rows.push_back(cast_set(itr.second.row_num)); } } + if (segment_rows.empty()) { + // vertical compaction and linked schema change will not record segment statistics, + // they record segment rows in _segment_num_rows instead + RETURN_IF_ERROR(get_segment_num_rows(&segment_rows)); + } + for (auto& key_bound : _segments_encoded_key_bounds) { segments_encoded_key_bounds.push_back(key_bound); } if (_segments_key_bounds_truncated.has_value()) { rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value()); } + rowset_meta->set_num_segment_rows(segment_rows); // segment key bounds are empty in old version(before version 1.2.x). So we should not modify // the overlap property when key bounds are empty. // for mow table with cluster keys, the overlap is used for cluster keys, @@ -983,6 +995,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch "is: {}, _num_seg is: {}", segments_encoded_key_bounds_size, segment_num); } + if (segment_rows.size() != segment_num) { + return Status::InternalError( + "segment_rows size should equal to _num_seg, segment_rows size is: {}, " + "_num_seg is {}, tablet={}, rowset={}, txn={}", + segment_rows.size(), segment_num, _context.tablet_id, + _context.rowset_id.to_string(), _context.txn_id); + } } rowset_meta->set_num_segments(segment_num); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 9ab9ca3356b5f7..2d2a6267ff8079 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -272,6 +272,10 @@ class Rowset : public std::enable_shared_from_this, public MetadataAdder return Status::OK(); } + void get_num_segment_rows(std::vector* num_segment_rows) { + _rowset_meta->get_num_segment_rows(num_segment_rows); + } + // min key of the first segment bool first_key(std::string* min_key) { KeyBoundsPB key_bounds; diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp index e17b9c31174ff7..56c1a46212f69b 100644 --- a/be/src/olap/rowset/rowset_meta.cpp +++ b/be/src/olap/rowset/rowset_meta.cpp @@ -26,6 +26,7 @@ #include "cloud/cloud_storage_engine.h" #include "common/logging.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "google/protobuf/util/message_differencer.h" #include "io/fs/encrypted_fs_factory.h" #include "io/fs/file_system.h" @@ -323,6 +324,20 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { set_total_disk_size(data_disk_size() + index_disk_size()); set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() || other.is_segments_key_bounds_truncated()); + if (_rowset_meta_pb.num_segment_rows_size() > 0) { + if (other.num_segments() > 0) { + if (other._rowset_meta_pb.num_segment_rows_size() > 0) { + for (auto row_count : other._rowset_meta_pb.num_segment_rows()) { + _rowset_meta_pb.add_num_segment_rows(row_count); + } + } else { + // This may happen when a partial update load commits in high version doris_be + // and publishes with new segments in low version doris_be. In this case, just clear + // all num_segment_rows. 
+ _rowset_meta_pb.clear_num_segment_rows(); + } + } for (auto&& key_bound : other.get_segments_key_bounds()) { add_segment_key_bounds(key_bound); } @@ -341,6 +356,7 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) { } // In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated // Otherwise the schema is stale and lead to wrong data read + TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge"); if (tablet_schema()->num_variant_columns() > 0) { // merge extracted columns TabletSchemaSPtr merged_schema; diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 5a567f04c81f54..8e2a38c0ee7ceb 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -147,6 +147,18 @@ class RowsetMeta : public MetadataAdder { void set_num_rows(int64_t num_rows) { _rowset_meta_pb.set_num_rows(num_rows); } + void set_num_segment_rows(const std::vector& num_segment_rows) { + _rowset_meta_pb.mutable_num_segment_rows()->Assign(num_segment_rows.cbegin(), + num_segment_rows.cend()); + } + + void get_num_segment_rows(std::vector* num_segment_rows) const { + num_segment_rows->assign(_rowset_meta_pb.num_segment_rows().cbegin(), + _rowset_meta_pb.num_segment_rows().cend()); + } + + auto& get_num_segment_rows() const { return _rowset_meta_pb.num_segment_rows(); } + int64_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); } void set_total_disk_size(int64_t total_disk_size) { @@ -434,6 +446,8 @@ class RowsetMeta : public MetadataAdder { index_pb.set_size(size); } + std::string debug_string() const { return _rowset_meta_pb.ShortDebugString(); } + private: bool _deserialize_from_pb(std::string_view value); diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index fa5299df2f0030..0545307d918b6d 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -53,6 +53,11 @@ #include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_prefetcher.h" +#include "olap/rowset/segment_v2/variant/hierarchical_data_iterator.h" #include "olap/rowset/segment_v2/variant/variant_column_reader.h" #include "olap/rowset/segment_v2/zone_map_index.h" #include "olap/tablet_schema.h" @@ -63,6 +68,7 @@ #include "util/binary_cast.hpp" #include "util/bitmap.h" #include "util/block_compression.h" +#include "util/concurrency_stats.h" #include "util/rle_encoding.h" // for RleDecoder #include "util/slice.h" #include "vec/columns/column.h" @@ -419,6 +425,7 @@ Status ColumnReader::new_index_iterator(const std::shared_ptr& Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, PageHandle* handle, Slice* page_body, PageFooterPB* footer, BlockCompressionCodec* codec, bool is_dict_page) const { + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().column_reader_read_page); iter_opts.sanity_check(); PageReadOptions opts(iter_opts.io_ctx); opts.verify_checksum = _opts.verify_checksum; @@ -809,6 +816,16 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat return Status::OK(); } +Status ColumnReader::get_ordinal_index_reader(OrdinalIndexReader*& reader, + OlapReaderStatistics* 
index_load_stats) { + CHECK(_ordinal_index) << fmt::format("ordinal index is null for column reader of type {}", + std::to_string(int(_meta_type))); + RETURN_IF_ERROR( + _ordinal_index->load(_use_index_page_cache, _opts.kept_in_memory, index_load_stats)); + reader = _ordinal_index.get(); + return Status::OK(); +} + Status ColumnReader::new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column) { return new_iterator(iterator, tablet_column, nullptr); } @@ -1014,6 +1031,29 @@ Status MapFileColumnIterator::seek_to_ordinal(ordinal_t ord) { return Status::OK(); } +Status MapFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + RETURN_IF_ERROR(_offsets_iterator->init_prefetcher(params)); + if (_map_reader->is_nullable()) { + RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + } + RETURN_IF_ERROR(_key_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_val_iterator->init_prefetcher(params)); + return Status::OK(); +} + +void MapFileColumnIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + _offsets_iterator->collect_prefetchers(prefetchers, init_method); + if (_map_reader->is_nullable()) { + _null_iterator->collect_prefetchers(prefetchers, init_method); + } + // the actual data pages to read of key/value column depends on the read result of offset column, + // so we can't init prefetch blocks according to rowids, just prefetch all data blocks here. + _key_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); + _val_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); +} + Status MapFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) { if (_reading_flag == ReadingFlag::SKIP_READING) { @@ -1414,6 +1454,27 @@ Status StructFileColumnIterator::seek_to_ordinal(ordinal_t ord) { return Status::OK(); } +Status StructFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + for (auto& column_iterator : _sub_column_iterators) { + RETURN_IF_ERROR(column_iterator->init_prefetcher(params)); + } + if (_struct_reader->is_nullable()) { + RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + } + return Status::OK(); +} + +void StructFileColumnIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + for (auto& column_iterator : _sub_column_iterators) { + column_iterator->collect_prefetchers(prefetchers, init_method); + } + if (_struct_reader->is_nullable()) { + _null_iterator->collect_prefetchers(prefetchers, init_method); + } +} + Status StructFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, vectorized::MutableColumnPtr& dst) { if (_reading_flag == ReadingFlag::SKIP_READING) { @@ -1559,6 +1620,16 @@ Status OffsetFileColumnIterator::_peek_one_offset(ordinal_t* offset) { return Status::OK(); } +Status OffsetFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + return _offset_iterator->init_prefetcher(params); +} + +void OffsetFileColumnIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + _offset_iterator->collect_prefetchers(prefetchers, init_method); +} + /** * first_storage_offset read from page should smaller than next_storage_offset which here call _peek_one_offset from page, and first_column_offset is keep in memory data which is different dimension with (first_storage_offset and next_storage_offset) @@ -1693,6 +1764,27 @@ Status 
ArrayFileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnP return Status::OK(); } +Status ArrayFileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + RETURN_IF_ERROR(_offset_iterator->init_prefetcher(params)); + RETURN_IF_ERROR(_item_iterator->init_prefetcher(params)); + if (_array_reader->is_nullable()) { + RETURN_IF_ERROR(_null_iterator->init_prefetcher(params)); + } + return Status::OK(); +} + +void ArrayFileColumnIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + _offset_iterator->collect_prefetchers(prefetchers, init_method); + // the actual data pages to read of item column depends on the read result of offset column, + // so we can't init prefetch blocks according to rowids, just prefetch all data blocks here. + _item_iterator->collect_prefetchers(prefetchers, PrefetcherInitMethod::ALL_DATA_BLOCKS); + if (_array_reader->is_nullable()) { + _null_iterator->collect_prefetchers(prefetchers, init_method); + } +} + Status ArrayFileColumnIterator::read_by_rowids(const rowid_t* rowids, const size_t count, vectorized::MutableColumnPtr& dst) { if (_reading_flag == ReadingFlag::SKIP_READING) { @@ -1802,12 +1894,28 @@ Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { FileColumnIterator::~FileColumnIterator() = default; +void FileColumnIterator::_trigger_prefetch_if_eligible(ordinal_t ord) { + std::vector ranges; + if (_prefetcher->need_prefetch(cast_set(ord), &ranges)) { + for (const auto& range : ranges) { + _cached_remote_file_reader->prefetch_range(range.offset, range.size, &_opts.io_ctx); + } + } +} + Status FileColumnIterator::seek_to_ordinal(ordinal_t ord) { if (_reading_flag == ReadingFlag::SKIP_READING) { DLOG(INFO) << "File column iterator column " << _column_name << " skip reading."; return Status::OK(); } + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] FileColumnIterator::seek_to_ordinal seek to ordinal {}, enable_prefetch={}", + ord, _enable_prefetch); + if (_enable_prefetch) { + _trigger_prefetch_if_eligible(ord); + } + // if current page contains this row, we don't need to seek if (!_page || !_page.contains(ord) || !_page_iter.valid()) { RETURN_IF_ERROR(_reader->seek_at_or_before(ord, &_page_iter, _opts)); @@ -2109,6 +2217,26 @@ Status FileColumnIterator::get_row_ranges_by_dict(const AndBlockColumnPredicate* return Status::OK(); } +Status FileColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + if (_cached_remote_file_reader = + std::dynamic_pointer_cast(_reader->_file_reader); + !_cached_remote_file_reader) { + return Status::OK(); + } + _enable_prefetch = true; + _prefetcher = std::make_unique(params.config); + RETURN_IF_ERROR(_prefetcher->init(_reader, params.read_options)); + return Status::OK(); +} + +void FileColumnIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + if (_prefetcher) { + prefetchers[init_method].emplace_back(_prefetcher.get()); + } +} + Status DefaultValueColumnIterator::init(const ColumnIteratorOptions& opts) { _opts = opts; // be consistent with segment v1 diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index b65cda21ac95d2..1d08dfbb4033c6 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -33,6 +33,7 @@ #include "common/exception.h" #include "common/logging.h" #include "common/status.h" // for Status +#include 
"io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/file_system.h" #include "io/io_common.h" @@ -43,6 +44,7 @@ #include "olap/rowset/segment_v2/page_handle.h" // for PageHandle #include "olap/rowset/segment_v2/page_pointer.h" #include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage +#include "olap/rowset/segment_v2/segment_prefetcher.h" #include "olap/rowset/segment_v2/stream_reader.h" #include "olap/tablet_schema.h" #include "olap/types.h" @@ -171,6 +173,8 @@ class ColumnReader : public MetadataAdder, Status seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter, const ColumnIteratorOptions& iter_opts); + Status get_ordinal_index_reader(OrdinalIndexReader*& reader, + OlapReaderStatistics* index_load_stats); // read a page from file into a page handle Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, @@ -237,6 +241,8 @@ class ColumnReader : public MetadataAdder, private: friend class VariantColumnReader; + friend class FileColumnIterator; + friend class SegmentPrefetcher; ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, io::FileReaderSPtr file_reader); @@ -408,6 +414,12 @@ class ColumnIterator { virtual void remove_pruned_sub_iterators() {}; + virtual Status init_prefetcher(const SegmentPrefetchParams& params) { return Status::OK(); } + + virtual void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) {} + protected: Result _get_sub_access_paths(const TColumnAccessPaths& access_paths); ColumnIteratorOptions _opts; @@ -458,11 +470,17 @@ class FileColumnIterator final : public ColumnIterator { bool is_all_dict_encoding() const override { return _is_all_dict_encoding; } + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: Status _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const; Status _load_next_page(bool* eos); Status _read_data_page(const OrdinalPageIndexIterator& iter); Status _read_dict_data(); + void _trigger_prefetch_if_eligible(ordinal_t ord); std::shared_ptr _reader = nullptr; @@ -490,6 +508,10 @@ class FileColumnIterator final : public ColumnIterator { bool _is_all_dict_encoding = false; std::unique_ptr _dict_word_info; + + bool _enable_prefetch {false}; + std::unique_ptr _prefetcher; + std::shared_ptr _cached_remote_file_reader {nullptr}; }; class EmptyFileColumnIterator final : public ColumnIterator { @@ -528,6 +550,11 @@ class OffsetFileColumnIterator final : public ColumnIterator { return _offset_iterator->read_by_rowids(rowids, count, dst); } + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: std::unique_ptr _offset_iterator; // reuse a tiny column for peek to avoid frequent allocations @@ -557,6 +584,10 @@ class MapFileColumnIterator final : public ColumnIterator { ordinal_t get_current_ordinal() const override { return _offsets_iterator->get_current_ordinal(); } + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; Status set_access_paths(const TColumnAccessPaths& all_access_paths, const TColumnAccessPaths& predicate_access_paths) override; @@ -601,6 +632,11 @@ class StructFileColumnIterator final : public 
ColumnIterator { void remove_pruned_sub_iterators() override; + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: std::shared_ptr _struct_reader = nullptr; ColumnIteratorUPtr _null_iterator; @@ -635,6 +671,11 @@ class ArrayFileColumnIterator final : public ColumnIterator { void remove_pruned_sub_iterators() override; + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: std::shared_ptr _array_reader = nullptr; std::unique_ptr _offset_iterator; diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h b/be/src/olap/rowset/segment_v2/ordinal_page_index.h index 660c97e315da72..6d85786ed17838 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h @@ -98,7 +98,8 @@ class OrdinalIndexReader : public MetadataAdder { OlapReaderStatistics* index_load_stats); private: - friend OrdinalPageIndexIterator; + friend class OrdinalPageIndexIterator; + friend class SegmentPrefetcher; io::FileReaderSPtr _file_reader; DorisCallOnce _load_once; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index dc6d887d182b62..955ee9a8d0c9c5 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -42,6 +42,7 @@ #include "olap/rowset/segment_v2/page_handle.h" #include "util/block_compression.h" #include "util/coding.h" +#include "util/concurrency_stats.h" #include "util/faststring.h" #include "util/runtime_profile.h" @@ -206,6 +207,7 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle "Bad page: page is compressed but codec is NO_COMPRESSION, file={}", opts.file_reader->path().native()); } + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress); SCOPED_RAW_TIMER(&opts.stats->decompress_ns); std::unique_ptr decompressed_page = std::make_unique( footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type); @@ -240,6 +242,7 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle if (encoding_info) { auto* pre_decoder = encoding_info->get_data_page_pre_decoder(); if (pre_decoder) { + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode); RETURN_IF_ERROR(pre_decoder->decode( &page, &page_slice, footer->data_page_footer().nullmap_size() + footer_size + 4, @@ -255,6 +258,7 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle // just before add it to pagecache, it will be consistency with reading data from page cache. 
opts.stats->uncompressed_bytes_read += body->size; if (opts.use_page_cache && cache) { + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache); // insert this page into cache and return the cache handle cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory); *handle = PageHandle(std::move(cache_handle)); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 3e52a9815abd36..d3019e5dd14c31 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -32,6 +32,7 @@ #include #include +#include "cloud/config.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/consts.h" @@ -39,6 +40,8 @@ #include "common/logging.h" #include "common/object_pool.h" #include "common/status.h" +#include "io/cache/cached_remote_file_reader.h" +#include "io/fs/file_reader.h" #include "io/io_common.h" #include "olap/bloom_filter_predicate.h" #include "olap/collection_similarity.h" @@ -61,8 +64,10 @@ #include "olap/rowset/segment_v2/index_reader_helper.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" #include "olap/rowset/segment_v2/inverted_index_reader.h" +#include "olap/rowset/segment_v2/ordinal_page_index.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_prefetcher.h" #include "olap/rowset/segment_v2/variant/variant_column_reader.h" #include "olap/rowset/segment_v2/virtual_column_iterator.h" #include "olap/schema.h" @@ -75,6 +80,7 @@ #include "runtime/runtime_predicate.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "util/concurrency_stats.h" #include "util/defer_op.h" #include "util/doris_metrics.h" #include "util/key_util.h" @@ -543,9 +549,93 @@ Status SegmentIterator::_lazy_init(vectorized::Block* block) { } _lazy_inited = true; + + _init_segment_prefetchers(); + return Status::OK(); } +void SegmentIterator::_init_segment_prefetchers() { + SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns); + if (!config::is_cloud_mode()) { + return; + } + static std::vector supported_reader_types { + ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION, + ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION}; + if (std::ranges::none_of(supported_reader_types, + [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) { + return; + } + // Initialize segment prefetcher for predicate and non-predicate columns + bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY); + bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch + : config::enable_compaction_segment_file_cache_prefetch; + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, " + "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, " + "segment={}, predicate_column_ids={}, common_expr_column_ids={}", + is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(), + _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), + fmt::join(_predicate_column_ids, ","), fmt::join(_common_expr_column_ids, ",")); + if (enable_prefetch && !_row_bitmap.isEmpty()) { + int window_size = + 1 + (is_query ? 
config::query_segment_file_cache_prefetch_block_size + : config::compaction_segment_file_cache_prefetch_block_size); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentIterator prefetch config: window_size={}", window_size); + if (window_size > 0 && + !_column_iterators.empty()) { // ensure init_iterators has been called + SegmentPrefetcherConfig prefetch_config(window_size, + config::file_cache_each_block_size); + for (auto cid : _schema->column_ids()) { + auto& column_iter = _column_iterators[cid]; + if (column_iter == nullptr) { + continue; + } + const auto* tablet_column = _schema->column(cid); + SegmentPrefetchParams params { + .config = prefetch_config, + .read_options = _opts, + }; + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentIterator init_segment_prefetchers, " + "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}", + _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid, + tablet_column->name(), tablet_column->type()); + Status st = column_iter->init_prefetcher(params); + if (!st.ok()) { + LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] failed to init prefetcher for column_id={}, " + "tablet={}, rowset={}, segment={}, error={}", + cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), + st.to_string()); + } + } + + // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks + PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows()) + ? PrefetcherInitMethod::FROM_ROWIDS + : PrefetcherInitMethod::ALL_DATA_BLOCKS; + std::map> prefetchers; + for (const auto& column_iter : _column_iterators) { + if (column_iter != nullptr) { + column_iter->collect_prefetchers(prefetchers, init_method); + } + } + for (auto& [method, prefetcher_vec] : prefetchers) { + if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) { + for (auto* prefetcher : prefetcher_vec) { + prefetcher->build_all_data_blocks(); + } + } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) { + SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec); + } + } + } + } +} + Status SegmentIterator::_get_row_ranges_by_keys() { SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns); DorisMetrics::instance()->segment_row_total->increment(num_rows()); @@ -2051,6 +2141,11 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16 "[{}]", nrows_read, is_continuous, fmt::join(_predicate_column_ids, ",")); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, " + "rowids: [{}...{}]", + nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0, + nrows_read > 0 ? 
_block_rowids[nrows_read - 1] : 0); for (auto cid : _predicate_column_ids) { auto& column = _current_return_columns[cid]; VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid, @@ -2415,6 +2510,8 @@ Status SegmentIterator::copy_column_data_by_selector(vectorized::IColumn* input_ } Status SegmentIterator::_next_batch_internal(vectorized::Block* block) { + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch); + bool is_mem_reuse = block->mem_reuse(); DCHECK(is_mem_reuse); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 5ef63c3c6ec4c2..89d08d4a047305 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -386,6 +386,8 @@ class SegmentIterator : public RowwiseIterator { void _init_row_bitmap_by_condition_cache(); + void _init_segment_prefetchers(); + class BitmapRangeIterator; class BackwardBitmapRangeIterator; diff --git a/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp b/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp new file mode 100644 index 00000000000000..cdc84b24b0b2c6 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_prefetcher.cpp @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/rowset/segment_v2/segment_prefetcher.h" + +#include +#include + +#include "common/config.h" +#include "common/logging.h" +#include "olap/iterators.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/rowset/segment_v2/ordinal_page_index.h" + +namespace doris::segment_v2 { + +void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) { + if (ordinal_index == nullptr) { + return; + } + const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i + const auto& pages = ordinal_index->_pages; // pages[i] = page pointer of page i + const int num_pages = ordinal_index->_num_pages; + for (uint32_t i = 0; i < num; ++i) { + rowid_t rowid = rowids[i]; + + if (_is_forward) { + while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) { + page_idx++; + } + + const auto& page = pages[page_idx]; + size_t page_start_block = _offset_to_block_id(page.offset); + size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1); + + // If page spans two blocks, assign it to the next block (page_end_block) + size_t block_id = (page_start_block != page_end_block) ? 
page_end_block : page_start_block; + + if (block_id != last_block_id) { + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + last_block_id = block_id; + current_block_first_rowid = rowid; + } + } else { + // Backward reading: we need the last rowid in each block as the "first" rowid + // (because when reading backwards, we encounter the largest rowid first) + // + // Strategy: iterate forward through bitmap, but for each block, + // keep updating current_block_first_rowid to the latest (largest) rowid in that block + while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) { + page_idx++; + } + size_t block_id = _offset_to_block_id(pages[page_idx].offset); + + if (block_id != last_block_id) { + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + last_block_id = block_id; + } + current_block_first_rowid = rowid; + } + } +} + +void SegmentPrefetcher::build_all_data_blocks() { + if (ordinal_index == nullptr) { + return; + } + reset_blocks(); + const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i + const auto& pages = ordinal_index->_pages; // pages[i] = page pointer of page i + const int num_pages = ordinal_index->_num_pages; + + size_t last_block_id = static_cast(-1); + rowid_t current_block_first_rowid = 0; + + for (int page_idx = 0; page_idx < num_pages; ++page_idx) { + const auto& page = pages[page_idx]; + + if (_is_forward) { + size_t page_start_block = _offset_to_block_id(page.offset); + size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1); + + // If page spans two blocks, assign it to the next block (page_end_block) + size_t block_id = (page_start_block != page_end_block) ? 
page_end_block : page_start_block; + + if (block_id != last_block_id) { + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + last_block_id = block_id; + current_block_first_rowid = ordinals[page_idx]; + } + } else { + // Backward: use the last ordinal in each block as first_rowid + size_t block_id = _offset_to_block_id(page.offset); + if (block_id != last_block_id) { + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + last_block_id = block_id; + } + current_block_first_rowid = ordinals[page_idx]; + } + } + + // Add the last block + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + + // Reverse for backward reading + if (!_is_forward && !_block_sequence.empty()) { + std::ranges::reverse(_block_sequence); + } +} + +void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap, + const std::vector& prefetchers) { + for (auto* prefetcher : prefetchers) { + prefetcher->begin_build_blocks_by_rowids(); + } + + int batch_size = config::segment_file_cache_consume_rowids_batch_size; + std::vector rowids(batch_size); + roaring::api::roaring_uint32_iterator_t iter; + roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter); + uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size); + + for (; num > 0; + num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) { + for (auto* prefetcher : prefetchers) { + prefetcher->add_rowids(rowids.data(), num); + } + } + + for (auto* prefetcher : prefetchers) { + prefetcher->finish_build_blocks_by_rowids(); + } +} + +void SegmentPrefetcher::begin_build_blocks_by_rowids() { + reset_blocks(); + page_idx = 0; +} + +void SegmentPrefetcher::finish_build_blocks_by_rowids() { + if (ordinal_index == nullptr) { + return; + } + if (last_block_id != static_cast(-1)) { + _block_sequence.emplace_back(last_block_id, current_block_first_rowid); + } + + if (!_is_forward && !_block_sequence.empty()) { + std::ranges::reverse(_block_sequence); + } + + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, " + "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]", + _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path, + fmt::join(_block_sequence | std::views::transform([](const auto& b) { + return fmt::format("({}, {})", b.block_id, b.first_rowid); + }), + ",")); +} + +void SegmentPrefetcher::reset_blocks() { + _block_sequence.clear(); + _current_block_index = 0; + _prefetched_index = -1; +} + +Status SegmentPrefetcher::init(std::shared_ptr column_reader, + const StorageReadOptions& read_options) { + DCHECK(column_reader != nullptr); + + reset_blocks(); + _is_forward = !read_options.read_orderby_key_reverse; + _path = column_reader->_file_reader->path().filename().native(); + + RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats)); + return Status::OK(); +} + +bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector* out_ranges) { + DCHECK(out_ranges != nullptr); + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) + << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}", + current_rowid, debug_string()); + if (_block_sequence.empty() || + _prefetched_index >= 
static_cast(_block_sequence.size()) - 1) { + return false; + } + + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, " + "block=(id={}, first_rowid={})", + current_rowid, debug_string(), _block_sequence[_current_block_index].block_id, + _block_sequence[_current_block_index].first_rowid); + if (_is_forward) { + while (_current_block_index + 1 < _block_sequence.size() && + _block_sequence[_current_block_index + 1].first_rowid <= current_rowid) { + _current_block_index++; + } + } else { + while (_current_block_index + 1 < _block_sequence.size() && + _block_sequence[_current_block_index + 1].first_rowid >= current_rowid) { + _current_block_index++; + } + } + + out_ranges->clear(); + // for non-predicate column, some rowids in row_bitmap may be filtered out after vec evaluation of predicate columns, + // so we should not prefetch for these rows + _prefetched_index = std::max(_prefetched_index, _current_block_index - 1); + while (_prefetched_index + 1 < _block_sequence.size() && + window_size() < _config.prefetch_window_size) { + out_ranges->push_back(_block_id_to_range(_block_sequence[++_prefetched_index].block_id)); + } + + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, " + "block=(id={}, first_rowid={})", + current_rowid, debug_string(), _block_sequence[_current_block_index].block_id, + _block_sequence[_current_block_index].first_rowid); + + bool triggered = !out_ranges->empty(); + if (triggered) { + LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format( + "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} " + "blocks: (offset, size)=[{}]", + current_rowid, debug_string(), out_ranges->size(), + fmt::join(*out_ranges | std::views::transform([](const auto& b) { + return fmt::format("({}, {})", b.offset, b.size); + }), + ",")); + } + return triggered; +} + +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/segment_prefetcher.h b/be/src/olap/rowset/segment_v2/segment_prefetcher.h new file mode 100644 index 00000000000000..2034f31e412ea2 --- /dev/null +++ b/be/src/olap/rowset/segment_v2/segment_prefetcher.h @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
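For readers skimming the diff: the need_prefetch logic above keeps a fixed window of prefetched file cache blocks ahead of the block containing the current rowid. A minimal standalone sketch of that sliding-window decision, using simplified stand-in types (WindowPrefetcher, BlockInfo, and Range here are illustrative only, not the patch's classes):

// sketch.cpp -- not part of the patch; mirrors the forward-reading case of need_prefetch
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

struct BlockInfo {
    size_t block_id;
    uint32_t first_rowid; // first rowid that will be read from this block
};

struct Range {
    uint64_t offset;
    uint64_t size;
};

class WindowPrefetcher {
public:
    WindowPrefetcher(std::vector<BlockInfo> seq, size_t window, uint64_t block_size)
            : _seq(std::move(seq)), _window(window), _block_size(block_size) {}

    // Returns the block ranges to prefetch so that `_window` blocks stay ahead
    // of the block containing `current_rowid` (forward, monotonic reading).
    std::vector<Range> on_rowid(uint32_t current_rowid) {
        std::vector<Range> out;
        if (_seq.empty()) {
            return out;
        }
        // Advance to the last block whose first_rowid <= current_rowid.
        while (_cur + 1 < static_cast<int>(_seq.size()) &&
               _seq[_cur + 1].first_rowid <= current_rowid) {
            ++_cur;
        }
        // Keep the prefetched frontier at least at the current block, then fill the window.
        _prefetched = std::max(_prefetched, _cur - 1);
        while (_prefetched + 1 < static_cast<int>(_seq.size()) &&
               (_prefetched - _cur + 1) < static_cast<int>(_window)) {
            ++_prefetched;
            out.push_back({_seq[_prefetched].block_id * _block_size, _block_size});
        }
        return out;
    }

private:
    std::vector<BlockInfo> _seq;
    size_t _window;
    uint64_t _block_size;
    int _cur = 0;         // index of the block currently being read
    int _prefetched = -1; // index of the furthest block already prefetched
};

int main() {
    // Four 1MB blocks, prefetch window of 2.
    WindowPrefetcher p({{0, 0}, {1, 4096}, {2, 8192}, {3, 12288}}, 2, 1 << 20);
    for (uint32_t rowid : {0u, 4096u, 9000u}) {
        for (const auto& r : p.on_rowid(rowid)) {
            std::printf("rowid=%u prefetch offset=%llu size=%llu\n", rowid,
                        (unsigned long long)r.offset, (unsigned long long)r.size);
        }
    }
    return 0;
}

Because the prefetched frontier only moves forward, each block range is emitted at most once, which is why the prefetcher needs no deduplication of already-issued ranges.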
+ +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "olap/rowset/segment_v2/common.h" + +namespace doris { +namespace io { +class FileReader; +} // namespace io +class StorageReadOptions; + +namespace segment_v2 { +class OrdinalIndexReader; +class ColumnReader; + +enum class PrefetcherInitMethod : int { FROM_ROWIDS = 0, ALL_DATA_BLOCKS = 1 }; + +// Configuration for segment prefetcher +struct SegmentPrefetcherConfig { + // Number of file cache blocks to prefetch ahead + size_t prefetch_window_size = 4; + + // File cache block size in bytes (default 1MB) + size_t block_size = 1024 * 1024; + + SegmentPrefetcherConfig(size_t window_size, size_t blk_size) + : prefetch_window_size(window_size), block_size(blk_size) {} +}; + +// Block range representing [offset, offset + size) in the segment file +struct BlockRange { + uint64_t offset; + uint64_t size; + + BlockRange(uint64_t off, uint64_t sz) : offset(off), size(sz) {} + + bool operator==(const BlockRange& other) const { + return offset == other.offset && size == other.size; + } +}; + +// Represents a block with its first rowid for reading +struct BlockInfo { + size_t block_id; + rowid_t first_rowid; + + BlockInfo(size_t bid, rowid_t rid) : block_id(bid), first_rowid(rid) {} +}; + +struct SegmentPrefetchParams { + SegmentPrefetcherConfig config; + const StorageReadOptions& read_options; +}; + +// SegmentPrefetcher maintains block sequence and triggers prefetch to keep +// N blocks ahead of current reading position. +// +// Key design: +// - Monotonic reading: rowids are read in order (forward or backward) +// - Trigger condition: when current_rowid reaches a block boundary, prefetch next N blocks +// - No deduplication needed: reading is monotonic, blocks are naturally processed in order +class SegmentPrefetcher { +public: + explicit SegmentPrefetcher(const SegmentPrefetcherConfig& config) : _config(config) {} + + ~SegmentPrefetcher() = default; + + Status init(std::shared_ptr column_reader, + const StorageReadOptions& read_options); + + bool need_prefetch(rowid_t current_rowid, std::vector* out_ranges); + + static void build_blocks_by_rowids(const roaring::Roaring& row_bitmap, + const std::vector& prefetchers); + void begin_build_blocks_by_rowids(); + void add_rowids(const rowid_t* rowids, uint32_t num); + void finish_build_blocks_by_rowids(); + + void build_all_data_blocks(); + +private: + // Parameters: + // row_bitmap: The complete bitmap of rowids to scan + // ordinal_index: Ordinal index reader (must be loaded) + // + // For forward reading: first_rowid is the first rowid we need to read in each block + // For backward reading: first_rowid is the last rowid we need to read in each block + // (since we read backwards, this is the first one we'll encounter) + void _build_block_sequence_from_bitmap(const roaring::Roaring& row_bitmap, + OrdinalIndexReader* ordinal_index); + size_t _offset_to_block_id(uint64_t offset) const { return offset / _config.block_size; } + + BlockRange _block_id_to_range(size_t block_id) const { + return {block_id * _config.block_size, _config.block_size}; + } + + int window_size() const { return _prefetched_index - _current_block_index + 1; } + + std::string debug_string() const { + return fmt::format( + "[internal state] _is_forward={}, _prefetched_index={}, _current_block_index={}, " + "window_size={}, block.size()={}, path={}", + _is_forward, _prefetched_index, _current_block_index, window_size(), + _block_sequence.size(), _path); + } + + void reset_blocks(); + 
+private: + SegmentPrefetcherConfig _config; + std::string _path; + + // Sequence of blocks with their first rowid (in reading order) + std::vector _block_sequence; + + bool _is_forward = true; + + int _prefetched_index = -1; + int _current_block_index = 0; + + int page_idx = 0; + // For each page, track the first rowid we need to read + // For forward: the smallest rowid in this page + // For backward: the largest rowid in this page (first one we'll encounter when reading backwards) + size_t last_block_id = static_cast(-1); + rowid_t current_block_first_rowid = 0; + + OrdinalIndexReader* ordinal_index = nullptr; +}; + +} // namespace segment_v2 +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.cpp b/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.cpp index dd375cb1dccb62..02fff26ed3e7da 100644 --- a/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.cpp @@ -164,6 +164,39 @@ ordinal_t HierarchicalDataIterator::get_current_ordinal() const { return (*_substream_reader.begin())->data.iterator->get_current_ordinal(); } +Status HierarchicalDataIterator::init_prefetcher(const SegmentPrefetchParams& params) { + RETURN_IF_ERROR(tranverse([&](SubstreamReaderTree::Node& node) { + RETURN_IF_ERROR(node.data.iterator->init_prefetcher(params)); + return Status::OK(); + })); + if (_root_reader) { + DCHECK(_root_reader->inited); + RETURN_IF_ERROR(_root_reader->iterator->init_prefetcher(params)); + } + if (_sparse_column_reader) { + DCHECK(_sparse_column_reader->inited); + RETURN_IF_ERROR(_sparse_column_reader->iterator->init_prefetcher(params)); + } + return Status::OK(); +} + +void HierarchicalDataIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + static_cast(tranverse([&](SubstreamReaderTree::Node& node) { + node.data.iterator->collect_prefetchers(prefetchers, init_method); + return Status::OK(); + })); + if (_root_reader) { + DCHECK(_root_reader->inited); + _root_reader->iterator->collect_prefetchers(prefetchers, init_method); + } + if (_sparse_column_reader) { + DCHECK(_sparse_column_reader->inited); + _sparse_column_reader->iterator->collect_prefetchers(prefetchers, init_method); + } +} + Status HierarchicalDataIterator::_process_sub_columns( vectorized::ColumnVariant& container_variant, const PathsWithColumnAndType& non_nested_subcolumns) { diff --git a/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.h b/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.h index a549a131883b33..f0a657b95e3f1b 100644 --- a/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.h +++ b/be/src/olap/rowset/segment_v2/variant/hierarchical_data_iterator.h @@ -89,6 +89,11 @@ class HierarchicalDataIterator : public ColumnIterator { Status add_stream(int32_t col_uid, const SubcolumnColumnMetaInfo::Node* node, ColumnReaderCache* column_reader_cache, OlapReaderStatistics* stats); + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: SubstreamReaderTree _substream_reader; std::unique_ptr _root_reader; diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h index f4afa778eadcbd..56f6505445b419 100644 --- 
a/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h +++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h @@ -85,6 +85,22 @@ class BaseSparseColumnProcessor : public ColumnIterator { throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "not implement"); } + Status init_prefetcher(const SegmentPrefetchParams& params) override { + if (has_sparse_column_cache()) { + return Status::OK(); + } + return _sparse_column_reader->init_prefetcher(params); + } + + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override { + if (has_sparse_column_cache()) { + return; + } + _sparse_column_reader->collect_prefetchers(prefetchers, init_method); + } + // Template method pattern for batch processing template Status _process_batch(ReadMethod&& read_method, size_t nrows, diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp index 165c0d29591723..ccb4ce432828dc 100644 --- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.cpp @@ -44,6 +44,23 @@ Status SparseColumnMergeIterator::seek_to_ordinal(ordinal_t ord) { return Status::OK(); } +Status SparseColumnMergeIterator::init_prefetcher(const SegmentPrefetchParams& params) { + RETURN_IF_ERROR(_sparse_column_reader->init_prefetcher(params)); + for (auto& entry : _src_subcolumns_for_sparse) { + RETURN_IF_ERROR(entry->data.iterator->init_prefetcher(params)); + } + return Status::OK(); +} + +void SparseColumnMergeIterator::collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) { + _sparse_column_reader->collect_prefetchers(prefetchers, init_method); + for (auto& entry : _src_subcolumns_for_sparse) { + entry->data.iterator->collect_prefetchers(prefetchers, init_method); + } +} + Status SparseColumnMergeIterator::init(const ColumnIteratorOptions& opts) { RETURN_IF_ERROR(_sparse_column_cache->init(opts)); for (auto& entry : _src_subcolumns_for_sparse) { diff --git a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h index bc3af0ef146cef..ecfdfa31bef850 100644 --- a/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h +++ b/be/src/olap/rowset/segment_v2/variant/sparse_column_merge_iterator.h @@ -92,6 +92,11 @@ class SparseColumnMergeIterator : public BaseSparseColumnProcessor { Status seek_to_ordinal(ordinal_t ord) override; + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: template Status _read_subcolumns(ReadFunction&& read_func) { diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp index 1e411dec498684..704875cb25b79f 100644 --- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.cpp @@ -1178,6 +1178,16 @@ Status VariantRootColumnIterator::read_by_rowids(const rowid_t* rowids, const si return _process_root_column(dst, root_column, most_common_type); } +Status VariantRootColumnIterator::init_prefetcher(const SegmentPrefetchParams& params) { + return _inner_iter->init_prefetcher(params); +} + +void VariantRootColumnIterator::collect_prefetchers( + std::map>& 
prefetchers, + PrefetcherInitMethod init_method) { + _inner_iter->collect_prefetchers(prefetchers, init_method); +} + static void fill_nested_with_defaults(vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& sibling_column, size_t nrows) { const auto* sibling_array = vectorized::check_and_get_column( diff --git a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h index 3143fd94d8126a..b74bfd7e463276 100644 --- a/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h +++ b/be/src/olap/rowset/segment_v2/variant/variant_column_reader.h @@ -417,6 +417,11 @@ class VariantRootColumnIterator : public ColumnIterator { ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); } + Status init_prefetcher(const SegmentPrefetchParams& params) override; + void collect_prefetchers( + std::map>& prefetchers, + PrefetcherInitMethod init_method) override; + private: Status _process_root_column(vectorized::MutableColumnPtr& dst, vectorized::MutableColumnPtr& root_column, diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 7738c195474f64..f4f418761c7015 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -264,6 +264,8 @@ Status OlapScanLocalState::_init_profile() { ADD_TIMER(_scanner_profile, "SegmentIteratorInitReturnColumnIteratorsTimer"); _segment_iterator_init_index_iterators_timer = ADD_TIMER(_scanner_profile, "SegmentIteratorInitIndexIteratorsTimer"); + _segment_iterator_init_segment_prefetchers_timer = + ADD_TIMER(_scanner_profile, "SegmentIteratorInitSegmentPrefetchersTimer"); _segment_create_column_readers_timer = ADD_TIMER(_scanner_profile, "SegmentCreateColumnReadersTimer"); diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 331091a36504c4..7ea8312f3ebe9c 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -268,6 +268,7 @@ class OlapScanLocalState final : public ScanLocalState { RuntimeProfile::Counter* _segment_iterator_init_timer = nullptr; RuntimeProfile::Counter* _segment_iterator_init_return_column_iterators_timer = nullptr; RuntimeProfile::Counter* _segment_iterator_init_index_iterators_timer = nullptr; + RuntimeProfile::Counter* _segment_iterator_init_segment_prefetchers_timer = nullptr; RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr; RuntimeProfile::Counter* _segment_load_index_timer = nullptr; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 8d769c359ab574..f65aae6f62f4a4 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -264,6 +264,7 @@ class ExecEnv { ThreadPool* non_block_close_thread_pool(); ThreadPool* s3_file_system_thread_pool() { return _s3_file_system_thread_pool.get(); } ThreadPool* udf_close_workers_pool() { return _udf_close_workers_thread_pool.get(); } + ThreadPool* segment_prefetch_thread_pool() { return _segment_prefetch_thread_pool.get(); } void init_file_cache_factory(std::vector& cache_paths); io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; } @@ -486,6 +487,8 @@ class ExecEnv { std::unique_ptr _s3_file_system_thread_pool; // for java-udf to close std::unique_ptr _udf_close_workers_thread_pool; + // Threadpool used to prefetch segment file cache blocks + std::unique_ptr _segment_prefetch_thread_pool; FragmentMgr* _fragment_mgr = 
nullptr; WorkloadGroupMgr* _workload_group_manager = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index cb4ad78984b973..88c5a81abc92cf 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -258,6 +258,13 @@ Status ExecEnv::_init(const std::vector& store_paths, .set_max_threads(cast_set(buffered_reader_max_threads)) .build(&_buffered_reader_prefetch_thread_pool)); + static_cast(ThreadPoolBuilder("SegmentPrefetchThreadPool") + .set_min_threads(cast_set( + config::segment_prefetch_thread_pool_thread_num_min)) + .set_max_threads(cast_set( + config::segment_prefetch_thread_pool_thread_num_max)) + .build(&_segment_prefetch_thread_pool)); + static_cast(ThreadPoolBuilder("SendTableStatsThreadPool") .set_min_threads(8) .set_max_threads(32) @@ -812,6 +819,7 @@ void ExecEnv::destroy() { _runtime_query_statistics_mgr->stop_report_thread(); } SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); + SAFE_SHUTDOWN(_segment_prefetch_thread_pool); SAFE_SHUTDOWN(_s3_file_upload_thread_pool); SAFE_SHUTDOWN(_lazy_release_obj_pool); SAFE_SHUTDOWN(_non_block_close_thread_pool); @@ -873,6 +881,7 @@ void ExecEnv::destroy() { _s3_file_system_thread_pool.reset(nullptr); _send_table_stats_thread_pool.reset(nullptr); _buffered_reader_prefetch_thread_pool.reset(nullptr); + _segment_prefetch_thread_pool.reset(nullptr); _s3_file_upload_thread_pool.reset(nullptr); _send_batch_thread_pool.reset(nullptr); _udf_close_workers_thread_pool.reset(nullptr); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index ead7426a56408d..b8c07a4100a64e 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -48,6 +48,7 @@ #include "cloud/cloud_backend_service.h" #include "cloud/config.h" #include "common/stack_trace.h" +#include "util/concurrency_stats.h" #include "olap/tablet_schema_cache.h" #include "olap/utils.h" #include "runtime/memory/mem_tracker_limiter.h" @@ -533,6 +534,9 @@ int main(int argc, char** argv) { return 0; } + // Start concurrency stats manager + doris::ConcurrencyStatsManager::instance().start(); + // begin to start services doris::ThriftRpcHelper::setup(exec_env); // 1. thrift server with be_port diff --git a/be/src/util/concurrency_stats.cpp b/be/src/util/concurrency_stats.cpp new file mode 100644 index 00000000000000..b835bdb28254cb --- /dev/null +++ b/be/src/util/concurrency_stats.cpp @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
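The ConcurrencyStatsManager started from doris_main above (and implemented in the new util files below) follows a simple pattern: atomic counters wrapped in RAII guards around hot read-path sections, plus a background thread that periodically prints every counter on a single CONCURRENCY_STATS log line. A minimal standalone sketch of that pattern, with illustrative names rather than the patch's API:

// sketch.cpp -- not part of the patch; counter + RAII guard + periodic dump thread
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <string>
#include <thread>
#include <vector>

class Counter {
public:
    explicit Counter(std::string name) : _name(std::move(name)) {}
    void increment() { _v.fetch_add(1, std::memory_order_relaxed); }
    void decrement() { _v.fetch_sub(1, std::memory_order_relaxed); }
    int64_t value() const { return _v.load(std::memory_order_relaxed); }
    const std::string& name() const { return _name; }

    struct Guard { // RAII: increment on entry, decrement on exit
        explicit Guard(Counter& c) : _c(c) { _c.increment(); }
        ~Guard() { _c.decrement(); }
        Counter& _c;
    };

private:
    std::string _name;
    std::atomic<int64_t> _v {0};
};

int main() {
    Counter read_at("file_cache.read_at");
    std::vector<Counter*> counters = {&read_at};

    std::atomic<bool> running {true};
    std::thread dumper([&] {
        while (running.load(std::memory_order_relaxed)) {
            std::string line = "CONCURRENCY_STATS";
            for (const auto* c : counters) {
                line += " " + c->name() + "=" + std::to_string(c->value());
            }
            std::puts(line.c_str()); // stands in for LOG(INFO)
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    {
        Counter::Guard g(read_at); // one "read" in flight while the guard lives
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }

    running = false;
    dumper.join();
    return 0;
}

Relaxed atomics are sufficient here because the counters are advisory diagnostics; no ordering guarantee against the dump thread is needed.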
+ +#include "util/concurrency_stats.h" + +#include +#include + +#include "common/config.h" +#include "common/logging.h" + +namespace doris { +ConcurrencyStatsManager::ConcurrencyStatsManager() : _running(false) { + // Initialize all counters in the order of read path (top to bottom) + vscanner_get_block = new ConcurrencyCounter("vscanner"); + segment_iterator_next_batch = new ConcurrencyCounter("segment_iterator"); + column_reader_read_page = new ConcurrencyCounter("column_reader"); + page_io_decompress = new ConcurrencyCounter("page_io.decompress"); + page_io_pre_decode = new ConcurrencyCounter("page_io.pre_decode"); + page_io_insert_page_cache = new ConcurrencyCounter("page_io.insert_page_cache"); + cached_remote_reader_read_at = new ConcurrencyCounter("file_cache.read_at"); + cached_remote_reader_get_or_set = new ConcurrencyCounter("file_cache.get_or_set"); + cached_remote_reader_get_or_set_wait_lock = + new ConcurrencyCounter("file_cache.get_or_set_wait_lock"); + cached_remote_reader_get_or_set_downloader = + new ConcurrencyCounter("file_cache.get_or_set_downloader"); + cached_remote_reader_write_back = new ConcurrencyCounter("file_cache.write_back"); + cached_remote_reader_blocking = new ConcurrencyCounter("file_cache.blocking"); + cached_remote_reader_local_read = new ConcurrencyCounter("file_cache.local_read"); + s3_file_reader_read = new ConcurrencyCounter("s3.read"); + + // Add to vector in the order they should be printed + _counters.push_back(vscanner_get_block); + _counters.push_back(segment_iterator_next_batch); + _counters.push_back(column_reader_read_page); + _counters.push_back(page_io_decompress); + _counters.push_back(page_io_pre_decode); + _counters.push_back(page_io_insert_page_cache); + _counters.push_back(cached_remote_reader_read_at); + _counters.push_back(cached_remote_reader_get_or_set); + _counters.push_back(cached_remote_reader_get_or_set_wait_lock); + // _counters.push_back(cached_remote_reader_get_or_set_downloader); + _counters.push_back(cached_remote_reader_write_back); + _counters.push_back(cached_remote_reader_blocking); + _counters.push_back(cached_remote_reader_local_read); + _counters.push_back(s3_file_reader_read); +} + +ConcurrencyStatsManager::~ConcurrencyStatsManager() { + stop(); + + // Clean up counters + for (auto* counter : _counters) { + delete counter; + } + _counters.clear(); +} + +ConcurrencyStatsManager& ConcurrencyStatsManager::instance() { + static ConcurrencyStatsManager instance; + return instance; +} + +void ConcurrencyStatsManager::start() { + if (_running.exchange(true)) { + return; // Already running + } + + _dump_thread = std::make_unique([this]() { _dump_thread_func(); }); +} + +void ConcurrencyStatsManager::stop() { + if (!_running.exchange(false)) { + return; // Not running + } + + if (_dump_thread && _dump_thread->joinable()) { + _dump_thread->join(); + } + _dump_thread.reset(); +} + +void ConcurrencyStatsManager::dump_to_log() { + if (_counters.empty()) { + return; + } + + // Build single line output: CONCURRENCY_STATS name1=value1 name2=value2 ... 
+ std::stringstream ss; + ss << "CONCURRENCY_STATS"; + + for (const auto* counter : _counters) { + int64_t value = counter->value(); + ss << " " << counter->name() << "=" << value; + } + + LOG(INFO) << ss.str(); +} + +void ConcurrencyStatsManager::_dump_thread_func() { + while (_running.load(std::memory_order_relaxed)) { + // Check if dumping is enabled + if (config::enable_concurrency_stats_dump) { + dump_to_log(); + } + + // Sleep for the configured interval + int32_t interval_ms = config::concurrency_stats_dump_interval_ms; + if (interval_ms <= 0) { + interval_ms = 100; // Default to 100ms if invalid + } + + std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms)); + } +} + +} // namespace doris diff --git a/be/src/util/concurrency_stats.h b/be/src/util/concurrency_stats.h new file mode 100644 index 00000000000000..d3c1dcde414bc4 --- /dev/null +++ b/be/src/util/concurrency_stats.h @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +namespace doris { + +// A thread-safe counter for tracking concurrent operations +// Uses atomic variable for high-performance concurrent access +class ConcurrencyCounter { +public: + explicit ConcurrencyCounter(std::string name) : _name(std::move(name)), _count(0) {} + + // Increment the counter + void increment() { _count.fetch_add(1, std::memory_order_relaxed); } + + // Decrement the counter + void decrement() { _count.fetch_sub(1, std::memory_order_relaxed); } + + // Get current value + int64_t value() const { return _count.load(std::memory_order_relaxed); } + + const std::string& name() const { return _name; } + + // RAII helper for automatic increment/decrement + class Guard { + public: + explicit Guard(ConcurrencyCounter* counter) : _counter(counter) { + if (_counter) { + _counter->increment(); + } + } + + ~Guard() { + if (_counter) { + _counter->decrement(); + } + } + + Guard(const Guard&) = delete; + Guard& operator=(const Guard&) = delete; + + private: + ConcurrencyCounter* _counter; + }; + +private: + std::string _name; + std::atomic _count; +}; + +// Singleton manager for all concurrency counters +// All counters are defined here in order +class ConcurrencyStatsManager { +public: + static ConcurrencyStatsManager& instance(); + + // Start the background thread for periodic logging + void start(); + + // Stop the background thread + void stop(); + + // Manually dump all counters to log + void dump_to_log(); + + // Access to individual counters (defined in order of read path from top to bottom) + ConcurrencyCounter* vscanner_get_block; + ConcurrencyCounter* segment_iterator_next_batch; + ConcurrencyCounter* column_reader_read_page; + ConcurrencyCounter* page_io_decompress; + ConcurrencyCounter* 
page_io_pre_decode; + ConcurrencyCounter* page_io_insert_page_cache; + ConcurrencyCounter* cached_remote_reader_read_at; + ConcurrencyCounter* cached_remote_reader_get_or_set; + ConcurrencyCounter* cached_remote_reader_get_or_set_wait_lock; + ConcurrencyCounter* cached_remote_reader_get_or_set_downloader; + ConcurrencyCounter* cached_remote_reader_write_back; + ConcurrencyCounter* cached_remote_reader_blocking; + ConcurrencyCounter* cached_remote_reader_local_read; + ConcurrencyCounter* s3_file_reader_read; + +private: + ConcurrencyStatsManager(); + ~ConcurrencyStatsManager(); + + ConcurrencyStatsManager(const ConcurrencyStatsManager&) = delete; + ConcurrencyStatsManager& operator=(const ConcurrencyStatsManager&) = delete; + + void _dump_thread_func(); + + // All counters in the order they should be printed + std::vector _counters; + + std::atomic _running; + std::unique_ptr _dump_thread; +}; + +// Macro for scoped counting +#define SCOPED_CONCURRENCY_COUNT_IMPL(counter_ptr, unique_id) \ + doris::ConcurrencyCounter::Guard _concurrency_guard_##unique_id(counter_ptr) + +#define SCOPED_CONCURRENCY_COUNT_HELPER(counter_ptr, line) \ + SCOPED_CONCURRENCY_COUNT_IMPL(counter_ptr, line) + +#define SCOPED_CONCURRENCY_COUNT(counter_ptr) \ + SCOPED_CONCURRENCY_COUNT_HELPER(counter_ptr, __LINE__) + +} // namespace doris diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index d321075435b2eb..90c24292c4297c 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -23,6 +23,7 @@ #include "common/status.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" +#include "util/concurrency_stats.h" #include "util/defer_op.h" #include "util/runtime_profile.h" #include "vec/columns/column_nothing.h" @@ -76,6 +77,7 @@ Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) { } Status Scanner::get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) { + SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block); auto& row_descriptor = _local_state->_parent->row_descriptor(); if (_output_row_descriptor) { _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 6390bdcbb714e3..d659a19e955037 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -384,7 +384,7 @@ class TestRowIdConversion : public testing::TestWithParam(out_rowset); std::vector segment_num_rows; OlapReaderStatistics statistics; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &statistics).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &statistics).ok()); if (has_delete_handler) { // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 730999e650adc0..3885620566f3f3 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -39,6 +39,7 @@ #include "common/config.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "gen_cpp/olap_file.pb.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/file_system.h" @@ -413,4 +414,167 @@ TEST_F(BetaRowsetTest, GetIndexFileNames) { } } +TEST_F(BetaRowsetTest, GetSegmentNumRowsFromMeta) { + // Test getting segment rows from rowset meta (new version data) + // 
This test verifies that when segment_rows is present in rowset meta, + // it uses the cached data directly without loading segments + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(3); + + // Set segment rows in rowset meta (simulating new version data) + std::vector expected_segment_rows = {100, 200, 300}; + rowset_meta->set_num_segment_rows(expected_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(segment_rows.size(), 3); + ASSERT_EQ(segment_rows[0], 100); + ASSERT_EQ(segment_rows[1], 200); + ASSERT_EQ(segment_rows[2], 300); + + // Verify that we used the meta path and not the footer path + ASSERT_TRUE(used_meta_path); + ASSERT_FALSE(used_footer_path); + + // Test calling get_segment_num_rows twice to verify cache works + used_meta_path = false; + used_footer_path = false; + std::vector segment_rows_2; + st = rowset->get_segment_num_rows(&segment_rows_2, false, &_stats); + ASSERT_TRUE(st.ok()) << st; + ASSERT_EQ(segment_rows_2.size(), 3); + ASSERT_EQ(segment_rows_2[0], 100); + ASSERT_EQ(segment_rows_2[1], 200); + ASSERT_EQ(segment_rows_2[2], 300); + + EXPECT_FALSE(used_meta_path); + EXPECT_FALSE(used_footer_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetSegmentNumRowsEmptyMeta) { + // Test when rowset meta has no segment rows (old version data) + // In this case, it should try to load segments from segment footer + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(2); + // segment_rows is empty (simulating old version data) + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + + // Since we don't have actual segment files, it will fail to load segments + // But the important thing is to verify it tried to load from footer + ASSERT_TRUE(used_footer_path); + ASSERT_FALSE(used_meta_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetSegmentNumRowsCorruptedMeta) { + // Test when segment_rows size doesn't match segment count + // This simulates a corrupted rowset meta + auto tablet_schema = std::make_shared(); + 
create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(3); + + // Set segment rows with wrong size (should be 3 but only has 2) + std::vector wrong_segment_rows = {100, 200}; + rowset_meta->set_num_segment_rows(wrong_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + // Use sync point to verify code path + auto sp = SyncPoint::get_instance(); + bool used_meta_path = false; + bool used_footer_path = false; + + sp->set_call_back("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta", + [&](auto&& args) { used_meta_path = true; }); + + sp->set_call_back("BetaRowset::get_segment_num_rows:load_from_segment_footer", + [&](auto&& args) { used_footer_path = true; }); + + sp->enable_processing(); + + std::vector segment_rows; + Status st = rowset->get_segment_num_rows(&segment_rows, false, &_stats); + + // When segment_rows size doesn't match, it should fall back to loading from footer + ASSERT_FALSE(used_meta_path); + ASSERT_TRUE(used_footer_path); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); +} + +TEST_F(BetaRowsetTest, GetNumSegmentRowsAPI) { + // Test the simple get_num_segment_rows API (without loading) + auto tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + auto rowset_meta = std::make_shared(); + init_rs_meta(rowset_meta, 1, 1); + rowset_meta->set_num_segments(3); + + std::vector expected_segment_rows = {100, 200, 300}; + rowset_meta->set_num_segment_rows(expected_segment_rows); + + auto rowset = std::make_shared(tablet_schema, rowset_meta, ""); + + std::vector segment_rows; + rowset->get_num_segment_rows(&segment_rows); + ASSERT_EQ(segment_rows.size(), 3); + ASSERT_EQ(segment_rows[0], 100); + ASSERT_EQ(segment_rows[1], 200); + ASSERT_EQ(segment_rows[2], 300); +} + } // namespace doris diff --git a/be/test/olap/rowset/rowset_meta_test.cpp b/be/test/olap/rowset/rowset_meta_test.cpp index cb1b2865c1440e..c78b5803f03a82 100644 --- a/be/test/olap/rowset/rowset_meta_test.cpp +++ b/be/test/olap/rowset/rowset_meta_test.cpp @@ -29,6 +29,7 @@ #include #include "common/status.h" +#include "cpp/sync_point.h" #include "gtest/gtest_pred_impl.h" #include "olap/olap_common.h" #include "olap/olap_meta.h" @@ -123,4 +124,183 @@ TEST_F(RowsetMetaTest, TestRowsetIdInit) { EXPECT_EQ(id.to_string(), "72057594037927935"); } +TEST_F(RowsetMetaTest, TestNumSegmentRowsSetAndGet) { + RowsetMeta rowset_meta; + EXPECT_TRUE(rowset_meta.init_from_json(_json_rowset_meta)); + + // Test set_num_segment_rows and get_num_segment_rows + std::vector num_segment_rows = {100, 200, 300}; + rowset_meta.set_num_segment_rows(num_segment_rows); + + std::vector retrieved_rows; + rowset_meta.get_num_segment_rows(&retrieved_rows); + + EXPECT_EQ(retrieved_rows.size(), 3); + EXPECT_EQ(retrieved_rows[0], 100); + EXPECT_EQ(retrieved_rows[1], 200); + EXPECT_EQ(retrieved_rows[2], 300); + + // Test get_num_segment_rows() const reference + const auto& num_segment_rows_ref = rowset_meta.get_num_segment_rows(); + EXPECT_EQ(num_segment_rows_ref.size(), 3); + EXPECT_EQ(num_segment_rows_ref.Get(0), 100); + EXPECT_EQ(num_segment_rows_ref.Get(1), 200); + EXPECT_EQ(num_segment_rows_ref.Get(2), 300); + + // Test serialization and deserialization + RowsetMetaPB rowset_meta_pb; + rowset_meta.to_rowset_pb(&rowset_meta_pb); + EXPECT_EQ(rowset_meta_pb.num_segment_rows_size(), 3); + EXPECT_EQ(rowset_meta_pb.num_segment_rows(0), 100); + 
EXPECT_EQ(rowset_meta_pb.num_segment_rows(1), 200); + EXPECT_EQ(rowset_meta_pb.num_segment_rows(2), 300); + + RowsetMeta rowset_meta_2; + rowset_meta_2.init_from_pb(rowset_meta_pb); + std::vector retrieved_rows_2; + rowset_meta_2.get_num_segment_rows(&retrieved_rows_2); + EXPECT_EQ(retrieved_rows_2.size(), 3); + EXPECT_EQ(retrieved_rows_2[0], 100); + EXPECT_EQ(retrieved_rows_2[1], 200); + EXPECT_EQ(retrieved_rows_2[2], 300); +} + +TEST_F(RowsetMetaTest, TestNumSegmentRowsEmpty) { + RowsetMeta rowset_meta; + EXPECT_TRUE(rowset_meta.init_from_json(_json_rowset_meta)); + + // By default, num_segment_rows should be empty + std::vector retrieved_rows; + rowset_meta.get_num_segment_rows(&retrieved_rows); + EXPECT_EQ(retrieved_rows.size(), 0); + + const auto& num_segment_rows_ref = rowset_meta.get_num_segment_rows(); + EXPECT_EQ(num_segment_rows_ref.size(), 0); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaWithNumSegmentRows) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_1 = {100, 200}; + rowset_meta_1.set_num_segment_rows(num_segment_rows_1); + rowset_meta_1.set_num_segments(2); + rowset_meta_1.set_total_disk_size(1000); + rowset_meta_1.set_data_disk_size(800); + rowset_meta_1.set_index_disk_size(200); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_2 = {300, 400, 500}; + rowset_meta_2.set_num_segment_rows(num_segment_rows_2); + rowset_meta_2.set_num_segments(3); + rowset_meta_2.set_total_disk_size(2000); + rowset_meta_2.set_data_disk_size(1600); + rowset_meta_2.set_index_disk_size(400); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + bool skip_called = false; + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + skip_called = true; + // Set the return flag to skip the schema merge logic + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge rowset_meta_2 into rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + EXPECT_TRUE(skip_called); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // Check merged num_segment_rows + std::vector merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 5); + EXPECT_EQ(merged_rows[0], 100); + EXPECT_EQ(merged_rows[1], 200); + EXPECT_EQ(merged_rows[2], 300); + EXPECT_EQ(merged_rows[3], 400); + EXPECT_EQ(merged_rows[4], 500); + + // Check merged num_segments + EXPECT_EQ(rowset_meta_1.num_segments(), 5); + + // Check merged disk sizes + EXPECT_EQ(rowset_meta_1.total_disk_size(), 3000); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaWithPartialNumSegmentRows) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + std::vector num_segment_rows_1 = {100, 200}; + rowset_meta_1.set_num_segment_rows(num_segment_rows_1); + rowset_meta_1.set_num_segments(2); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + // rowset_meta_2 has no num_segment_rows (simulating old version data) + rowset_meta_2.set_num_segments(3); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge rowset_meta_2 into 
rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // num_segment_rows should be cleared when one of them is empty + std::vector merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 0); + + // num_segments should still be merged + EXPECT_EQ(rowset_meta_1.num_segments(), 5); +} + +TEST_F(RowsetMetaTest, TestMergeRowsetMetaBothEmpty) { + RowsetMeta rowset_meta_1; + EXPECT_TRUE(rowset_meta_1.init_from_json(_json_rowset_meta)); + rowset_meta_1.set_num_segments(2); + + RowsetMeta rowset_meta_2; + EXPECT_TRUE(rowset_meta_2.init_from_json(_json_rowset_meta)); + rowset_meta_2.set_num_segments(3); + + // Use sync point to skip schema merge logic + auto sp = SyncPoint::get_instance(); + sp->set_call_back("RowsetMeta::merge_rowset_meta:skip_schema_merge", [&](auto&& args) { + auto pred = try_any_cast(args.back()); + *pred = true; + }); + sp->enable_processing(); + + // Merge rowset_meta_2 into rowset_meta_1 + rowset_meta_1.merge_rowset_meta(rowset_meta_2); + + sp->clear_all_call_backs(); + sp->disable_processing(); + sp->clear_trace(); + + // num_segment_rows should remain empty + std::vector merged_rows; + rowset_meta_1.get_num_segment_rows(&merged_rows); + EXPECT_EQ(merged_rows.size(), 0); + + // num_segments should still be merged + EXPECT_EQ(rowset_meta_1.num_segments(), 5); +} + } // namespace doris diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index d1e2b44ce4df73..92e20da4efb95c 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -287,7 +287,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index c599725c18df78..51487ef236c83c 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -393,7 +393,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -903,7 +903,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; @@ -1172,7 +1172,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { auto beta_rowset = std::dynamic_pointer_cast(rowset); std::vector segment_num_rows; OlapReaderStatistics stats; - EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, &stats).ok()); + 
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows, false, &stats).ok()); size_t total_num_rows = 0; for (const auto& i : segment_num_rows) { total_num_rows += i; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 7fa90adcb9f058..f49dd0cf46414b 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -136,6 +136,9 @@ message RowsetMetaPB { // indicate that whether the segments key bounds is truncated optional bool segments_key_bounds_truncated = 55; + // rows count for each segment + repeated int64 num_segment_rows = 56; + // For cloud // for data recycling optional int64 txn_expiration = 1000; @@ -234,6 +237,9 @@ message RowsetMetaCloudPB { // indicate that whether the segments key bounds is truncated optional bool segments_key_bounds_truncated = 55; + // rows count for each segment + repeated int64 num_segment_rows = 56; + // cloud // the field is a vector, rename it repeated int64 segments_file_size = 100; diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index fa41a88af35c11..210e709e99fa0f 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -52,3 +52,7 @@ enable_prefill_all_dbm_agg_cache_after_compaction=true enable_batch_get_delete_bitmap=true get_delete_bitmap_bytes_threshold=10 + +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true +fail_when_segment_rows_not_in_rowset_meta=true diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf b/regression-test/pipeline/cloud_p1/conf/be_custom.conf index aed4d69efbf704..749be9f2d092e5 100644 --- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf @@ -37,3 +37,6 @@ enable_table_size_correctness_check=true enable_write_index_searcher_cache=true large_cumu_compaction_task_min_thread_num=3 enable_prefill_all_dbm_agg_cache_after_compaction=true + +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true diff --git a/regression-test/pipeline/nonConcurrent/conf/be.conf b/regression-test/pipeline/nonConcurrent/conf/be.conf index 394784bc5174e0..c64398a30aa5d9 100644 --- a/regression-test/pipeline/nonConcurrent/conf/be.conf +++ b/regression-test/pipeline/nonConcurrent/conf/be.conf @@ -89,4 +89,5 @@ large_cumu_compaction_task_min_thread_num=3 enable_parquet_page_index=true enable_graceful_exit_check=true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 161aa140b14704..0571efddaf9d48 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -92,4 +92,6 @@ enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true enable_fetch_rowsets_from_peer_replicas = true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true +fail_when_segment_rows_not_in_rowset_meta=true diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index fb7d788ab6ac68..155317490250aa 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -77,4 +77,5 @@ enable_graceful_exit_check=true enable_prefill_all_dbm_agg_cache_after_compaction=true - +enable_segment_rows_consistency_check=true +enable_segment_rows_check_core=true
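To summarize the behavior the BetaRowset tests above pin down: segment row counts are taken from the rowset meta when they are present and consistent with the segment count, and only otherwise loaded from segment footers. A minimal sketch of that lookup order, using simplified stand-in types (RowsetMetaLite and load_from_footer are illustrative, not the Doris API):

// sketch.cpp -- not part of the patch; lookup order exercised by the tests above
#include <cstdint>
#include <cstdio>
#include <functional>
#include <vector>

struct RowsetMetaLite {
    int num_segments = 0;
    std::vector<int64_t> num_segment_rows; // may be empty for old rowsets
};

// `load_from_footer(i)` stands in for opening segment i and reading its footer.
std::vector<uint32_t> get_segment_num_rows(
        const RowsetMetaLite& meta,
        const std::function<uint32_t(int)>& load_from_footer) {
    std::vector<uint32_t> rows;
    if (!meta.num_segment_rows.empty() &&
        static_cast<int>(meta.num_segment_rows.size()) == meta.num_segments) {
        // Fast path: meta already carries one row count per segment.
        for (int64_t n : meta.num_segment_rows) {
            rows.push_back(static_cast<uint32_t>(n));
        }
        return rows;
    }
    // Slow path: meta is missing or inconsistent (old or corrupted data),
    // so read the count from each segment footer instead.
    for (int i = 0; i < meta.num_segments; ++i) {
        rows.push_back(load_from_footer(i));
    }
    return rows;
}

int main() {
    RowsetMetaLite with_meta {3, {100, 200, 300}};
    RowsetMetaLite without_meta {2, {}};
    auto fake_footer = [](int i) { return static_cast<uint32_t>((i + 1) * 1000); };

    for (uint32_t n : get_segment_num_rows(with_meta, fake_footer)) std::printf("%u ", n);
    std::printf("\n");
    for (uint32_t n : get_segment_num_rows(without_meta, fake_footer)) std::printf("%u ", n);
    std::printf("\n");
    return 0;
}

Note that the corrupted-meta case (size mismatch) deliberately falls through to the footer path rather than trusting a partial list, matching the GetSegmentNumRowsCorruptedMeta test.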