Skip to content

Commit

Permalink
Fix index obs (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Aug 29, 2023
1 parent 919236d commit 867913f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ public Observable<Observable<Integer>> call(Context context, Index index) {
return Observable.just(
Observable.interval(0, period, TimeUnit.SECONDS)
.map(time -> {
System.out.println("total worker num: " + index.getTotalNumWorkers());
if (useRandom) {
return randomNumGenerator.nextInt((max - min) + 1) + min;
} else {
Expand Down
14 changes: 11 additions & 3 deletions mantis-runtime/src/main/java/io/mantisrx/runtime/source/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package io.mantisrx.runtime.source;

import lombok.extern.slf4j.Slf4j;
import rx.Observable;
import rx.subjects.BehaviorSubject;


@Slf4j
public class Index {

private final int workerIndex;
private final Observable<Integer> totalNumWorkersObservable;
private final BehaviorSubject<Integer> totalNumWorkersObservable;


public Index(int offset, int total) {
Expand All @@ -34,14 +35,21 @@ public Index(int offset, int total) {

public Index(int offset, final Observable<Integer> totalWorkerAtStageObservable) {
this.workerIndex = offset;
this.totalNumWorkersObservable = totalWorkerAtStageObservable;
this.totalNumWorkersObservable = BehaviorSubject.create();
totalWorkerAtStageObservable.subscribe(this.totalNumWorkersObservable);
}

public int getWorkerIndex() {
return workerIndex;
}

public int getTotalNumWorkers() {
Integer workerNum = this.totalNumWorkersObservable.getValue();
if (workerNum != null) {
return workerNum;
}

log.info("totalNumWorkersObservable is not ready yet, waiting.");
return totalNumWorkersObservable.take(1).toBlocking().first();
}

Expand Down

0 comments on commit 867913f

Please sign in to comment.