Skip to content

Commit

Permalink
Enable receiving args and returning result by protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
flcl42 committed Dec 6, 2023
1 parent 9be39ad commit 47c0cac
Show file tree
Hide file tree
Showing 36 changed files with 301 additions and 172 deletions.
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core.Benchmarks/Benchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Libp2p.Core.Benchmarks;
[MemoryDiagnoser]
public class ChannelsBenchmark
{
//class TestProtocol : IProtocol
//class TestProtocol : IDuplexProtocol
//{
// public string Id => "";

Expand Down
16 changes: 13 additions & 3 deletions src/libp2p/Libp2p.Core.TestsBase/TestLocalPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public Task<IRemotePeer> DialAsync(Multiaddr addr, CancellationToken token = def
return Task.FromResult<IRemotePeer>(new TestRemotePeer(addr));
}

public Task<IListener> ListenAsync(Multiaddr addr, CancellationToken token = default)
public Task<ILocalListener> ListenAsync(Multiaddr addr, CancellationToken token = default)
{
return Task.FromResult<IListener>(null);
return Task.FromResult<ILocalListener>(null);
}
}

Expand All @@ -36,11 +36,21 @@ public TestRemotePeer(Multiaddr addr)
public Identity Identity { get; set; }
public Multiaddr Address { get; set; }

public Task DialAsync<TProtocol>(CancellationToken token = default) where TProtocol : IProtocol
public Task DialAsync<TProtocol>(CancellationToken token = default) where TProtocol : IId, IDialer
{
return Task.CompletedTask;
}

public Task<TResult> DialAsync<TProtocol, TResult, TParams>(TParams @params, CancellationToken token = default) where TProtocol : IId, IInteractiveDialer<TResult, TParams>
{
return Task.FromResult(default(TResult)!);
}

public Task<TResult> DialAsync<TProtocol, TResult>(CancellationToken token = default) where TProtocol : IId, IInteractiveDialer<TResult>
{
return Task.FromResult(default(TResult)!);
}

public Task DisconnectAsync()
{
return Task.CompletedTask;
Expand Down
113 changes: 51 additions & 62 deletions src/libp2p/Libp2p.Core/ChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Nethermind.Libp2p.Core;
public class ChannelFactory : IChannelFactory
{
private readonly IServiceProvider _serviceProvider;
private IProtocol _parent;
private IDictionary<IProtocol, IChannelFactory> _factories;
private IId _parent;
private IDictionary<IId, IChannelFactory> _factories;
private readonly ILogger? _logger;

public ChannelFactory(IServiceProvider serviceProvider)
Expand All @@ -20,124 +20,113 @@ public ChannelFactory(IServiceProvider serviceProvider)
_logger = _serviceProvider.GetService<ILoggerFactory>()?.CreateLogger<ChannelFactory>();
}

public IEnumerable<IProtocol> SubProtocols => _factories.Keys;
public IEnumerable<IId> SubProtocols => _factories.Keys;

public IChannel SubDial(IPeerContext context, IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
IId subProtocol = GetSubProtocol(context, req);
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

_logger?.DialStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

_ = subProtocol.DialAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.DialFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync(task.Exception is null);
}
req?.CompletionSource?.SetResult();
});

SetupDialing(subProtocol, channel, context, req);
return channel;
}

public IChannel SubListen(IPeerContext context, IChannelRequest? req = null)
public IChannel SubDialAndBind(IChannel parent, IPeerContext context,
IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
IId subProtocol = GetSubProtocol(context, req);
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

_logger?.ListenStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());
channel.Bind(parent);
SetupDialing(subProtocol, channel, context, req);
return channel;
}

_ = subProtocol.ListenAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.ListenFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
}
private IId GetSubProtocol(IPeerContext context, IChannelRequest? req = null)
{
IId? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();

req?.CompletionSource?.SetResult();
});
if (subProtocol is null)
{
throw new Exception("No protocol to dial");
}

return channel;
return subProtocol;
}

public IChannel SubDialAndBind(IChannel parent, IPeerContext context,
IChannelRequest? req = null)
private void SetupDialing(IId subProtocol, Channel channel, IPeerContext context, IChannelRequest? req)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
Channel channel = CreateChannel(subProtocol);
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;
Func<IId, IChannel, IChannelFactory?, IPeerContext, Task>? call = (req as ChannelRequest)?.Call;

_logger?.DialAndBindStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());
_logger?.DialStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

channel.Bind(parent);
_ = subProtocol.DialAsync(channel.Reverse, channelFactory, context)
Task diallingTask = null!;

diallingTask = call is null ? ((IDialer)subProtocol).DialAsync(channel.Reverse, channelFactory, context) : call(subProtocol, channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.DialAndBindFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
_logger?.DialFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
await channel.CloseAsync(task.Exception is null);
}
req?.CompletionSource?.SetResult();
(req as ChannelRequest)?.SetResult?.Invoke(diallingTask);
});
}

public IChannel SubListen(IPeerContext context, IChannelRequest? req = null)
{
IId subProtocol = GetSubProtocol(context, req);
Channel channel = CreateChannel(subProtocol);
SetupListening(subProtocol, channel, context, req);
return channel;
}

public IChannel SubListenAndBind(IChannel parent, IPeerContext context,
IChannelRequest? req = null)
{
IProtocol? subProtocol = req?.SubProtocol ?? SubProtocols.FirstOrDefault();
IId subProtocol = GetSubProtocol(context, req);
Channel channel = CreateChannel(subProtocol);
channel.Bind(parent);
SetupListening(subProtocol, channel, context, req);
return channel;
}

private void SetupListening(IId subProtocol, Channel channel, IPeerContext context, IChannelRequest? req)
{
ChannelFactory? channelFactory = _factories[subProtocol] as ChannelFactory;

_logger?.ListenAndBindStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());
_logger?.ListenStarted(channel.Id, subProtocol.Id, channelFactory.GetSubProtocols());

Task listeningTask = null!;

channel.Bind(parent);
_ = subProtocol.ListenAsync(channel.Reverse, channelFactory, context)
listeningTask = ((IListener)subProtocol).ListenAsync(channel.Reverse, channelFactory, context)
.ContinueWith(async task =>
{
if (!task.IsCompletedSuccessfully)
{
_logger?.ListenAndBindFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
_logger?.ListenFailed(channel.Id, subProtocol.Id, task.Exception, task.Exception.GetErrorMessage());
}
if (!channel.IsClosed)
{
await channel.CloseAsync();
}
req?.CompletionSource?.SetResult();
(req as ChannelRequest)?.SetResult?.Invoke(listeningTask);
});

return channel;
}

public ChannelFactory Setup(IProtocol parent, IDictionary<IProtocol, IChannelFactory> factories)
public ChannelFactory Setup(IId parent, IDictionary<IId, IChannelFactory> factories)
{
_parent = parent;
_factories = factories;
return this;
}

private Channel CreateChannel(IProtocol? subProtocol)
private Channel CreateChannel(IId? subProtocol)
{
Channel channel = ActivatorUtilities.CreateInstance<Channel>(_serviceProvider);
channel.Id = $"{_parent.Id} <> {subProtocol?.Id}";
Expand Down
10 changes: 7 additions & 3 deletions src/libp2p/Libp2p.Core/ChannelRequest.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT


namespace Nethermind.Libp2p.Core;

public class ChannelRequest : IChannelRequest
internal class ChannelRequest : IChannelRequest
{
public IProtocol? SubProtocol { get; init; }
public TaskCompletionSource? CompletionSource { get; init; }
public Func<IId, IChannel, IChannelFactory?, IPeerContext, Task>? Call { get; init;}
public Action<Task>? SetResult { get; init;}

public IId? SubProtocol { get; init; }
//public TaskCompletionSource<object>? CompletionSource { get; init; }

public override string ToString()
{
Expand Down
10 changes: 5 additions & 5 deletions src/libp2p/Libp2p.Core/IChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Nethermind.Libp2p.Core;

public interface IChannelFactory
{
IEnumerable<IProtocol> SubProtocols { get; }
IEnumerable<IId> SubProtocols { get; }
IChannel SubDial(IPeerContext context, IChannelRequest? request = null);

IChannel SubListen(IPeerContext context, IChannelRequest? request = null);
Expand All @@ -14,22 +14,22 @@ public interface IChannelFactory

IChannel SubListenAndBind(IChannel parentChannel, IPeerContext context, IChannelRequest? request = null);

IChannel SubDial(IPeerContext context, IProtocol protocol)
IChannel SubDial(IPeerContext context, IId protocol)
{
return SubDial(context, new ChannelRequest { SubProtocol = protocol });
}

IChannel SubListen(IPeerContext context, IProtocol protocol)
IChannel SubListen(IPeerContext context, IId protocol)
{
return SubListen(context, new ChannelRequest { SubProtocol = protocol });
}

IChannel SubDialAndBind(IChannel parentChannel, IPeerContext context, IProtocol protocol)
IChannel SubDialAndBind(IChannel parentChannel, IPeerContext context, IId protocol)
{
return SubDialAndBind(parentChannel, context, new ChannelRequest { SubProtocol = protocol });
}

IChannel SubListenAndBind(IChannel parentChannel, IPeerContext context, IProtocol protocol)
IChannel SubListenAndBind(IChannel parentChannel, IPeerContext context, IId protocol)
{
return SubListenAndBind(parentChannel, context, new ChannelRequest { SubProtocol = protocol });
}
Expand Down
6 changes: 4 additions & 2 deletions src/libp2p/Libp2p.Core/IChannelRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Nethermind.Libp2p.Core;

public interface IChannelRequest
{
IProtocol? SubProtocol { get; }
public TaskCompletionSource? CompletionSource { get; }
IId? SubProtocol { get; }
//public TaskCompletionSource<object>? CompletionSource { get; }
//public Func<IId, IChannel, IChannelFactory?, IPeerContext, Task>? Call { get; }
//public Action<Task>? SetResult { get; }
}
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core/IListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Nethermind.Libp2p.Core;

public interface IListener
public interface ILocalListener
{
Multiaddr Address { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/libp2p/Libp2p.Core/ILocalPeer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ namespace Nethermind.Libp2p.Core;
public interface ILocalPeer : IPeer
{
Task<IRemotePeer> DialAsync(Multiaddr addr, CancellationToken token = default);
Task<IListener> ListenAsync(Multiaddr addr, CancellationToken token = default);
Task<ILocalListener> ListenAsync(Multiaddr addr, CancellationToken token = default);
}
3 changes: 2 additions & 1 deletion src/libp2p/Libp2p.Core/IPeerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface IPeerContext
IPeerContext Fork();

#region Allows muxer to manage session and channels for the app protocols
BlockingCollection<IChannelRequest> SubDialRequests { get; }
IEnumerable<IChannelRequest> GetBlockingSubDialRequestsEnumerable();

IChannelRequest? SpecificProtocolRequest { get; set; }

Expand All @@ -27,6 +27,7 @@ public interface IPeerContext

void Connected(IPeer peer);
void ListenerReady();
void RequestDial(IId protocol);
#endregion
}

Expand Down
4 changes: 2 additions & 2 deletions src/libp2p/Libp2p.Core/IPeerFactoryBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Nethermind.Libp2p.Core;

public interface IPeerFactoryBuilder
{
IPeerFactoryBuilder AddAppLayerProtocol<TProtocol>(TProtocol? instance = default) where TProtocol : IProtocol;
IPeerFactoryBuilder AddAppLayerProtocol<TProtocol>(TProtocol? instance = default) where TProtocol : IId;
IPeerFactory Build();
IEnumerable<IProtocol> AppLayerProtocols { get; }
IEnumerable<IId> AppLayerProtocols { get; }
}
Loading

0 comments on commit 47c0cac

Please sign in to comment.