From ffba3d2e52a4ed19530ea2f30637473bec2ad6e0 Mon Sep 17 00:00:00 2001 From: dbascoules <155983332+dbascoules@users.noreply.github.com> Date: Tue, 2 Dec 2025 12:23:58 +0100 Subject: [PATCH 1/3] Support Oracle Date/Timestamp arrowstream parity with arrow. --- .../src/transports/oracle_arrowstream.rs | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/connectorx/src/transports/oracle_arrowstream.rs b/connectorx/src/transports/oracle_arrowstream.rs index 1bd4a474c..9a7ba1738 100644 --- a/connectorx/src/transports/oracle_arrowstream.rs +++ b/connectorx/src/transports/oracle_arrowstream.rs @@ -1,6 +1,6 @@ use crate::{ destinations::arrowstream::{ - typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError, + typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro}, ArrowDestination, ArrowDestinationError, }, impl_transport, sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem}, @@ -30,20 +30,33 @@ impl_transport!( systems = OracleTypeSystem => ArrowTypeSystem, route = OracleSource => ArrowDestination, mappings = { - { NumFloat[f64] => Float64[f64] | conversion auto } - { NumDecimal[Decimal] => Decimal[Decimal] | conversion auto } - { Float[f64] => Float64[f64] | conversion none } - { BinaryFloat[f64] => Float64[f64] | conversion none } - { BinaryDouble[f64] => Float64[f64] | conversion none } - { NumInt[i64] => Int64[i64] | conversion auto } - { Blob[Vec] => LargeBinary[Vec] | conversion auto } - { Clob[String] => LargeUtf8[String] | conversion none } - { VarChar[String] => LargeUtf8[String] | conversion auto } - { Char[String] => LargeUtf8[String] | conversion none } - { NVarChar[String] => LargeUtf8[String] | conversion none } - { NChar[String] => LargeUtf8[String] | conversion none } - { Date[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } - { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion none } - { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } + { NumFloat[f64] => Float64[f64] | conversion auto } + { NumDecimal[Decimal] => Decimal[Decimal] | conversion auto } + { Float[f64] => Float64[f64] | conversion none } + { BinaryFloat[f64] => Float64[f64] | conversion none } + { BinaryDouble[f64] => Float64[f64] | conversion none } + { NumInt[i64] => Int64[i64] | conversion auto } + { Blob[Vec] => LargeBinary[Vec] | conversion auto } + { Clob[String] => LargeUtf8[String] | conversion none } + { VarChar[String] => LargeUtf8[String] | conversion auto } + { Char[String] => LargeUtf8[String] | conversion none } + { NVarChar[String] => LargeUtf8[String] | conversion none } + { NChar[String] => LargeUtf8[String] | conversion none } + { Date[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option } + { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none } + { TimestampNano[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } + { TimestampTz[DateTime] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option } + { TimestampTzNano[DateTime] => DateTimeTz[DateTime] | conversion auto } } ); +impl TypeConversion for OracleArrowTransport { + fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro { + NaiveDateTimeWrapperMicro(val) + } +} + +impl TypeConversion, DateTimeWrapperMicro> for OracleArrowTransport { + fn convert(val: DateTime) -> DateTimeWrapperMicro { + DateTimeWrapperMicro(val) + } +} From b3a47b8d2c3eed52045a060681abf64d9efea42c Mon Sep 17 00:00:00 2001 From: dbascoules <155983332+dbascoules@users.noreply.github.com> Date: Fri, 5 Dec 2025 11:40:15 +0100 Subject: [PATCH 2/3] Better timestamp support with timestamp[us] instead of timestamp[ns] (~290 000 years vs. 585 years respectively) timestamp_nanos_opt() support only dates between 1677 and 2262 --- connectorx/src/destinations/arrow/arrow_assoc.rs | 16 +++++++--------- .../src/destinations/arrowstream/arrow_assoc.rs | 16 +++++++--------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs index f9b40c370..713c95b04 100644 --- a/connectorx/src/destinations/arrow/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow/arrow_assoc.rs @@ -288,9 +288,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc() - .timestamp_nanos_opt() - .unwrap_or_else(|| panic!("out of range DateTime")) + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -328,10 +326,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -342,17 +340,17 @@ impl ArrowAssoc for Option { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), true, ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -363,7 +361,7 @@ impl ArrowAssoc for NaiveDateTime { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false, ) } diff --git a/connectorx/src/destinations/arrowstream/arrow_assoc.rs b/connectorx/src/destinations/arrowstream/arrow_assoc.rs index 08fe03cfb..f5e81aee5 100644 --- a/connectorx/src/destinations/arrowstream/arrow_assoc.rs +++ b/connectorx/src/destinations/arrowstream/arrow_assoc.rs @@ -285,9 +285,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc() - .timestamp_nanos_opt() - .unwrap_or_else(|| panic!("out of range DateTime")) + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -325,10 +323,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -339,17 +337,17 @@ impl ArrowAssoc for Option { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), true, ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -360,7 +358,7 @@ impl ArrowAssoc for NaiveDateTime { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false, ) } From 3a50001420bb4367c82fb1e35ceb2d65a399cf01 Mon Sep 17 00:00:00 2001 From: dbascoules <155983332+dbascoules@users.noreply.github.com> Date: Mon, 15 Dec 2025 09:49:25 +0100 Subject: [PATCH 3/3] =?UTF-8?q?`cargo=20fmt`=20=F0=9F=AB=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- connectorx/src/transports/oracle_arrowstream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/connectorx/src/transports/oracle_arrowstream.rs b/connectorx/src/transports/oracle_arrowstream.rs index 9a7ba1738..5d98a847b 100644 --- a/connectorx/src/transports/oracle_arrowstream.rs +++ b/connectorx/src/transports/oracle_arrowstream.rs @@ -1,6 +1,7 @@ use crate::{ destinations::arrowstream::{ - typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro}, ArrowDestination, ArrowDestinationError, + typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro}, + ArrowDestination, ArrowDestinationError, }, impl_transport, sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem},