Use arrow pool to fix memory over accounting aggregations #19501
Which issue does this PR close?
Closes #
Related to https://github.com/apache/datafusion/issues/16841#issuecomment-3563643947
Rationale for this change
Aggregation accumulators that store Arrow arrays can over-account memory when array buffers are shared between multiple ScalarValues or between Arrow arrays directly. This happens because calling ScalarValue::try_from_array() creates slices that reference the same underlying buffers; each ScalarValue then reports the full buffer size when calculating memory usage, so the same physical buffer gets counted multiple times, leading to over-accounting (this comment explains very well why we are seeing this).
There have been several attempts to fix this before, which included compacting data so as not to keep the whole array alive, or using `get_slice_memory_size` instead of `get_array_memory_size`. However, we observed that both had downsides:

- `compact()` is CPU-inefficient (it copies data, which can be expensive)
- `get_slice_memory_size()` only accounts for logical memory, not the actual physical buffer capacity, so the amount it returns is not accurate

What changes are included in this PR?
This approach avoids double-counting memory by using Arrow's TrackingMemoryPool, which automatically deduplicates shared buffers when accounting for them. This means we don't need to `compact()` or call `get_slice_memory_size()` just to solve the accounting problem. Note that `compact()` might still be useful when we want to release memory pressure.

When `None` is passed as the pool, `size` returns the total memory size including Arrow buffers using either `get_slice_memory_size` or just `ScalarValue::size()` (same as before, so it's backward compatible). Updated the accumulators that use the pool parameter.
All other accumulator implementations were updated to match the new signature.
Are these changes tested?
Added a `distinct_count_does_not_over_account_memory()` test to verify memory pool deduplication for COUNT(DISTINCT) with array types. Also updated the existing accumulator tests to use the memory pool; they verify that the accounted memory is still less than when not using the memory pool (in some cases even less than when we compacted).

Are there any user-facing changes?
Yes, the size API for Accumulator and GroupsAccumulator changed from `fn size(&self) -> usize;` to `fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;`. I'm not sure if this is the best API design... I'm open to suggestions. In any case, if `None` is passed the behavior remains the same as before. Also, IIUC this function is mainly used to keep the DataFusion memory pool within its bounds during aggregations.