Skip to content

Commit df7558d

Browse files
branch-4.0: [Fix](Compaction) Fix cumulative compaction pick rowsets to trim by max score after filtering #59268 (#59475)
Cherry-picked from #59268. Co-authored-by: Jimmy <lianyukang@selectdb.com>
1 parent 2078d35 commit df7558d

File tree

4 files changed

+1967
-12
lines changed

4 files changed

+1967
-12
lines changed

be/src/cloud/cloud_cumulative_compaction_policy.cpp

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "olap/olap_common.h"
3131
#include "olap/tablet.h"
3232
#include "olap/tablet_meta.h"
33+
#include "util/defer_op.h"
3334

3435
namespace doris {
3536
#include "common/compile_check_begin.h"
@@ -120,6 +121,26 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
120121
int transient_size = 0;
121122
*compaction_score = 0;
122123
int64_t total_size = 0;
124+
bool skip_trim = false; // Skip trim for Empty Rowset Compaction
125+
126+
// DEFER: trim input_rowsets from back if score > max_compaction_score
127+
// This ensures we don't return more rowsets than allowed by max_compaction_score,
128+
// while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
129+
// Must be placed after variable initialization and before collection loop.
130+
DEFER({
131+
if (skip_trim) {
132+
return;
133+
}
134+
// Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
135+
while (input_rowsets->size() > 1 &&
136+
*compaction_score > static_cast<size_t>(max_compaction_score)) {
137+
auto& last_rowset = input_rowsets->back();
138+
*compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
139+
total_size -= last_rowset->rowset_meta()->total_disk_size();
140+
input_rowsets->pop_back();
141+
}
142+
});
143+
123144
for (auto& rowset : candidate_rowsets) {
124145
// check whether this rowset is delete version
125146
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -143,10 +164,8 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
143164
continue;
144165
}
145166
}
146-
if (*compaction_score >= max_compaction_score) {
147-
// got enough segments
148-
break;
149-
}
167+
// Removed: max_compaction_score check here
168+
// We now collect all candidate rowsets and trim from back at return time via DEFER
150169
*compaction_score += rowset->rowset_meta()->get_compaction_score();
151170
total_size += rowset->rowset_meta()->total_disk_size();
152171

@@ -184,8 +203,10 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
184203
static_cast<double>(input_rowsets->size()) >=
185204
config::empty_rowset_compaction_min_ratio) {
186205
// Prioritize consecutive empty rowset compaction
206+
// Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
187207
*input_rowsets = consecutive_empty_rowsets;
188208
*compaction_score = consecutive_empty_rowsets.size();
209+
skip_trim = true;
189210
return consecutive_empty_rowsets.size();
190211
}
191212
}
@@ -229,7 +250,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
229250
*compaction_score = max_score;
230251
return transient_size;
231252
}
232-
// Exceeding max compaction score, do compaction on all candidate rowsets anyway
253+
// no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
233254
return transient_size;
234255
}
235256
}

be/src/olap/cumulative_compaction_policy.cpp

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "olap/tablet.h"
3030
#include "olap/tablet_meta.h"
3131
#include "util/debug_points.h"
32+
#include "util/defer_op.h"
3233

3334
namespace doris {
3435
#include "common/compile_check_begin.h"
@@ -268,6 +269,22 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
268269
int transient_size = 0;
269270
*compaction_score = 0;
270271
int64_t total_size = 0;
272+
273+
// DEFER: trim input_rowsets from back if score > max_compaction_score
274+
// This ensures we don't return more rowsets than allowed by max_compaction_score,
275+
// while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
276+
// Must be placed after variable initialization and before collection loop.
277+
DEFER({
278+
// Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
279+
while (input_rowsets->size() > 1 &&
280+
*compaction_score > static_cast<size_t>(max_compaction_score)) {
281+
auto& last_rowset = input_rowsets->back();
282+
*compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
283+
total_size -= last_rowset->rowset_meta()->total_disk_size();
284+
input_rowsets->pop_back();
285+
}
286+
});
287+
271288
for (auto& rowset : candidate_rowsets) {
272289
// check whether this rowset is delete version
273290
if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -291,10 +308,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
291308
continue;
292309
}
293310
}
294-
if (*compaction_score >= max_compaction_score) {
295-
// got enough segments
296-
break;
297-
}
311+
// Removed: max_compaction_score check here
312+
// We now collect all candidate rowsets and trim from back at return time via DEFER
298313
*compaction_score += rowset->rowset_meta()->get_compaction_score();
299314
total_size += rowset->rowset_meta()->total_disk_size();
300315

@@ -355,7 +370,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
355370
*compaction_score = max_score;
356371
return transient_size;
357372
}
358-
// no rowset is OVERLAPPING, execute compaction on all input rowsets
373+
// no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
359374
return transient_size;
360375
}
361376
input_rowsets->erase(input_rowsets->begin(), rs_begin);

be/test/cloud/cloud_cumulative_compaction_policy_test.cpp

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include "cloud/cloud_cumulative_compaction_policy.h"
19+
1820
#include <gen_cpp/AgentService_types.h>
1921
#include <gen_cpp/olap_file.pb.h>
2022
#include <gtest/gtest-message.h>
2123
#include <gtest/gtest-test-part.h>
2224
#include <gtest/gtest.h>
2325

2426
#include "cloud/cloud_storage_engine.h"
27+
#include "cloud/config.h"
28+
#include "common/config.h"
2529
#include "gtest/gtest_pred_impl.h"
2630
#include "json2pb/json_to_pb.h"
2731
#include "olap/olap_common.h"
@@ -146,4 +150,122 @@ TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, new_cumulative_point) {
146150
Version version(1, 1);
147151
EXPECT_EQ(policy.new_cumulative_point(&_tablet, output_rowset, version, 2), 6);
148152
}
153+
154+
// Test case: Empty rowset compaction with skip_trim
155+
TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty_rowset_compaction) {
156+
// Save original config values
157+
bool orig_enable_empty_rowset_compaction = config::enable_empty_rowset_compaction;
158+
int32_t orig_empty_rowset_compaction_min_count = config::empty_rowset_compaction_min_count;
159+
double orig_empty_rowset_compaction_min_ratio = config::empty_rowset_compaction_min_ratio;
160+
161+
// Enable empty rowset compaction
162+
config::enable_empty_rowset_compaction = true;
163+
config::empty_rowset_compaction_min_count = 5;
164+
config::empty_rowset_compaction_min_ratio = 0.5;
165+
166+
CloudTablet _tablet(_engine, _tablet_meta);
167+
_tablet._base_size = 1024L * 1024 * 1024; // 1GB base
168+
169+
// Create candidate rowsets: 2 normal + 150 empty rowsets
170+
// This tests that skip_trim = true for empty rowset compaction
171+
std::vector<RowsetSharedPtr> candidate_rowsets;
172+
173+
// 2 normal rowsets
174+
for (int i = 0; i < 2; i++) {
175+
auto rowset = create_rowset(Version(i + 2, i + 2), 1, true, 1024 * 1024); // 1MB
176+
candidate_rowsets.push_back(rowset);
177+
}
178+
179+
// 150 empty rowsets (consecutive)
180+
for (int i = 0; i < 150; i++) {
181+
auto rowset = create_rowset(Version(i + 4, i + 4), 0, false, 0); // empty
182+
candidate_rowsets.push_back(rowset);
183+
}
184+
185+
std::vector<RowsetSharedPtr> input_rowsets;
186+
Version last_delete_version {-1, -1};
187+
size_t compaction_score = 0;
188+
189+
CloudSizeBasedCumulativeCompactionPolicy policy;
190+
// max=100, but empty rowset compaction should return 150 (skip_trim = true)
191+
policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 50, &input_rowsets,
192+
&last_delete_version, &compaction_score, true);
193+
194+
// Empty rowset compaction should return all 150 empty rowsets
195+
// skip_trim = true, so no trimming even though score > max
196+
EXPECT_EQ(150, input_rowsets.size());
197+
EXPECT_EQ(150, compaction_score);
198+
199+
// Verify all returned rowsets are empty
200+
for (const auto& rs : input_rowsets) {
201+
EXPECT_EQ(0, rs->num_segments());
202+
}
203+
204+
// Restore original config values
205+
config::enable_empty_rowset_compaction = orig_enable_empty_rowset_compaction;
206+
config::empty_rowset_compaction_min_count = orig_empty_rowset_compaction_min_count;
207+
config::empty_rowset_compaction_min_ratio = orig_empty_rowset_compaction_min_ratio;
208+
}
209+
210+
// Test case: prioritize_query_perf_in_compaction for non-DUP_KEYS table
211+
// This tests the branch: rs_begin == end && prioritize_query_perf && keys_type != DUP_KEYS
212+
TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_prioritize_query_perf) {
213+
// Save original config value
214+
bool orig_prioritize_query_perf = config::prioritize_query_perf_in_compaction;
215+
216+
// Enable prioritize_query_perf_in_compaction
217+
config::prioritize_query_perf_in_compaction = true;
218+
219+
// Create tablet with UNIQUE keys (not DUP_KEYS)
220+
TTabletSchema schema;
221+
schema.keys_type = TKeysType::UNIQUE_KEYS;
222+
TabletMetaSharedPtr tablet_meta(new TabletMeta(1, 2, 15673, 15674, 4, 5, schema, 6, {{7, 8}},
223+
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
224+
TCompressionType::LZ4F));
225+
226+
CloudTablet _tablet(_engine, tablet_meta);
227+
// Use large base_size to get large promotion_size, ensuring total_size < promotion_size
228+
// so we don't trigger promotion_size early return and can reach level_size logic
229+
_tablet._base_size = 20L * 1024 * 1024 * 1024; // 20GB base, promotion_size ~= 1GB
230+
231+
// Create candidate rowsets that will ALL be removed by level_size
232+
// Key: each rowset's level > remain_level after removal
233+
std::vector<RowsetSharedPtr> candidate_rowsets;
234+
235+
// 3 rowsets with decreasing sizes, all will be removed by level_size:
236+
// - 40MB: level(40)=32, remain=35, level(35)=32, 32>32? NO... need adjustment
237+
// Let's use sizes that guarantee all removal:
238+
// - 50MB: level(50)=32, after remove remain=25, level(25)=16, 32>16 -> remove
239+
// - 20MB: level(20)=16, after remove remain=5, level(5)=4, 16>4 -> remove
240+
// - 5MB: level(5)=4, after remove remain=0, level(0)=0, 4>0 -> remove
241+
auto rowset1 = create_rowset(Version(2, 2), 30, true, 50L * 1024 * 1024); // 50MB, score=30
242+
auto rowset2 = create_rowset(Version(3, 3), 20, true, 20L * 1024 * 1024); // 20MB, score=20
243+
auto rowset3 = create_rowset(Version(4, 4), 10, true, 5L * 1024 * 1024); // 5MB, score=10
244+
candidate_rowsets.push_back(rowset1);
245+
candidate_rowsets.push_back(rowset2);
246+
candidate_rowsets.push_back(rowset3);
247+
248+
// total_size = 75MB < promotion_size (~1GB), enters level_size logic
249+
// All 3 rowsets will be removed by level_size -> rs_begin == end
250+
// With prioritize_query_perf=true and UNIQUE_KEYS, should return all candidates
251+
252+
std::vector<RowsetSharedPtr> input_rowsets;
253+
Version last_delete_version {-1, -1};
254+
size_t compaction_score = 0;
255+
256+
CloudSizeBasedCumulativeCompactionPolicy policy;
257+
policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 5, &input_rowsets,
258+
&last_delete_version, &compaction_score, true);
259+
260+
// With prioritize_query_perf enabled for non-DUP_KEYS table,
261+
// when all rowsets are removed by level_size, should return all candidates
262+
// (before DEFER trim)
263+
// Total score = 60, max = 100, so no trimming needed
264+
EXPECT_EQ(3, input_rowsets.size());
265+
EXPECT_EQ(60, compaction_score);
266+
267+
// Restore original config value
268+
config::prioritize_query_perf_in_compaction = orig_prioritize_query_perf;
269+
}
270+
149271
} // namespace doris

0 commit comments

Comments (0)