From a8e9d6521f06267308d8f92b6e7322b15e0236e3 Mon Sep 17 00:00:00 2001 From: Solomon Date: Tue, 16 Jan 2024 20:15:28 +0100 Subject: [PATCH 1/2] feat: BigQuery sink --- Cargo.lock | 217 ++++++++++++++- dozer-cli/src/pipeline/builder.rs | 5 +- dozer-cli/src/simple/executor.rs | 5 +- dozer-sinks/Cargo.toml | 1 + dozer-sinks/bigquery/Cargo.toml | 22 ++ dozer-sinks/bigquery/src/lib.rs | 407 +++++++++++++++++++++++++++++ dozer-sinks/src/lib.rs | 1 + dozer-types/src/helper.rs | 19 ++ dozer-types/src/models/endpoint.rs | 67 +++++ json_schemas/dozer.json | 100 ++++++- 10 files changed, 826 insertions(+), 18 deletions(-) create mode 100644 dozer-sinks/bigquery/Cargo.toml create mode 100644 dozer-sinks/bigquery/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index ca1a57a170..aa628c1e38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2652,7 +2652,7 @@ dependencies = [ "itertools 0.11.0", "log", "num_cpus", - "object_store", + "object_store 0.7.1", "parking_lot", "parquet", "pin-project-lite", @@ -2680,7 +2680,7 @@ dependencies = [ "chrono", "half 2.3.1", "num_cpus", - "object_store", + "object_store 0.7.1", "parquet", "sqlparser 0.39.0", ] @@ -2698,7 +2698,7 @@ dependencies = [ "futures", "hashbrown 0.14.2", "log", - "object_store", + "object_store 0.7.1", "parking_lot", "rand 0.8.5", "tempfile", @@ -2810,7 +2810,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-expr", - "object_store", + "object_store 0.7.1", "prost 0.12.3", ] @@ -2880,7 +2880,7 @@ dependencies = [ "num-bigint", "num-traits", "num_cpus", - "object_store", + "object_store 0.7.1", "once_cell", "parking_lot", "parquet", @@ -3400,7 +3400,7 @@ dependencies = [ "once_cell", "rustls 0.21.9", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "rustls-webpki 0.101.7", "serde", "webpki-roots 0.25.3", @@ -3537,6 +3537,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -3967,7 +3968,7 @@ version = "0.3.0" dependencies = [ "datafusion", "dozer-ingestion-connector", - "object_store", + "object_store 0.7.1", "url", ] @@ -4082,9 +4083,30 @@ dependencies = [ name = "dozer-sinks" version = "0.1.0" dependencies = [ + "dozer-sinks-bigquery", "dozer-sinks-snowflake", ] +[[package]] +name = "dozer-sinks-bigquery" +version = "0.1.0" +dependencies = [ + "bytes", + "chrono", + "dozer-api", + "dozer-core", + "dozer-log", + "dozer-recordstore", + "dozer-types", + "futures-util", + "gcp-bigquery-client", + "html-escape", + "itertools 0.10.5", + "object_store 0.9.0", + "parquet", + "yup-oauth2", +] + [[package]] name = "dozer-sinks-snowflake" version = "0.1.0" @@ -5064,6 +5086,29 @@ dependencies = [ "termcolor", ] +[[package]] +name = "gcp-bigquery-client" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0ce6fcbdaca0a4521a734f2bc7f2f6bd872fe40576e24f8bd0b05732c19a74f" +dependencies = [ + "async-stream", + "async-trait", + "dyn-clone", + "hyper 0.14.27", + "hyper-rustls", + "log", + "reqwest", + "serde", + "serde_json", + "thiserror", + "time 0.3.30", + "tokio", + "tokio-stream", + "url", + "yup-oauth2", +] + [[package]] name = "genawaiter" version = "0.99.1" @@ -5470,6 +5515,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "html-escape" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d1ad449764d627e22bfd7cd5e8868264fc9236e07c752972b4080cd351cb476" +dependencies = [ + 
"utf8-width", +] + [[package]] name = "http" version = "0.2.11" @@ -5925,6 +5979,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -6710,7 +6773,7 @@ dependencies = [ "rand 0.8.5", "rustc_version_runtime", "rustls 0.21.9", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_bytes", "serde_with", @@ -6792,7 +6855,7 @@ dependencies = [ "pin-project", "priority-queue", "rustls 0.21.9", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "socket2 0.5.5", @@ -7220,6 +7283,15 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -7282,7 +7354,7 @@ dependencies = [ "itertools 0.11.0", "parking_lot", "percent-encoding", - "quick-xml", + "quick-xml 0.30.0", "rand 0.8.5", "reqwest", "ring 0.16.20", @@ -7295,6 +7367,36 @@ dependencies = [ "walkdir", ] +[[package]] +name = "object_store" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000" +dependencies = [ + "async-trait", + "base64 0.21.5", + "bytes", + "chrono", + "futures", + "humantime", + "hyper 0.14.27", + "itertools 0.12.0", + "parking_lot", + "percent-encoding", + "quick-xml 0.31.0", + "rand 0.8.5", + "reqwest", + "ring 0.17.5", + "rustls-pemfile 2.0.0", + "serde", + "serde_json", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "odbc" version = "0.17.0" @@ -7728,7 +7830,7 @@ dependencies = [ "lz4_flex", "num", "num-bigint", - "object_store", + "object_store 0.7.1", "paste", "seq-macro", "snap", @@ -8701,6 +8803,16 @@ dependencies = [ "serde", ] +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "0.6.13" @@ -9025,7 +9137,8 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls 0.21.9", - "rustls-pemfile", + "rustls-native-certs", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -9369,6 +9482,20 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +dependencies = [ + "log", + "ring 0.17.5", + "rustls-pki-types", + "rustls-webpki 0.102.1", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -9376,7 +9503,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -9390,6 +9517,22 @@ dependencies = [ "base64 0.21.5", ] +[[package]] +name = "rustls-pemfile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4" +dependencies = [ + "base64 0.21.5", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" + [[package]] name = "rustls-webpki" version = "0.100.3" @@ -9410,6 +9553,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +dependencies = [ + "ring 0.17.5", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -10985,6 +11139,8 @@ checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "libc", + "num_threads", "powerfmt", "serde", "time-core", @@ -11301,7 +11457,7 @@ dependencies = [ "prost 0.12.3", "rustls 0.21.9", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls 0.24.1", "tokio-stream", @@ -11872,6 +12028,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" + [[package]] name = "utf8parse" version = "0.2.1" @@ -12664,6 +12826,33 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "yup-oauth2" +version = "8.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b61da40aeb0907a65f7fb5c1de83c5a224d6a9ebb83bf918588a2bb744d636b8" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.5", + "futures", + "http", + "hyper 0.14.27", + "hyper-rustls", + "itertools 0.12.0", + "log", + "percent-encoding", + "rustls 0.22.2", + "rustls-pemfile 1.0.4", + "seahash", + "serde", + "serde_json", + "time 0.3.30", + "tokio", + "tower-service", + "url", +] + [[package]] name = "z85" version = "3.0.5" diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index c71e71d53e..fff2dfb519 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -10,12 +10,13 @@ use dozer_core::app::PipelineEntryPoint; use dozer_core::node::SinkFactory; use dozer_core::DEFAULT_PORT_HANDLE; use dozer_ingestion::{get_connector, get_connector_info_table}; +use dozer_sinks::bigquery::BigQuerySinkFactory; use dozer_sql::builder::statement_to_pipeline; use dozer_sql::builder::{OutputNodeInfo, QueryContext}; use dozer_tracing::LabelsAndProgress; use dozer_types::log::debug; use dozer_types::models::connection::Connection; -use dozer_types::models::endpoint::AerospikeSinkConfig; +use dozer_types::models::endpoint::{AerospikeSinkConfig, BigQuery}; use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; @@ -60,6 +61,7 @@ pub enum EndpointLogKind { Api { log: Arc> }, Dummy, Aerospike { config: AerospikeSinkConfig }, + BigQuery(BigQuery), } pub struct PipelineBuilder<'a> { @@ -290,6 +292,7 @@ impl<'a> PipelineBuilder<'a> { EndpointLogKind::Aerospike { config } => { Box::new(AerospikeSinkFactory::new(config)) } + EndpointLogKind::BigQuery(config) => Box::new(BigQuerySinkFactory::new(config)), }; 
match table_info { diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index dc76777942..dab24b31cb 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -5,7 +5,7 @@ use dozer_cache::dozer_log::home_dir::{BuildPath, HomeDir}; use dozer_cache::dozer_log::replication::Log; use dozer_core::checkpoint::{CheckpointOptions, OptionCheckpoint}; use dozer_tracing::LabelsAndProgress; -use dozer_types::models::endpoint::{AerospikeSinkConfig, Endpoint, EndpointKind}; +use dozer_types::models::endpoint::{AerospikeSinkConfig, BigQuery, Endpoint, EndpointKind}; use dozer_types::models::flags::Flags; use tokio::runtime::Runtime; use tokio::sync::Mutex; @@ -45,6 +45,7 @@ enum ExecutorEndpointKind { Api { log_endpoint: LogEndpoint }, Dummy, Aerospike { config: AerospikeSinkConfig }, + BigQuery(BigQuery), } impl<'a> Executor<'a> { @@ -89,6 +90,7 @@ impl<'a> Executor<'a> { EndpointKind::Aerospike(config) => ExecutorEndpointKind::Aerospike { config: config.clone(), }, + EndpointKind::BigQuery(config) => ExecutorEndpointKind::BigQuery(config.clone()), }; executor_endpoints.push(ExecutorEndpoint { @@ -147,6 +149,7 @@ impl<'a> Executor<'a> { ExecutorEndpointKind::Aerospike { config } => { EndpointLogKind::Aerospike { config } } + ExecutorEndpointKind::BigQuery(config) => EndpointLogKind::BigQuery(config), }; EndpointLog { table_name: endpoint.table_name, diff --git a/dozer-sinks/Cargo.toml b/dozer-sinks/Cargo.toml index 4c48b35056..980f8b2222 100644 --- a/dozer-sinks/Cargo.toml +++ b/dozer-sinks/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] dozer-sinks-snowflake = { path = "./snowflake", optional = true } +dozer-sinks-bigquery = { path = "./bigquery" } [features] snowflake = ["dep:dozer-sinks-snowflake"] diff --git a/dozer-sinks/bigquery/Cargo.toml b/dozer-sinks/bigquery/Cargo.toml new file mode 100644 index 0000000000..49eccb8407 --- /dev/null +++ b/dozer-sinks/bigquery/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "dozer-sinks-bigquery" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dozer-api = { path = "../../dozer-api" } +dozer-types = { path = "../../dozer-types" } +dozer-log = { path = "../../dozer-log" } +dozer-core = { path = "../../dozer-core" } +dozer-recordstore = { path = "../../dozer-recordstore" } +itertools = "0.10.5" +futures-util = "0.3.28" +chrono = "0.4.31" +gcp-bigquery-client = "0.18.0" +object_store = { version = "0.9.0", features = ["gcp"] } +parquet = "48.0.1" +yup-oauth2 = "8.3.2" +bytes = "1.5.0" +html-escape = "0.2.13" diff --git a/dozer-sinks/bigquery/src/lib.rs b/dozer-sinks/bigquery/src/lib.rs new file mode 100644 index 0000000000..501cf170f7 --- /dev/null +++ b/dozer-sinks/bigquery/src/lib.rs @@ -0,0 +1,407 @@ +use std::{ + ops::Mul, + sync::Arc, + thread, + time::{Duration, Instant}, +}; + +use bytes::Bytes; +use dozer_core::{ + node::{Sink, SinkFactory}, + DEFAULT_PORT_HANDLE, +}; +use dozer_log::tokio; +use dozer_recordstore::ProcessorRecordStore; +use dozer_types::{ + arrow_types::to_arrow::{map_record_to_arrow, map_to_arrow_schema}, + errors::internal::BoxedError, + log::{debug, error, info}, + models::endpoint::{ + bigquery::{default_batch_size, default_stage_max_size_in_mb, Destination}, + BigQuery as BigQueryConfig, + }, + rust_decimal::{ + prelude::{FromPrimitive, ToPrimitive}, + Decimal, + }, + types::{FieldType, Operation, Schema}, +}; +use gcp_bigquery_client::{ + error::BQError, + 
model::{ + dataset::Dataset, job::Job, job_configuration::JobConfiguration, + job_configuration_load::JobConfigurationLoad, table::Table, + table_field_schema::TableFieldSchema, table_reference::TableReference, + table_schema::TableSchema, + }, + Client, +}; +use object_store::{ + gcp::GoogleCloudStorageBuilder, path::Path, BackoffConfig, ObjectStore, RetryConfig, +}; +use parquet::arrow::ArrowWriter; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +#[derive(Debug)] +pub struct BigQuerySinkFactory { + config: BigQueryConfig, +} + +impl BigQuerySinkFactory { + pub fn new(config: BigQueryConfig) -> Self { + Self { config } + } +} + +impl SinkFactory for BigQuerySinkFactory { + fn get_input_ports(&self) -> Vec { + vec![DEFAULT_PORT_HANDLE] + } + + fn prepare( + &self, + _input_schemas: std::collections::HashMap, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn build( + &self, + mut input_schemas: std::collections::HashMap, + ) -> Result, BoxedError> { + let schema = input_schemas.remove(&DEFAULT_PORT_HANDLE).unwrap(); + Ok(Box::new(BigQuerySink::new(self.config.clone(), schema))) + } +} + +#[derive(Debug)] +pub struct BigQuerySink { + batch: Vec, + max_batch_size: usize, + sender: Sender>, +} + +impl Sink for BigQuerySink { + fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> { + Ok(()) + } + + fn process( + &mut self, + _from_port: dozer_core::node::PortHandle, + _record_store: &ProcessorRecordStore, + op: dozer_types::types::Operation, + ) -> Result<(), BoxedError> { + self.process_op(op) + } + + fn persist( + &mut self, + _epoch: &dozer_core::epoch::Epoch, + _queue: &dozer_log::storage::Queue, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_started( + &mut self, + _connection_name: String, + ) -> Result<(), BoxedError> { + Ok(()) + } + + fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> { + Ok(()) + } +} + +impl BigQuerySink { + pub fn new(mut config: BigQueryConfig, schema: Schema) -> Self { + let (sender, receiver) = channel(20); + let options = config.options.get_or_insert_with(Default::default); + let max_batch_size = *options.batch_size.get_or_insert_with(default_batch_size); + let _var = options + .stage_max_size_in_mb + .get_or_insert_with(default_stage_max_size_in_mb); + + Self::start_client(receiver, config.clone(), schema.clone()); + + Self { + sender, + batch: Vec::new(), + max_batch_size, + } + } + + fn process_op(&mut self, op: dozer_types::types::Operation) -> Result<(), BoxedError> { + self.batch.push(op); + if self.batch.len() >= self.max_batch_size { + self.send_batch(); + } + Ok(()) + } + + fn send_batch(&mut self) { + let mut batch = Vec::new(); + std::mem::swap(&mut batch, &mut self.batch); + debug!("sending {} ops to bigquery client", batch.len()); + if let Err(err) = self.sender.blocking_send(batch) { + panic!("bigquery client crashed: {err:?}"); + } + } + + fn start_client(receiver: Receiver>, config: BigQueryConfig, schema: Schema) { + thread::spawn({ + move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + if let Err(err) = bigquery_client(receiver, config, schema).await { + error!("BigQuery client crashed with error: {err:?}"); + } + }) + } + }); + } +} + +async fn bigquery_client( + mut records_receiver: Receiver>, + config: BigQueryConfig, + schema: Schema, +) -> Result<(), BoxedError> { + info!("Starting BigQuery Sink"); + + let Destination { + stage_gcs_bucket_name: bucket_name, + project, + dataset, + table, + .. 
+ } = &config.destination; + + let options = config.options.as_ref().unwrap(); + let max_batch_size = options.batch_size.unwrap(); + let max_stage_size_in_bytes = options + .stage_max_size_in_mb + .unwrap() + .mul(Decimal::from_i32(1024 * 1024).unwrap()) + .to_i64() + .unwrap() as usize; + + let service_account_key = + html_escape::decode_html_entities(&config.auth.service_account_key).to_string(); + let gcp_sa_key = yup_oauth2::parse_service_account_key(&service_account_key)?; + + let client = gcp_bigquery_client::Client::from_service_account_key(gcp_sa_key, false).await?; + + create_table(&config, &client, &schema).await?; + + let stage_object_store = GoogleCloudStorageBuilder::new() + .with_bucket_name(bucket_name) + .with_service_account_key(service_account_key) + .with_retry(RetryConfig { + backoff: BackoffConfig::default(), + max_retries: usize::max_value(), + retry_timeout: std::time::Duration::from_secs(u64::MAX), + }) + .build()?; + + let arrow_schema = Arc::new(map_to_arrow_schema(&schema)?); + + let mut buffer: Vec; + let mut writer: ArrowWriter<&mut Vec>; + let mut records_processed: usize; + + let mut total_staged_size_bytes = 0usize; + let mut staged_files_uris = Vec::new(); + + let mut total_written_bytes = 0usize; + + let batch_start_time = Instant::now(); + + macro_rules! reset_segment { + () => { + buffer = Vec::new(); + writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None).unwrap(); + records_processed = 0usize; + }; + } + + reset_segment!(); + + loop { + let Some(ops) = records_receiver.recv().await else { + break; + }; + + let mut records = Vec::with_capacity(ops.len()); + for op in ops { + match op { + Operation::Insert { new } => records.push(new), + Operation::BatchInsert { new } => records.extend(new), + Operation::Delete { .. } => todo!(), + Operation::Update { .. } => todo!(), + } + } + + let num_recods = records.len(); + for record in records { + writer.write(&map_record_to_arrow(record, &schema)?)?; + } + + records_processed += num_recods; + + if records_processed >= max_batch_size { + debug!("writing parquet file with {records_processed} records"); + writer.close()?; + let num_bytes = buffer.len(); + let bytes = Bytes::from(buffer); + reset_segment!(); + let filename = format!( + "{project}.{dataset}.{table}-{}.parquet", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + ); + stage_object_store + .put(&Path::from(filename.clone()), bytes) + .await?; + + staged_files_uris.push(format!("gs://{bucket_name}/{filename}")); + total_staged_size_bytes += num_bytes; + + if total_staged_size_bytes >= max_stage_size_in_bytes { + debug!( + "loading {} parquet files into bigquery", + staged_files_uris.len() + ); + let job = Job { + configuration: Some(JobConfiguration { + job_timeout_ms: Some("300000".to_string()), + load: Some(JobConfigurationLoad { + create_disposition: Some("CREATE_NEVER".to_string()), + destination_table: Some(TableReference::new(project, dataset, table)), + source_format: Some("PARQUET".to_string()), + source_uris: Some(staged_files_uris), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }; + staged_files_uris = Vec::new(); + + let job = client.job().insert(project, job).await?; + + let job_ref = job.job_reference.expect("job_reference not found"); + debug!("job_ref: {job_ref:?}"); + + while client + .job() + .get_job(project, job_ref.job_id.as_ref().unwrap(), None) + .await? 
+ .status + .expect("job_status not found") + .state + != Some("DONE".to_string()) + { + tokio::time::sleep(Duration::from_secs_f64(0.5)).await; + } + + debug!( + "Inserted {} MB of data into BigQuery", + total_staged_size_bytes / 1024 / 1024 + ); + + let batch_time = Instant::now() - batch_start_time; + debug!( + "report: Total time: {} hours, {} minutes, {} seconds {} milliseconds\n rate: {} MB/sec", + batch_time.as_secs() / 3600, + (batch_time.as_secs() % 3600) / 60, + batch_time.as_secs() % 60, + batch_time.subsec_millis(), + total_written_bytes as f64 / 1024f64 / 1024f64 / batch_time.as_secs_f64() + ); + + total_written_bytes += total_staged_size_bytes; + + total_staged_size_bytes = 0; + } + } + } + + Ok(()) +} + +async fn create_table( + config: &BigQueryConfig, + client: &Client, + schema: &Schema, +) -> Result { + let Destination { + project, + dataset, + dataset_location, + table, + .. + } = &config.destination; + + let dataset = { + let result = client.dataset().get(project, dataset).await; + match result { + Ok(dataset) => dataset, + Err(_) => { + client + .dataset() + .create(Dataset::new(project, dataset).location(dataset_location)) + .await? + } + } + }; + + client + .table() + .delete_if_exists(project, dataset.dataset_id(), table) + .await; + + dataset + .create_table( + client, + Table::from_dataset(&dataset, table, table_schema_from_dozer_schema(schema)), + ) + .await +} + +fn table_schema_from_dozer_schema(schema: &Schema) -> TableSchema { + let fields = schema + .fields + .iter() + .map(|field| { + let field_name = &field.name; + let mut field_schema = match field.typ { + FieldType::UInt => TableFieldSchema::numeric(field_name), + FieldType::U128 => TableFieldSchema::big_numeric(field_name), + FieldType::Int => TableFieldSchema::integer(field_name), + FieldType::I128 => TableFieldSchema::big_numeric(field_name), + FieldType::Float => TableFieldSchema::float(field_name), + FieldType::Boolean => TableFieldSchema::bool(field_name), + FieldType::String => TableFieldSchema::string(field_name), + FieldType::Text => TableFieldSchema::string(field_name), + FieldType::Binary => TableFieldSchema::bytes(field_name), + FieldType::Decimal => TableFieldSchema::numeric(field_name), + FieldType::Timestamp => TableFieldSchema::timestamp(field_name), + FieldType::Date => TableFieldSchema::date(field_name), + FieldType::Json => TableFieldSchema::json(field_name), + FieldType::Point => TableFieldSchema::geography(field_name), + FieldType::Duration => TableFieldSchema::date_time(field_name), // todo: switch to interval data type once it's GA + }; + if field.nullable { + field_schema.mode = Some("NULLABLE".to_string()); + } + field_schema + }) + .collect::>(); + TableSchema::new(fields) +} diff --git a/dozer-sinks/src/lib.rs b/dozer-sinks/src/lib.rs index e7ce849b3b..5a7942416b 100644 --- a/dozer-sinks/src/lib.rs +++ b/dozer-sinks/src/lib.rs @@ -1,2 +1,3 @@ +pub use dozer_sinks_bigquery as bigquery; #[cfg(feature = "snowflake")] pub use dozer_sinks_snowflake as snowflake; diff --git a/dozer-types/src/helper.rs b/dozer-types/src/helper.rs index 508d6a40e0..93d97a2b05 100644 --- a/dozer-types/src/helper.rs +++ b/dozer-types/src/helper.rs @@ -4,6 +4,7 @@ use crate::types::{DozerDuration, DozerPoint, TimeUnit, DATE_FORMAT}; use crate::types::{Field, FieldType}; use chrono::{DateTime, NaiveDate}; use ordered_float::OrderedFloat; +use rust_decimal::prelude::{FromPrimitive, ToPrimitive}; use rust_decimal::Decimal; use schemars::JsonSchema; use serde::{Deserialize, Deserializer, Serialize, 
Serializer}; @@ -323,6 +324,24 @@ pub fn f64_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema: f64::json_schema(gen) } +pub fn serialize_decimal_as_f64(decimal: &Option, s: S) -> Result +where + S: Serializer, +{ + decimal.map(|d| d.to_f64()).flatten().serialize(s) +} + +pub fn deserialize_decimal_as_f64<'a, D>(d: D) -> Result, D::Error> +where + D: Deserializer<'a>, +{ + Option::::deserialize(d).map(|f| f.and_then(Decimal::from_f64)) +} + +pub fn f64_opt_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + f64::json_schema(gen) +} + #[cfg(test)] mod tests { use crate::json_types::json; diff --git a/dozer-types/src/models/endpoint.rs b/dozer-types/src/models/endpoint.rs index 0f4373292c..087a376f98 100644 --- a/dozer-types/src/models/endpoint.rs +++ b/dozer-types/src/models/endpoint.rs @@ -110,6 +110,7 @@ pub enum EndpointKind { Api(ApiEndpoint), Dummy, Aerospike(AerospikeSinkConfig), + BigQuery(BigQuery), } #[derive(Debug, Serialize, Deserialize, JsonSchema, Eq, PartialEq, Clone)] @@ -165,3 +166,69 @@ impl std::fmt::Display for SecondaryIndex { } } } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct BigQuery { + pub auth: bigquery::Authentication, + pub destination: bigquery::Destination, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub options: Option, +} + +pub mod bigquery { + use crate::helper::{deserialize_decimal_as_f64, f64_opt_schema, serialize_decimal_as_f64}; + use rust_decimal::Decimal; + use schemars::JsonSchema; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] + #[serde(deny_unknown_fields)] + pub struct Authentication { + pub service_account_key: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] + #[serde(deny_unknown_fields)] + pub struct Destination { + pub stage_gcs_bucket_name: String, + pub project: String, + pub dataset: String, + pub dataset_location: String, + pub table: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema)] + #[serde(deny_unknown_fields)] + pub struct Options { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub batch_size: Option, + + #[serde( + default, + skip_serializing_if = "Option::is_none", + deserialize_with = "deserialize_decimal_as_f64", + serialize_with = "serialize_decimal_as_f64" + )] + #[schemars(schema_with = "f64_opt_schema")] + pub stage_max_size_in_mb: Option, + } + + impl Default for Options { + fn default() -> Self { + Self { + batch_size: Some(default_batch_size()), + stage_max_size_in_mb: Some(default_stage_max_size_in_mb()), + } + } + } + + pub fn default_batch_size() -> usize { + 1000000 + } + + pub fn default_stage_max_size_in_mb() -> Decimal { + Decimal::from(215) + } +} diff --git a/json_schemas/dozer.json b/json_schemas/dozer.json index b9ee851aa8..5d97f32589 100644 --- a/json_schemas/dozer.json +++ b/json_schemas/dozer.json @@ -406,6 +406,44 @@ }, "additionalProperties": false }, + "Authentication": { + "type": "object", + "required": [ + "service_account_key" + ], + "properties": { + "service_account_key": { + "type": "string" + } + }, + "additionalProperties": false + }, + "BigQuery": { + "type": "object", + "required": [ + "auth", + "destination" + ], + "properties": { + "auth": { + "$ref": "#/definitions/Authentication" + }, + "destination": { + "$ref": "#/definitions/Destination" + }, + "options": { + "anyOf": [ + { + 
"$ref": "#/definitions/Options" + }, + { + "type": "null" + } + ] + } + }, + "additionalProperties": false + }, "Cloud": { "type": "object", "properties": { @@ -749,6 +787,34 @@ } }, "Destination": { + "type": "object", + "required": [ + "dataset", + "dataset_location", + "project", + "stage_gcs_bucket_name", + "table" + ], + "properties": { + "dataset": { + "type": "string" + }, + "dataset_location": { + "type": "string" + }, + "project": { + "type": "string" + }, + "stage_gcs_bucket_name": { + "type": "string" + }, + "table": { + "type": "string" + } + }, + "additionalProperties": false + }, + "Destination2": { "type": "object", "required": [ "database", @@ -874,6 +940,18 @@ } }, "additionalProperties": false + }, + { + "type": "object", + "required": [ + "BigQuery" + ], + "properties": { + "BigQuery": { + "$ref": "#/definitions/BigQuery" + } + }, + "additionalProperties": false } ] }, @@ -1486,6 +1564,24 @@ "additionalProperties": false }, "Options": { + "type": "object", + "properties": { + "batch_size": { + "type": [ + "integer", + "null" + ], + "format": "uint", + "minimum": 0.0 + }, + "stage_max_size_in_mb": { + "type": "number", + "format": "double" + } + }, + "additionalProperties": false + }, + "Options2": { "type": "object", "properties": { "batch_interval_seconds": { @@ -1872,7 +1968,7 @@ "$ref": "#/definitions/ConnectionParameters" }, "destination": { - "$ref": "#/definitions/Destination" + "$ref": "#/definitions/Destination2" }, "endpoint": { "type": "string" @@ -1880,7 +1976,7 @@ "options": { "anyOf": [ { - "$ref": "#/definitions/Options" + "$ref": "#/definitions/Options2" }, { "type": "null" From dd18550f6376a1bc174031eb4b6b50cac780e460 Mon Sep 17 00:00:00 2001 From: Solomon Date: Wed, 17 Jan 2024 02:19:46 +0100 Subject: [PATCH 2/2] optimize: parallelize and make performance metrics more granular --- dozer-sinks/bigquery/src/lib.rs | 404 ++++++++++++++++++++++---------- 1 file changed, 282 insertions(+), 122 deletions(-) diff --git a/dozer-sinks/bigquery/src/lib.rs b/dozer-sinks/bigquery/src/lib.rs index 501cf170f7..094b6e8fb4 100644 --- a/dozer-sinks/bigquery/src/lib.rs +++ b/dozer-sinks/bigquery/src/lib.rs @@ -1,5 +1,5 @@ use std::{ - ops::Mul, + ops::{Deref, Mul}, sync::Arc, thread, time::{Duration, Instant}, @@ -83,6 +83,7 @@ pub struct BigQuerySink { impl Sink for BigQuerySink { fn commit(&mut self, _epoch_details: &dozer_core::epoch::Epoch) -> Result<(), BoxedError> { + self.send_batch(); Ok(()) } @@ -117,7 +118,7 @@ impl Sink for BigQuerySink { impl BigQuerySink { pub fn new(mut config: BigQueryConfig, schema: Schema) -> Self { - let (sender, receiver) = channel(20); + let (sender, receiver) = channel(1000000); let options = config.options.get_or_insert_with(Default::default); let max_batch_size = *options.batch_size.get_or_insert_with(default_batch_size); let _var = options @@ -144,7 +145,7 @@ impl BigQuerySink { fn send_batch(&mut self) { let mut batch = Vec::new(); std::mem::swap(&mut batch, &mut self.batch); - debug!("sending {} ops to bigquery client", batch.len()); + // debug!("sending {} ops to bigquery client", batch.len()); if let Err(err) = self.sender.blocking_send(batch) { panic!("bigquery client crashed: {err:?}"); } @@ -165,6 +166,26 @@ impl BigQuerySink { } } +struct Metrics { + start_time: Instant, + total_processed_bytes: usize, + total_parquet_conversion_time: Duration, + total_parquet_upload_time: Duration, + total_bigquery_load_time: Duration, +} + +impl Metrics { + fn new() -> Self { + Self { + start_time: Instant::now(), + total_processed_bytes: 
Default::default(), + total_parquet_conversion_time: Default::default(), + total_parquet_upload_time: Default::default(), + total_bigquery_load_time: Default::default(), + } + } +} + async fn bigquery_client( mut records_receiver: Receiver>, config: BigQueryConfig, @@ -172,14 +193,6 @@ async fn bigquery_client( ) -> Result<(), BoxedError> { info!("Starting BigQuery Sink"); - let Destination { - stage_gcs_bucket_name: bucket_name, - project, - dataset, - table, - .. - } = &config.destination; - let options = config.options.as_ref().unwrap(); let max_batch_size = options.batch_size.unwrap(); let max_stage_size_in_bytes = options @@ -193,142 +206,289 @@ async fn bigquery_client( html_escape::decode_html_entities(&config.auth.service_account_key).to_string(); let gcp_sa_key = yup_oauth2::parse_service_account_key(&service_account_key)?; + let metrics = Arc::new(std::sync::Mutex::new(Metrics::new())); + + let (parquet_sender, parquet_receiver) = channel(1000); + tokio::spawn(parquet_loader( + parquet_receiver, + gcp_sa_key.clone(), + max_stage_size_in_bytes, + config.destination.clone(), + metrics.clone(), + )); + let client = gcp_bigquery_client::Client::from_service_account_key(gcp_sa_key, false).await?; create_table(&config, &client, &schema).await?; - let stage_object_store = GoogleCloudStorageBuilder::new() - .with_bucket_name(bucket_name) - .with_service_account_key(service_account_key) - .with_retry(RetryConfig { - backoff: BackoffConfig::default(), - max_retries: usize::max_value(), - retry_timeout: std::time::Duration::from_secs(u64::MAX), - }) - .build()?; - let arrow_schema = Arc::new(map_to_arrow_schema(&schema)?); - let mut buffer: Vec; - let mut writer: ArrowWriter<&mut Vec>; - let mut records_processed: usize; + let mut records = Vec::new(); - let mut total_staged_size_bytes = 0usize; - let mut staged_files_uris = Vec::new(); + 'main: loop { + loop { + let capacity = 100; + let mut results = Vec::with_capacity(capacity); + let num_results = records_receiver.recv_many(&mut results, capacity).await; + if num_results == 0 { + break 'main; + } + let continue_reading = num_results < capacity; + for ops in results { + // debug!("got {} ops", ops.len()); + + for op in ops { + match op { + Operation::Insert { new } => records.push(new), + Operation::BatchInsert { new } => records.extend(new), + Operation::Delete { .. } => todo!(), + Operation::Update { .. 
} => todo!(), + } + } + } + if !continue_reading || records.len() >= max_batch_size { + break; + } + } - let mut total_written_bytes = 0usize; + let num_recods = records.len(); - let batch_start_time = Instant::now(); + if num_recods >= max_batch_size { + debug!("writing parquet file with {num_recods} records"); + let mut parquet_records = Vec::new(); + let schema = schema.clone(); + let arrow_schema = arrow_schema.clone(); + let destination = config.destination.clone(); + let service_account_key = service_account_key.clone(); + let parquet_sender = parquet_sender.clone(); + let metrics = metrics.clone(); + + std::mem::swap(&mut parquet_records, &mut records); + tokio::task::spawn_blocking(move || { + if let Err(err) = (move || -> Result<(), BoxedError> { + let conversion_start_time = Instant::now(); + + let mut buffer = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None).unwrap(); + + for record in parquet_records { + writer.write(&map_record_to_arrow(record, &schema)?)?; + } + + if let Err(err) = writer.close() { + error!("parquet error: {err}"); + } + + let conversion_time = Instant::now() - conversion_start_time; + metrics.lock().unwrap().total_parquet_conversion_time += conversion_time; + + let Destination { + stage_gcs_bucket_name: bucket_name, + project, + dataset, + table, + .. + } = &destination; + + let filename = format!( + "{project}.{dataset}.{table}-{}.parquet", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + ); + + let num_bytes = buffer.len(); + let bytes = Bytes::from(buffer); + + tokio::spawn({ + let stage_object_store = GoogleCloudStorageBuilder::new() + .with_bucket_name(bucket_name) + .with_service_account_key(&service_account_key) + .with_retry(RetryConfig { + backoff: BackoffConfig::default(), + max_retries: usize::max_value(), + retry_timeout: std::time::Duration::from_secs(u64::MAX), + }) + .build()?; + + let sender = parquet_sender; + let uri = format!("gs://{bucket_name}/{filename}"); + async move { + let upload_start_time = Instant::now(); + + if let Err(err) = stage_object_store + .put(&Path::from(filename.clone()), bytes) + .await + { + error!("failed to upload parquet file {uri}; {err}"); + return; + } + + let upload_time = Instant::now() - upload_start_time; + metrics.lock().unwrap().total_parquet_upload_time += upload_time; + + let _r = sender + .send(ParquetMetadata { + gcs_uri: uri, + size_in_bytes: num_bytes, + }) + .await; + } + }); - macro_rules! reset_segment { - () => { - buffer = Vec::new(); - writer = ArrowWriter::try_new(&mut buffer, arrow_schema.clone(), None).unwrap(); - records_processed = 0usize; - }; + Ok(()) + })() { + error!("error writing parquet: {err}"); + panic!(); + } + }); + } } - reset_segment!(); + Ok(()) +} - loop { - let Some(ops) = records_receiver.recv().await else { - break; - }; +struct ParquetMetadata { + gcs_uri: String, + size_in_bytes: usize, +} - let mut records = Vec::with_capacity(ops.len()); - for op in ops { - match op { - Operation::Insert { new } => records.push(new), - Operation::BatchInsert { new } => records.extend(new), - Operation::Delete { .. } => todo!(), - Operation::Update { .. 
} => todo!(), +fn report(metrics: &Metrics) { + let uptime = Instant::now() - metrics.start_time; + + struct Format<'a>(&'a Duration); + impl std::fmt::Display for Format<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let time = self.0; + let hours = time.as_secs() / 3600; + let minutes = (time.as_secs() % 3600) / 60; + let seconds = time.as_secs() % 60; + let millis = time.subsec_millis(); + write!(f, "{hours:02}:{minutes:02}:{seconds:02}")?; + if millis != 0 { + write!(f, ".{millis:03}")?; } + Ok(()) } + } - let num_recods = records.len(); - for record in records { - writer.write(&map_record_to_arrow(record, &schema)?)?; - } + debug!( + concat!( + "\nMetrics Report:\n", + " rate: {} MB/sec\n", + " uptime: {}\n", + " in-memory records-to-parquet conversion time: {} (parallelized)\n", + " parquet upload to GCS time: {} (parallelized)\n", + " bigquery load parquet from GCS time: {} (parallelized)\n" + ), + metrics.total_processed_bytes as f64 / 1024f64 / 1024f64 / uptime.as_secs_f64(), + Format(&uptime), + Format(&metrics.total_parquet_conversion_time), + Format(&metrics.total_parquet_upload_time), + Format(&metrics.total_bigquery_load_time), + ); +} - records_processed += num_recods; - - if records_processed >= max_batch_size { - debug!("writing parquet file with {records_processed} records"); - writer.close()?; - let num_bytes = buffer.len(); - let bytes = Bytes::from(buffer); - reset_segment!(); - let filename = format!( - "{project}.{dataset}.{table}-{}.parquet", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() - ); - stage_object_store - .put(&Path::from(filename.clone()), bytes) - .await?; - - staged_files_uris.push(format!("gs://{bucket_name}/{filename}")); - total_staged_size_bytes += num_bytes; - - if total_staged_size_bytes >= max_stage_size_in_bytes { - debug!( - "loading {} parquet files into bigquery", - staged_files_uris.len() - ); - let job = Job { - configuration: Some(JobConfiguration { - job_timeout_ms: Some("300000".to_string()), - load: Some(JobConfigurationLoad { - create_disposition: Some("CREATE_NEVER".to_string()), - destination_table: Some(TableReference::new(project, dataset, table)), - source_format: Some("PARQUET".to_string()), - source_uris: Some(staged_files_uris), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }; - staged_files_uris = Vec::new(); +async fn parquet_loader( + mut receiver: Receiver, + gcp_sa_key: yup_oauth2::ServiceAccountKey, + max_stage_size_in_bytes: usize, + destination: Destination, + metrics: Arc>, +) -> Result<(), BoxedError> { + let mut total_staged_bytes = 0usize; + let mut staged_files_uris = Vec::new(); - let job = client.job().insert(project, job).await?; + let Destination { + project, + dataset, + table, + .. + } = &destination; - let job_ref = job.job_reference.expect("job_reference not found"); - debug!("job_ref: {job_ref:?}"); + let client = gcp_bigquery_client::Client::from_service_account_key(gcp_sa_key, false).await?; - while client - .job() - .get_job(project, job_ref.job_id.as_ref().unwrap(), None) - .await? 
- .status - .expect("job_status not found") - .state - != Some("DONE".to_string()) - { - tokio::time::sleep(Duration::from_secs_f64(0.5)).await; - } + loop { + let Some(ParquetMetadata { + gcs_uri, + size_in_bytes, + }) = receiver.recv().await + else { + break; + }; - debug!( - "Inserted {} MB of data into BigQuery", - total_staged_size_bytes / 1024 / 1024 - ); + total_staged_bytes += size_in_bytes; + staged_files_uris.push(gcs_uri); - let batch_time = Instant::now() - batch_start_time; - debug!( - "report: Total time: {} hours, {} minutes, {} seconds {} milliseconds\n rate: {} MB/sec", - batch_time.as_secs() / 3600, - (batch_time.as_secs() % 3600) / 60, - batch_time.as_secs() % 60, - batch_time.subsec_millis(), - total_written_bytes as f64 / 1024f64 / 1024f64 / batch_time.as_secs_f64() - ); + if total_staged_bytes >= max_stage_size_in_bytes { + debug!( + "loading {} parquet files into bigquery", + staged_files_uris.len() + ); - total_written_bytes += total_staged_size_bytes; + let load_start_time = Instant::now(); - total_staged_size_bytes = 0; - } + let job = Job { + configuration: Some(JobConfiguration { + job_timeout_ms: Some("300000".to_string()), + load: Some(JobConfigurationLoad { + create_disposition: Some("CREATE_NEVER".to_string()), + destination_table: Some(TableReference::new(project, dataset, table)), + source_format: Some("PARQUET".to_string()), + source_uris: Some(staged_files_uris), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }; + let job = client.job().insert(project, job).await?; + + let job_ref = job.job_reference.expect("job_reference not found"); + debug!("job_ref: {job_ref:?}"); + + metrics.lock().unwrap().total_processed_bytes += total_staged_bytes; + total_staged_bytes = 0; + staged_files_uris = Vec::new(); + + tokio::spawn({ + let jobs = client.job().clone(); + let project = project.clone(); + let metrics = metrics.clone(); + async move { + loop { + match jobs + .get_job(&project, job_ref.job_id.as_ref().unwrap(), None) + .await + { + Ok(job) => { + if job.status.expect("job_status not found").state + == Some("DONE".to_string()) + { + break; + } else { + tokio::time::sleep(Duration::from_secs_f64(0.5)).await; + continue; + } + } + Err(err) => { + error!( + "error getting job status for job: {:?}; {err}", + job_ref.job_id + ); + return; + } + } + } + + let load_time = Instant::now() - load_start_time; + metrics.lock().unwrap().total_bigquery_load_time += load_time; + + report(metrics.lock().unwrap().deref()); + } + }); } }
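
For reviewers, a minimal wiring sketch (not part of the patch) showing how the configuration types added in dozer-types/src/models/endpoint.rs feed the new sink factory, mirroring the EndpointKind::BigQuery(config) arm added to dozer-cli's pipeline builder. All field values are placeholders and the standalone main() is assumed for illustration only.

    // Hypothetical example: build a BigQuery endpoint config and hand it to the
    // sink factory the same way the pipeline builder does for
    // EndpointKind::BigQuery(config). Placeholder values throughout.
    use dozer_sinks::bigquery::BigQuerySinkFactory;
    use dozer_types::models::endpoint::{bigquery, BigQuery};

    fn main() {
        let config = BigQuery {
            auth: bigquery::Authentication {
                // Placeholder: the service account key JSON. HTML-escaped keys are
                // decoded by the sink via html_escape::decode_html_entities before parsing.
                service_account_key: "<service-account-key-json>".to_string(),
            },
            destination: bigquery::Destination {
                stage_gcs_bucket_name: "my-staging-bucket".to_string(),
                project: "my-gcp-project".to_string(),
                dataset: "my_dataset".to_string(),
                dataset_location: "EU".to_string(),
                table: "my_table".to_string(),
            },
            // None falls back to default_batch_size() (1_000_000 records) and
            // default_stage_max_size_in_mb() (215 MB) inside BigQuerySink::new.
            options: None,
        };

        // The builder's EndpointLogKind::BigQuery arm reduces to exactly this call.
        let _factory = BigQuerySinkFactory::new(config);
    }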
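
Also for illustration, a hedged round-trip sketch of the new decimal-to-f64 serde helpers in dozer-types/src/helper.rs, which let bigquery::Options::stage_max_size_in_mb be written as a plain number in configuration while being held as a rust_decimal::Decimal internally. serde_json is assumed to be available to the caller; it is not added by this patch.

    // Hypothetical usage of the Options type added in dozer-types/src/models/endpoint.rs.
    use dozer_types::models::endpoint::bigquery::Options;

    fn main() {
        // deserialize_decimal_as_f64 converts the JSON number into a Decimal via Decimal::from_f64.
        let options: Options =
            serde_json::from_str(r#"{ "batch_size": 500000, "stage_max_size_in_mb": 215.5 }"#)
                .unwrap();
        assert_eq!(options.batch_size, Some(500000));

        // serialize_decimal_as_f64 writes the Decimal back out as a plain f64.
        println!("{}", serde_json::to_string(&options).unwrap());
    }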