Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a LRU-based cache module for preheat jobs #945

Open
wants to merge 27 commits into
base: main
Choose a base branch
from

Conversation

SouthWest7
Copy link
Contributor

@SouthWest7 SouthWest7 commented Jan 16, 2025

Description

This PR introduces a Cache module designed specifically for preheating jobs, providing an LRU-based mechanism for storing and managing piece content. The purpose of the cache is to enable other peers within the cluster to access user-space maintained cached pieces when downloading the same tasks.

Dependency Updates:

  • Updates the version of dragonfly-api from 2.1.3 to 2.1.4 in Cargo.toml.

Refactoring and Method Changes:

  • Refactored the upload_piece method in dragonfly-client-storage/src/lib.rs. This change aims to support uploading piece content from cache when hitting cache.
    Changed the return type from Result<impl AsyncRead> to Result<Pin<Box<dyn AsyncRead + Send>>> because reading cache or disk will return different types implementing AsyncRead.

  • Refactored the download_from_local_into_async_read and upload_from_local_into_async_read method in dragonfly-client/src/resource/piece.rs.
    Modified their return types, because they both call upload_piece and return its result.

  • Refactored the download_from_source and download_from_parent method in dragonfly-client/src/resource/piece.rs. Add a new parameter load_to_cache on both. This change is aim to support loading piece content to cache when downloading piece content from parent peer or source.

  • Refactored the download_partial_from_local in dragonfly-client/src/resource/persistent_cache_task.rs. Add a new parameter load_to_cache. Added the code for loading content into cache.

  • Add a new method upload_piece_from_cache in dragonfly-client-storage/src/lib.rs. This method is used by the storage to provide reading cache.

  • Add a new method load_piece_to_cache in dragonfly-client-storage/src/lib.rs. This method is used by the storage to provide writing cache.

  • Add a new parameter load_to_cache in some methods.

Metadata and Struct Changes:

  • Add a new file cache in Storage in dragonfly-client-storage/src/lib.rs.

Module Changes:

  • Add a new module Cache with unit tests in dragonfly-client-storage/src/cache.rs.
  • For detailed design ideas and method definition, please refer to related issue.

Related Issue

dragonflyoss/dragonfly#3742

Motivation and Context

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation Update (if none of the other choices apply)

Checklist

  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
  • I have added tests to cover my changes.

SouthWest7 and others added 18 commits January 6, 2025 15:39
Signed-off-by: southwest <[email protected]>
Signed-off-by: southwest <[email protected]>
@SouthWest7 SouthWest7 requested a review from a team as a code owner January 16, 2025 09:34
Copy link

codecov bot commented Jan 16, 2025

Codecov Report

Attention: Patch coverage is 96.36364% with 10 lines in your changes missing coverage. Please review.

Project coverage is 34.40%. Comparing base (d4096db) to head (fe27f7e).

Files with missing lines Patch % Lines
dragonfly-client/src/resource/task.rs 0.00% 6 Missing ⚠️
dragonfly-client-storage/src/cache.rs 99.25% 2 Missing ⚠️
dragonfly-client-storage/src/lib.rs 0.00% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #945      +/-   ##
==========================================
+ Coverage   32.66%   34.40%   +1.73%     
==========================================
  Files          60       61       +1     
  Lines        9792    10067     +275     
==========================================
+ Hits         3199     3464     +265     
- Misses       6593     6603      +10     
Files with missing lines Coverage Δ
dragonfly-client/src/resource/piece.rs 55.51% <ø> (ø)
dragonfly-client-storage/src/cache.rs 99.25% <99.25%> (ø)
dragonfly-client-storage/src/lib.rs 1.38% <0.00%> (-0.04%) ⬇️
dragonfly-client/src/resource/task.rs 0.00% <0.00%> (ø)

dragonfly-client-storage/Cargo.toml Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
dragonfly-client-storage/src/cache.rs Outdated Show resolved Hide resolved
Signed-off-by: southwest miao <[email protected]>
#[derive(Clone)]
pub struct Cache {
/// pieces stores the pieces with their piece id and content.
pieces: Arc<Mutex<LruCache<String, bytes::Bytes>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RWMutex.

}

#[tokio::test]
async fn test_cache_initialization() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add more cases.

/// Cache implements the cache for storing piece content by LRU algorithm.
impl Cache {
/// new creates a new cache with the specified capacity.
pub fn new(config: Arc<Config>) -> Result<Self> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub fn new(capacity: usize) -> Result<Self> {

@@ -405,7 +414,7 @@ impl Storage {
piece_id: &str,
task_id: &str,
range: Option<Range>,
) -> Result<impl AsyncRead> {
) -> Result<Pin<Box<dyn AsyncRead + Send>>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return Result<impl AsyncRead>.

@@ -415,6 +424,24 @@ impl Storage {
// Get the piece metadata and return the content of the piece.
match self.metadata.get_piece(piece_id) {
Ok(Some(piece)) => {
// Try to upload piece content form cache.
if !self.cache.is_empty().await && self.cache.contains_piece(piece_id).await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if self.cache.contains_piece(piece_id).await

@@ -520,6 +522,14 @@ impl Piece {
};
})?;

// Load piece content to cache from parent
if load_to_cache {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to download_piece_from_parent_finished and download_piece_from_source_finished.

if load_to_cache {
// Load piece content to cache from source.
self.storage
.load_piece_to_cache(piece_id, &mut reader, length)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to download_piece_from_parent_finished and download_piece_from_source_finished.

// If need_piece_content is true, read the piece content from the local.
if need_piece_content {
// If need_piece_content or load_to_cache is true, read the piece content from the local.
if need_piece_content || load_to_cache {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants