From e9fd408ec4bbb8d6f5e88a4f4a50eebe6043d60b Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 22 Nov 2025 15:48:58 -0500 Subject: [PATCH 1/9] Implement FFI_ExtensionOptions --- datafusion/ffi/src/config_options.rs | 211 +++++++++++++++++++++++++++ datafusion/ffi/src/lib.rs | 1 + 2 files changed, 212 insertions(+) create mode 100644 datafusion/ffi/src/config_options.rs diff --git a/datafusion/ffi/src/config_options.rs b/datafusion/ffi/src/config_options.rs new file mode 100644 index 000000000000..2ad077d129fe --- /dev/null +++ b/datafusion/ffi/src/config_options.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, ffi::c_void, ops::Deref}; + +use abi_stable::{std_types::{RHashMap, RResult, RStr, RString, RVec}, RTuple, StableAbi}; +use abi_stable::std_types::{ROption, Tuple3}; +use arrow::{array::ArrayRef, error::ArrowError}; +use datafusion::{ + error::{DataFusionError, Result}, + scalar::ScalarValue, +}; +use datafusion_common::config::{ConfigEntry, ExtensionOptions}; +use prost::Message; + +use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; + +/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. +/// For an explanation of each field, see the corresponding function +/// defined in [`ExtensionOptions`]. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_ExtensionOptions { + pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions, + + pub set: unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, + + pub entries: + unsafe extern "C" fn(&Self) -> RVec, RStr>>, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(&mut Self), + + /// Internal data. This is only to be accessed by the provider of the options. + /// A [`ForeignExtensionOptions`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_ExtensionOptions {} +unsafe impl Sync for FFI_ExtensionOptions {} + +pub struct ExtensionOptionsPrivateData { + pub options: Box, +} + +impl FFI_ExtensionOptions { + #[inline] + unsafe fn inner_mut(&mut self) -> &mut Box { + let private_data = self.private_data as *mut ExtensionOptionsPrivateData; + &mut (*private_data).options + } + + #[inline] + unsafe fn inner(&self) -> &dyn ExtensionOptions { + let private_data = self.private_data as *const ExtensionOptionsPrivateData; + (*private_data).options.deref() + } +} + +unsafe extern "C" fn cloned_fn_wrapper(options: &FFI_ExtensionOptions) -> FFI_ExtensionOptions { + options.inner().cloned().into() +} + +unsafe extern "C" fn set_fn_wrapper( + options: &mut FFI_ExtensionOptions, + key: RStr, + value: RStr, +) -> RResult<(), RString> { + let key = key.as_str(); + let value = value.as_str(); + + rresult!(options.inner_mut().set(key, value)) +} + +unsafe extern "C" fn entries_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> RVec, RStr>> { + options.inner() + .entries() + .into_iter() + .map(|entry| (entry.key.into(), entry.value.map(Into::into).into(), entry.description.into()).into()) + .collect() +} + +unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { + let private_data = + Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData); + drop(private_data); +} + +impl From> for FFI_ExtensionOptions { + fn from(options: Box) -> Self { + let private_data = ExtensionOptionsPrivateData { options }; + + Self { + cloned: cloned_fn_wrapper, + set: set_fn_wrapper, + entries: entries_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + } + } +} + +impl Drop for FFI_ExtensionOptions { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +/// This struct is used to access an UDF provided by a foreign +/// library across a FFI boundary. +/// +/// The ForeignExtensionOptions is to be used by the caller of the UDF, so it has +/// no knowledge or access to the private data. All interaction with the UDF +/// must occur through the functions defined in FFI_ExtensionOptions. +#[derive(Debug)] +pub struct ForeignExtensionOptions(FFI_ExtensionOptions); + +unsafe impl Send for ForeignExtensionOptions {} +unsafe impl Sync for ForeignExtensionOptions {} + +impl From for ForeignExtensionOptions { + fn from(options: FFI_ExtensionOptions) -> Self { + Self(options) + } +} + +impl ExtensionOptions for ForeignExtensionOptions { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn cloned(&self) -> Box { + (self.0.cloned)(&self.0).into() + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + df_result!({ + (self.0.set)(&mut self.0, key.into(), value.into()) + }) + } + + fn entries(&self) -> Vec { + (self.0.entries)(&self.0).into_iter() + .map(|entry_tuple| ConfigEntry { + key: entry_tuple.0.into(), + value: entry_tuple.1.into(), + description: entry_tuple.2.into(), + }) + .collect() + } +} + +#[cfg(test)] +mod tests { + use datafusion_common::{ + config::ConfigExtension, config::ConfigOptions, extensions_options, + }; + // Define a new configuration struct using the `extensions_options` macro + extensions_options! { + /// My own config options. + pub struct MyConfig { + /// Should "foo" be replaced by "bar"? + pub foo_to_bar: bool, default = true + + /// How many "baz" should be created? + pub baz_count: usize, default = 1337 + } + } + + impl ConfigExtension for MyConfig { + const PREFIX: &'static str = "my_config"; + } + + #[test] + fn round_trip_ffi_extension_options() { + + // set up config struct and register extension + let mut config = ConfigOptions::default(); + config.extensions.insert(MyConfig::default()); + + // overwrite config default + config.set("my_config.baz_count", "42").unwrap(); + + // check config state + let my_config = config.extensions.get::().unwrap(); + assert!(my_config.foo_to_bar,); + assert_eq!(my_config.baz_count, 42,); + + } +} diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 2ca9b8f6f495..4a6ef5a90c7e 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -28,6 +28,7 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; +pub mod config_options; pub mod execution; pub mod execution_plan; pub mod expr; From 280e8157634366b5ebf68b7de332c17b5ff2f868 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 23 Nov 2025 09:06:01 -0500 Subject: [PATCH 2/9] wip --- datafusion/ffi/src/config_options.rs | 134 +++++++++++++++++++-------- 1 file changed, 94 insertions(+), 40 deletions(-) diff --git a/datafusion/ffi/src/config_options.rs b/datafusion/ffi/src/config_options.rs index 2ad077d129fe..102952d4502d 100644 --- a/datafusion/ffi/src/config_options.rs +++ b/datafusion/ffi/src/config_options.rs @@ -16,17 +16,19 @@ // under the License. use std::{any::Any, ffi::c_void, ops::Deref}; - -use abi_stable::{std_types::{RHashMap, RResult, RStr, RString, RVec}, RTuple, StableAbi}; -use abi_stable::std_types::{ROption, Tuple3}; +use std::collections::HashMap; +use abi_stable::{ + std_types::{RHashMap, ROption, RResult, RStr, RString, RVec, Tuple3}, + RTuple, StableAbi, +}; use arrow::{array::ArrayRef, error::ArrowError}; use datafusion::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; -use datafusion_common::config::{ConfigEntry, ExtensionOptions}; +use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; use prost::Message; - +use datafusion_common::exec_err; use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; /// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. @@ -38,10 +40,14 @@ use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; pub struct FFI_ExtensionOptions { pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions, - pub set: unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, + pub set: + unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, - pub entries: - unsafe extern "C" fn(&Self) -> RVec, RStr>>, + pub entries: unsafe extern "C" fn( + &Self, + ) -> RVec< + Tuple3, RStr<'static>>, + >, /// Release the memory of the private data when it is no longer being used. pub release: unsafe extern "C" fn(&mut Self), @@ -55,24 +61,26 @@ unsafe impl Send for FFI_ExtensionOptions {} unsafe impl Sync for FFI_ExtensionOptions {} pub struct ExtensionOptionsPrivateData { - pub options: Box, + pub options: HashMap, } impl FFI_ExtensionOptions { #[inline] - unsafe fn inner_mut(&mut self) -> &mut Box { + unsafe fn inner_mut(&mut self) -> &mut HashMap { let private_data = self.private_data as *mut ExtensionOptionsPrivateData; &mut (*private_data).options } #[inline] - unsafe fn inner(&self) -> &dyn ExtensionOptions { + unsafe fn inner(&self) -> &HashMap { let private_data = self.private_data as *const ExtensionOptionsPrivateData; - (*private_data).options.deref() + &(*private_data).options } } -unsafe extern "C" fn cloned_fn_wrapper(options: &FFI_ExtensionOptions) -> FFI_ExtensionOptions { +unsafe extern "C" fn cloned_fn_wrapper( + options: &FFI_ExtensionOptions, +) -> FFI_ExtensionOptions { options.inner().cloned().into() } @@ -81,19 +89,24 @@ unsafe extern "C" fn set_fn_wrapper( key: RStr, value: RStr, ) -> RResult<(), RString> { - let key = key.as_str(); - let value = value.as_str(); - - rresult!(options.inner_mut().set(key, value)) + rresult!(options.inner_mut().set(key.into(), value.into())) } unsafe extern "C" fn entries_fn_wrapper( options: &FFI_ExtensionOptions, -) -> RVec, RStr>> { - options.inner() +) -> RVec, RStr<'static>>> { + options + .inner() .entries() .into_iter() - .map(|entry| (entry.key.into(), entry.value.map(Into::into).into(), entry.description.into()).into()) + .map(|entry| { + ( + entry.key.into(), + entry.value.map(Into::into).into(), + entry.description.into(), + ) + .into() + }) .collect() } @@ -103,9 +116,9 @@ unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { drop(private_data); } -impl From> for FFI_ExtensionOptions { - fn from(options: Box) -> Self { - let private_data = ExtensionOptionsPrivateData { options }; +impl Default for FFI_ExtensionOptions { + fn default() -> Self { + let private_data = ExtensionOptionsPrivateData { options: HashMap::new() }; Self { cloned: cloned_fn_wrapper, @@ -135,12 +148,34 @@ pub struct ForeignExtensionOptions(FFI_ExtensionOptions); unsafe impl Send for ForeignExtensionOptions {} unsafe impl Sync for ForeignExtensionOptions {} -impl From for ForeignExtensionOptions { - fn from(options: FFI_ExtensionOptions) -> Self { - Self(options) +impl TryFrom for T { + type Error = DataFusionError; + + fn try_from(options: &FFI_ExtensionOptions) -> Result { + let mut config = T::default(); + + let mut found = false; + unsafe { + for entry_tuple in (options.entries)(&options) + .into_iter() { + if let ROption::RSome(value) = entry_tuple.1 + && let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') { + if namespace == T::PREFIX { + found = true; + config.set(key, value.as_str())?; + } + } + } + } + + Ok(config) } } +impl ConfigExtension for ForeignExtensionOptions { + const PREFIX: &'static str = "datafusion_ffi"; +} + impl ExtensionOptions for ForeignExtensionOptions { fn as_any(&self) -> &dyn Any { self @@ -151,31 +186,44 @@ impl ExtensionOptions for ForeignExtensionOptions { } fn cloned(&self) -> Box { - (self.0.cloned)(&self.0).into() + unsafe { (self.0.cloned)(&self.0).into() } } fn set(&mut self, key: &str, value: &str) -> Result<()> { - df_result!({ - (self.0.set)(&mut self.0, key.into(), value.into()) - }) + let Some((namespace, key)) = key.split_once('.') else { + return exec_err!("Unable to set FFI config value without namespace set"); + }; + + if namespace != ForeignExtensionOptions::PREFIX { + return exec_err!("Unexpected namespace {namespace} set for FFI config"); + } + + df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) } fn entries(&self) -> Vec { - (self.0.entries)(&self.0).into_iter() - .map(|entry_tuple| ConfigEntry { - key: entry_tuple.0.into(), - value: entry_tuple.1.into(), - description: entry_tuple.2.into(), - }) - .collect() + unsafe { + (self.0.entries)(&self.0) + .into_iter() + .map(|entry_tuple| ConfigEntry { + key: entry_tuple.0.into(), + value: entry_tuple.1.map(Into::into).into(), + description: entry_tuple.2.into(), + }) + .collect() + } } } #[cfg(test)] mod tests { use datafusion_common::{ - config::ConfigExtension, config::ConfigOptions, extensions_options, + config::{ConfigExtension, ConfigOptions, ExtensionOptions}, + extensions_options, }; + + use crate::config_options::FFI_ExtensionOptions; + // Define a new configuration struct using the `extensions_options` macro extensions_options! { /// My own config options. @@ -194,10 +242,10 @@ mod tests { #[test] fn round_trip_ffi_extension_options() { - // set up config struct and register extension let mut config = ConfigOptions::default(); - config.extensions.insert(MyConfig::default()); + config.extensions.insert(FFI_ExtensionOptions::default()); + // config.extensions.insert(MyConfig::default()); // overwrite config default config.set("my_config.baz_count", "42").unwrap(); @@ -207,5 +255,11 @@ mod tests { assert!(my_config.foo_to_bar,); assert_eq!(my_config.baz_count, 42,); + // let boxed_config = Box::new(MyConfig::default()) as Box; + // let mut ffi_config = FFI_ExtensionOptions::from(boxed_config); + // ffi_config.library_marker_id = crate::mock_foreign_marker_id; + // let foreign_config: Box = ffi_config.into(); + // + // config.extensions.insert(foreign_config); } } From f8238541632146ceb6bd0bcdd89517bd2a02b13a Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sat, 13 Dec 2025 15:27:50 +0100 Subject: [PATCH 3/9] More testing --- datafusion/common/src/config.rs | 2 +- datafusion/ffi/src/config_options.rs | 139 +++++++++++++++------------ datafusion/ffi/src/udwf/mod.rs | 17 +++- 3 files changed, 91 insertions(+), 67 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a452..65c8937dbbff 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1230,7 +1230,7 @@ impl<'a> TryInto> for &'a FormatOptions } /// A key value pair, with a corresponding description -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ConfigEntry { /// A unique string to identify this config value pub key: String, diff --git a/datafusion/ffi/src/config_options.rs b/datafusion/ffi/src/config_options.rs index 102952d4502d..979f37675698 100644 --- a/datafusion/ffi/src/config_options.rs +++ b/datafusion/ffi/src/config_options.rs @@ -15,21 +15,17 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, ffi::c_void, ops::Deref}; +use std::any::Any; use std::collections::HashMap; -use abi_stable::{ - std_types::{RHashMap, ROption, RResult, RStr, RString, RVec, Tuple3}, - RTuple, StableAbi, -}; -use arrow::{array::ArrayRef, error::ArrowError}; -use datafusion::{ - error::{DataFusionError, Result}, - scalar::ScalarValue, -}; +use std::ffi::c_void; + +use abi_stable::StableAbi; +use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; +use datafusion::error::Result; use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; -use prost::Message; use datafusion_common::exec_err; -use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; + +use crate::df_result; /// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. /// For an explanation of each field, see the corresponding function @@ -43,11 +39,7 @@ pub struct FFI_ExtensionOptions { pub set: unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, - pub entries: unsafe extern "C" fn( - &Self, - ) -> RVec< - Tuple3, RStr<'static>>, - >, + pub entries: unsafe extern "C" fn(&Self) -> RVec>, /// Release the memory of the private data when it is no longer being used. pub release: unsafe extern "C" fn(&mut Self), @@ -81,7 +73,12 @@ impl FFI_ExtensionOptions { unsafe extern "C" fn cloned_fn_wrapper( options: &FFI_ExtensionOptions, ) -> FFI_ExtensionOptions { - options.inner().cloned().into() + options + .inner() + .iter() + .map(|(k, v)| (k.to_owned(), v.to_owned())) + .collect::>() + .into() } unsafe extern "C" fn set_fn_wrapper( @@ -89,24 +86,17 @@ unsafe extern "C" fn set_fn_wrapper( key: RStr, value: RStr, ) -> RResult<(), RString> { - rresult!(options.inner_mut().set(key.into(), value.into())) + let _ = options.inner_mut().insert(key.into(), value.into()); + RResult::ROk(()) } unsafe extern "C" fn entries_fn_wrapper( options: &FFI_ExtensionOptions, -) -> RVec, RStr<'static>>> { +) -> RVec> { options .inner() - .entries() - .into_iter() - .map(|entry| { - ( - entry.key.into(), - entry.value.map(Into::into).into(), - entry.description.into(), - ) - .into() - }) + .iter() + .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) .collect() } @@ -117,8 +107,14 @@ unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { } impl Default for FFI_ExtensionOptions { - fn default() -> Self { - let private_data = ExtensionOptionsPrivateData { options: HashMap::new() }; + fn default() -> Self { + HashMap::new().into() + } +} + +impl From> for FFI_ExtensionOptions { + fn from(options: HashMap) -> Self { + let private_data = ExtensionOptionsPrivateData { options }; Self { cloned: cloned_fn_wrapper, @@ -148,27 +144,41 @@ pub struct ForeignExtensionOptions(FFI_ExtensionOptions); unsafe impl Send for ForeignExtensionOptions {} unsafe impl Sync for ForeignExtensionOptions {} -impl TryFrom for T { - type Error = DataFusionError; - - fn try_from(options: &FFI_ExtensionOptions) -> Result { - let mut config = T::default(); - - let mut found = false; - unsafe { - for entry_tuple in (options.entries)(&options) - .into_iter() { - if let ROption::RSome(value) = entry_tuple.1 - && let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') { - if namespace == T::PREFIX { - found = true; - config.set(key, value.as_str())?; - } - } +// impl TryFrom for T { +// type Error = DataFusionError; +// +// fn try_from(options: &FFI_ExtensionOptions) -> Result { +// let mut config = T::default(); +// +// let mut found = false; +// unsafe { +// for entry_tuple in (options.entries)(&options).into_iter() { +// if let ROption::RSome(value) = entry_tuple.1 { +// if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') +// { +// if namespace == T::PREFIX { +// found = true; +// config.set(key, value.as_str())?; +// } +// } +// } +// } +// } +// +// Ok(config) +// } +// } + +impl ForeignExtensionOptions { + pub fn add_config(&mut self, config: &C) -> Result<()> { + for entry in config.entries() { + if let Some(value) = entry.value { + let key = format!("{}.{}", C::PREFIX, entry.key); + self.set(key.as_str(), value.as_str())?; } } - Ok(config) + Ok(()) } } @@ -186,17 +196,20 @@ impl ExtensionOptions for ForeignExtensionOptions { } fn cloned(&self) -> Box { - unsafe { (self.0.cloned)(&self.0).into() } + let ffi_options = unsafe { (self.0.cloned)(&self.0) }; + let foreign_options = ForeignExtensionOptions(ffi_options); + Box::new(foreign_options) } fn set(&mut self, key: &str, value: &str) -> Result<()> { + println!("Setting {key} = {value}"); let Some((namespace, key)) = key.split_once('.') else { return exec_err!("Unable to set FFI config value without namespace set"); }; - if namespace != ForeignExtensionOptions::PREFIX { - return exec_err!("Unexpected namespace {namespace} set for FFI config"); - } + // if namespace != ForeignExtensionOptions::PREFIX { + // return exec_err!("Unexpected namespace {namespace} set for FFI config"); + // } df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) } @@ -207,8 +220,8 @@ impl ExtensionOptions for ForeignExtensionOptions { .into_iter() .map(|entry_tuple| ConfigEntry { key: entry_tuple.0.into(), - value: entry_tuple.1.map(Into::into).into(), - description: entry_tuple.2.into(), + value: Some(entry_tuple.1.into()), + description: "ffi_config_options", }) .collect() } @@ -217,12 +230,10 @@ impl ExtensionOptions for ForeignExtensionOptions { #[cfg(test)] mod tests { - use datafusion_common::{ - config::{ConfigExtension, ConfigOptions, ExtensionOptions}, - extensions_options, - }; + use datafusion_common::config::{ConfigExtension, ConfigOptions}; + use datafusion_common::extensions_options; - use crate::config_options::FFI_ExtensionOptions; + use crate::config_options::{FFI_ExtensionOptions, ForeignExtensionOptions}; // Define a new configuration struct using the `extensions_options` macro extensions_options! { @@ -244,7 +255,11 @@ mod tests { fn round_trip_ffi_extension_options() { // set up config struct and register extension let mut config = ConfigOptions::default(); - config.extensions.insert(FFI_ExtensionOptions::default()); + let mut foreign_options = + ForeignExtensionOptions(FFI_ExtensionOptions::default()); + foreign_options.add_config(&MyConfig::default()).unwrap(); + + config.extensions.insert(foreign_options); // config.extensions.insert(MyConfig::default()); // overwrite config default diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index e4ac97f5d4ff..bded8b208054 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -399,14 +399,14 @@ impl From<&FFI_SortOptions> for SortOptions { mod tests { use std::sync::Arc; - use arrow::array::{ArrayRef, create_array}; + use crate::tests::create_record_batch; + use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; + use arrow::array::{ArrayRef, RecordBatch, create_array}; use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf}; use datafusion::logical_expr::expr::Sort; use datafusion::logical_expr::{ExprFunctionExt, WindowUDF, WindowUDFImpl, col}; use datafusion::prelude::SessionContext; - - use crate::tests::create_record_batch; - use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; + use datafusion_common::record_batch; fn create_test_foreign_udwf( original_udwf: impl WindowUDFImpl + 'static, @@ -437,11 +437,20 @@ mod tests { Ok(()) } + fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch { + let end_value = start_value + num_values as i32; + let a_vals: Vec = (start_value..end_value).collect(); + let b_vals: Vec = a_vals.iter().map(|v| *v as f64).collect(); + + record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap() + } + #[tokio::test] async fn test_lag_udwf() -> datafusion::common::Result<()> { let udwf = create_test_foreign_udwf(WindowShift::lag())?; let ctx = SessionContext::default(); + let df = ctx.read_batch(create_record_batch(-5, 5))?; let df = df.select(vec![ From 2d9d66a6afbe8a340714990ab0ff875581155dec Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Sun, 14 Dec 2025 09:08:18 -0500 Subject: [PATCH 4/9] Adding description for some possible next steps --- .../extension_options.rs} | 38 ++- datafusion/ffi/src/config/extensions.rs | 298 ++++++++++++++++++ datafusion/ffi/src/config/mod.rs | 19 ++ datafusion/ffi/src/lib.rs | 2 +- 4 files changed, 349 insertions(+), 8 deletions(-) rename datafusion/ffi/src/{config_options.rs => config/extension_options.rs} (85%) create mode 100644 datafusion/ffi/src/config/extensions.rs create mode 100644 datafusion/ffi/src/config/mod.rs diff --git a/datafusion/ffi/src/config_options.rs b/datafusion/ffi/src/config/extension_options.rs similarity index 85% rename from datafusion/ffi/src/config_options.rs rename to datafusion/ffi/src/config/extension_options.rs index 979f37675698..12ac7bf5b718 100644 --- a/datafusion/ffi/src/config_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -23,7 +23,7 @@ use abi_stable::StableAbi; use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; use datafusion::error::Result; use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; -use datafusion_common::exec_err; +use datafusion_common::{exec_err, DataFusionError}; use crate::df_result; @@ -49,6 +49,19 @@ pub struct FFI_ExtensionOptions { pub private_data: *mut c_void, } +// TODO(tsaucer) We have a problem in datafusion_common::config::Extension::get +// which relies on knowing the concrete types of the extensions so that we can +// use their PREFIX for insertion of configs. We cannot work around this using +// things like `fn namespace() -> &'static str` because we must be able to do +// this without having an instance. Instead we will go to an approach of having +// a concrete FFI_ForeignConfigExtension and add a check into all of the methods +// in the above `get` (and similar) methods to check to see if we have an FFI +// configs. If so we get the concrete FFI config and then have a method that will +// convert from FFI_ForeignExtensionConfig into the concrete type. Somehow our +// FFI library will need to make this as easy an experience as they are used to +// so maybe we need to implement something at the `Extensions` level in addition +// to the ConfigExtension. + unsafe impl Send for FFI_ExtensionOptions {} unsafe impl Sync for FFI_ExtensionOptions {} @@ -203,14 +216,10 @@ impl ExtensionOptions for ForeignExtensionOptions { fn set(&mut self, key: &str, value: &str) -> Result<()> { println!("Setting {key} = {value}"); - let Some((namespace, key)) = key.split_once('.') else { + if key.split_once('.').is_none() { return exec_err!("Unable to set FFI config value without namespace set"); }; - // if namespace != ForeignExtensionOptions::PREFIX { - // return exec_err!("Unexpected namespace {namespace} set for FFI config"); - // } - df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) } @@ -228,12 +237,27 @@ impl ExtensionOptions for ForeignExtensionOptions { } } +// TODO: Maybe get rid of ForeignExtensionOptions? +impl TryFrom<&ForeignExtensionOptions> for C { + type Error = DataFusionError; + fn try_from(options: &ForeignExtensionOptions) -> Result { + let mut result = C::default(); + for entry in options.entries() { + if let Some(value) = entry.value { + result.set(entry.key.as_str(), value.as_str())?; + } + } + + result + } +} + #[cfg(test)] mod tests { use datafusion_common::config::{ConfigExtension, ConfigOptions}; use datafusion_common::extensions_options; - use crate::config_options::{FFI_ExtensionOptions, ForeignExtensionOptions}; + use crate::config::extension_options::{FFI_ExtensionOptions, ForeignExtensionOptions}; // Define a new configuration struct using the `extensions_options` macro extensions_options! { diff --git a/datafusion/ffi/src/config/extensions.rs b/datafusion/ffi/src/config/extensions.rs new file mode 100644 index 000000000000..1196ad535d25 --- /dev/null +++ b/datafusion/ffi/src/config/extensions.rs @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// use std::any::Any; +// use std::collections::HashMap; +// use std::ffi::c_void; +// +// use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; +// use abi_stable::StableAbi; +// use datafusion::error::Result; +// use datafusion_common::config::{ConfigEntry, ConfigExtension, Extensions}; +// use datafusion_common::exec_err; +// +// use crate::df_result; +// +// /// A stable struct for sharing [`Extensions`] across FFI boundaries. +// /// For an explanation of each field, see the corresponding function +// /// defined in [`Extensions`]. +// #[repr(C)] +// #[derive(Debug, StableAbi)] +// #[allow(non_camel_case_types)] +// pub struct FFI_Extensions { +// pub cloned: unsafe extern "C" fn(&Self) -> FFI_Extensions, +// +// pub set: +// unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, +// +// pub entries: unsafe extern "C" fn(&Self) -> RVec>, +// +// /// Release the memory of the private data when it is no longer being used. +// pub release: unsafe extern "C" fn(&mut Self), +// +// /// Internal data. This is only to be accessed by the provider of the options. +// /// A [`ForeignExtensions`] should never attempt to access this data. +// pub private_data: *mut c_void, +// } +// +// // TODO(tsaucer) We have a problem in datafusion_common::config::Extension::get +// // which relies on knowing the concrete types of the extensions so that we can +// // use their PREFIX for insertion of configs. We cannot work around this using +// // things like `fn namespace() -> &'static str` because we must be able to do +// // this without having an instance. Instead we will go to an approach of having +// // a concrete FFI_ForeignConfigExtension and add a check into all of the methods +// // in the above `get` (and similar) methods to check to see if we have an FFI +// // configs. If so we get the concrete FFI config and then have a method that will +// // convert from FFI_ForeignExtensionConfig into the concrete type. Somehow our +// // FFI library will need to make this as easy an experience as they are used to +// // so maybe we need to implement something at the `Extensions` level in addition +// // to the ConfigExtension. +// +// unsafe impl Send for FFI_Extensions {} +// unsafe impl Sync for FFI_Extensions {} +// +// pub struct ExtensionsPrivateData { +// pub options: HashMap, +// } +// +// impl FFI_Extensions { +// #[inline] +// unsafe fn inner_mut(&mut self) -> &mut HashMap { +// let private_data = self.private_data as *mut ExtensionsPrivateData; +// &mut (*private_data).options +// } +// +// #[inline] +// unsafe fn inner(&self) -> &HashMap { +// let private_data = self.private_data as *const ExtensionsPrivateData; +// &(*private_data).options +// } +// } +// +// unsafe extern "C" fn cloned_fn_wrapper( +// options: &FFI_Extensions, +// ) -> FFI_Extensions { +// options +// .inner() +// .iter() +// .map(|(k, v)| (k.to_owned(), v.to_owned())) +// .collect::>() +// .into() +// } +// +// unsafe extern "C" fn set_fn_wrapper( +// options: &mut FFI_Extensions, +// key: RStr, +// value: RStr, +// ) -> RResult<(), RString> { +// let _ = options.inner_mut().insert(key.into(), value.into()); +// RResult::ROk(()) +// } +// +// unsafe extern "C" fn entries_fn_wrapper( +// options: &FFI_Extensions, +// ) -> RVec> { +// options +// .inner() +// .iter() +// .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) +// .collect() +// } +// +// unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_Extensions) { +// let private_data = +// Box::from_raw(options.private_data as *mut ExtensionsPrivateData); +// drop(private_data); +// } +// +// impl Default for FFI_Extensions { +// fn default() -> Self { +// HashMap::new().into() +// } +// } +// +// impl From> for FFI_Extensions { +// fn from(options: HashMap) -> Self { +// let private_data = ExtensionsPrivateData { options }; +// +// Self { +// cloned: cloned_fn_wrapper, +// set: set_fn_wrapper, +// entries: entries_fn_wrapper, +// release: release_fn_wrapper, +// private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, +// } +// } +// } +// +// impl Drop for FFI_Extensions { +// fn drop(&mut self) { +// unsafe { (self.release)(self) } +// } +// } +// +// /// This struct is used to access an UDF provided by a foreign +// /// library across a FFI boundary. +// /// +// /// The ForeignExtensions is to be used by the caller of the UDF, so it has +// /// no knowledge or access to the private data. All interaction with the UDF +// /// must occur through the functions defined in FFI_Extensions. +// #[derive(Debug)] +// pub struct ForeignExtensions(FFI_Extensions); +// +// unsafe impl Send for ForeignExtensions {} +// unsafe impl Sync for ForeignExtensions {} +// +// // impl TryFrom for T { +// // type Error = DataFusionError; +// // +// // fn try_from(options: &FFI_Extensions) -> Result { +// // let mut config = T::default(); +// // +// // let mut found = false; +// // unsafe { +// // for entry_tuple in (options.entries)(&options).into_iter() { +// // if let ROption::RSome(value) = entry_tuple.1 { +// // if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') +// // { +// // if namespace == T::PREFIX { +// // found = true; +// // config.set(key, value.as_str())?; +// // } +// // } +// // } +// // } +// // } +// // +// // Ok(config) +// // } +// // } +// +// impl ForeignExtensions { +// pub fn add_config(&mut self, config: &C) -> Result<()> { +// for entry in config.entries() { +// if let Some(value) = entry.value { +// let key = format!("{}.{}", C::PREFIX, entry.key); +// self.set(key.as_str(), value.as_str())?; +// } +// } +// +// Ok(()) +// } +// } +// +// impl ConfigExtension for ForeignExtensions { +// const PREFIX: &'static str = "datafusion_ffi"; +// } +// +// impl Extensions for ForeignExtensions { +// fn as_any(&self) -> &dyn Any { +// self +// } +// +// fn as_any_mut(&mut self) -> &mut dyn Any { +// self +// } +// +// fn cloned(&self) -> Box { +// let ffi_options = unsafe { (self.0.cloned)(&self.0) }; +// let foreign_options = ForeignExtensions(ffi_options); +// Box::new(foreign_options) +// } +// +// fn set(&mut self, key: &str, value: &str) -> Result<()> { +// println!("Setting {key} = {value}"); +// let Some((namespace, key)) = key.split_once('.') else { +// return exec_err!("Unable to set FFI config value without namespace set"); +// }; +// +// // if namespace != ForeignExtensions::PREFIX { +// // return exec_err!("Unexpected namespace {namespace} set for FFI config"); +// // } +// +// df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) +// } +// +// fn entries(&self) -> Vec { +// unsafe { +// (self.0.entries)(&self.0) +// .into_iter() +// .map(|entry_tuple| ConfigEntry { +// key: entry_tuple.0.into(), +// value: Some(entry_tuple.1.into()), +// description: "ffi_config_options", +// }) +// .collect() +// } +// } +// } +// +// impl From<&ForeignExtensions> for C { +// fn from(options: &ForeignExtensions) -> Self { +// +// } +// } +// +// #[cfg(test)] +// mod tests { +// use datafusion_common::config::{ConfigExtension, ConfigOptions}; +// use datafusion_common::extensions_options; +// +// use crate::config_options::{FFI_Extensions, ForeignExtensions}; +// +// // Define a new configuration struct using the `extensions_options` macro +// extensions_options! { +// /// My own config options. +// pub struct MyConfig { +// /// Should "foo" be replaced by "bar"? +// pub foo_to_bar: bool, default = true +// +// /// How many "baz" should be created? +// pub baz_count: usize, default = 1337 +// } +// } +// +// impl ConfigExtension for MyConfig { +// const PREFIX: &'static str = "my_config"; +// } +// +// #[test] +// fn round_trip_ffi_extension_options() { +// // set up config struct and register extension +// let mut config = ConfigOptions::default(); +// let mut foreign_options = ForeignExtensions(FFI_Extensions::default()); +// foreign_options.add_config(&MyConfig::default()).unwrap(); +// +// config.extensions.insert(foreign_options); +// // config.extensions.insert(MyConfig::default()); +// +// // overwrite config default +// config.set("my_config.baz_count", "42").unwrap(); +// +// // check config state +// let my_config = config.extensions.get::().unwrap(); +// assert!(my_config.foo_to_bar,); +// assert_eq!(my_config.baz_count, 42,); +// +// // let boxed_config = Box::new(MyConfig::default()) as Box; +// // let mut ffi_config = FFI_Extensions::from(boxed_config); +// // ffi_config.library_marker_id = crate::mock_foreign_marker_id; +// // let foreign_config: Box = ffi_config.into(); +// // +// // config.extensions.insert(foreign_config); +// } +// } diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs new file mode 100644 index 000000000000..f6a9cd7d4ccc --- /dev/null +++ b/datafusion/ffi/src/config/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod extensions; +pub mod extension_options; \ No newline at end of file diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 4a6ef5a90c7e..a5759e397df1 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -28,7 +28,7 @@ pub mod arrow_wrappers; pub mod catalog_provider; pub mod catalog_provider_list; -pub mod config_options; +pub mod config; pub mod execution; pub mod execution_plan; pub mod expr; From 0f5ca5e9dfd99b6d0ba8d705b8682b71d0a3e5be Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 23 Dec 2025 06:46:00 -0500 Subject: [PATCH 5/9] formatting --- datafusion/ffi/src/config/extension_options.rs | 6 ++++-- datafusion/ffi/src/config/mod.rs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs index 12ac7bf5b718..b07ab5f11286 100644 --- a/datafusion/ffi/src/config/extension_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -23,7 +23,7 @@ use abi_stable::StableAbi; use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; use datafusion::error::Result; use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_common::{DataFusionError, exec_err}; use crate::df_result; @@ -257,7 +257,9 @@ mod tests { use datafusion_common::config::{ConfigExtension, ConfigOptions}; use datafusion_common::extensions_options; - use crate::config::extension_options::{FFI_ExtensionOptions, ForeignExtensionOptions}; + use crate::config::extension_options::{ + FFI_ExtensionOptions, ForeignExtensionOptions, + }; // Define a new configuration struct using the `extensions_options` macro extensions_options! { diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs index f6a9cd7d4ccc..af96fd0228ff 100644 --- a/datafusion/ffi/src/config/mod.rs +++ b/datafusion/ffi/src/config/mod.rs @@ -15,5 +15,5 @@ // specific language governing permissions and limitations // under the License. +pub mod extension_options; pub mod extensions; -pub mod extension_options; \ No newline at end of file From 71b6c3162fa07eb087f284d45b8d6f1fd61ea183 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 23 Dec 2025 10:23:35 -0500 Subject: [PATCH 6/9] Minimal workable example, tested against df-python --- datafusion/common/src/config.rs | 22 ++- .../ffi/src/config/extension_options.rs | 133 ++++++++---------- datafusion/ffi/src/udwf/mod.rs | 1 - 3 files changed, 78 insertions(+), 78 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 65c8937dbbff..e54db49d7b60 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1340,12 +1340,12 @@ impl ConfigOptions { /// Set a configuration option pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, key)) = key.split_once('.') else { + let Some((mut prefix, mut inner_key)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; if prefix == "datafusion" { - if key == "optimizer.enable_dynamic_filter_pushdown" { + if inner_key == "optimizer.enable_dynamic_filter_pushdown" { let bool_value = value.parse::().map_err(|e| { DataFusionError::Configuration(format!( "Failed to parse '{value}' as bool: {e}", @@ -1360,13 +1360,18 @@ impl ConfigOptions { } return Ok(()); } - return ConfigField::set(self, key, value); + return ConfigField::set(self, inner_key, value); + } + + if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") { + inner_key = key; + prefix = "datafusion_ffi"; } let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; - e.0.set(key, value) + e.0.set(inner_key, value) } /// Create new [`ConfigOptions`], taking values from environment variables @@ -1613,6 +1618,7 @@ impl Extensions { /// Retrieves the extension of the given type if any pub fn get_mut(&mut self) -> Option<&mut T> { + println!("extensions trying get_mut on prefix {}", T::PREFIX); let e = self.0.get_mut(T::PREFIX)?; e.0.as_any_mut().downcast_mut() } @@ -2131,7 +2137,7 @@ impl TableOptions { /// /// A result indicating success or failure in setting the configuration option. pub fn set(&mut self, key: &str, value: &str) -> Result<()> { - let Some((prefix, _)) = key.split_once('.') else { + let Some((mut prefix, _)) = key.split_once('.') else { return _config_err!("could not find config namespace for key \"{key}\""); }; @@ -2143,6 +2149,12 @@ impl TableOptions { return Ok(()); } + if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") { + prefix = "datafusion_ffi"; + } else { + println!("Existing keys {:?}", self.extensions.0.keys()); + } + let Some(e) = self.extensions.0.get_mut(prefix) else { return _config_err!("Could not find config namespace \"{prefix}\""); }; diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs index b07ab5f11286..f8e0ffe95151 100644 --- a/datafusion/ffi/src/config/extension_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -21,9 +21,8 @@ use std::ffi::c_void; use abi_stable::StableAbi; use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; -use datafusion::error::Result; use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions}; -use datafusion_common::{DataFusionError, exec_err}; +use datafusion_common::{Result, exec_err}; use crate::df_result; @@ -71,15 +70,15 @@ pub struct ExtensionOptionsPrivateData { impl FFI_ExtensionOptions { #[inline] - unsafe fn inner_mut(&mut self) -> &mut HashMap { + fn inner_mut(&mut self) -> &mut HashMap { let private_data = self.private_data as *mut ExtensionOptionsPrivateData; - &mut (*private_data).options + unsafe { &mut (*private_data).options } } #[inline] - unsafe fn inner(&self) -> &HashMap { + fn inner(&self) -> &HashMap { let private_data = self.private_data as *const ExtensionOptionsPrivateData; - &(*private_data).options + unsafe { &(*private_data).options } } } @@ -114,8 +113,9 @@ unsafe extern "C" fn entries_fn_wrapper( } unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) { - let private_data = - Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData); + let private_data = unsafe { + Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData) + }; drop(private_data); } @@ -145,44 +145,25 @@ impl Drop for FFI_ExtensionOptions { } } +impl Clone for FFI_ExtensionOptions { + fn clone(&self) -> Self { + unsafe { (self.cloned)(&self) } + } +} + /// This struct is used to access an UDF provided by a foreign /// library across a FFI boundary. /// /// The ForeignExtensionOptions is to be used by the caller of the UDF, so it has /// no knowledge or access to the private data. All interaction with the UDF /// must occur through the functions defined in FFI_ExtensionOptions. -#[derive(Debug)] -pub struct ForeignExtensionOptions(FFI_ExtensionOptions); - -unsafe impl Send for ForeignExtensionOptions {} -unsafe impl Sync for ForeignExtensionOptions {} - -// impl TryFrom for T { -// type Error = DataFusionError; +// #[derive(Debug)] +// pub struct ForeignExtensionOptions(FFI_ExtensionOptions); // -// fn try_from(options: &FFI_ExtensionOptions) -> Result { -// let mut config = T::default(); -// -// let mut found = false; -// unsafe { -// for entry_tuple in (options.entries)(&options).into_iter() { -// if let ROption::RSome(value) = entry_tuple.1 { -// if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') -// { -// if namespace == T::PREFIX { -// found = true; -// config.set(key, value.as_str())?; -// } -// } -// } -// } -// } -// -// Ok(config) -// } -// } +// unsafe impl Send for ForeignExtensionOptions {} +// unsafe impl Sync for ForeignExtensionOptions {} -impl ForeignExtensionOptions { +impl FFI_ExtensionOptions { pub fn add_config(&mut self, config: &C) -> Result<()> { for entry in config.entries() { if let Some(value) = entry.value { @@ -193,13 +174,22 @@ impl ForeignExtensionOptions { Ok(()) } + + pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> { + for entry in other.entries() { + if let Some(value) = entry.value { + self.set(entry.key.as_str(), value.as_str())?; + } + } + Ok(()) + } } -impl ConfigExtension for ForeignExtensionOptions { +impl ConfigExtension for FFI_ExtensionOptions { const PREFIX: &'static str = "datafusion_ffi"; } -impl ExtensionOptions for ForeignExtensionOptions { +impl ExtensionOptions for FFI_ExtensionOptions { fn as_any(&self) -> &dyn Any { self } @@ -209,23 +199,21 @@ impl ExtensionOptions for ForeignExtensionOptions { } fn cloned(&self) -> Box { - let ffi_options = unsafe { (self.0.cloned)(&self.0) }; - let foreign_options = ForeignExtensionOptions(ffi_options); - Box::new(foreign_options) + let ffi_options = unsafe { (self.cloned)(&self) }; + Box::new(ffi_options) } fn set(&mut self, key: &str, value: &str) -> Result<()> { - println!("Setting {key} = {value}"); if key.split_once('.').is_none() { return exec_err!("Unable to set FFI config value without namespace set"); }; - df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) + df_result!(unsafe { (self.set)(self, key.into(), value.into()) }) } fn entries(&self) -> Vec { unsafe { - (self.0.entries)(&self.0) + (self.entries)(&self) .into_iter() .map(|entry_tuple| ConfigEntry { key: entry_tuple.0.into(), @@ -237,18 +225,24 @@ impl ExtensionOptions for ForeignExtensionOptions { } } -// TODO: Maybe get rid of ForeignExtensionOptions? -impl TryFrom<&ForeignExtensionOptions> for C { - type Error = DataFusionError; - fn try_from(options: &ForeignExtensionOptions) -> Result { +impl FFI_ExtensionOptions { + pub fn to_extension(&self) -> Result { let mut result = C::default(); - for entry in options.entries() { - if let Some(value) = entry.value { - result.set(entry.key.as_str(), value.as_str())?; + + unsafe { + for entry in (self.entries)(&self) { + let key = entry.0.as_str(); + let value = entry.1.as_str(); + + if let Some((prefix, inner_key)) = key.split_once('.') + && prefix == C::PREFIX + { + result.set(inner_key, value)?; + } } } - result + Ok(result) } } @@ -257,9 +251,7 @@ mod tests { use datafusion_common::config::{ConfigExtension, ConfigOptions}; use datafusion_common::extensions_options; - use crate::config::extension_options::{ - FFI_ExtensionOptions, ForeignExtensionOptions, - }; + use crate::config::extension_options::FFI_ExtensionOptions; // Define a new configuration struct using the `extensions_options` macro extensions_options! { @@ -281,26 +273,23 @@ mod tests { fn round_trip_ffi_extension_options() { // set up config struct and register extension let mut config = ConfigOptions::default(); - let mut foreign_options = - ForeignExtensionOptions(FFI_ExtensionOptions::default()); - foreign_options.add_config(&MyConfig::default()).unwrap(); + let mut ffi_options = FFI_ExtensionOptions::default(); + ffi_options.add_config(&MyConfig::default()).unwrap(); - config.extensions.insert(foreign_options); - // config.extensions.insert(MyConfig::default()); + config.extensions.insert(ffi_options); // overwrite config default config.set("my_config.baz_count", "42").unwrap(); // check config state - let my_config = config.extensions.get::().unwrap(); - assert!(my_config.foo_to_bar,); - assert_eq!(my_config.baz_count, 42,); - - // let boxed_config = Box::new(MyConfig::default()) as Box; - // let mut ffi_config = FFI_ExtensionOptions::from(boxed_config); - // ffi_config.library_marker_id = crate::mock_foreign_marker_id; - // let foreign_config: Box = ffi_config.into(); - // - // config.extensions.insert(foreign_config); + let returned_ffi_config = + config.extensions.get::().unwrap(); + let my_config: MyConfig = returned_ffi_config.to_extension().unwrap(); + + // check default value + assert!(my_config.foo_to_bar); + + // check overwritten value + assert_eq!(my_config.baz_count, 42); } } diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs index bded8b208054..cf59d2c05f5a 100644 --- a/datafusion/ffi/src/udwf/mod.rs +++ b/datafusion/ffi/src/udwf/mod.rs @@ -399,7 +399,6 @@ impl From<&FFI_SortOptions> for SortOptions { mod tests { use std::sync::Arc; - use crate::tests::create_record_batch; use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF}; use arrow::array::{ArrayRef, RecordBatch, create_array}; use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf}; From 859586ba38fbb59bc7e21e1cb86e8cbee9d04ef1 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 23 Dec 2025 10:36:19 -0500 Subject: [PATCH 7/9] Cleaning up temp code --- datafusion/common/src/config.rs | 8 +- .../ffi/src/config/extension_options.rs | 92 +++--- datafusion/ffi/src/config/extensions.rs | 298 ------------------ datafusion/ffi/src/config/mod.rs | 1 - 4 files changed, 45 insertions(+), 354 deletions(-) delete mode 100644 datafusion/ffi/src/config/extensions.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e54db49d7b60..56a682f07270 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1363,7 +1363,9 @@ impl ConfigOptions { return ConfigField::set(self, inner_key, value); } - if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") { + if !self.extensions.0.contains_key(prefix) + && self.extensions.0.contains_key("datafusion_ffi") + { inner_key = key; prefix = "datafusion_ffi"; } @@ -2149,7 +2151,9 @@ impl TableOptions { return Ok(()); } - if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") { + if !self.extensions.0.contains_key(prefix) + && self.extensions.0.contains_key("datafusion_ffi") + { prefix = "datafusion_ffi"; } else { println!("Existing keys {:?}", self.extensions.0.keys()); diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs index f8e0ffe95151..1afc2a22d198 100644 --- a/datafusion/ffi/src/config/extension_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -27,17 +27,23 @@ use datafusion_common::{Result, exec_err}; use crate::df_result; /// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries. -/// For an explanation of each field, see the corresponding function -/// defined in [`ExtensionOptions`]. +/// +/// Unlike other FFI structs in this crate, we do not construct a foreign +/// variant of this object. This is due to the typical method for interacting +/// with extension options is by creating a local struct of your concrete type. +/// To support this methodology use the `to_extension` method instead. #[repr(C)] #[derive(Debug, StableAbi)] #[allow(non_camel_case_types)] pub struct FFI_ExtensionOptions { + /// Return a deep clone of this [`ExtensionOptions`] pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions, + /// Set the given `key`, `value` pair pub set: unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, + /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`] pub entries: unsafe extern "C" fn(&Self) -> RVec>, /// Release the memory of the private data when it is no longer being used. @@ -48,19 +54,6 @@ pub struct FFI_ExtensionOptions { pub private_data: *mut c_void, } -// TODO(tsaucer) We have a problem in datafusion_common::config::Extension::get -// which relies on knowing the concrete types of the extensions so that we can -// use their PREFIX for insertion of configs. We cannot work around this using -// things like `fn namespace() -> &'static str` because we must be able to do -// this without having an instance. Instead we will go to an approach of having -// a concrete FFI_ForeignConfigExtension and add a check into all of the methods -// in the above `get` (and similar) methods to check to see if we have an FFI -// configs. If so we get the concrete FFI config and then have a method that will -// convert from FFI_ForeignExtensionConfig into the concrete type. Somehow our -// FFI library will need to make this as easy an experience as they are used to -// so maybe we need to implement something at the `Extensions` level in addition -// to the ConfigExtension. - unsafe impl Send for FFI_ExtensionOptions {} unsafe impl Sync for FFI_ExtensionOptions {} @@ -147,41 +140,7 @@ impl Drop for FFI_ExtensionOptions { impl Clone for FFI_ExtensionOptions { fn clone(&self) -> Self { - unsafe { (self.cloned)(&self) } - } -} - -/// This struct is used to access an UDF provided by a foreign -/// library across a FFI boundary. -/// -/// The ForeignExtensionOptions is to be used by the caller of the UDF, so it has -/// no knowledge or access to the private data. All interaction with the UDF -/// must occur through the functions defined in FFI_ExtensionOptions. -// #[derive(Debug)] -// pub struct ForeignExtensionOptions(FFI_ExtensionOptions); -// -// unsafe impl Send for ForeignExtensionOptions {} -// unsafe impl Sync for ForeignExtensionOptions {} - -impl FFI_ExtensionOptions { - pub fn add_config(&mut self, config: &C) -> Result<()> { - for entry in config.entries() { - if let Some(value) = entry.value { - let key = format!("{}.{}", C::PREFIX, entry.key); - self.set(key.as_str(), value.as_str())?; - } - } - - Ok(()) - } - - pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> { - for entry in other.entries() { - if let Some(value) = entry.value { - self.set(entry.key.as_str(), value.as_str())?; - } - } - Ok(()) + unsafe { (self.cloned)(self) } } } @@ -199,7 +158,7 @@ impl ExtensionOptions for FFI_ExtensionOptions { } fn cloned(&self) -> Box { - let ffi_options = unsafe { (self.cloned)(&self) }; + let ffi_options = unsafe { (self.cloned)(self) }; Box::new(ffi_options) } @@ -213,7 +172,7 @@ impl ExtensionOptions for FFI_ExtensionOptions { fn entries(&self) -> Vec { unsafe { - (self.entries)(&self) + (self.entries)(self) .into_iter() .map(|entry_tuple| ConfigEntry { key: entry_tuple.0.into(), @@ -226,11 +185,38 @@ impl ExtensionOptions for FFI_ExtensionOptions { } impl FFI_ExtensionOptions { + /// Add all of the values in a concrete configuration extension to the + /// FFI variant. This is safe to call on either side of the FFI + /// boundary. + pub fn add_config(&mut self, config: &C) -> Result<()> { + for entry in config.entries() { + if let Some(value) = entry.value { + let key = format!("{}.{}", C::PREFIX, entry.key); + self.set(key.as_str(), value.as_str())?; + } + } + + Ok(()) + } + + /// Merge another `FFI_ExtensionOptions` configurations into this one. + /// This is safe to call on either side of the FFI boundary. + pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> { + for entry in other.entries() { + if let Some(value) = entry.value { + self.set(entry.key.as_str(), value.as_str())?; + } + } + Ok(()) + } + + /// Create a concrete extension type from the FFI variant. + /// This is safe to call on either side of the FFI boundary. pub fn to_extension(&self) -> Result { let mut result = C::default(); unsafe { - for entry in (self.entries)(&self) { + for entry in (self.entries)(self) { let key = entry.0.as_str(); let value = entry.1.as_str(); diff --git a/datafusion/ffi/src/config/extensions.rs b/datafusion/ffi/src/config/extensions.rs deleted file mode 100644 index 1196ad535d25..000000000000 --- a/datafusion/ffi/src/config/extensions.rs +++ /dev/null @@ -1,298 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// -// use std::any::Any; -// use std::collections::HashMap; -// use std::ffi::c_void; -// -// use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; -// use abi_stable::StableAbi; -// use datafusion::error::Result; -// use datafusion_common::config::{ConfigEntry, ConfigExtension, Extensions}; -// use datafusion_common::exec_err; -// -// use crate::df_result; -// -// /// A stable struct for sharing [`Extensions`] across FFI boundaries. -// /// For an explanation of each field, see the corresponding function -// /// defined in [`Extensions`]. -// #[repr(C)] -// #[derive(Debug, StableAbi)] -// #[allow(non_camel_case_types)] -// pub struct FFI_Extensions { -// pub cloned: unsafe extern "C" fn(&Self) -> FFI_Extensions, -// -// pub set: -// unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, -// -// pub entries: unsafe extern "C" fn(&Self) -> RVec>, -// -// /// Release the memory of the private data when it is no longer being used. -// pub release: unsafe extern "C" fn(&mut Self), -// -// /// Internal data. This is only to be accessed by the provider of the options. -// /// A [`ForeignExtensions`] should never attempt to access this data. -// pub private_data: *mut c_void, -// } -// -// // TODO(tsaucer) We have a problem in datafusion_common::config::Extension::get -// // which relies on knowing the concrete types of the extensions so that we can -// // use their PREFIX for insertion of configs. We cannot work around this using -// // things like `fn namespace() -> &'static str` because we must be able to do -// // this without having an instance. Instead we will go to an approach of having -// // a concrete FFI_ForeignConfigExtension and add a check into all of the methods -// // in the above `get` (and similar) methods to check to see if we have an FFI -// // configs. If so we get the concrete FFI config and then have a method that will -// // convert from FFI_ForeignExtensionConfig into the concrete type. Somehow our -// // FFI library will need to make this as easy an experience as they are used to -// // so maybe we need to implement something at the `Extensions` level in addition -// // to the ConfigExtension. -// -// unsafe impl Send for FFI_Extensions {} -// unsafe impl Sync for FFI_Extensions {} -// -// pub struct ExtensionsPrivateData { -// pub options: HashMap, -// } -// -// impl FFI_Extensions { -// #[inline] -// unsafe fn inner_mut(&mut self) -> &mut HashMap { -// let private_data = self.private_data as *mut ExtensionsPrivateData; -// &mut (*private_data).options -// } -// -// #[inline] -// unsafe fn inner(&self) -> &HashMap { -// let private_data = self.private_data as *const ExtensionsPrivateData; -// &(*private_data).options -// } -// } -// -// unsafe extern "C" fn cloned_fn_wrapper( -// options: &FFI_Extensions, -// ) -> FFI_Extensions { -// options -// .inner() -// .iter() -// .map(|(k, v)| (k.to_owned(), v.to_owned())) -// .collect::>() -// .into() -// } -// -// unsafe extern "C" fn set_fn_wrapper( -// options: &mut FFI_Extensions, -// key: RStr, -// value: RStr, -// ) -> RResult<(), RString> { -// let _ = options.inner_mut().insert(key.into(), value.into()); -// RResult::ROk(()) -// } -// -// unsafe extern "C" fn entries_fn_wrapper( -// options: &FFI_Extensions, -// ) -> RVec> { -// options -// .inner() -// .iter() -// .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) -// .collect() -// } -// -// unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_Extensions) { -// let private_data = -// Box::from_raw(options.private_data as *mut ExtensionsPrivateData); -// drop(private_data); -// } -// -// impl Default for FFI_Extensions { -// fn default() -> Self { -// HashMap::new().into() -// } -// } -// -// impl From> for FFI_Extensions { -// fn from(options: HashMap) -> Self { -// let private_data = ExtensionsPrivateData { options }; -// -// Self { -// cloned: cloned_fn_wrapper, -// set: set_fn_wrapper, -// entries: entries_fn_wrapper, -// release: release_fn_wrapper, -// private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, -// } -// } -// } -// -// impl Drop for FFI_Extensions { -// fn drop(&mut self) { -// unsafe { (self.release)(self) } -// } -// } -// -// /// This struct is used to access an UDF provided by a foreign -// /// library across a FFI boundary. -// /// -// /// The ForeignExtensions is to be used by the caller of the UDF, so it has -// /// no knowledge or access to the private data. All interaction with the UDF -// /// must occur through the functions defined in FFI_Extensions. -// #[derive(Debug)] -// pub struct ForeignExtensions(FFI_Extensions); -// -// unsafe impl Send for ForeignExtensions {} -// unsafe impl Sync for ForeignExtensions {} -// -// // impl TryFrom for T { -// // type Error = DataFusionError; -// // -// // fn try_from(options: &FFI_Extensions) -> Result { -// // let mut config = T::default(); -// // -// // let mut found = false; -// // unsafe { -// // for entry_tuple in (options.entries)(&options).into_iter() { -// // if let ROption::RSome(value) = entry_tuple.1 { -// // if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') -// // { -// // if namespace == T::PREFIX { -// // found = true; -// // config.set(key, value.as_str())?; -// // } -// // } -// // } -// // } -// // } -// // -// // Ok(config) -// // } -// // } -// -// impl ForeignExtensions { -// pub fn add_config(&mut self, config: &C) -> Result<()> { -// for entry in config.entries() { -// if let Some(value) = entry.value { -// let key = format!("{}.{}", C::PREFIX, entry.key); -// self.set(key.as_str(), value.as_str())?; -// } -// } -// -// Ok(()) -// } -// } -// -// impl ConfigExtension for ForeignExtensions { -// const PREFIX: &'static str = "datafusion_ffi"; -// } -// -// impl Extensions for ForeignExtensions { -// fn as_any(&self) -> &dyn Any { -// self -// } -// -// fn as_any_mut(&mut self) -> &mut dyn Any { -// self -// } -// -// fn cloned(&self) -> Box { -// let ffi_options = unsafe { (self.0.cloned)(&self.0) }; -// let foreign_options = ForeignExtensions(ffi_options); -// Box::new(foreign_options) -// } -// -// fn set(&mut self, key: &str, value: &str) -> Result<()> { -// println!("Setting {key} = {value}"); -// let Some((namespace, key)) = key.split_once('.') else { -// return exec_err!("Unable to set FFI config value without namespace set"); -// }; -// -// // if namespace != ForeignExtensions::PREFIX { -// // return exec_err!("Unexpected namespace {namespace} set for FFI config"); -// // } -// -// df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) -// } -// -// fn entries(&self) -> Vec { -// unsafe { -// (self.0.entries)(&self.0) -// .into_iter() -// .map(|entry_tuple| ConfigEntry { -// key: entry_tuple.0.into(), -// value: Some(entry_tuple.1.into()), -// description: "ffi_config_options", -// }) -// .collect() -// } -// } -// } -// -// impl From<&ForeignExtensions> for C { -// fn from(options: &ForeignExtensions) -> Self { -// -// } -// } -// -// #[cfg(test)] -// mod tests { -// use datafusion_common::config::{ConfigExtension, ConfigOptions}; -// use datafusion_common::extensions_options; -// -// use crate::config_options::{FFI_Extensions, ForeignExtensions}; -// -// // Define a new configuration struct using the `extensions_options` macro -// extensions_options! { -// /// My own config options. -// pub struct MyConfig { -// /// Should "foo" be replaced by "bar"? -// pub foo_to_bar: bool, default = true -// -// /// How many "baz" should be created? -// pub baz_count: usize, default = 1337 -// } -// } -// -// impl ConfigExtension for MyConfig { -// const PREFIX: &'static str = "my_config"; -// } -// -// #[test] -// fn round_trip_ffi_extension_options() { -// // set up config struct and register extension -// let mut config = ConfigOptions::default(); -// let mut foreign_options = ForeignExtensions(FFI_Extensions::default()); -// foreign_options.add_config(&MyConfig::default()).unwrap(); -// -// config.extensions.insert(foreign_options); -// // config.extensions.insert(MyConfig::default()); -// -// // overwrite config default -// config.set("my_config.baz_count", "42").unwrap(); -// -// // check config state -// let my_config = config.extensions.get::().unwrap(); -// assert!(my_config.foo_to_bar,); -// assert_eq!(my_config.baz_count, 42,); -// -// // let boxed_config = Box::new(MyConfig::default()) as Box; -// // let mut ffi_config = FFI_Extensions::from(boxed_config); -// // ffi_config.library_marker_id = crate::mock_foreign_marker_id; -// // let foreign_config: Box = ffi_config.into(); -// // -// // config.extensions.insert(foreign_config); -// } -// } diff --git a/datafusion/ffi/src/config/mod.rs b/datafusion/ffi/src/config/mod.rs index af96fd0228ff..b77b5d410e79 100644 --- a/datafusion/ffi/src/config/mod.rs +++ b/datafusion/ffi/src/config/mod.rs @@ -16,4 +16,3 @@ // under the License. pub mod extension_options; -pub mod extensions; From e811330f7c53d7a6e5ba1d02d5bbfe440ba257a3 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 23 Dec 2025 10:39:52 -0500 Subject: [PATCH 8/9] Remove constant used throughout --- datafusion/common/src/config.rs | 19 ++++++++++++------- .../ffi/src/config/extension_options.rs | 3 ++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 56a682f07270..332d7cf10475 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1326,6 +1326,8 @@ impl ConfigField for ConfigOptions { } } +pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi"; + impl ConfigOptions { /// Creates a new [`ConfigOptions`] with default values pub fn new() -> Self { @@ -1364,10 +1366,13 @@ impl ConfigOptions { } if !self.extensions.0.contains_key(prefix) - && self.extensions.0.contains_key("datafusion_ffi") + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) { inner_key = key; - prefix = "datafusion_ffi"; + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; } let Some(e) = self.extensions.0.get_mut(prefix) else { @@ -1620,7 +1625,6 @@ impl Extensions { /// Retrieves the extension of the given type if any pub fn get_mut(&mut self) -> Option<&mut T> { - println!("extensions trying get_mut on prefix {}", T::PREFIX); let e = self.0.get_mut(T::PREFIX)?; e.0.as_any_mut().downcast_mut() } @@ -2152,11 +2156,12 @@ impl TableOptions { } if !self.extensions.0.contains_key(prefix) - && self.extensions.0.contains_key("datafusion_ffi") + && self + .extensions + .0 + .contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE) { - prefix = "datafusion_ffi"; - } else { - println!("Existing keys {:?}", self.extensions.0.keys()); + prefix = DATAFUSION_FFI_CONFIG_NAMESPACE; } let Some(e) = self.extensions.0.get_mut(prefix) else { diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs index 1afc2a22d198..1ab5f744d7a0 100644 --- a/datafusion/ffi/src/config/extension_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -145,7 +145,8 @@ impl Clone for FFI_ExtensionOptions { } impl ConfigExtension for FFI_ExtensionOptions { - const PREFIX: &'static str = "datafusion_ffi"; + const PREFIX: &'static str = + datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE; } impl ExtensionOptions for FFI_ExtensionOptions { From bd9be4522665ac7adfa5b1a9fe808067749092e0 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 23 Dec 2025 10:44:32 -0500 Subject: [PATCH 9/9] docstring --- datafusion/ffi/src/config/extension_options.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/ffi/src/config/extension_options.rs b/datafusion/ffi/src/config/extension_options.rs index 1ab5f744d7a0..cca3860b19ad 100644 --- a/datafusion/ffi/src/config/extension_options.rs +++ b/datafusion/ffi/src/config/extension_options.rs @@ -32,6 +32,11 @@ use crate::df_result; /// variant of this object. This is due to the typical method for interacting /// with extension options is by creating a local struct of your concrete type. /// To support this methodology use the `to_extension` method instead. +/// +/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension +/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys +/// are stored with the full path prefix to avoid overwriting values when using +/// multiple extensions. #[repr(C)] #[derive(Debug, StableAbi)] #[allow(non_camel_case_types)]