diff --git a/connectorx/src/destinations/arrow/arrow_assoc.rs b/connectorx/src/destinations/arrow/arrow_assoc.rs index f9b40c370..713c95b04 100644 --- a/connectorx/src/destinations/arrow/arrow_assoc.rs +++ b/connectorx/src/destinations/arrow/arrow_assoc.rs @@ -288,9 +288,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc() - .timestamp_nanos_opt() - .unwrap_or_else(|| panic!("out of range DateTime")) + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -328,10 +326,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -342,17 +340,17 @@ impl ArrowAssoc for Option { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), true, ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -363,7 +361,7 @@ impl ArrowAssoc for NaiveDateTime { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false, ) } diff --git a/connectorx/src/destinations/arrowstream/arrow_assoc.rs b/connectorx/src/destinations/arrowstream/arrow_assoc.rs index 08fe03cfb..f5e81aee5 100644 --- a/connectorx/src/destinations/arrowstream/arrow_assoc.rs +++ b/connectorx/src/destinations/arrowstream/arrow_assoc.rs @@ -285,9 +285,7 @@ fn naive_date_to_arrow(nd: NaiveDate) -> i32 { } fn naive_datetime_to_arrow(nd: NaiveDateTime) -> i64 { - nd.and_utc() - .timestamp_nanos_opt() - .unwrap_or_else(|| panic!("out of range DateTime")) + nd.and_utc().timestamp_micros() } impl ArrowAssoc for Option { @@ -325,10 +323,10 @@ impl ArrowAssoc for NaiveDate { } impl ArrowAssoc for Option { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: Option) -> Result<()> { @@ -339,17 +337,17 @@ impl ArrowAssoc for Option { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), true, ) } } impl ArrowAssoc for NaiveDateTime { - type Builder = TimestampNanosecondBuilder; + type Builder = TimestampMicrosecondBuilder; fn builder(nrows: usize) -> Self::Builder { - TimestampNanosecondBuilder::with_capacity(nrows) + TimestampMicrosecondBuilder::with_capacity(nrows) } fn append(builder: &mut Self::Builder, value: NaiveDateTime) -> Result<()> { @@ -360,7 +358,7 @@ impl ArrowAssoc for NaiveDateTime { fn field(header: &str) -> Field { Field::new( header, - ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None), false, ) } diff --git a/connectorx/src/transports/oracle_arrowstream.rs b/connectorx/src/transports/oracle_arrowstream.rs index 1bd4a474c..5d98a847b 100644 --- a/connectorx/src/transports/oracle_arrowstream.rs +++ b/connectorx/src/transports/oracle_arrowstream.rs @@ -1,6 +1,7 @@ use crate::{ destinations::arrowstream::{ - typesystem::ArrowTypeSystem, ArrowDestination, ArrowDestinationError, + typesystem::{ArrowTypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro}, + ArrowDestination, ArrowDestinationError, }, impl_transport, sources::oracle::{OracleSource, OracleSourceError, OracleTypeSystem}, @@ -30,20 +31,33 @@ impl_transport!( systems = OracleTypeSystem => ArrowTypeSystem, route = OracleSource => ArrowDestination, mappings = { - { NumFloat[f64] => Float64[f64] | conversion auto } - { NumDecimal[Decimal] => Decimal[Decimal] | conversion auto } - { Float[f64] => Float64[f64] | conversion none } - { BinaryFloat[f64] => Float64[f64] | conversion none } - { BinaryDouble[f64] => Float64[f64] | conversion none } - { NumInt[i64] => Int64[i64] | conversion auto } - { Blob[Vec] => LargeBinary[Vec] | conversion auto } - { Clob[String] => LargeUtf8[String] | conversion none } - { VarChar[String] => LargeUtf8[String] | conversion auto } - { Char[String] => LargeUtf8[String] | conversion none } - { NVarChar[String] => LargeUtf8[String] | conversion none } - { NChar[String] => LargeUtf8[String] | conversion none } - { Date[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } - { Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion none } - { TimestampTz[DateTime] => DateTimeTz[DateTime] | conversion auto } + { NumFloat[f64] => Float64[f64] | conversion auto } + { NumDecimal[Decimal] => Decimal[Decimal] | conversion auto } + { Float[f64] => Float64[f64] | conversion none } + { BinaryFloat[f64] => Float64[f64] | conversion none } + { BinaryDouble[f64] => Float64[f64] | conversion none } + { NumInt[i64] => Int64[i64] | conversion auto } + { Blob[Vec] => LargeBinary[Vec] | conversion auto } + { Clob[String] => LargeUtf8[String] | conversion none } + { VarChar[String] => LargeUtf8[String] | conversion auto } + { Char[String] => LargeUtf8[String] | conversion none } + { NVarChar[String] => LargeUtf8[String] | conversion none } + { NChar[String] => LargeUtf8[String] | conversion none } + { Date[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option } + { Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion none } + { TimestampNano[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto } + { TimestampTz[DateTime] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option } + { TimestampTzNano[DateTime] => DateTimeTz[DateTime] | conversion auto } } ); +impl TypeConversion for OracleArrowTransport { + fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro { + NaiveDateTimeWrapperMicro(val) + } +} + +impl TypeConversion, DateTimeWrapperMicro> for OracleArrowTransport { + fn convert(val: DateTime) -> DateTimeWrapperMicro { + DateTimeWrapperMicro(val) + } +}