fix: Use Arrow schema for file readers (#12048)
ritchie46 authored Oct 26, 2023
1 parent c646828 commit 929952a
Showing 47 changed files with 287 additions and 222 deletions.
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/datatypes/field.rs
@@ -1,5 +1,5 @@
-#[cfg(feature = "serde_types")]
-use serde_derive::{Deserialize, Serialize};
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};

use super::{DataType, Metadata};

@@ -12,7 +12,7 @@ use super::{DataType, Metadata};
/// Almost all IO in this crate uses [`Field`] to represent logical information about the data
/// to be serialized.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Field {
/// Its name
pub name: String,
22 changes: 11 additions & 11 deletions crates/polars-arrow/src/datatypes/mod.rs
@@ -1,5 +1,4 @@
-#![forbid(unsafe_code)]
-//! Contains all metadata, such as [`PhysicalType`], [`DataType`], [`Field`] and [`Schema`].
+//! Contains all metadata, such as [`PhysicalType`], [`DataType`], [`Field`] and [`ArrowSchema`].

mod field;
mod physical_type;
@@ -10,11 +9,11 @@ use std::sync::Arc;

pub use field::Field;
pub use physical_type::*;
-pub use schema::Schema;
-#[cfg(feature = "serde_types")]
-use serde_derive::{Deserialize, Serialize};
+pub use schema::{ArrowSchema, ArrowSchemaRef};
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};

-/// typedef for [BTreeMap<String, String>] denoting [`Field`]'s and [`Schema`]'s metadata.
+/// typedef for [BTreeMap<String, String>] denoting [`Field`]'s and [`ArrowSchema`]'s metadata.
pub type Metadata = BTreeMap<String, String>;
/// typedef for [Option<(String, Option<String>)>] descr
pub(crate) type Extension = Option<(String, Option<String>)>;
@@ -28,7 +27,7 @@ pub(crate) type Extension = Option<(String, Option<String>)>;
/// The [`DataType::Extension`] is special in that it augments a [`DataType`] with metadata to support custom types.
/// Use `to_logical_type` to desugar such type and return its corresponding logical type.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DataType {
/// Null type
Null,
@@ -108,6 +107,7 @@ pub enum DataType {
Struct(Vec<Field>),
/// A nested datatype that can represent slots of differing types.
/// Third argument represents mode
#[cfg_attr(feature = "serde", serde(skip))]
Union(Vec<Field>, Option<Vec<i32>>, UnionMode),
/// A nested type that is represented as
///
@@ -337,7 +337,7 @@ impl UnionMode {

/// The time units defined in Arrow.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TimeUnit {
/// Time in seconds.
Second,
@@ -375,7 +375,7 @@ impl From<arrow_schema::TimeUnit> for TimeUnit {

/// Interval units defined in Arrow
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum IntervalUnit {
/// The number of elapsed whole months.
YearMonth,
@@ -501,8 +501,8 @@ impl From<PrimitiveType> for DataType {
}
}

-/// typedef for [`Arc<Schema>`].
-pub type SchemaRef = Arc<Schema>;
+/// typedef for [`Arc<ArrowSchema>`].
+pub type SchemaRef = Arc<ArrowSchema>;

/// support get extension for metadata
pub fn get_extension(metadata: &Metadata) -> Extension {
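The hunks above also rename the optional serde feature from `serde_types` to `serde` and derive the traits directly from the `serde` crate; note that the `Union` variant is now marked `serde(skip)`, so it is excluded from (de)serialization. Below is a minimal sketch of what this enables downstream, assuming polars-arrow is built with the renamed `serde` feature, that `serde_json` is available, and that `Field::new` and `DataType::Timestamp(TimeUnit, Option<String>)` exist as in the arrow2 lineage (these paths and signatures are assumptions, not taken from the diff itself):

use polars_arrow::datatypes::{DataType, Field, TimeUnit};

fn main() -> Result<(), serde_json::Error> {
    // Field, DataType, TimeUnit, IntervalUnit and IntegerType all derive
    // Serialize/Deserialize behind the `serde` feature after this commit.
    let field = Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Millisecond, None), // assumed variant shape
        true,
    );

    // Round-trip through JSON; `Field` derives PartialEq, so we can compare.
    let json = serde_json::to_string(&field)?;
    let back: Field = serde_json::from_str(&json)?;
    assert_eq!(field, back);
    Ok(())
}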
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/datatypes/physical_type.rs
@@ -1,5 +1,5 @@
-#[cfg(feature = "serde_types")]
-use serde_derive::{Deserialize, Serialize};
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};

pub use crate::types::PrimitiveType;

@@ -55,7 +55,7 @@ impl PhysicalType {
/// the set of valid indices types of a dictionary-encoded Array.
/// Each type corresponds to a variant of [`crate::array::DictionaryArray`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum IntegerType {
/// A signed 8-bit integer.
Int8,
34 changes: 24 additions & 10 deletions crates/polars-arrow/src/datatypes/schema.rs
@@ -1,24 +1,28 @@
-#[cfg(feature = "serde_types")]
-use serde_derive::{Deserialize, Serialize};
+use std::sync::Arc;
+
+#[cfg(feature = "serde")]
+use serde::{Deserialize, Serialize};

use super::{Field, Metadata};

/// An ordered sequence of [`Field`]s with associated [`Metadata`].
///
-/// [`Schema`] is an abstraction used to read from, and write to, Arrow IPC format,
+/// [`ArrowSchema`] is an abstraction used to read from, and write to, Arrow IPC format,
/// Apache Parquet, and Apache Avro. All these formats have a concept of a schema
/// with fields and metadata.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
-#[cfg_attr(feature = "serde_types", derive(Serialize, Deserialize))]
-pub struct Schema {
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub struct ArrowSchema {
/// The fields composing this schema.
pub fields: Vec<Field>,
/// Optional metadata.
pub metadata: Metadata,
}

-impl Schema {
-/// Attaches a [`Metadata`] to [`Schema`]
+pub type ArrowSchemaRef = Arc<ArrowSchema>;
+
+impl ArrowSchema {
+/// Attaches a [`Metadata`] to [`ArrowSchema`]
#[inline]
pub fn with_metadata(self, metadata: Metadata) -> Self {
Self {
@@ -27,7 +31,17 @@ impl Schema {
}
}

-/// Returns a new [`Schema`] with a subset of all fields whose `predicate`
+#[inline]
+pub fn len(&self) -> usize {
+self.fields.len()
+}
+
+#[inline]
+pub fn is_empty(&self) -> bool {
+self.fields.is_empty()
+}
+
+/// Returns a new [`ArrowSchema`] with a subset of all fields whose `predicate`
/// evaluates to true.
pub fn filter<F: Fn(usize, &Field) -> bool>(self, predicate: F) -> Self {
let fields = self
@@ -43,14 +57,14 @@ impl Schema {
})
.collect();

-Schema {
+ArrowSchema {
fields,
metadata: self.metadata,
}
}
}

-impl From<Vec<Field>> for Schema {
+impl From<Vec<Field>> for ArrowSchema {
fn from(fields: Vec<Field>) -> Self {
Self {
fields,
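Beyond the rename from `Schema` to `ArrowSchema`, this hunk adds `len`/`is_empty` helpers and the `ArrowSchemaRef = Arc<ArrowSchema>` alias. The sketch below is built only from the items visible above, plus two assumptions: that these types are re-exported at `polars_arrow::datatypes`, and that `Field::new(name, data_type, is_nullable)` and a public `is_nullable` field exist as in the arrow2 lineage.

use std::sync::Arc;

use polars_arrow::datatypes::{ArrowSchema, ArrowSchemaRef, DataType, Field, Metadata};

fn main() {
    // `From<Vec<Field>>` is implemented for ArrowSchema (see the hunk above).
    let schema: ArrowSchema = vec![
        Field::new("id", DataType::Int64, false), // assumed constructor
        Field::new("name", DataType::Utf8, true),
    ]
    .into();

    // Attach metadata; `Metadata` is a BTreeMap<String, String>.
    let mut md = Metadata::new();
    md.insert("origin".to_string(), "example".to_string());
    let schema = schema.with_metadata(md);

    // The new helpers added in this commit.
    assert_eq!(schema.len(), 2);
    assert!(!schema.is_empty());

    // Keep only the non-nullable fields.
    let required = schema.clone().filter(|_idx, field| !field.is_nullable);
    assert_eq!(required.len(), 1);

    // Share the schema cheaply, as the file readers in this commit now do.
    let shared: ArrowSchemaRef = Arc::new(schema);
    let _handle = Arc::clone(&shared);
}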
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/io/avro/read/schema.rs
@@ -19,9 +19,9 @@ fn external_props(schema: &AvroSchema) -> Metadata {
props
}

-/// Infers an [`Schema`] from the root [`Record`].
+/// Infers an [`ArrowSchema`] from the root [`Record`].
/// This
-pub fn infer_schema(record: &Record) -> PolarsResult<Schema> {
+pub fn infer_schema(record: &Record) -> PolarsResult<ArrowSchema> {
Ok(record
.fields
.iter()
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/io/avro/write/schema.rs
@@ -6,8 +6,8 @@ use polars_error::{polars_bail, PolarsResult};

use crate::datatypes::*;

-/// Converts a [`Schema`] to an Avro [`Record`].
-pub fn to_record(schema: &Schema) -> PolarsResult<Record> {
+/// Converts a [`ArrowSchema`] to an Avro [`Record`].
+pub fn to_record(schema: &ArrowSchema) -> PolarsResult<Record> {
let mut name_counter: i32 = 0;
let fields = schema
.fields
20 changes: 10 additions & 10 deletions crates/polars-arrow/src/io/flight/mod.rs
@@ -53,27 +53,27 @@ impl From<EncodedData> for FlightData {
}
}

-/// Serializes a [`Schema`] to [`SchemaResult`].
+/// Serializes a [`ArrowSchema`] to [`SchemaResult`].
pub fn serialize_schema_to_result(
-schema: &Schema,
+schema: &ArrowSchema,
ipc_fields: Option<&[IpcField]>,
) -> SchemaResult {
SchemaResult {
schema: _serialize_schema(schema, ipc_fields),
}
}

-/// Serializes a [`Schema`] to [`FlightData`].
-pub fn serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> FlightData {
+/// Serializes a [`ArrowSchema`] to [`FlightData`].
+pub fn serialize_schema(schema: &ArrowSchema, ipc_fields: Option<&[IpcField]>) -> FlightData {
FlightData {
data_header: _serialize_schema(schema, ipc_fields),
..Default::default()
}
}

-/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::data::FlightInfo`].
+/// Convert a [`ArrowSchema`] to bytes in the format expected in [`arrow_format::flight::data::FlightInfo`].
pub fn serialize_schema_to_info(
-schema: &Schema,
+schema: &ArrowSchema,
ipc_fields: Option<&[IpcField]>,
) -> PolarsResult<Vec<u8>> {
let encoded_data = if let Some(ipc_fields) = ipc_fields {
@@ -88,7 +88,7 @@ pub fn serialize_schema_to_info(
Ok(schema)
}

-fn _serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
+fn _serialize_schema(schema: &ArrowSchema, ipc_fields: Option<&[IpcField]>) -> Vec<u8> {
if let Some(ipc_fields) = ipc_fields {
write::schema_to_bytes(schema, ipc_fields)
} else {
@@ -97,16 +97,16 @@ fn _serialize_schema(schema: &Schema, ipc_fields: Option<&[IpcField]>) -> Vec<u8
}
}

-fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedData {
+fn schema_as_encoded_data(schema: &ArrowSchema, ipc_fields: &[IpcField]) -> EncodedData {
EncodedData {
ipc_message: write::schema_to_bytes(schema, ipc_fields),
arrow_data: vec![],
}
}

-/// Deserialize an IPC message into [`Schema`], [`IpcSchema`].
+/// Deserialize an IPC message into [`ArrowSchema`], [`IpcSchema`].
/// Use to deserialize [`FlightData::data_header`] and [`SchemaResult::schema`].
-pub fn deserialize_schemas(bytes: &[u8]) -> PolarsResult<(Schema, IpcSchema)> {
+pub fn deserialize_schemas(bytes: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchema)> {
read::deserialize_schema(bytes)
}

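The Flight helpers keep their shape and only swap `Schema` for `ArrowSchema` in the signatures. A hedged sketch of the round trip these signatures imply, serializing into `FlightData` and decoding the schema back out of `data_header`, assuming the functions are reachable at `polars_arrow::io::flight` and that `Field::new` exists as in the arrow2 lineage:

use polars_arrow::datatypes::{ArrowSchema, DataType, Field};
use polars_arrow::io::flight::{deserialize_schemas, serialize_schema};
use polars_error::PolarsResult;

fn roundtrip() -> PolarsResult<()> {
    let schema: ArrowSchema = vec![Field::new("x", DataType::Float64, true)].into();

    // No custom IPC fields: pass None and let defaults be derived.
    let flight_data = serialize_schema(&schema, None);

    // The schema travels in `data_header`; decode it together with the IpcSchema.
    let (decoded, _ipc_schema) = deserialize_schemas(&flight_data.data_header)?;
    assert_eq!(decoded, schema);
    Ok(())
}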
7 changes: 4 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/file.rs
@@ -1,5 +1,6 @@
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
+use std::sync::Arc;

use ahash::AHashMap;
use arrow_format::ipc::planus::ReadAsRoot;
@@ -11,14 +12,14 @@ use super::schema::fb_to_schema;
use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::chunk::Chunk;
-use crate::datatypes::Schema;
+use crate::datatypes::ArrowSchemaRef;
use crate::io::ipc::IpcSchema;

/// Metadata of an Arrow IPC file, written in the footer of the file.
#[derive(Debug, Clone)]
pub struct FileMetadata {
/// The schema that is read from the file footer
-pub schema: Schema,
+pub schema: ArrowSchemaRef,

/// The files' [`IpcSchema`]
pub ipc_schema: IpcSchema,
@@ -200,7 +201,7 @@ pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<
.transpose()?;

Ok(FileMetadata {
-schema,
+schema: Arc::new(schema),
ipc_schema,
blocks,
dictionaries,
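This is the core of the change for the IPC file reader: the footer schema in `FileMetadata` is now stored behind an `Arc` (`ArrowSchemaRef`), so downstream readers can share it by bumping a reference count instead of cloning every `Field`. A small sketch, assuming `read_file_metadata` is exposed at `polars_arrow::io::ipc::read` as in the arrow2 lineage:

use std::io::{Read, Seek};
use std::sync::Arc;

use polars_arrow::datatypes::ArrowSchemaRef;
use polars_arrow::io::ipc::read::read_file_metadata;
use polars_error::PolarsResult;

fn shared_schema<R: Read + Seek>(mut reader: R) -> PolarsResult<ArrowSchemaRef> {
    // Parse the footer once; `metadata.schema` is an ArrowSchemaRef after this commit.
    let metadata = read_file_metadata(&mut reader)?;

    // Cloning the Arc is a reference-count bump, not a deep copy of the fields.
    Ok(Arc::clone(&metadata.schema))
}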
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/io/ipc/read/file_async.rs
@@ -13,13 +13,13 @@ use super::file::{deserialize_footer, get_record_batch};
use super::{Dictionaries, FileMetadata, OutOfSpecKind};
use crate::array::*;
use crate::chunk::Chunk;
-use crate::datatypes::{Field, Schema};
+use crate::datatypes::{ArrowSchema, Field};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
stream: BoxStream<'a, PolarsResult<Chunk<Box<dyn Array>>>>,
-schema: Option<Schema>,
+schema: Option<ArrowSchema>,
metadata: FileMetadata,
}

@@ -39,7 +39,7 @@ impl<'a> FileStream<'a> {
{
let (projection, schema) = if let Some(projection) = projection {
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
-let schema = Schema {
+let schema = ArrowSchema {
fields,
metadata: metadata.schema.metadata.clone(),
};
@@ -62,7 +62,7 @@ impl<'a> FileStream<'a> {
}

/// Get the projected schema from the IPC file.
-pub fn schema(&self) -> &Schema {
+pub fn schema(&self) -> &ArrowSchema {
self.schema.as_ref().unwrap_or(&self.metadata.schema)
}

8 changes: 4 additions & 4 deletions crates/polars-arrow/src/io/ipc/read/reader.rs
@@ -7,7 +7,7 @@ use super::common::*;
use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
use crate::array::Array;
use crate::chunk::Chunk;
-use crate::datatypes::Schema;
+use crate::datatypes::ArrowSchema;

/// An iterator of [`Chunk`]s from an Arrow IPC file.
pub struct FileReader<R: Read + Seek> {
Expand All @@ -16,7 +16,7 @@ pub struct FileReader<R: Read + Seek> {
// the dictionaries are going to be read
dictionaries: Option<Dictionaries>,
current_block: usize,
-projection: Option<(Vec<usize>, AHashMap<usize, usize>, Schema)>,
+projection: Option<(Vec<usize>, AHashMap<usize, usize>, ArrowSchema)>,
remaining: usize,
data_scratch: Vec<u8>,
message_scratch: Vec<u8>,
@@ -34,7 +34,7 @@ impl<R: Read + Seek> FileReader<R> {
) -> Self {
let projection = projection.map(|projection| {
let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection);
-let schema = Schema {
+let schema = ArrowSchema {
fields,
metadata: metadata.schema.metadata.clone(),
};
@@ -53,7 +53,7 @@ impl<R: Read + Seek> FileReader<R> {
}

/// Return the schema of the file
-pub fn schema(&self) -> &Schema {
+pub fn schema(&self) -> &ArrowSchema {
self.projection
.as_ref()
.map(|x| &x.2)
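Putting the reader changes together: `FileReader::schema` now hands back the (possibly projected) `ArrowSchema`. A heavily hedged end-to-end sketch follows; the `FileReader::new(reader, metadata, projection, limit)` signature and `read_file_metadata` are assumed from the arrow2 lineage and are not shown in this diff.

use std::io::{Read, Seek};

use polars_arrow::io::ipc::read::{read_file_metadata, FileReader};
use polars_error::PolarsResult;

fn read_first_two_columns<R: Read + Seek>(mut reader: R) -> PolarsResult<()> {
    let metadata = read_file_metadata(&mut reader)?;

    // Project columns 0 and 1; the reader rebuilds a projected ArrowSchema internally.
    let file_reader = FileReader::new(reader, metadata, Some(vec![0, 1]), None);
    println!("projected schema: {:?}", file_reader.schema());

    // Each item is a Chunk holding only the projected columns.
    for chunk in file_reader {
        let chunk = chunk?;
        println!("read a chunk with {} columns", chunk.arrays().len());
    }
    Ok(())
}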