Skip to content

Commit ef85152

Browse files
authored
feat(arrow): Convert Arrow schema to Iceberg schema with auto assigned field ids (#1928)
## Which issue does this PR close?

- Closes #1927

## What changes are included in this PR?

- Modified `ArrowSchemaConverter` to enable field ID reassignment
- Added a new public helper: `arrow_schema_to_schema_auto_assign_ids`

## Are these changes tested?

Added unit tests.
1 parent de6a2b9 commit ef85152

File tree

5 files changed

+226
-18
lines changed

5 files changed

+226
-18
lines changed

crates/iceberg/src/arrow/schema.rs

Lines changed: 220 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ use uuid::Uuid;
3535

3636
use crate::error::Result;
3737
use crate::spec::{
38-
Datum, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, PrimitiveType, Schema,
39-
SchemaVisitor, StructType, Type,
38+
Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral,
39+
PrimitiveType, Schema, SchemaVisitor, StructType, Type,
4040
};
4141
use crate::{Error, ErrorKind};
4242

@@ -221,6 +221,19 @@ pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
221221
visit_schema(schema, &mut visitor)
222222
}
223223

224+
/// Convert Arrow schema to Iceberg schema with automatically assigned field IDs.
225+
///
226+
/// Unlike [`arrow_schema_to_schema`], this function does not require field IDs in the Arrow
227+
/// schema metadata. Instead, it automatically assigns unique field IDs starting from 1,
228+
/// following Iceberg's field ID assignment rules.
229+
///
230+
/// This is useful when converting Arrow schemas that don't originate from Iceberg tables,
231+
/// such as schemas from DataFusion or other Arrow-based systems.
232+
pub fn arrow_schema_to_schema_auto_assign_ids(schema: &ArrowSchema) -> Result<Schema> {
233+
let mut visitor = ArrowSchemaConverter::new_with_field_ids_from(FIRST_FIELD_ID);
234+
visit_schema(schema, &mut visitor)
235+
}
236+
224237
/// Convert Arrow type to iceberg type.
225238
pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
226239
let mut visitor = ArrowSchemaConverter::new();
@@ -229,7 +242,7 @@ pub fn arrow_type_to_type(ty: &DataType) -> Result<Type> {
229242

230243
const ARROW_FIELD_DOC_KEY: &str = "doc";
231244

232-
pub(super) fn get_field_id(field: &Field) -> Result<i32> {
245+
pub(super) fn get_field_id_from_metadata(field: &Field) -> Result<i32> {
233246
if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) {
234247
return value.parse::<i32>().map_err(|e| {
235248
Error::new(
@@ -253,19 +266,55 @@ fn get_field_doc(field: &Field) -> Option<String> {
253266
None
254267
}
255268

256-
struct ArrowSchemaConverter;
269+
struct ArrowSchemaConverter {
270+
/// When set, the schema builder will reassign field IDs starting from this value
271+
/// using level-order traversal (breadth-first).
272+
reassign_field_ids_from: Option<i32>,
273+
/// Generates unique placeholder IDs for fields before reassignment.
274+
/// Required because `ReassignFieldIds` builds an old-to-new ID mapping
275+
/// that expects unique input IDs.
276+
next_field_id: i32,
277+
}
257278

258279
impl ArrowSchemaConverter {
259280
fn new() -> Self {
260-
Self {}
281+
Self {
282+
reassign_field_ids_from: None,
283+
next_field_id: 0,
284+
}
285+
}
286+
287+
fn new_with_field_ids_from(start_from: i32) -> Self {
288+
Self {
289+
reassign_field_ids_from: Some(start_from),
290+
next_field_id: 0,
291+
}
292+
}
293+
294+
fn get_field_id(&mut self, field: &Field) -> Result<i32> {
295+
if self.reassign_field_ids_from.is_some() {
296+
// Field IDs will be reassigned by the schema builder.
297+
// We need unique temporary IDs because ReassignFieldIds builds an
298+
// old->new ID mapping that requires unique input IDs.
299+
let temp_id = self.next_field_id;
300+
self.next_field_id += 1;
301+
Ok(temp_id)
302+
} else {
303+
// Get field ID from arrow field metadata
304+
get_field_id_from_metadata(field)
305+
}
261306
}
262307

263-
fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result<Vec<NestedFieldRef>> {
308+
fn convert_fields(
309+
&mut self,
310+
fields: &Fields,
311+
field_results: &[Type],
312+
) -> Result<Vec<NestedFieldRef>> {
264313
let mut results = Vec::with_capacity(fields.len());
265314
for i in 0..fields.len() {
266315
let field = &fields[i];
267316
let field_type = &field_results[i];
268-
let id = get_field_id(field)?;
317+
let id = self.get_field_id(field)?;
269318
let doc = get_field_doc(field);
270319
let nested_field = NestedField {
271320
id,
@@ -287,13 +336,16 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
287336
type U = Schema;
288337

289338
fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> Result<Self::U> {
290-
let fields = Self::convert_fields(schema.fields(), &values)?;
291-
let builder = Schema::builder().with_fields(fields);
339+
let fields = self.convert_fields(schema.fields(), &values)?;
340+
let mut builder = Schema::builder().with_fields(fields);
341+
if let Some(start_from) = self.reassign_field_ids_from {
342+
builder = builder.with_reassigned_field_ids(start_from)
343+
}
292344
builder.build()
293345
}
294346

295347
fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> Result<Self::T> {
296-
let fields = Self::convert_fields(fields, &results)?;
348+
let fields = self.convert_fields(fields, &results)?;
297349
Ok(Type::Struct(StructType::new(fields)))
298350
}
299351

@@ -310,7 +362,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
310362
}
311363
};
312364

313-
let id = get_field_id(element_field)?;
365+
let id = self.get_field_id(element_field)?;
314366
let doc = get_field_doc(element_field);
315367
let mut element_field =
316368
NestedField::list_element(id, value.clone(), !element_field.is_nullable());
@@ -335,15 +387,15 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter {
335387
let key_field = &fields[0];
336388
let value_field = &fields[1];
337389

338-
let key_id = get_field_id(key_field)?;
390+
let key_id = self.get_field_id(key_field)?;
339391
let key_doc = get_field_doc(key_field);
340392
let mut key_field = NestedField::map_key_element(key_id, key_value.clone());
341393
if let Some(doc) = key_doc {
342394
key_field = key_field.with_doc(doc);
343395
}
344396
let key_field = Arc::new(key_field);
345397

346-
let value_id = get_field_id(value_field)?;
398+
let value_id = self.get_field_id(value_field)?;
347399
let value_doc = get_field_doc(value_field);
348400
let mut value_field = NestedField::map_value_element(
349401
value_id,
@@ -1932,4 +1984,159 @@ mod tests {
19321984
assert_eq!(array.value(0), [66u8; 16]);
19331985
}
19341986
}
1987+
1988+
#[test]
1989+
fn test_arrow_schema_to_schema_with_field_id() {
1990+
// Create a complex Arrow schema without field ID metadata
1991+
// Including: primitives, list, nested struct, map, and nested list of structs
1992+
let arrow_schema = ArrowSchema::new(vec![
1993+
Field::new("id", DataType::Int64, false),
1994+
Field::new("name", DataType::Utf8, true),
1995+
Field::new("price", DataType::Decimal128(10, 2), false),
1996+
Field::new(
1997+
"created_at",
1998+
DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
1999+
true,
2000+
),
2001+
Field::new(
2002+
"tags",
2003+
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2004+
true,
2005+
),
2006+
Field::new(
2007+
"address",
2008+
DataType::Struct(Fields::from(vec![
2009+
Field::new("street", DataType::Utf8, true),
2010+
Field::new("city", DataType::Utf8, false),
2011+
Field::new("zip", DataType::Int32, true),
2012+
])),
2013+
true,
2014+
),
2015+
Field::new(
2016+
"attributes",
2017+
DataType::Map(
2018+
Arc::new(Field::new(
2019+
DEFAULT_MAP_FIELD_NAME,
2020+
DataType::Struct(Fields::from(vec![
2021+
Field::new("key", DataType::Utf8, false),
2022+
Field::new("value", DataType::Utf8, true),
2023+
])),
2024+
false,
2025+
)),
2026+
false,
2027+
),
2028+
true,
2029+
),
2030+
Field::new(
2031+
"orders",
2032+
DataType::List(Arc::new(Field::new(
2033+
"element",
2034+
DataType::Struct(Fields::from(vec![
2035+
Field::new("order_id", DataType::Int64, false),
2036+
Field::new("amount", DataType::Float64, false),
2037+
])),
2038+
true,
2039+
))),
2040+
true,
2041+
),
2042+
]);
2043+
2044+
let schema = arrow_schema_to_schema_auto_assign_ids(&arrow_schema).unwrap();
2045+
2046+
// Build expected schema with exact field IDs following level-order assignment:
2047+
// Level 0: id=1, name=2, price=3, created_at=4, tags=5, address=6, attributes=7, orders=8
2048+
// Level 1: tags.element=9, address.{street=10,city=11,zip=12}, attributes.{key=13,value=14}, orders.element=15
2049+
// Level 2: orders.element.{order_id=16,amount=17}
2050+
let expected = Schema::builder()
2051+
.with_fields(vec![
2052+
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
2053+
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
2054+
NestedField::required(
2055+
3,
2056+
"price",
2057+
Type::Primitive(PrimitiveType::Decimal {
2058+
precision: 10,
2059+
scale: 2,
2060+
}),
2061+
)
2062+
.into(),
2063+
NestedField::optional(4, "created_at", Type::Primitive(PrimitiveType::Timestamptz))
2064+
.into(),
2065+
NestedField::optional(
2066+
5,
2067+
"tags",
2068+
Type::List(ListType {
2069+
element_field: NestedField::list_element(
2070+
9,
2071+
Type::Primitive(PrimitiveType::String),
2072+
false,
2073+
)
2074+
.into(),
2075+
}),
2076+
)
2077+
.into(),
2078+
NestedField::optional(
2079+
6,
2080+
"address",
2081+
Type::Struct(StructType::new(vec![
2082+
NestedField::optional(10, "street", Type::Primitive(PrimitiveType::String))
2083+
.into(),
2084+
NestedField::required(11, "city", Type::Primitive(PrimitiveType::String))
2085+
.into(),
2086+
NestedField::optional(12, "zip", Type::Primitive(PrimitiveType::Int))
2087+
.into(),
2088+
])),
2089+
)
2090+
.into(),
2091+
NestedField::optional(
2092+
7,
2093+
"attributes",
2094+
Type::Map(MapType {
2095+
key_field: NestedField::map_key_element(
2096+
13,
2097+
Type::Primitive(PrimitiveType::String),
2098+
)
2099+
.into(),
2100+
value_field: NestedField::map_value_element(
2101+
14,
2102+
Type::Primitive(PrimitiveType::String),
2103+
false,
2104+
)
2105+
.into(),
2106+
}),
2107+
)
2108+
.into(),
2109+
NestedField::optional(
2110+
8,
2111+
"orders",
2112+
Type::List(ListType {
2113+
element_field: NestedField::list_element(
2114+
15,
2115+
Type::Struct(StructType::new(vec![
2116+
NestedField::required(
2117+
16,
2118+
"order_id",
2119+
Type::Primitive(PrimitiveType::Long),
2120+
)
2121+
.into(),
2122+
NestedField::required(
2123+
17,
2124+
"amount",
2125+
Type::Primitive(PrimitiveType::Double),
2126+
)
2127+
.into(),
2128+
])),
2129+
false,
2130+
)
2131+
.into(),
2132+
}),
2133+
)
2134+
.into(),
2135+
])
2136+
.build()
2137+
.unwrap();
2138+
2139+
pretty_assertions::assert_eq!(schema, expected);
2140+
assert_eq!(schema.highest_field_id(), 17);
2141+
}
19352142
}

crates/iceberg/src/arrow/value.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use arrow_buffer::NullBuffer;
2727
use arrow_schema::{DataType, FieldRef};
2828
use uuid::Uuid;
2929

30-
use super::get_field_id;
30+
use super::get_field_id_from_metadata;
3131
use crate::spec::{
3232
ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType,
3333
SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
@@ -450,7 +450,7 @@ impl FieldMatchMode {
450450
/// Determines if an Arrow field matches an Iceberg field based on the matching mode.
451451
pub fn match_field(&self, arrow_field: &FieldRef, iceberg_field: &NestedField) -> bool {
452452
match self {
453-
FieldMatchMode::Id => get_field_id(arrow_field)
453+
FieldMatchMode::Id => get_field_id_from_metadata(arrow_field)
454454
.map(|id| id == iceberg_field.id)
455455
.unwrap_or(false),
456456
FieldMatchMode::Name => arrow_field.name() == &iceberg_field.name,

crates/iceberg/src/spec/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub use snapshot_summary::*;
4949
pub use sort::*;
5050
pub use statistic_file::*;
5151
pub use table_metadata::*;
52+
pub(crate) use table_metadata_builder::FIRST_FIELD_ID;
5253
pub use table_properties::*;
5354
pub use transform::*;
5455
pub use values::*;

crates/iceberg/src/spec/schema/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ impl SchemaBuilder {
102102
/// Reassignment starts from the field-id specified in `start_from` (inclusive).
103103
///
104104
/// All specified aliases and identifier fields will be updated to the new field-ids.
105-
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self {
106-
self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX));
105+
pub(crate) fn with_reassigned_field_ids(mut self, start_from: i32) -> Self {
106+
self.reassign_field_ids_from = Some(start_from);
107107
self
108108
}
109109

crates/iceberg/src/spec/table_metadata_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::error::{Error, ErrorKind, Result};
3131
use crate::spec::{EncryptedKey, INITIAL_ROW_ID, MIN_FORMAT_VERSION_ROW_LINEAGE};
3232
use crate::{TableCreation, TableUpdate};
3333

34-
const FIRST_FIELD_ID: u32 = 1;
34+
pub(crate) const FIRST_FIELD_ID: i32 = 1;
3535

3636
/// Manipulating table metadata.
3737
///

0 commit comments

Comments
 (0)