From 58b8f1441507a604707a6395a6e84bc3f2f360b0 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Tue, 20 Aug 2019 14:04:19 +1200 Subject: [PATCH 1/9] feat: manage dead peers --- src/MultiAddressBlackList.cs | 2 +- src/PeerManager.cs | 201 +++++++++++++++++++++++++++++++++++ src/PeerTalk.csproj | 2 +- src/TaskWhenAnyResult.cs | 33 +++++- test/PeerManagerTest.cs | 144 +++++++++++++++++++++++++ test/PeerTalkTests.csproj | 2 +- 6 files changed, 380 insertions(+), 4 deletions(-) create mode 100644 src/PeerManager.cs create mode 100644 test/PeerManagerTest.cs diff --git a/src/MultiAddressBlackList.cs b/src/MultiAddressBlackList.cs index 782ddab..6bfa65d 100644 --- a/src/MultiAddressBlackList.cs +++ b/src/MultiAddressBlackList.cs @@ -15,7 +15,7 @@ namespace PeerTalk /// /// Only targets that do match a filter will pass. /// - public class MultiAddressBlackList : ConcurrentBag, IPolicy + public class MultiAddressBlackList : List, IPolicy { /// public Task IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) diff --git a/src/PeerManager.cs b/src/PeerManager.cs new file mode 100644 index 0000000..5221d76 --- /dev/null +++ b/src/PeerManager.cs @@ -0,0 +1,201 @@ +using Common.Logging; +using Ipfs; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace PeerTalk +{ + /// + /// Manages the peers. + /// + /// + /// Listens to the events to determine the state + /// of a peer. + /// + public class PeerManager : IService + { + static ILog log = LogManager.GetLogger(typeof(PeerManager)); + Thread thread; + CancellationTokenSource cancel; + + /// + /// Initial time to wait before attempting a reconnection + /// to a dead peer. + /// + /// + /// Defaults to 1 minute. + /// + public TimeSpan InitialBackoff = TimeSpan.FromMinutes(1); + + /// + /// When reached, the peer is considered permanently dead. + /// + /// + /// Defaults to 64 minutes. + /// + public TimeSpan MaxBackoff = TimeSpan.FromMinutes(64); + + /// + /// Provides access to other peers. + /// + public Swarm Swarm { get; set; } + + /// + /// The peers that are reachable. + /// + public ConcurrentDictionary DeadPeers = new ConcurrentDictionary(); + + /// + public Task StartAsync() + { + Swarm.ConnectionEstablished += Swarm_ConnectionEstablished; + Swarm.PeerNotReachable += Swarm_PeerNotReachable; + + var thread = new Thread(Phoenix) + { + IsBackground = true + }; + cancel = new CancellationTokenSource(); + thread.Start(); + + log.Debug("started"); + return Task.CompletedTask; + } + + /// + public Task StopAsync() + { + Swarm.ConnectionEstablished -= Swarm_ConnectionEstablished; + Swarm.PeerNotReachable -= Swarm_PeerNotReachable; + DeadPeers.Clear(); + + cancel.Cancel(); + cancel.Dispose(); + + log.Debug("stopped"); + return Task.CompletedTask; + } + + /// + /// Indicates that the peer can not be connected to. + /// + /// + public void SetNotReachable(Peer peer) + { + var dead = DeadPeers.AddOrUpdate(peer, + new DeadPeer + { + Peer = peer, + Backoff = InitialBackoff, + NextAttempt = DateTime.Now + InitialBackoff + }, + (key, existing) => + { + existing.Backoff += existing.Backoff; + existing.NextAttempt = existing.Backoff <= MaxBackoff + ? DateTime.Now + existing.Backoff + : DateTime.MaxValue; + return existing; + }); + + Swarm.BlackList.Add($"/p2p/{peer.Id}"); + if (dead.NextAttempt == DateTime.MaxValue) + { + log.DebugFormat("Dead '{0}' for {1} minutes.", dead.Peer, dead.Backoff.TotalMinutes); + } + else + { + Swarm.DeregisterPeer(dead.Peer); + log.DebugFormat("Permanently dead '{0}'.", dead.Peer); + } + } + + /// + /// Indicates that the peer can be connected to. + /// + /// + public void SetReachable(Peer peer) + { + log.DebugFormat("Alive '{0}'.", peer); + + DeadPeers.TryRemove(peer, out DeadPeer _); + Swarm.BlackList.Remove($"/p2p/{peer.Id}"); + } + + /// + /// Is invoked by the when a peer can not be connected to. + /// + void Swarm_PeerNotReachable(object sender, Peer peer) + { + SetNotReachable(peer); + } + + /// + /// Is invoked by the when a peer is connected to. + /// + void Swarm_ConnectionEstablished(object sender, PeerConnection connection) + { + SetReachable(connection.RemotePeer); + } + + /// + /// Background process to try reconnecting to a dead peer. + /// + async void Phoenix() + { + while (!cancel.IsCancellationRequested) + { + try + { + await Task.Delay(InitialBackoff); + var now = DateTime.Now; + await DeadPeers.Values + .Where(p => p.NextAttempt < now) + .ParallelForEachAsync(async dead => + { + log.DebugFormat("Attempt reconnect to {0}", dead.Peer); + Swarm.BlackList.Remove($"/p2p/{dead.Peer.Id}"); + try + { + await Swarm.ConnectAsync(dead.Peer, cancel.Token); + } + catch + { + // eat it + } + }, maxDoP: 10); + } + catch + { + // eat it. + } + } + } + } + + /// + /// Information on a peer that is not reachable. + /// + public class DeadPeer + { + /// + /// The peer that does not respond. + /// + public Peer Peer { get; set; } + + /// + /// How long to wait before attempting another connect. + /// + public TimeSpan Backoff { get; set; } + + /// + /// When another connect should be tried. + /// + public DateTime NextAttempt { get; set; } + } +} diff --git a/src/PeerTalk.csproj b/src/PeerTalk.csproj index bb049af..a30c881 100644 --- a/src/PeerTalk.csproj +++ b/src/PeerTalk.csproj @@ -1,7 +1,7 @@  - net461;netstandard14;netstandard2 + net461;netstandard2 PeerTalk PeerTalk bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml diff --git a/src/TaskWhenAnyResult.cs b/src/TaskWhenAnyResult.cs index 7ef2912..3d836c7 100644 --- a/src/TaskWhenAnyResult.cs +++ b/src/TaskWhenAnyResult.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -10,7 +11,7 @@ namespace PeerTalk /// /// Some helpers for tasks. /// - public class TaskHelper + public static class TaskHelper { /// /// Gets the first result from a set of tasks. @@ -62,5 +63,35 @@ public static async Task WhenAnyResult( cancel.ThrowIfCancellationRequested(); throw new AggregateException("No task(s) returned a result.", exceptions); } + + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// Copied from https://houseofcat.io/tutorials/csharp/async/parallelforeachasync + /// + public static Task ParallelForEachAsync(this IEnumerable source, Func funcBody, int maxDoP = 4) + { + async Task AwaitPartition(IEnumerator partition) + { + using (partition) + { + while (partition.MoveNext()) + { await funcBody(partition.Current); } + } + } + + return Task.WhenAll( + Partitioner + .Create(source) + .GetPartitions(maxDoP) + .AsParallel() + .Select(p => AwaitPartition(p))); + } } } diff --git a/test/PeerManagerTest.cs b/test/PeerManagerTest.cs new file mode 100644 index 0000000..5313e06 --- /dev/null +++ b/test/PeerManagerTest.cs @@ -0,0 +1,144 @@ +using Ipfs; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using PeerTalk.Protocols; +using System; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace PeerTalk +{ + [TestClass] + public class PeerManagerTest + { + Peer self = new Peer + { + AgentVersion = "self", + Id = "QmXK9VBxaXFuuT29AaPUTgW3jBWZ9JgLVZYdMYTHC6LLAH", + PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQCC5r4nQBtnd9qgjnG8fBN5+gnqIeWEIcUFUdCG4su/vrbQ1py8XGKNUBuDjkyTv25Gd3hlrtNJV3eOKZVSL8ePAgMBAAE=" + }; + + [TestMethod] + public void IsNotReachable() + { + var peer = new Peer { Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb" }; + var manager = new PeerManager { Swarm = new Swarm() }; + Assert.AreEqual(0, manager.DeadPeers.Count); + + manager.SetNotReachable(peer); + Assert.IsTrue(manager.DeadPeers.ContainsKey(peer)); + Assert.AreEqual(1, manager.DeadPeers.Count); + + manager.SetNotReachable(peer); + Assert.IsTrue(manager.DeadPeers.ContainsKey(peer)); + Assert.AreEqual(1, manager.DeadPeers.Count); + + manager.SetReachable(peer); + Assert.IsFalse(manager.DeadPeers.ContainsKey(peer)); + Assert.AreEqual(0, manager.DeadPeers.Count); + } + + [TestMethod] + public async Task BlackListsThePeer() + { + var peer = new Peer { Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb" }; + var manager = new PeerManager { Swarm = new Swarm() }; + Assert.AreEqual(0, manager.DeadPeers.Count); + + manager.SetNotReachable(peer); + Assert.IsFalse(await manager.Swarm.IsAllowedAsync("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + + manager.SetReachable(peer); + Assert.IsTrue(await manager.Swarm.IsAllowedAsync("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + } + + [TestMethod] + public async Task Backoff_Increases() + { + var peer = new Peer + { + Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb", + Addresses = new MultiAddress[] + { + "/ip4/127.0.0.1/tcp/4040/ipfs/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb" + } + }; + var swarm = new Swarm { LocalPeer = self }; + var manager = new PeerManager + { + Swarm = swarm, + InitialBackoff = TimeSpan.FromMilliseconds(100), + }; + Assert.AreEqual(0, manager.DeadPeers.Count); + + try + { + await manager.StartAsync(); + try + { + await swarm.ConnectAsync(peer); + } + catch { } + Assert.AreEqual(1, manager.DeadPeers.Count); + + var end = DateTime.Now + TimeSpan.FromSeconds(2); + while (DateTime.Now <= end) + { + if (manager.DeadPeers[peer].Backoff > manager.InitialBackoff) + return; + } + Assert.Fail("backoff did not increase"); + } + finally + { + await manager.StopAsync(); + } + } + + [TestMethod] + public async Task PermanentlyDead() + { + var peer = new Peer + { + Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb", + Addresses = new MultiAddress[] + { + "/ip4/127.0.0.1/tcp/4040/ipfs/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb" + } + }; + var swarm = new Swarm { LocalPeer = self }; + var manager = new PeerManager + { + Swarm = swarm, + InitialBackoff = TimeSpan.FromMilliseconds(100), + MaxBackoff = TimeSpan.FromMilliseconds(200), + }; + Assert.AreEqual(0, manager.DeadPeers.Count); + + try + { + await manager.StartAsync(); + try + { + await swarm.ConnectAsync(peer); + } + catch { } + Assert.AreEqual(1, manager.DeadPeers.Count); + + var end = DateTime.Now + TimeSpan.FromSeconds(2); + while (DateTime.Now <= end) + { + if (manager.DeadPeers[peer].NextAttempt == DateTime.MaxValue) + return; + } + Assert.Fail("not truely dead"); + } + finally + { + await manager.StopAsync(); + } + } + + } +} diff --git a/test/PeerTalkTests.csproj b/test/PeerTalkTests.csproj index fad0156..75691c3 100644 --- a/test/PeerTalkTests.csproj +++ b/test/PeerTalkTests.csproj @@ -1,7 +1,7 @@  - net461;netcoreapp1.1;netcoreapp2.1 + net461;netcoreapp2.1 false portable From 7b88cd9e146ff1f08e6a580abec089e77b72c3f5 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Tue, 20 Aug 2019 15:08:23 +1200 Subject: [PATCH 2/9] fix(PeerManager): determine permanently dead --- src/PeerManager.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/PeerManager.cs b/src/PeerManager.cs index 5221d76..b286258 100644 --- a/src/PeerManager.cs +++ b/src/PeerManager.cs @@ -20,7 +20,6 @@ namespace PeerTalk public class PeerManager : IService { static ILog log = LogManager.GetLogger(typeof(PeerManager)); - Thread thread; CancellationTokenSource cancel; /// @@ -104,7 +103,7 @@ public void SetNotReachable(Peer peer) }); Swarm.BlackList.Add($"/p2p/{peer.Id}"); - if (dead.NextAttempt == DateTime.MaxValue) + if (dead.NextAttempt != DateTime.MaxValue) { log.DebugFormat("Dead '{0}' for {1} minutes.", dead.Peer, dead.Backoff.TotalMinutes); } From 117fddf221291efb434f1f067c771710cd114e2f Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Tue, 20 Aug 2019 15:09:19 +1200 Subject: [PATCH 3/9] docs: improve --- src/{TaskWhenAnyResult.cs => TaskHelper.cs} | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) rename src/{TaskWhenAnyResult.cs => TaskHelper.cs} (88%) diff --git a/src/TaskWhenAnyResult.cs b/src/TaskHelper.cs similarity index 88% rename from src/TaskWhenAnyResult.cs rename to src/TaskHelper.cs index 3d836c7..4152233 100644 --- a/src/TaskWhenAnyResult.cs +++ b/src/TaskHelper.cs @@ -65,13 +65,20 @@ public static async Task WhenAnyResult( } /// - /// + /// Run async tasks in parallel, /// - /// - /// - /// - /// - /// + /// + /// A sequence of some data. + /// + /// + /// The async code to perform. + /// + /// + /// The number of partitions to create. + /// + /// + /// A Task to await. + /// /// /// Copied from https://houseofcat.io/tutorials/csharp/async/parallelforeachasync /// From a493adde70f52716703503e244a8b4891d51e8ef Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Tue, 20 Aug 2019 15:57:50 +1200 Subject: [PATCH 4/9] feat: make IPolicy synchronous --- src/BlackList.cs | 10 ++------ src/IPolicy.cs | 24 +++--------------- src/MultiAddressBlackList.cs | 9 ++----- src/MultiAddressWhiteList.cs | 11 +++------ src/Policy.cs | 15 ++++++------ src/Swarm.cs | 18 +++++--------- src/WhiteList.cs | 11 ++------- test/BlackListTest.cs | 27 ++++++-------------- test/MultiAdressBlackListTest.cs | 42 +++++++++++--------------------- test/MultiAdressWhiteListTest.cs | 42 +++++++++++--------------------- test/PeerManagerTest.cs | 6 ++--- test/PolicyTest.cs | 10 +++----- test/WhiteList.cs | 27 ++++++-------------- 13 files changed, 74 insertions(+), 178 deletions(-) diff --git a/src/BlackList.cs b/src/BlackList.cs index 7d6b8d1..51543d4 100644 --- a/src/BlackList.cs +++ b/src/BlackList.cs @@ -21,16 +21,10 @@ public class BlackList : ConcurrentBag, IPolicy where T : IEquatable { /// - public Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) + public bool IsAllowed(T target) { - return Task.FromResult(!this.Contains(target)); + return !this.Contains(target); } - /// - public async Task IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) - { - var q = await IsAllowedAsync(target, cancel).ConfigureAwait(false); - return !q; - } } } diff --git a/src/IPolicy.cs b/src/IPolicy.cs index a661a49..0fda7a7 100644 --- a/src/IPolicy.cs +++ b/src/IPolicy.cs @@ -21,28 +21,10 @@ interface IPolicy /// /// An object to test against the rule. /// - /// - /// Is used to stop the task. When cancelled, the is raised. - /// - /// - /// A task that represents the asynchronous operation. The task's result is - /// true if the passes the rule. - /// - Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)); - - /// - /// Determines if the target fails the rule. - /// - /// - /// An object to test against the rule. - /// - /// - /// Is used to stop the task. When cancelled, the is raised. - /// /// - /// A task that represents the asynchronous operation. The task's result is - /// true if the fails the rule. + /// true if the passes the rule; + /// otherwise false. /// - Task IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)); + bool IsAllowed(T target); } } diff --git a/src/MultiAddressBlackList.cs b/src/MultiAddressBlackList.cs index 6bfa65d..663a777 100644 --- a/src/MultiAddressBlackList.cs +++ b/src/MultiAddressBlackList.cs @@ -18,9 +18,9 @@ namespace PeerTalk public class MultiAddressBlackList : List, IPolicy { /// - public Task IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) + public bool IsAllowed(MultiAddress target) { - return Task.FromResult(!this.Any(filter => Matches(filter, target))); + return !this.Any(filter => Matches(filter, target)); } bool Matches(MultiAddress filter, MultiAddress target) @@ -30,10 +30,5 @@ bool Matches(MultiAddress filter, MultiAddress target) .All(fp => target.Protocols.Any(tp => tp.Code == fp.Code && tp.Value == fp.Value)); } - /// - public async Task IsNotAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) - { - return !await IsAllowedAsync(target, cancel).ConfigureAwait(false); - } } } diff --git a/src/MultiAddressWhiteList.cs b/src/MultiAddressWhiteList.cs index e37c61f..1f7c5d5 100644 --- a/src/MultiAddressWhiteList.cs +++ b/src/MultiAddressWhiteList.cs @@ -19,12 +19,12 @@ namespace PeerTalk public class MultiAddressWhiteList : ConcurrentBag, IPolicy { /// - public Task IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) + public bool IsAllowed(MultiAddress target) { if (IsEmpty) - return Task.FromResult(true); + return true; - return Task.FromResult(this.Any(filter => Matches(filter, target))); + return this.Any(filter => Matches(filter, target)); } bool Matches(MultiAddress filter, MultiAddress target) @@ -34,10 +34,5 @@ bool Matches(MultiAddress filter, MultiAddress target) .All(fp => target.Protocols.Any(tp => tp.Code == fp.Code && tp.Value == fp.Value)); } - /// - public async Task IsNotAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) - { - return !await IsAllowedAsync(target, cancel).ConfigureAwait(false); - } } } diff --git a/src/Policy.cs b/src/Policy.cs index a218c6d..d5bad1e 100644 --- a/src/Policy.cs +++ b/src/Policy.cs @@ -16,13 +16,12 @@ namespace PeerTalk public abstract class Policy : IPolicy { /// - public abstract Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)); + public abstract bool IsAllowed(T target); /// - public async Task IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) + public bool IsNotAllowed(T target) { - var q = await IsAllowedAsync(target, cancel).ConfigureAwait(false); - return !q; + return !IsAllowed(target); } } @@ -35,9 +34,9 @@ public abstract class Policy : IPolicy public class PolicyAlways : Policy { /// - public override Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) + public override bool IsAllowed(T target) { - return Task.FromResult(true); + return true; } } @@ -50,9 +49,9 @@ public class PolicyAlways : Policy public class PolicyNever : Policy { /// - public override Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) + public override bool IsAllowed(T target) { - return Task.FromResult(false); + return false; } } } diff --git a/src/Swarm.cs b/src/Swarm.cs index 618478f..570bc38 100644 --- a/src/Swarm.cs +++ b/src/Swarm.cs @@ -231,7 +231,7 @@ public IEnumerable KnownPeers /// added to the . /// /// - public async Task RegisterPeerAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken)) + public Task RegisterPeerAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken)) { var peerId = address.PeerId; if (peerId == LocalPeer.Id) @@ -239,7 +239,7 @@ public IEnumerable KnownPeers throw new Exception("Cannot register to self."); } - if (!await IsAllowedAsync(address, cancel).ConfigureAwait(false)) + if (!IsAllowed(address)) { throw new Exception($"Communication with '{address}' is not allowed."); } @@ -250,7 +250,7 @@ public IEnumerable KnownPeers Addresses = new List { address } }; - return RegisterPeer(peer); + return Task.FromResult(RegisterPeer(peer)); } /// @@ -1047,17 +1047,11 @@ public async Task StopListeningAsync(MultiAddress address) } /// - public async Task IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) + public bool IsAllowed(MultiAddress target) { - return await BlackList.IsAllowedAsync(target, cancel).ConfigureAwait(false) - && await WhiteList.IsAllowedAsync(target, cancel).ConfigureAwait(false); + return BlackList.IsAllowed(target) + && WhiteList.IsAllowed(target); } - /// - public async Task IsNotAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken)) - { - var q = await IsAllowedAsync(target, cancel).ConfigureAwait(false); - return !q; - } } } diff --git a/src/WhiteList.cs b/src/WhiteList.cs index 672beb6..24c395e 100644 --- a/src/WhiteList.cs +++ b/src/WhiteList.cs @@ -22,16 +22,9 @@ public class WhiteList : ConcurrentBag, IPolicy where T : IEquatable { /// - public Task IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) + public bool IsAllowed(T target) { - return Task.FromResult(this.IsEmpty || this.Contains(target)); - } - - /// - public async Task IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken)) - { - var q = await IsAllowedAsync(target, cancel).ConfigureAwait(false); - return !q; + return this.IsEmpty || this.Contains(target); } } } diff --git a/test/BlackListTest.cs b/test/BlackListTest.cs index c1220f5..565a874 100644 --- a/test/BlackListTest.cs +++ b/test/BlackListTest.cs @@ -10,35 +10,22 @@ namespace PeerTalk public class BlackListTest { [TestMethod] - public async Task Allowed() + public void Allowed() { var policy = new BlackList(); policy.Add("c"); policy.Add("d"); - Assert.IsTrue(await policy.IsAllowedAsync("a")); - Assert.IsTrue(await policy.IsAllowedAsync("b")); - Assert.IsFalse(await policy.IsAllowedAsync("c")); - Assert.IsFalse(await policy.IsAllowedAsync("d")); + Assert.IsTrue(policy.IsAllowed("a")); + Assert.IsTrue(policy.IsAllowed("b")); + Assert.IsFalse(policy.IsAllowed("c")); + Assert.IsFalse(policy.IsAllowed("d")); } [TestMethod] - public async Task NotAllowed() + public void Empty() { var policy = new BlackList(); - policy.Add("c"); - policy.Add("d"); - Assert.IsFalse(await policy.IsNotAllowedAsync("a")); - Assert.IsFalse(await policy.IsNotAllowedAsync("b")); - Assert.IsTrue(await policy.IsNotAllowedAsync("c")); - Assert.IsTrue(await policy.IsNotAllowedAsync("d")); - } - - [TestMethod] - public async Task Empty() - { - var policy = new BlackList(); - Assert.IsTrue(await policy.IsAllowedAsync("a")); - Assert.IsFalse(await policy.IsNotAllowedAsync("a")); + Assert.IsTrue(policy.IsAllowed("a")); } } } diff --git a/test/MultiAdressBlackListTest.cs b/test/MultiAdressBlackListTest.cs index 165677f..9d49a7f 100644 --- a/test/MultiAdressBlackListTest.cs +++ b/test/MultiAdressBlackListTest.cs @@ -17,49 +17,35 @@ public class MultiAddressBlackListTest MultiAddress d = "/p2p/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"; [TestMethod] - public async Task Allowed() + public void Allowed() { var policy = new MultiAddressBlackList(); policy.Add(a); policy.Add(b); - Assert.IsFalse(await policy.IsAllowedAsync(a)); - Assert.IsFalse(await policy.IsAllowedAsync(a1)); - Assert.IsFalse(await policy.IsAllowedAsync(b)); - Assert.IsTrue(await policy.IsAllowedAsync(c)); - Assert.IsTrue(await policy.IsAllowedAsync(d)); + Assert.IsFalse(policy.IsAllowed(a)); + Assert.IsFalse(policy.IsAllowed(a1)); + Assert.IsFalse(policy.IsAllowed(b)); + Assert.IsTrue(policy.IsAllowed(c)); + Assert.IsTrue(policy.IsAllowed(d)); } [TestMethod] - public async Task Allowed_Alias() + public void Allowed_Alias() { var policy = new MultiAddressBlackList(); policy.Add(a); - Assert.IsFalse(await policy.IsAllowedAsync(a)); - Assert.IsFalse(await policy.IsAllowedAsync(a1)); - Assert.IsFalse(await policy.IsAllowedAsync(b)); - Assert.IsTrue(await policy.IsAllowedAsync(c)); - Assert.IsTrue(await policy.IsAllowedAsync(d)); + Assert.IsFalse(policy.IsAllowed(a)); + Assert.IsFalse(policy.IsAllowed(a1)); + Assert.IsFalse(policy.IsAllowed(b)); + Assert.IsTrue(policy.IsAllowed(c)); + Assert.IsTrue(policy.IsAllowed(d)); } [TestMethod] - public async Task NotAllowed() + public void Empty() { var policy = new MultiAddressBlackList(); - policy.Add(a); - policy.Add(b); - Assert.IsTrue(await policy.IsNotAllowedAsync(a)); - Assert.IsTrue(await policy.IsNotAllowedAsync(a1)); - Assert.IsTrue(await policy.IsNotAllowedAsync(b)); - Assert.IsFalse(await policy.IsNotAllowedAsync(c)); - Assert.IsFalse(await policy.IsNotAllowedAsync(d)); - } - - [TestMethod] - public async Task Empty() - { - var policy = new MultiAddressBlackList(); - Assert.IsTrue(await policy.IsAllowedAsync(a)); - Assert.IsFalse(await policy.IsNotAllowedAsync(a)); + Assert.IsTrue( policy.IsAllowed(a)); } } } diff --git a/test/MultiAdressWhiteListTest.cs b/test/MultiAdressWhiteListTest.cs index e832617..3fd81b9 100644 --- a/test/MultiAdressWhiteListTest.cs +++ b/test/MultiAdressWhiteListTest.cs @@ -17,49 +17,35 @@ public class MultiAddressWhiteListTest MultiAddress d = "/p2p/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"; [TestMethod] - public async Task Allowed() + public void Allowed() { var policy = new MultiAddressWhiteList(); policy.Add(a); policy.Add(b); - Assert.IsTrue(await policy.IsAllowedAsync(a)); - Assert.IsTrue(await policy.IsAllowedAsync(a1)); - Assert.IsTrue(await policy.IsAllowedAsync(b)); - Assert.IsFalse(await policy.IsAllowedAsync(c)); - Assert.IsFalse(await policy.IsAllowedAsync(d)); + Assert.IsTrue(policy.IsAllowed(a)); + Assert.IsTrue(policy.IsAllowed(a1)); + Assert.IsTrue(policy.IsAllowed(b)); + Assert.IsFalse(policy.IsAllowed(c)); + Assert.IsFalse(policy.IsAllowed(d)); } [TestMethod] - public async Task Allowed_Alias() + public void Allowed_Alias() { var policy = new MultiAddressWhiteList(); policy.Add(a); - Assert.IsTrue(await policy.IsAllowedAsync(a)); - Assert.IsTrue(await policy.IsAllowedAsync(a1)); - Assert.IsTrue(await policy.IsAllowedAsync(b)); - Assert.IsFalse(await policy.IsAllowedAsync(c)); - Assert.IsFalse(await policy.IsAllowedAsync(d)); + Assert.IsTrue(policy.IsAllowed(a)); + Assert.IsTrue(policy.IsAllowed(a1)); + Assert.IsTrue(policy.IsAllowed(b)); + Assert.IsFalse(policy.IsAllowed(c)); + Assert.IsFalse(policy.IsAllowed(d)); } [TestMethod] - public async Task NotAllowed() + public void Empty() { var policy = new MultiAddressWhiteList(); - policy.Add(a); - policy.Add(b); - Assert.IsFalse(await policy.IsNotAllowedAsync(a)); - Assert.IsFalse(await policy.IsNotAllowedAsync(a1)); - Assert.IsFalse(await policy.IsNotAllowedAsync(b)); - Assert.IsTrue(await policy.IsNotAllowedAsync(c)); - Assert.IsTrue(await policy.IsNotAllowedAsync(d)); - } - - [TestMethod] - public async Task Empty() - { - var policy = new MultiAddressWhiteList(); - Assert.IsTrue(await policy.IsAllowedAsync(a)); - Assert.IsFalse(await policy.IsNotAllowedAsync(a)); + Assert.IsTrue(policy.IsAllowed(a)); } } } diff --git a/test/PeerManagerTest.cs b/test/PeerManagerTest.cs index 5313e06..481b2ea 100644 --- a/test/PeerManagerTest.cs +++ b/test/PeerManagerTest.cs @@ -40,17 +40,17 @@ public void IsNotReachable() } [TestMethod] - public async Task BlackListsThePeer() + public void BlackListsThePeer() { var peer = new Peer { Id = "QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb" }; var manager = new PeerManager { Swarm = new Swarm() }; Assert.AreEqual(0, manager.DeadPeers.Count); manager.SetNotReachable(peer); - Assert.IsFalse(await manager.Swarm.IsAllowedAsync("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + Assert.IsFalse(manager.Swarm.IsAllowed("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); manager.SetReachable(peer); - Assert.IsTrue(await manager.Swarm.IsAllowedAsync("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + Assert.IsTrue(manager.Swarm.IsAllowed("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); } [TestMethod] diff --git a/test/PolicyTest.cs b/test/PolicyTest.cs index ed82a6b..68b977b 100644 --- a/test/PolicyTest.cs +++ b/test/PolicyTest.cs @@ -10,19 +10,17 @@ namespace PeerTalk public class PolicyTest { [TestMethod] - public async Task Always() + public void Always() { var policy = new PolicyAlways(); - Assert.IsTrue(await policy.IsAllowedAsync("foo")); - Assert.IsFalse(await policy.IsNotAllowedAsync("foo")); + Assert.IsTrue(policy.IsAllowed("foo")); } [TestMethod] - public async Task Never() + public void Never() { var policy = new PolicyNever(); - Assert.IsFalse(await policy.IsAllowedAsync("foo")); - Assert.IsTrue(await policy.IsNotAllowedAsync("foo")); + Assert.IsFalse(policy.IsAllowed("foo")); } } } diff --git a/test/WhiteList.cs b/test/WhiteList.cs index 1da5621..bb9c9a4 100644 --- a/test/WhiteList.cs +++ b/test/WhiteList.cs @@ -10,35 +10,22 @@ namespace PeerTalk public class WhiteListTest { [TestMethod] - public async Task Allowed() + public void Allowed() { var policy = new WhiteList(); policy.Add("a"); policy.Add("b"); - Assert.IsTrue(await policy.IsAllowedAsync("a")); - Assert.IsTrue(await policy.IsAllowedAsync("b")); - Assert.IsFalse(await policy.IsAllowedAsync("c")); - Assert.IsFalse(await policy.IsAllowedAsync("d")); + Assert.IsTrue(policy.IsAllowed("a")); + Assert.IsTrue(policy.IsAllowed("b")); + Assert.IsFalse(policy.IsAllowed("c")); + Assert.IsFalse(policy.IsAllowed("d")); } [TestMethod] - public async Task NotAllowed() + public void Empty() { var policy = new WhiteList(); - policy.Add("a"); - policy.Add("b"); - Assert.IsFalse(await policy.IsNotAllowedAsync("a")); - Assert.IsFalse(await policy.IsNotAllowedAsync("b")); - Assert.IsTrue(await policy.IsNotAllowedAsync("c")); - Assert.IsTrue(await policy.IsNotAllowedAsync("d")); - } - - [TestMethod] - public async Task Empty() - { - var policy = new WhiteList(); - Assert.IsTrue(await policy.IsAllowedAsync("a")); - Assert.IsFalse(await policy.IsNotAllowedAsync("a")); + Assert.IsTrue(policy.IsAllowed("a")); } } } From 828f328ab25d643f4231c0bdcfa5a8b4efc6f296 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Wed, 21 Aug 2019 12:20:06 +1200 Subject: [PATCH 5/9] fix(Swarm.RegisterPeer): check policy --- src/Routing/Dht1.cs | 3 ++- src/Routing/DistributedQuery.cs | 4 ++-- src/Swarm.cs | 15 ++++++++++++- test/PeerManagerTest.cs | 4 ++-- test/SwarmTest.cs | 40 +++++++++++++++++++++++++++++++++ 5 files changed, 60 insertions(+), 6 deletions(-) diff --git a/src/Routing/Dht1.cs b/src/Routing/Dht1.cs index 3fea7b5..8c66a87 100644 --- a/src/Routing/Dht1.cs +++ b/src/Routing/Dht1.cs @@ -404,7 +404,8 @@ public DhtMessage ProcessAddProvider(Peer remotePeer, DhtMessage request, DhtMes .Select(p => p.TryToPeer(out Peer peer) ? peer : (Peer)null) .Where(p => p != null) .Where(p => p == remotePeer) - .Where(p => p.Addresses.Count() > 0); + .Where(p => p.Addresses.Count() > 0) + .Where(p => Swarm.IsAllowed(p)); foreach (var provider in providers) { Swarm.RegisterPeer(provider); diff --git a/src/Routing/DistributedQuery.cs b/src/Routing/DistributedQuery.cs index 637e195..a07d33d 100644 --- a/src/Routing/DistributedQuery.cs +++ b/src/Routing/DistributedQuery.cs @@ -211,7 +211,7 @@ void ProcessProviders(DhtPeerMessage[] providers) { if (provider.TryToPeer(out Peer p)) { - if (p == Dht.Swarm.LocalPeer) + if (p == Dht.Swarm.LocalPeer || !Dht.Swarm.IsAllowed(p)) continue; p = Dht.Swarm.RegisterPeer(p); @@ -236,7 +236,7 @@ void ProcessCloserPeers(DhtPeerMessage[] closerPeers) { if (closer.TryToPeer(out Peer p)) { - if (p == Dht.Swarm.LocalPeer) + if (p == Dht.Swarm.LocalPeer || !Dht.Swarm.IsAllowed(p)) continue; p = Dht.Swarm.RegisterPeer(p); diff --git a/src/Swarm.cs b/src/Swarm.cs index 570bc38..1d9ee53 100644 --- a/src/Swarm.cs +++ b/src/Swarm.cs @@ -21,7 +21,7 @@ namespace PeerTalk /// /// Manages communication with other peers. /// - public class Swarm : IService, IPolicy + public class Swarm : IService, IPolicy, IPolicy { static ILog log = LogManager.GetLogger(typeof(Swarm)); @@ -276,6 +276,9 @@ public IEnumerable KnownPeers /// is raised. /// /// + /// + /// The or policies forbid it. + /// public Peer RegisterPeer(Peer peer) { if (peer.Id == null) @@ -286,6 +289,10 @@ public Peer RegisterPeer(Peer peer) { throw new ArgumentException("Cannot register self."); } + if (!IsAllowed(peer)) + { + throw new Exception($"Communication with '{peer}' is not allowed."); + } var isNew = false; var p = otherPeers.AddOrUpdate(peer.Id.ToBase58(), @@ -1053,5 +1060,11 @@ public bool IsAllowed(MultiAddress target) && WhiteList.IsAllowed(target); } + /// + public bool IsAllowed(Peer peer) + { + return peer.Addresses.All(a => IsAllowed(a)); + } + } } diff --git a/test/PeerManagerTest.cs b/test/PeerManagerTest.cs index 481b2ea..c7c20d5 100644 --- a/test/PeerManagerTest.cs +++ b/test/PeerManagerTest.cs @@ -47,10 +47,10 @@ public void BlackListsThePeer() Assert.AreEqual(0, manager.DeadPeers.Count); manager.SetNotReachable(peer); - Assert.IsFalse(manager.Swarm.IsAllowed("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + Assert.IsFalse(manager.Swarm.IsAllowed((MultiAddress)"/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); manager.SetReachable(peer); - Assert.IsTrue(manager.Swarm.IsAllowed("/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); + Assert.IsTrue(manager.Swarm.IsAllowed((MultiAddress)"/p2p/QmXFX2P5ammdmXQgfqGkfswtEVFsZUJ5KeHRXQYCTdiTAb")); } [TestMethod] diff --git a/test/SwarmTest.cs b/test/SwarmTest.cs index d5c3605..730f9e7 100644 --- a/test/SwarmTest.cs +++ b/test/SwarmTest.cs @@ -1121,6 +1121,46 @@ public void DeregisterPeer() Assert.IsFalse(swarm.KnownPeers.Contains(other)); Assert.AreEqual(other, removedPeer); } + + [TestMethod] + public void IsAllowed_Peer() + { + var swarm = new Swarm(); + var peer = new Peer + { + Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h", + Addresses = new MultiAddress[] + { + "/ip4/127.0.0.1/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h" + } + }; + + Assert.IsTrue(swarm.IsAllowed(peer)); + + swarm.BlackList.Add(peer.Addresses.First()); + Assert.IsFalse(swarm.IsAllowed(peer)); + + swarm.BlackList.Clear(); + swarm.BlackList.Add("/p2p/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + Assert.IsFalse(swarm.IsAllowed(peer)); + } + + [TestMethod] + public void RegisterPeer_BlackListed() + { + var swarm = new Swarm { LocalPeer = self }; + var peer = new Peer + { + Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h", + Addresses = new MultiAddress[] + { + "/ip4/127.0.0.1/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h" + } + }; + + swarm.BlackList.Add(peer.Addresses.First()); + ExceptionAssert.Throws(() => swarm.RegisterPeer(peer)); + } } /// From 41bc2cf5b66ca92eaa6366c4b5756a73ade74ac7 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Wed, 21 Aug 2019 12:48:04 +1200 Subject: [PATCH 6/9] fix(MultiAddressWhiteList): make it a list not a bag --- src/MultiAddressWhiteList.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/MultiAddressWhiteList.cs b/src/MultiAddressWhiteList.cs index 1f7c5d5..42d6183 100644 --- a/src/MultiAddressWhiteList.cs +++ b/src/MultiAddressWhiteList.cs @@ -16,12 +16,12 @@ namespace PeerTalk /// Only targets that are a subset of any filters will pass. If no filters are defined, then anything /// passes. /// - public class MultiAddressWhiteList : ConcurrentBag, IPolicy + public class MultiAddressWhiteList : List, IPolicy { /// public bool IsAllowed(MultiAddress target) { - if (IsEmpty) + if (Count == 0) return true; return this.Any(filter => Matches(filter, target)); From 1b741e6f9ad3100976b67f9f1b095741fe88b74d Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Wed, 21 Aug 2019 12:49:57 +1200 Subject: [PATCH 7/9] fix(Swarm): rename RegisterPeerAsync to RegisterPeerAddress --- src/Swarm.cs | 33 ++++++++-------------------- test/AutoDialerTest.cs | 8 +++---- test/Routing/Dht1Test.cs | 32 ++++++++++++++-------------- test/SwarmTest.cs | 46 ++++++++++++++++++++-------------------- 4 files changed, 52 insertions(+), 67 deletions(-) diff --git a/src/Swarm.cs b/src/Swarm.cs index 1d9ee53..76be904 100644 --- a/src/Swarm.cs +++ b/src/Swarm.cs @@ -180,9 +180,9 @@ public Peer LocalPeer /// /// /// Contains any peer address that has been - /// discovered. + /// discovered. /// - /// + /// public IEnumerable KnownPeerAddresses { get @@ -198,9 +198,9 @@ public IEnumerable KnownPeerAddresses /// /// /// Contains any peer that has been - /// discovered. + /// discovered. /// - /// + /// public IEnumerable KnownPeers { get @@ -215,12 +215,8 @@ public IEnumerable KnownPeers /// /// An address to the peer. It must end with the peer ID. /// - /// - /// Is used to stop the task. When cancelled, the is raised. - /// /// - /// A task that represents the asynchronous operation. The task's result - /// is the that is registered. + /// The that is registered. /// /// /// The or policies forbid it. @@ -231,26 +227,15 @@ public IEnumerable KnownPeers /// added to the . /// /// - public Task RegisterPeerAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken)) + public Peer RegisterPeerAddress(MultiAddress address) { - var peerId = address.PeerId; - if (peerId == LocalPeer.Id) - { - throw new Exception("Cannot register to self."); - } - - if (!IsAllowed(address)) - { - throw new Exception($"Communication with '{address}' is not allowed."); - } - var peer = new Peer { - Id = peerId, + Id = address.PeerId, Addresses = new List { address } }; - return Task.FromResult(RegisterPeer(peer)); + return RegisterPeer(peer); } /// @@ -468,7 +453,7 @@ public async Task StopAsync() /// public async Task ConnectAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken)) { - var peer = await RegisterPeerAsync(address, cancel).ConfigureAwait(false); + var peer = RegisterPeerAddress(address); return await ConnectAsync(peer, cancel).ConfigureAwait(false); } diff --git a/test/AutoDialerTest.cs b/test/AutoDialerTest.cs index 6fdec7a..72f4d99 100644 --- a/test/AutoDialerTest.cs +++ b/test/AutoDialerTest.cs @@ -56,7 +56,7 @@ public async Task Connects_OnPeerDiscovered_When_Below_MinConnections() { using (var dialer = new AutoDialer(swarmA)) { - var other = await swarmA.RegisterPeerAsync(peerBAddress); + var other = swarmA.RegisterPeerAddress(peerBAddress); // wait for the connection. var endTime = DateTime.Now.AddSeconds(3); @@ -90,7 +90,7 @@ public async Task Noop_OnPeerDiscovered_When_NotBelow_MinConnections() { using (var dialer = new AutoDialer(swarmA) { MinConnections = 0 }) { - var other = await swarmA.RegisterPeerAsync(peerBAddress); + var other = swarmA.RegisterPeerAddress(peerBAddress); // wait for the connection. var endTime = DateTime.Now.AddSeconds(3); @@ -136,8 +136,8 @@ public async Task Connects_OnPeerDisconnected_When_Below_MinConnections() { using (var dialer = new AutoDialer(swarmA) { MinConnections = 1 }) { - var b = await swarmA.RegisterPeerAsync(peerBAddress); - var c = await swarmA.RegisterPeerAsync(peerCAddress); + var b = swarmA.RegisterPeerAddress(peerBAddress); + var c = swarmA.RegisterPeerAddress(peerCAddress); // wait for the peer B connection. var endTime = DateTime.Now.AddSeconds(3); diff --git a/test/Routing/Dht1Test.cs b/test/Routing/Dht1Test.cs index 82dc1fa..8f88ea0 100644 --- a/test/Routing/Dht1Test.cs +++ b/test/Routing/Dht1Test.cs @@ -44,7 +44,7 @@ public async Task StoppedEventRaised() public async Task SeedsRoutingTableFromSwarm() { var swarm = new Swarm { LocalPeer = self }; - var peer = await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + var peer = swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); var dht = new Dht1 { Swarm = swarm }; await dht.StartAsync(); try @@ -65,7 +65,7 @@ public async Task AddDiscoveredPeerToRoutingTable() await dht.StartAsync(); try { - var peer = await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + var peer = swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); Assert.IsTrue(dht.RoutingTable.Contains(peer)); } finally @@ -82,7 +82,7 @@ public async Task RemovesPeerFromRoutingTable() await dht.StartAsync(); try { - var peer = await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + var peer = swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); Assert.IsTrue(dht.RoutingTable.Contains(peer)); swarm.DeregisterPeer(peer); @@ -151,7 +151,7 @@ public async Task ProcessFindNodeMessage_InRoutingTable() public async Task ProcessFindNodeMessage_InSwarm() { var swarm = new Swarm { LocalPeer = self }; - var other = await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + var other = swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); var dht = new Dht1 { Swarm = swarm }; await dht.StartAsync(); try @@ -181,11 +181,11 @@ public async Task ProcessFindNodeMessage_InSwarm() public async Task ProcessFindNodeMessage_Closest() { var swarm = new Swarm { LocalPeer = self }; - await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1a"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1b"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1c"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.4/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1d"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.5/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1e"); + swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1a"); + swarm.RegisterPeerAddress("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1b"); + swarm.RegisterPeerAddress("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1c"); + swarm.RegisterPeerAddress("/ip4/127.0.0.4/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1d"); + swarm.RegisterPeerAddress("/ip4/127.0.0.5/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1e"); var dht = new Dht1 { Swarm = swarm, CloserPeerCount = 3 }; await dht.StartAsync(); try @@ -209,11 +209,11 @@ public async Task ProcessFindNodeMessage_Closest() public async Task ProcessFindNodeMessage_BadNodeId() { var swarm = new Swarm { LocalPeer = self }; - await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1a"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1b"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1c"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.4/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1d"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.5/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1e"); + swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1a"); + swarm.RegisterPeerAddress("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1b"); + swarm.RegisterPeerAddress("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1c"); + swarm.RegisterPeerAddress("/ip4/127.0.0.4/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1d"); + swarm.RegisterPeerAddress("/ip4/127.0.0.5/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1e"); var dht = new Dht1 { Swarm = swarm, CloserPeerCount = 3 }; await dht.StartAsync(); try @@ -349,8 +349,8 @@ public async Task QueryIsCancelled_WhenDhtStops() { var unknownPeer = new MultiHash("QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCxxx"); var swarm = new Swarm { LocalPeer = self }; - await swarm.RegisterPeerAsync("/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd"); - await swarm.RegisterPeerAsync("/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"); + swarm.RegisterPeerAddress("/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd"); + swarm.RegisterPeerAddress("/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"); var dht = new Dht1 { Swarm = swarm }; await dht.StartAsync(); var task = dht.FindPeerAsync(unknownPeer); diff --git a/test/SwarmTest.cs b/test/SwarmTest.cs index 730f9e7..34c7524 100644 --- a/test/SwarmTest.cs +++ b/test/SwarmTest.cs @@ -48,10 +48,10 @@ public void Start_NoLocalPeer() } [TestMethod] - public async Task NewPeerAddress() + public void NewPeerAddress() { var swarm = new Swarm { LocalPeer = self }; - await swarm.RegisterPeerAsync(mars); + swarm.RegisterPeerAddress(mars); Assert.IsTrue(swarm.KnownPeerAddresses.Contains(mars)); } @@ -62,13 +62,13 @@ public void NewPeerAddress_Self() var selfAddress = "/ip4/178.62.158.247/tcp/4001/ipfs/" + self.Id; ExceptionAssert.Throws(() => { - var _ = swarm.RegisterPeerAsync(selfAddress).Result; + var _ = swarm.RegisterPeerAddress(selfAddress); }); selfAddress = "/ip4/178.62.158.247/tcp/4001/p2p/" + self.Id; ExceptionAssert.Throws(() => { - var _ = swarm.RegisterPeerAsync(selfAddress).Result; + var _ = swarm.RegisterPeerAddress(selfAddress); }); } @@ -80,11 +80,11 @@ public void NewPeerAddress_BlackList() ExceptionAssert.Throws(() => { - var _ = swarm.RegisterPeerAsync(mars).Result; + var _ = swarm.RegisterPeerAddress(mars); }); Assert.IsFalse(swarm.KnownPeerAddresses.Contains(mars)); - Assert.IsNotNull(swarm.RegisterPeerAsync(venus).Result); + Assert.IsNotNull(swarm.RegisterPeerAddress(venus)); Assert.IsTrue(swarm.KnownPeerAddresses.Contains(venus)); } @@ -96,11 +96,11 @@ public void NewPeerAddress_WhiteList() ExceptionAssert.Throws(() => { - var _ = swarm.RegisterPeerAsync(mars).Result; + var _ = swarm.RegisterPeerAddress(mars); }); Assert.IsFalse(swarm.KnownPeerAddresses.Contains(mars)); - Assert.IsNotNull(swarm.RegisterPeerAsync(venus).Result); + Assert.IsNotNull(swarm.RegisterPeerAddress(venus)); Assert.IsTrue(swarm.KnownPeerAddresses.Contains(venus)); } @@ -110,38 +110,38 @@ public void NewPeerAddress_InvalidAddress_MissingPeerId() var swarm = new Swarm { LocalPeer = self }; ExceptionAssert.Throws(() => { - var _ = swarm.RegisterPeerAsync("/ip4/10.1.10.10/tcp/29087").Result; + var _ = swarm.RegisterPeerAddress("/ip4/10.1.10.10/tcp/29087"); }); Assert.AreEqual(0, swarm.KnownPeerAddresses.Count()); } [TestMethod] - public async Task NewPeerAddress_Duplicate() + public void NewPeerAddress_Duplicate() { var swarm = new Swarm { LocalPeer = self }; - await swarm.RegisterPeerAsync(mars); + swarm.RegisterPeerAddress(mars); Assert.AreEqual(1, swarm.KnownPeerAddresses.Count()); - await swarm.RegisterPeerAsync(mars); + swarm.RegisterPeerAddress(mars); Assert.AreEqual(1, swarm.KnownPeerAddresses.Count()); } [TestMethod] - public async Task KnownPeers() + public void KnownPeers() { var swarm = new Swarm { LocalPeer = self }; Assert.AreEqual(0, swarm.KnownPeers.Count()); Assert.AreEqual(0, swarm.KnownPeerAddresses.Count()); - await swarm.RegisterPeerAsync("/ip4/10.1.10.10/tcp/29087/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3"); + swarm.RegisterPeerAddress("/ip4/10.1.10.10/tcp/29087/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3"); Assert.AreEqual(1, swarm.KnownPeers.Count()); Assert.AreEqual(1, swarm.KnownPeerAddresses.Count()); - await swarm.RegisterPeerAsync("/ip4/10.1.10.11/tcp/29087/p2p/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3"); + swarm.RegisterPeerAddress("/ip4/10.1.10.11/tcp/29087/p2p/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3"); Assert.AreEqual(1, swarm.KnownPeers.Count()); Assert.AreEqual(2, swarm.KnownPeerAddresses.Count()); - await swarm.RegisterPeerAsync(venus); + swarm.RegisterPeerAddress(venus); Assert.AreEqual(2, swarm.KnownPeers.Count()); Assert.AreEqual(3, swarm.KnownPeerAddresses.Count()); } @@ -1046,7 +1046,7 @@ public async Task Dial_Peer() } [TestMethod] - public async Task PeerDiscovered() + public void PeerDiscovered() { var swarm = new Swarm { LocalPeer = self }; var peerCount = 0; @@ -1054,12 +1054,12 @@ public async Task PeerDiscovered() { ++peerCount; }; - await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); - await swarm.RegisterPeerAsync("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); + swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + swarm.RegisterPeerAddress("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + swarm.RegisterPeerAddress("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h"); + swarm.RegisterPeerAddress("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); + swarm.RegisterPeerAddress("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); + swarm.RegisterPeerAddress("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1i"); swarm.RegisterPeer(new Peer { Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1j" }); swarm.RegisterPeer(new Peer { Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1j" }); From 5527fb391ca7fa01fc55ce0ddf08c4adb904c45d Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Thu, 22 Aug 2019 10:13:26 +1200 Subject: [PATCH 8/9] fix(Policy): all policy lists must be thread safe --- src/MultiAddressBlackList.cs | 36 ++++++++++++++++++++++-- src/MultiAddressWhiteList.cs | 35 ++++++++++++++++++++++-- test/MultiAdressBlackListTest.cs | 47 ++++++++++++++++++++++++++++++++ test/MultiAdressWhiteListTest.cs | 47 ++++++++++++++++++++++++++++++++ 4 files changed, 159 insertions(+), 6 deletions(-) diff --git a/src/MultiAddressBlackList.cs b/src/MultiAddressBlackList.cs index 663a777..c82b6a2 100644 --- a/src/MultiAddressBlackList.cs +++ b/src/MultiAddressBlackList.cs @@ -1,5 +1,6 @@ using Ipfs; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -10,17 +11,19 @@ namespace PeerTalk { /// - /// A sequence of filters that are not approved. + /// A collection of filters that are not approved. /// /// /// Only targets that do match a filter will pass. /// - public class MultiAddressBlackList : List, IPolicy + public class MultiAddressBlackList : ICollection, IPolicy { + ConcurrentDictionary filters = new ConcurrentDictionary(); + /// public bool IsAllowed(MultiAddress target) { - return !this.Any(filter => Matches(filter, target)); + return !filters.Any(kvp => Matches(kvp.Key, target)); } bool Matches(MultiAddress filter, MultiAddress target) @@ -30,5 +33,32 @@ bool Matches(MultiAddress filter, MultiAddress target) .All(fp => target.Protocols.Any(tp => tp.Code == fp.Code && tp.Value == fp.Value)); } + /// + public bool Remove(MultiAddress item) => filters.TryRemove(item, out _); + + /// + public int Count => filters.Count; + + /// + public bool IsReadOnly => false; + + /// + public void Add(MultiAddress item) => filters.TryAdd(item, item); + + /// + public void Clear() => filters.Clear(); + + /// + public bool Contains(MultiAddress item) => filters.Keys.Contains(item); + + /// + public void CopyTo(MultiAddress[] array, int arrayIndex) => filters.Keys.CopyTo(array, arrayIndex); + + /// + public IEnumerator GetEnumerator() => filters.Keys.GetEnumerator(); + + /// + IEnumerator IEnumerable.GetEnumerator() => filters.Keys.GetEnumerator(); + } } diff --git a/src/MultiAddressWhiteList.cs b/src/MultiAddressWhiteList.cs index 42d6183..b23d661 100644 --- a/src/MultiAddressWhiteList.cs +++ b/src/MultiAddressWhiteList.cs @@ -1,5 +1,6 @@ using Ipfs; using System; +using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -16,15 +17,17 @@ namespace PeerTalk /// Only targets that are a subset of any filters will pass. If no filters are defined, then anything /// passes. /// - public class MultiAddressWhiteList : List, IPolicy + public class MultiAddressWhiteList : ICollection, IPolicy { + ConcurrentDictionary filters = new ConcurrentDictionary(); + /// public bool IsAllowed(MultiAddress target) { - if (Count == 0) + if (filters.IsEmpty) return true; - return this.Any(filter => Matches(filter, target)); + return filters.Any(kvp => Matches(kvp.Key, target)); } bool Matches(MultiAddress filter, MultiAddress target) @@ -34,5 +37,31 @@ bool Matches(MultiAddress filter, MultiAddress target) .All(fp => target.Protocols.Any(tp => tp.Code == fp.Code && tp.Value == fp.Value)); } + /// + public bool Remove(MultiAddress item) => filters.TryRemove(item, out _); + + /// + public int Count => filters.Count; + + /// + public bool IsReadOnly => false; + + /// + public void Add(MultiAddress item) => filters.TryAdd(item, item); + + /// + public void Clear() => filters.Clear(); + + /// + public bool Contains(MultiAddress item) => filters.Keys.Contains(item); + + /// + public void CopyTo(MultiAddress[] array, int arrayIndex) => filters.Keys.CopyTo(array, arrayIndex); + + /// + public IEnumerator GetEnumerator() => filters.Keys.GetEnumerator(); + + /// + IEnumerator IEnumerable.GetEnumerator() => filters.Keys.GetEnumerator(); } } diff --git a/test/MultiAdressBlackListTest.cs b/test/MultiAdressBlackListTest.cs index 9d49a7f..1c26fe9 100644 --- a/test/MultiAdressBlackListTest.cs +++ b/test/MultiAdressBlackListTest.cs @@ -47,5 +47,52 @@ public void Empty() var policy = new MultiAddressBlackList(); Assert.IsTrue( policy.IsAllowed(a)); } + + [TestMethod] + public void Collection() + { + MultiAddress a = "/ip4/127.0.0.1"; + MultiAddress b = "/ip4/127.0.0.2"; + + var policy = new MultiAddressBlackList(); + Assert.IsFalse(policy.IsReadOnly); + Assert.AreEqual(0, policy.Count); + Assert.IsFalse(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(a); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(a); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(b); + Assert.AreEqual(2, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsTrue(policy.Contains(b)); + + policy.Remove(b); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + var array = new MultiAddress[1]; + policy.CopyTo(array, 0); + Assert.AreSame(a, array[0]); + + foreach (var filter in policy) + { + Assert.AreSame(a, filter); + } + + policy.Clear(); + Assert.AreEqual(0, policy.Count); + Assert.IsFalse(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + } } } diff --git a/test/MultiAdressWhiteListTest.cs b/test/MultiAdressWhiteListTest.cs index 3fd81b9..ad41d13 100644 --- a/test/MultiAdressWhiteListTest.cs +++ b/test/MultiAdressWhiteListTest.cs @@ -47,5 +47,52 @@ public void Empty() var policy = new MultiAddressWhiteList(); Assert.IsTrue(policy.IsAllowed(a)); } + + [TestMethod] + public void Collection() + { + MultiAddress a = "/ip4/127.0.0.1"; + MultiAddress b = "/ip4/127.0.0.2"; + + var policy = new MultiAddressWhiteList(); + Assert.IsFalse(policy.IsReadOnly); + Assert.AreEqual(0, policy.Count); + Assert.IsFalse(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(a); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(a); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + policy.Add(b); + Assert.AreEqual(2, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsTrue(policy.Contains(b)); + + policy.Remove(b); + Assert.AreEqual(1, policy.Count); + Assert.IsTrue(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + + var array = new MultiAddress[1]; + policy.CopyTo(array, 0); + Assert.AreSame(a, array[0]); + + foreach (var filter in policy) + { + Assert.AreSame(a, filter); + } + + policy.Clear(); + Assert.AreEqual(0, policy.Count); + Assert.IsFalse(policy.Contains(a)); + Assert.IsFalse(policy.Contains(b)); + } } } From 844a2ebe862c2a91af39ad8e8bbdd4e5a73c9108 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Thu, 22 Aug 2019 11:35:01 +1200 Subject: [PATCH 9/9] chore: update dependencies --- src/PeerTalk.csproj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/PeerTalk.csproj b/src/PeerTalk.csproj index a30c881..3523c3a 100644 --- a/src/PeerTalk.csproj +++ b/src/PeerTalk.csproj @@ -8,8 +8,8 @@ portable - 0.42.0 - 0.42.0 + 0.42 + 0.42 PeerTalk @@ -51,7 +51,7 @@ - +