31 changes: 26 additions & 5 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -30,6 +30,7 @@
#include "olap/olap_common.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/defer_op.h"

namespace doris {
#include "common/compile_check_begin.h"
@@ -120,6 +121,26 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;
bool skip_trim = false; // Skip trim for Empty Rowset Compaction

// DEFER: trim input_rowsets from back if score > max_compaction_score
// This ensures we don't return more rowsets than allowed by max_compaction_score,
// while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
// Must be placed after variable initialization and before collection loop.
DEFER({
if (skip_trim) {
return;
}
// Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
while (input_rowsets->size() > 1 &&
*compaction_score > static_cast<size_t>(max_compaction_score)) {
auto& last_rowset = input_rowsets->back();
*compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
total_size -= last_rowset->rowset_meta()->total_disk_size();
input_rowsets->pop_back();
}
});

for (auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -143,10 +164,8 @@
continue;
}
}
if (*compaction_score >= max_compaction_score) {
// got enough segments
break;
}
// Removed: max_compaction_score check here
// We now collect all candidate rowsets and trim from back at return time via DEFER
*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

@@ -184,8 +203,10 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
static_cast<double>(input_rowsets->size()) >=
config::empty_rowset_compaction_min_ratio) {
// Prioritize consecutive empty rowset compaction
// Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
*input_rowsets = consecutive_empty_rowsets;
*compaction_score = consecutive_empty_rowsets.size();
skip_trim = true;
return consecutive_empty_rowsets.size();
}
}
@@ -229,7 +250,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
*compaction_score = max_score;
return transient_size;
}
// Exceeding max compaction score, do compaction on all candidate rowsets anyway
// no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
return transient_size;
}
}
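
The trim-on-return logic in this file (and its twin in be/src/olap/cumulative_compaction_policy.cpp below) relies on the scope-exit helper pulled in from util/defer_op.h. The following is a minimal, self-contained sketch of the same pattern; DeferGuard and pick are illustrative stand-ins, not Doris's actual implementation. It also shows why the pattern works here: the picked set is an out-parameter, so the deferred trim is visible to the caller even though it runs after the return expression has been evaluated.

    #include <cstdint>
    #include <vector>

    // Illustrative scope-exit guard; Doris's DEFER macro (util/defer_op.h)
    // plays the same role of running a lambda when the scope unwinds.
    template <typename Fn>
    struct DeferGuard {
        Fn fn;
        ~DeferGuard() { fn(); }
    };
    template <typename Fn>
    DeferGuard(Fn) -> DeferGuard<Fn>;

    // Collect every candidate into the out-parameters, then trim from the
    // back at scope exit so the final score never exceeds max_score.
    int pick(const std::vector<int64_t>& scores, int64_t max_score,
             std::vector<int64_t>* picked, int64_t* total_score) {
        *total_score = 0;
        DeferGuard guard{[&] {
            // Keep at least one element, matching the policy's fallback branch.
            while (picked->size() > 1 && *total_score > max_score) {
                *total_score -= picked->back();
                picked->pop_back();
            }
        }};
        for (int64_t s : scores) { // collect everything first, unlike the old early break
            *total_score += s;
            picked->push_back(s);
        }
        // The guard fires after this return value is computed, mirroring how
        // transient_size is returned before DEFER trims input_rowsets above.
        return static_cast<int>(picked->size());
    }

As in the PR, the returned count reflects the pre-trim collection; only the out-parameters observe the trimmed result.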
25 changes: 20 additions & 5 deletions be/src/olap/cumulative_compaction_policy.cpp
@@ -29,6 +29,7 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "util/debug_points.h"
#include "util/defer_op.h"

namespace doris {
#include "common/compile_check_begin.h"
@@ -268,6 +269,22 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;

// DEFER: trim input_rowsets from back if score > max_compaction_score
// This ensures we don't return more rowsets than allowed by max_compaction_score,
// while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
// Must be placed after variable initialization and before collection loop.
DEFER({
// Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
while (input_rowsets->size() > 1 &&
*compaction_score > static_cast<size_t>(max_compaction_score)) {
auto& last_rowset = input_rowsets->back();
*compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
total_size -= last_rowset->rowset_meta()->total_disk_size();
input_rowsets->pop_back();
}
});

for (auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -291,10 +308,8 @@
continue;
}
}
if (*compaction_score >= max_compaction_score) {
// got enough segments
break;
}
// Removed: max_compaction_score check here
// We now collect all candidate rowsets and trim from back at return time via DEFER
*compaction_score += rowset->rowset_meta()->get_compaction_score();
total_size += rowset->rowset_meta()->total_disk_size();

@@ -355,7 +370,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
*compaction_score = max_score;
return transient_size;
}
// no rowset is OVERLAPPING, execute compaction on all input rowsets
// no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
return transient_size;
}
input_rowsets->erase(input_rowsets->begin(), rs_begin);
122 changes: 122 additions & 0 deletions be/test/cloud/cloud_cumulative_compaction_policy_test.cpp
@@ -15,13 +15,17 @@
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_cumulative_compaction_policy.h"

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "gtest/gtest_pred_impl.h"
#include "json2pb/json_to_pb.h"
#include "olap/olap_common.h"
@@ -146,4 +150,122 @@ TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, new_cumulative_point) {
Version version(1, 1);
EXPECT_EQ(policy.new_cumulative_point(&_tablet, output_rowset, version, 2), 6);
}

// Test case: Empty rowset compaction with skip_trim
TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty_rowset_compaction) {
// Save original config values
bool orig_enable_empty_rowset_compaction = config::enable_empty_rowset_compaction;
int32_t orig_empty_rowset_compaction_min_count = config::empty_rowset_compaction_min_count;
double orig_empty_rowset_compaction_min_ratio = config::empty_rowset_compaction_min_ratio;

// Enable empty rowset compaction
config::enable_empty_rowset_compaction = true;
config::empty_rowset_compaction_min_count = 5;
config::empty_rowset_compaction_min_ratio = 0.5;

CloudTablet _tablet(_engine, _tablet_meta);
_tablet._base_size = 1024L * 1024 * 1024; // 1GB base

// Create candidate rowsets: 2 normal + 150 empty rowsets
// This tests that skip_trim = true for empty rowset compaction
std::vector<RowsetSharedPtr> candidate_rowsets;

// 2 normal rowsets
for (int i = 0; i < 2; i++) {
auto rowset = create_rowset(Version(i + 2, i + 2), 1, true, 1024 * 1024); // 1MB
candidate_rowsets.push_back(rowset);
}

// 150 empty rowsets (consecutive)
for (int i = 0; i < 150; i++) {
auto rowset = create_rowset(Version(i + 4, i + 4), 0, false, 0); // empty
candidate_rowsets.push_back(rowset);
}

std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
size_t compaction_score = 0;

CloudSizeBasedCumulativeCompactionPolicy policy;
// max=100, but empty rowset compaction should return 150 (skip_trim = true)
policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 50, &input_rowsets,
&last_delete_version, &compaction_score, true);

// Empty rowset compaction should return all 150 empty rowsets
// skip_trim = true, so no trimming even though score > max
EXPECT_EQ(150, input_rowsets.size());
EXPECT_EQ(150, compaction_score);

// Verify all returned rowsets are empty
for (const auto& rs : input_rowsets) {
EXPECT_EQ(0, rs->num_segments());
}

// Restore original config values
config::enable_empty_rowset_compaction = orig_enable_empty_rowset_compaction;
config::empty_rowset_compaction_min_count = orig_empty_rowset_compaction_min_count;
config::empty_rowset_compaction_min_ratio = orig_empty_rowset_compaction_min_ratio;
}
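
A quick check of this test's trigger arithmetic. The enabling condition is only partially visible in the first hunk of this diff (its tail is `... / static_cast<double>(input_rowsets->size()) >= config::empty_rowset_compaction_min_ratio`), so the exact pairing of min_count and min_ratio in the sketch below is an assumption:

    #include <cstddef>

    // Hedged reconstruction of the empty-rowset trigger; the leading clauses
    // are truncated in the diff above, so this pairing of the two configs is
    // assumed, not confirmed.
    bool should_compact_empty_rowsets(std::size_t consecutive_empty, std::size_t picked_total,
                                      int min_count, double min_ratio) {
        return consecutive_empty >= static_cast<std::size_t>(min_count) &&
               static_cast<double>(consecutive_empty) / static_cast<double>(picked_total) >=
                       min_ratio;
    }

    // This test: consecutive_empty = 150, picked_total = 152, min_count = 5,
    // min_ratio = 0.5 -> 150 >= 5 and 150/152 ~= 0.987 >= 0.5, so the branch
    // fires, skip_trim is set, and all 150 empty rowsets are returned even
    // though max_compaction_score is 100.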

// Test case: prioritize_query_perf_in_compaction for non-DUP_KEYS table
// This tests the branch: rs_begin == end && prioritize_query_perf && keys_type != DUP_KEYS
TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_prioritize_query_perf) {
// Save original config value
bool orig_prioritize_query_perf = config::prioritize_query_perf_in_compaction;

// Enable prioritize_query_perf_in_compaction
config::prioritize_query_perf_in_compaction = true;

// Create tablet with UNIQUE keys (not DUP_KEYS)
TTabletSchema schema;
schema.keys_type = TKeysType::UNIQUE_KEYS;
TabletMetaSharedPtr tablet_meta(new TabletMeta(1, 2, 15673, 15674, 4, 5, schema, 6, {{7, 8}},
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));

CloudTablet _tablet(_engine, tablet_meta);
// Use large base_size to get large promotion_size, ensuring total_size < promotion_size
// so we don't trigger promotion_size early return and can reach level_size logic
_tablet._base_size = 20L * 1024 * 1024 * 1024; // 20GB base, promotion_size ~= 1GB

// Create candidate rowsets that will ALL be removed by level_size
// Key: each rowset's level > remain_level after removal
std::vector<RowsetSharedPtr> candidate_rowsets;

    // 3 rowsets with decreasing sizes, chosen so that every one is removed by
    // the level_size check. (A 40MB rowset would not work: level(40)=32 but the
    // remaining 35MB still has level(35)=32, and 32>32 fails. See the
    // level-size sketch after this diff for the assumed bucketing.)
// - 50MB: level(50)=32, after remove remain=25, level(25)=16, 32>16 -> remove
// - 20MB: level(20)=16, after remove remain=5, level(5)=4, 16>4 -> remove
// - 5MB: level(5)=4, after remove remain=0, level(0)=0, 4>0 -> remove
auto rowset1 = create_rowset(Version(2, 2), 30, true, 50L * 1024 * 1024); // 50MB, score=30
auto rowset2 = create_rowset(Version(3, 3), 20, true, 20L * 1024 * 1024); // 20MB, score=20
auto rowset3 = create_rowset(Version(4, 4), 10, true, 5L * 1024 * 1024); // 5MB, score=10
candidate_rowsets.push_back(rowset1);
candidate_rowsets.push_back(rowset2);
candidate_rowsets.push_back(rowset3);

// total_size = 75MB < promotion_size (~1GB), enters level_size logic
// All 3 rowsets will be removed by level_size -> rs_begin == end
// With prioritize_query_perf=true and UNIQUE_KEYS, should return all candidates

std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
size_t compaction_score = 0;

CloudSizeBasedCumulativeCompactionPolicy policy;
policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 5, &input_rowsets,
&last_delete_version, &compaction_score, true);

// With prioritize_query_perf enabled for non-DUP_KEYS table,
// when all rowsets are removed by level_size, should return all candidates
// (before DEFER trim)
// Total score = 60, max = 100, so no trimming needed
EXPECT_EQ(3, input_rowsets.size());
EXPECT_EQ(60, compaction_score);

// Restore original config value
config::prioritize_query_perf_in_compaction = orig_prioritize_query_perf;
}

} // namespace doris
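
The level(N) values quoted in the comments of the second test are consistent with a floor-to-power-of-two bucketing of sizes in megabytes. The sketch below is inferred from those comments alone, not taken from the policy's actual level-size implementation, and the function name is illustrative:

    #include <cstdint>

    // Assumed bucketing: the level of a size is the largest power of two
    // (in MB) not exceeding it, with 0 for an empty rowset.
    int64_t level_size_mb(int64_t size_mb) {
        if (size_mb <= 0) {
            return 0;
        }
        int64_t level = 1;
        while (level * 2 <= size_mb) {
            level *= 2;
        }
        return level;
    }

    // level_size_mb(50) == 32, level_size_mb(25) == 16, level_size_mb(20) == 16,
    // level_size_mb(5) == 4, level_size_mb(0) == 0 -- matching every level(N)
    // value in the test's comments, so each rowset's level exceeds the level of
    // what remains after its removal and all three candidates are dropped.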