Skip to content

Commit 899af98

Browse files
committed
(feat): Add support for timestamp datafusion pushdown
1 parent b7ba2e8 commit 899af98

File tree

1 file changed

+53
-1
lines changed

1 file changed

+53
-1
lines changed

crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::vec;
2020
use datafusion::arrow::datatypes::DataType;
2121
use datafusion::logical_expr::{Expr, Operator};
2222
use datafusion::scalar::ScalarValue;
23+
use iceberg::arrow::UTC_TIME_ZONE;
2324
use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
2425
use iceberg::spec::Datum;
2526

@@ -201,6 +202,8 @@ fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
201202
}
202203

203204
/// Number of milliseconds in one day (24 h * 60 min * 60 s * 1000 ms).
const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
205+
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLIS: i64 = 1000;
206+
/// Number of microseconds in one second, derived from `MICROS_PER_MILLIS`.
const MICROS_PER_SEC: i64 = MICROS_PER_MILLIS * 1000;
204207
/// Convert a scalar value to an iceberg datum.
///
/// Returns `None` for any scalar that no arm matches (the final `_` arm).
///
/// Timestamp handling, as visible in the arms below:
/// - A timestamp without a time zone becomes a `Datum::timestamp_*` value.
/// - A timestamp with a time zone is matched only when the zone string equals
///   `UTC_TIME_ZONE`, and becomes a `Datum::timestamptz_*` value.
/// - Second and millisecond values are scaled to microseconds; microsecond
///   values are passed through; nanosecond values are kept as nanoseconds.
205208
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
206209
match value {
@@ -214,6 +217,26 @@ fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
214217
ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
215218
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
216219
// Milliseconds are integer-divided down to a day count; `/` truncates toward zero.
// NOTE(review): `as i32` silently truncates a day count that does not fit in i32.
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
220+
// Zoned timestamps are only accepted when the zone string equals UTC_TIME_ZONE;
// none of the arms shown here match any other zone.
ScalarValue::TimestampSecond(Some(v), Some(tz)) if tz.as_ref().eq(UTC_TIME_ZONE) => {
221+
// NOTE(review): unchecked i64 multiply; extreme values overflow (panic in
// debug builds, wrap in release). Consider `checked_mul` and returning `None`.
Some(Datum::timestamptz_micros(*v * MICROS_PER_SEC))
222+
}
223+
ScalarValue::TimestampSecond(Some(v), None) => {
224+
// NOTE(review): same unchecked multiply as the zoned seconds arm.
Some(Datum::timestamp_micros(*v * MICROS_PER_SEC))
225+
}
226+
ScalarValue::TimestampMillisecond(Some(v), Some(tz)) if tz.as_ref().eq(UTC_TIME_ZONE) => {
227+
// NOTE(review): unchecked multiply; can overflow for extreme millisecond values.
Some(Datum::timestamptz_micros(*v * MICROS_PER_MILLIS))
228+
}
229+
ScalarValue::TimestampMillisecond(Some(v), None) => {
230+
// NOTE(review): unchecked multiply; can overflow for extreme millisecond values.
Some(Datum::timestamp_micros(*v * MICROS_PER_MILLIS))
231+
}
232+
// Microsecond values need no scaling.
ScalarValue::TimestampMicrosecond(Some(v), Some(tz)) if tz.as_ref().eq(UTC_TIME_ZONE) => {
233+
Some(Datum::timestamptz_micros(*v))
234+
}
235+
ScalarValue::TimestampMicrosecond(Some(v), None) => Some(Datum::timestamp_micros(*v)),
236+
// Nanosecond values are passed to the `*_nanos` constructors unchanged.
ScalarValue::TimestampNanosecond(Some(v), Some(tz)) if tz.as_ref().eq(UTC_TIME_ZONE) => {
237+
Some(Datum::timestamptz_nanos(*v))
238+
}
239+
ScalarValue::TimestampNanosecond(Some(v), None) => Some(Datum::timestamp_nanos(*v)),
217240
// Anything not matched above is reported as "no datum".
_ => None,
218241
}
219242
}
@@ -223,9 +246,11 @@ mod tests {
223246
use std::collections::HashMap;
224247

225248
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
226-
use datafusion::common::DFSchema;
249+
use datafusion::common::{DFSchema, ScalarValue};
227250
use datafusion::logical_expr::utils::split_conjunction;
251+
use datafusion::logical_expr::{col, lit};
228252
use datafusion::prelude::{Expr, SessionContext};
253+
use iceberg::arrow::UTC_TIME_ZONE;
229254
use iceberg::expr::{Predicate, Reference};
230255
use iceberg::spec::Datum;
231256
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -423,6 +448,33 @@ mod tests {
423448
assert_eq!(predicate, expected_predicate);
424449
}
425450

451+
/// Checks that timestamp literals in each of the four time units are converted
/// into the expected iceberg predicates.
///
/// All four literals carry the same numeric instant (1672862400 seconds) at
/// different resolutions, so the second and millisecond cases are expected to
/// come out scaled to 1672862400000000 microseconds.
#[test]
452+
fn test_predicate_conversion_with_timestamp() {
453+
let expr = vec![
454+
// Microseconds, no time zone: expected to pass through unchanged.
col("ts").gt(lit(ScalarValue::TimestampMicrosecond(
455+
Some(1672862400000000),
456+
None,
457+
))),
458+
// Milliseconds with the UTC zone: expected to become a timestamptz in micros.
col("ts").lt(lit(ScalarValue::TimestampMillisecond(
459+
Some(1672862400000),
460+
Some(UTC_TIME_ZONE.into()),
461+
))),
462+
// Seconds, no time zone: expected to be scaled up to micros.
col("ts").gt(lit(ScalarValue::TimestampSecond(Some(1672862400), None))),
463+
// Nanoseconds, no time zone: expected to stay in nanos.
col("ts").lt(lit(ScalarValue::TimestampNanosecond(
464+
Some(1672862400000000000),
465+
None,
466+
))),
467+
];
468+
let predicate = convert_filters_to_predicate(&expr).unwrap();
469+
// The expected result is the four per-filter predicates AND-ed together in input order.
let expected_predicate = Reference::new("ts")
470+
.greater_than(Datum::timestamp_micros(1672862400000000))
471+
.and(Reference::new("ts").less_than(Datum::timestamptz_micros(1672862400000000)))
472+
.and(Reference::new("ts").greater_than(Datum::timestamp_micros(1672862400000000)))
473+
.and(Reference::new("ts").less_than(Datum::timestamp_nanos(1672862400000000000)));
474+
475+
assert_eq!(predicate, expected_predicate);
476+
}
477+
426478
#[test]
427479
fn test_predicate_conversion_with_date_cast() {
428480
let sql = "ts >= date '2023-01-05T11:00:00'";

0 commit comments

Comments
 (0)