From 2e9c374f49173573115fc20c9d1dd2458113e68c Mon Sep 17 00:00:00 2001 From: Jon Mease Date: Thu, 7 Nov 2024 11:51:45 -0500 Subject: [PATCH] v2-docs: Add docs for transformed data (#533) * transformed data cleanup and docs * remove redundant grpc runtime implementations * fmt * fix tests * clippy fixes * python lint * lint/types * pandas first * fix test --- Cargo.lock | 6 +- docs/source/column_usage.md | 2 +- docs/source/index.md | 1 + docs/source/transformed_data.md | 16 ++ examples/python-examples/column_usage.py | 5 +- .../python-examples/pre_transformed_data.py | 170 +++++++++++++++ examples/rust-examples/Cargo.toml | 2 + .../examples/pre_transform_data.rs | 196 ++++++++++++++++++ vegafusion-core/src/proto/pretransform.proto | 12 +- .../src/proto/prost_gen/pretransform.rs | 12 +- .../src/proto/tonic_gen/pretransform.rs | 12 +- vegafusion-core/src/runtime/grpc_runtime.rs | 130 +----------- vegafusion-core/src/runtime/runtime.rs | 21 +- vegafusion-python/pyproject.toml | 2 +- vegafusion-python/src/lib.rs | 22 +- vegafusion-python/vegafusion/runtime.py | 154 ++++++++------ vegafusion-runtime/benches/spec_benchmarks.rs | 7 +- vegafusion-runtime/src/task_graph/runtime.rs | 11 +- vegafusion-runtime/src/transform/timeunit.rs | 12 +- vegafusion-runtime/tests/test_chart_state.rs | 8 +- .../test_destringify_selection_datasets.rs | 8 +- .../tests/test_image_comparison.rs | 20 +- vegafusion-runtime/tests/test_planning.rs | 7 +- .../tests/test_pre_transform_extract.rs | 11 +- .../test_pre_transform_keep_variables.rs | 12 +- .../tests/test_pre_transform_values.rs | 96 ++------- .../tests/test_stringify_datetimes.rs | 44 +--- .../tests/test_task_graph_runtime.rs | 13 +- vegafusion-server/src/main.rs | 25 +-- vegafusion-wasm/src/lib.rs | 4 +- 30 files changed, 585 insertions(+), 456 deletions(-) create mode 100644 docs/source/transformed_data.md create mode 100644 examples/python-examples/pre_transformed_data.py create mode 100644 examples/rust-examples/examples/pre_transform_data.rs diff --git a/Cargo.lock b/Cargo.lock index 3c56be39..beb146ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3592,7 +3592,9 @@ name = "rust-examples" version = "0.1.0" dependencies = [ "serde_json", + "tokio", "vegafusion-core", + "vegafusion-runtime", ] [[package]] @@ -4255,9 +4257,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", diff --git a/docs/source/column_usage.md b/docs/source/column_usage.md index 972c545a..d7cde66c 100644 --- a/docs/source/column_usage.md +++ b/docs/source/column_usage.md @@ -1,4 +1,4 @@ -# Get Column Usage +# Column Usage VegaFusion provides a function for introspecting a Vega specification and determining which columns are referenced from each root dataset. A root dataset is one defined at the top-level of the spec that includes a `url` or `values` properties. This is useful in contexts where it's more efficient to minimize the number of columns provided to the Vega specification. For example, the Python library uses this function to determine how to downsample the input DataFrame columns prior to converting to Arrow. 
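 As a minimal sketch of the Python usage (mirroring the repository's `examples/python-examples/column_usage.py`; the toy spec and its `category`/`amount` field names below are made up for illustration):
 
 ```python
 import json
 
 import vegafusion as vf
 
 # A tiny Vega spec with a single root dataset ("source") defined via `values`.
 # The field names are hypothetical and only serve to show the return shape.
 spec = json.loads(
     """
 {
   "$schema": "https://vega.github.io/schema/vega/v5.json",
   "data": [{"name": "source", "values": []}],
   "marks": [
     {
       "type": "rect",
       "from": {"data": "source"},
       "encode": {
         "update": {
           "x": {"field": "category"},
           "y": {"field": "amount"}
         }
       }
     }
   ]
 }
 """
 )
 
 # Maps each root dataset name to the list of columns it references,
 # or None when the usage cannot be determined precisely.
 column_usage = vf.get_column_usage(spec)
 print(json.dumps(column_usage, indent=2))
 ```
 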
When VegaFusion cannot precisely determine which columns are referenced from each root dataset, this function returns `None` or `null` for the corresponding dataset. diff --git a/docs/source/index.md b/docs/source/index.md index a5686c30..dd7a6b86 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -18,4 +18,5 @@ If you've arrived here looking for information on how to scale Vega-Altair visua :caption: Contents column_usage +transformed_data ``` diff --git a/docs/source/transformed_data.md b/docs/source/transformed_data.md new file mode 100644 index 00000000..9f58a153 --- /dev/null +++ b/docs/source/transformed_data.md @@ -0,0 +1,16 @@ +# Transformed Data + +VegaFusion can be used to evaluate datasets in a Vega spec and return them as arrow tables or DataFrames. This is the foundation for Vega-Altair's [`chart.transformed_data`](https://altair-viz.github.io/user_guide/transform/index.html#accessing-transformed-data) method. + +## Python + +```{eval-rst} +.. automethod:: vegafusion.runtime.VegaFusionRuntime.pre_transform_datasets +``` + +**Example**: See [pre_transform_data.py](https://github.com/vega/vegafusion/tree/v2/examples/python-examples/pre_transform_data.py) for a complete example. + +## Rust +The Rust API provides a slightly more general `pre_transform_values` method that can extract dataset or signal values. + +See [pre_transform_data.rs](https://github.com/vega/vegafusion/tree/v2/examples/rust-examples/examples/pre_transform_data.rs) for a complete example of extracting dataset values as arrow tables. diff --git a/examples/python-examples/column_usage.py b/examples/python-examples/column_usage.py index 149caed7..756e2716 100644 --- a/examples/python-examples/column_usage.py +++ b/examples/python-examples/column_usage.py @@ -1,12 +1,11 @@ import json from typing import Any - -from vegafusion import get_column_usage +import vegafusion as vf def main(): spec = get_spec() - column_usage = get_column_usage(spec) + column_usage = vf.get_column_usage(spec) print(json.dumps(column_usage, indent=2)) assert column_usage == { diff --git a/examples/python-examples/pre_transformed_data.py b/examples/python-examples/pre_transformed_data.py new file mode 100644 index 00000000..bf634a79 --- /dev/null +++ b/examples/python-examples/pre_transformed_data.py @@ -0,0 +1,170 @@ +import json +from typing import Any + +import vegafusion as vf + + +def main(): + spec = get_spec() + res, warnings = vf.runtime.pre_transform_datasets( + spec, ["counts"], dataset_format="polars" + ) + assert warnings == [] + assert len(res) == 1 + print(res[0]) + + +def get_spec() -> dict[str, Any]: + """ + Based on https://vega.github.io/editor/#/examples/vega/histogram-null-values + """ + spec_str = """ +{ + "$schema": "https://vega.github.io/schema/vega/v5.json", + "description": "A histogram of film ratings, modified to include null values.", + "width": 400, + "height": 200, + "padding": 5, + "autosize": {"type": "fit", "resize": true}, + + "signals": [ + { + "name": "maxbins", "value": 10 + }, + { + "name": "binCount", + "update": "(bins.stop - bins.start) / bins.step" + }, + { + "name": "nullGap", "value": 10 + }, + { + "name": "barStep", + "update": "(width - nullGap) / (1 + binCount)" + } + ], + + "data": [ + { + "name": "table", + "url": "data/movies.json", + "transform": [ + { + "type": "extent", "field": "IMDB Rating", + "signal": "extent" + }, + { + "type": "bin", "signal": "bins", + "field": "IMDB Rating", "extent": {"signal": "extent"}, + "maxbins": 10 + } + ] + }, + { + "name": "counts", + 
"source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] != null" + }, + { + "type": "aggregate", + "groupby": ["bin0", "bin1"] + } + ] + }, + { + "name": "nulls", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] == null" + }, + { + "type": "aggregate", + "groupby": [] + } + ] + } + ], + + "scales": [ + { + "name": "yscale", + "type": "linear", + "range": "height", + "round": true, "nice": true, + "domain": { + "fields": [ + {"data": "counts", "field": "count"}, + {"data": "nulls", "field": "count"} + ] + } + }, + { + "name": "xscale", + "type": "linear", + "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], + "round": true, + "domain": {"signal": "[bins.start, bins.stop]"}, + "bins": {"signal": "bins"} + }, + { + "name": "xscale-null", + "type": "band", + "range": [0, {"signal": "barStep"}], + "round": true, + "domain": [null] + } + ], + + "axes": [ + {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, + {"orient": "bottom", "scale": "xscale-null"}, + {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5} + ], + + "marks": [ + { + "type": "rect", + "from": {"data": "counts"}, + "encode": { + "update": { + "x": {"scale": "xscale", "field": "bin0", "offset": 1}, + "x2": {"scale": "xscale", "field": "bin1"}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "steelblue"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + }, + { + "type": "rect", + "from": {"data": "nulls"}, + "encode": { + "update": { + "x": {"scale": "xscale-null", "value": null, "offset": 1}, + "x2": {"scale": "xscale-null", "band": 1}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "#aaa"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + } + ] +} + + """ + return json.loads(spec_str) + + +if __name__ == "__main__": + main() diff --git a/examples/rust-examples/Cargo.toml b/examples/rust-examples/Cargo.toml index 64a4368c..ef3ee6e6 100644 --- a/examples/rust-examples/Cargo.toml +++ b/examples/rust-examples/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" [dev-dependencies] serde_json = { workspace = true } vegafusion-core = { path = "../../vegafusion-core" } +vegafusion-runtime = { path = "../../vegafusion-runtime" } +tokio = "1.41.1" \ No newline at end of file diff --git a/examples/rust-examples/examples/pre_transform_data.rs b/examples/rust-examples/examples/pre_transform_data.rs new file mode 100644 index 00000000..4285120e --- /dev/null +++ b/examples/rust-examples/examples/pre_transform_data.rs @@ -0,0 +1,196 @@ +use vegafusion_core::proto::gen::tasks::Variable; +use vegafusion_core::runtime::VegaFusionRuntimeTrait; +use vegafusion_core::spec::chart::ChartSpec; +use vegafusion_core::task_graph::task_value::TaskValue; +use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; + +/// This example demonstrates how to use the `pre_transform_values` method to get +/// transformed datasets as Arrow tables. 
+#[tokio::main] +async fn main() { + let spec = get_spec(); + + let runtime = VegaFusionRuntime::new(None); + + let (values, warnings) = runtime + .pre_transform_values( + &spec, + &[(Variable::new_data("counts"), vec![])], + &Default::default(), // Inline datasets + &Default::default(), // Options + ) + .await + .unwrap(); + + assert_eq!(values.len(), 1); + assert_eq!(warnings.len(), 0); + + let TaskValue::Table(counts_table) = &values[0] else { + panic!("Expected a table") + }; + + let tbl_repr = counts_table.pretty_format(None).unwrap(); + + assert_eq!( + tbl_repr, + "\ ++------+------+-------+ +| bin0 | bin1 | count | ++------+------+-------+ +| 6.0 | 7.0 | 985 | +| 3.0 | 4.0 | 100 | +| 7.0 | 8.0 | 741 | +| 5.0 | 6.0 | 633 | +| 8.0 | 9.0 | 204 | +| 2.0 | 3.0 | 43 | +| 4.0 | 5.0 | 273 | +| 9.0 | 10.0 | 4 | +| 1.0 | 2.0 | 5 | ++------+------+-------+" + ) +} + +fn get_spec() -> ChartSpec { + let spec_str = r##" + { + "$schema": "https://vega.github.io/schema/vega/v5.json", + "description": "A histogram of film ratings, modified to include null values.", + "width": 400, + "height": 200, + "padding": 5, + "autosize": {"type": "fit", "resize": true}, + "data": [ + { + "name": "table", + "url": "data/movies.json", + "transform": [ + { + "type": "extent", "field": "IMDB Rating", + "signal": "extent" + }, + { + "type": "bin", "signal": "bins", + "field": "IMDB Rating", "extent": {"signal": "extent"}, + "maxbins": 10 + } + ] + }, + { + "name": "counts", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] != null" + }, + { + "type": "aggregate", + "groupby": ["bin0", "bin1"] + } + ] + }, + { + "name": "nulls", + "source": "table", + "transform": [ + { + "type": "filter", + "expr": "datum['IMDB Rating'] == null" + }, + { + "type": "aggregate", + "groupby": [] + } + ] + } + ], + "signals": [ + { + "name": "maxbins", "value": 10 + }, + { + "name": "binCount", + "update": "(bins.stop - bins.start) / bins.step" + }, + { + "name": "nullGap", "value": 10 + }, + { + "name": "barStep", + "update": "(width - nullGap) / (1 + binCount)" + } + ], + "scales": [ + { + "name": "yscale", + "type": "linear", + "range": "height", + "round": true, "nice": true, + "domain": { + "fields": [ + {"data": "counts", "field": "count"}, + {"data": "nulls", "field": "count"} + ] + } + }, + { + "name": "xscale", + "type": "linear", + "range": [{"signal": "barStep + nullGap"}, {"signal": "width"}], + "round": true, + "domain": {"signal": "[bins.start, bins.stop]"}, + "bins": {"signal": "bins"} + }, + { + "name": "xscale-null", + "type": "band", + "range": [0, {"signal": "barStep"}], + "round": true, + "domain": [null] + } + ], + + "axes": [ + {"orient": "bottom", "scale": "xscale", "tickMinStep": 0.5}, + {"orient": "bottom", "scale": "xscale-null"}, + {"orient": "left", "scale": "yscale", "tickCount": 5, "offset": 5} + ], + + "marks": [ + { + "type": "rect", + "from": {"data": "counts"}, + "encode": { + "update": { + "x": {"scale": "xscale", "field": "bin0", "offset": 1}, + "x2": {"scale": "xscale", "field": "bin1"}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": "steelblue"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + }, + { + "type": "rect", + "from": {"data": "nulls"}, + "encode": { + "update": { + "x": {"scale": "xscale-null", "value": null, "offset": 1}, + "x2": {"scale": "xscale-null", "band": 1}, + "y": {"scale": "yscale", "field": "count"}, + "y2": {"scale": "yscale", "value": 0}, + "fill": {"value": 
"#aaa"} + }, + "hover": { + "fill": {"value": "firebrick"} + } + } + } + ] + } + "##; + serde_json::from_str(spec_str).unwrap() +} diff --git a/vegafusion-core/src/proto/pretransform.proto b/vegafusion-core/src/proto/pretransform.proto index cba6700c..d106fa26 100644 --- a/vegafusion-core/src/proto/pretransform.proto +++ b/vegafusion-core/src/proto/pretransform.proto @@ -49,16 +49,16 @@ message PreTransformVariable { } message PreTransformValuesOpts { - repeated PreTransformVariable variables = 1; - optional uint32 row_limit = 2; - string local_tz = 3; - optional string default_input_tz = 4; + optional uint32 row_limit = 1; + string local_tz = 2; + optional string default_input_tz = 3; } message PreTransformValuesRequest { string spec = 1; - repeated tasks.InlineDataset inline_datasets = 2; - PreTransformValuesOpts opts = 3; + repeated PreTransformVariable variables = 2; + repeated tasks.InlineDataset inline_datasets = 3; + PreTransformValuesOpts opts = 4; } message PreTransformValuesResponse { diff --git a/vegafusion-core/src/proto/prost_gen/pretransform.rs b/vegafusion-core/src/proto/prost_gen/pretransform.rs index 3ebc33ab..e314003f 100644 --- a/vegafusion-core/src/proto/prost_gen/pretransform.rs +++ b/vegafusion-core/src/proto/prost_gen/pretransform.rs @@ -80,13 +80,11 @@ pub struct PreTransformVariable { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreTransformValuesOpts { - #[prost(message, repeated, tag = "1")] - pub variables: ::prost::alloc::vec::Vec, - #[prost(uint32, optional, tag = "2")] + #[prost(uint32, optional, tag = "1")] pub row_limit: ::core::option::Option, - #[prost(string, tag = "3")] + #[prost(string, tag = "2")] pub local_tz: ::prost::alloc::string::String, - #[prost(string, optional, tag = "4")] + #[prost(string, optional, tag = "3")] pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -95,8 +93,10 @@ pub struct PreTransformValuesRequest { #[prost(string, tag = "1")] pub spec: ::prost::alloc::string::String, #[prost(message, repeated, tag = "2")] + pub variables: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] pub inline_datasets: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "4")] pub opts: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/vegafusion-core/src/proto/tonic_gen/pretransform.rs b/vegafusion-core/src/proto/tonic_gen/pretransform.rs index 3ebc33ab..e314003f 100644 --- a/vegafusion-core/src/proto/tonic_gen/pretransform.rs +++ b/vegafusion-core/src/proto/tonic_gen/pretransform.rs @@ -80,13 +80,11 @@ pub struct PreTransformVariable { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreTransformValuesOpts { - #[prost(message, repeated, tag = "1")] - pub variables: ::prost::alloc::vec::Vec, - #[prost(uint32, optional, tag = "2")] + #[prost(uint32, optional, tag = "1")] pub row_limit: ::core::option::Option, - #[prost(string, tag = "3")] + #[prost(string, tag = "2")] pub local_tz: ::prost::alloc::string::String, - #[prost(string, optional, tag = "4")] + #[prost(string, optional, tag = "3")] pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -95,8 +93,10 @@ pub struct PreTransformValuesRequest { #[prost(string, tag = "1")] pub spec: ::prost::alloc::string::String, #[prost(message, 
repeated, tag = "2")] + pub variables: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "3")] pub inline_datasets: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "3")] + #[prost(message, optional, tag = "4")] pub opts: ::core::option::Option, } #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/vegafusion-core/src/runtime/grpc_runtime.rs b/vegafusion-core/src/runtime/grpc_runtime.rs index fc4a7eaf..9789d6fa 100644 --- a/vegafusion-core/src/runtime/grpc_runtime.rs +++ b/vegafusion-core/src/runtime/grpc_runtime.rs @@ -1,35 +1,21 @@ use crate::{ data::dataset::VegaFusionDataset, proto::gen::{ - pretransform::{ - PreTransformExtractOpts, PreTransformExtractRequest, PreTransformExtractWarning, - PreTransformSpecOpts, PreTransformSpecRequest, PreTransformSpecWarning, - PreTransformValuesOpts, PreTransformValuesRequest, PreTransformValuesWarning, - }, services::{ - pre_transform_extract_result, pre_transform_spec_result, pre_transform_values_result, query_request, query_result, vega_fusion_runtime_client::VegaFusionRuntimeClient, QueryRequest, }, tasks::{NodeValueIndex, TaskGraph, TaskGraphValueRequest}, }, - spec::chart::ChartSpec, - task_graph::task_value::{NamedTaskValue, TaskValue}, + task_graph::task_value::NamedTaskValue, }; +use super::{runtime::encode_inline_datasets, VegaFusionRuntimeTrait}; use async_mutex::Mutex; use async_trait::async_trait; use std::collections::HashMap; use std::{any::Any, sync::Arc}; -use vegafusion_common::{ - data::table::VegaFusionTable, - error::{Result, VegaFusionError}, -}; - -use super::{ - runtime::{encode_inline_datasets, PreTransformExtractTable}, - VegaFusionRuntimeTrait, -}; +use vegafusion_common::error::{Result, VegaFusionError}; #[derive(Clone)] pub struct GrpcVegaFusionRuntime { @@ -75,116 +61,6 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime { )), } } - - async fn pre_transform_spec( - &self, - spec: &ChartSpec, - inline_datasets: &HashMap, - options: &PreTransformSpecOpts, - ) -> Result<(ChartSpec, Vec)> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformSpecRequest { - spec: serde_json::to_string(spec)?, - inline_datasets, - opts: Some(options.clone()), - }; - let mut locked_client = self.client.lock().await; - let response = locked_client - .pre_transform_spec(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_spec_result::Result::Response(response) => { - Ok((serde_json::from_str(&response.spec)?, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } - - async fn pre_transform_extract( - &self, - spec: &ChartSpec, - inline_datasets: &HashMap, - options: &PreTransformExtractOpts, - ) -> Result<( - ChartSpec, - Vec, - Vec, - )> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformExtractRequest { - spec: serde_json::to_string(spec)?, - inline_datasets, - opts: Some(options.clone()), - }; - - let mut locked_client = self.client.lock().await; - let response = locked_client - .pre_transform_extract(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_extract_result::Result::Response(response) => { - let spec: ChartSpec = serde_json::from_str(&response.spec)?; - let datasets = response - .datasets - .into_iter() - .map(|ds| { - Ok(PreTransformExtractTable { - name: ds.name, 
- scope: ds.scope, - table: VegaFusionTable::from_ipc_bytes(&ds.table)?, - }) - }) - .collect::>>()?; - Ok((spec, datasets, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } - - async fn pre_transform_values( - &self, - spec: &ChartSpec, - inline_datasets: &HashMap, - options: &PreTransformValuesOpts, - ) -> Result<(Vec, Vec)> { - let inline_datasets = encode_inline_datasets(inline_datasets)?; - - let request = PreTransformValuesRequest { - spec: serde_json::to_string(spec)?, - inline_datasets, - opts: Some(options.clone()), - }; - - let mut locked_client = self.client.lock().await; - let response = locked_client - .pre_transform_values(request) - .await - .map_err(|e| VegaFusionError::internal(e.to_string()))?; - - match response.into_inner().result.unwrap() { - pre_transform_values_result::Result::Response(response) => { - let values = response - .values - .into_iter() - .map(|v| TaskValue::try_from(&v.value.unwrap())) - .collect::>>()?; - Ok((values, response.warnings)) - } - _ => Err(VegaFusionError::internal( - "Invalid grpc response type".to_string(), - )), - } - } } impl GrpcVegaFusionRuntime { diff --git a/vegafusion-core/src/runtime/runtime.rs b/vegafusion-core/src/runtime/runtime.rs index 120349f1..99344e4f 100644 --- a/vegafusion-core/src/runtime/runtime.rs +++ b/vegafusion-core/src/runtime/runtime.rs @@ -244,13 +244,14 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { async fn pre_transform_values( &self, spec: &ChartSpec, + variables: &[ScopedVariable], inline_datasets: &HashMap, options: &PreTransformValuesOpts, ) -> Result<(Vec, Vec)> { // Check that requested variables exist and collect indices - for var in &options.variables { - let scope = var.scope.as_slice(); - let variable = var.variable.clone().unwrap(); + for var in variables { + let scope = var.1.as_slice(); + let variable = var.0.clone(); let name = variable.name.clone(); let namespace = variable.clone().ns(); @@ -282,12 +283,7 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { // Make sure planner keeps the requested variables, event // if they are not used elsewhere in the spec - let keep_variables = options - .variables - .clone() - .into_iter() - .map(|v| (v.variable.unwrap(), v.scope)) - .collect(); + let keep_variables = Vec::from(variables); // Create spec plan let plan = SpecPlan::try_new( @@ -333,13 +329,10 @@ pub trait VegaFusionRuntimeTrait: Send + Sync { } // Collect node indices for variables - let indices: Vec<_> = options - .variables + let indices: Vec<_> = variables .iter() .map(|var| { - if let Some(index) = - task_graph_mapping.get(&(var.variable.clone().unwrap(), var.scope.clone())) - { + if let Some(index) = task_graph_mapping.get(&(var.0.clone(), var.1.clone())) { Ok(index.clone()) } else { Err(VegaFusionError::pre_transform(format!( diff --git a/vegafusion-python/pyproject.toml b/vegafusion-python/pyproject.toml index ccfbdc09..cb66628c 100644 --- a/vegafusion-python/pyproject.toml +++ b/vegafusion-python/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ "License :: OSI Approved :: BSD License", "Topic :: Scientific/Engineering :: Visualization", ] -dependencies = ["arro3-core", "packaging", "narwhals>=1.9"] +dependencies = ["arro3-core", "packaging", "narwhals>=1.13"] [[project.authors]] name = "VegaFusion Contributors" diff --git a/vegafusion-python/src/lib.rs b/vegafusion-python/src/lib.rs index 80842b99..96de1674 100644 --- a/vegafusion-python/src/lib.rs +++ b/vegafusion-python/src/lib.rs @@ -36,7 +36,7 @@ use 
vegafusion_core::task_graph::task_value::TaskValue; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; use vegafusion_core::runtime::VegaFusionRuntimeTrait; -use vegafusion_runtime::datafusion::context::make_datafusion_context; +use vegafusion_runtime::task_graph::cache::VegaFusionCache; static INIT: Once = Once::new(); @@ -211,11 +211,10 @@ impl PyVegaFusionRuntime { .external("Failed to create Tokio thread pool")?; Ok(Self { - runtime: Arc::new(VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), + runtime: Arc::new(VegaFusionRuntime::new(Some(VegaFusionCache::new( max_capacity, memory_limit, - )), + )))), tokio_runtime: Arc::new(tokio_runtime_connection), }) } @@ -363,24 +362,17 @@ impl PyVegaFusionRuntime { .collect(); let (values, warnings) = py.allow_threads(|| { - self.tokio_runtime.block_on( - self.runtime.pre_transform_values( + self.tokio_runtime + .block_on(self.runtime.pre_transform_values( &spec, + &variables, &inline_datasets, &PreTransformValuesOpts { - variables: variables - .into_iter() - .map(|v| PreTransformVariable { - variable: Some(v.0), - scope: v.1, - }) - .collect(), local_tz, default_input_tz, row_limit, }, - ), - ) + )) })?; let warnings: Vec<_> = warnings diff --git a/vegafusion-python/vegafusion/runtime.py b/vegafusion-python/vegafusion/runtime.py index b915f65e..501e849f 100644 --- a/vegafusion-python/vegafusion/runtime.py +++ b/vegafusion-python/vegafusion/runtime.py @@ -29,15 +29,15 @@ UnaryUnaryMultiCallable = Any -def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> str | None: - namespaces = set() +def _get_common_namespace(inline_datasets: dict[str, Any] | None) -> ModuleType | None: + namespaces: set[ModuleType] = set() try: if inline_datasets is not None: for df in inline_datasets.values(): namespaces.add(nw.get_native_namespace(nw.from_native(df))) if len(namespaces) == 1: - return str(next(iter(namespaces)).__name__) + return next(iter(namespaces)) else: return None except TypeError: @@ -80,6 +80,9 @@ class PreTransformWarning(TypedDict): message: str +DatasetFormat = Literal["auto", "polars", "pandas", "pyarrow", "arro3"] + + class ChartState: def __init__(self, chart_state: PyChartState | PyChartStateGrpc) -> None: self._chart_state = chart_state @@ -472,8 +475,10 @@ def pre_transform_datasets( row_limit: int | None = None, inline_datasets: dict[str, DataFrameLike] | None = None, trim_unused_columns: bool = False, + dataset_format: DatasetFormat = "auto", ) -> tuple[list[DataFrameLike], list[dict[str, str]]]: - """Extract the fully evaluated form of the requested datasets from a Vega + """ + Extract the fully evaluated form of the requested datasets from a Vega specification. Extracts datasets as pandas DataFrames. @@ -481,15 +486,16 @@ def pre_transform_datasets( Args: spec: A Vega specification dict or JSON string. datasets: A list with elements that are either: - - The name of a top-level dataset as a string - - A two-element tuple where the first element is the name of a dataset + + * The name of a top-level dataset as a string + * A two-element tuple where the first element is the name of a dataset as a string and the second element is the nested scope of the dataset as a list of integers - local_tz: Name of timezone to be considered local. E.g. 'America/New_York'. - Defaults to the value of vf.get_local_tz(), which defaults to the - system timezone if one can be determined. - default_input_tz: Name of timezone (e.g. 'America/New_York') that naive - datetime strings should be interpreted in. 
Defaults to `local_tz`. + local_tz: Name of timezone to be considered local. E.g. + ``'America/New_York'``. Defaults to the value of vf.get_local_tz(), + which defaults to the system timezone if one can be determined. + default_input_tz: Name of timezone (e.g. ``'America/New_York'``) that naive + datetime strings should be interpreted in. Defaults to ``local_tz``. row_limit: Maximum number of dataset rows to include in the returned datasets. If exceeded, datasets will be truncated to this number of rows and a RowLimitExceeded warning will be included in the resulting @@ -500,19 +506,22 @@ def pre_transform_datasets( or 'table://{dataset_name}'. trim_unused_columns: If True, unused columns are removed from returned datasets. - - Returns: - A tuple containing: - - List of pandas DataFrames corresponding to the input datasets list - - A list of warnings as dictionaries. Each warning dict has a 'type' + dataset_format: Format for returned datasets. One of: + + * ``"auto"``: (default) Infer the result type based on the types of + inline datasets. If no inline datasets are provided, return type will + depend on installed packages. + * ``"polars"``: polars.DataFrame + * ``"pandas"``: pandas.DataFrame + * ``"pyarrow"``: pyarrow.Table + * ``"arro3"``: arro3.Table + + Returns: Two-element tuple + * List of pandas DataFrames corresponding to the input datasets list + * A list of warnings as dictionaries. Each warning dict has a 'type' key indicating the warning type, and a 'message' key containing a description of the warning. """ - if not TYPE_CHECKING: - pl = sys.modules.get("polars", None) - pa = sys.modules.get("pyarrow", None) - pd = sys.modules.get("pandas", None) - local_tz = local_tz or get_local_tz() # Build input variables @@ -535,49 +544,76 @@ def pre_transform_datasets( inline_datasets=inline_arrow_dataset, ) - # Wrap result dataframes in native format, then with Narwhals so that - # we can manipulate them with a uniform API - namespace = _get_common_namespace(inline_datasets) - if namespace == "polars" and pl is not None: - nw_dataframes = [nw.from_native(pl.DataFrame(value)) for value in values] - - elif namespace == "pyarrow" and pa is not None: - nw_dataframes = [nw.from_native(pa.table(value)) for value in values] - elif namespace == "pandas" and pd is not None and pa is not None: - nw_dataframes = [ - nw.from_native(pa.table(value).to_pandas()) for value in values - ] + def normalize_timezones( + dfs: list[nw.DataFrame[IntoFrameT] | nw.LazyFrame[IntoFrameT]], + ) -> list[DataFrameLike]: + # Convert to `local_tz` (or, set to UTC and then convert if starting + # from time-zone-naive data), then extract the native DataFrame to return. 
+ processed_datasets = [] + for df in dfs: + df = df.with_columns( + nw.col(col).dt.convert_time_zone(local_tz) + for col, dtype in df.schema.items() + if dtype == nw.Datetime + ) + processed_datasets.append(df.to_native()) + return processed_datasets + + # Wrap result dataframes in Narwhals, using the input type and arrow + # PyCapsule interface + if dataset_format != "auto": + if dataset_format == "polars": + import polars as pl + + datasets = normalize_timezones( + [nw.from_native(pl.DataFrame(value)) for value in values] + ) + elif dataset_format == "pandas": + import pyarrow as pa + + datasets = normalize_timezones( + [nw.from_native(pa.table(value).to_pandas()) for value in values] + ) + elif dataset_format == "pyarrow": + import pyarrow as pa + + datasets = normalize_timezones( + [nw.from_native(pa.table(value)) for value in values] + ) + elif dataset_format == "arro3": + # Pass through arrof3 + datasets = values + else: + raise ValueError(f"Unrecognized dataset_format: {dataset_format}") + elif (namespace := _get_common_namespace(inline_datasets)) is not None: + # Infer the type from the inline datasets + datasets = normalize_timezones( + [nw.from_arrow(value, native_namespace=namespace) for value in values] + ) else: # Either no inline datasets, inline datasets with mixed or # unrecognized types - if pa is not None and pd is not None: - nw_dataframes = [ - nw.from_native(pa.table(value).to_pandas()) for value in values - ] - elif pl is not None: - nw_dataframes = [ - nw.from_native(pl.DataFrame(value)) for value in values - ] - else: - # Hopefully narwhals will eventually help us fall back to whatever - # is installed here - raise ValueError( - "Either polars or pandas must be installed to extract " - "transformed data" + try: + # Try pandas + import pandas as _pd # noqa: F401 + import pyarrow as pa + + datasets = normalize_timezones( + [nw.from_native(pa.table(value).to_pandas()) for value in values] ) + except ImportError: + try: + # Try polars + import polars as pl - # Convert to `local_tz` (or, set to UTC and then convert if starting - # from time-zone-naive data), then extract the native DataFrame to return. - processed_datasets = [] - for df in nw_dataframes: - df = df.with_columns( - nw.col(col).dt.convert_time_zone(local_tz) - for col, dtype in df.schema.items() - if dtype == nw.Datetime - ) - processed_datasets.append(df.to_native()) + datasets = normalize_timezones( + [nw.from_native(pl.DataFrame(value)) for value in values] + ) + except ImportError: + # Fall back to arro3 + datasets = values - return processed_datasets, warnings + return datasets, warnings def pre_transform_extract( self, @@ -597,7 +633,7 @@ def pre_transform_extract( Evaluate supported transforms in an input Vega specification. Produces a new specification with small pre-transformed datasets (under 100 - rows) included inline and larger inline datasets (100 rows or more) extracted + rows) included inline and larger inline datasets (20 rows or more) extracted into pyarrow tables. 
Args: diff --git a/vegafusion-runtime/benches/spec_benchmarks.rs b/vegafusion-runtime/benches/spec_benchmarks.rs index 986d4491..d229fd5f 100644 --- a/vegafusion-runtime/benches/spec_benchmarks.rs +++ b/vegafusion-runtime/benches/spec_benchmarks.rs @@ -52,8 +52,7 @@ async fn eval_spec_get_variable(full_spec: ChartSpec, var: &ScopedVariable) -> V let task_graph_mapping = task_graph.build_mapping(); // Initialize task graph runtime - let ctx = make_datafusion_context(); - let runtime = VegaFusionRuntime::new(Arc::new(ctx), Some(64), None); + let runtime = VegaFusionRuntime::new(None); let node_index = task_graph_mapping.get(var).unwrap(); @@ -104,8 +103,7 @@ async fn eval_spec_sequence(full_spec: ChartSpec, full_updates: Vec tokio::runtime::Runtime { diff --git a/vegafusion-runtime/src/task_graph/runtime.rs b/vegafusion-runtime/src/task_graph/runtime.rs index c0932584..8ad2c2c4 100644 --- a/vegafusion-runtime/src/task_graph/runtime.rs +++ b/vegafusion-runtime/src/task_graph/runtime.rs @@ -1,3 +1,4 @@ +use crate::datafusion::context::make_datafusion_context; use crate::task_graph::cache::VegaFusionCache; use crate::task_graph::task::TaskCall; use crate::task_graph::timezone::RuntimeTzConfig; @@ -25,14 +26,10 @@ pub struct VegaFusionRuntime { } impl VegaFusionRuntime { - pub fn new( - ctx: Arc, - capacity: Option, - memory_limit: Option, - ) -> Self { + pub fn new(cache: Option) -> Self { Self { - cache: VegaFusionCache::new(capacity, memory_limit), - ctx, + cache: cache.unwrap_or_else(|| VegaFusionCache::new(Some(32), None)), + ctx: Arc::new(make_datafusion_context()), } } diff --git a/vegafusion-runtime/src/transform/timeunit.rs b/vegafusion-runtime/src/transform/timeunit.rs index b2aa1f3a..d85df296 100644 --- a/vegafusion-runtime/src/transform/timeunit.rs +++ b/vegafusion-runtime/src/transform/timeunit.rs @@ -289,7 +289,7 @@ fn timeunit_custom_udf( } /// Convert a column to a timezone aware timestamp with Millisecond precision, in UTC -pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String) -> Result { +pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &str) -> Result { Ok(match expr.get_type(schema)? { DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(_)) => expr, DataType::Timestamp(_, Some(tz)) => expr.try_cast_to( @@ -297,10 +297,7 @@ pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String schema, )?, DataType::Timestamp(_, None) => expr.try_cast_to( - &DataType::Timestamp( - ArrowTimeUnit::Millisecond, - Some(default_input_tz.as_str().into()), - ), + &DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(default_input_tz.into())), schema, )?, DataType::Date32 | DataType::Date64 => cast_to( @@ -309,10 +306,7 @@ pub fn to_timestamp_col(expr: Expr, schema: &DFSchema, default_input_tz: &String schema, )? 
.try_cast_to( - &DataType::Timestamp( - ArrowTimeUnit::Millisecond, - Some(default_input_tz.as_str().into()), - ), + &DataType::Timestamp(ArrowTimeUnit::Millisecond, Some(default_input_tz.into())), schema, )?, DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => { diff --git a/vegafusion-runtime/tests/test_chart_state.rs b/vegafusion-runtime/tests/test_chart_state.rs index 9df8fee1..6f9afe88 100644 --- a/vegafusion-runtime/tests/test_chart_state.rs +++ b/vegafusion-runtime/tests/test_chart_state.rs @@ -8,12 +8,10 @@ mod tests { use crate::crate_dir; use serde_json::json; use std::fs; - use std::sync::Arc; use vegafusion_core::chart_state::ChartState; use vegafusion_core::planning::watch::{ExportUpdateJSON, ExportUpdateNamespace}; use vegafusion_core::proto::gen::tasks::TzConfig; use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -27,11 +25,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let chart_state = ChartState::try_new( &runtime, diff --git a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs index 1940c132..e1e52fd7 100644 --- a/vegafusion-runtime/tests/test_destringify_selection_datasets.rs +++ b/vegafusion-runtime/tests/test_destringify_selection_datasets.rs @@ -7,12 +7,10 @@ fn crate_dir() -> String { mod tests { use crate::crate_dir; use std::fs; - use std::sync::Arc; use vegafusion_core::proto::gen::pretransform::PreTransformSpecOpts; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::spec::transform::TransformSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -26,11 +24,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (chart_spec, _warnings) = runtime .pre_transform_spec( diff --git a/vegafusion-runtime/tests/test_image_comparison.rs b/vegafusion-runtime/tests/test_image_comparison.rs index 524d33be..bb7d0c75 100644 --- a/vegafusion-runtime/tests/test_image_comparison.rs +++ b/vegafusion-runtime/tests/test_image_comparison.rs @@ -26,7 +26,6 @@ use vegafusion_core::proto::gen::tasks::{TaskGraph, TzConfig}; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::graph::ScopedVariable; use vegafusion_core::task_graph::task_value::TaskValue; -use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use vegafusion_runtime::tokio_runtime::TOKIO_THREAD_STACK_SIZE; @@ -1177,7 +1176,6 @@ mod test_pre_transform_inline { use super::*; use crate::util::datasets::vega_json_dataset_async; use vegafusion_core::{data::dataset::VegaFusionDataset, runtime::VegaFusionRuntimeTrait}; - use vegafusion_runtime::datafusion::context::make_datafusion_context; #[tokio::test] async fn test() { @@ -1186,11 +1184,7 @@ mod 
test_pre_transform_inline { let vegajs_runtime = vegajs_runtime(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Get timezone let local_tz = vegajs_runtime.nodejs_runtime.local_timezone().unwrap(); @@ -1342,11 +1336,7 @@ async fn check_pre_transform_spec_from_files(spec_name: &str, tolerance: f64) { let vegajs_runtime = vegajs_runtime(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Get timezone let local_tz = vegajs_runtime.nodejs_runtime.local_timezone().unwrap(); @@ -1465,11 +1455,7 @@ async fn check_spec_sequence( .collect(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Extract the initial values of all of the variables that should be sent from the // server to the client diff --git a/vegafusion-runtime/tests/test_planning.rs b/vegafusion-runtime/tests/test_planning.rs index 731f0489..9c7830f9 100644 --- a/vegafusion-runtime/tests/test_planning.rs +++ b/vegafusion-runtime/tests/test_planning.rs @@ -10,7 +10,6 @@ use vegafusion_core::planning::split_domain_data::split_domain_data; use vegafusion_core::planning::stitch::stitch_specs; use vegafusion_core::planning::strip_encodings::strip_encodings; -use vegafusion_runtime::datafusion::context::make_datafusion_context; #[tokio::test(flavor = "multi_thread")] async fn test_extract_server_data() { @@ -62,11 +61,7 @@ async fn test_extract_server_data() { let mapping = graph.build_mapping(); // println!("{:#?}", mapping); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); let _data_3 = graph_runtime .get_node_value( graph.clone(), diff --git a/vegafusion-runtime/tests/test_pre_transform_extract.rs b/vegafusion-runtime/tests/test_pre_transform_extract.rs index ffdd805e..98fbc97e 100644 --- a/vegafusion-runtime/tests/test_pre_transform_extract.rs +++ b/vegafusion-runtime/tests/test_pre_transform_extract.rs @@ -10,12 +10,9 @@ mod tests { use vegafusion_core::proto::gen::pretransform::PreTransformExtractOpts; use std::fs; - use std::sync::Arc; - - use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::runtime::VegaFusionRuntimeTrait; - use vegafusion_runtime::datafusion::context::make_datafusion_context; + use vegafusion_core::spec::chart::ChartSpec; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -29,11 +26,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (tx_spec, datasets, warnings) = runtime .pre_transform_extract( diff --git a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs index 02d31dc8..e69fe4a7 100644 --- a/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs +++ b/vegafusion-runtime/tests/test_pre_transform_keep_variables.rs @@ -8,15 +8,11 @@ 
mod tests { use crate::crate_dir; use std::fs; - use std::sync::Arc; use vegafusion_common::error::VegaFusionError; use vegafusion_core::proto::gen::pretransform::{PreTransformSpecOpts, PreTransformVariable}; use vegafusion_core::proto::gen::tasks::Variable; - - use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_core::runtime::VegaFusionRuntimeTrait; - use vegafusion_runtime::datafusion::context::make_datafusion_context; + use vegafusion_core::spec::chart::ChartSpec; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -30,11 +26,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (tx_spec, warnings) = runtime .pre_transform_spec( diff --git a/vegafusion-runtime/tests/test_pre_transform_values.rs b/vegafusion-runtime/tests/test_pre_transform_values.rs index 3173042c..f1c5836d 100644 --- a/vegafusion-runtime/tests/test_pre_transform_values.rs +++ b/vegafusion-runtime/tests/test_pre_transform_values.rs @@ -20,18 +20,15 @@ mod tests { use std::collections::HashMap; use std::env; use std::fs; - use std::sync::Arc; use vegafusion_common::data::table::VegaFusionTable; use vegafusion_common::error::VegaFusionError; use vegafusion_core::data::dataset::VegaFusionDataset; use vegafusion_core::proto::gen::pretransform::pre_transform_values_warning::WarningType; use vegafusion_core::proto::gen::pretransform::PreTransformValuesOpts; - use vegafusion_core::proto::gen::pretransform::PreTransformVariable; use vegafusion_core::proto::gen::tasks::Variable; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::spec::values::StringOrSignalSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test] @@ -42,21 +39,14 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -98,21 +88,14 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: Some(3), local_tz: "UTC".to_string(), default_input_tz: None, @@ -154,22 +137,15 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), 
- Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // Check existent but unsupported dataset name let result = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -180,7 +156,7 @@ mod tests { if let Err(VegaFusionError::PreTransformError(err, _)) = result { assert_eq!( err, - "Requested variable PreTransformVariable { variable: Some(Variable { name: \"source_0\", namespace: Data }), scope: [] }\n \ + "Requested variable (Variable { name: \"source_0\", namespace: Data }, [])\n \ requires transforms or signal expressions that are not yet supported" ) } else { @@ -191,12 +167,9 @@ mod tests { let result = runtime .pre_transform_values( &spec, + &[(Variable::new_data("bogus_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("bogus_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -222,11 +195,7 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let source_0 = VegaFusionTable::from_json(&json!([{"normal": 1, "a.b": 2}, {"normal": 1, "a.b": 4}])) @@ -241,12 +210,9 @@ mod tests { let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &inline_datasets, &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -285,21 +251,14 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("data_3"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("data_3")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -339,27 +298,17 @@ mod tests { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( &spec, + &[ + (Variable::new_data("click_selected"), vec![]), + (Variable::new_data("drag_selected"), vec![]), + ], &Default::default(), &PreTransformValuesOpts { - variables: vec![ - PreTransformVariable { - variable: Some(Variable::new_data("click_selected")), - scope: vec![], - }, - PreTransformVariable { - variable: Some(Variable::new_data("drag_selected")), - scope: vec![], - }, - ], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, @@ -433,21 +382,14 @@ mod tests { ))); // Initialize task graph 
runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (values, warnings) = runtime .pre_transform_values( &spec, + &[(Variable::new_data("source_0"), vec![])], &Default::default(), &PreTransformValuesOpts { - variables: vec![PreTransformVariable { - variable: Some(Variable::new_data("source_0")), - scope: vec![], - }], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, diff --git a/vegafusion-runtime/tests/test_stringify_datetimes.rs b/vegafusion-runtime/tests/test_stringify_datetimes.rs index 238cc794..5f6616cd 100644 --- a/vegafusion-runtime/tests/test_stringify_datetimes.rs +++ b/vegafusion-runtime/tests/test_stringify_datetimes.rs @@ -10,16 +10,20 @@ lazy_static! { .unwrap(); } +fn crate_dir() -> String { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .display() + .to_string() +} + #[cfg(test)] mod test_stringify_datetimes { use crate::{crate_dir, TOKIO_RUNTIME}; use rstest::rstest; use std::fs; - use std::sync::Arc; use vegafusion_core::proto::gen::pretransform::PreTransformSpecOpts; use vegafusion_core::runtime::VegaFusionRuntimeTrait; use vegafusion_core::spec::chart::ChartSpec; - use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[rstest( @@ -82,11 +86,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let local_tz = local_tz.to_string(); let (spec, _warnings) = runtime @@ -139,11 +139,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); // let local_tz = "America/New_York".to_string(); let local_tz = "UTC".to_string(); let default_input_tz = "UTC".to_string(); @@ -229,11 +225,7 @@ mod test_stringify_datetimes { let spec_str = fs::read_to_string(spec_path).unwrap(); let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = runtime .pre_transform_spec( @@ -298,11 +290,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = TOKIO_RUNTIME .block_on(runtime.pre_transform_spec( @@ -347,11 +335,7 @@ mod test_stringify_datetimes { let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); // Initialize task graph runtime - let runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(16), - Some(1024_i32.pow(3) as usize), - ); + let runtime = VegaFusionRuntime::new(None); let (spec, _warnings) = runtime .pre_transform_spec( @@ -386,9 +370,3 @@ mod test_stringify_datetimes { assert_eq!(month_date, 
"2012-01-01T00:00:00.000"); } } - -fn crate_dir() -> String { - std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .display() - .to_string() -} diff --git a/vegafusion-runtime/tests/test_task_graph_runtime.rs b/vegafusion-runtime/tests/test_task_graph_runtime.rs index c971a8c9..fcf1ef28 100644 --- a/vegafusion-runtime/tests/test_task_graph_runtime.rs +++ b/vegafusion-runtime/tests/test_task_graph_runtime.rs @@ -12,7 +12,6 @@ use vegafusion_core::proto::gen::transforms::{ use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::task_graph::scope::TaskScope; use vegafusion_core::task_graph::task_value::TaskValue; -use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; #[tokio::test(flavor = "multi_thread")] @@ -81,11 +80,7 @@ async fn try_it() { ]; let graph = Arc::new(TaskGraph::new(tasks, &task_scope).unwrap()); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); // let result = graph_runtime.get_node_value(graph, 2, None).await.unwrap(); let result = graph_runtime .get_node_value(graph, &NodeValueIndex::new(2, Some(0)), Default::default()) @@ -143,11 +138,7 @@ async fn try_it_from_spec() { let graph = Arc::new(TaskGraph::new(tasks, &task_scope).unwrap()); - let graph_runtime = VegaFusionRuntime::new( - Arc::new(make_datafusion_context()), - Some(20), - Some(1024_i32.pow(3) as usize), - ); + let graph_runtime = VegaFusionRuntime::new(None); let result = graph_runtime .get_node_value(graph, &NodeValueIndex::new(2, Some(0)), Default::default()) .await diff --git a/vegafusion-server/src/main.rs b/vegafusion-server/src/main.rs index 7a567d8d..2d0ecf9e 100644 --- a/vegafusion-server/src/main.rs +++ b/vegafusion-server/src/main.rs @@ -20,13 +20,13 @@ use vegafusion_core::task_graph::graph::ScopedVariable; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use clap::Parser; -use datafusion::prelude::SessionContext; use regex::Regex; use vegafusion_core::proto::gen::pretransform::{ PreTransformExtractDataset, PreTransformExtractRequest, PreTransformExtractResponse, PreTransformSpecOpts, PreTransformSpecRequest, PreTransformSpecResponse, PreTransformValuesOpts, PreTransformValuesRequest, PreTransformValuesResponse, }; +use vegafusion_runtime::task_graph::cache::VegaFusionCache; #[derive(Clone)] pub struct VegaFusionRuntimeGrpc { @@ -175,31 +175,27 @@ impl VegaFusionRuntimeGrpc { .opts .clone() .unwrap_or_else(|| PreTransformValuesOpts { - variables: vec![], row_limit: None, local_tz: "UTC".to_string(), default_input_tz: None, }); + let variables: Vec = request + .variables + .iter() + .map(|v| (v.variable.clone().unwrap(), v.scope.clone())) + .collect::>(); + // Extract and deserialize inline datasets let inline_datasets = decode_inline_datasets(request.inline_datasets)?; - // Extract requested variables - let variables: Vec = request - .opts - .map(|opts| opts.variables) - .unwrap_or_default() - .into_iter() - .map(|var| (var.variable.unwrap(), var.scope)) - .collect(); - // Parse spec let spec_string = request.spec; let spec: ChartSpec = serde_json::from_str(&spec_string)?; let (values, warnings) = self .runtime - .pre_transform_values(&spec, &inline_datasets, &opts) + .pre_transform_values(&spec, &variables, &inline_datasets, &opts) .await?; let response_values: Vec<_> = values @@ -325,11 +321,10 @@ async fn main() -> Result<(), VegaFusionError> { None }; - 
let tg_runtime = VegaFusionRuntime::new( - Arc::new(SessionContext::new()), + let tg_runtime = VegaFusionRuntime::new(Some(VegaFusionCache::new( Some(args.capacity), memory_limit, - ); + ))); grpc_server(grpc_address, tg_runtime.clone(), args.web) .await diff --git a/vegafusion-wasm/src/lib.rs b/vegafusion-wasm/src/lib.rs index 95b284e1..b581b777 100644 --- a/vegafusion-wasm/src/lib.rs +++ b/vegafusion-wasm/src/lib.rs @@ -32,7 +32,6 @@ use vegafusion_core::spec::chart::ChartSpec; use vegafusion_core::chart_state::ChartState; use vegafusion_core::data::dataset::VegaFusionDataset; use vegafusion_core::get_column_usage; -use vegafusion_runtime::datafusion::context::make_datafusion_context; use vegafusion_runtime::task_graph::runtime::VegaFusionRuntime; use web_sys::Element; @@ -439,8 +438,7 @@ pub async fn vegafusion_embed( let runtime: Box = if query_fn.is_undefined() || query_fn.is_null() { // Use embedded runtime - let ctx = make_datafusion_context(); - Box::new(VegaFusionRuntime::new(Arc::new(ctx), None, None)) + Box::new(VegaFusionRuntime::new(None)) } else { let query_fn = query_fn.dyn_into::().map_err(|e| { JsError::new(&format!(