Skip to content

Commit

Permalink
fix leaked getSchedInfo calls (#661)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Apr 25, 2024
1 parent 41410bf commit 3142e98
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.server.master.client;

/**
 * Checked exception signalling that a requested job ID could not be found.
 */
public class JobIdNotFoundException extends Exception {

    /**
     * Creates the exception with a message that names the missing job.
     *
     * @param jobId identifier of the job that was not found; embedded verbatim in the message
     */
    public JobIdNotFoundException(String jobId) {
        super("JobId " + jobId + " not found");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,16 +699,24 @@ public Observable<String> getJobStatusObservable(final String jobId) {
* @return
*/
public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
final ConditionalRetry retryObject = new ConditionalRetry(null, "assignmentresults_" + jobId);
return masterMonitor.getMasterObservable()
.filter(masterDescription -> masterDescription != null)
.retryWhen(retryLogic)
.switchMap((Func1<MasterDescription,
Observable<JobSchedulingInfo>>) masterDescription -> getRxnettySseClient(
masterDescription.getHostname(), masterDescription.getSchedInfoPort())
.submit(HttpClientRequest.createGet("/assignmentresults/" + jobId + "?sendHB=true"))
.submit(
HttpClientRequest.createGet("/assignmentresults/" + jobId + "?sendHB=true"))
.flatMap((Func1<HttpClientResponse<ServerSentEvent>,
Observable<JobSchedulingInfo>>) response -> {
if (!HttpResponseStatus.OK.equals(response.getStatus())) {
if (HttpResponseStatus.NOT_FOUND.equals(response.getStatus())) {
logger.error("GET assignmentresults not found: {}", response.getStatus());
JobIdNotFoundException notFoundException = new JobIdNotFoundException(jobId);
retryObject.setErrorRef(notFoundException);
return Observable.error(notFoundException);
} else if (!HttpResponseStatus.OK.equals(response.getStatus())) {
logger.error("GET assignmentresults failed: {}", response.getStatus());
return Observable.error(new Exception(response.getStatus().reasonPhrase()));
}
return response.getContent()
Expand All @@ -717,6 +725,7 @@ public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
return objectMapper.readValue(event.contentAsString(),
JobSchedulingInfo.class);
} catch (IOException e) {
logger.warn("Invalid schedInfo json: {}", e.getMessage());
throw new RuntimeException("Invalid schedInfo json: " + e.getMessage(), e);
}
})
Expand All @@ -727,7 +736,7 @@ public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
;
}))
.repeatWhen(repeatLogic)
.retryWhen(retryLogic)
.retryWhen(retryObject.getRetryLogic())
;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,8 +40,6 @@
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServer;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import org.junit.AfterClass;
import org.junit.Test;
Expand Down Expand Up @@ -68,22 +68,23 @@ public static void cleanup() throws InterruptedException {
}
}

public HttpServer<String, String> createHttpServer(int port) {
public HttpServer<String, String> createHttpServer(int port, RequestHandler<String, String> requestHandler) {
final HttpServer<String, String> server = RxNetty.newHttpServerBuilder(
port,
new RequestHandler<String, String>() {
@Override
public Observable<Void> handle(HttpServerRequest<String> req, HttpServerResponse<String> resp) {
resp.writeAndFlush("200 OK");
return Observable.empty();
}
})
.pipelineConfigurator(PipelineConfigurators.httpServerConfigurator())
.channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WriteBufferWaterMark.DEFAULT)
.build();
requestHandler)
.pipelineConfigurator(PipelineConfigurators.httpServerConfigurator())
.channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WriteBufferWaterMark.DEFAULT)
.build();
return server;
}

/**
 * Builds an HTTP server on the given port using a default handler that writes
 * {@code "200 OK"} to the response and completes immediately.
 *
 * @param port port passed through to the two-argument overload
 * @return the server returned by {@code createHttpServer(int, RequestHandler)}
 */
public HttpServer<String, String> createHttpServer(int port) {
    final RequestHandler<String, String> okHandler = (request, response) -> {
        response.writeAndFlush("200 OK");
        return Observable.empty();
    };
    return createHttpServer(port, okHandler);
}

@Test
public void testScaleStageRequestRetries() throws InterruptedException {

Expand Down Expand Up @@ -234,4 +235,41 @@ public void call() {
assertTrue(retryLatch.await(5, TimeUnit.SECONDS));
assertTrue(completedLatch.await(5, TimeUnit.SECONDS));
}

/**
 * Validates that the {@code mantisMasterClientApi.schedulingChanges()} stream terminates with an
 * error when the master answers the request with {@code 404 NOT_FOUND} (e.g. a
 * completed/closed job).
 *
 * <p>The test passes only if the subscriber's error callback fires within 3 seconds.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting on the latch
 */
@Test
public void testGetSchedulingInfoRetry() throws InterruptedException {
    MasterMonitor mockMasterMonitor = mock(MasterMonitor.class);
    final BehaviorSubject<MasterDescription> mdSubject = BehaviorSubject.create();
    when(mockMasterMonitor.getMasterObservable()).thenReturn(mdSubject);
    MantisMasterClientApi mantisMasterClientApi = new MantisMasterClientApi(mockMasterMonitor);

    // Start a server on a separate thread that answers every request with 404.
    final int apiPort = port.incrementAndGet();
    Schedulers.newThread().createWorker().schedule(() -> {
        final HttpServer<String, String> httpServer = createHttpServer(
            apiPort,
            (req, resp) -> {
                resp.setStatus(HttpResponseStatus.NOT_FOUND);
                return Observable.empty();
            });
        startedServers.add(httpServer);
        httpServer.start();
    });

    final String jobId = "test-job-id1";
    mdSubject.onNext(new MasterDescription("localhost", "127.0.0.1", apiPort, apiPort, apiPort, "status", apiPort, System.currentTimeMillis()));
    final Observable<JobSchedulingInfo> resultObs = mantisMasterClientApi.schedulingChanges(jobId);
    final CountDownLatch completedLatch = new CountDownLatch(1);

    // Subscribe with explicit callbacks: the error is the expected outcome here, and a bare
    // subscribe() would throw OnErrorNotImplementedException when that error arrives.
    resultObs.subscribe(
        schedulingInfo -> { },
        throwable -> {
            logger.info("Got expected error: ", throwable);
            completedLatch.countDown();
        },
        () -> fail("Obs should fail to doOnError"));

    assertTrue(completedLatch.await(3, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.mantisrx.master.api.akka.route.proto.JobClusterInfo;
import io.mantisrx.master.api.akka.route.proto.JobDiscoveryRouteProto;
import io.mantisrx.master.api.akka.route.utils.StreamingUtils;
import io.mantisrx.master.jobcluster.proto.BaseResponse.ResponseCode;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.master.domain.JobId;
Expand Down Expand Up @@ -97,6 +98,16 @@ private Route getJobDiscoveryRoutes() {
return completeAsync(
schedulingInfoRespCS,
r -> {
if (r.responseCode.equals(ResponseCode.CLIENT_ERROR_NOT_FOUND)) {
logger.warn(
"Sched info stream not found for job {}",
jobId);
return complete(
StatusCodes.NOT_FOUND,
"Sched info stream not found for job " +
jobId);
}

Optional<Observable<JobSchedulingInfo>> schedInfoStreamO = r
.getSchedInfoStream();
if (schedInfoStreamO.isPresent()) {
Expand Down

0 comments on commit 3142e98

Please sign in to comment.