Skip to content

Commit

Permalink
feat:完善配置中心客户端逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Oct 23, 2024
1 parent 7f325c5 commit 1f5c94e
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 67 deletions.
186 changes: 185 additions & 1 deletion examples/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,193 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use polaris_rust::core::model::error::PolarisError;
use std::{collections::HashMap, sync::Arc, time::Duration};

use polaris_rust::{
config::{
api::{new_config_file_api_by_context, ConfigFileAPI},
req::{
CreateConfigFileRequest, PublishConfigFileRequest, UpdateConfigFileRequest,
UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
},
},
core::{
context::SDKContext,
model::{
config::{ConfigFile, ConfigFileRelease},
error::PolarisError,
},
},
};
use tracing::level_filters::LevelFilter;

#[tokio::main]
async fn main() -> Result<(), PolarisError> {
tracing_subscriber::fmt()
// all spans/events with a level higher than TRACE (e.g, info, warn, etc.)
// will be written to stdout.
.with_thread_names(true)
.with_file(true)
.with_level(true)
.with_line_number(true)
.with_thread_ids(true)
.with_max_level(LevelFilter::INFO)
// sets this to be the default, global collector for this application.
.init();

let start_time = std::time::Instant::now();

let sdk_context_ret = SDKContext::default();
if sdk_context_ret.is_err() {
tracing::error!(
"create sdk context fail: {}",
sdk_context_ret.err().unwrap()
);
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}
let arc_ctx = Arc::new(sdk_context_ret.unwrap());

let config_file_api_ret = new_config_file_api_by_context(arc_ctx.clone());
if config_file_api_ret.is_err() {
tracing::error!(
"create config_file api fail: {}",
config_file_api_ret.err().unwrap()
);
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}

tracing::info!(
"create config_file api client cost: {:?}",
start_time.elapsed()
);

let config_file_api = config_file_api_ret.unwrap();

let mut labels = HashMap::<String, String>::new();

labels.insert("rust".to_string(), "rust".to_string());

// 创建文件
let ret = config_file_api
.create_config_file(CreateConfigFileRequest {
flow_id: uuid::Uuid::new_v4().to_string(),
timeout: Duration::from_secs(1),
file: ConfigFile {
namespace: "rust".to_string(),
group: "rust".to_string(),
name: "rust.toml".to_string(),
content: "test".to_string(),
labels: labels.clone(),
..Default::default()
},
})
.await;

if ret.is_err() {
tracing::error!("create config_file fail: {}", ret.err().unwrap());
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}

// 更新文件
let ret = config_file_api
.update_config_file(UpdateConfigFileRequest {
flow_id: uuid::Uuid::new_v4().to_string(),
timeout: Duration::from_secs(1),
file: ConfigFile {
namespace: "rust".to_string(),
group: "rust".to_string(),
name: "rust.toml".to_string(),
content: "test".to_string(),
labels: labels.clone(),
..Default::default()
},
})
.await;

if ret.is_err() {
tracing::error!("update config_file fail: {}", ret.err().unwrap());
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}

// 发布文件
let ret = config_file_api
.publish_config_file(PublishConfigFileRequest {
flow_id: uuid::Uuid::new_v4().to_string(),
timeout: Duration::from_secs(1),
config_file: ConfigFileRelease {
namespace: "rust".to_string(),
group: "rust".to_string(),
file_name: "rust.toml".to_string(),
release_name: "rust".to_string(),
md5: "".to_string(),
},
})
.await;

if ret.is_err() {
tracing::error!("publish config_file fail: {}", ret.err().unwrap());
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}

// 文件变更订阅
let _ = config_file_api
.watch_config_file(WatchConfigFileRequest {
namespace: "rust".to_string(),
group: "rust".to_string(),
file: "rust.toml".to_string(),
call_back: Arc::new(|event| {
tracing::info!("event: {:?}", event);
}),
})
.await;

// 变更 10 次配置文件并发布
for i in 0..10 {
let ret = config_file_api
.upsert_publish_config_file(UpsertAndPublishConfigFileRequest {
flow_id: uuid::Uuid::new_v4().to_string(),
timeout: Duration::from_secs(1),
release_name: format!("rust-{}", i),
md5: "".to_string(),
config_file: ConfigFile {
namespace: "rust".to_string(),
group: "rust".to_string(),
name: "rust.toml".to_string(),
content: format!("test-{}", i),
labels: labels.clone(),
..Default::default()
},
})
.await;

if ret.is_err() {
tracing::error!(
"upsert and publish config_file fail: {}",
ret.err().unwrap()
);
return Err(PolarisError::new(
polaris_rust::core::model::error::ErrorCode::UnknownServerError,
"".to_string(),
));
}

std::thread::sleep(Duration::from_secs(10));
}

Ok(())
}
5 changes: 4 additions & 1 deletion examples/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ async fn main() -> Result<(), PolarisError> {
let provider = provider_ret.unwrap();
let consumer = consumer_ret.unwrap();

tracing::info!("create provider cost: {:?}", start_time.elapsed());
tracing::info!(
"create discovery api client cost: {:?}",
start_time.elapsed()
);
let metadata = HashMap::new();

let req = InstanceRegisterRequest {
Expand Down
9 changes: 7 additions & 2 deletions src/config/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
};

use super::req::{
CreateConfigFileRequest, DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,
PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest,
CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, PublishConfigFileRequest,
UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest, WatchConfigFileRequest,
WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse,
};

Expand Down Expand Up @@ -66,6 +66,11 @@ where
req: PublishConfigFileRequest,
) -> Result<bool, PolarisError>;

async fn upsert_publish_config_file(
&self,
req: UpsertAndPublishConfigFileRequest,
) -> Result<bool, PolarisError>;

async fn watch_config_file(
&self,
req: WatchConfigFileRequest,
Expand Down
15 changes: 13 additions & 2 deletions src/config/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ use super::{
api::{ConfigFileAPI, ConfigGroupAPI},
req::{
self, CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,

Check warning on line 39 in src/config/default.rs

View workflow job for this annotation

GitHub Actions / Run tests

unused import: `self`

Check warning on line 39 in src/config/default.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `self`

warning: unused import: `self` --> src/config/default.rs:39:9 | 39 | self, CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, | ^^^^ | = note: `#[warn(unused_imports)]` on by default
PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest,
WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse,
PublishConfigFileRequest, UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest,
WatchConfigFileRequest, WatchConfigFileResponse, WatchConfigGroupRequest,
WatchConfigGroupResponse,
},
};

Expand Down Expand Up @@ -132,6 +133,16 @@ impl ConfigFileAPI for DefaultConfigFileAPI {
self.context.get_engine().publish_config_file(req).await
}

async fn upsert_publish_config_file(
&self,
req: UpsertAndPublishConfigFileRequest,
) -> Result<bool, PolarisError> {
self.context
.get_engine()
.upsert_publish_config_file(req)
.await
}

async fn watch_config_file(
&self,
req: WatchConfigFileRequest,
Expand Down
65 changes: 19 additions & 46 deletions src/config/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use crate::core::model::{
config::{ConfigFile, ConfigFileChangeEvent, ConfigFileRequest, ConfigReleaseRequest},
naming::ServiceInstancesChangeEvent,
pb::lib::ConfigFileRelease,
use crate::core::model::config::{
ConfigFile, ConfigFileChangeEvent, ConfigFileRelease, ConfigFileRequest, ConfigPublishRequest,
ConfigReleaseRequest,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -70,71 +69,45 @@ impl UpdateConfigFileRequest {
}

#[derive(Clone, Debug)]
pub struct DeleteConfigFileRequest {
pub struct PublishConfigFileRequest {
pub flow_id: String,
pub namespace: String,
pub group: String,
pub file: String,
pub timeout: Duration,
pub config_file: ConfigFileRelease,
}

impl DeleteConfigFileRequest {
pub fn to_config_request(&self) -> ConfigFileRequest {
impl PublishConfigFileRequest {
pub fn to_config_request(&self) -> ConfigReleaseRequest {
let mut flow_id = self.flow_id.clone();
if flow_id.is_empty() {
flow_id = uuid::Uuid::new_v4().to_string();
}
let mut file = ConfigFile::default();
file.namespace = self.namespace.clone();
file.group = self.group.clone();
file.name = self.file.clone();
ConfigFileRequest {
ConfigReleaseRequest {
flow_id: flow_id,

Check warning on line 85 in src/config/req.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant field names in struct initialization

warning: redundant field names in struct initialization --> src/config/req.rs:85:13 | 85 | flow_id: flow_id, | ^^^^^^^^^^^^^^^^ help: replace it with: `flow_id` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names
config_file: file,
config_file: self.config_file.clone(),
}
}
}

#[derive(Clone, Debug)]
pub struct PublishConfigFileRequest {
pub struct UpsertAndPublishConfigFileRequest {
pub flow_id: String,
pub namespace: String,
pub group: String,
pub file: String,
pub timeout: Duration,
pub release_name: String,
pub md5: String,
pub timeout: Duration,
pub config_file: ConfigFile,
}

impl PublishConfigFileRequest {
pub fn to_config_request(&self) -> ConfigReleaseRequest {
impl UpsertAndPublishConfigFileRequest {
pub fn to_config_request(&self) -> ConfigPublishRequest {
let mut flow_id = self.flow_id.clone();
if flow_id.is_empty() {
flow_id = uuid::Uuid::new_v4().to_string();
}
ConfigReleaseRequest {
ConfigPublishRequest {
flow_id: flow_id,

Check warning on line 107 in src/config/req.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant field names in struct initialization

warning: redundant field names in struct initialization --> src/config/req.rs:107:13 | 107 | flow_id: flow_id, | ^^^^^^^^^^^^^^^^ help: replace it with: `flow_id` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names
config_file: ConfigFileRelease {
id: None,
name: Some(self.release_name.clone()),
namespace: Some(self.namespace.clone()),
group: Some(self.group.clone()),
file_name: Some(self.file.clone()),
content: None,
comment: None,
md5: Some(self.md5.clone()),
version: None,
create_time: None,
create_by: None,
modify_time: None,
modify_by: None,
tags: vec![],
active: None,
format: None,
release_description: None,
release_type: None,
beta_labels: vec![],
},
md5: self.md5.clone(),
release_name: self.release_name.clone(),
config_file: self.config_file.clone(),
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/core/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::RwLock;

use crate::config::req::{
CreateConfigFileRequest, GetConfigFileRequest, PublishConfigFileRequest,
UpdateConfigFileRequest,
UpdateConfigFileRequest, UpsertAndPublishConfigFileRequest,
};
use crate::core::config::config::Configuration;
use crate::core::model::cache::{EventType, ResourceEventKey};
Expand Down Expand Up @@ -339,6 +339,22 @@ impl Engine {
};

Check warning on line 339 in src/core/engine.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> src/core/engine.rs:336:9 | 336 | / return match rsp { 337 | | Ok(ret_rsp) => Ok(ret_rsp), 338 | | Err(err) => Err(err), 339 | | }; | |_________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return help: remove `return` | 336 ~ match rsp { 337 + Ok(ret_rsp) => Ok(ret_rsp), 338 + Err(err) => Err(err), 339 ~ } |
}

/// upsert_publish_config_file 更新或发布配置文件
pub async fn upsert_publish_config_file(
&self,
req: UpsertAndPublishConfigFileRequest,
) -> Result<bool, PolarisError> {
let config_file = req.to_config_request();

let connector = self.server_connector.clone();
let rsp = connector.upsert_publish_config_file(config_file).await;

return match rsp {
Ok(ret_rsp) => Ok(ret_rsp),
Err(err) => Err(err),
};

Check warning on line 355 in src/core/engine.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement

warning: unneeded `return` statement --> src/core/engine.rs:352:9 | 352 | / return match rsp { 353 | | Ok(ret_rsp) => Ok(ret_rsp), 354 | | Err(err) => Err(err), 355 | | }; | |_________^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_return help: remove `return` | 352 ~ match rsp { 353 + Ok(ret_rsp) => Ok(ret_rsp), 354 + Err(err) => Err(err), 355 ~ } |
}

pub async fn lookup_loadbalancer(&self, name: &str) -> Option<Arc<Box<dyn LoadBalancer>>> {
let lb = self.load_balancer.read().await;
lb.get(name).map(|lb| lb.clone())

Check warning on line 360 in src/core/engine.rs

View workflow job for this annotation

GitHub Actions / clippy

you are using an explicit closure for cloning elements

warning: you are using an explicit closure for cloning elements --> src/core/engine.rs:360:9 | 360 | lb.get(name).map(|lb| lb.clone()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: consider calling the dedicated `cloned` method: `lb.get(name).cloned()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#map_clone = note: `#[warn(clippy::map_clone)]` on by default
Expand Down
Loading

0 comments on commit 1f5c94e

Please sign in to comment.