Skip to content

Commit 6cb54ef

Browse files
committed
fix(iceberg-datafusion): handle timestamp predicates from DF
DataFusion sometimes passes dates as string literals, but can also pass timestamp ScalarValues, which need to be converted to predicates correctly in order to enable partition pruning. This also adds support for converting date values, which helps with predicate expressions such as `date > DATE_TRUNC('day', ts)`.
1 parent 99ca196 commit 6cb54ef

File tree

5 files changed

+455
-171
lines changed

5 files changed

+455
-171
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/spec/values/datum.rs

Lines changed: 107 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,17 +1118,11 @@ impl Datum {
11181118
match target_type {
11191119
Type::Primitive(target_primitive_type) => {
11201120
match (&self.literal, &self.r#type, target_primitive_type) {
1121-
(PrimitiveLiteral::Int(val), _, PrimitiveType::Int) => Ok(Datum::int(*val)),
1122-
(PrimitiveLiteral::Int(val), _, PrimitiveType::Date) => Ok(Datum::date(*val)),
1123-
(PrimitiveLiteral::Int(val), _, PrimitiveType::Long) => Ok(Datum::long(*val)),
1124-
(PrimitiveLiteral::Long(val), _, PrimitiveType::Int) => {
1125-
Ok(Datum::i64_to_i32(*val))
1121+
(PrimitiveLiteral::Int(val), source_type, target_type) => {
1122+
convert_int(*val, source_type, target_type)
11261123
}
1127-
(PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamp) => {
1128-
Ok(Datum::timestamp_micros(*val))
1129-
}
1130-
(PrimitiveLiteral::Long(val), _, PrimitiveType::Timestamptz) => {
1131-
Ok(Datum::timestamptz_micros(*val))
1124+
(PrimitiveLiteral::Long(val), source_type, target_type) => {
1125+
convert_long(*val, source_type, target_type)
11321126
}
11331127
// Let's wait with nano's until this clears up: https://github.com/apache/iceberg/pull/11775
11341128
(PrimitiveLiteral::Int128(val), _, PrimitiveType::Long) => {
@@ -1203,3 +1197,106 @@ impl Datum {
12031197
}
12041198
}
12051199
}
1200+
1201+
/// Number of microseconds in one 24-hour day.
const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;
/// Number of nanoseconds in one microsecond.
const NANOS_PER_MICRO: i64 = 1_000;
/// Number of nanoseconds in one 24-hour day.
const NANOS_PER_DAY: i64 = MICROS_PER_DAY * NANOS_PER_MICRO;
1204+
1205+
/// Converts an int literal between two [PrimitiveType]s.
1206+
fn convert_int(
1207+
val: i32,
1208+
source_type: &PrimitiveType,
1209+
target_type: &PrimitiveType,
1210+
) -> Result<Datum> {
1211+
let datum = match (source_type, target_type) {
1212+
(_, PrimitiveType::Int) => Datum::int(val),
1213+
(_, PrimitiveType::Date) => Datum::date(val),
1214+
(_, PrimitiveType::Long) => Datum::long(val),
1215+
(PrimitiveType::Date, PrimitiveType::Timestamp) => Datum::timestamp_micros(
1216+
(val as i64)
1217+
.checked_mul(MICROS_PER_DAY)
1218+
.ok_or_else(overflow)?,
1219+
),
1220+
(PrimitiveType::Date, PrimitiveType::Timestamptz) => Datum::timestamptz_micros(
1221+
(val as i64)
1222+
.checked_mul(MICROS_PER_DAY)
1223+
.ok_or_else(overflow)?,
1224+
),
1225+
(PrimitiveType::Date, PrimitiveType::TimestampNs) => Datum::timestamp_nanos(
1226+
(val as i64)
1227+
.checked_mul(NANOS_PER_DAY)
1228+
.ok_or_else(overflow)?,
1229+
),
1230+
(PrimitiveType::Date, PrimitiveType::TimestamptzNs) => Datum::timestamptz_nanos(
1231+
(val as i64)
1232+
.checked_mul(NANOS_PER_DAY)
1233+
.ok_or_else(overflow)?,
1234+
),
1235+
_ => {
1236+
return Err(Error::new(
1237+
ErrorKind::DataInvalid,
1238+
format!("Can't convert datum from {source_type} type to {target_type} type.",),
1239+
));
1240+
}
1241+
};
1242+
1243+
Ok(datum)
1244+
}
1245+
1246+
/// Converts a long literal between two [PrimitiveType]s.
1247+
fn convert_long(
1248+
val: i64,
1249+
source_type: &PrimitiveType,
1250+
target_type: &PrimitiveType,
1251+
) -> Result<Datum> {
1252+
let datum = match (source_type, target_type) {
1253+
(_, PrimitiveType::Int) => Datum::i64_to_i32(val),
1254+
(_, PrimitiveType::Long) => Datum::long(val),
1255+
(
1256+
PrimitiveType::Long | PrimitiveType::Timestamp | PrimitiveType::Timestamptz,
1257+
PrimitiveType::Timestamp,
1258+
) => Datum::timestamp_micros(val),
1259+
(
1260+
PrimitiveType::Long | PrimitiveType::Timestamp | PrimitiveType::Timestamptz,
1261+
PrimitiveType::Timestamptz,
1262+
) => Datum::timestamptz_micros(val),
1263+
(
1264+
PrimitiveType::Long | PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs,
1265+
PrimitiveType::TimestampNs,
1266+
) => Datum::timestamp_nanos(val),
1267+
(
1268+
PrimitiveType::Long | PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs,
1269+
PrimitiveType::TimestamptzNs,
1270+
) => Datum::timestamptz_nanos(val),
1271+
(PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs, PrimitiveType::Timestamp) => {
1272+
Datum::timestamp_micros(val / NANOS_PER_MICRO)
1273+
}
1274+
(PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs, PrimitiveType::Timestamptz) => {
1275+
Datum::timestamptz_micros(val / NANOS_PER_MICRO)
1276+
}
1277+
(PrimitiveType::Timestamp | PrimitiveType::Timestamptz, PrimitiveType::TimestampNs) => {
1278+
Datum::timestamp_nanos(val.checked_mul(NANOS_PER_MICRO).ok_or_else(overflow)?)
1279+
}
1280+
(PrimitiveType::Timestamp | PrimitiveType::Timestamptz, PrimitiveType::TimestamptzNs) => {
1281+
Datum::timestamptz_nanos(val.checked_mul(NANOS_PER_MICRO).ok_or_else(overflow)?)
1282+
}
1283+
(PrimitiveType::Timestamp | PrimitiveType::Timestamptz, PrimitiveType::Date) => {
1284+
Datum::date((val / MICROS_PER_DAY) as i32)
1285+
}
1286+
(PrimitiveType::TimestampNs | PrimitiveType::TimestamptzNs, PrimitiveType::Date) => {
1287+
Datum::date((val / (NANOS_PER_DAY)) as i32)
1288+
}
1289+
_ => {
1290+
return Err(Error::new(
1291+
ErrorKind::DataInvalid,
1292+
format!("Can't convert datum from {source_type} type to {target_type} type."),
1293+
));
1294+
}
1295+
};
1296+
1297+
Ok(datum)
1298+
}
1299+
1300+
fn overflow() -> Error {
1301+
Error::new(ErrorKind::DataInvalid, "integer overflow")
1302+
}

0 commit comments

Comments
 (0)