Skip to content

Commit 3796d2c

Browse files
committed
[Opt](meta)persist segment rows in rowse meta pb
1 parent b15d7ef commit 3796d2c

File tree

22 files changed

+521
-21
lines changed

22 files changed

+521
-21
lines changed

be/src/cloud/pb_convert.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
8080
out->set_txn_expiration(in.txn_expiration());
8181
out->set_segments_overlap_pb(in.segments_overlap_pb());
8282
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
83+
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
8384
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
8485
out->set_index_id(in.index_id());
8586
if (in.has_schema_version()) {
@@ -151,6 +152,7 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
151152
out->set_txn_expiration(in.txn_expiration());
152153
out->set_segments_overlap_pb(in.segments_overlap_pb());
153154
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
155+
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
154156
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
155157
out->set_index_id(in.index_id());
156158
if (in.has_schema_version()) {
@@ -234,6 +236,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
234236
out->set_txn_expiration(in.txn_expiration());
235237
out->set_segments_overlap_pb(in.segments_overlap_pb());
236238
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
239+
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
237240
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
238241
out->set_index_id(in.index_id());
239242
if (in.has_schema_version()) {
@@ -305,6 +308,7 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
305308
out->set_txn_expiration(in.txn_expiration());
306309
out->set_segments_overlap_pb(in.segments_overlap_pb());
307310
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
311+
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
308312
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
309313
out->set_index_id(in.index_id());
310314
if (in.has_schema_version()) {

be/src/common/config.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,12 @@ DEFINE_mInt32(trash_file_expire_time_sec, "0");
384384
// modify them upon necessity
385385
DEFINE_Int32(min_file_descriptor_number, "60000");
386386
DEFINE_mBool(disable_segment_cache, "false");
387+
// Enable checking segment rows consistency between rowset meta and segment footer
388+
DEFINE_mBool(enable_segment_rows_consistency_check, "false");
389+
DEFINE_mBool(enable_segment_rows_check_core, "false");
390+
// ATTENTION: For test only. In test environment, there are no historical data,
391+
// so all rowset meta should have segment rows info.
392+
DEFINE_mBool(fail_when_segment_rows_not_in_rowset_meta, "false");
387393
DEFINE_String(row_cache_mem_limit, "20%");
388394

389395
// Cache for storage page size

be/src/common/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,12 @@ DECLARE_mInt32(trash_file_expire_time_sec);
432432
// modify them upon necessity
433433
DECLARE_Int32(min_file_descriptor_number);
434434
DECLARE_mBool(disable_segment_cache);
435+
// Enable checking segment rows consistency between rowset meta and segment footer
436+
DECLARE_mBool(enable_segment_rows_consistency_check);
437+
DECLARE_mBool(enable_segment_rows_check_core);
438+
// ATTENTION: For test only. In test environment, there are no historical data,
439+
// so all rowset meta should have segment rows info.
440+
DECLARE_mBool(fail_when_segment_rows_not_in_rowset_meta);
435441
DECLARE_String(row_cache_mem_limit);
436442

437443
// Cache for storage page size

be/src/olap/parallel_scanner_builder.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,9 @@ Status ParallelScannerBuilder::_build_scanners_by_per_segment(std::list<ScannerS
222222
Status ParallelScannerBuilder::_load() {
223223
_total_rows = 0;
224224
size_t idx = 0;
225+
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
226+
? _state->query_options().enable_segment_cache
227+
: true;
225228
for (auto&& [tablet, version] : _tablets) {
226229
const auto tablet_id = tablet->tablet_id();
227230
_all_read_sources[tablet_id] = _read_sources[idx];
@@ -233,7 +236,8 @@ Status ParallelScannerBuilder::_load() {
233236

234237
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
235238
std::vector<uint32_t> segment_rows;
236-
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, &_builder_stats));
239+
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows, enable_segment_cache,
240+
&_builder_stats));
237241
auto segment_count = rowset->num_segments();
238242
for (int64_t i = 0; i != segment_count; i++) {
239243
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);

be/src/olap/rowset/beta_rowset.cpp

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
#include <utility>
3030

3131
#include "beta_rowset.h"
32+
#include "cloud/config.h"
3233
#include "common/config.h"
3334
#include "common/logging.h"
3435
#include "common/status.h"
36+
#include "cpp/sync_point.h"
3537
#include "io/fs/file_reader.h"
3638
#include "io/fs/file_system.h"
3739
#include "io/fs/local_file_system.h"
@@ -71,24 +73,90 @@ Status BetaRowset::init() {
7173
return Status::OK(); // no op
7274
}
7375

76+
namespace {
77+
Status load_segment_rows_from_footer(BetaRowsetSharedPtr rowset,
78+
std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
79+
OlapReaderStatistics* read_stats) {
80+
SegmentCacheHandle segment_cache_handle;
81+
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
82+
rowset, &segment_cache_handle, enable_segment_cache, false, read_stats));
83+
for (const auto& segment : segment_cache_handle.get_segments()) {
84+
segment_rows->emplace_back(segment->num_rows());
85+
}
86+
return Status::OK();
87+
}
88+
89+
Status check_segment_rows_consistency(const std::vector<uint32_t>& rows_from_meta,
90+
const std::vector<uint32_t>& rows_from_footer,
91+
int64_t tablet_id, const std::string& rowset_id) {
92+
DCHECK_EQ(rows_from_footer.size(), rows_from_meta.size());
93+
for (size_t i = 0; i < rows_from_footer.size(); i++) {
94+
if (rows_from_footer[i] != rows_from_meta[i]) {
95+
auto msg = fmt::format(
96+
"segment rows mismatch between rowset meta and segment footer. "
97+
"segment index: {}, meta rows: {}, footer rows: {}, tablet={}, rowset={}",
98+
i, rows_from_meta[i], rows_from_footer[i], tablet_id, rowset_id);
99+
if (config::enable_segment_rows_check_core) {
100+
CHECK(false) << msg;
101+
}
102+
return Status::InternalError(msg);
103+
}
104+
}
105+
return Status::OK();
106+
}
107+
} // namespace
108+
74109
Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows,
110+
bool enable_segment_cache,
75111
OlapReaderStatistics* read_stats) {
76112
// `ROWSET_UNLOADING` is state for closed() called but owned by some readers.
77113
// So here `ROWSET_UNLOADING` is allowed.
78114
DCHECK_NE(_rowset_state_machine.rowset_state(), ROWSET_UNLOADED);
79115

80-
RETURN_IF_ERROR(_load_segment_rows_once.call([this, read_stats] {
116+
RETURN_IF_ERROR(_load_segment_rows_once.call([this, enable_segment_cache, read_stats] {
81117
auto segment_count = num_segments();
82-
_segments_rows.resize(segment_count);
83-
for (int64_t i = 0; i != segment_count; ++i) {
84-
SegmentCacheHandle segment_cache_handle;
85-
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
86-
std::static_pointer_cast<BetaRowset>(shared_from_this()), i,
87-
&segment_cache_handle, false, false, read_stats));
88-
const auto& tmp_segments = segment_cache_handle.get_segments();
89-
_segments_rows[i] = tmp_segments[0]->num_rows();
118+
119+
if (!_rowset_meta->get_num_segment_rows().empty()) {
120+
if (_rowset_meta->get_num_segment_rows().size() == segment_count) {
121+
// use segment rows in rowset meta if eligible
122+
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:use_segment_rows_from_meta");
123+
_segments_rows.assign(_rowset_meta->get_num_segment_rows().cbegin(),
124+
_rowset_meta->get_num_segment_rows().cend());
125+
if (config::enable_segment_rows_consistency_check) {
126+
// verify segment rows from meta match segment footer
127+
std::vector<uint32_t> rows_from_footer;
128+
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
129+
auto load_status = load_segment_rows_from_footer(
130+
self, &rows_from_footer, enable_segment_cache, read_stats);
131+
if (load_status.ok()) {
132+
return check_segment_rows_consistency(
133+
_segments_rows, rows_from_footer, _rowset_meta->tablet_id(),
134+
_rowset_meta->rowset_id().to_string());
135+
}
136+
}
137+
return Status::OK();
138+
} else {
139+
auto msg = fmt::format(
140+
"corrupted segment rows info in rowset meta. "
141+
"segment count: {}, segment rows size: {}, tablet={}, rowset={}",
142+
segment_count, _rowset_meta->get_num_segment_rows().size(),
143+
_rowset_meta->tablet_id(), _rowset_meta->rowset_id().to_string());
144+
if (config::enable_segment_rows_check_core) {
145+
CHECK(false) << msg;
146+
}
147+
LOG_EVERY_SECOND(WARNING) << msg;
148+
}
90149
}
91-
return Status::OK();
150+
if (config::fail_when_segment_rows_not_in_rowset_meta) {
151+
CHECK(false) << "segment rows info not found in rowset meta. tablet="
152+
<< _rowset_meta->tablet_id()
153+
<< ", rowset=" << _rowset_meta->rowset_id().to_string();
154+
}
155+
// otherwise, read it from segment footer
156+
TEST_SYNC_POINT("BetaRowset::get_segment_num_rows:load_from_segment_footer");
157+
auto self = std::dynamic_pointer_cast<BetaRowset>(shared_from_this());
158+
return load_segment_rows_from_footer(self, &_segments_rows, enable_segment_cache,
159+
read_stats);
92160
}));
93161
segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
94162
return Status::OK();

be/src/olap/rowset/beta_rowset.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class BetaRowset final : public Rowset {
9191
Status show_nested_index_file(rapidjson::Value* rowset_value,
9292
rapidjson::Document::AllocatorType& allocator);
9393

94-
Status get_segment_num_rows(std::vector<uint32_t>* segment_rows,
94+
Status get_segment_num_rows(std::vector<uint32_t>* segment_rows, bool enable_segment_cache,
9595
OlapReaderStatistics* read_stats);
9696

9797
protected:

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
256256
if (_read_context->record_rowids && _read_context->rowid_conversion) {
257257
// init segment rowid map for rowid conversion
258258
std::vector<uint32_t> segment_rows;
259-
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, _stats));
259+
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows, should_use_cache, _stats));
260260
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
261261
segment_rows));
262262
}

be/src/olap/rowset/beta_rowset_writer.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <utility>
3232

3333
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
34+
#include "common/cast_set.h"
3435
#include "common/compiler_util.h" // IWYU pragma: keep
3536
#include "common/config.h"
3637
#include "common/logging.h"
@@ -769,6 +770,7 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
769770
_num_segment += cast_set<int32_t>(rowset->num_segments());
770771
// append key_bounds to current rowset
771772
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
773+
rowset->get_num_segment_rows(&_segment_num_rows);
772774
_segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
773775

774776
// TODO update zonemap
@@ -948,21 +950,31 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
948950
int64_t total_data_size = 0;
949951
int64_t total_index_size = 0;
950952
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
953+
std::vector<uint32_t> segment_rows;
951954
{
952955
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
953956
for (const auto& itr : _segid_statistics_map) {
954957
num_rows_written += itr.second.row_num;
955958
total_data_size += itr.second.data_size;
956959
total_index_size += itr.second.index_size;
957960
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
961+
// segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
962+
segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
958963
}
959964
}
965+
if (segment_rows.empty()) {
966+
// vertical compaction and linked schema change will not record segment statistics,
967+
// it will record segment rows in _segment_num_rows
968+
RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
969+
}
970+
960971
for (auto& key_bound : _segments_encoded_key_bounds) {
961972
segments_encoded_key_bounds.push_back(key_bound);
962973
}
963974
if (_segments_key_bounds_truncated.has_value()) {
964975
rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
965976
}
977+
rowset_meta->set_num_segment_rows(segment_rows);
966978
// segment key bounds are empty in old version(before version 1.2.x). So we should not modify
967979
// the overlap property when key bounds are empty.
968980
// for mow table with cluster keys, the overlap is used for cluster keys,
@@ -983,6 +995,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
983995
"is: {}, _num_seg is: {}",
984996
segments_encoded_key_bounds_size, segment_num);
985997
}
998+
if (segment_rows.size() != segment_num) {
999+
return Status::InternalError(
1000+
"segment_rows size should equal to _num_seg, segment_rows size is: {}, "
1001+
"_num_seg is {}, tablet={}, rowset={}, txn={}",
1002+
segment_rows.size(), segment_num, _context.tablet_id,
1003+
_context.rowset_id.to_string(), _context.txn_id);
1004+
}
9861005
}
9871006

9881007
rowset_meta->set_num_segments(segment_num);

be/src/olap/rowset/rowset.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
272272
return Status::OK();
273273
}
274274

275+
void get_num_segment_rows(std::vector<uint32_t>* num_segment_rows) {
276+
_rowset_meta->get_num_segment_rows(num_segment_rows);
277+
}
278+
275279
// min key of the first segment
276280
bool first_key(std::string* min_key) {
277281
KeyBoundsPB key_bounds;

be/src/olap/rowset/rowset_meta.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "cloud/cloud_storage_engine.h"
2727
#include "common/logging.h"
2828
#include "common/status.h"
29+
#include "cpp/sync_point.h"
2930
#include "google/protobuf/util/message_differencer.h"
3031
#include "io/fs/encrypted_fs_factory.h"
3132
#include "io/fs/file_system.h"
@@ -323,6 +324,20 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
323324
set_total_disk_size(data_disk_size() + index_disk_size());
324325
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
325326
other.is_segments_key_bounds_truncated());
327+
if (_rowset_meta_pb.num_segment_rows_size() > 0) {
328+
if (other.num_segments() > 0) {
329+
if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
330+
for (auto row_count : other._rowset_meta_pb.num_segment_rows()) {
331+
_rowset_meta_pb.add_num_segment_rows(row_count);
332+
}
333+
} else {
334+
// This may happen when a partial update load commits in high version doirs_be
335+
// and publishes with new segments in low version doris_be. In this case, just clear
336+
// all num_segment_rows.
337+
_rowset_meta_pb.clear_num_segment_rows();
338+
}
339+
}
340+
}
326341
for (auto&& key_bound : other.get_segments_key_bounds()) {
327342
add_segment_key_bounds(key_bound);
328343
}
@@ -341,6 +356,7 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
341356
}
342357
// In partial update the rowset schema maybe updated when table contains variant type, so we need the newest schema to be updated
343358
// Otherwise the schema is stale and lead to wrong data read
359+
TEST_SYNC_POINT_RETURN_WITH_VOID("RowsetMeta::merge_rowset_meta:skip_schema_merge");
344360
if (tablet_schema()->num_variant_columns() > 0) {
345361
// merge extracted columns
346362
TabletSchemaSPtr merged_schema;

0 commit comments

Comments
 (0)