From 6f67835416d521277795624e3329aac3e56a411f Mon Sep 17 00:00:00 2001
From: heqingpan
Date: Mon, 11 Nov 2024 08:56:19 +0800
Subject: [PATCH] feat: support exporting nacos mysql data (config, namespace
 and user data) as a transfer middle data file; #138
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Cargo.toml                               |   2 +
 src/cli.rs                               |   8 +
 src/common/mod.rs                        |   1 +
 src/common/sqlx_utils.rs                 | 167 +++++++++++++++++
 src/main.rs                              |   5 +
 src/transfer/mod.rs                      |   2 +
 src/transfer/mysql/dao/config.rs         | 108 +++++++++++
 src/transfer/mysql/dao/config_history.rs | 131 ++++++++++++++
 src/transfer/mysql/dao/mod.rs            |  34 ++++
 src/transfer/mysql/dao/tenant.rs         |  93 ++++++++++
 src/transfer/mysql/dao/user.rs           |  91 ++++++++++
 src/transfer/mysql/mod.rs                |   2 +
 src/transfer/mysql_to_data.rs            | 220 +++++++++++++++++++++
 13 files changed, 864 insertions(+)
 create mode 100644 src/common/sqlx_utils.rs
 create mode 100644 src/transfer/mysql/dao/config.rs
 create mode 100644 src/transfer/mysql/dao/config_history.rs
 create mode 100644 src/transfer/mysql/dao/mod.rs
 create mode 100644 src/transfer/mysql/dao/tenant.rs
 create mode 100644 src/transfer/mysql/dao/user.rs
 create mode 100644 src/transfer/mysql/mod.rs
 create mode 100644 src/transfer/mysql_to_data.rs

diff --git a/Cargo.toml b/Cargo.toml
index d4470411..58f7dc48 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -117,6 +117,8 @@ fs2 = "0.4.3"
 dirs = "5.0"
 bitflags = "2.6.0"
 
+sqlx = { version = "0.6", features = ["runtime-tokio-rustls","mysql"] }
+
 [dependencies.uuid]
 version = "1.2.1"
 features = [
diff --git a/src/cli.rs b/src/cli.rs
index af9a5f25..e893ec93 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -30,6 +30,14 @@ pub enum Commands {
         /// out to transfer middle data file
         out: String,
     },
+    /// nacos mysql to transfer middle data
+    #[command(arg_required_else_help = true)]
+    MysqlToData {
+        /// the nacos mysql db uri; example: mysql://user:password@localhost:3306/nacos
+        uri: String,
+        /// out to transfer middle data file
+        out: String,
+    },
     #[command(arg_required_else_help = true)]
     OpenapiToData {
         /// nacos auth username,default is empty
diff --git a/src/common/mod.rs b/src/common/mod.rs
index 859557b0..56da5f0a 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -20,6 +20,7 @@ pub mod protobuf_utils;
 pub mod rusqlite_utils;
 pub mod sequence_utils;
 pub mod sled_utils;
+pub mod sqlx_utils;
 pub mod string_utils;
 pub mod tempfile;
 pub mod web_utils;
diff --git a/src/common/sqlx_utils.rs b/src/common/sqlx_utils.rs
new file mode 100644
index 00000000..2d673e3d
--- /dev/null
+++ b/src/common/sqlx_utils.rs
@@ -0,0 +1,167 @@
+use serde_json::Value;
+use sqlx::database::HasArguments;
+use sqlx::mysql::MySqlRow;
+use sqlx::query::Query;
+use sqlx::{Executor, MySql, MySqlConnection, Transaction};
+
+pub fn build_mysql_query<'a>(
+    sql: &'a str,
+    args: &'a [serde_json::Value],
+) -> Query<'a, MySql, <MySql as HasArguments<'a>>::Arguments> {
+    let mut q = sqlx::query(sql);
+    for arg in args {
+        match arg {
+            Value::Number(v) => {
+                if v.is_f64() {
+                    q = q.bind(v.as_f64().unwrap())
+                } else {
+                    q = q.bind(v.as_i64().unwrap())
+                }
+            }
+            Value::Bool(v) => q = q.bind(Some(v.to_owned())),
+            Value::Null => q = q.bind(Option::<String>::None),
+            Value::String(v) => {
+                q = q.bind(Some(v.to_owned()));
+            }
+            _ => {
+                q = q.bind(Some(arg.to_string()));
+            }
+        }
+    }
+    q
+}
+
+pub enum MySqlExecutor<'c> {
+    Pool(&'c mut sqlx::Pool<MySql>),
+    Conn(&'c mut MySqlConnection),
+    Transaction(&'c mut Transaction<'c, MySql>),
+}
+
+#[derive(Debug, Clone)]
+pub struct ExecuteResult {
+    pub rows_affected: u64,
+    pub last_insert_id: u64,
+}
+
+impl ExecuteResult {
+    pub fn new(rows_affected: u64, last_insert_id: u64) -> ExecuteResult {
+        Self {
+            rows_affected,
+            last_insert_id,
+        }
+    }
+}
+
+impl<'c> MySqlExecutor<'c> {
+    pub fn new_by_pool(pool: &'c mut sqlx::Pool<MySql>) -> Self {
+        Self::Pool(pool)
+    }
+    pub fn new_by_conn(conn: &'c mut MySqlConnection) -> Self {
+        Self::Conn(conn)
+    }
+    pub fn new_by_transaction(tx: &'c mut Transaction<'c, MySql>) -> Self {
+        Self::Transaction(tx)
+    }
+
+    pub async fn fetch<T>(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<Vec<T>>
+    where
+        T: for<'r> sqlx::FromRow<'r, MySqlRow> + Send + Unpin,
+    {
+        let q = build_mysql_query(sql, args);
+        match self {
+            MySqlExecutor::Pool(executor) => {
+                let res = executor.fetch_all(q).await?;
+                let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
+                Ok(rlist)
+            }
+            MySqlExecutor::Conn(executor) => {
+                let res = executor.fetch_all(q).await?;
+                let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
+                Ok(rlist)
+            }
+            MySqlExecutor::Transaction(executor) => {
+                let res = executor.fetch_all(q).await?;
+                let rlist: Vec<T> = res.into_iter().map(|e| T::from_row(&e).unwrap()).collect();
+                Ok(rlist)
+            }
+        }
+    }
+
+    pub async fn fetch_one<T>(&mut self, sql: &str, args: &[serde_json::Value]) -> anyhow::Result<T>
+    where
+        T: for<'r> sqlx::FromRow<'r, MySqlRow> + Send + Unpin,
+    {
+        let q = build_mysql_query(sql, args);
+        match self {
+            MySqlExecutor::Pool(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(T::from_row(&res)?)
+            }
+            MySqlExecutor::Conn(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(T::from_row(&res)?)
+            }
+            MySqlExecutor::Transaction(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(T::from_row(&res)?)
+            }
+        }
+    }
+
+    pub async fn fetch_row(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<MySqlRow> {
+        let q = build_mysql_query(sql, args);
+        match self {
+            MySqlExecutor::Pool(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(res)
+            }
+            MySqlExecutor::Conn(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(res)
+            }
+            MySqlExecutor::Transaction(executor) => {
+                let res = executor.fetch_one(q).await?;
+                Ok(res)
+            }
+        }
+    }
+
+    pub async fn execute(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<ExecuteResult> {
+        let q = build_mysql_query(sql, args);
+        match self {
+            MySqlExecutor::Pool(executor) => {
+                let res = executor.execute(q).await?;
+                Ok(ExecuteResult::new(
+                    res.rows_affected(),
+                    res.last_insert_id(),
+                ))
+            }
+            MySqlExecutor::Conn(executor) => {
+                let res = executor.execute(q).await?;
+                Ok(ExecuteResult::new(
+                    res.rows_affected(),
+                    res.last_insert_id(),
+                ))
+            }
+            MySqlExecutor::Transaction(executor) => {
+                let res = executor.execute(q).await?;
+                Ok(ExecuteResult::new(
+                    res.rows_affected(),
+                    res.last_insert_id(),
+                ))
+            }
+        }
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 1c8e2643..3d6541e6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -41,6 +41,7 @@ use rnacos::common::appdata::AppShareData;
 use rnacos::openapi::middle::auth_middle::ApiCheckAuth;
 use rnacos::raft::NacosRaft;
 use rnacos::transfer::data_to_sqlite::data_to_sqlite;
+use rnacos::transfer::mysql_to_data::mysql_to_data;
 use rnacos::transfer::openapi_to_data::openapi_to_data;
 use rnacos::transfer::sqlite_to_data::sqlite_to_data;
 use rnacos::web_config::{app_config, console_config};
@@ -152,6 +153,10 @@ async fn run_subcommand(commands: Commands) -> Result<(), Box<dyn Error>> {
             log::info!("sqlite to middle data, from:{file} to:{out}");
             sqlite_to_data(&file, &out).await?;
         }
+        Commands::MysqlToData { uri, out } => {
+            log::info!("nacos mysql to middle data, from: mysql://** to:{out}");
+            mysql_to_data(&uri, &out).await?;
+        }
         Commands::OpenapiToData {
             host,
             username,
diff --git a/src/transfer/mod.rs b/src/transfer/mod.rs
index 87336e55..9bb6fe2a 100644
--- a/src/transfer/mod.rs
+++ b/src/transfer/mod.rs
@@ -8,6 +8,8 @@ use actix::Addr;
 
 pub mod data_to_sqlite;
 pub mod model;
+pub mod mysql;
+pub mod mysql_to_data;
 pub mod openapi_to_data;
 pub mod reader;
 pub mod sqlite;
diff --git a/src/transfer/mysql/dao/config.rs b/src/transfer/mysql/dao/config.rs
new file mode 100644
index 00000000..79db0673
--- /dev/null
+++ b/src/transfer/mysql/dao/config.rs
@@ -0,0 +1,108 @@
+use crate::common::sqlx_utils::MySqlExecutor;
+use rsql_builder::B;
+use serde::{Deserialize, Serialize};
+use sqlx::Row;
+
+#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)]
+pub struct ConfigInfoDO {
+    pub id: Option<u64>,
+    pub data_id: Option<String>,
+    pub group_id: Option<String>,
+    pub content: Option<String>,
+    pub md5: Option<String>,
+    pub gmt_modified_timestamp: Option<i64>,
+    pub src_user: Option<String>,
+    pub src_ip: Option<String>,
+    pub app_name: Option<String>,
+    pub tenant_id: Option<String>,
+    pub c_desc: Option<String>,
+    pub c_use: Option<String>,
+    pub effect: Option<String>,
+    pub r#type: Option<String>,
+    pub c_schema: Option<String>,
+    pub encrypted_data_key: Option<String>,
+}
+
+#[derive(Debug, Default)]
+pub struct ConfigInfoParam {
+    pub id: Option<u64>,
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
+}
+pub struct ConfigInfoSql {}
+
+impl ConfigInfoSql {
+    fn conditions(&self, param: &ConfigInfoParam) -> B {
+        let mut whr = B::new_where();
+        if let Some(id) = &param.id {
+            whr.eq("id", id);
+        }
+        whr
+    }
+
+    pub fn query_prepare(&self, param: &ConfigInfoParam) -> (String, Vec<serde_json::Value>) {
+        B::prepare(
B::new_sql("select id, data_id, group_id, content, md5, UNIX_TIMESTAMP(gmt_modified) \ + as gmt_modified_timestamp, src_user, src_ip, app_name, tenant_id, c_desc, c_use, effect, \ + type, c_schema, encrypted_data_key from config_info") + .push_build(&mut self.conditions(param)) + .push_fn(||{ + let mut b= B::new(); + if let Some(limit) = ¶m.limit{ + b.limit(limit); + } + if let Some(offset ) = ¶m.offset{ + b.offset(offset); + } + b + }) + ) + } + + pub fn query_count_prepare(&self, param: &ConfigInfoParam) -> (String, Vec) { + B::prepare( + B::new_sql("select count(1) from config_info").push_build(&mut self.conditions(param)), + ) + } +} + +pub struct ConfigInfoDao<'a> { + executor: MySqlExecutor<'a>, + inner: ConfigInfoSql, +} + +impl<'a> ConfigInfoDao<'a> { + pub fn new(executor: MySqlExecutor<'a>) -> Self { + Self { + executor, + inner: ConfigInfoSql {}, + } + } + + pub async fn fetch( + &mut self, + sql: &str, + args: &[serde_json::Value], + ) -> anyhow::Result> { + self.executor.fetch(sql, args).await + } + + pub async fn fetch_count( + &mut self, + sql: &str, + args: &[serde_json::Value], + ) -> anyhow::Result { + let v = self.executor.fetch_row(sql, args).await?; + v.try_get(0).map_err(anyhow::Error::msg) + } + + pub async fn query(&mut self, param: &ConfigInfoParam) -> anyhow::Result> { + let (sql, args) = self.inner.query_prepare(param); + self.fetch(&sql, &args).await + } + + pub async fn query_count(&mut self, param: &ConfigInfoParam) -> anyhow::Result { + let (sql, args) = self.inner.query_count_prepare(param); + self.fetch_count(&sql, &args).await + } +} diff --git a/src/transfer/mysql/dao/config_history.rs b/src/transfer/mysql/dao/config_history.rs new file mode 100644 index 00000000..519c8428 --- /dev/null +++ b/src/transfer/mysql/dao/config_history.rs @@ -0,0 +1,131 @@ +use crate::common::sqlx_utils::MySqlExecutor; +use rsql_builder::B; +use serde::{Deserialize, Serialize}; +use sqlx::Row; +use std::sync::Arc; + +#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)] +pub struct ConfigHistoryDO { + pub id: Option, + pub nid: Option, + pub data_id: Option, + pub group_id: Option, + pub app_name: Option, + pub content: Option, + pub md5: Option, + pub gmt_create_timestamp: Option, + pub src_user: Option, + pub src_ip: Option, + pub op_type: Option, + pub tenant_id: Option, + pub encrypted_data_key: Option, +} + +#[derive(Debug, Default)] +pub struct ConfigHistoryParam { + pub id: Option, + pub limit: Option, + pub offset: Option, + pub data_id: Option>, + pub group_id: Option>, + pub tenant_id: Option>, + pub order_by_gmt_create_desc: bool, +} +pub struct ConfigHistorySql {} + +impl ConfigHistorySql { + fn conditions(&self, param: &ConfigHistoryParam) -> B { + let mut whr = B::new_where(); + if let Some(id) = ¶m.id { + whr.eq("id", id); + } + if let Some(data_id) = ¶m.data_id { + whr.eq("data_id", data_id); + } + if let Some(group_id) = ¶m.group_id { + whr.eq("group_id", group_id); + } + if let Some(tenant_id) = ¶m.tenant_id { + whr.eq("tenant_id", tenant_id); + } + whr + } + + pub fn query_prepare(&self, param: &ConfigHistoryParam) -> (String, Vec) { + B::prepare( + B::new_sql( + "select id, nid, data_id, group_id, app_name, content, md5, \ + UNIX_TIMESTAMP(gmt_create) as gmt_create_timestamp, src_user, src_ip, op_type, \ + tenant_id, encrypted_data_key from his_config_info", + ) + .push_build(&mut self.conditions(param)) + .push_fn(|| { + let mut b = B::new(); + if param.order_by_gmt_create_desc { + b.order_by("gmt_create", true); + } + if let Some(limit) = 
+                    b.limit(limit);
+                }
+                if let Some(offset) = &param.offset {
+                    b.offset(offset);
+                }
+                b
+            }),
+        )
+    }
+
+    pub fn query_count_prepare(
+        &self,
+        param: &ConfigHistoryParam,
+    ) -> (String, Vec<serde_json::Value>) {
+        B::prepare(
+            B::new_sql("select count(1) from his_config_info")
+                .push_build(&mut self.conditions(param)),
+        )
+    }
+}
+
+pub struct ConfigHistoryDao<'a> {
+    executor: MySqlExecutor<'a>,
+    inner: ConfigHistorySql,
+}
+
+impl<'a> ConfigHistoryDao<'a> {
+    pub fn new(executor: MySqlExecutor<'a>) -> Self {
+        Self {
+            executor,
+            inner: ConfigHistorySql {},
+        }
+    }
+
+    pub async fn fetch_count(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<i64> {
+        let v = self.executor.fetch_row(sql, args).await?;
+        v.try_get(0).map_err(anyhow::Error::msg)
+    }
+
+    pub async fn fetch(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<Vec<ConfigHistoryDO>> {
+        self.executor.fetch(sql, args).await
+    }
+
+    pub async fn query(
+        &mut self,
+        param: &ConfigHistoryParam,
+    ) -> anyhow::Result<Vec<ConfigHistoryDO>> {
+        let (sql, args) = self.inner.query_prepare(param);
+        self.fetch(&sql, &args).await
+    }
+
+    pub async fn query_count(&mut self, param: &ConfigHistoryParam) -> anyhow::Result<i64> {
+        let (sql, args) = self.inner.query_count_prepare(param);
+        self.fetch_count(&sql, &args).await
+    }
+}
diff --git a/src/transfer/mysql/dao/mod.rs b/src/transfer/mysql/dao/mod.rs
new file mode 100644
index 00000000..e27aa416
--- /dev/null
+++ b/src/transfer/mysql/dao/mod.rs
@@ -0,0 +1,34 @@
+#![allow(clippy::field_reassign_with_default)]
+
+use crate::common::constant::EMPTY_STR;
+use crate::now_millis;
+use crate::transfer::mysql::dao::user::UserDO;
+use crate::user::model::UserDo;
+use crate::user::permission::USER_ROLE_DEVELOPER;
+use std::collections::HashMap;
+
+pub mod config;
+pub mod config_history;
+pub mod tenant;
+pub mod user;
+
+impl From<UserDO> for UserDo {
+    fn from(v: UserDO) -> Self {
+        let enable = false;
+        let roles = vec![USER_ROLE_DEVELOPER.as_str().to_owned()];
+        let extend_info = HashMap::default();
+        let nickname = v.username.clone().unwrap_or_default();
+        let now = (now_millis() / 1000) as u32;
+        Self {
+            username: v.username.unwrap_or_default(),
+            password: EMPTY_STR.to_string(),
+            nickname,
+            gmt_create: now,
+            gmt_modified: now,
+            enable,
+            roles,
+            extend_info,
+            password_hash: v.password,
+        }
+    }
+}
diff --git a/src/transfer/mysql/dao/tenant.rs b/src/transfer/mysql/dao/tenant.rs
new file mode 100644
index 00000000..8bd749b9
--- /dev/null
+++ b/src/transfer/mysql/dao/tenant.rs
@@ -0,0 +1,93 @@
+use crate::common::sqlx_utils::MySqlExecutor;
+use rsql_builder::B;
+use serde::{Deserialize, Serialize};
+use sqlx::Row;
+
+#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)]
+pub struct TenantDO {
+    pub id: Option<i64>,
+    pub tenant_id: Option<String>,
+    pub tenant_name: Option<String>,
+    pub tenant_desc: Option<String>,
+}
+
+#[derive(Debug, Default)]
+pub struct TenantParam {
+    pub id: Option<i64>,
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
+}
+pub struct TenantSql {}
+
+impl TenantSql {
+    fn conditions(&self, param: &TenantParam) -> B {
+        let mut whr = B::new_where();
+        if let Some(id) = &param.id {
+            whr.eq("id", id);
+        }
+        whr
+    }
+
+    pub fn query_prepare(&self, param: &TenantParam) -> (String, Vec<serde_json::Value>) {
+        B::prepare(
+            B::new_sql("select id, tenant_id, tenant_name, tenant_desc from tenant_info")
+                .push_build(&mut self.conditions(param))
+                .push_fn(|| {
+                    let mut b = B::new();
+                    if let Some(limit) = &param.limit {
+                        b.limit(limit);
+                    }
+                    if let Some(offset) = &param.offset {
+                        b.offset(offset);
+                    }
+                    b
+                }),
+        )
+    }
+
+    pub fn query_count_prepare(&self, param: &TenantParam) -> (String, Vec<serde_json::Value>) {
+        B::prepare(
+            B::new_sql("select count(1) from tenant_info").push_build(&mut self.conditions(param)),
+        )
+    }
+}
+
+pub struct TenantDao<'a> {
+    executor: MySqlExecutor<'a>,
+    inner: TenantSql,
+}
+
+impl<'a> TenantDao<'a> {
+    pub fn new(executor: MySqlExecutor<'a>) -> Self {
+        Self {
+            executor,
+            inner: TenantSql {},
+        }
+    }
+    pub async fn fetch(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<Vec<TenantDO>> {
+        self.executor.fetch(sql, args).await
+    }
+
+    pub async fn fetch_count(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<i64> {
+        let v = self.executor.fetch_row(sql, args).await?;
+        v.try_get(0).map_err(anyhow::Error::msg)
+    }
+
+    pub async fn query(&mut self, param: &TenantParam) -> anyhow::Result<Vec<TenantDO>> {
+        let (sql, args) = self.inner.query_prepare(param);
+        self.fetch(&sql, &args).await
+    }
+
+    pub async fn query_count(&mut self, param: &TenantParam) -> anyhow::Result<i64> {
+        let (sql, args) = self.inner.query_count_prepare(param);
+        self.fetch_count(&sql, &args).await
+    }
+}
diff --git a/src/transfer/mysql/dao/user.rs b/src/transfer/mysql/dao/user.rs
new file mode 100644
index 00000000..0a63216e
--- /dev/null
+++ b/src/transfer/mysql/dao/user.rs
@@ -0,0 +1,91 @@
+use crate::common::sqlx_utils::MySqlExecutor;
+use rsql_builder::B;
+use serde::{Deserialize, Serialize};
+use sqlx::Row;
+
+#[derive(Debug, Default, Serialize, Deserialize, sqlx::FromRow)]
+pub struct UserDO {
+    pub username: Option<String>,
+    pub password: Option<String>,
+    pub enabled: Option<bool>,
+}
+
+#[derive(Debug, Default)]
+pub struct UserParam {
+    pub id: Option<u64>,
+    pub limit: Option<i64>,
+    pub offset: Option<i64>,
+}
+pub struct UserSql {}
+
+impl UserSql {
+    fn conditions(&self, param: &UserParam) -> B {
+        let mut whr = B::new_where();
+        if let Some(id) = &param.id {
+            whr.eq("id", id);
+        }
+        whr
+    }
+
+    pub fn query_prepare(&self, param: &UserParam) -> (String, Vec<serde_json::Value>) {
+        B::prepare(
+            B::new_sql("select username, password, enabled from users")
+                .push_build(&mut self.conditions(param))
+                .push_fn(|| {
+                    let mut b = B::new();
+                    if let Some(limit) = &param.limit {
+                        b.limit(limit);
+                    }
+                    if let Some(offset) = &param.offset {
+                        b.offset(offset);
+                    }
+                    b
+                }),
+        )
+    }
+
+    pub fn query_count_prepare(&self, param: &UserParam) -> (String, Vec<serde_json::Value>) {
+        B::prepare(B::new_sql("select count(1) from users").push_build(&mut self.conditions(param)))
+    }
+}
+
+pub struct UserDao<'a> {
+    executor: MySqlExecutor<'a>,
+    inner: UserSql,
+}
+
+impl<'a> UserDao<'a> {
+    pub fn new(executor: MySqlExecutor<'a>) -> Self {
+        Self {
+            executor,
+            inner: UserSql {},
+        }
+    }
+
+    pub async fn fetch_count(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<i64> {
+        let v = self.executor.fetch_row(sql, args).await?;
+        v.try_get(0).map_err(anyhow::Error::msg)
+    }
+
+    pub async fn fetch(
+        &mut self,
+        sql: &str,
+        args: &[serde_json::Value],
+    ) -> anyhow::Result<Vec<UserDO>> {
+        self.executor.fetch(sql, args).await
+    }
+
+    pub async fn query(&mut self, param: &UserParam) -> anyhow::Result<Vec<UserDO>> {
+        let (sql, args) = self.inner.query_prepare(param);
+        self.fetch(&sql, &args).await
+    }
+
+    pub async fn query_count(&mut self, param: &UserParam) -> anyhow::Result<i64> {
+        let (sql, args) = self.inner.query_count_prepare(param);
+        self.fetch_count(&sql, &args).await
+    }
+}
diff --git a/src/transfer/mysql/mod.rs b/src/transfer/mysql/mod.rs
new file mode 100644
index 00000000..c0bdcbac
--- /dev/null
+++ b/src/transfer/mysql/mod.rs
@@ -0,0 +1,2 @@
+/// Used for nacos mysql data migration
+pub mod dao;
diff --git a/src/transfer/mysql_to_data.rs b/src/transfer/mysql_to_data.rs
new file mode 100644
index 00000000..50c73ccb
--- /dev/null
+++ b/src/transfer/mysql_to_data.rs
@@ -0,0 +1,220 @@
+use crate::common::constant::{
+    CONFIG_TREE_NAME, EMPTY_ARC_STRING, EMPTY_STR, NAMESPACE_TREE_NAME, USER_TREE_NAME,
+};
+use crate::common::sqlx_utils::MySqlExecutor;
+use crate::config::core::{ConfigKey, ConfigValue};
+use crate::config::model::ConfigValueDO;
+use crate::config::ConfigUtils;
+use crate::namespace::model::{NamespaceDO, FROM_USER_VALUE};
+use crate::now_millis_i64;
+use crate::transfer::init_writer_actor;
+use crate::transfer::model::{TransferRecordDto, TransferWriterRequest};
+use crate::transfer::mysql::dao::config::{ConfigInfoDO, ConfigInfoDao, ConfigInfoParam};
+use crate::transfer::mysql::dao::config_history::{
+    ConfigHistoryDO, ConfigHistoryDao, ConfigHistoryParam,
+};
+use crate::transfer::mysql::dao::tenant::{TenantDao, TenantParam};
+use crate::transfer::mysql::dao::user::{UserDao, UserParam};
+use crate::transfer::sqlite::TableSeq;
+use crate::transfer::writer::TransferWriterActor;
+use crate::user::model::UserDo;
+use actix::Addr;
+use sqlx::mysql::{MySqlConnectOptions, MySqlPoolOptions};
+use sqlx::{MySql, Pool};
+use std::str::FromStr;
+use std::sync::Arc;
+
+pub async fn mysql_to_data(db_uri: &str, data_file: &str) -> anyhow::Result<()> {
+    std::env::set_var("RUST_LOG", "info,sqlx::query=error");
+    let option = MySqlConnectOptions::from_str(db_uri)?;
+    let pool = MySqlPoolOptions::new()
+        .max_connections(2)
+        .connect_with(option)
+        .await?;
+    let writer_actor = init_writer_actor(data_file);
+    let mut table_seq = TableSeq::default();
+    apply_config(&pool, &mut table_seq, &writer_actor).await?;
+    apply_tenant(&pool, &writer_actor).await?;
+    apply_user(&pool, &writer_actor).await?;
+    Ok(())
+}
+
+async fn apply_config(
+    pool: &Pool<MySql>,
+    table_seq: &mut TableSeq,
+    writer_actor: &Addr<TransferWriterActor>,
+) -> anyhow::Result<()> {
+    let mut config_pool = pool.clone();
+    let mut config_history_pool = pool.clone();
+    let mut config_dao = ConfigInfoDao::new(MySqlExecutor::new_by_pool(&mut config_pool));
+    let mut config_history_dao =
+        ConfigHistoryDao::new(MySqlExecutor::new_by_pool(&mut config_history_pool));
+    let mut count = 0;
+    let mut offset = 0;
+    let limit = 100;
+    let mut config_query_param = ConfigInfoParam {
+        id: None,
+        limit: Some(limit),
+        offset: Some(offset),
+    };
+    let mut patch = config_dao.query(&config_query_param).await?;
+    let mut patch_is_empty = patch.is_empty();
+    while !patch_is_empty {
+        for item in patch {
+            let mut tenant_id = Arc::new(item.tenant_id.clone().unwrap_or_default());
+            if ConfigUtils::is_default_tenant(&tenant_id) {
+                tenant_id = EMPTY_ARC_STRING.clone();
+            }
+            let key = ConfigKey::new_by_arc(
+                Arc::new(item.data_id.clone().unwrap_or_default()),
+                Arc::new(item.group_id.clone().unwrap_or_default()),
+                tenant_id,
+            );
+            let history_query_param = ConfigHistoryParam {
+                id: None,
+                data_id: Some(key.data_id.clone()),
+                group_id: Some(key.group.clone()),
+                tenant_id: Some(key.tenant.clone()),
+                limit: Some(100),
+                offset: None,
+                order_by_gmt_create_desc: true,
+            };
+            let histories = config_history_dao.query(&history_query_param).await?;
+            let record = build_config_record(table_seq, key, item, histories)?;
+            count += 1;
+            writer_actor.do_send(TransferWriterRequest::AddRecord(record));
+        }
+        offset += limit;
+        config_query_param.offset = Some(offset);
+        patch = config_dao.query(&config_query_param).await?;
+        patch_is_empty = patch.is_empty();
+    }
log::info!("transfer config total count:{count}"); + Ok(()) +} + +fn build_config_record( + table_seq: &mut TableSeq, + key: ConfigKey, + config_do: ConfigInfoDO, + histories: Vec, +) -> anyhow::Result { + let current_content = config_do.content.unwrap_or_default(); + let mut config_value = ConfigValue::new(Arc::new(current_content.clone())); + let mut last_content = None; + let mut use_histories = vec![]; + for item in histories { + if let Some(op_type) = &item.op_type { + if op_type == "D" { + //删除 + break; + } + } + use_histories.push(item); + } + let mut last_op_time = 0; + for item in use_histories.into_iter().rev() { + if let Some(content) = item.content { + let op_time = item + .gmt_create_timestamp + .map(|v| v * 1000) + .unwrap_or(now_millis_i64()); + last_op_time = op_time; + last_content = Some(content.clone()); + config_value.update_value( + Arc::new(content), + table_seq.next_config_id() as u64, + op_time, + None, + item.src_user.map(Arc::new), + ); + } + } + let need_pull_current = if let Some(last_content) = &last_content { + last_content != ¤t_content + } else { + true + }; + if need_pull_current { + let op_time = config_do + .gmt_modified_timestamp + .map(|v| v * 1000) + .unwrap_or(now_millis_i64()); + let op_time = std::cmp::max(op_time, last_op_time); + config_value.update_value( + Arc::new(current_content), + table_seq.next_config_id() as u64, + op_time, + None, + None, + ); + } + let value_do: ConfigValueDO = config_value.into(); + let record = TransferRecordDto { + table_name: Some(CONFIG_TREE_NAME.clone()), + key: key.build_key().as_bytes().to_vec(), + value: value_do.to_bytes()?, + table_id: 0, + }; + Ok(record) +} + +async fn apply_tenant( + pool: &Pool, + writer_actor: &Addr, +) -> anyhow::Result<()> { + let mut count = 0; + let mut new_pool = pool.clone(); + let mut tenant_dao = TenantDao::new(MySqlExecutor::new_by_pool(&mut new_pool)); + let query_param = TenantParam::default(); + for item in tenant_dao.query(&query_param).await? { + let key = if let Some(v) = &item.tenant_id { + v.as_bytes().to_vec() + } else { + EMPTY_STR.as_bytes().to_vec() + }; + let value_do = NamespaceDO { + namespace_id: item.tenant_id, + namespace_name: item.tenant_name, + r#type: Some(FROM_USER_VALUE.to_string()), + }; + let record = TransferRecordDto { + table_name: Some(NAMESPACE_TREE_NAME.clone()), + key, + value: value_do.to_bytes()?, + table_id: 0, + }; + writer_actor.do_send(TransferWriterRequest::AddRecord(record)); + count += 1; + } + log::info!("transfer tenant count:{count}"); + Ok(()) +} + +async fn apply_user( + pool: &Pool, + writer_actor: &Addr, +) -> anyhow::Result<()> { + let mut count = 0; + let mut new_pool = pool.clone(); + let mut user_dao = UserDao::new(MySqlExecutor::new_by_pool(&mut new_pool)); + let query_param = UserParam::default(); + for item in user_dao.query(&query_param).await? { + let key = if let Some(v) = &item.username { + v.as_bytes().to_vec() + } else { + EMPTY_STR.as_bytes().to_vec() + }; + let value_do: UserDo = item.into(); + let record = TransferRecordDto { + table_name: Some(USER_TREE_NAME.clone()), + key, + value: value_do.to_bytes(), + table_id: 0, + }; + writer_actor.do_send(TransferWriterRequest::AddRecord(record)); + count += 1; + } + log::info!("transfer user count:{count}"); + Ok(()) +}
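
Usage sketch: with this patch applied, the export can be driven from the command line. The invocation below is illustrative only; it assumes the binary is named rnacos, that clap's default kebab-case naming maps the MysqlToData variant to mysql-to-data, and the output file name is arbitrary:

    rnacos mysql-to-data 'mysql://user:password@localhost:3306/nacos' nacos_transfer.data

The snippet below is a minimal, hypothetical sketch of exercising the new MySqlExecutor helper from src/common/sqlx_utils.rs on its own, outside the transfer flow. It assumes the crate is usable as a library named rnacos, a tokio runtime (matching the runtime-tokio-rustls sqlx feature), the anyhow and serde_json crates, and a reachable nacos MySQL instance; the connection string and the query are placeholders.

    use rnacos::common::sqlx_utils::MySqlExecutor;
    use sqlx::mysql::MySqlPoolOptions;
    use sqlx::Row;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder connection string; point it at a real nacos MySQL database.
        let mut pool = MySqlPoolOptions::new()
            .max_connections(1)
            .connect("mysql://user:password@localhost:3306/nacos")
            .await?;
        // Wrap the pool in the executor added by this patch.
        let mut executor = MySqlExecutor::new_by_pool(&mut pool);
        // fetch_row binds the serde_json args positionally and returns a single MySqlRow;
        // nacos stores the default (public) namespace with an empty tenant_id.
        let row = executor
            .fetch_row(
                "select count(1) from config_info where tenant_id = ?",
                &[serde_json::json!("")],
            )
            .await?;
        let total: i64 = row.try_get(0)?;
        println!("config_info rows in the default tenant: {total}");
        Ok(())
    }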