diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 76a50ff8169a6a..88510db9b893de 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -36,7 +36,7 @@ NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state : JoinProbeLocalState(state, parent), _matched_rows_done(false), - _left_block_pos(0) {} + _probe_block_pos(0) {} Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); @@ -88,7 +88,7 @@ void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block* void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() { // TODO: need a vector of left block to register the _probe_row_visited_flags _current_build_pos = 0; - _left_block_pos++; + _probe_block_pos++; } template @@ -96,24 +96,26 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta JoinOpType& join_op_variants) { auto& p = _parent->cast(); constexpr bool ignore_null = JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; - _left_block_start_pos = _left_block_pos; - _left_side_process_count = 0; + _probe_block_start_pos = _probe_block_pos; + _probe_side_process_count = 0; DCHECK(!_need_more_input_data || !_matched_rows_done); + auto* probe_block = _child_block.get(); + if (!_matched_rows_done && !_need_more_input_data) { // We should try to join rows if there still are some rows from probe side. 
// _probe_offset_stack and _build_offset_stack use u16 for storage // because on the FE side, it is guaranteed that the batch size will not exceed 65535 (the maximum value for u16). while (_join_block.rows() < state->batch_size()) { while (_current_build_pos == _shared_state->build_blocks.size() || - _left_block_pos == _child_block->rows()) { + _probe_block_pos == probe_block->rows()) { // if left block is empty(), do not need disprocess the left block rows - if (_child_block->rows() > _left_block_pos) { - _left_side_process_count++; + if (probe_block->rows() > _probe_block_pos) { + _probe_side_process_count++; } _reset_with_next_probe_row(); - if (_left_block_pos < _child_block->rows()) { + if (_probe_block_pos < probe_block->rows()) { if constexpr (set_probe_side_flag) { _probe_offset_stack.push( cast_set(_join_block.rows())); @@ -152,11 +154,11 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta // `_left_side_process_count`, means all rows from build // side have been joined with _left_side_process_count, we should output current // probe row with null from build side. 
- if (_left_side_process_count) { + if (_probe_side_process_count) { _finalize_current_phase( _join_block, state->batch_size()); } - } else if (_left_side_process_count && p._is_mark_join && + } else if (_probe_side_process_count && p._is_mark_join && _shared_state->build_blocks.empty()) { _append_left_data_with_null(_join_block); } @@ -249,9 +251,9 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::Block& b } else { if (!p._is_mark_join) { auto new_size = column_size; - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows()); - for (int j = _left_block_start_pos; - j < _left_block_start_pos + _left_side_process_count; ++j) { + DCHECK_LE(_probe_block_start_pos + _probe_side_process_count, _child_block->rows()); + for (int j = _probe_block_start_pos; + j < _probe_block_start_pos + _probe_side_process_count; ++j) { if (_cur_probe_row_visited_flags[j] == IsSemi) { new_size++; for (size_t i = 0; i < p._num_probe_side_columns; ++i) { @@ -280,22 +282,22 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::Block& b } } else { vectorized::ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 1]); - mark_column.reserve(mark_column.size() + _left_side_process_count); - DCHECK_LE(_left_block_start_pos + _left_side_process_count, _child_block->rows()); - for (int j = _left_block_start_pos; - j < _left_block_start_pos + _left_side_process_count; ++j) { + mark_column.reserve(mark_column.size() + _probe_side_process_count); + DCHECK_LE(_probe_block_start_pos + _probe_side_process_count, _child_block->rows()); + for (int j = _probe_block_start_pos; + j < _probe_block_start_pos + _probe_side_process_count; ++j) { mark_column.insert_value(IsSemi == _cur_probe_row_visited_flags[j]); } for (size_t i = 0; i < p._num_probe_side_columns; ++i) { const vectorized::ColumnWithTypeAndName src_column = _child_block->get_by_position(i); DCHECK(p._join_op != TJoinOp::FULL_OUTER_JOIN); - 
dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, - _left_side_process_count); + dst_columns[i]->insert_range_from(*src_column.column, _probe_block_start_pos, + _probe_side_process_count); } for (size_t i = 0; i < p._num_build_side_columns; ++i) { dst_columns[p._num_probe_side_columns + i]->insert_many_defaults( - _left_side_process_count); + _probe_side_process_count); } } } @@ -314,23 +316,23 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null(vectorized::Bloc p._join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast(dst_columns[i].get()) ->get_nested_column_ptr() - ->insert_range_from(*src_column.column, _left_block_start_pos, - _left_side_process_count); + ->insert_range_from(*src_column.column, _probe_block_start_pos, + _probe_side_process_count); assert_cast(dst_columns[i].get()) ->get_null_map_column() .get_data() .resize_fill(origin_sz + 1, 0); } else { - dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, - _left_side_process_count); + dst_columns[i]->insert_range_from(*src_column.column, _probe_block_start_pos, + _probe_side_process_count); } } for (size_t i = 0; i < p._num_build_side_columns; ++i) { - dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count); + dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_probe_side_process_count); } auto& mark_column = *dst_columns[dst_columns.size() - 1]; vectorized::ColumnFilterHelper(mark_column) - .resize_fill(mark_column.size() + _left_side_process_count, 0); + .resize_fill(mark_column.size() + _probe_side_process_count, 0); block.set_columns(std::move(dst_columns)); } @@ -348,14 +350,14 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block( p._join_op == TJoinOp::FULL_OUTER_JOIN); assert_cast(dst_columns[i].get()) ->get_nested_column_ptr() - ->insert_many_from(*src_column.column, _left_block_pos, max_added_rows); + ->insert_many_from(*src_column.column, _probe_block_pos, max_added_rows); 
assert_cast(dst_columns[i].get()) ->get_null_map_column() .get_data() .resize_fill(origin_sz + max_added_rows, 0); } else { // TODO: for cross join, maybe could insert one row, and wrap for a const column - dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows); + dst_columns[i]->insert_many_from(*src_column.column, _probe_block_pos, max_added_rows); } } for (size_t i = 0; i < p._num_build_side_columns; ++i) { @@ -436,7 +438,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized local_state._cur_probe_row_visited_flags.resize(block->rows()); std::fill(local_state._cur_probe_row_visited_flags.begin(), local_state._cur_probe_row_visited_flags.end(), 0); - local_state._left_block_pos = 0; + local_state._probe_block_pos = 0; local_state._need_more_input_data = false; local_state._shared_state->left_side_eos = eos; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 945a964672b92f..53cf6922adbebd 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -87,9 +87,9 @@ class NestedLoopJoinProbeLocalState final } if constexpr (SetProbeSideFlag) { int64_t end = filter.size(); - for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1 - : _left_block_pos; - i >= _left_block_start_pos; i--) { + for (int i = _probe_block_pos == _child_block->rows() ? 
_probe_block_pos - 1 + : _probe_block_pos; + i >= _probe_block_start_pos; i--) { int64_t offset = 0; if (!_probe_offset_stack.empty()) { offset = _probe_offset_stack.top(); @@ -178,9 +178,9 @@ class NestedLoopJoinProbeLocalState final } bool _matched_rows_done; - int _left_block_start_pos = 0; - int _left_block_pos; // current scan pos in _left_block - int _left_side_process_count = 0; + int _probe_block_start_pos = 0; + int _probe_block_pos; // current scan pos in _probe_block + int _probe_side_process_count = 0; bool _need_more_input_data = true; // Visited flags for current row in probe side. std::vector _cur_probe_row_visited_flags;