Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 95 additions & 3 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,12 @@ impl ManifestWriter {
"format-version".to_string(),
(self.metadata.format_version as u8).to_string(),
)?;
if self.metadata.format_version == FormatVersion::V2 {
avro_writer
.add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
match self.metadata.format_version {
FormatVersion::V1 => {}
FormatVersion::V2 | FormatVersion::V3 => {
avro_writer
.add_user_metadata("content".to_string(), self.metadata.content.to_string())?;
}
}

let partition_summary = self.construct_partition_summaries(&partition_type)?;
Expand Down Expand Up @@ -708,4 +711,93 @@ mod tests {
entries[0].file_sequence_number = None;
assert_eq!(actual_manifest, Manifest::new(metadata, entries));
}

#[tokio::test]
async fn test_v3_delete_manifest_delete_file_roundtrip() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
)),
Arc::new(NestedField::optional(
2,
"data",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);

let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();

// Create a position delete file entry
let delete_entry = ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::PositionDeletes,
file_path: "s3://bucket/table/data/delete-00000.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 10,
file_size_in_bytes: 1024,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: None,
equality_ids: None,
sort_order_id: None,
partition_spec_id: 0,
first_row_id: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
},
};

// Write a V3 delete manifest
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("v3_delete_manifest.avro");
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestWriterBuilder::new(
output_file,
Some(1),
None,
schema.clone(),
partition_spec.clone(),
)
.build_v3_deletes();

writer.add_entry(delete_entry).unwrap();
let manifest_file = writer.write_manifest_file().await.unwrap();

// The returned ManifestFile correctly reports Deletes content
assert_eq!(manifest_file.content, ManifestContentType::Deletes);

// Read back the manifest file
let actual_manifest =
Manifest::parse_avro(fs::read(&path).expect("read_file must succeed").as_slice())
.unwrap();

// Verify the content type is correctly preserved as Deletes
assert_eq!(
actual_manifest.metadata().content,
ManifestContentType::Deletes,
);
}
}
Loading