2 changes: 1 addition & 1 deletion arrow-json/src/lib.rs
@@ -86,7 +86,7 @@
pub mod reader;
pub mod writer;

pub use self::reader::{Reader, ReaderBuilder};
pub use self::reader::{ArrayDecoder, DecoderFactory, Reader, ReaderBuilder, Tape, TapeElement};
pub use self::writer::{
ArrayWriter, Encoder, EncoderFactory, EncoderOptions, LineDelimitedWriter, Writer,
WriterBuilder,
6 changes: 6 additions & 0 deletions arrow-json/src/reader/list_array.rs
@@ -24,6 +24,9 @@ use arrow_buffer::buffer::NullBuffer;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};
use std::marker::PhantomData;
use std::sync::Arc;

use super::DecoderFactory;

pub struct ListArrayDecoder<O> {
data_type: DataType,
@@ -39,18 +42,21 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
strict_mode: bool,
is_nullable: bool,
struct_mode: StructMode,
decoder_factory: Option<Arc<dyn DecoderFactory>>,
) -> Result<Self, ArrowError> {
let field = match &data_type {
DataType::List(f) if !O::IS_LARGE => f,
DataType::LargeList(f) if O::IS_LARGE => f,
_ => unreachable!(),
};
let decoder = make_decoder(
Some(field.clone()),
field.data_type().clone(),
coerce_primitive,
strict_mode,
field.is_nullable(),
struct_mode,
decoder_factory,
)?;

Ok(Self {
9 changes: 9 additions & 0 deletions arrow-json/src/reader/map_array.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::StructMode;
use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{ArrayDecoder, make_decoder};
@@ -24,6 +26,8 @@ use arrow_buffer::buffer::NullBuffer;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};

use super::DecoderFactory;

pub struct MapArrayDecoder {
data_type: DataType,
keys: Box<dyn ArrayDecoder>,
@@ -38,6 +42,7 @@ impl MapArrayDecoder {
strict_mode: bool,
is_nullable: bool,
struct_mode: StructMode,
decoder_factory: Option<Arc<dyn DecoderFactory>>,
) -> Result<Self, ArrowError> {
let fields = match &data_type {
DataType::Map(_, true) => {
@@ -57,18 +62,22 @@
};

let keys = make_decoder(
Some(fields[0].clone()),
fields[0].data_type().clone(),
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
struct_mode,
decoder_factory.clone(),
)?;
let values = make_decoder(
Some(fields[1].clone()),
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
fields[1].is_nullable(),
struct_mode,
decoder_factory,
)?;

Ok(Self {
195 changes: 189 additions & 6 deletions arrow-json/src/reader/mod.rs
@@ -149,6 +149,7 @@ use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer,
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
pub use schema::*;
pub use tape::*;

use crate::reader::boolean_array::BooleanArrayDecoder;
use crate::reader::decimal_array::DecimalArrayDecoder;
@@ -159,7 +160,6 @@ use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::string_view_array::StringViewArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;

mod binary_array;
@@ -184,6 +184,7 @@ pub struct ReaderBuilder {
strict_mode: bool,
is_field: bool,
struct_mode: StructMode,
decoder_factory: Option<Arc<dyn DecoderFactory>>,

schema: SchemaRef,
}
@@ -205,6 +206,7 @@ impl ReaderBuilder {
is_field: false,
struct_mode: Default::default(),
schema,
decoder_factory: None,
}
}

@@ -246,6 +248,7 @@ impl ReaderBuilder {
is_field: true,
struct_mode: Default::default(),
schema: Arc::new(Schema::new([field.into()])),
decoder_factory: None,
}
}

@@ -285,6 +288,14 @@ impl ReaderBuilder {
}
}

/// Set an optional hook for customizing decoding behavior.
pub fn with_decoder_factory(self, decoder_factory: Arc<dyn DecoderFactory>) -> Self {
Self {
decoder_factory: Some(decoder_factory),
..self
}
}

/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
Ok(Reader {
@@ -304,11 +315,13 @@ impl ReaderBuilder {
};

let decoder = make_decoder(
None,
data_type,
self.coerce_primitive,
self.strict_mode,
nullable,
self.struct_mode,
self.decoder_factory,
)?;

let num_fields = self.schema.flattened_fields().len();
@@ -373,6 +386,95 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
}
}

/// A trait to create custom decoders for specific data types.
///
/// This allows overriding the default decoders for specific data types,
/// or adding new decoders for custom data types.
///
/// # Examples
///
/// ```
/// use arrow_json::{ArrayDecoder, DecoderFactory, TapeElement, Tape, ReaderBuilder, StructMode};
/// use arrow_schema::ArrowError;
/// use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
/// use arrow_array::cast::AsArray;
/// use arrow_array::Array;
/// use arrow_array::builder::StringBuilder;
/// use arrow_data::ArrayData;
/// use std::sync::Arc;
///
/// struct IncorrectStringAsNullDecoder {}
///
/// impl ArrayDecoder for IncorrectStringAsNullDecoder {
/// fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
/// let mut builder = StringBuilder::new();
/// for p in pos {
/// match tape.get(*p) {
/// TapeElement::String(idx) => {
/// builder.append_value(tape.get_string(idx));
/// }
/// _ => builder.append_null(),
/// }
/// }
/// Ok(builder.finish().into_data())
/// }
/// }
///
/// #[derive(Debug)]
/// struct IncorrectStringAsNullDecoderFactory;
///
/// impl DecoderFactory for IncorrectStringAsNullDecoderFactory {
/// fn make_custom_decoder(
/// &self,
/// _field: Option<FieldRef>,
/// data_type: DataType,
/// _coerce_primitive: bool,
/// _strict_mode: bool,
/// _is_nullable: bool,
/// _struct_mode: StructMode,
/// ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
/// match data_type {
/// DataType::Utf8 => Ok(Some(Box::new(IncorrectStringAsNullDecoder {}))),
/// _ => Ok(None),
/// }
/// }
/// }
///
/// let json = r#"
/// {"a": "a"}
/// {"a": 12}
/// "#;
/// let batch = ReaderBuilder::new(Arc::new(Schema::new(Fields::from(vec![Field::new(
/// "a",
/// DataType::Utf8,
/// true,
/// )]))))
/// .with_decoder_factory(Arc::new(IncorrectStringAsNullDecoderFactory))
/// .build(json.as_bytes())
/// .unwrap()
/// .next()
/// .unwrap()
/// .unwrap();
///
/// let values = batch.column(0).as_string::<i32>();
/// assert_eq!(values.len(), 2);
/// assert_eq!(values.value(0), "a");
/// assert!(values.is_null(1));
/// ```
pub trait DecoderFactory: std::fmt::Debug + Send + Sync {
Contributor
If I am understanding correctly, one could almost use this framework to set up fully customizable decoding. But not quite.

The decoder factory could intercept the top-level struct of the input schema and assign a custom decoder to every field based on path and/or type. That could be really slick for custom error handling, for example turning all incompatible inputs to NULL instead of producing errors (potentially only for a subset of known-funky fields while leaving "reliable" fields fully strongly and strictly typed). Problem is, if the decoder factory intercepts a complex type, it has to provide a custom decoder for every field of that struct (and wrap them all in a custom complex type decoder). Ouch.

I don't know that we would want to make all the internal decoders public -- they are currently pub members of private modules -- but is there perhaps a way to give decoder factories access to some functionality like make_decoder, even if only implicitly? A way to say "do whatever you normally would have done here"? (For the leaf-field case, see the sketch after the trait declaration below.)

For primitives, it might be as simple as defining a DefaultDecoderFactory that forwards blindly to make_decoder.

But struct/map/list would need something more, because the decoder factory for a complex type would need to assemble the custom sub-decoders of that type's children (if not defaulted). Maybe the hypothetical DefaultDecoderFactory would have associated make_xxx_decoder functions which are just validating wrappers that create the appropriate decoder directly from user-provided child decoders? e.g. make_list_decoder would take a single Box<dyn ArrayDecoder>, make_map_decoder would take two decoders, and make_struct_decoder would take a vec of field decoders. Alternatively, we could make the complex type decoders pub and define actual try_new constructors. But that would increase the number of public types in an asymmetric way (where are the primitive decoders?).

Author
I'm a little worried about this PR growing in scope. (My main goal was to make it so that variant could be decoded, and I've already had to pull in the extension code too.) Nevertheless, I think it's worth considering how to make this more broadly useful.

I'm hearing two related uses.

First, changing the behavior of some fields in complex types: I think another way of handling this is to augment the schema in some way with a special decoder kind. This would let the decoder use the default behavior (including on structs/lists) while specialized decoders are substituted in directly on the (sub)fields indicated by the schema. I'm not sure whether this would fit into the existing schema type (e.g. special metadata) or whether a new kind of DecoderSchema would be needed to hold this information. (A sketch of the metadata variant of this idea follows the trait declaration below.)

Second, for catching errors on primitive types and substituting something else (e.g. NULL): my suggestion is to make that an optional setting on the built-in primitive decoder, possibly combined with the solution above so that certain fields could be marked with this behavior in the schema.

/// Make a decoder that overrides the default decoder for a specific data type.
/// This can be used to override, for example, how errors in decoding are handled.
fn make_custom_decoder(
&self,
_field: Option<FieldRef>,
_data_type: DataType,
_coerce_primitive: bool,
_strict_mode: bool,
_is_nullable: bool,
_struct_mode: StructMode,
) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError>;
}
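To make the fallback question concrete: in the diff as written, make_decoder consults the factory for every nested field (the list and map decoders pass decoder_factory back into make_decoder for their children) and falls back to the built-in decoder whenever the factory returns Ok(None). A factory that targets only known leaf fields by name or type can therefore leave the enclosing struct/list/map decoders alone. Below is a rough sketch against the API introduced in this PR; the funky_score field name and the lenient-integer behavior are invented for illustration.

```rust
use arrow_array::Array;
use arrow_array::builder::Int32Builder;
use arrow_data::ArrayData;
use arrow_json::{ArrayDecoder, DecoderFactory, StructMode, Tape, TapeElement};
use arrow_schema::{ArrowError, DataType, FieldRef};

/// Decodes an Int32 column, turning anything that is not a parseable JSON number
/// into NULL instead of returning an error.
struct LenientInt32Decoder;

impl ArrayDecoder for LenientInt32Decoder {
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
        let mut builder = Int32Builder::new();
        for p in pos {
            match tape.get(*p) {
                // Tape numbers are stored as their original text; append NULL
                // if that text does not parse as an i32.
                TapeElement::Number(idx) => {
                    builder.append_option(tape.get_string(idx).parse::<i32>().ok())
                }
                _ => builder.append_null(),
            }
        }
        Ok(builder.finish().into_data())
    }
}

#[derive(Debug)]
struct LenientFieldFactory;

impl DecoderFactory for LenientFieldFactory {
    fn make_custom_decoder(
        &self,
        field: Option<FieldRef>,
        data_type: DataType,
        _coerce_primitive: bool,
        _strict_mode: bool,
        _is_nullable: bool,
        _struct_mode: StructMode,
    ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
        // Only intercept the known-funky leaf field; returning Ok(None) for
        // everything else (including the enclosing struct/list decoders)
        // keeps the default behavior.
        match (field.as_deref().map(|f| f.name().as_str()), &data_type) {
            (Some("funky_score"), DataType::Int32) => Ok(Some(Box::new(LenientInt32Decoder))),
            _ => Ok(None),
        }
    }
}
```

Wired up with ReaderBuilder::new(schema).with_decoder_factory(Arc::new(LenientFieldFactory)), every other column, and the nested plumbing around it, still goes through the defaults. Intercepting a complex type itself, as described in the comment above, is the case this does not cover.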

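The schema-metadata idea from the reply above can also be illustrated with plain Field metadata driving the DecoderFactory hook from this PR, rather than a new DecoderSchema type. In the sketch below the json.lenient metadata key and the emit-NULLs decoder are invented for illustration.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::Array;
use arrow_array::builder::StringBuilder;
use arrow_data::ArrayData;
use arrow_json::{ArrayDecoder, DecoderFactory, StructMode, Tape};
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};

/// Placeholder decoder used wherever a field is marked lenient: emit NULLs only.
struct NullStringDecoder;

impl ArrayDecoder for NullStringDecoder {
    fn decode(&mut self, _tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
        let mut builder = StringBuilder::new();
        for _ in pos {
            builder.append_null();
        }
        Ok(builder.finish().into_data())
    }
}

#[derive(Debug)]
struct MetadataDrivenFactory;

impl DecoderFactory for MetadataDrivenFactory {
    fn make_custom_decoder(
        &self,
        field: Option<FieldRef>,
        _data_type: DataType,
        _coerce_primitive: bool,
        _strict_mode: bool,
        _is_nullable: bool,
        _struct_mode: StructMode,
    ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
        // Fields opt in via metadata; everything else keeps the default
        // decoder by returning Ok(None).
        let lenient = field
            .as_deref()
            .map(|f| f.metadata().get("json.lenient").map(String::as_str) == Some("true"))
            .unwrap_or(false);
        Ok(lenient.then(|| Box::new(NullStringDecoder) as Box<dyn ArrayDecoder>))
    }
}

/// Marking a field in an otherwise ordinary schema.
fn lenient_schema() -> Arc<Schema> {
    let a = Field::new("a", DataType::Utf8, true)
        .with_metadata(HashMap::from([("json.lenient".to_string(), "true".to_string())]));
    let b = Field::new("b", DataType::Int32, true);
    Arc::new(Schema::new(vec![a, b]))
}
```

Because the list and map decoders in this diff forward the child Field into make_decoder, a field marked this way would be seen by the factory wherever it appears in the nesting, while the surrounding containers keep their default decoders.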
/// A low-level interface for reading JSON data from a byte stream
///
/// See [`Reader`] for a higher-level interface for interface with [`BufRead`]
@@ -674,7 +776,8 @@ impl Decoder {
}
}

trait ArrayDecoder: Send {
/// A trait to decode JSON values into arrow arrays
pub trait ArrayDecoder: Send {
/// Decode elements from `tape` starting at the indexes contained in `pos`
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
@@ -686,12 +789,27 @@ macro_rules! primitive_decoder {
}

fn make_decoder(
field: Option<FieldRef>,
Author
@scovich you commented on the other PR about it being awkward to take Field and still require data_type and is_nullable. I agree, but the issue I ran into is that when ReaderBuilder::build_decoder calls make_decoder on the root of the schema, it creates a Struct based on the Schema to pass in.

Another option here could be to create a dummy Field from the Schema. I'd have to look at whether data_type and is_nullable are always inherited from the field.

Contributor
They're not always inherited. Some callers are passing the union of all ancestors' nullability, i.e. when extracting a nested field from a struct. That's why I had suggested perhaps just passing the field metadata here -- because I don't think the nullable and data type members of a passed-in Field would be used, potentially leading to confusion.

data_type: DataType,
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_mode: StructMode,
decoder_factory: Option<Arc<dyn DecoderFactory>>,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
if let Some(ref factory) = decoder_factory {
if let Some(decoder) = factory.make_custom_decoder(
field.clone(),
data_type.clone(),
Comment on lines +802 to +803
Contributor
Tiny nit: We don't know yet that the factory will actually produce a decoder. Should the trait take &Arc so implementations can decide whether to clone them? Or is the overhead so (relatively) low compared to everything else involved with parsing that we don't care? (A sketch of a borrowed-argument variant follows make_decoder below.)

coerce_primitive,
strict_mode,
is_nullable,
struct_mode,
)? {
return Ok(decoder);
}
}

downcast_integer! {
data_type => (primitive_decoder, data_type),
DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
@@ -744,14 +862,14 @@ fn make_decoder(
DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)),
DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode, decoder_factory)?)),
d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
}
}
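On the cloning nit above: one hypothetical shape (not part of this PR) would be to borrow the field and data type, so that an implementation only clones what it needs once it has decided to produce a decoder. A sketch of such a signature:

```rust
use arrow_json::{ArrayDecoder, StructMode};
use arrow_schema::{ArrowError, DataType, FieldRef};

/// Hypothetical alternative to DecoderFactory: borrow the inputs so the caller
/// does not have to clone `field`/`data_type` before knowing whether the
/// factory will return a decoder at all.
pub trait BorrowingDecoderFactory: std::fmt::Debug + Send + Sync {
    fn make_custom_decoder(
        &self,
        field: Option<&FieldRef>,
        data_type: &DataType,
        coerce_primitive: bool,
        strict_mode: bool,
        is_nullable: bool,
        struct_mode: StructMode,
    ) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError>;
}
```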
@@ -2815,4 +2933,69 @@ mod tests {
"Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
);
}

#[test]
fn test_decoder_factory() {
use arrow_array::builder;

struct AlwaysNullStringArrayDecoder;

impl ArrayDecoder for AlwaysNullStringArrayDecoder {
fn decode(&mut self, _tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let mut builder = builder::StringBuilder::new();
for _ in pos {
builder.append_null();
}
Ok(builder.finish().into_data())
}
}

#[derive(Debug)]
struct AlwaysNullStringArrayDecoderFactory;

impl DecoderFactory for AlwaysNullStringArrayDecoderFactory {
fn make_custom_decoder(
&self,
_field: Option<FieldRef>,
data_type: DataType,
_coerce_primitive: bool,
_strict_mode: bool,
_is_nullable: bool,
_struct_mode: StructMode,
) -> Result<Option<Box<dyn ArrayDecoder>>, ArrowError> {
match data_type {
DataType::Utf8 => Ok(Some(Box::new(AlwaysNullStringArrayDecoder {}))),
_ => Ok(None),
}
}
}

let buf = r#"
{"a": "1", "b": 2}
{"a": "hello", "b": 23}
"#;
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, true),
Field::new("b", DataType::Int32, true),
]));

let batches = ReaderBuilder::new(schema.clone())
.with_batch_size(2)
.with_decoder_factory(Arc::new(AlwaysNullStringArrayDecoderFactory))
.build(Cursor::new(buf.as_bytes()))
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();

assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_string::<i32>();
assert_eq!(col1.null_count(), 2);
assert!(col1.is_null(0));
assert!(col1.is_null(1));

let col2 = batches[0].column(1).as_primitive::<Int32Type>();
assert_eq!(col2.value(0), 2);
assert_eq!(col2.value(1), 23);
}
}