Skip to content

Commit

Permalink
fix: use BufReader for loading files (#561)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Jan 6, 2025
1 parent e5fca11 commit 2d65a9c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 14 deletions.
9 changes: 5 additions & 4 deletions cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::io::BufRead;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::Result;
use async_recursion::async_recursion;
Expand All @@ -26,12 +31,8 @@ use rustyline::config::Builder;
use rustyline::error::ReadlineError;
use rustyline::history::DefaultHistory;
use rustyline::{CompletionType, Editor};
use std::collections::BTreeMap;
use std::io::BufRead;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::fs::{remove_file, File};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle;
Expand Down
2 changes: 2 additions & 0 deletions core/src/presign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{collections::BTreeMap, path::Path};

use log::info;
use reqwest::{Body, Client as HttpClient, StatusCode};
use tokio::io::AsyncRead;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -43,6 +44,7 @@ pub async fn presign_upload_to_stage(
data: Reader,
size: u64,
) -> Result<()> {
info!("upload to stage with presigned url, size: {}", size);
let client = HttpClient::new();
let mut builder = client.put(presigned.url);
for (k, v) in presigned.headers {
Expand Down
4 changes: 3 additions & 1 deletion core/tests/core/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use tokio::fs::File;
use tokio::io::BufReader;

use databend_client::APIClient;

Expand All @@ -32,6 +33,7 @@ async fn insert_with_stage(presign: bool) {

let file = File::open("tests/core/data/sample.csv").await.unwrap();
let metadata = file.metadata().await.unwrap();
let data = BufReader::new(file);

let path = chrono::Utc::now().format("%Y%m%d%H%M%S%9f").to_string();
let stage_location = format!("@~/{}/sample.csv", path);
Expand All @@ -42,7 +44,7 @@ async fn insert_with_stage(presign: bool) {
};

client
.upload_to_stage(&stage_location, Box::new(file), metadata.len())
.upload_to_stage(&stage_location, Box::new(data), metadata.len())
.await
.unwrap();
let sql = format!(
Expand Down
11 changes: 8 additions & 3 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
use once_cell::sync::Lazy;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::BufReader;
use tokio_stream::StreamExt;
use url::Url;

Expand All @@ -29,6 +30,7 @@ use crate::flight_sql::FlightSQLConnection;
use databend_client::StageLocation;
use databend_client::{presign_download_from_stage, PresignedResponse};
use databend_driver_core::error::{Error, Result};
use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
use databend_driver_core::schema::{DataType, Field, NumberDataType, Schema};
use databend_driver_core::value::{NumberValue, Value};
Expand Down Expand Up @@ -152,13 +154,15 @@ pub trait Connection: Send + Sync {
file_format_options: Option<BTreeMap<&str, &str>>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats>;

async fn load_file(
&self,
sql: &str,
fp: &Path,
format_options: Option<BTreeMap<&str, &str>>,
copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats>;

async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats>;

// PUT file://<path_to_file>/<filename> internalStage|externalStage
Expand All @@ -180,8 +184,9 @@ pub trait Connection: Send + Sync {
Error::BadArgument(format!("Invalid local file path: {:?}", entry))
})?;
let stage_file = stage_location.file_path(filename);
let data = tokio::fs::File::open(&entry).await?;
let size = data.metadata().await?.len();
let file = File::open(&entry).await?;
let size = file.metadata().await?.len();
let data = BufReader::new(file);
let (fname, status) = match self
.upload_to_stage(&stage_file, Box::new(data), size)
.await
Expand Down
16 changes: 11 additions & 5 deletions driver/src/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ use std::task::{Context, Poll};

use async_compression::tokio::write::ZstdEncoder;
use async_trait::async_trait;
use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
use log::info;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio_stream::Stream;

use databend_client::PresignedResponse;
use databend_client::QueryResponse;
use databend_client::{APIClient, SchemaField};
use databend_driver_core::error::{Error, Result};
use databend_driver_core::raw_rows::{RawRow, RawRowIterator, RawRowWithStats};
use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, RowWithStats, ServerStats};
use databend_driver_core::schema::{Schema, SchemaRef};

Expand Down Expand Up @@ -191,8 +191,8 @@ impl Connection for RestAPIConnection {
);
let file = File::open(fp).await?;
let metadata = file.metadata().await?;
let data = Box::new(file);
let size = metadata.len();
let data = BufReader::new(file);
let mut format_options = format_options.unwrap_or_else(Self::default_file_format_options);
if !format_options.contains_key("type") {
let file_type = fp
Expand All @@ -202,8 +202,14 @@ impl Connection for RestAPIConnection {
.ok_or_else(|| Error::BadArgument("file type empty".to_string()))?;
format_options.insert("type", file_type);
}
self.load_data(sql, data, size, Some(format_options), copy_options)
.await
self.load_data(
sql,
Box::new(data),
size,
Some(format_options),
copy_options,
)
.await
}

async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats> {
Expand Down
3 changes: 2 additions & 1 deletion ttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ authors = { workspace = true }
repository = { workspace = true }

[dependencies]
databend-driver = { workspace = true }

bytes = "1"
clap = { version = "4.4", features = ["derive", "env"] }
databend-driver = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.34", features = [
Expand Down

0 comments on commit 2d65a9c

Please sign in to comment.