Skip to content

Commit

Permalink
feat: add import feature in client (dragonflyoss#575)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jul 8, 2024
1 parent 5e839ed commit 2366f46
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 63 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.1.8
dragonfly-client-util = { path = "dragonfly-client-util", version = "0.1.82" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "0.1.82" }
thiserror = "1.0"
dragonfly-api = "2.0.126"
dragonfly-api = "2.0.129"
reqwest = { version = "0.12.4", features = ["stream", "native-tls", "default-tls", "rustls-tls"] }
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.2", features = ["full"] }
Expand Down
32 changes: 23 additions & 9 deletions dragonfly-client-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,18 @@ impl Storage {
}

// delete_task deletes the task metadatas, task content and piece metadatas.
pub async fn delete_task(&self, id: &str) -> Result<()> {
self.metadata.delete_task(id)?;
self.metadata.delete_pieces(id)?;
self.content.delete_task(id).await?;
Ok(())
pub async fn delete_task(&self, id: &str) {
self.metadata
.delete_task(id)
.unwrap_or_else(|err| error!("delete task metadata failed: {}", err));

self.metadata.delete_pieces(id).unwrap_or_else(|err| {
error!("delete piece metadatas failed: {}", err);
});

self.content.delete_task(id).await.unwrap_or_else(|err| {
error!("delete task content failed: {}", err);
});
}

// create_persistent_cache_task creates a new persistent cache task.
Expand Down Expand Up @@ -194,10 +201,17 @@ impl Storage {
}

// delete_cache_task deletes the cache task metadatas, cache task content and piece metadatas.
pub async fn delete_cache_task(&self, id: &str) -> Result<()> {
self.metadata.delete_cache_task(id)?;
self.content.delete_cache_task(id).await?;
Ok(())
pub async fn delete_cache_task(&self, id: &str) {
self.metadata.delete_cache_task(id).unwrap_or_else(|err| {
error!("delete cache task metadata failed: {}", err);
});

self.content
.delete_cache_task(id)
.await
.unwrap_or_else(|err| {
error!("delete cache task content failed: {}", err);
});
}

// download_piece_started updates the metadata of the piece and writes
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/bin/dfget/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ async fn run(args: Args) -> Result<()> {
disable_back_to_source: args.disable_back_to_source,
certificate_chain: Vec::new(),
prefetch: false,
object_storage: None,
}),
})
.await
Expand Down
12 changes: 2 additions & 10 deletions dragonfly-client/src/gc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,7 @@ impl GC {
continue;
}

self.storage
.delete_task(&task.id)
.await
.unwrap_or_else(|err| {
info!("failed to evict task {}: {}", task.id, err);
});
self.storage.delete_task(&task.id).await;
info!("evict task {}", task.id);

self.delete_task_from_scheduler(task.clone()).await;
Expand Down Expand Up @@ -176,10 +171,7 @@ impl GC {
};

// Evict the task.
if let Err(err) = self.storage.delete_task(&task.id).await {
info!("failed to evict task {}: {}", task.id, err);
continue;
}
self.storage.delete_task(&task.id).await;

// Update the evicted space.
evicted_space += task_space;
Expand Down
23 changes: 4 additions & 19 deletions dragonfly-client/src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,11 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
.cache_task
.create_persistent_cache_task(
task_id.as_str(),
host_id.as_str(),
peer_id.as_str(),
path,
request.piece_length,
digest.to_string().as_str(),
request.clone(),
)
.await
{
Expand Down Expand Up @@ -666,24 +668,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
}
};

Ok(Response::new(CacheTask {
id: task.id,
persistent_replica_count: request.persistent_replica_count,
// TODO: Return from scheduler.
replica_count: 0,
digest: digest.to_string(),
tag: request.tag,
application: request.application,
piece_length: request.piece_length,
content_length: task.content_length,
// TODO: Return from scheduler.
piece_count: 0,
// TODO: Return from scheduler.
state: "".to_string(),
ttl: request.ttl,
created_at: Some(prost_wkt_types::Timestamp::from(task.created_at)),
updated_at: Some(prost_wkt_types::Timestamp::from(task.updated_at)),
}))
Ok(Response::new(task))
}

// TODO: Implement this.
Expand Down
39 changes: 34 additions & 5 deletions dragonfly-client/src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use dragonfly_api::scheduler::v2::{
AnnounceCachePeerResponse, AnnounceHostRequest, AnnouncePeerRequest, AnnouncePeerResponse,
DeleteCachePeerRequest, DeleteCacheTaskRequest, DeleteHostRequest, DeletePeerRequest,
DeleteTaskRequest, StatCachePeerRequest, StatCacheTaskRequest, StatPeerRequest,
StatTaskRequest, UploadCacheTaskRequest,
StatTaskRequest, UploadCacheTaskFailedRequest, UploadCacheTaskFinishedRequest,
UploadCacheTaskStartedRequest,
};
use dragonfly_client_core::error::{ErrorType, ExternalError, OrErr};
use dragonfly_client_core::{Error, Result};
Expand Down Expand Up @@ -348,22 +349,50 @@ impl SchedulerClient {
Ok(())
}

// upload_cache_task uploads the metadata of the cache task.
// upload_cache_task_started uploads the metadata of the cache task started.
#[instrument(skip(self))]
pub async fn upload_cache_task(
pub async fn upload_cache_task_started(
&self,
task_id: &str,
request: UploadCacheTaskRequest,
request: UploadCacheTaskStartedRequest,
) -> Result<()> {
let request = Self::make_request(request);
self.client(task_id, None)
.await?
.upload_cache_task_started(request)
.await?;
Ok(())
}

// upload_cache_task_finished uploads the metadata of the cache task finished.
pub async fn upload_cache_task_finished(
&self,
task_id: &str,
request: UploadCacheTaskFinishedRequest,
) -> Result<CacheTask> {
let request = Self::make_request(request);
let response = self
.client(task_id, None)
.await?
.upload_cache_task(request)
.upload_cache_task_finished(request)
.await?;
Ok(response.into_inner())
}

// upload_cache_task_failed uploads the metadata of the cache task failed.
pub async fn upload_cache_task_failed(
&self,
task_id: &str,
request: UploadCacheTaskFailedRequest,
) -> Result<()> {
let request = Self::make_request(request);
self.client(task_id, None)
.await?
.upload_cache_task_failed(request)
.await?;
Ok(())
}

// stat_cache_task gets the status of the cache task.
#[instrument(skip(self))]
pub async fn stat_cache_task(
Expand Down
1 change: 1 addition & 0 deletions dragonfly-client/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ fn make_download_task_request(
// prefetch is enabled in the configuration.
prefetch: config.proxy.prefetch
&& reqwest_request_header.contains_key(reqwest::header::RANGE),
object_storage: None,
}),
})
}
Expand Down
116 changes: 109 additions & 7 deletions dragonfly-client/src/resource/cache_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/

use crate::grpc::scheduler::SchedulerClient;
use dragonfly_api::common::v2::CacheTask as CommonCacheTask;
use dragonfly_api::dfdaemon::v2::UploadCacheTaskRequest;
use dragonfly_api::scheduler::v2::{
UploadCacheTaskFailedRequest, UploadCacheTaskFinishedRequest, UploadCacheTaskStartedRequest,
};
use dragonfly_client_config::dfdaemon::Config;
use dragonfly_client_core::Result as ClientResult;
use dragonfly_client_storage::metadata;
use dragonfly_client_storage::Storage;
use dragonfly_client_util::id_generator::IDGenerator;
use std::path::Path;
use std::sync::Arc;
use tracing::error;

// CacheTask represents a cache task manager.
pub struct CacheTask {
Expand Down Expand Up @@ -55,17 +60,114 @@ impl CacheTask {
}
}

// TODO: Implement this.
// create_persistent_cache_task creates a persistent cache task from local.
pub async fn create_persistent_cache_task(
&self,
id: &str,
task_id: &str,
host_id: &str,
peer_id: &str,
path: &Path,
piece_length: u64,
digest: &str,
) -> ClientResult<metadata::CacheTask> {
self.storage
.create_persistent_cache_task(id, path, piece_length, digest)
request: UploadCacheTaskRequest,
) -> ClientResult<CommonCacheTask> {
// Notify the scheduler that the cache task is started.
self.scheduler_client
.upload_cache_task_started(
task_id,
UploadCacheTaskStartedRequest {
host_id: host_id.to_string(),
task_id: task_id.to_string(),
peer_id: peer_id.to_string(),
persistent_replica_count: request.persistent_replica_count,
tag: request.tag.clone(),
application: request.application.clone(),
piece_length: request.piece_length,
ttl: request.ttl.clone(),
timeout: request.timeout,
},
)
.await
.map_err(|err| {
error!("upload cache task started failed: {}", err);
err
})?;

// Create the persistent cache task.
match self
.storage
.create_persistent_cache_task(task_id, path, request.piece_length, digest)
.await
{
Ok(metadata) => {
let response = match self
.scheduler_client
.upload_cache_task_finished(task_id, UploadCacheTaskFinishedRequest {})
.await
{
Ok(response) => response,
Err(err) => {
// Delete the cache task.
self.storage.delete_cache_task(task_id).await;

// Notify the scheduler that the cache task is failed.
self.scheduler_client
.upload_cache_task_failed(
task_id,
UploadCacheTaskFailedRequest {
description: Some(err.to_string()),
},
)
.await
.map_err(|err| {
error!("upload cache task failed failed: {}", err);
err
})?;

return Err(err);
}
};

Ok(CommonCacheTask {
id: task_id.to_string(),
persistent_replica_count: request.persistent_replica_count,
replica_count: response.replica_count,
digest: digest.to_string(),
tag: request.tag,
application: request.application,
piece_length: request.piece_length,
content_length: metadata.content_length,
piece_count: response.piece_count,
state: response.state,
ttl: request.ttl,
created_at: response.created_at,
updated_at: response.updated_at,
})
}
Err(err) => {
// Delete the cache task.
self.storage.delete_cache_task(task_id).await;

// Notify the scheduler that the cache task is failed.
self.scheduler_client
.upload_cache_task_failed(
task_id,
UploadCacheTaskFailedRequest {
description: Some(err.to_string()),
},
)
.await
.map_err(|err| {
error!("upload cache task failed failed: {}", err);
err
})?;

Err(err)
}
}
}

// delete_cache_task deletes a cache task.
pub async fn delete_cache_task(&self, task_id: &str) {
self.storage.delete_cache_task(task_id).await
}
}
11 changes: 1 addition & 10 deletions dragonfly-client/src/resource/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1568,16 +1568,7 @@ impl Task {
));
}

self.storage
.delete_task(task.id.as_str())
.await
.map_err(|err| {
error!(
"delete task {} from local storage error: {:?}",
task.id, err
);
Status::internal(err.to_string())
})?;
self.storage.delete_task(task.id.as_str()).await;
info!("delete task {} from local storage", task.id);

Ok(())
Expand Down

0 comments on commit 2366f46

Please sign in to comment.