Skip to content

Conversation

@LiaCastaneda
Copy link
Contributor

Which issue does this PR close?

Closes #
Related to the https://github.com/apache/datafusion/issues/16841#issuecomment-3563643947

Rationale for this change

Aggregation accumulators that store Arrow Arrays can have memory over accounting when array buffers are shared between multiple ScalarValues or directly Arrow Arrays. This occurs because doing ScalarValue::try_from_array() create slices that reference the same underlying buffers and each ScalarValue reports the full buffer size when calculating memory usage and the same physical buffer gets counted multiple times, leading to over accounting (this comment explains very well why we are seeing this)

There have been several attempts to fix this before which included compacting data to not keep the whole array alive, or using get_slice_memory_size instead of get_array_memory_size. However, we observed that both had downsides:

  • compact() was CPU inefficient (copies data which can be expensive)
  • get_slice_memory_size() only accounts for logical memory, but it is not the actual physical buffer capacity, therefore the amount returned is not accurate.

What changes are included in this PR?

This approach avoids double-counting memory by using Arrow's TrackingMemoryPool, which automatically deduplicates shared buffers when accounting them. This means we don't need to compact() or call get_slice_memory_size() just to solve the accounting problem. Note that compact() might still be useful when we want to release memory pressure.

  • Updated Accumulator::size() and GroupsAccumulator::size() signatures to accept Option<&dyn MemoryPool>:
    • When pool is None Returns total memory size including Arrow buffers using either get_slice_memory_size or just ScalarValue-> size () (same as before, so its backward compatible)
    • When pool is Some iteturns structural size only and claims buffers with the pool for deduplication tracking. Callers using the pool must add pool.used() to get total memory.

Updated accumulators that use the pool parameter:

  • DistinctCountAccumulator
  • ArrayAggAccumulator
  • For OrderSensitiveArrayAggAccumulator and DistinctArrayAggAccumulator I removed the compacting since it was introduced specifically to solve the over accounting, and its not needed anymore.
  • FirstValueAccumulator / LastValueAccumulator
    All other accumulator implementations had to be updated to match new signature

Are these changes tested?

Added distinct_count_does_not_over_account_memory() test to test memory pool deduplication for COUNT(DISTINCT) with array types. Also updated the existing accumulator tests to use memory pool, it verifies the accounted memory is still less than when not using the memory pool (in some cases even less than when we compacted).

Are there any user-facing changes?

yes, the API size for Accumulators and GroupAccumulators changed from fn size(&self) -> usize; to fn size(&self, pool: Option<&dyn MemoryPool>) -> usize;

Not sure if this is the best API design... I'm open to suggestions. In any case, if None is passed the behavior will remain the same as before. Also IIUC this function is mainly used to keep the DF memory pool within its bounds during aggregations.

@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate proto Related to proto crate functions Changes to functions implementation ffi Changes to the ffi crate physical-plan Changes to the physical-plan crate spark labels Dec 26, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/use-arrow-pool-to-fix-memory-overaccounting-aggregations branch from 72d5f92 to 2378e6e Compare December 26, 2025 17:01
@LiaCastaneda LiaCastaneda force-pushed the lia/use-arrow-pool-to-fix-memory-overaccounting-aggregations branch from 2378e6e to 7d158c9 Compare December 26, 2025 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate core Core DataFusion crate ffi Changes to the ffi crate functions Changes to functions implementation logical-expr Logical plan and expressions physical-plan Changes to the physical-plan crate proto Related to proto crate spark substrait Changes to the substrait crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant