From 68e0347486c7f76928bbe9cd6ec7785f54ab9b4c Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 08:59:41 -0500 Subject: [PATCH 1/9] transformed data cleanup and docs --- Cargo.lock | 6 +- docs/source/column_usage.md | 2 +- docs/source/index.md | 1 + docs/source/transformed_data.md | 16 ++ examples/python-examples/column_usage.py | 5 +- .../python-examples/pre_transformed_data.py | 170 ++++++++++++++++ examples/rust-examples/Cargo.toml | 2 + .../examples/pre_transform_data.rs | 190 ++++++++++++++++++ vegafusion-core/src/proto/pretransform.proto | 12 +- .../src/proto/prost_gen/pretransform.rs | 12 +- .../src/proto/tonic_gen/pretransform.rs | 12 +- vegafusion-core/src/runtime/grpc_runtime.rs | 7 +- vegafusion-core/src/runtime/runtime.rs | 20 +- vegafusion-python/pyproject.toml | 2 +- vegafusion-python/src/lib.rs | 17 +- vegafusion-python/vegafusion/runtime.py | 168 +++++++++------- vegafusion-runtime/benches/spec_benchmarks.rs | 6 +- vegafusion-runtime/src/task_graph/runtime.rs | 9 +- vegafusion-runtime/tests/test_chart_state.rs | 6 +- .../test_destringify_selection_datasets.rs | 6 +- .../tests/test_image_comparison.rs | 18 +- vegafusion-runtime/tests/test_planning.rs | 6 +- .../tests/test_pre_transform_extract.rs | 6 +- .../test_pre_transform_keep_variables.rs | 6 +- .../tests/test_pre_transform_values.rs | 42 +--- .../tests/test_stringify_datetimes.rs | 30 +-- .../tests/test_task_graph_runtime.rs | 12 +- vegafusion-server/src/main.rs | 22 +- vegafusion-wasm/src/lib.rs | 4 +- 29 files changed, 559 insertions(+), 256 deletions(-) create mode 100644 docs/source/transformed_data.md create mode 100644 examples/python-examples/pre_transformed_data.py create mode 100644 examples/rust-examples/examples/pre_transform_data.rs diff --git a/Cargo.lock b/Cargo.lock index 3c56be39..beb146ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3592,7 +3592,9 @@ name = "rust-examples" version = "0.1.0" dependencies = [ "serde_json", + "tokio", "vegafusion-core", + "vegafusion-runtime", ] [[package]] @@ -4255,9 +4257,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", diff --git a/docs/source/column_usage.md b/docs/source/column_usage.md index 972c545a..d7cde66c 100644 --- a/docs/source/column_usage.md +++ b/docs/source/column_usage.md @@ -1,4 +1,4 @@ -# Get Column Usage +# Column Usage VegaFusion provides a function for introspecting a Vega specification and determining which columns are referenced from each root dataset. A root dataset is one defined at the top-level of the spec that includes a `url` or `values` properties. This is useful in contexts where it's more efficient to minimize the number of columns provided to the Vega specification. For example, the Python library uses this function to determine how to downsample the input DataFrame columns prior to converting to Arrow. When VegaFusion cannot precisely determine which columns are referenced from each root dataset, this function returns `None` or `null` for the corresponding dataset. 
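As a rough illustration of the column-usage behavior documented above (this sketch is not part of the patch), the snippet below calls `vf.get_column_usage` on a minimal, hypothetical spec whose only root dataset is referenced through a single field. The tiny spec and the result shown in the comment are assumptions for illustration; see `examples/python-examples/column_usage.py` in this patch for the maintained example.

```python
import json

import vegafusion as vf

# Minimal, hypothetical Vega spec: one root dataset ("movies", defined with a
# url) that is only referenced through its "IMDB Rating" column.
spec = json.loads("""
{
  "$schema": "https://vega.github.io/schema/vega/v5.json",
  "data": [{"name": "movies", "url": "data/movies.json"}],
  "scales": [
    {
      "name": "x", "type": "linear", "range": "width",
      "domain": {"data": "movies", "field": "IMDB Rating"}
    }
  ],
  "marks": [
    {
      "type": "rect", "from": {"data": "movies"},
      "encode": {"update": {"x": {"scale": "x", "field": "IMDB Rating"}}}
    }
  ]
}
""")

# Returns a dict mapping each root dataset name to the list of referenced
# columns, or None for a dataset whose column usage cannot be determined
# precisely. For this spec the expected shape is something like
# {"movies": ["IMDB Rating"]} (illustrative only).
usage = vf.get_column_usage(spec)
print(json.dumps(usage, indent=2))
```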
diff --git a/docs/source/index.md b/docs/source/index.md index a5686c30..dd7a6b86 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -18,4 +18,5 @@ If you've arrived here looking for information on how to scale Vega-Altair visua :caption: Contents column_usage +transformed_data ``` diff --git a/docs/source/transformed_data.md b/docs/source/transformed_data.md new file mode 100644 index 00000000..9f58a153 --- /dev/null +++ b/docs/source/transformed_data.md @@ -0,0 +1,16 @@ +# Transformed Data + +VegaFusion can be used to evaluate datasets in a Vega spec and return them as arrow tables or DataFrames. This is the foundation for Vega-Altair's [`chart.transformed_data`](https://altair-viz.github.io/user_guide/transform/index.html#accessing-transformed-data) method. + +## Python + +```{eval-rst} +.. automethod:: vegafusion.runtime.VegaFusionRuntime.pre_transform_datasets +``` + +**Example**: See [pre_transform_data.py](https://github.com/vega/vegafusion/tree/v2/examples/python-examples/pre_transform_data.py) for a complete example. + +## Rust +The Rust API provides a slightly more general `pre_transform_values` method that can extract dataset or signal values. + +See [pre_transform_data.rs](https://github.com/vega/vegafusion/tree/v2/examples/rust-examples/examples/pre_transform_data.rs) for a complete example of extracting dataset values as arrow tables. diff --git a/examples/python-examples/column_usage.py b/examples/python-examples/column_usage.py index 149caed7..756e2716 100644 --- a/examples/python-examples/column_usage.py +++ b/examples/python-examples/column_usage.py @@ -1,12 +1,11 @@ import json from typing import Any - -from vegafusion import get_column_usage +import vegafusion as vf def main(): spec = get_spec() - column_usage = get_column_usage(spec) + column_usage = vf.get_column_usage(spec) print(json.dumps(column_usage, indent=2)) assert column_usage == { diff --git a/examples/python-examples/pre_transformed_data.py b/examples/python-examples/pre_transformed_data.py new file mode 100644 index 00000000..bf634a79 --- /dev/null +++ b/examples/python-examples/pre_transformed_data.py @@ -0,0 +1,170 @@ +import json +from typing import Any + +import vegafusion as vf + + +def main(): + spec = get_spec() + res, warnings = vf.runtime.pre_transform_datasets( + spec, ["counts"], dataset_format="polars" + ) + assert warnings == [] + assert len(res) == 1 + print(res[0]) + + +def get_spec() -> dict[str, Any]: + """ + Based on https://vega.github.io/editor/#/examples/vega/histogram-null-values + """ + spec_str = """ +{ + "$schema": "https://vega.github.io/schema/vega/v5.json", + "description": "A histogram of film ratings, modified to include null values.", + "width": 400, + "height": 200, + "padding": 5, + "autosize": {"type": "fit", "resize": true}, + + "signals": [ + { + "name": "maxbins", "value": 10 + }, + { + "name": "binCount", + "update": "(bins.stop - bins.start) / bins.step" + }, + { + "name": "nullGap", "value": 10 + }, + { + "name": "barStep", + "update": "(width - nullGap) / (1 + binCount)" + } + ], + + "data": [ + { + "name": "table", + "url": "data/movies.json", + "transform": [ + { + "type": "extent", "field": "IMDB Rating", + "signal": "extent" + }, + { + "type": "bin", "signal": "bins", + "field": "IMDB Rating", "extent": {"signal": "extent"}, + "maxbins": 10 + } + ] + }, + { + "name": "counts", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] != null" + }, + { + "type": "aggregate", + "groupby": ["bin0", "bin1"] + } + ] + 
}, + { + "name": "nulls", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] == null" + }, + { + "type": "aggregate", + "groupby": [] + } + ] + } + ], + + "scales": [ + { + "name": "yscale", + "type": "linear", + "range": "height", + "round": true, "nice": true, + "domain": { + "fields": [ + {"data": "counts", "field": "count"}, + {"data": "nulls", "field": "count"} + ] + } + }, + { + "name": "xscale", + "type": "linear", + "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], + "round": true, + "domain": {"signal": "[bins.start, bins.stop]"}, + "bins": {"signal": "bins"} + }, + { + "name": "xscale-null", + "type": "band", + "range": [0, {"signal": "barStep"}], + "round": true, + "domain": [null] + } + ], + + "axes": [ + {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, + {"orient": "bottom", "scale": "xscale-null"}, + {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5} + ], + + "marks": [ + { + "type": "rect", + "from": {"data": "counts"}, + "encode": { + "update": { + "x": {"scale": "xscale", "field": "bin0", "offset": 1}, + "x2": {"scale": "xscale", "field": "bin1"}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "steelblue"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + }, + { + "type": "rect", + "from": {"data": "nulls"}, + "encode": { + "update": { + "x": {"scale": "xscale-null", "value": null, "offset": 1}, + "x2": {"scale": "xscale-null", "band": 1}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "#aaa"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + } + ] +} + + """ + return json.loads(spec_str) + + +if __name__ == "__main__": + main() diff --git a/examples/rust-examples/Cargo.toml b/examples/rust-examples/Cargo.toml index 64a4368c..ef3ee6e6 100644 --- a/examples/rust-examples/Cargo.toml +++ b/examples/rust-examples/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" [dev-dependencies] serde_json = { workspace = true } vegafusion-core = { path = "../../vegafusion-core" } +vegafusion-runtime = { path = "../../vegafusion-runtime" } +tokio = "1.41.1" \ No newline at end of file diff --git a/examples/rust-examples/examples/pre_transform_data.rs b/examples/rust-examples/examples/pre_transform_data.rs new file mode 100644 index 00000000..109315ae --- /dev/null +++ b/examples/rust-examples/examples/pre_transform_data.rs @@ -0,0 +1,190 @@ +use vegafusion_core::{get_column_usage, spec::chart::ChartSpec}; +use vegafusion_core::proto::gen::tasks::Variable; +use vegafusion_core::runtime::VegaFusionRuntimeTrait; +use vegafusion_core::task_graph::task_value::TaskValue; +use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; + +/// This example demonstrates how to use the `pre_transform_values` method to get +/// transformed datasets as Arrow tables. 
+#[tokio::main] +async fn main() { + let spec = get_spec(); + + let runtime = VegaFusionRuntime::new(None); + + let (values, warnings) = runtime.pre_transform_values( + &spec, + &[(Variable::new_data("counts"), vec![])], + &Default::default(), // Inline datasets + &Default::default() // Options + ).await.unwrap(); + + assert_eq!(values.len(), 1); + assert_eq!(warnings.len(), 0); + + let TaskValue::Table(counts_table) = &values[0] else { + panic!("Expected a table") + }; + + let tbl_repr = counts_table.pretty_format(None).unwrap(); + + assert_eq!(tbl_repr, "\ ++------+------+-------+ +| bin0 | bin1 | count | ++------+------+-------+ +| 6.0 | 7.0 | 985 | +| 3.0 | 4.0 | 100 | +| 7.0 | 8.0 | 741 | +| 5.0 | 6.0 | 633 | +| 8.0 | 9.0 | 204 | +| 2.0 | 3.0 | 43 | +| 4.0 | 5.0 | 273 | +| 9.0 | 10.0 | 4 | +| 1.0 | 2.0 | 5 | ++------+------+-------+") +} + +fn get_spec() -> ChartSpec { + let spec_str = r##" + { + "$schema": "https://vega.github.io/schema/vega/v5.json", + "description": "A histogram of film ratings, modified to include null values.", + "width": 400, + "height": 200, + "padding": 5, + "autosize": {"type": "fit", "resize": true}, + "data": [ + { + "name": "table", + "url": "data/movies.json", + "transform": [ + { + "type": "extent", "field": "IMDB Rating", + "signal": "extent" + }, + { + "type": "bin", "signal": "bins", + "field": "IMDB Rating", "extent": {"signal": "extent"}, + "maxbins": 10 + } + ] + }, + { + "name": "counts", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] != null" + }, + { + "type": "aggregate", + "groupby": ["bin0", "bin1"] + } + ] + }, + { + "name": "nulls", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] == null" + }, + { + "type": "aggregate", + "groupby": [] + } + ] + } + ], + "signals": [ + { + "name": "maxbins", "value": 10 + }, + { + "name": "binCount", + "update": "(bins.stop - bins.start) / bins.step" + }, + { + "name": "nullGap", "value": 10 + }, + { + "name": "barStep", + "update": "(width - nullGap) / (1 + binCount)" + } + ], + "scales": [ + { + "name": "yscale", + "type": "linear", + "range": "height", + "round": true, "nice": true, + "domain": { + "fields": [ + {"data": "counts", "field": "count"}, + {"data": "nulls", "field": "count"} + ] + } + }, + { + "name": "xscale", + "type": "linear", + "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], + "round": true, + "domain": {"signal": "[bins.start, bins.stop]"}, + "bins": {"signal": "bins"} + }, + { + "name": "xscale-null", + "type": "band", + "range": [0, {"signal": "barStep"}], + "round": true, + "domain": [null] + } + ], + + "axes": [ + {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, + {"orient": "bottom", "scale": "xscale-null"}, + {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5} + ], + + "marks": [ + { + "type": "rect", + "from": {"data": "counts"}, + "encode": { + "update": { + "x": {"scale": "xscale", "field": "bin0", "offset": 1}, + "x2": {"scale": "xscale", "field": "bin1"}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "steelblue"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + }, + { + "type": "rect", + "from": {"data": "nulls"}, + "encode": { + "update": { + "x": {"scale": "xscale-null", "value": null, "offset": 1}, + "x2": {"scale": "xscale-null", "band": 1}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "#aaa"} + }, + 
"hover": { + "fill": {"value": "firebrick"} + } + } + } + ] + } + "##; + serde_json::from_str(spec_str).unwrap() +} diff --git a/vegafusion-core/src/proto/pretransform.proto b/vegafusion-core/src/proto/pretransform.proto index cba6700c..d106fa26 100644 --- a/vegafusion-core/src/proto/pretransform.proto +++ b/vegafusion-core/src/proto/pretransform.proto @@ -49,16 +49,16 @@ message PreTransformVariable { } message PreTransformValuesOpts { - repeated PreTransformVariable variables = 1; - optional uint32 row_limit = 2; - string local_tz = 3; - optional string default_input_tz = 4; + optional uint32 row_limit = 1; + string local_tz = 2; + optional string default_input_tz = 3; } message PreTransformValuesRequest { string spec = 1; - repeated tasks.InlineDataset inline_datasets = 2; - PreTransformValuesOpts opts = 3; + repeated PreTransformVariable variables = 2; + repeated tasks.InlineDataset inline_datasets = 3; + PreTransformValuesOpts opts = 4; } message PreTransformValuesResponse { diff --git a/vegafusion-core/src/proto/prost_gen/pretransform.rs b/vegafusion-core/src/proto/prost_gen/pretransform.rs index 3ebc33ab..e314003f 100644 --- a/vegafusion-core/src/proto/prost_gen/pretransform.rs +++ b/vegafusion-core/src/proto/prost_gen/pretransform.rs @@ -80,13 +80,11 @@ pub struct PreTransformVariable { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreTransformValuesOpts { - #[prost(message, repeated, tag = "1")] - pub variables: ::prost::alloc::vec::Vec, - #[prost(uint32, optional, tag = "2")] + #[prost(uint32, optional, tag = "1")] pub row_limit: ::core::option::Option, - #[prost(string, tag = "3")] + #[prost(string, tag = "2")] pub local_tz: ::prost::alloc::string::String, - #[prost(string, optional, tag = "4")] + #[prost(string, optional, tag = "3")] pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -95,8 +93,10 @@ pub struct PreTransformValuesRequest { #[prost(string, tag = "1")] pub spec: ::prost::alloc::string::String, #[prost(message, repeated, tag = "2")] + pub variables: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] pub inline_datasets: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "4")] pub opts: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/vegafusion-core/src/proto/tonic_gen/pretransform.rs b/vegafusion-core/src/proto/tonic_gen/pretransform.rs index 3ebc33ab..e314003f 100644 --- a/vegafusion-core/src/proto/tonic_gen/pretransform.rs +++ b/vegafusion-core/src/proto/tonic_gen/pretransform.rs @@ -80,13 +80,11 @@ pub struct PreTransformVariable { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreTransformValuesOpts { - #[prost(message, repeated, tag = "1")] - pub variables: ::prost::alloc::vec::Vec, - #[prost(uint32, optional, tag = "2")] + #[prost(uint32, optional, tag = "1")] pub row_limit: ::core::option::Option, - #[prost(string, tag = "3")] + #[prost(string, tag = "2")] pub local_tz: ::prost::alloc::string::String, - #[prost(string, optional, tag = "4")] + #[prost(string, optional, tag = "3")] pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -95,8 +93,10 @@ pub struct PreTransformValuesRequest { #[prost(string, tag = "1")] pub spec: ::prost::alloc::string::String, #[prost(message, repeated, tag = 
"2")] + pub variables: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] pub inline_datasets: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "4")] pub opts: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/vegafusion-core/src/runtime/grpc_runtime.rs b/vegafusion-core/src/runtime/grpc_runtime.rs index fc4a7eaf..527ce023 100644 --- a/vegafusion-core/src/runtime/grpc_runtime.rs +++ b/vegafusion-core/src/runtime/grpc_runtime.rs @@ -25,7 +25,8 @@ use vegafusion_common::{ data::table::VegaFusionTable, error::{Result, VegaFusionError}, }; - +use crate::proto::gen::pretransform::PreTransformVariable; +use crate::task_graph::graph::ScopedVariable; use super::{ runtime::{encode_inline_datasets, PreTransformExtractTable}, VegaFusionRuntimeTrait, @@ -154,6 +155,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime { async fn pre_transform_values( &self, spec: &ChartSpec, + variables: &[ScopedVariable], inline_datasets: &HashMap, options: &PreTransformValuesOpts, ) -> Result<(Vec, Vec)> { @@ -161,6 +163,9 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime { let request = PreTransformValuesRequest { spec: serde_json::to_string(spec)?, + variables: variables.iter().map( + |v| PreTransformVariable { variable: Some(v.0.clone()), scope: v.1.clone() } + ).collect(), inline_datasets, opts: Some(options.clone()), }; diff --git a/vegafusion-core/src/runtime/runtime.rs b/vegafusion-core/src/runtime/runtime.rs index 120349f1..806ca2fc 100644 --- a/vegafusion-core/src/runtime/runtime.rs +++ b/vegafusion-core/src/runtime/runtime.rs @@ -30,6 +30,7 @@ use crate::{ }, }; + #[derive(Clone)] pub struct PreTransformExtractTable { pub name: String, @@ -244,13 +245,14 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { async fn pre_transform_values( &self, spec: &ChartSpec, + variables: &[ScopedVariable], inline_datasets: &HashMap, options: &PreTransformValuesOpts, ) -> Result<(Vec, Vec)> { // Check that requested variables exist and collect indices - for var in &options.variables { - let scope = var.scope.as_slice(); - let variable = var.variable.clone().unwrap(); + for var in variables { + let scope = var.1.as_slice(); + let variable = var.0.clone(); let name = variable.name.clone(); let namespace = variable.clone().ns(); @@ -282,12 +284,7 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { // Make sure planner keeps the requested variables, event // if they are not used elsewhere in the spec - let keep_variables = options - .variables - .clone() - .into_iter() - .map(|v| (v.variable.unwrap(), v.scope)) - .collect(); + let keep_variables = Vec::from(variables); // Create spec plan let plan = SpecPlan::try_new( @@ -333,12 +330,11 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { } // Collect node indices for variables - let indices: Vec<_> = options - .variables + let indices: Vec<_> = variables .iter() .map(|var| { if let Some(index) = - task_graph_mapping.get(&(var.variable.clone().unwrap(), var.scope.clone())) + task_graph_mapping.get(&(var.0.clone(), var.1.clone())) { Ok(index.clone()) } else { diff --git a/vegafusion-python/pyproject.toml b/vegafusion-python/pyproject.toml index ccfbdc09..cb66628c 100644 --- a/vegafusion-python/pyproject.toml +++ b/vegafusion-python/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ "License :: OSI Approved :: BSD License", "Topic :: Scientific/Engineering :: Visualization", ] -dependencies = ["arro3-core", "packaging", "narwhals>=1.9"] +dependencies = ["arro3-core", 
"packaging", "narwhals>=1.13"] [[project.authors]] name = "VegaFusion Contributors" diff --git a/vegafusion-python/src/lib.rs b/vegafusion-python/src/lib.rs index 80842b99..1e795fe5 100644 --- a/vegafusion-python/src/lib.rs +++ b/vegafusion-python/src/lib.rs @@ -36,7 +36,7 @@ use vegafusion_core::task_graph::task_value::TaskValue; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; use vegafusion_core::runtime::VegaFusionRuntimeTrait; -use vegafusion_runtime::datafusion::context::make_datafusion_context; +use vegafusion_runtime::task_graph::cache::VegaFusionCache; static INIT: Once = Once::new(); @@ -210,12 +210,9 @@ impl PyVegaFusionRuntime { .build() .external("Failed to create Tokio thread pool")?; + Ok(Self { - runtime: Arc::new(VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - max_capacity, - memory_limit, - )), + runtime: Arc::new(VegaFusionRuntime::new(Some(VegaFusionCache::new(max_capacity, memory_limit)))), tokio_runtime: Arc::new(tokio_runtime_connection), }) } @@ -366,15 +363,9 @@ impl PyVegaFusionRuntime { self.tokio_runtime.block_on( self.runtime.pre_transform_values( &spec, + &variables, &inline_datasets, &PreTransformValuesOpts { - variables: variables - .into_iter() - .map(|v| PreTransformVariable { - variable: Some(v.0), - scope: v.1, - }) - .collect(), local_tz, default_input_tz, row_limit, diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py index b915f65e..d90fdb37 100644 --- a/vegafusion-python/vegafusion/runtime.py +++ b/vegafusion-python/vegafusion/runtime.py @@ -29,7 +29,7 @@ UnaryUnaryMultiCallable = Any -def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> str | None: +def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> ModuleType | None: namespaces = set() try: if inline_datasets is not None: @@ -37,7 +37,7 @@ def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> str | None: namespaces.add(nw.get_native_namespace(nw.from_native(df))) if len(namespaces) == 1: - return str(next(iter(namespaces)).__name__) + return next(iter(namespaces)) else: return None except TypeError: @@ -80,6 +80,9 @@ class PreTransformWarning(TypedDict): message: str +DatasetFormat = Literal["auto", "polars", "pandas", "pyarrow", "arro3"] + + class ChartState: def __init__(self, chart_state: PyChartState | PyChartStateGrpc) -> None: self._chart_state = chart_state @@ -262,19 +265,17 @@ def _import_inline_datasets( # If so, keep the arrow version so that it's more efficient # to convert as part of the whole table later - inner_value = inner_value.assign( - **{ - col: pd.arrays.ArrowExtensionArray( - pa.chunked_array(col_tbl.column(0)) - ) - } - ) + inner_value = inner_value.assign(**{ + col: pd.arrays.ArrowExtensionArray( + pa.chunked_array(col_tbl.column(0)) + ) + }) except TypeError: # If the Table constructor can't handle the object column, # convert the column to pyarrow strings - inner_value = inner_value.assign( - **{col: inner_value[col].astype("string[pyarrow]")} - ) + inner_value = inner_value.assign(**{ + col: inner_value[col].astype("string[pyarrow]") + }) if hasattr(inner_value, "__arrow_c_stream__"): # TODO: this requires pyarrow 14.0.0 or later imported_inline_datasets[name] = Table(inner_value) @@ -472,8 +473,10 @@ def pre_transform_datasets( row_limit: int | None = None, inline_datasets: dict[str, DataFrameLike] | None = None, trim_unused_columns: bool = False, + dataset_format: DatasetFormat = "auto", ) -> tuple[list[DataFrameLike], list[dict[str, str]]]: - 
"""Extract the fully evaluated form of the requested datasets from a Vega + """ + Extract the fully evaluated form of the requested datasets from a Vega specification. Extracts datasets as pandas DataFrames. @@ -481,15 +484,16 @@ def pre_transform_datasets( Args: spec: A Vega specification dict or JSON string. datasets: A list with elements that are either: - - The name of a top-level dataset as a string - - A two-element tuple where the first element is the name of a dataset + + * The name of a top-level dataset as a string + * A two-element tuple where the first element is the name of a dataset as a string and the second element is the nested scope of the dataset as a list of integers - local_tz: Name of timezone to be considered local. E.g. 'America/New_York'. - Defaults to the value of vf.get_local_tz(), which defaults to the - system timezone if one can be determined. - default_input_tz: Name of timezone (e.g. 'America/New_York') that naive - datetime strings should be interpreted in. Defaults to `local_tz`. + local_tz: Name of timezone to be considered local. E.g. + ``'America/New_York'``. Defaults to the value of vf.get_local_tz(), + which defaults to the system timezone if one can be determined. + default_input_tz: Name of timezone (e.g. ``'America/New_York'``) that naive + datetime strings should be interpreted in. Defaults to ``local_tz``. row_limit: Maximum number of dataset rows to include in the returned datasets. If exceeded, datasets will be truncated to this number of rows and a RowLimitExceeded warning will be included in the resulting @@ -500,19 +504,22 @@ def pre_transform_datasets( or 'table://{dataset_name}'. trim_unused_columns: If True, unused columns are removed from returned datasets. - - Returns: - A tuple containing: - - List of pandas DataFrames corresponding to the input datasets list - - A list of warnings as dictionaries. Each warning dict has a 'type' + dataset_format: Format for returned datasets. One of: + + * ``"auto"``: Infer the result type based on the types of inline datasets. + If no inline datasets are provided, return type will depend on + installed packages. + * ``"polars"``: polars.DataFrame + * ``"pandas"``: pandas.DataFrame + * ``"pyarrow"``: pyarrow.Table + * ``"arro3"``: arro3.Table + + Returns: Two-element tuple + * List of pandas DataFrames corresponding to the input datasets list + * A list of warnings as dictionaries. Each warning dict has a 'type' key indicating the warning type, and a 'message' key containing a description of the warning. 
""" - if not TYPE_CHECKING: - pl = sys.modules.get("polars", None) - pa = sys.modules.get("pyarrow", None) - pd = sys.modules.get("pandas", None) - local_tz = local_tz or get_local_tz() # Build input variables @@ -535,49 +542,74 @@ def pre_transform_datasets( inline_datasets=inline_arrow_dataset, ) - # Wrap result dataframes in native format, then with Narwhals so that - # we can manipulate them with a uniform API - namespace = _get_common_namespace(inline_datasets) - if namespace == "polars" and pl is not None: - nw_dataframes = [nw.from_native(pl.DataFrame(value)) for value in values] - - elif namespace == "pyarrow" and pa is not None: - nw_dataframes = [nw.from_native(pa.table(value)) for value in values] - elif namespace == "pandas" and pd is not None and pa is not None: - nw_dataframes = [ - nw.from_native(pa.table(value).to_pandas()) for value in values - ] + def normalize_timezones(dfs: list[nw.DataFrame]) -> list[DataFrameLike]: + # Convert to `local_tz` (or, set to UTC and then convert if starting + # from time-zone-naive data), then extract the native DataFrame to return. + processed_datasets = [] + for df in dfs: + df = df.with_columns( + nw.col(col).dt.convert_time_zone(local_tz) + for col, dtype in df.schema.items() + if dtype == nw.Datetime + ) + processed_datasets.append(df.to_native()) + return processed_datasets + + # Wrap result dataframes in Narwhals, using the input type and arrow PyCapsule interface + if dataset_format != "auto": + match dataset_format: + case "polars": + import polars as pl + + datasets = normalize_timezones([ + nw.from_native(pl.DataFrame(value)) for value in values + ]) + case "pandas": + import pyarrow as pa + + datasets = normalize_timezones([ + nw.from_native(pa.table(value).to_pandas()) for value in values + ]) + case "pyarrow": + import pyarrow as pa + + datasets = normalize_timezones([ + nw.from_native(pa.table(value)) for value in values + ]) + case "arro3": + # Pass through arrof3 + datasets = values + case _: + raise ValueError(f"Unrecognized dataset_format: {dataset_format}") + elif (namespace := _get_common_namespace(inline_datasets)) is not None: + # Infer the type from the inline datasets + datasets = normalize_timezones([ + nw.from_arrow(value, native_namespace=namespace) for value in values + ]) else: # Either no inline datasets, inline datasets with mixed or # unrecognized types - if pa is not None and pd is not None: - nw_dataframes = [ - nw.from_native(pa.table(value).to_pandas()) for value in values - ] - elif pl is not None: - nw_dataframes = [ + try: + # Try polars + import polars as pl + + datasets = normalize_timezones([ nw.from_native(pl.DataFrame(value)) for value in values - ] - else: - # Hopefully narwhals will eventually help us fall back to whatever - # is installed here - raise ValueError( - "Either polars or pandas must be installed to extract " - "transformed data" - ) + ]) + except ImportError: + try: + # Try pandas + import pandas as _pd # noqa: F401 + import pyarrow as pa - # Convert to `local_tz` (or, set to UTC and then convert if starting - # from time-zone-naive data), then extract the native DataFrame to return. 
- processed_datasets = [] - for df in nw_dataframes: - df = df.with_columns( - nw.col(col).dt.convert_time_zone(local_tz) - for col, dtype in df.schema.items() - if dtype == nw.Datetime - ) - processed_datasets.append(df.to_native()) + datasets = normalize_timezones([ + nw.from_native(pa.table(value).to_pandas()) for value in values + ]) + except ImportError: + # Fall back to arro3 + datasets = values - return processed_datasets, warnings + return datasets, warnings def pre_transform_extract( self, @@ -597,7 +629,7 @@ def pre_transform_extract( Evaluate supported transforms in an input Vega specification. Produces a new specification with small pre-transformed datasets (under 100 - rows) included inline and larger inline datasets (100 rows or more) extracted + rows) included inline and larger inline datasets (20 rows or more) extracted into pyarrow tables. Args: diff --git a/vegafusion-runtime/benches/spec_benchmarks.rs b/vegafusion-runtime/benches/spec_benchmarks.rs index 986d4491..cedbfb82 100644 --- a/vegafusion-runtime/benches/spec_benchmarks.rs +++ b/vegafusion-runtime/benches/spec_benchmarks.rs @@ -52,8 +52,7 @@ async fn eval_spec_get_variable(full_spec: ChartSpec, var: &ScopedVariable) -> V let task_graph_mapping = task_graph.build_mapping(); // Initialize task graph runtime - let ctx = make_datafusion_context(); - let runtime = VegaFusionRuntime::new(Arc::new(ctx), Some(64), None); + let runtime = VegaFusionRuntime::new(None); let node_index = task_graph_mapping.get(var).unwrap(); @@ -104,8 +103,7 @@ async fn eval_spec_sequence(full_spec: ChartSpec, full_updates: Vec); @@ -26,13 +27,11 @@ pub struct VegaFusionRuntime { impl VegaFusionRuntime { pub fn new( - ctx: Arc, - capacity: Option, - memory_limit: Option, + cache: Option ) -> Self { Self { - cache: VegaFusionCache::new(capacity, memory_limit), - ctx, + cache: cache.unwrap_or_else(|| VegaFusionCache::new(Some(32), None)), + ctx: Arc::new(make_datafusion_context()), } } diff --git a/vegafusion-runtime/tests/test_chart_state.rs b/vegafusion-runtime/tests/test_chart_state.rs index 9df8fee1..124b9d17 100644 --- a/vegafusion-runtime/tests/test_chart_state.rs +++ b/vegafusion-runtime/tests/test_chart_state.rs @@ -27,11 +27,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let chart_state = ChartState::try_new( &runtime, diff --git a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs index 1940c132..c40b5700 100644 --- a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs +++ b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs @@ -26,11 +26,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (chart_spec, _warnings) = runtime .pre_transform_spec( diff --git a/vegafusion-runtime/tests/test_image_comparison.rs b/vegafusion-runtime/tests/test_image_comparison.rs index 524d33be..43d46698 100644 --- a/vegafusion-runtime/tests/test_image_comparison.rs +++ b/vegafusion-runtime/tests/test_image_comparison.rs @@ -1186,11 +1186,7 @@ mod test_pre_transform_inline { let 
vegajs_runtime = vegajs_runtime(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Get timezone let local_tz = vegajs_runtime.nodejs_runtime.local_timezone().unwrap(); @@ -1342,11 +1338,7 @@ async fn check_pre_transform_spec_from_files(spec_name: &str, tolerance: f64) { let vegajs_runtime = vegajs_runtime(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Get timezone let local_tz = vegajs_runtime.nodejs_runtime.local_timezone().unwrap(); @@ -1465,11 +1457,7 @@ async fn check_spec_sequence( .collect(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Extract the initial values of all of the variables that should be sent from the // server to the client diff --git a/vegafusion-runtime/tests/test_planning.rs b/vegafusion-runtime/tests/test_planning.rs index 731f0489..edadb9e8 100644 --- a/vegafusion-runtime/tests/test_planning.rs +++ b/vegafusion-runtime/tests/test_planning.rs @@ -62,11 +62,7 @@ async fn test_extract_server_data() { let mapping = graph.build_mapping(); // println!("{:#?}", mapping); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); let _data_3 = graph_runtime .get_node_value( graph.clone(), diff --git a/vegafusion-runtime/tests/test_pre_transform_extract.rs b/vegafusion-runtime/tests/test_pre_transform_extract.rs index ffdd805e..413f7d7c 100644 --- a/vegafusion-runtime/tests/test_pre_transform_extract.rs +++ b/vegafusion-runtime/tests/test_pre_transform_extract.rs @@ -29,11 +29,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (tx_spec, datasets, warnings) = runtime .pre_transform_extract( diff --git a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs index 02d31dc8..63cb18d4 100644 --- a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs +++ b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs @@ -30,11 +30,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (tx_spec, warnings) = runtime .pre_transform_spec( diff --git a/vegafusion-runtime/tests/test_pre_transform_values.rs b/vegafusion-runtime/tests/test_pre_transform_values.rs index 3173042c..875a874f 100644 --- a/vegafusion-runtime/tests/test_pre_transform_values.rs +++ b/vegafusion-runtime/tests/test_pre_transform_values.rs @@ -42,11 +42,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - 
Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( @@ -98,11 +94,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( @@ -154,11 +146,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Check existent but unsupported dataset name let result = runtime @@ -222,11 +210,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let source_0 = VegaFusionTable::from_json(&json!([{"normal": 1, "a.b": 2}, {"normal": 1, "a.b": 4}])) @@ -285,11 +269,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( @@ -339,11 +319,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( @@ -433,11 +409,7 @@ mod tests { ))); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( diff --git a/vegafusion-runtime/tests/test_stringify_datetimes.rs b/vegafusion-runtime/tests/test_stringify_datetimes.rs index 238cc794..c057e90d 100644 --- a/vegafusion-runtime/tests/test_stringify_datetimes.rs +++ b/vegafusion-runtime/tests/test_stringify_datetimes.rs @@ -82,11 +82,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let local_tz = local_tz.to_string(); let (spec, _warnings) = runtime @@ -139,11 +135,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // let local_tz = "America/New_York".to_string(); let local_tz = "UTC".to_string(); let default_input_tz = "UTC".to_string(); @@ -229,11 +221,7 @@ mod test_stringify_datetimes { let spec_str = fs::read_to_string(spec_path).unwrap(); let spec: 
ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = runtime .pre_transform_spec( @@ -298,11 +286,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = TOKIO_RUNTIME .block_on(runtime.pre_transform_spec( @@ -347,11 +331,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = runtime .pre_transform_spec( diff --git a/vegafusion-runtime/tests/test_task_graph_runtime.rs b/vegafusion-runtime/tests/test_task_graph_runtime.rs index c971a8c9..8ba55047 100644 --- a/vegafusion-runtime/tests/test_task_graph_runtime.rs +++ b/vegafusion-runtime/tests/test_task_graph_runtime.rs @@ -81,11 +81,7 @@ async fn try_it() { ]; let graph = Arc::new(TaskGraph::new(tasks, &task_scope).unwrap()); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); // let result = graph_runtime.get_node_value(graph, 2, None).await.unwrap(); let result = graph_runtime .get_node_value(graph, &NodeValueIndex::new(2, Some(0)), Default::default()) @@ -143,11 +139,7 @@ async fn try_it_from_spec() { let graph = Arc::new(TaskGraph::new(tasks, &task_scope).unwrap()); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); let result = graph_runtime .get_node_value(graph, &NodeValueIndex::new(2, Some(0)), Default::default()) .await diff --git a/vegafusion-server/src/main.rs b/vegafusion-server/src/main.rs index 7a567d8d..4122c113 100644 --- a/vegafusion-server/src/main.rs +++ b/vegafusion-server/src/main.rs @@ -20,13 +20,13 @@ use vegafusion_core::task_graph::graph::ScopedVariable; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use clap::Parser; -use datafusion::prelude::SessionContext; use regex::Regex; use vegafusion_core::proto::gen::pretransform::{ PreTransformExtractDataset, PreTransformExtractRequest, PreTransformExtractResponse, PreTransformSpecOpts, PreTransformSpecRequest, PreTransformSpecResponse, PreTransformValuesOpts, PreTransformValuesRequest, PreTransformValuesResponse, }; +use vegafusion_runtime::task_graph::cache::VegaFusionCache; #[derive(Clone)] pub struct VegaFusionRuntimeGrpc { @@ -175,31 +175,25 @@ impl VegaFusionRuntimeGrpc { .opts .clone() .unwrap_or_else(|| PreTransformValuesOpts { - variables: vec![], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, }); + let variables: Vec = request.variables.iter().map( + |v| (v.variable.clone().unwrap(), v.scope.clone()) + ).collect::>(); + // Extract and deserialize inline datasets let inline_datasets = decode_inline_datasets(request.inline_datasets)?; - // Extract requested variables - let variables: 
Vec = request - .opts - .map(|opts| opts.variables) - .unwrap_or_default() - .into_iter() - .map(|var| (var.variable.unwrap(), var.scope)) - .collect(); - // Parse spec let spec_string = request.spec; let spec: ChartSpec = serde_json::from_str(&spec_string)?; let (values, warnings) = self .runtime - .pre_transform_values(&spec, &inline_datasets, &opts) + .pre_transform_values(&spec, &variables, &inline_datasets, &opts) .await?; let response_values: Vec<_> = values @@ -326,9 +320,7 @@ async fn main() -> Result<(), VegaFusionError> { }; let tg_runtime = VegaFusionRuntime::new( - Arc::new(SessionContext::new()), - Some(args.capacity), - memory_limit, + Some(VegaFusionCache::new(Some(args.capacity), memory_limit)) ); grpc_server(grpc_address, tg_runtime.clone(), args.web) diff --git a/vegafusion-wasm/src/lib.rs b/vegafusion-wasm/src/lib.rs index 95b284e1..b581b777 100644 --- a/vegafusion-wasm/src/lib.rs +++ b/vegafusion-wasm/src/lib.rs @@ -32,7 +32,6 @@ use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::chart_state::ChartState; use vegafusion_core::data::dataset::VegaFusionDataset; use vegafusion_core::get_column_usage; -use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use web_sys::Element; @@ -439,8 +438,7 @@ pub async fn vegafusion_embed( let runtime: Box = if query_fn.is_undefined() || query_fn.is_null() { // Use embedded runtime - let ctx = make_datafusion_context(); - Box::new(VegaFusionRuntime::new(Arc::new(ctx), None, None)) + Box::new(VegaFusionRuntime::new(None)) } else { let query_fn = query_fn.dyn_into::().map_err(|e| { JsError::new(&format!( From f578e3ad57357b5129e127556ddadf53cc4f7345 Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:08:38 -0500 Subject: [PATCH 2/9] remove redundant grpc runtime implementations --- vegafusion-core/src/runtime/grpc_runtime.rs | 114 -------------------- 1 file changed, 114 deletions(-) diff --git a/vegafusion-core/src/runtime/grpc_runtime.rs b/vegafusion-core/src/runtime/grpc_runtime.rs index 527ce023..618a917a 100644 --- a/vegafusion-core/src/runtime/grpc_runtime.rs +++ b/vegafusion-core/src/runtime/grpc_runtime.rs @@ -76,120 +76,6 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime { )), } } - - async fn pre_transform_spec( - &self, - spec: &ChartSpec, - inline_datasets: &HashMap, - options: &PreTransformSpecOpts, - ) -> Result<(ChartSpec, Vec)> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformSpecRequest { - spec: serde_json::to_string(spec)?, - inline_datasets, - opts: Some(options.clone()), - }; - let mut locked_client = self.client.lock().await; - let response = locked_client - .pre_transform_spec(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_spec_result::Result::Response(response) => { - Ok((serde_json::from_str(&response.spec)?, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } - - async fn pre_transform_extract( - &self, - spec: &ChartSpec, - inline_datasets: &HashMap, - options: &PreTransformExtractOpts, - ) -> Result<( - ChartSpec, - Vec, - Vec, - )> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformExtractRequest { - spec: serde_json::to_string(spec)?, - inline_datasets, - opts: Some(options.clone()), - }; - - let mut locked_client = self.client.lock().await; 
- let response = locked_client - .pre_transform_extract(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_extract_result::Result::Response(response) => { - let spec: ChartSpec = serde_json::from_str(&response.spec)?; - let datasets = response - .datasets - .into_iter() - .map(|ds| { - Ok(PreTransformExtractTable { - name: ds.name, - scope: ds.scope, - table: VegaFusionTable::from_ipc_bytes(&ds.table)?, - }) - }) - .collect::>>()?; - Ok((spec, datasets, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } - - async fn pre_transform_values( - &self, - spec: &ChartSpec, - variables: &[ScopedVariable], - inline_datasets: &HashMap, - options: &PreTransformValuesOpts, - ) -> Result<(Vec, Vec)> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformValuesRequest { - spec: serde_json::to_string(spec)?, - variables: variables.iter().map( - |v| PreTransformVariable { variable: Some(v.0.clone()), scope: v.1.clone() } - ).collect(), - inline_datasets, - opts: Some(options.clone()), - }; - - let mut locked_client = self.client.lock().await; - let response = locked_client - .pre_transform_values(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_values_result::Result::Response(response) => { - let values = response - .values - .into_iter() - .map(|v| TaskValue::try_from(&v.value.unwrap())) - .collect::>>()?; - Ok((values, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } } impl GrpcVegaFusionRuntime { From 88753d7fd31a29b4d72853c1deae8db8d15f67be Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:09:03 -0500 Subject: [PATCH 3/9] fmt --- .../examples/pre_transform_data.rs | 24 ++++++++++++------- vegafusion-core/src/runtime/grpc_runtime.rs | 12 +++++----- vegafusion-core/src/runtime/runtime.rs | 5 +--- vegafusion-python/src/lib.rs | 13 +++++----- vegafusion-runtime/src/task_graph/runtime.rs | 6 ++--- vegafusion-server/src/main.rs | 15 +++++++----- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/examples/rust-examples/examples/pre_transform_data.rs b/examples/rust-examples/examples/pre_transform_data.rs index 109315ae..9a57bdbf 100644 --- a/examples/rust-examples/examples/pre_transform_data.rs +++ b/examples/rust-examples/examples/pre_transform_data.rs @@ -1,7 +1,7 @@ -use vegafusion_core::{get_column_usage, spec::chart::ChartSpec}; use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::task_graph::task_value::TaskValue; +use vegafusion_core::{get_column_usage, spec::chart::ChartSpec}; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; /// This example demonstrates how to use the `pre_transform_values` method to get @@ -12,12 +12,15 @@ async fn main() { let runtime = VegaFusionRuntime::new(None); - let (values, warnings) = runtime.pre_transform_values( - &spec, - &[(Variable::new_data("counts"), vec![])], - &Default::default(), // Inline datasets - &Default::default() // Options - ).await.unwrap(); + let (values, warnings) = runtime + .pre_transform_values( + &spec, + &[(Variable::new_data("counts"), vec![])], + &Default::default(), // Inline datasets + &Default::default(), // Options + ) + .await + .unwrap(); assert_eq!(values.len(), 
1); assert_eq!(warnings.len(), 0); @@ -28,7 +31,9 @@ async fn main() { let tbl_repr = counts_table.pretty_format(None).unwrap(); - assert_eq!(tbl_repr, "\ + assert_eq!( + tbl_repr, + "\ +------+------+-------+ | bin0 | bin1 | count | +------+------+-------+ @@ -41,7 +46,8 @@ async fn main() { | 4.0 | 5.0 | 273 | | 9.0 | 10.0 | 4 | | 1.0 | 2.0 | 5 | -+------+------+-------+") ++------+------+-------+" + ) } fn get_spec() -> ChartSpec { diff --git a/vegafusion-core/src/runtime/grpc_runtime.rs b/vegafusion-core/src/runtime/grpc_runtime.rs index 618a917a..d4dd167e 100644 --- a/vegafusion-core/src/runtime/grpc_runtime.rs +++ b/vegafusion-core/src/runtime/grpc_runtime.rs @@ -17,6 +17,12 @@ use crate::{ task_graph::task_value::{NamedTaskValue, TaskValue}, }; +use super::{ + runtime::{encode_inline_datasets, PreTransformExtractTable}, + VegaFusionRuntimeTrait, +}; +use crate::proto::gen::pretransform::PreTransformVariable; +use crate::task_graph::graph::ScopedVariable; use async_mutex::Mutex; use async_trait::async_trait; use std::collections::HashMap; @@ -25,12 +31,6 @@ use vegafusion_common::{ data::table::VegaFusionTable, error::{Result, VegaFusionError}, }; -use crate::proto::gen::pretransform::PreTransformVariable; -use crate::task_graph::graph::ScopedVariable; -use super::{ - runtime::{encode_inline_datasets, PreTransformExtractTable}, - VegaFusionRuntimeTrait, -}; #[derive(Clone)] pub struct GrpcVegaFusionRuntime { diff --git a/vegafusion-core/src/runtime/runtime.rs b/vegafusion-core/src/runtime/runtime.rs index 806ca2fc..99344e4f 100644 --- a/vegafusion-core/src/runtime/runtime.rs +++ b/vegafusion-core/src/runtime/runtime.rs @@ -30,7 +30,6 @@ use crate::{ }, }; - #[derive(Clone)] pub struct PreTransformExtractTable { pub name: String, @@ -333,9 +332,7 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { let indices: Vec<_> = variables .iter() .map(|var| { - if let Some(index) = - task_graph_mapping.get(&(var.0.clone(), var.1.clone())) - { + if let Some(index) = task_graph_mapping.get(&(var.0.clone(), var.1.clone())) { Ok(index.clone()) } else { Err(VegaFusionError::pre_transform(format!( diff --git a/vegafusion-python/src/lib.rs b/vegafusion-python/src/lib.rs index 1e795fe5..96de1674 100644 --- a/vegafusion-python/src/lib.rs +++ b/vegafusion-python/src/lib.rs @@ -210,9 +210,11 @@ impl PyVegaFusionRuntime { .build() .external("Failed to create Tokio thread pool")?; - Ok(Self { - runtime: Arc::new(VegaFusionRuntime::new(Some(VegaFusionCache::new(max_capacity, memory_limit)))), + runtime: Arc::new(VegaFusionRuntime::new(Some(VegaFusionCache::new( + max_capacity, + memory_limit, + )))), tokio_runtime: Arc::new(tokio_runtime_connection), }) } @@ -360,8 +362,8 @@ impl PyVegaFusionRuntime { .collect(); let (values, warnings) = py.allow_threads(|| { - self.tokio_runtime.block_on( - self.runtime.pre_transform_values( + self.tokio_runtime + .block_on(self.runtime.pre_transform_values( &spec, &variables, &inline_datasets, @@ -370,8 +372,7 @@ impl PyVegaFusionRuntime { default_input_tz, row_limit, }, - ), - ) + )) })?; let warnings: Vec<_> = warnings diff --git a/vegafusion-runtime/src/task_graph/runtime.rs b/vegafusion-runtime/src/task_graph/runtime.rs index 6b7905b5..8ad2c2c4 100644 --- a/vegafusion-runtime/src/task_graph/runtime.rs +++ b/vegafusion-runtime/src/task_graph/runtime.rs @@ -1,3 +1,4 @@ +use crate::datafusion::context::make_datafusion_context; use crate::task_graph::cache::VegaFusionCache; use crate::task_graph::task::TaskCall; use crate::task_graph::timezone::RuntimeTzConfig; @@ -15,7 
+16,6 @@ use vegafusion_core::error::{Result, ResultWithContext, VegaFusionError}; use vegafusion_core::proto::gen::tasks::{task::TaskKind, NodeValueIndex, TaskGraph}; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::task_graph::task_value::{NamedTaskValue, TaskValue}; -use crate::datafusion::context::make_datafusion_context; type CacheValue = (TaskValue, Vec); @@ -26,9 +26,7 @@ pub struct VegaFusionRuntime { } impl VegaFusionRuntime { - pub fn new( - cache: Option - ) -> Self { + pub fn new(cache: Option) -> Self { Self { cache: cache.unwrap_or_else(|| VegaFusionCache::new(Some(32), None)), ctx: Arc::new(make_datafusion_context()), diff --git a/vegafusion-server/src/main.rs b/vegafusion-server/src/main.rs index 4122c113..2d0ecf9e 100644 --- a/vegafusion-server/src/main.rs +++ b/vegafusion-server/src/main.rs @@ -180,9 +180,11 @@ impl VegaFusionRuntimeGrpc { default_input_tz: None, }); - let variables: Vec = request.variables.iter().map( - |v| (v.variable.clone().unwrap(), v.scope.clone()) - ).collect::>(); + let variables: Vec = request + .variables + .iter() + .map(|v| (v.variable.clone().unwrap(), v.scope.clone())) + .collect::>(); // Extract and deserialize inline datasets let inline_datasets = decode_inline_datasets(request.inline_datasets)?; @@ -319,9 +321,10 @@ async fn main() -> Result<(), VegaFusionError> { None }; - let tg_runtime = VegaFusionRuntime::new( - Some(VegaFusionCache::new(Some(args.capacity), memory_limit)) - ); + let tg_runtime = VegaFusionRuntime::new(Some(VegaFusionCache::new( + Some(args.capacity), + memory_limit, + ))); grpc_server(grpc_address, tg_runtime.clone(), args.web) .await From 57c4c2bdac8f94c45fceb73dd7bb2633e20c95be Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:16:53 -0500 Subject: [PATCH 4/9] fix tests --- vegafusion-core/src/runtime/grpc_runtime.rs | 21 ++------ vegafusion-runtime/tests/test_chart_state.rs | 2 - .../test_destringify_selection_datasets.rs | 2 - .../tests/test_image_comparison.rs | 2 - vegafusion-runtime/tests/test_planning.rs | 1 - .../tests/test_pre_transform_extract.rs | 5 +- .../test_pre_transform_keep_variables.rs | 6 +-- .../tests/test_pre_transform_values.rs | 52 ++++--------------- .../tests/test_stringify_datetimes.rs | 2 - .../tests/test_task_graph_runtime.rs | 1 - 10 files changed, 16 insertions(+), 78 deletions(-) diff --git a/vegafusion-core/src/runtime/grpc_runtime.rs b/vegafusion-core/src/runtime/grpc_runtime.rs index d4dd167e..9789d6fa 100644 --- a/vegafusion-core/src/runtime/grpc_runtime.rs +++ b/vegafusion-core/src/runtime/grpc_runtime.rs @@ -1,36 +1,21 @@ use crate::{ data::dataset::VegaFusionDataset, proto::gen::{ - pretransform::{ - PreTransformExtractOpts, PreTransformExtractRequest, PreTransformExtractWarning, - PreTransformSpecOpts, PreTransformSpecRequest, PreTransformSpecWarning, - PreTransformValuesOpts, PreTransformValuesRequest, PreTransformValuesWarning, - }, services::{ - pre_transform_extract_result, pre_transform_spec_result, pre_transform_values_result, query_request, query_result, vega_fusion_runtime_client::VegaFusionRuntimeClient, QueryRequest, }, tasks::{NodeValueIndex, TaskGraph, TaskGraphValueRequest}, }, - spec::chart::ChartSpec, - task_graph::task_value::{NamedTaskValue, TaskValue}, + task_graph::task_value::NamedTaskValue, }; -use super::{ - runtime::{encode_inline_datasets, PreTransformExtractTable}, - VegaFusionRuntimeTrait, -}; -use crate::proto::gen::pretransform::PreTransformVariable; -use crate::task_graph::graph::ScopedVariable; +use 
super::{runtime::encode_inline_datasets, VegaFusionRuntimeTrait}; use async_mutex::Mutex; use async_trait::async_trait; use std::collections::HashMap; use std::{any::Any, sync::Arc}; -use vegafusion_common::{ - data::table::VegaFusionTable, - error::{Result, VegaFusionError}, -}; +use vegafusion_common::error::{Result, VegaFusionError}; #[derive(Clone)] pub struct GrpcVegaFusionRuntime { diff --git a/vegafusion-runtime/tests/test_chart_state.rs b/vegafusion-runtime/tests/test_chart_state.rs index 124b9d17..6f9afe88 100644 --- a/vegafusion-runtime/tests/test_chart_state.rs +++ b/vegafusion-runtime/tests/test_chart_state.rs @@ -8,12 +8,10 @@ mod tests { use crate::crate_dir; use serde_json::json; use std::fs; - use std::sync::Arc; use vegafusion_core::chart_state::ChartState; use vegafusion_core::planning::watch::{ExportUpdateJSON, ExportUpdateNamespace}; use vegafusion_core::proto::gen::tasks::TzConfig; use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] diff --git a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs index c40b5700..e1e52fd7 100644 --- a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs +++ b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs @@ -7,12 +7,10 @@ fn crate_dir() -> String { mod tests { use crate::crate_dir; use std::fs; - use std::sync::Arc; use vegafusion_core::proto::gen::pretransform::PreTransformSpecOpts; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::spec::transform::TransformSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] diff --git a/vegafusion-runtime/tests/test_image_comparison.rs b/vegafusion-runtime/tests/test_image_comparison.rs index 43d46698..bb7d0c75 100644 --- a/vegafusion-runtime/tests/test_image_comparison.rs +++ b/vegafusion-runtime/tests/test_image_comparison.rs @@ -26,7 +26,6 @@ use vegafusion_core::proto::gen::tasks::{TaskGraph, TzConfig}; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::graph::ScopedVariable; use vegafusion_core::task_graph::task_value::TaskValue; -use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; @@ -1177,7 +1176,6 @@ mod test_pre_transform_inline { use super::*; use crate::util::datasets::vega_json_dataset_async; use vegafusion_core::{data::dataset::VegaFusionDataset, runtime::VegaFusionRuntimeTrait}; - use vegafusion_runtime::datafusion::context::make_datafusion_context; #[tokio::test] async fn test() { diff --git a/vegafusion-runtime/tests/test_planning.rs b/vegafusion-runtime/tests/test_planning.rs index edadb9e8..9c7830f9 100644 --- a/vegafusion-runtime/tests/test_planning.rs +++ b/vegafusion-runtime/tests/test_planning.rs @@ -10,7 +10,6 @@ use vegafusion_core::planning::split_domain_data::split_domain_data; use vegafusion_core::planning::stitch::stitch_specs; use vegafusion_core::planning::strip_encodings::strip_encodings; -use vegafusion_runtime::datafusion::context::make_datafusion_context; #[tokio::test(flavor = "multi_thread")] async fn test_extract_server_data() { diff --git 
a/vegafusion-runtime/tests/test_pre_transform_extract.rs b/vegafusion-runtime/tests/test_pre_transform_extract.rs index 413f7d7c..98fbc97e 100644 --- a/vegafusion-runtime/tests/test_pre_transform_extract.rs +++ b/vegafusion-runtime/tests/test_pre_transform_extract.rs @@ -10,12 +10,9 @@ mod tests { use vegafusion_core::proto::gen::pretransform::PreTransformExtractOpts; use std::fs; - use std::sync::Arc; - - use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::runtime::VegaFusionRuntimeTrait; - use vegafusion_runtime::datafusion::context::make_datafusion_context; + use vegafusion_core::spec::chart::ChartSpec; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] diff --git a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs index 63cb18d4..e69fe4a7 100644 --- a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs +++ b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs @@ -8,15 +8,11 @@ mod tests { use crate::crate_dir; use std::fs; - use std::sync::Arc; use vegafusion_common::error::VegaFusionError; use vegafusion_core::proto::gen::pretransform::{PreTransformSpecOpts, PreTransformVariable}; use vegafusion_core::proto::gen::tasks::Variable; - - use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_core::runtime::VegaFusionRuntimeTrait; - use vegafusion_runtime::datafusion::context::make_datafusion_context; + use vegafusion_core::spec::chart::ChartSpec; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] diff --git a/vegafusion-runtime/tests/test_pre_transform_values.rs b/vegafusion-runtime/tests/test_pre_transform_values.rs index 875a874f..abb1a8a6 100644 --- a/vegafusion-runtime/tests/test_pre_transform_values.rs +++ b/vegafusion-runtime/tests/test_pre_transform_values.rs @@ -20,18 +20,15 @@ mod tests { use std::collections::HashMap; use std::env; use std::fs; - use std::sync::Arc; use vegafusion_common::data::table::VegaFusionTable; use vegafusion_common::error::VegaFusionError; use vegafusion_core::data::dataset::VegaFusionDataset; use vegafusion_core::proto::gen::pretransform::pre_transform_values_warning::WarningType; use vegafusion_core::proto::gen::pretransform::PreTransformValuesOpts; - use vegafusion_core::proto::gen::pretransform::PreTransformVariable; use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::spec::values::StringOrSignalSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -47,12 +44,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -99,12 +93,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: Some(3), local_tz: "UTC".to_string(), default_input_tz: None, @@ -152,12 +143,9 @@ mod tests { let result = runtime .pre_transform_values( &spec, 
+ &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -179,12 +167,9 @@ mod tests { let result = runtime .pre_transform_values( &spec, + &[(Variable::new_data("bogus_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("bogus_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -225,12 +210,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &inline_datasets, &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -274,12 +256,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("data_3"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("data_3")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -324,18 +303,12 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[ + (Variable::new_data("click_selected"), vec![]), + (Variable::new_data("drag_selected"), vec![]), + ], &Default::default(), &PreTransformValuesOpts { - variables: vec![ - PreTransformVariable { - variable: Some(Variable::new_data("click_selected")), - scope: vec![], - }, - PreTransformVariable { - variable: Some(Variable::new_data("drag_selected")), - scope: vec![], - }, - ], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -414,12 +387,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, diff --git a/vegafusion-runtime/tests/test_stringify_datetimes.rs b/vegafusion-runtime/tests/test_stringify_datetimes.rs index c057e90d..66f85e0f 100644 --- a/vegafusion-runtime/tests/test_stringify_datetimes.rs +++ b/vegafusion-runtime/tests/test_stringify_datetimes.rs @@ -15,11 +15,9 @@ mod test_stringify_datetimes { use crate::{crate_dir, TOKIO_RUNTIME}; use rstest::rstest; use std::fs; - use std::sync::Arc; use vegafusion_core::proto::gen::pretransform::PreTransformSpecOpts; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[rstest( diff --git a/vegafusion-runtime/tests/test_task_graph_runtime.rs b/vegafusion-runtime/tests/test_task_graph_runtime.rs index 8ba55047..fcf1ef28 100644 --- a/vegafusion-runtime/tests/test_task_graph_runtime.rs +++ b/vegafusion-runtime/tests/test_task_graph_runtime.rs @@ -12,7 +12,6 @@ use vegafusion_core::proto::gen::transforms::{ use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::scope::TaskScope; use vegafusion_core::task_graph::task_value::TaskValue; -use 
vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test(flavor = "multi_thread")] From 90e56847228f51a012455e9bdb7daec2b7c7f9c4 Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:21:04 -0500 Subject: [PATCH 5/9] clippy fixes --- .../rust-examples/examples/pre_transform_data.rs | 2 +- vegafusion-runtime/benches/spec_benchmarks.rs | 1 - vegafusion-runtime/src/transform/timeunit.rs | 12 +++--------- vegafusion-runtime/tests/test_stringify_datetimes.rs | 12 ++++++------ 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/examples/rust-examples/examples/pre_transform_data.rs b/examples/rust-examples/examples/pre_transform_data.rs index 9a57bdbf..4285120e 100644 --- a/examples/rust-examples/examples/pre_transform_data.rs +++ b/examples/rust-examples/examples/pre_transform_data.rs @@ -1,7 +1,7 @@ use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::runtime::VegaFusionRuntimeTrait; +use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::task_value::TaskValue; -use vegafusion_core::{get_column_usage, spec::chart::ChartSpec}; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; /// This example demonstrates how to use the `pre_transform_values` method to get diff --git a/vegafusion-runtime/benches/spec_benchmarks.rs b/vegafusion-runtime/benches/spec_benchmarks.rs index cedbfb82..d229fd5f 100644 --- a/vegafusion-runtime/benches/spec_benchmarks.rs +++ b/vegafusion-runtime/benches/spec_benchmarks.rs @@ -143,7 +143,6 @@ async fn eval_spec_sequence(full_spec: ChartSpec, full_updates: Vec tokio::runtime::Runtime { diff --git a/vegafusion-runtime/src/transform/timeunit.rs b/vegafusion-runtime/src/transform/timeunit.rs index b2aa1f3a..d85df296 100644 --- a/vegafusion-runtime/src/transform/timeunit.rs +++ b/vegafusion-runtime/src/transform/timeunit.rs @@ -289,7 +289,7 @@ fn timeunit_custom_udf( } /// Convert a column to a timezone aware timestamp with Millisecond precision, in UTC -pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String) -> Result { +pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &str) -> Result { Ok(match expr.get_type(schema)? { DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(_)) => expr, DataType::Timestamp(_, Some(tz)) => expr.try_cast_to( @@ -297,10 +297,7 @@ pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String schema, )?, DataType::Timestamp(_, None) => expr.try_cast_to( - &DataType::Timestamp( - ArrowTimeUnit::Millisecond, - Some(default_input_tz.as_str().into()), - ), + &DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(default_input_tz.into())), schema, )?, DataType::Date32 | DataType::Date64 => cast_to( @@ -309,10 +306,7 @@ pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String schema, )? .try_cast_to( - &DataType::Timestamp( - ArrowTimeUnit::Millisecond, - Some(default_input_tz.as_str().into()), - ), + &DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(default_input_tz.into())), schema, )?, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { diff --git a/vegafusion-runtime/tests/test_stringify_datetimes.rs b/vegafusion-runtime/tests/test_stringify_datetimes.rs index 66f85e0f..5f6616cd 100644 --- a/vegafusion-runtime/tests/test_stringify_datetimes.rs +++ b/vegafusion-runtime/tests/test_stringify_datetimes.rs @@ -10,6 +10,12 @@ lazy_static! 
{ .unwrap(); } +fn crate_dir() -> String { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .display() + .to_string() +} + #[cfg(test)] mod test_stringify_datetimes { use crate::{crate_dir, TOKIO_RUNTIME}; @@ -364,9 +370,3 @@ mod test_stringify_datetimes { assert_eq!(month_date, "2012-01-01T00:00:00.000"); } } - -fn crate_dir() -> String { - std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .display() - .to_string() -} From 5095dee7e67948b15983cc0f0ca088fcfb5ffb3e Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:22:06 -0500 Subject: [PATCH 6/9] python lint --- vegafusion-python/vegafusion/runtime.py | 69 ++++++++++++++----------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py index d90fdb37..cb0a8048 100644 --- a/vegafusion-python/vegafusion/runtime.py +++ b/vegafusion-python/vegafusion/runtime.py @@ -265,17 +265,19 @@ def _import_inline_datasets( # If so, keep the arrow version so that it's more efficient # to convert as part of the whole table later - inner_value = inner_value.assign(**{ - col: pd.arrays.ArrowExtensionArray( - pa.chunked_array(col_tbl.column(0)) - ) - }) + inner_value = inner_value.assign( + **{ + col: pd.arrays.ArrowExtensionArray( + pa.chunked_array(col_tbl.column(0)) + ) + } + ) except TypeError: # If the Table constructor can't handle the object column, # convert the column to pyarrow strings - inner_value = inner_value.assign(**{ - col: inner_value[col].astype("string[pyarrow]") - }) + inner_value = inner_value.assign( + **{col: inner_value[col].astype("string[pyarrow]")} + ) if hasattr(inner_value, "__arrow_c_stream__"): # TODO: this requires pyarrow 14.0.0 or later imported_inline_datasets[name] = Table(inner_value) @@ -506,9 +508,9 @@ def pre_transform_datasets( datasets. dataset_format: Format for returned datasets. One of: - * ``"auto"``: Infer the result type based on the types of inline datasets. - If no inline datasets are provided, return type will depend on - installed packages. + * ``"auto"``: (default) Infer the result type based on the types of + inline datasets. If no inline datasets are provided, return type will + depend on installed packages. 
* ``"polars"``: polars.DataFrame * ``"pandas"``: pandas.DataFrame * ``"pyarrow"``: pyarrow.Table @@ -555,27 +557,31 @@ def normalize_timezones(dfs: list[nw.DataFrame]) -> list[DataFrameLike]: processed_datasets.append(df.to_native()) return processed_datasets - # Wrap result dataframes in Narwhals, using the input type and arrow PyCapsule interface + # Wrap result dataframes in Narwhals, using the input type and arrow + # PyCapsule interface if dataset_format != "auto": match dataset_format: case "polars": import polars as pl - datasets = normalize_timezones([ - nw.from_native(pl.DataFrame(value)) for value in values - ]) + datasets = normalize_timezones( + [nw.from_native(pl.DataFrame(value)) for value in values] + ) case "pandas": import pyarrow as pa - datasets = normalize_timezones([ - nw.from_native(pa.table(value).to_pandas()) for value in values - ]) + datasets = normalize_timezones( + [ + nw.from_native(pa.table(value).to_pandas()) + for value in values + ] + ) case "pyarrow": import pyarrow as pa - datasets = normalize_timezones([ - nw.from_native(pa.table(value)) for value in values - ]) + datasets = normalize_timezones( + [nw.from_native(pa.table(value)) for value in values] + ) case "arro3": # Pass through arrof3 datasets = values @@ -583,9 +589,9 @@ def normalize_timezones(dfs: list[nw.DataFrame]) -> list[DataFrameLike]: raise ValueError(f"Unrecognized dataset_format: {dataset_format}") elif (namespace := _get_common_namespace(inline_datasets)) is not None: # Infer the type from the inline datasets - datasets = normalize_timezones([ - nw.from_arrow(value, native_namespace=namespace) for value in values - ]) + datasets = normalize_timezones( + [nw.from_arrow(value, native_namespace=namespace) for value in values] + ) else: # Either no inline datasets, inline datasets with mixed or # unrecognized types @@ -593,18 +599,21 @@ def normalize_timezones(dfs: list[nw.DataFrame]) -> list[DataFrameLike]: # Try polars import polars as pl - datasets = normalize_timezones([ - nw.from_native(pl.DataFrame(value)) for value in values - ]) + datasets = normalize_timezones( + [nw.from_native(pl.DataFrame(value)) for value in values] + ) except ImportError: try: # Try pandas import pandas as _pd # noqa: F401 import pyarrow as pa - datasets = normalize_timezones([ - nw.from_native(pa.table(value).to_pandas()) for value in values - ]) + datasets = normalize_timezones( + [ + nw.from_native(pa.table(value).to_pandas()) + for value in values + ] + ) except ImportError: # Fall back to arro3 datasets = values From 08bdf720161842e50fd11f04b06a4c2988cb64cf Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 09:42:04 -0500 Subject: [PATCH 7/9] lint/types --- vegafusion-python/vegafusion/runtime.py | 50 ++++++++++++------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py index cb0a8048..04ae455f 100644 --- a/vegafusion-python/vegafusion/runtime.py +++ b/vegafusion-python/vegafusion/runtime.py @@ -30,7 +30,7 @@ def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> ModuleType | None: - namespaces = set() + namespaces: set[ModuleType] = set() try: if inline_datasets is not None: for df in inline_datasets.values(): @@ -544,7 +544,9 @@ def pre_transform_datasets( inline_datasets=inline_arrow_dataset, ) - def normalize_timezones(dfs: list[nw.DataFrame]) -> list[DataFrameLike]: + def normalize_timezones( + dfs: list[nw.DataFrame[IntoFrameT] | nw.LazyFrame[IntoFrameT]], + ) -> 
list[DataFrameLike]:
             # Convert to `local_tz` (or, set to UTC and then convert if starting
             # from time-zone-naive data), then extract the native DataFrame to return.
             processed_datasets = []
@@ -560,33 +562,29 @@ def normalize_timezones(
         # Wrap result dataframes in Narwhals, using the input type and arrow
         # PyCapsule interface
         if dataset_format != "auto":
-            match dataset_format:
-                case "polars":
-                    import polars as pl
+            if dataset_format == "polars":
+                import polars as pl
-                    datasets = normalize_timezones(
-                        [nw.from_native(pl.DataFrame(value)) for value in values]
-                    )
-                case "pandas":
-                    import pyarrow as pa
+                datasets = normalize_timezones(
+                    [nw.from_native(pl.DataFrame(value)) for value in values]
+                )
+            elif dataset_format == "pandas":
+                import pyarrow as pa
-                    datasets = normalize_timezones(
-                        [
-                            nw.from_native(pa.table(value).to_pandas())
-                            for value in values
-                        ]
-                    )
-                case "pyarrow":
-                    import pyarrow as pa
+                datasets = normalize_timezones(
+                    [nw.from_native(pa.table(value).to_pandas()) for value in values]
+                )
+            elif dataset_format == "pyarrow":
+                import pyarrow as pa
-                    datasets = normalize_timezones(
-                        [nw.from_native(pa.table(value)) for value in values]
-                    )
-                case "arro3":
-                    # Pass through arrof3
-                    datasets = values
-                case _:
-                    raise ValueError(f"Unrecognized dataset_format: {dataset_format}")
+                datasets = normalize_timezones(
+                    [nw.from_native(pa.table(value)) for value in values]
+                )
+            elif dataset_format == "arro3":
+                # Pass through arrof3
+                datasets = values
+            else:
+                raise ValueError(f"Unrecognized dataset_format: {dataset_format}")
         elif (namespace := _get_common_namespace(inline_datasets)) is not None:
             # Infer the type from the inline datasets
             datasets = normalize_timezones(

From ec5f96669bc35e37d77fc80a477fca42b54bf4a7 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Thu, 7 Nov 2024 10:14:53 -0500
Subject: [PATCH 8/9] pandas first

---
 vegafusion-python/vegafusion/runtime.py | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py
index 04ae455f..501e849f 100644
--- a/vegafusion-python/vegafusion/runtime.py
+++ b/vegafusion-python/vegafusion/runtime.py
@@ -594,23 +594,20 @@ def normalize_timezones(
         # Either no inline datasets, inline datasets with mixed or
         # unrecognized types
         try:
-            # Try polars
-            import polars as pl
+            # Try pandas
+            import pandas as _pd  # noqa: F401
+            import pyarrow as pa
             datasets = normalize_timezones(
-                [nw.from_native(pl.DataFrame(value)) for value in values]
+                [nw.from_native(pa.table(value).to_pandas()) for value in values]
             )
         except ImportError:
             try:
-                # Try pandas
-                import pandas as _pd  # noqa: F401
-                import pyarrow as pa
+                # Try polars
+                import polars as pl
                 datasets = normalize_timezones(
-                    [
-                        nw.from_native(pa.table(value).to_pandas())
-                        for value in values
-                    ]
+                    [nw.from_native(pl.DataFrame(value)) for value in values]
                 )
             except ImportError:
                 # Fall back to arro3
                 datasets = values

From 85043a7b0f10e275da0d62cec3eb0cf26d68ac23 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Thu, 7 Nov 2024 10:41:04 -0500
Subject: [PATCH 9/9] fix test

---
 vegafusion-runtime/tests/test_pre_transform_values.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vegafusion-runtime/tests/test_pre_transform_values.rs b/vegafusion-runtime/tests/test_pre_transform_values.rs
index abb1a8a6..f1c5836d 100644
--- a/vegafusion-runtime/tests/test_pre_transform_values.rs
+++ b/vegafusion-runtime/tests/test_pre_transform_values.rs
@@ -156,7 +156,7 @@ mod tests {
         if let Err(VegaFusionError::PreTransformError(err, _)) = result {
             assert_eq!(
                 err,
-                "Requested variable PreTransformVariable { variable: Some(Variable { name: \"source_0\", namespace: Data }), scope: [] }\n \
+                "Requested variable (Variable { name: \"source_0\", namespace: Data }, [])\n \
                 requires transforms or signal expressions that are not yet supported"
             )
         } else {