diff --git a/examples/config.rs b/examples/config.rs index 408e1a2..72e765e 100644 --- a/examples/config.rs +++ b/examples/config.rs @@ -13,9 +13,193 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use polaris_rust::core::model::error::PolarisError; +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use polaris_rust::{ + config::{ + api::{new_config_file_api_by_context, ConfigFileAPI}, + req::{ + CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest, + UpsertAndPublishConfigFileRequest, WatchConfigFileRequest, + }, + }, + core::{ + context::SDKContext, + model::{ + config::{ConfigFile, ConfigFileRelease}, + error::PolarisError, + }, + }, +}; +use tracing::level_filters::LevelFilter; #[tokio::main] async fn main() -> Result<(), PolarisError> { + tracing_subscriber::fmt() + // all spans/events with a level higher than TRACE (e.g, info, warn, etc.) + // will be written to stdout. + .with_thread_names(true) + .with_file(true) + .with_level(true) + .with_line_number(true) + .with_thread_ids(true) + .with_max_level(LevelFilter::INFO) + // sets this to be the default, global collector for this application. + .init(); + + let start_time = std::time::Instant::now(); + + let sdk_context_ret = SDKContext::default(); + if sdk_context_ret.is_err() { + tracing::error!( + "create sdk context fail: {}", + sdk_context_ret.err().unwrap() + ); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + let arc_ctx = Arc::new(sdk_context_ret.unwrap()); + + let config_file_api_ret = new_config_file_api_by_context(arc_ctx.clone()); + if config_file_api_ret.is_err() { + tracing::error!( + "create config_file api fail: {}", + config_file_api_ret.err().unwrap() + ); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + + tracing::info!( + "create config_file api client cost: {:?}", + start_time.elapsed() + ); + + let config_file_api = config_file_api_ret.unwrap(); + + let mut labels = HashMap::::new(); + + labels.insert("rust".to_string(), "rust".to_string()); + + // 创建文件 + let ret = config_file_api + .create_config_file(CreateConfigFileRequest { + flow_id: uuid::Uuid::new_v4().to_string(), + timeout: Duration::from_secs(1), + file: ConfigFile { + namespace: "rust".to_string(), + group: "rust".to_string(), + name: "rust.toml".to_string(), + content: "test".to_string(), + labels: labels.clone(), + ..Default::default() + }, + }) + .await; + + if ret.is_err() { + tracing::error!("create config_file fail: {}", ret.err().unwrap()); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + + // 更新文件 + let ret = config_file_api + .update_config_file(UpdateConfigFileRequest { + flow_id: uuid::Uuid::new_v4().to_string(), + timeout: Duration::from_secs(1), + file: ConfigFile { + namespace: "rust".to_string(), + group: "rust".to_string(), + name: "rust.toml".to_string(), + content: "test".to_string(), + labels: labels.clone(), + ..Default::default() + }, + }) + .await; + + if ret.is_err() { + tracing::error!("update config_file fail: {}", ret.err().unwrap()); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + + // 发布文件 + let ret = config_file_api + .publish_config_file(PublishConfigFileRequest { + flow_id: uuid::Uuid::new_v4().to_string(), + timeout: Duration::from_secs(1), + config_file: ConfigFileRelease { + namespace: "rust".to_string(), + group: "rust".to_string(), + file_name: "rust.toml".to_string(), + release_name: "rust".to_string(), + md5: "".to_string(), + }, + }) + .await; + + if ret.is_err() { + tracing::error!("publish config_file fail: {}", ret.err().unwrap()); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + + // 文件变更订阅 + let _ = config_file_api + .watch_config_file(WatchConfigFileRequest { + namespace: "rust".to_string(), + group: "rust".to_string(), + file: "rust.toml".to_string(), + call_back: Arc::new(|event| { + tracing::info!("event: {:?}", event); + }), + }) + .await; + + // 变更 10 次配置文件并发布 + for i in 0..10 { + let ret = config_file_api + .upsert_publish_config_file(UpsertAndPublishConfigFileRequest { + flow_id: uuid::Uuid::new_v4().to_string(), + timeout: Duration::from_secs(1), + release_name: format!("rust-{}", i), + md5: "".to_string(), + config_file: ConfigFile { + namespace: "rust".to_string(), + group: "rust".to_string(), + name: "rust.toml".to_string(), + content: format!("test-{}", i), + labels: labels.clone(), + ..Default::default() + }, + }) + .await; + + if ret.is_err() { + tracing::error!( + "upsert and publish config_file fail: {}", + ret.err().unwrap() + ); + return Err(PolarisError::new( + polaris_rust::core::model::error::ErrorCode::UnknownServerError, + "".to_string(), + )); + } + + std::thread::sleep(Duration::from_secs(10)); + } + Ok(()) } diff --git a/examples/discover.rs b/examples/discover.rs index c7f068d..e632873 100644 --- a/examples/discover.rs +++ b/examples/discover.rs @@ -80,7 +80,10 @@ async fn main() -> Result<(), PolarisError> { let provider = provider_ret.unwrap(); let consumer = consumer_ret.unwrap(); - tracing::info!("create provider cost: {:?}", start_time.elapsed()); + tracing::info!( + "create discovery api client cost: {:?}", + start_time.elapsed() + ); let metadata = HashMap::new(); let req = InstanceRegisterRequest { diff --git a/src/config/api.rs b/src/config/api.rs index 38a4e52..fa34ff7 100644 --- a/src/config/api.rs +++ b/src/config/api.rs @@ -24,8 +24,8 @@ use crate::{ }; use super::req::{ - CreateConfigFileRequest, DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, - PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest, + CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, PublishConfigFileRequest, + UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest, WatchConfigFileRequest, WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse, }; @@ -66,6 +66,11 @@ where req: PublishConfigFileRequest, ) -> Result; + async fn upsert_publish_config_file( + &self, + req: UpsertAndPublishConfigFileRequest, + ) -> Result; + async fn watch_config_file( &self, req: WatchConfigFileRequest, diff --git a/src/config/default.rs b/src/config/default.rs index 6258429..64f2e27 100644 --- a/src/config/default.rs +++ b/src/config/default.rs @@ -37,8 +37,9 @@ use super::{ api::{ConfigFileAPI, ConfigGroupAPI}, req::{ self, CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, - PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest, - WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse, + PublishConfigFileRequest, UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest, + WatchConfigFileRequest, WatchConfigFileResponse, WatchConfigGroupRequest, + WatchConfigGroupResponse, }, }; @@ -132,6 +133,16 @@ impl ConfigFileAPI for DefaultConfigFileAPI { self.context.get_engine().publish_config_file(req).await } + async fn upsert_publish_config_file( + &self, + req: UpsertAndPublishConfigFileRequest, + ) -> Result { + self.context + .get_engine() + .upsert_publish_config_file(req) + .await + } + async fn watch_config_file( &self, req: WatchConfigFileRequest, diff --git a/src/config/req.rs b/src/config/req.rs index 2af9323..5168801 100644 --- a/src/config/req.rs +++ b/src/config/req.rs @@ -13,12 +13,11 @@ // CONDITIONS OF ANY KIND, either express or implied. See the License for the // specific language governing permissions and limitations under the License. -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; -use crate::core::model::{ - config::{ConfigFile, ConfigFileChangeEvent, ConfigFileRequest, ConfigReleaseRequest}, - naming::ServiceInstancesChangeEvent, - pb::lib::ConfigFileRelease, +use crate::core::model::config::{ + ConfigFile, ConfigFileChangeEvent, ConfigFileRelease, ConfigFileRequest, ConfigPublishRequest, + ConfigReleaseRequest, }; #[derive(Clone, Debug)] @@ -70,71 +69,45 @@ impl UpdateConfigFileRequest { } #[derive(Clone, Debug)] -pub struct DeleteConfigFileRequest { +pub struct PublishConfigFileRequest { pub flow_id: String, - pub namespace: String, - pub group: String, - pub file: String, pub timeout: Duration, + pub config_file: ConfigFileRelease, } -impl DeleteConfigFileRequest { - pub fn to_config_request(&self) -> ConfigFileRequest { +impl PublishConfigFileRequest { + pub fn to_config_request(&self) -> ConfigReleaseRequest { let mut flow_id = self.flow_id.clone(); if flow_id.is_empty() { flow_id = uuid::Uuid::new_v4().to_string(); } - let mut file = ConfigFile::default(); - file.namespace = self.namespace.clone(); - file.group = self.group.clone(); - file.name = self.file.clone(); - ConfigFileRequest { + ConfigReleaseRequest { flow_id: flow_id, - config_file: file, + config_file: self.config_file.clone(), } } } #[derive(Clone, Debug)] -pub struct PublishConfigFileRequest { +pub struct UpsertAndPublishConfigFileRequest { pub flow_id: String, - pub namespace: String, - pub group: String, - pub file: String, + pub timeout: Duration, pub release_name: String, pub md5: String, - pub timeout: Duration, + pub config_file: ConfigFile, } -impl PublishConfigFileRequest { - pub fn to_config_request(&self) -> ConfigReleaseRequest { +impl UpsertAndPublishConfigFileRequest { + pub fn to_config_request(&self) -> ConfigPublishRequest { let mut flow_id = self.flow_id.clone(); if flow_id.is_empty() { flow_id = uuid::Uuid::new_v4().to_string(); } - ConfigReleaseRequest { + ConfigPublishRequest { flow_id: flow_id, - config_file: ConfigFileRelease { - id: None, - name: Some(self.release_name.clone()), - namespace: Some(self.namespace.clone()), - group: Some(self.group.clone()), - file_name: Some(self.file.clone()), - content: None, - comment: None, - md5: Some(self.md5.clone()), - version: None, - create_time: None, - create_by: None, - modify_time: None, - modify_by: None, - tags: vec![], - active: None, - format: None, - release_description: None, - release_type: None, - beta_labels: vec![], - }, + md5: self.md5.clone(), + release_name: self.release_name.clone(), + config_file: self.config_file.clone(), } } } diff --git a/src/core/engine.rs b/src/core/engine.rs index aaadbf7..4ee66ed 100644 --- a/src/core/engine.rs +++ b/src/core/engine.rs @@ -21,7 +21,7 @@ use tokio::sync::RwLock; use crate::config::req::{ CreateConfigFileRequest, GetConfigFileRequest, PublishConfigFileRequest, - UpdateConfigFileRequest, + UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest, }; use crate::core::config::config::Configuration; use crate::core::model::cache::{EventType, ResourceEventKey}; @@ -339,6 +339,22 @@ impl Engine { }; } + /// upsert_publish_config_file 更新或发布配置文件 + pub async fn upsert_publish_config_file( + &self, + req: UpsertAndPublishConfigFileRequest, + ) -> Result { + let config_file = req.to_config_request(); + + let connector = self.server_connector.clone(); + let rsp = connector.upsert_publish_config_file(config_file).await; + + return match rsp { + Ok(ret_rsp) => Ok(ret_rsp), + Err(err) => Err(err), + }; + } + pub async fn lookup_loadbalancer(&self, name: &str) -> Option>> { let lb = self.load_balancer.read().await; lb.get(name).map(|lb| lb.clone()) diff --git a/src/core/model/config.rs b/src/core/model/config.rs index 01e2e60..068a6fa 100644 --- a/src/core/model/config.rs +++ b/src/core/model/config.rs @@ -15,8 +15,6 @@ use std::collections::HashMap; -use super::pb::lib::ConfigFileRelease; - // CONFIG_FILE_TAG_KEY_USE_ENCRYPTED 配置加密开关标识,value 为 boolean const CONFIG_FILE_TAG_KEY_USE_ENCRYPTED: &str = "internal-encrypted"; // CONFIG_FILE_TAG_KEY_DATA_KEY 加密密钥 tag key @@ -30,18 +28,80 @@ pub struct ConfigFileRequest { pub config_file: ConfigFile, } +impl ConfigFileRequest { + pub fn convert_spec(&self) -> crate::core::model::pb::lib::ConfigFile { + let mut tags = Vec::::new(); + self.config_file.labels.iter().for_each(|(k, v)| { + tags.push(crate::core::model::pb::lib::ConfigFileTag { + key: Some(k.clone()), + value: Some(v.clone()), + }); + }); + + crate::core::model::pb::lib::ConfigFile { + id: None, + name: Some(self.config_file.name.clone()), + namespace: Some(self.config_file.namespace.clone()), + group: Some(self.config_file.group.clone()), + content: Some(self.config_file.content.clone()), + format: None, + comment: None, + status: None, + tags: tags, + create_time: None, + create_by: None, + modify_time: None, + modify_by: None, + release_time: None, + release_by: None, + encrypted: None, + encrypt_algo: None, + } + } +} + #[derive(Clone, Debug)] pub struct ConfigReleaseRequest { pub flow_id: String, pub config_file: ConfigFileRelease, } -impl ConfigFileRequest { - pub fn convert_spec(&self) -> crate::core::model::pb::lib::ConfigFile { - todo!() +impl ConfigReleaseRequest { + pub fn convert_spec(&self) -> crate::core::model::pb::lib::ConfigFileRelease { + crate::core::model::pb::lib::ConfigFileRelease { + id: None, + name: Some(self.config_file.release_name.clone()), + namespace: Some(self.config_file.namespace.clone()), + group: Some(self.config_file.group.clone()), + content: None, + format: None, + comment: None, + file_name: Some(self.config_file.file_name.clone()), + version: None, + tags: Vec::new(), + active: None, + release_description: None, + release_type: None, + beta_labels: Vec::new(), + create_time: None, + create_by: None, + modify_time: None, + modify_by: None, + md5: None, + } } +} + +#[derive(Clone, Debug)] +pub struct ConfigPublishRequest { + pub flow_id: String, + pub md5: String, + pub release_name: String, + pub config_file: ConfigFile, +} - pub fn convert_spec_release(&self) -> crate::core::model::pb::lib::ConfigFileRelease { +impl ConfigPublishRequest { + pub fn convert_spec(&self) -> crate::core::model::pb::lib::ConfigFilePublishInfo { todo!() } } @@ -65,6 +125,15 @@ impl ConfigFile { } } +#[derive(Clone, Debug)] +pub struct ConfigFileRelease { + pub namespace: String, + pub group: String, + pub file_name: String, + pub release_name: String, + pub md5: String, +} + #[derive(Default, Debug, Clone)] pub struct ConfigGroup { pub namespace: String, @@ -72,6 +141,7 @@ pub struct ConfigGroup { pub files: Vec, } +#[derive(Clone, Debug)] pub struct ConfigFileChangeEvent { pub config_file: ConfigFile, } diff --git a/src/core/plugin/connector.rs b/src/core/plugin/connector.rs index 1d4c36a..18cdc8d 100644 --- a/src/core/plugin/connector.rs +++ b/src/core/plugin/connector.rs @@ -19,7 +19,7 @@ use tokio::runtime::Runtime; use crate::core::config::config::Configuration; use crate::core::model::cache::{RemoteData, ResourceEventKey}; -use crate::core::model::config::{ConfigFileRequest, ConfigReleaseRequest}; +use crate::core::model::config::{ConfigFileRequest, ConfigPublishRequest, ConfigReleaseRequest}; use crate::core::model::error::PolarisError; use crate::core::model::naming::{InstanceRequest, InstanceResponse}; use crate::core::plugin::plugins::Plugin; @@ -82,7 +82,10 @@ pub trait Connector: Plugin { async fn release_config_file(&self, req: ConfigReleaseRequest) -> Result; /// upsert_publish_config_file 更新发布配置文件 - async fn upsert_publish_config_file(&self) -> Result; + async fn upsert_publish_config_file( + &self, + req: ConfigPublishRequest, + ) -> Result; } pub struct NoopConnector {} @@ -155,7 +158,10 @@ impl Connector for NoopConnector { todo!() } - async fn upsert_publish_config_file(&self) -> Result { + async fn upsert_publish_config_file( + &self, + req: ConfigPublishRequest, + ) -> Result { todo!() } } diff --git a/src/plugins/connector/grpc/connector.rs b/src/plugins/connector/grpc/connector.rs index 6cfd0d9..7b08d06 100644 --- a/src/plugins/connector/grpc/connector.rs +++ b/src/plugins/connector/grpc/connector.rs @@ -15,7 +15,7 @@ use crate::core::config::global::ServerConnectorConfig; use crate::core::model::cache::{EventType, RemoteData}; -use crate::core::model::config::{ConfigFileRequest, ConfigReleaseRequest}; +use crate::core::model::config::{ConfigFileRequest, ConfigPublishRequest, ConfigReleaseRequest}; use crate::core::model::error::ErrorCode::{ServerError, ServerUserError}; use crate::core::model::error::PolarisError; use crate::core::model::naming::{InstanceRequest, InstanceResponse}; @@ -618,7 +618,7 @@ impl Connector for GrpcConnector { let mut client = self.create_config_grpc_stub(req.flow_id.clone()); let ret = client - .publish_config_file(tonic::Request::new(req.config_file)) + .publish_config_file(tonic::Request::new(req.convert_spec())) .in_current_span() .await; return match ret { @@ -645,8 +645,41 @@ impl Connector for GrpcConnector { }; } - async fn upsert_publish_config_file(&self) -> Result { - todo!() + async fn upsert_publish_config_file( + &self, + req: ConfigPublishRequest, + ) -> Result { + tracing::debug!( + "[polaris][config][connector] send upsert and publish config_file request={req:?}" + ); + + let mut client = self.create_config_grpc_stub(req.flow_id.clone()); + let ret = client + .upsert_and_publish_config_file(tonic::Request::new(req.convert_spec())) + .in_current_span() + .await; + return match ret { + Ok(rsp) => { + let rsp = rsp.into_inner(); + let recv_code: Code = unsafe { std::mem::transmute(rsp.code.unwrap()) }; + if ExecuteSuccess.eq(&recv_code) { + return Ok(true); + } + tracing::error!( + "[polaris][config][connector] send upsert and publish config_file request to server receive fail: code={} info={}", + rsp.code.unwrap().clone(), + rsp.info.clone().unwrap(), + ); + Err(PolarisError::new(ServerError, rsp.info.unwrap())) + } + Err(err) => { + tracing::error!( + "[polaris][config][connector] send upsert and publish config_file request to server fail: {}", + err + ); + Err(PolarisError::new(ServerError, err.to_string())) + } + }; } } diff --git a/src/router/default.rs b/src/router/default.rs index 940dd88..0328f61 100644 --- a/src/router/default.rs +++ b/src/router/default.rs @@ -38,7 +38,7 @@ impl RouterAPI for DefaultRouterAPI { &self, req: super::req::ProcessRouteRequest, ) -> Result { - // FIXME: 需要支持路由规则,当前直接原封不动进行返回 + // TODO: 需要支持路由规则,当前直接原封不动进行返回 Ok(ProcessRouteResponse { service_instances: req.service_instances, })