47 changes: 29 additions & 18 deletions be/src/pipeline/dependency.cpp
@@ -319,20 +319,31 @@ Status AggSharedState::reset_hash_table() {
}

void PartitionedAggSharedState::init_spill_params(size_t spill_partition_count) {
partition_count = spill_partition_count;
// PartitionedAgg uses hierarchical spill partitioning with fixed 8-way fanout per level.
// Keep the API but ignore spill_partition_count for fanout.
//
// The existing RuntimeState::spill_aggregation_partition_count() was originally used to decide
// the number of single-level partitions. With multi-level partitioning, fanout must be stable
// across sink/source and across split levels, so we pin it to kSpillFanout=8 (same as join).
partition_count = kSpillFanout;
max_partition_index = partition_count - 1;

for (int i = 0; i < partition_count; ++i) {
spill_partitions.emplace_back(std::make_shared<AggSpillPartition>());
spill_partitions.clear();
pending_partitions.clear();
for (uint32_t i = 0; i < partition_count; ++i) {
SpillPartitionId id {.level = 0, .path = i};
auto [it, inserted] = spill_partitions.try_emplace(id.key());
it->second.id = id;
pending_partitions.emplace_back(id);
}
}
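SpillPartitionId itself is defined in be/src/pipeline/exec/hierarchical_spill_partition.h, which this diff only includes. A minimal sketch of what the id could look like, assuming a level/path pair and a key() that packs both into one uint32_t; the field widths and exact packing are assumptions, only kSpillFanout = 8 and the existence of kSpillBitsPerLevel are stated in this PR:

// Hypothetical sketch; the real type lives in hierarchical_spill_partition.h.
#include <cstdint>

static constexpr uint32_t kSpillBitsPerLevel = 3;                  // assumed: 2^3 = 8-way fanout
static constexpr uint32_t kSpillFanout = 1U << kSpillBitsPerLevel; // 8, matching the comment above

struct SpillPartitionId {
    uint32_t level = 0; // 0 for the base partitions created by init_spill_params()
    uint32_t path = 0;  // per-level child indices, kSpillBitsPerLevel bits per level

    // One possible packing of (level, path) into a single hash-map key.
    uint32_t key() const { return (level << 24) | (path & 0x00FFFFFFU); }

    // A split partition's children extend the path by one more level (assumed scheme).
    SpillPartitionId child(uint32_t idx) const {
        return {.level = level + 1, .path = (path << kSpillBitsPerLevel) | idx};
    }
};

With this encoding, the loop above creates ids {0,0} through {0,7} whose keys are simply 0..7, and try_emplace() default-constructs each AggSpillPartition in place.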

void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {
for (auto& partition : spill_partitions) {
if (partition->spilling_stream_) {
partition->spilling_stream_->update_shared_profiles(source_profile);
for (auto& [_, partition] : spill_partitions) {
if (partition.spilling_stream) {
partition.spilling_stream->update_shared_profiles(source_profile);
}
for (auto& stream : partition->spill_streams_) {
for (auto& stream : partition.spill_streams) {
if (stream) {
stream->update_shared_profiles(source_profile);
}
@@ -343,25 +354,25 @@ void PartitionedAggSharedState::update_spill_stream_profiles(RuntimeProfile* sou
Status AggSpillPartition::get_spill_stream(RuntimeState* state, int node_id,
RuntimeProfile* profile,
vectorized::SpillStreamSPtr& spill_stream) {
if (spilling_stream_) {
spill_stream = spilling_stream_;
if (spilling_stream) {
spill_stream = spilling_stream;
return Status::OK();
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spilling_stream_, print_id(state->query_id()), "agg", node_id,
state, spilling_stream, print_id(state->query_id()), "agg", node_id,
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), profile));
spill_streams_.emplace_back(spilling_stream_);
spill_stream = spilling_stream_;
spill_streams.emplace_back(spilling_stream);
spill_stream = spilling_stream;
return Status::OK();
}
void AggSpillPartition::close() {
if (spilling_stream_) {
spilling_stream_.reset();
if (spilling_stream) {
spilling_stream.reset();
}
for (auto& stream : spill_streams_) {
for (auto& stream : spill_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
}
spill_streams_.clear();
spill_streams.clear();
}

void PartitionedAggSharedState::close() {
@@ -372,8 +383,8 @@ void PartitionedAggSharedState::close() {
return;
}
DCHECK(!false_close && is_closed);
for (auto partition : spill_partitions) {
partition->close();
for (auto& [_, partition] : spill_partitions) {
partition.close();
}
spill_partitions.clear();
}
89 changes: 72 additions & 17 deletions be/src/pipeline/dependency.h
@@ -26,10 +26,12 @@
#include <sqltypes.h>

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>

#include "common/config.h"
@@ -39,6 +41,7 @@
#include "pipeline/common/join_utils.h"
#include "pipeline/common/set_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/hierarchical_spill_partition.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
#include "util/brpc_closure.h"
#include "util/stack_util.h"
@@ -462,7 +465,11 @@ struct PartitionedAggSharedState : public BasicSharedState,
size_t max_partition_index;
bool is_spilled = false;
std::atomic_bool is_closed = false;
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
// Hierarchical spill partitions (multi-level split), keyed by SpillPartitionId::key().
// Level 0 starts with kSpillFanout base partitions.
std::unordered_map<uint32_t, AggSpillPartition> spill_partitions;

std::deque<SpillPartitionId> pending_partitions;

size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; }
};
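pending_partitions is new in this PR. A hedged sketch of how the spill source side could drain it, one partition at a time; the function below and its error handling are illustrative, not part of this diff, and only spill_partitions, pending_partitions, and the AggSpillPartition members are taken from it:

// Illustrative only: pop one pending partition and prepare it for reading back.
Status recover_next_partition(PartitionedAggSharedState& shared) {
    if (shared.pending_partitions.empty()) {
        return Status::OK(); // nothing left to restore
    }
    SpillPartitionId id = shared.pending_partitions.front();
    shared.pending_partitions.pop_front();

    auto it = shared.spill_partitions.find(id.key());
    if (it == shared.spill_partitions.end()) {
        return Status::InternalError("missing spill partition");
    }
    AggSpillPartition& partition = it->second;
    // Seal the last open write stream before reading the partition back.
    RETURN_IF_ERROR(partition.finish_current_spilling(/*eos=*/true));
    // ... read partition.spill_streams back and merge them into the hash table ...
    return Status::OK();
}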
@@ -472,35 +479,41 @@ struct AggSpillPartition {

AggSpillPartition() = default;

SpillPartitionId id;
bool is_split = false;
// Best-effort bytes written via this partition node (in block format).
// Used as a split trigger; not used for correctness.
size_t spilled_bytes = 0;

void close();

Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
vectorized::SpillStreamSPtr& spilling_stream);
vectorized::SpillStreamSPtr& spill_stream);

Status flush_if_full() {
DCHECK(spilling_stream_);
DCHECK(spilling_stream);
Status status;
// avoid small spill files
if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
status = spilling_stream_->spill_eof();
spilling_stream_.reset();
if (spilling_stream->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
status = spilling_stream->spill_eof();
spilling_stream.reset();
}
return status;
}

Status finish_current_spilling(bool eos = false) {
if (spilling_stream_) {
if (eos || spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
auto status = spilling_stream_->spill_eof();
spilling_stream_.reset();
if (spilling_stream) {
if (eos || spilling_stream->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
auto status = spilling_stream->spill_eof();
spilling_stream.reset();
return status;
}
}
return Status::OK();
}

std::deque<vectorized::SpillStreamSPtr> spill_streams_;
vectorized::SpillStreamSPtr spilling_stream_;
std::deque<vectorized::SpillStreamSPtr> spill_streams;
vectorized::SpillStreamSPtr spilling_stream;
};
using AggSpillPartitionSPtr = std::shared_ptr<AggSpillPartition>;
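Taken together, the AggSpillPartition members sketch a simple write path for the sink side. A hedged example, assuming the caller already has the partition and a serialized block; spill_block() is the existing SpillStream API and its exact signature may differ:

// Hedged sketch of spilling one block through an AggSpillPartition.
Status spill_one_block(RuntimeState* state, AggSpillPartition& partition, int node_id,
                       RuntimeProfile* profile, vectorized::Block& block) {
    vectorized::SpillStreamSPtr stream;
    // Reuses the currently open stream, or registers a new one on first use.
    RETURN_IF_ERROR(partition.get_spill_stream(state, node_id, profile, stream));
    RETURN_IF_ERROR(stream->spill_block(state, block, /*eof=*/false));
    partition.spilled_bytes += block.allocated_bytes(); // best-effort split trigger
    // Rotate to a new file once the current one reaches AGG_SPILL_FILE_SIZE.
    return partition.flush_if_full();
}

At eos the sink would then call finish_current_spilling(true) on every partition so that the last open stream of each one is sealed.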
struct SortSharedState : public BasicSharedState {
@@ -624,24 +637,66 @@ struct HashJoinSharedState {
std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
};

// Hierarchical spill partitioning for the hash join probe side.
static constexpr uint32_t kHashJoinSpillFanout = kSpillFanout;
static constexpr uint32_t kHashJoinSpillBitsPerLevel = kSpillBitsPerLevel;
static constexpr uint32_t kHashJoinSpillMaxDepth = kSpillMaxDepth;
using HashJoinSpillPartitionId = SpillPartitionId;

struct HashJoinSpillPartition {
HashJoinSpillPartitionId id;
bool is_split = false;
// Probe-side buffered rows for this partition before flushing into blocks/spill.
std::unique_ptr<vectorized::MutableBlock> accumulating_block;
// Probe-side materialized blocks for this partition (in-memory).
std::vector<vectorized::Block> blocks;
vectorized::SpillStreamSPtr spill_stream;

// Memory tracking for this partition.
size_t in_mem_bytes = 0; // Bytes of data currently in memory (accumulating_block + blocks).
size_t spilled_bytes = 0; // Bytes of data that have been spilled to disk.

size_t total_bytes() const { return in_mem_bytes + spilled_bytes; }
};

using HashJoinSpillPartitionMap = std::unordered_map<uint32_t, HashJoinSpillPartition>;

struct HashJoinSpillBuildPartition {
HashJoinSpillPartitionId id;
bool is_split = false;
// Build-side buffered rows for this partition before hash table build.
std::unique_ptr<vectorized::MutableBlock> build_block;
vectorized::SpillStreamSPtr spill_stream;

// Memory tracking for this partition.
size_t in_mem_bytes = 0; // Bytes of data currently in memory (build_block).
size_t spilled_bytes = 0; // Bytes of data that have been spilled to disk.
size_t row_count = 0; // Total number of rows in this partition.

size_t total_bytes() const { return in_mem_bytes + spilled_bytes; }
};

using HashJoinSpillBuildPartitionMap = std::unordered_map<uint32_t, HashJoinSpillBuildPartition>;
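The in_mem_bytes/spilled_bytes split is what lets the operator see how much of a partition is still revocable versus already on disk. A hedged sketch of build-side accounting; append_and_maybe_spill and the threshold parameter are illustrative, only the partition members come from this diff:

// Illustrative: bytes move from in_mem_bytes to spilled_bytes when a partition is flushed.
Status append_and_maybe_spill(RuntimeState* state, HashJoinSpillBuildPartition& partition,
                              vectorized::Block& block, size_t spill_threshold_bytes) {
    const size_t bytes = block.allocated_bytes();
    const size_t rows = block.rows();
    if (!partition.build_block) {
        partition.build_block = vectorized::MutableBlock::create_unique(block.clone_empty());
    }
    RETURN_IF_ERROR(partition.build_block->merge(std::move(block)));
    partition.in_mem_bytes += bytes;
    partition.row_count += rows;

    // Assumes a spill stream was already registered for this partition.
    if (partition.in_mem_bytes >= spill_threshold_bytes && partition.spill_stream) {
        auto flushed = partition.build_block->to_block();
        RETURN_IF_ERROR(partition.spill_stream->spill_block(state, flushed, /*eof=*/false));
        partition.spilled_bytes += partition.in_mem_bytes; // now accounted as on-disk bytes
        partition.in_mem_bytes = 0;
        partition.build_block.reset();
    }
    return Status::OK();
}

total_bytes() then stays meaningful throughout: it is the full size of the partition regardless of where the data currently sits.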

struct PartitionedHashJoinSharedState
: public HashJoinSharedState,
public BasicSpillSharedState,
public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)

void update_spill_stream_profiles(RuntimeProfile* source_profile) override {
for (auto& stream : spilled_streams) {
if (stream) {
stream->update_shared_profiles(source_profile);
for (auto& [_, partition] : build_partitions) {
if (partition.spill_stream) {
partition.spill_stream->update_shared_profiles(source_profile);
}
}
}

std::unique_ptr<RuntimeState> inner_runtime_state;
std::shared_ptr<HashJoinSharedState> inner_shared_state;
std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks;
std::vector<vectorized::SpillStreamSPtr> spilled_streams;
HashJoinSpillPartitionMap probe_partitions;
HashJoinSpillBuildPartitionMap build_partitions;
std::deque<HashJoinSpillPartitionId> pending_probe_partitions;
bool is_spilled = false;
};

11 changes: 11 additions & 0 deletions be/src/pipeline/exec/data_queue.h
@@ -21,6 +21,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "common/status.h"
@@ -81,6 +82,16 @@ class DataQueue {

void terminate();

std::pair<int64_t, uint32_t> current_queue_size() const {
int64_t total_bytes = 0;
uint32_t total_blocks = 0;
for (int i = 0; i < _child_count; ++i) {
total_bytes += _cur_bytes_in_queue[i].load();
total_blocks += _cur_blocks_nums_in_queue[i].load();
}
return {total_bytes, total_blocks};
}

private:
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
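current_queue_size() gives callers a cheap snapshot of everything buffered across the child queues. A hedged usage sketch; the surrounding operator members such as _data_queue and _memory_used_counter are illustrative here:

// Illustrative: report queued data as this operator's memory usage.
auto [queued_bytes, queued_blocks] = _data_queue->current_queue_size();
COUNTER_SET(_memory_used_counter, queued_bytes);
VLOG_DEBUG << "data queue holds " << queued_blocks << " blocks, " << queued_bytes << " bytes";

Note the snapshot is not atomic across children; each counter is read independently, which is fine for heuristics like memory reporting but should not be relied on for exact accounting.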
27 changes: 26 additions & 1 deletion be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -19,10 +19,12 @@

#include <gen_cpp/Metrics_types.h>

#include <cstdint>
#include <memory>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "util/runtime_profile.h"
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris {
@@ -202,6 +204,26 @@ Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows);
DCHECK_LE(_distinct_row.size(), rows)
<< "_distinct_row size should be less than or equal to rows";

size_t used_memory = 0;
std::visit(vectorized::Overload {
[&](std::monostate& arg) {
// Do nothing
},
[&](auto& agg_method) {
used_memory = agg_method.hash_table->get_buffer_size_in_bytes();
}},
_agg_data->method_variant);
COUNTER_SET(_memory_used_counter,
int64_t(_distinct_row.allocated_bytes() + _arena.size() + used_memory));
} else {
std::visit(vectorized::Overload {[&](std::monostate& arg) {
// Do nothing
},
[&](auto& agg_method) { agg_method.hash_table.reset(); }},
_agg_data->method_variant);
_arena.clear(true);
COUNTER_SET(_memory_used_counter, 0);
}

bool mem_reuse = _parent->cast<DistinctStreamingAggOperatorX>()._make_nullable_keys.empty() &&
@@ -439,8 +461,11 @@ Status DistinctStreamingAggLocalState::close(RuntimeState* state) {
// Do nothing
},
[&](auto& agg_method) {
COUNTER_SET(_hash_table_size_counter,
if (agg_method.hash_table) {
COUNTER_SET(
_hash_table_size_counter,
int64_t(agg_method.hash_table->size()));
}
}},
_agg_data->method_variant);
}
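Both hunks above visit _agg_data->method_variant with vectorized::Overload, pairing a no-op for std::monostate with a generic lambda for every concrete aggregation method. The helper is the standard overloaded-lambdas idiom; a self-contained illustration of the generic technique (not Doris's exact helper):

#include <iostream>
#include <string>
#include <variant>

// Classic "overloaded" helper: inherit operator() from each lambda passed in.
template <typename... Ts>
struct Overload : Ts... {
    using Ts::operator()...;
};
template <typename... Ts>
Overload(Ts...) -> Overload<Ts...>; // deduction guide (implicit in C++20)

int main() {
    std::variant<std::monostate, int, std::string> v = 42;
    std::visit(Overload {[](std::monostate&) { /* empty variant: do nothing */ },
                         [](auto& value) { std::cout << value << '\n'; }},
               v);
}

The monostate overload is more specialized than the generic lambda, so it wins for the empty state, exactly like the no-op branches in the code above.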
11 changes: 2 additions & 9 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -136,17 +136,10 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo

if (eos) {
const size_t rows = build_block_rows + state->batch_size();
const auto bucket_size = hash_join_table_calc_bucket_size(rows);

size_to_reserve += bucket_size * sizeof(uint32_t); // JoinHashTable::first
size_to_reserve += rows * sizeof(uint32_t); // JoinHashTable::next

auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
if (p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::RIGHT_OUTER_JOIN ||
p._join_op == TJoinOp::RIGHT_ANTI_JOIN || p._join_op == TJoinOp::RIGHT_SEMI_JOIN) {
size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
}
size_to_reserve += estimate_hash_table_mem_size(rows, p._join_op);
size_to_reserve += _evaluate_mem_usage;
const auto bucket_size = hash_join_table_calc_bucket_size(rows);

vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
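The removed inline math is presumably what estimate_hash_table_mem_size() now computes in one place. Reconstructing it from the deleted lines gives roughly the following; the helper's real signature and body are not shown in this diff, this sketch only re-assembles the deleted arithmetic:

// Sketch reconstructed from the lines this hunk deletes; not the actual helper.
size_t estimate_hash_table_mem_size(size_t rows, TJoinOp::type join_op) {
    const auto bucket_size = hash_join_table_calc_bucket_size(rows);
    size_t size = 0;
    size += bucket_size * sizeof(uint32_t); // JoinHashTable::first
    size += rows * sizeof(uint32_t);        // JoinHashTable::next
    if (join_op == TJoinOp::FULL_OUTER_JOIN || join_op == TJoinOp::RIGHT_OUTER_JOIN ||
        join_op == TJoinOp::RIGHT_ANTI_JOIN || join_op == TJoinOp::RIGHT_SEMI_JOIN) {
        size += rows * sizeof(uint8_t); // JoinHashTable::visited, only for right/full joins
    }
    return size;
}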
