Skip to content

Commit

Permalink
feat(cli): export metric physical tables first (#3949)
Browse files Browse the repository at this point in the history
* feat: export metric physical tables first

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored May 16, 2024
1 parent dff7ba7 commit c915916
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
4 changes: 4 additions & 0 deletions src/cmd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl App for Instance {
self.tool.do_work().await
}

fn wait_signal(&self) -> bool {
false
}

async fn stop(&self) -> Result<()> {
Ok(())
}
Expand Down
46 changes: 40 additions & 6 deletions src/cmd/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -28,6 +29,7 @@ use snafu::{OptionExt, ResultExt};
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Semaphore;
use tokio::time::Instant;

use crate::cli::{Instance, Tool};
use crate::error::{
Expand Down Expand Up @@ -176,6 +178,28 @@ impl Export {
/// Return a list of [`TableReference`] to be exported.
/// Includes all tables under the given `catalog` and `schema`
async fn get_table_list(&self, catalog: &str, schema: &str) -> Result<Vec<TableReference>> {
// Puts all metric table first
let sql = format!(
"select table_catalog, table_schema, table_name from \
information_schema.columns where column_name = '__tsid' \
and table_catalog = \'{catalog}\' and table_schema = \'{schema}\'"
);
let result = self.sql(&sql).await?;
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let mut metric_physical_tables = HashSet::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
let serde_json::Value::String(value) = v else {
unreachable!()
};
t.push(value);
}
metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
}

// TODO: SQL injection hurts
let sql = format!(
"select table_catalog, table_schema, table_name from \
Expand All @@ -193,7 +217,7 @@ impl Export {
return Ok(vec![]);
}

let mut result = Vec::with_capacity(records.len());
let mut remaining_tables = Vec::with_capacity(records.len());
for value in records {
let mut t = Vec::with_capacity(3);
for v in &value {
Expand All @@ -202,10 +226,17 @@ impl Export {
};
t.push(value);
}
result.push((t[0].clone(), t[1].clone(), t[2].clone()));
let table = (t[0].clone(), t[1].clone(), t[2].clone());
// Ignores the physical table
if !metric_physical_tables.contains(&table) {
remaining_tables.push(table);
}
}
let mut tables = Vec::with_capacity(metric_physical_tables.len() + remaining_tables.len());
tables.extend(metric_physical_tables.into_iter());
tables.extend(remaining_tables);

Ok(result)
Ok(tables)
}

async fn show_create_table(&self, catalog: &str, schema: &str, table: &str) -> Result<String> {
Expand All @@ -225,6 +256,7 @@ impl Export {
}

async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
Expand Down Expand Up @@ -270,12 +302,14 @@ impl Export {
})
.count();

info!("success {success}/{db_count} jobs");
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, cost: {:?}", elapsed);

Ok(())
}

async fn export_table_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
Expand Down Expand Up @@ -351,8 +385,8 @@ impl Export {
}
})
.count();

info!("success {success}/{db_count} jobs");
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);

Ok(())
}
Expand Down
17 changes: 12 additions & 5 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ pub trait App: Send {

async fn start(&mut self) -> error::Result<()>;

/// Waits the quit signal by default.
fn wait_signal(&self) -> bool {
true
}

async fn stop(&self) -> error::Result<()>;
}

Expand All @@ -51,11 +56,13 @@ pub async fn start_app(mut app: Box<dyn App>) -> error::Result<()> {

app.start().await?;

if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
if app.wait_signal() {
if let Err(e) = tokio::signal::ctrl_c().await {
error!("Failed to listen for ctrl-c signal: {}", e);
// It's unusual to fail to listen for ctrl-c signal, maybe there's something unexpected in
// the underlying system. So we stop the app instead of running nonetheless to let people
// investigate the issue.
}
}

app.stop().await?;
Expand Down

0 comments on commit c915916

Please sign in to comment.