Skip to content

Commit

Permalink
feat: 完成支持把nacos mysql数据(包含配置、命名空间、用户数据)导出为中间数据文件功能开发; #138
Browse files Browse the repository at this point in the history
  • Loading branch information
heqingpan committed Nov 11, 2024
1 parent 3bd7d94 commit 6f67835
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ fs2 = "0.4.3"
dirs = "5.0"
bitflags = "2.6.0"

sqlx = { version = "0.6", features = ["runtime-tokio-rustls","mysql"] }

[dependencies.uuid]
version = "1.2.1"
features = [
Expand Down
8 changes: 8 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub enum Commands {
/// out to transfer middle data file
out: String,
},
/// nacos mysql to transfer middle data
#[command(arg_required_else_help = true)]
MysqlToData {
/// the nacos mysql db uri; example: mysql://user:password@localhost:3306/nacos
uri: String,
/// out to transfer middle data file
out: String,
},
#[command(arg_required_else_help = true)]
OpenapiToData {
/// nacos auth username,default is empty
Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod protobuf_utils;
pub mod rusqlite_utils;
pub mod sequence_utils;
pub mod sled_utils;
pub mod sqlx_utils;
pub mod string_utils;
pub mod tempfile;
pub mod web_utils;
Expand Down
167 changes: 167 additions & 0 deletions src/common/sqlx_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use serde_json::Value;
use sqlx::database::HasArguments;
use sqlx::mysql::MySqlRow;
use sqlx::query::Query;
use sqlx::{Executor, MySql, MySqlConnection, Transaction};

pub fn build_mysql_query<'a>(
sql: &'a str,
args: &'a [serde_json::Value],
) -> Query<'a, MySql, <MySql as HasArguments<'a>>::Arguments> {
let mut q = sqlx::query(sql);
for arg in args {
match arg {
Value::Number(v) => {
if v.is_f64() {
q = q.bind(v.as_f64().unwrap())
} else {
q = q.bind(v.as_i64().unwrap())
}
}
Value::Bool(v) => q = q.bind(Some(v.to_owned())),
Value::Null => q = q.bind(Option::<String>::None),
Value::String(v) => {
q = q.bind(Some(v.to_owned()));
}
_ => {
q = q.bind(Some(arg.to_string()));
}
}
}
q
}

pub enum MySqlExecutor<'c> {
Pool(&'c mut sqlx::Pool<MySql>),
Conn(&'c mut MySqlConnection),
Transaction(&'c mut Transaction<'c, MySql>),
}

#[derive(Debug, Clone)]
pub struct ExecuteResult {
pub rows_affected: u64,
pub last_insert_id: u64,
}

impl ExecuteResult {
pub fn new(rows_affected: u64, last_insert_id: u64) -> ExecuteResult {
Self {
rows_affected,
last_insert_id,
}
}
}

impl<'c> MySqlExecutor<'c> {
pub fn new_by_pool(pool: &'c mut sqlx::Pool<MySql>) -> Self {
Self::Pool(pool)
}
pub fn new_by_conn(conn: &'c mut MySqlConnection) -> Self {
Self::Conn(conn)
}
pub fn new_by_transaction(tx: &'c mut Transaction<'c, MySql>) -> Self {
Self::Transaction(tx)
}

pub async fn fetch<T>(
&mut self,
sql: &str,
args: &[serde_json::Value],
) -> anyhow::Result<Vec<T>>
where
T: for<'r> sqlx::FromRow<'r, MySqlRow> + Send + Unpin,
{
let q = build_mysql_query(sql, args);
match self {
MySqlExecutor::Pool(executor) => {
let res = executor.fetch_all(q).await?;
let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
Ok(rlist)
}
MySqlExecutor::Conn(executor) => {
let res = executor.fetch_all(q).await?;
let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
Ok(rlist)
}
MySqlExecutor::Transaction(executor) => {
let res = executor.fetch_all(q).await?;
let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
Ok(rlist)
}
}
}

pub async fn fetch_one<T>(&mut self, sql: &str, args: &[serde_json::Value]) -> anyhow::Result<T>
where
T: for<'r> sqlx::FromRow<'r, MySqlRow> + Send + Unpin,
{
let q = build_mysql_query(sql, args);
match self {
MySqlExecutor::Pool(executor) => {
let res = executor.fetch_one(q).await?;
Ok(T::from_row(&res)?)
}
MySqlExecutor::Conn(executor) => {
let res = executor.fetch_one(q).await?;
Ok(T::from_row(&res)?)
}
MySqlExecutor::Transaction(executor) => {
let res = executor.fetch_one(q).await?;
Ok(T::from_row(&res)?)
}
}
}

pub async fn fetch_row(
&mut self,
sql: &str,
args: &[serde_json::Value],
) -> anyhow::Result<MySqlRow> {
let q = build_mysql_query(sql, args);
match self {
MySqlExecutor::Pool(executor) => {
let res = executor.fetch_one(q).await?;
Ok(res)
}
MySqlExecutor::Conn(executor) => {
let res = executor.fetch_one(q).await?;
Ok(res)
}
MySqlExecutor::Transaction(executor) => {
let res = executor.fetch_one(q).await?;
Ok(res)
}
}
}

pub async fn execute(
&mut self,
sql: &str,
args: &[serde_json::Value],
) -> anyhow::Result<ExecuteResult> {
let q = build_mysql_query(sql, args);
match self {
MySqlExecutor::Pool(executor) => {
let res = executor.execute(q).await?;
Ok(ExecuteResult::new(
res.rows_affected(),
res.last_insert_id(),
))
}
MySqlExecutor::Conn(executor) => {
let res = executor.execute(q).await?;
Ok(ExecuteResult::new(
res.rows_affected(),
res.last_insert_id(),
))
}
MySqlExecutor::Transaction(executor) => {
let res = executor.execute(q).await?;
Ok(ExecuteResult::new(
res.rows_affected(),
res.last_insert_id(),
))
}
}
}
}
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use rnacos::common::appdata::AppShareData;
use rnacos::openapi::middle::auth_middle::ApiCheckAuth;
use rnacos::raft::NacosRaft;
use rnacos::transfer::data_to_sqlite::data_to_sqlite;
use rnacos::transfer::mysql_to_data::mysql_to_data;
use rnacos::transfer::openapi_to_data::openapi_to_data;
use rnacos::transfer::sqlite_to_data::sqlite_to_data;
use rnacos::web_config::{app_config, console_config};
Expand Down Expand Up @@ -152,6 +153,10 @@ async fn run_subcommand(commands: Commands) -> Result<(), Box<dyn Error>> {
log::info!("sqlite to middle data, from:{file} to:{out}");
sqlite_to_data(&file, &out).await?;
}
Commands::MysqlToData { uri, out } => {
log::info!("nacos mysql to middle data, from: mysql://** to:{out}");
mysql_to_data(&uri, &out).await?;
}
Commands::OpenapiToData {
host,
username,
Expand Down
2 changes: 2 additions & 0 deletions src/transfer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use actix::Addr;

pub mod data_to_sqlite;
pub mod model;
pub mod mysql;
pub mod mysql_to_data;
pub mod openapi_to_data;
pub mod reader;
pub mod sqlite;
Expand Down
108 changes: 108 additions & 0 deletions src/transfer/mysql/dao/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::common::sqlx_utils::MySqlExecutor;
use rsql_builder::B;
use serde::{Deserialize, Serialize};
use sqlx::Row;

#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)]
pub struct ConfigInfoDO {
pub id: Option<i64>,
pub data_id: Option<String>,
pub group_id: Option<String>,
pub content: Option<String>,
pub md5: Option<String>,
pub gmt_modified_timestamp: Option<i64>,
pub src_user: Option<String>,
pub src_ip: Option<String>,
pub app_name: Option<String>,
pub tenant_id: Option<String>,
pub c_desc: Option<String>,
pub c_use: Option<String>,
pub effect: Option<String>,
pub r#type: Option<String>,
pub c_schema: Option<String>,
pub encrypted_data_key: Option<String>,
}

#[derive(Debug, Default)]
pub struct ConfigInfoParam {
pub id: Option<i64>,
pub limit: Option<i64>,
pub offset: Option<i64>,
}
pub struct ConfigInfoSql {}

impl ConfigInfoSql {
fn conditions(&self, param: &ConfigInfoParam) -> B {
let mut whr = B::new_where();
if let Some(id) = &param.id {
whr.eq("id", id);
}
whr
}

pub fn query_prepare(&self, param: &ConfigInfoParam) -> (String, Vec<serde_json::Value>) {
B::prepare(
B::new_sql("select id, data_id, group_id, content, md5, UNIX_TIMESTAMP(gmt_modified) \
as gmt_modified_timestamp, src_user, src_ip, app_name, tenant_id, c_desc, c_use, effect, \
type, c_schema, encrypted_data_key from config_info")
.push_build(&mut self.conditions(param))
.push_fn(||{
let mut b= B::new();
if let Some(limit) = &param.limit{
b.limit(limit);
}
if let Some(offset ) = &param.offset{
b.offset(offset);
}
b
})
)
}

pub fn query_count_prepare(&self, param: &ConfigInfoParam) -> (String, Vec<serde_json::Value>) {
B::prepare(
B::new_sql("select count(1) from config_info").push_build(&mut self.conditions(param)),
)
}
}

pub struct ConfigInfoDao<'a> {
executor: MySqlExecutor<'a>,
inner: ConfigInfoSql,
}

impl<'a> ConfigInfoDao<'a> {
pub fn new(executor: MySqlExecutor<'a>) -> Self {
Self {
executor,
inner: ConfigInfoSql {},
}
}

pub async fn fetch(
&mut self,
sql: &str,
args: &[serde_json::Value],
) -> anyhow::Result<Vec<ConfigInfoDO>> {
self.executor.fetch(sql, args).await
}

pub async fn fetch_count(
&mut self,
sql: &str,
args: &[serde_json::Value],
) -> anyhow::Result<i64> {
let v = self.executor.fetch_row(sql, args).await?;
v.try_get(0).map_err(anyhow::Error::msg)
}

pub async fn query(&mut self, param: &ConfigInfoParam) -> anyhow::Result<Vec<ConfigInfoDO>> {
let (sql, args) = self.inner.query_prepare(param);
self.fetch(&sql, &args).await
}

pub async fn query_count(&mut self, param: &ConfigInfoParam) -> anyhow::Result<i64> {
let (sql, args) = self.inner.query_count_prepare(param);
self.fetch_count(&sql, &args).await
}
}
Loading

0 comments on commit 6f67835

Please sign in to comment.