-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Allow extensions to arrow-json decoder and include an extension for variant #9021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -149,6 +149,7 @@ use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, | |
| use arrow_data::ArrayData; | ||
| use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; | ||
| pub use schema::*; | ||
| pub use tape::*; | ||
|
|
||
| use crate::reader::boolean_array::BooleanArrayDecoder; | ||
| use crate::reader::decimal_array::DecimalArrayDecoder; | ||
|
|
@@ -159,7 +160,6 @@ use crate::reader::primitive_array::PrimitiveArrayDecoder; | |
| use crate::reader::string_array::StringArrayDecoder; | ||
| use crate::reader::string_view_array::StringViewArrayDecoder; | ||
| use crate::reader::struct_array::StructArrayDecoder; | ||
| use crate::reader::tape::{Tape, TapeDecoder}; | ||
| use crate::reader::timestamp_array::TimestampArrayDecoder; | ||
|
|
||
| mod binary_array; | ||
|
|
@@ -184,6 +184,7 @@ pub struct ReaderBuilder { | |
| strict_mode: bool, | ||
| is_field: bool, | ||
| struct_mode: StructMode, | ||
| decoder_factory: Option<Arc<dyn DecoderFactory>>, | ||
|
|
||
| schema: SchemaRef, | ||
| } | ||
|
|
@@ -205,6 +206,7 @@ impl ReaderBuilder { | |
| is_field: false, | ||
| struct_mode: Default::default(), | ||
| schema, | ||
| decoder_factory: None, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -246,6 +248,7 @@ impl ReaderBuilder { | |
| is_field: true, | ||
| struct_mode: Default::default(), | ||
| schema: Arc::new(Schema::new([field.into()])), | ||
| decoder_factory: None, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -285,6 +288,14 @@ impl ReaderBuilder { | |
| } | ||
| } | ||
|
|
||
| /// Set an optional hook for customizing decoding behavior. | ||
| pub fn with_decoder_factory(self, decoder_factory: Arc<dyn DecoderFactory>) -> Self { | ||
| Self { | ||
| decoder_factory: Some(decoder_factory), | ||
| ..self | ||
| } | ||
| } | ||
|
|
||
| /// Create a [`Reader`] with the provided [`BufRead`] | ||
| pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> { | ||
| Ok(Reader { | ||
|
|
@@ -304,11 +315,13 @@ impl ReaderBuilder { | |
| }; | ||
|
|
||
| let decoder = make_decoder( | ||
| None, | ||
| data_type, | ||
| self.coerce_primitive, | ||
| self.strict_mode, | ||
| nullable, | ||
| self.struct_mode, | ||
| self.decoder_factory, | ||
| )?; | ||
|
|
||
| let num_fields = self.schema.flattened_fields().len(); | ||
|
|
@@ -373,6 +386,95 @@ impl<R: BufRead> RecordBatchReader for Reader<R> { | |
| } | ||
| } | ||
|
|
||
| /// A trait to create custom decoders for specific data types. | ||
| /// | ||
| /// This allows overriding the default decoders for specific data types, | ||
| /// or adding new decoders for custom data types. | ||
| /// | ||
| /// # Examples | ||
| /// | ||
| /// ``` | ||
| /// use arrow_json::{ArrayDecoder, DecoderFactory, TapeElement, Tape, ReaderBuilder, StructMode}; | ||
| /// use arrow_schema::ArrowError; | ||
| /// use arrow_schema::{DataType, Field, Fields, Schema}; | ||
| /// use arrow_array::cast::AsArray; | ||
| /// use arrow_array::Array; | ||
| /// use arrow_array::builder::StringBuilder; | ||
| /// use arrow_data::ArrayData; | ||
| /// use std::sync::Arc; | ||
| /// | ||
| /// struct IncorrectStringAsNullDecoder {} | ||
| /// | ||
| /// impl ArrayDecoder for IncorrectStringAsNullDecoder { | ||
| /// fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> { | ||
| /// let mut builder = StringBuilder::new(); | ||
| /// for p in pos { | ||
| /// match tape.get(*p) { | ||
| /// TapeElement::String(idx) => { | ||
| /// builder.append_value(tape.get_string(idx)); | ||
| /// } | ||
| /// _ => builder.append_null(), | ||
| /// } | ||
| /// } | ||
| /// Ok(builder.finish().into_data()) | ||
| /// } | ||
| /// } | ||
| /// | ||
| /// #[derive(Debug)] | ||
| /// struct IncorrectStringAsNullDecoderFactory; | ||
| /// | ||
| /// impl DecoderFactory for IncorrectStringAsNullDecoderFactory { | ||
| /// fn make_custom_decoder<'a>( | ||
| /// &self, | ||
| /// _field: Option<FieldRef>, | ||
| /// data_type: DataType, | ||
| /// _coerce_primitive: bool, | ||
| /// _strict_mode: bool, | ||
| /// _is_nullable: bool, | ||
| /// _struct_mode: StructMode, | ||
| /// ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> { | ||
| /// match data_type { | ||
| /// DataType::Utf8 => Ok(Some(Box::new(IncorrectStringAsNullDecoder {}))), | ||
| /// _ => Ok(None), | ||
| /// } | ||
| /// } | ||
| /// } | ||
| /// | ||
| /// let json = r#" | ||
| /// {"a": "a"} | ||
| /// {"a": 12} | ||
| /// "#; | ||
| /// let batch = ReaderBuilder::new(Arc::new(Schema::new(Fields::from(vec![Field::new( | ||
| /// "a", | ||
| /// DataType::Utf8, | ||
| /// true, | ||
| /// )])))) | ||
| /// .with_decoder_factory(Arc::new(IncorrectStringAsNullDecoderFactory)) | ||
| /// .build(json.as_bytes()) | ||
| /// .unwrap() | ||
| /// .next() | ||
| /// .unwrap() | ||
| /// .unwrap(); | ||
| /// | ||
| /// let values = batch.column(0).as_string::<i32>(); | ||
| /// assert_eq!(values.len(), 2); | ||
| /// assert_eq!(values.value(0), "a"); | ||
| /// assert!(values.is_null(1)); | ||
| /// ``` | ||
| pub trait DecoderFactory: std::fmt::Debug + Send + Sync { | ||
| /// Make a decoder that overrides the default decoder for a specific data type. | ||
| /// This can be used to override how e.g. error in decoding are handled. | ||
| fn make_custom_decoder( | ||
| &self, | ||
| _field: Option<FieldRef>, | ||
| _data_type: DataType, | ||
| _coerce_primitive: bool, | ||
| _strict_mode: bool, | ||
| _is_nullable: bool, | ||
| _struct_mode: StructMode, | ||
| ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError>; | ||
| } | ||
|
|
||
| /// A low-level interface for reading JSON data from a byte stream | ||
| /// | ||
| /// See [`Reader`] for a higher-level interface for interface with [`BufRead`] | ||
|
|
@@ -674,7 +776,8 @@ impl Decoder { | |
| } | ||
| } | ||
|
|
||
| trait ArrayDecoder: Send { | ||
| /// A trait to decode JSON values into arrow arrays | ||
| pub trait ArrayDecoder: Send { | ||
| /// Decode elements from `tape` starting at the indexes contained in `pos` | ||
| fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>; | ||
| } | ||
|
|
@@ -686,12 +789,27 @@ macro_rules! primitive_decoder { | |
| } | ||
|
|
||
| fn make_decoder( | ||
| field: Option<FieldRef>, | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @scovich you commented here on the other PR about it being awkward to take Another option here could be to create a dummy Field from the Schema. I'd have to look at whether
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They're not always inherited. Some callers are passing the union of all ancestors' nullability, i.e. when extracting a nested field from a struct. That's why I had suggested perhaps just passing the field metadata here -- because I don't think the nullable and data type members of a passed-in Field would be used, potentially leading to confusion. |
||
| data_type: DataType, | ||
| coerce_primitive: bool, | ||
| strict_mode: bool, | ||
| is_nullable: bool, | ||
| struct_mode: StructMode, | ||
| decoder_factory: Option<Arc<dyn DecoderFactory>>, | ||
| ) -> Result<Box<dyn ArrayDecoder>, ArrowError> { | ||
| if let Some(ref factory) = decoder_factory { | ||
| if let Some(decoder) = factory.make_custom_decoder( | ||
| field.clone(), | ||
| data_type.clone(), | ||
|
Comment on lines
+802
to
+803
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tiny nit: We don't know yet that the factory will actually produce a decoder. Should the trait take |
||
| coerce_primitive, | ||
| strict_mode, | ||
| is_nullable, | ||
| struct_mode, | ||
| )? { | ||
| return Ok(decoder); | ||
| } | ||
| } | ||
|
|
||
| downcast_integer! { | ||
| data_type => (primitive_decoder, data_type), | ||
| DataType::Null => Ok(Box::<NullArrayDecoder>::default()), | ||
|
|
@@ -744,14 +862,14 @@ fn make_decoder( | |
| DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))), | ||
| DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))), | ||
| DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))), | ||
| DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), | ||
| DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), | ||
| DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), | ||
| DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), | ||
| DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), | ||
| DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), | ||
| DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())), | ||
| DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())), | ||
| DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))), | ||
| DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())), | ||
| DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), | ||
| DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)), | ||
| d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) | ||
| } | ||
| } | ||
|
|
@@ -2815,4 +2933,69 @@ mod tests { | |
| "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned() | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_decoder_factory() { | ||
| use arrow_array::builder; | ||
|
|
||
| struct AlwaysNullStringArrayDecoder; | ||
|
|
||
| impl ArrayDecoder for AlwaysNullStringArrayDecoder { | ||
| fn decode(&mut self, _tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> { | ||
| let mut builder = builder::StringBuilder::new(); | ||
| for _ in pos { | ||
| builder.append_null(); | ||
| } | ||
| Ok(builder.finish().into_data()) | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct AlwaysNullStringArrayDecoderFactory; | ||
|
|
||
| impl DecoderFactory for AlwaysNullStringArrayDecoderFactory { | ||
| fn make_custom_decoder<'a>( | ||
| &self, | ||
| _field: Option<FieldRef>, | ||
| data_type: DataType, | ||
| _coerce_primitive: bool, | ||
| _strict_mode: bool, | ||
| _is_nullable: bool, | ||
| _struct_mode: StructMode, | ||
| ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> { | ||
| match data_type { | ||
| DataType::Utf8 => Ok(Some(Box::new(AlwaysNullStringArrayDecoder {}))), | ||
| _ => Ok(None), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let buf = r#" | ||
| {"a": "1", "b": 2} | ||
| {"a": "hello", "b": 23} | ||
| "#; | ||
| let schema = Arc::new(Schema::new(vec![ | ||
| Field::new("a", DataType::Utf8, true), | ||
| Field::new("b", DataType::Int32, true), | ||
| ])); | ||
|
|
||
| let batches = ReaderBuilder::new(schema.clone()) | ||
| .with_batch_size(2) | ||
| .with_decoder_factory(Arc::new(AlwaysNullStringArrayDecoderFactory)) | ||
| .build(Cursor::new(buf.as_bytes())) | ||
| .unwrap() | ||
| .collect::<Result<Vec<_>, _>>() | ||
| .unwrap(); | ||
|
|
||
| assert_eq!(batches.len(), 1); | ||
|
|
||
| let col1 = batches[0].column(0).as_string::<i32>(); | ||
| assert_eq!(col1.null_count(), 2); | ||
| assert!(col1.is_null(0)); | ||
| assert!(col1.is_null(1)); | ||
|
|
||
| let col2 = batches[0].column(1).as_primitive::<Int32Type>(); | ||
| assert_eq!(col2.value(0), 2); | ||
| assert_eq!(col2.value(1), 23); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am understanding correctly here, one could almost use this framework to set up a fully customizable decoding. But not quite.
The decoder factory could intercept the top-level struct of the input schema and assign a custom decoder to every field based on path and/or type. That could be really slick for custom error handling, for example turning all incompatible inputs to NULL instead of producing errors (potentially only for a subset of known-funky fields while leaving "reliable" fields fully strongly and strictly typed). Problem is, if the decoder factory intercepts a complex type, it has to provide a custom decoder for every field of that struct (and wrap them all in a custom complex type decoder). Ouch.
I don't know that we would want to make all the internal decoders public -- they are currently
pubmembers of private modules -- but is there perhaps a way to give decoder factories access to some functionality likemake_decoder, even if only implicitly? A way to say "do whatever you normally would have done here"?For primitives, it might be as simple as defining a
DefaultDecoderFactorythat forwards blindly tomake_decoder.But struct/map/list would need something more, because the decoder factory for a complex type would need to assemble the custom sub-decoders of that type's children (if not defaulted). Maybe the hypothetical
DefaultDecoderFactorywould have associatedmake_xxx_decoderfunctions which are just validating wrappers that create the appropriate decoder directly from user-provided child decoders? e.g.make_list_decoderwould take a singleBox<dyn ArrayDecoder>,make_map_decoderwould take two decoders, andmake_struct_decoderwould take a vec of field decoders. Alternatively, we could make the complex type decoderspuband define actualtry_new. But that would increase the number of types in an asymmetric way (where are the primitive decoders).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im a little worried about this Pr growing in scope. (My main goal was to make it so that variant could be decoded and I've already had to pull in the extension code too). Nevertheless I think it's worth considering more usefulness.
Im hearing two related uses.
First, for changing the behavior of some fields in complex types: I think another way of handling this is to augment the schema in some way with a special decoder kind. This could allow for the default behavior to be used by the decoder (including on structs/lists) but specialized decoders substituted in directly on the (sub)fields as indicated by the schema. Im not sure if this would fit into the existing schema type (eg special metadata) or if a new kind of
DecoderSchemawould be created to hold this information.Second for catching errors on primitive types and substituting something else (e.g. NULL) my suggestion is to make that be an optional setting on the built in primitive decoder. Possibly when combined with the solution above to make it so that certain fields could be marked with this behavior in the schema.