Skip to content

Commit

Permalink
[feat] Expose GetSyncPeers
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Mar 2, 2024
1 parent 827aa8d commit e5cc4b4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 18 deletions.
27 changes: 10 additions & 17 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use iroh_net::ticket::BlobTicket;
use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, NodeAddr};
use iroh_sync::actor::OpenState;
use iroh_sync::store::DownloadPolicy;
use iroh_sync::{store::Query, AuthorId, CapabilityKind, NamespaceId};
use iroh_sync::{store::Query, AuthorId, CapabilityKind, NamespaceId, PeerIdBytes};
use iroh_sync::{ContentStatus, RecordIdentifier};
use quic_rpc::message::RpcMsg;
use quic_rpc::{RpcClient, ServiceConnection};
Expand All @@ -34,22 +34,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio_util::io::{ReaderStream, StreamReader};
use tracing::warn;

use crate::rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest,
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest,
BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse,
BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse,
BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats,
CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest,
DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest,
DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest,
DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress,
ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest,
NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
};
use crate::rpc_protocol::{AuthorCreateRequest, AuthorListRequest, BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse, BlobListRequest, BlobListResponse, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CounterStats, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, DocGetSyncPeersRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption};
use crate::sync_engine::SyncEvent;

pub mod mem;
Expand Down Expand Up @@ -1036,6 +1021,14 @@ where
.await??;
Ok(res.policy)
}

/// Get sync peersfor this document
pub async fn get_sync_peers(&self) -> Result<Option<Vec<PeerIdBytes>>> {
let res = self
.rpc(DocGetSyncPeersRequest { doc_id: self.id() })
.await??;
Ok(res.peers)
}
}

impl<'a, C: ServiceConnection<ProviderService>> From<&'a Doc<C>>
Expand Down
6 changes: 6 additions & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,12 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
})
.await
}
DocGetSyncPeers(msg) => {
chan.rpc(msg, handler, |handler, req| async move {
handler.inner.sync.doc_get_sync_peers(req).await
})
.await
}
}
});
}
Expand Down
22 changes: 21 additions & 1 deletion iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use iroh_net::{
use iroh_sync::{
actor::OpenState,
store::{DownloadPolicy, Query},
{AuthorId, CapabilityKind, Entry, NamespaceId, SignedEntry},
PeerIdBytes, {AuthorId, CapabilityKind, Entry, NamespaceId, SignedEntry},
};
use quic_rpc::{
message::{BidiStreaming, BidiStreamingMsg, Msg, RpcMsg, ServerStreaming, ServerStreamingMsg},
Expand Down Expand Up @@ -930,6 +930,24 @@ pub struct DocGetDownloadPolicyResponse {
pub policy: DownloadPolicy,
}

/// Get peers for document
#[derive(Serialize, Deserialize, Debug)]
pub struct DocGetSyncPeersRequest {
/// The document id
pub doc_id: NamespaceId,
}

impl RpcMsg<ProviderService> for DocGetSyncPeersRequest {
type Response = RpcResult<DocGetSyncPeersResponse>;
}

/// Response to [`DocGetSyncPeersRequest`]
#[derive(Serialize, Deserialize, Debug)]
pub struct DocGetSyncPeersResponse {
/// The download policy
pub peers: Option<Vec<PeerIdBytes>>,
}

/// Get the bytes for a hash
#[derive(Serialize, Deserialize, Debug)]
pub struct BlobReadAtRequest {
Expand Down Expand Up @@ -1070,6 +1088,7 @@ pub enum ProviderRequest {
DocSubscribe(DocSubscribeRequest),
DocGetDownloadPolicy(DocGetDownloadPolicyRequest),
DocSetDownloadPolicy(DocSetDownloadPolicyRequest),
DocGetSyncPeers(DocGetSyncPeersRequest),

AuthorList(AuthorListRequest),
AuthorCreate(AuthorCreateRequest),
Expand Down Expand Up @@ -1121,6 +1140,7 @@ pub enum ProviderResponse {
DocSubscribe(RpcResult<DocSubscribeResponse>),
DocGetDownloadPolicy(RpcResult<DocGetDownloadPolicyResponse>),
DocSetDownloadPolicy(RpcResult<DocSetDownloadPolicyResponse>),
DocGetSyncPeers(RpcResult<DocGetSyncPeersResponse>),

AuthorList(RpcResult<AuthorListResponse>),
AuthorCreate(RpcResult<AuthorCreateResponse>),
Expand Down
9 changes: 9 additions & 0 deletions iroh/src/sync_engine/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use iroh_bytes::{store::Store as BaoStore, BlobFormat};
use iroh_sync::{Author, NamespaceSecret};
use tokio_stream::StreamExt;

use crate::rpc_protocol::{DocGetSyncPeersRequest, DocGetSyncPeersResponse};
use crate::{
rpc_protocol::{
AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse,
Expand Down Expand Up @@ -258,4 +259,12 @@ impl SyncEngine {
let policy = self.sync.get_download_policy(req.doc_id).await?;
Ok(DocGetDownloadPolicyResponse { policy })
}

pub async fn doc_get_sync_peers(
&self,
req: DocGetSyncPeersRequest,
) -> RpcResult<DocGetSyncPeersResponse> {
let peers = self.sync.get_sync_peers(req.doc_id).await?;
Ok(DocGetSyncPeersResponse { peers })
}
}

0 comments on commit e5cc4b4

Please sign in to comment.