Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 31 additions & 29 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state
: JoinProbeLocalState<NestedLoopJoinSharedState, NestedLoopJoinProbeLocalState>(state,
parent),
_matched_rows_done(false),
_left_block_pos(0) {}
_probe_block_pos(0) {}

Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
Expand Down Expand Up @@ -88,32 +88,34 @@ void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block*
void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() {
// TODO: need a vector of left block to register the _probe_row_visited_flags
_current_build_pos = 0;
_left_block_pos++;
_probe_block_pos++;
}

template <typename JoinOpType, bool set_build_side_flag, bool set_probe_side_flag>
Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* state,
JoinOpType& join_op_variants) {
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
constexpr bool ignore_null = JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
_left_block_start_pos = _left_block_pos;
_left_side_process_count = 0;
_probe_block_start_pos = _probe_block_pos;
_probe_side_process_count = 0;
DCHECK(!_need_more_input_data || !_matched_rows_done);

auto* probe_block = _child_block.get();

if (!_matched_rows_done && !_need_more_input_data) {
// We should try to join rows if there still are some rows from probe side.
// _probe_offset_stack and _build_offset_stack use u16 for storage
// because on the FE side, it is guaranteed that the batch size will not exceed 65535 (the maximum value for u16).s
while (_join_block.rows() < state->batch_size()) {
while (_current_build_pos == _shared_state->build_blocks.size() ||
_left_block_pos == _child_block->rows()) {
_probe_block_pos == probe_block->rows()) {
// if left block is empty(), do not need disprocess the left block rows
if (_child_block->rows() > _left_block_pos) {
_left_side_process_count++;
if (probe_block->rows() > _probe_block_pos) {
_probe_side_process_count++;
}

_reset_with_next_probe_row();
if (_left_block_pos < _child_block->rows()) {
if (_probe_block_pos < probe_block->rows()) {
if constexpr (set_probe_side_flag) {
_probe_offset_stack.push(
cast_set<uint16_t, size_t, false>(_join_block.rows()));
Expand Down Expand Up @@ -152,11 +154,11 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
// `_left_side_process_count`, means all rows from build
// side have been joined with _left_side_process_count, we should output current
// probe row with null from build side.
if (_left_side_process_count) {
if (_probe_side_process_count) {
_finalize_current_phase<false, JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>(
_join_block, state->batch_size());
}
} else if (_left_side_process_count && p._is_mark_join &&
} else if (_probe_side_process_count && p._is_mark_join &&
_shared_state->build_blocks.empty()) {
_append_left_data_with_null(_join_block);
}
Expand Down Expand Up @@ -249,9 +251,9 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::Block& b
} else {
if (!p._is_mark_join) {
auto new_size = column_size;
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
DCHECK_LE(_probe_block_start_pos + _probe_side_process_count, _child_block->rows());
for (int j = _probe_block_start_pos;
j < _probe_block_start_pos + _probe_side_process_count; ++j) {
if (_cur_probe_row_visited_flags[j] == IsSemi) {
new_size++;
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
Expand Down Expand Up @@ -280,22 +282,22 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::Block& b
}
} else {
vectorized::ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]);
mark_column.reserve(mark_column.size() + _left_side_process_count);
DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
mark_column.reserve(mark_column.size() + _probe_side_process_count);
DCHECK_LE(_probe_block_start_pos + _probe_side_process_count, _child_block->rows());
for (int j = _probe_block_start_pos;
j < _probe_block_start_pos + _probe_side_process_count; ++j) {
mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]);
}
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName src_column =
_child_block->get_by_position(i);
DCHECK(p._join_op != TJoinOp::FULL_OUTER_JOIN);
dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
_left_side_process_count);
dst_columns[i]->insert_range_from(*src_column.column, _probe_block_start_pos,
_probe_side_process_count);
}
for (size_t i = 0; i < p._num_build_side_columns; ++i) {
dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(
_left_side_process_count);
_probe_side_process_count);
}
}
}
Expand All @@ -314,23 +316,23 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null(vectorized::Bloc
p._join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
->get_nested_column_ptr()
->insert_range_from(*src_column.column, _left_block_start_pos,
_left_side_process_count);
->insert_range_from(*src_column.column, _probe_block_start_pos,
_probe_side_process_count);
assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
->get_null_map_column()
.get_data()
.resize_fill(origin_sz + 1, 0);
} else {
dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
_left_side_process_count);
dst_columns[i]->insert_range_from(*src_column.column, _probe_block_start_pos,
_probe_side_process_count);
}
}
for (size_t i = 0; i < p._num_build_side_columns; ++i) {
dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count);
dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_probe_side_process_count);
}
auto& mark_column = *dst_columns[dst_columns.size() - 1];
vectorized::ColumnFilterHelper(mark_column)
.resize_fill(mark_column.size() + _left_side_process_count, 0);
.resize_fill(mark_column.size() + _probe_side_process_count, 0);
block.set_columns(std::move(dst_columns));
}

Expand All @@ -348,14 +350,14 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block(
p._join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
->get_nested_column_ptr()
->insert_many_from(*src_column.column, _left_block_pos, max_added_rows);
->insert_many_from(*src_column.column, _probe_block_pos, max_added_rows);
assert_cast<vectorized::ColumnNullable*>(dst_columns[i].get())
->get_null_map_column()
.get_data()
.resize_fill(origin_sz + max_added_rows, 0);
} else {
// TODO: for cross join, maybe could insert one row, and wrap for a const column
dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows);
dst_columns[i]->insert_many_from(*src_column.column, _probe_block_pos, max_added_rows);
}
}
for (size_t i = 0; i < p._num_build_side_columns; ++i) {
Expand Down Expand Up @@ -436,7 +438,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
local_state._cur_probe_row_visited_flags.resize(block->rows());
std::fill(local_state._cur_probe_row_visited_flags.begin(),
local_state._cur_probe_row_visited_flags.end(), 0);
local_state._left_block_pos = 0;
local_state._probe_block_pos = 0;
local_state._need_more_input_data = false;
local_state._shared_state->left_side_eos = eos;

Expand Down
12 changes: 6 additions & 6 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class NestedLoopJoinProbeLocalState final
}
if constexpr (SetProbeSideFlag) {
int64_t end = filter.size();
for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1
: _left_block_pos;
i >= _left_block_start_pos; i--) {
for (int i = _probe_block_pos == _child_block->rows() ? _probe_block_pos - 1
: _probe_block_pos;
i >= _probe_block_start_pos; i--) {
int64_t offset = 0;
if (!_probe_offset_stack.empty()) {
offset = _probe_offset_stack.top();
Expand Down Expand Up @@ -178,9 +178,9 @@ class NestedLoopJoinProbeLocalState final
}

bool _matched_rows_done;
int _left_block_start_pos = 0;
int _left_block_pos; // current scan pos in _left_block
int _left_side_process_count = 0;
int _probe_block_start_pos = 0;
int _probe_block_pos; // current scan pos in _probe_block
int _probe_side_process_count = 0;
bool _need_more_input_data = true;
// Visited flags for current row in probe side.
std::vector<int8_t> _cur_probe_row_visited_flags;
Expand Down