Skip to content

Commit

Permalink
match bug fix 2
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Oct 26, 2016
1 parent 56c5248 commit 1263c8a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,7 @@ private int handleItem(Object value, Source source) {
}
}
// requests are batched so that each source gets a turn
if (requestFromA == requestSize && completed == COMPLETED_B) {
requestFromA = 0;
aSub.requestMore(requestSize);
} else if (requestFromB == requestSize && completed == COMPLETED_A) {
requestFromB = 0;
bSub.requestMore(requestSize);
} else if (requestFromA == requestSize && requestFromB == requestSize) {
requestFromA = 0;
requestFromB = 0;
aSub.requestMore(requestSize);
bSub.requestMore(requestSize);
}
checkToRequestMore();
return numEmitted;
}

Expand All @@ -259,6 +248,23 @@ private void handleCompleted(CompletedFrom comp) {
if (done) {
clear();
child.onCompleted();
} else {
checkToRequestMore();
}
}

private void checkToRequestMore() {
if (requestFromA == requestSize && completed == COMPLETED_B) {
requestFromA = 0;
aSub.requestMore(requestSize);
} else if (requestFromB == requestSize && completed == COMPLETED_A) {
requestFromB = 0;
bSub.requestMore(requestSize);
} else if (requestFromA == requestSize && requestFromB == requestSize) {
requestFromA = 0;
requestFromB = 0;
aSub.requestMore(requestSize);
bSub.requestMore(requestSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public void testKeepsRequesting() {
Observable<Integer> b = Observable.just(2).repeat(1000).concatWith(Observable.just(1));
match(a, b, 1);
}

@Test
public void testKeepsRequestingSwitched() {
Observable<Integer> a = Observable.just(2).repeat(1000).concatWith(Observable.just(1));
Observable<Integer> b = Observable.just(1);
match(a, b, 1);
}

@Test
public void test2() {
Expand Down Expand Up @@ -98,6 +105,13 @@ public void testNoMatchExistsForAtLeastOneFirstLonger() {
Observable<Integer> b = Observable.just(1);
match(a, b, 1);
}

@Test
public void testNoMatchExistsForAtLeastOneFirstLongerSwitched() {
Observable<Integer> a = Observable.just(1);
Observable<Integer> b = Observable.just(1, 2);
match(a, b, 1);
}

@Test
public void testNoMatchExistsForAtLeastOneSameLength() {
Expand Down

0 comments on commit 1263c8a

Please sign in to comment.