Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Commit

Permalink
Fix Throttle() yield condition. Part of bugfix for issue #15.
Browse files Browse the repository at this point in the history
Though this somehow causes "regressive" failure in Throttle() test. I believe
this is due to some other reason.
  • Loading branch information
Atsushi Eno committed May 18, 2012
1 parent a31d928 commit a76fc04
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ RUNTIME = mono --debug $(MONO_OPTIONS)
all: System.Reactive.Tests/bin/Debug/System.Reactive.Tests.dll

System.Reactive/bin/Debug/System.Reactive.dll:
xbuild
xbuild mono-reactive.sln

System.Reactive.Tests/bin/Debug/System.Reactive.Tests.dll: System.Reactive/bin/Debug/System.Reactive.dll System.Reactive.Tests/*/*.cs
xbuild
xbuild mono-reactive.sln

run-test: all
$(RUNTIME) external/nunit26/nunit-console.exe System.Reactive.Tests/bin/Debug/System.Reactive.Tests.dll $(NUNIT_OPTIONS)
30 changes: 30 additions & 0 deletions System.Reactive.Tests/System.Reactive.Linq/ObservableTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using NUnit.Framework;

Expand Down Expand Up @@ -940,6 +941,35 @@ public void Throttle ()
dis.Dispose ();
}

[Test]
public void Throttle2 ()
{
var subject = new Subject<string> ();
var scheduler = new HistoricalScheduler ();
var input = (from text in subject select text).Throttle (TimeSpan.FromSeconds (0.5), scheduler).Timestamp (scheduler);
var sw = new StringWriter ();
input.Subscribe (
v => sw.WriteLine ("THR: {0} at {1:ss.fff} timer:{2:ss.fff}", v.Value, v.Timestamp, scheduler.Now),
() => sw.WriteLine ("THR: completed: {0:ss.fff}", scheduler.Now));

int [] vals = {100, 600, 300, 600, 400, 900, 500, 800};
for (int i = 0; i < 10; i++) {
var val = vals [i % vals.Length];
scheduler.AdvanceBy (TimeSpan.FromMilliseconds (val));
subject.OnNext (val.ToString ());
}
subject.OnCompleted ();
string expected = @"THR: 100 at 00.600 timer:00.600
THR: 300 at 01.500 timer:01.500
THR: 400 at 02.500 timer:02.500
THR: 900 at 03.400 timer:03.400
THR: 500 at 03.900 timer:03.900
THR: 100 at 04.800 timer:04.800
THR: completed: 04.900
".Replace ("\r\n", "\n");
Assert.AreEqual (expected, sw.ToString ().Replace ("\r\n", "\n"), "#1");
}

[Test]
public void TimeoutInTime ()
{
Expand Down
26 changes: 10 additions & 16 deletions System.Reactive/System.Reactive.Linq/Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2300,23 +2300,17 @@ public static IObservable<TSource> Throttle<TSource> (

return new ColdObservableEach<TSource> (sub => {
// ----
DateTimeOffset last = scheduler.Now;
bool fire = false;
TSource value;
var current = new SerialDisposable ();
TSource value = default (TSource);
return source.Subscribe (Observer.Create<TSource> (v => {
if (scheduler.Now - last >= dueTime) {
last = scheduler.Now;
sub.OnNext (v);
} else {
value = v;
if (!fire) {
fire = true;
var slotDueTime = dueTime - (scheduler.Now - last);
var ddis = new SingleAssignmentDisposable ();
ddis.Disposable = scheduler.Schedule (slotDueTime, () => { last = scheduler.Now; sub.OnNext (value); fire = false; value = default (TSource); ddis.Dispose ();});
}
}
}, ex => sub.OnError (ex), () => sub.OnCompleted ()));
current.Disposable = scheduler.Schedule (dueTime, () => { sub.OnNext (v); value = v; });
}, ex => {
sub.OnError (ex); current.Dispose ();
}, () => {
sub.OnCompleted ();
// The reason for this delay is, the ongoing value should be successfully submitted, instead of being discarded.
scheduler.Schedule (current.Disposable == null ? TimeSpan.Zero : dueTime, () => current.Dispose ());
}));
// ----
}, scheduler);
}
Expand Down

0 comments on commit a76fc04

Please sign in to comment.