Skip to content

Commit

Permalink
Merge pull request #37 from rgdoliveira/sync_main
Browse files Browse the repository at this point in the history
Sync main branch with Apache main branch
  • Loading branch information
rgdoliveira authored Sep 5, 2024
2 parents f69f047 + ced181e commit 8cf73b2
Show file tree
Hide file tree
Showing 29 changed files with 1,429 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
job_name: [ kogito-quarkus-examples, kogito-springboot-examples, serverless-workflow-examples ]
os: [ubuntu-latest]
java-version: [17]
maven-version: ['3.9.3']
maven-version: ['3.9.6']
include:
- job_name: kogito-quarkus-examples
repository: kogito-examples
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr-kogito-apps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
matrix:
os: [ubuntu-latest]
java-version: [17]
maven-version: ['3.9.3']
maven-version: ['3.9.6']
fail-fast: false
runs-on: ${{ matrix.os }}
name: ${{ matrix.os }} / Java-${{ matrix.java-version }} / Maven-${{ matrix.maven-version }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected Uni<JobServiceManagementInfo> tryBecomeLeader(JobServiceManagementInfo

protected Uni<Void> release(JobServiceManagementInfo info) {
leader.set(false);
return repository.set(new JobServiceManagementInfo(info.getId(), null, null))
return repository.release(info)
.onItem().invoke(this::disableCommunication)
.onItem().invoke(i -> LOGGER.info("Leader instance released"))
.onFailure().invoke(ex -> LOGGER.error("Error releasing leader"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.management;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Liveness;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@Liveness
@ApplicationScoped
public class JobServiceLeaderLivenessHealthCheck implements HealthCheck {

private final AtomicBoolean enabled = new AtomicBoolean(false);

private final AtomicLong startTime = new AtomicLong();

private static final String EXPIRATION_IN_SECONDS = "kogito.jobs-service.management.leader-check.expiration-in-seconds";

@ConfigProperty(name = EXPIRATION_IN_SECONDS, defaultValue = "-1")
long expirationInSeconds;

@PostConstruct
void init() {
startTime.set(getCurrentTimeMillis());
}

@Override
public HealthCheckResponse call() {
final HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.named("Get Leader Instance Timeout");
if (hasExpired() && !enabled.get()) {
return responseBuilder.down().build();
}
return responseBuilder.up().build();
}

boolean hasExpired() {
return (expirationInSeconds > 0) && (getCurrentTimeMillis() - startTime.get()) > (expirationInSeconds * 1000);
}

protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
this.enabled.set(event.isEnabled());
startTime.set(getCurrentTimeMillis());
}

/**
* Facilitates testing
*/
long getCurrentTimeMillis() {
return System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface JobServiceManagementRepository {

Uni<JobServiceManagementInfo> set(JobServiceManagementInfo info);

Uni<Boolean> release(JobServiceManagementInfo info);

Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info);

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,10 @@ public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info) {
info.setLastHeartbeat(DateUtil.now().toOffsetDateTime());
return set(info);
}

@Override
public Uni<Boolean> release(JobServiceManagementInfo info) {
instance.set(new JobServiceManagementInfo(info.getId(), null, null));
return Uni.createFrom().item(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ quarkus.http.port=8080
mp.openapi.filter=org.kie.kogito.jobs.service.openapi.JobServiceModelFilter

# Job Service
quarkus.smallrye-health.check."org.kie.kogito.jobs.service.management.JobServiceLeaderLivenessHealthCheck".enabled=false

kogito.jobs-service.maxIntervalLimitToRetryMillis=60000
kogito.jobs-service.backoffRetryMillis=1000
kogito.jobs-service.schedulerChunkInMinutes=10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -46,8 +47,10 @@
import jakarta.enterprise.inject.Instance;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -112,7 +115,7 @@ void onShutdown() {

verify(tested, times(1)).release(infoCaptor.capture());
assertThat(infoCaptor.getValue()).isEqualTo(tested.getCurrentInfo());
verify(repository, times(1)).set(new JobServiceManagementInfo());
verify(repository, times(1)).release(tested.getCurrentInfo());
}

@Test
Expand Down Expand Up @@ -153,7 +156,18 @@ void heartbeatNotLeader() {
@Test
void heartbeatLeader() {
tested.startup(startupEvent);
tested.heartbeat(tested.getCurrentInfo()).await().indefinitely();
verify(repository).heartbeat(tested.getCurrentInfo());
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(tested.isLeader()).isTrue();
});
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(repository, atLeastOnce()).heartbeat(infoCaptor.capture());
});
JobServiceManagementInfo lastHeartbeat = infoCaptor.getValue();
assertThat(lastHeartbeat).isNotNull();
assertThat(lastHeartbeat.getId()).isEqualTo(tested.getCurrentInfo().getId());
assertThat(lastHeartbeat.getToken()).isEqualTo(tested.getCurrentInfo().getToken());
assertThat(lastHeartbeat.getLastHeartbeat()).isNotNull();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.jobs.service.management;

import org.eclipse.microprofile.health.HealthCheckResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

class JobServiceLeaderLivenessHealthCheckTest {

private static final long START_TIME = 1234;

private JobServiceLeaderLivenessHealthCheck healthCheck;

@BeforeEach
void setUp() {
healthCheck = spy(new JobServiceLeaderLivenessHealthCheck());
doReturn(START_TIME).when(healthCheck).getCurrentTimeMillis();
healthCheck.init();
}

@Test
void timeoutNotSet() {
doReturn(START_TIME + 1000 * 50).when(healthCheck).getCurrentTimeMillis();
assertThat(healthCheck.call().getStatus())
.isNotNull()
.isEqualTo(HealthCheckResponse.Status.UP);
}

@Test
void timeoutSetButNotReached() {
healthCheck.expirationInSeconds = 60;
doReturn(START_TIME + 1000 * 10).when(healthCheck).getCurrentTimeMillis();
assertThat(healthCheck.call().getStatus())
.isNotNull()
.isEqualTo(HealthCheckResponse.Status.UP);
}

@Test
void timeoutSetAndReached() {
healthCheck.expirationInSeconds = 60;
doReturn(START_TIME + 1000 * 60 + 1).when(healthCheck).getCurrentTimeMillis();
assertThat(healthCheck.call().getStatus())
.isNotNull()
.isEqualTo(HealthCheckResponse.Status.DOWN);
}

@Test
void statusChanged() {
healthCheck.onMessagingStatusChange(new MessagingChangeEvent(true));
doReturn(START_TIME + 1000 * 10).when(healthCheck).getCurrentTimeMillis();
HealthCheckResponse response = healthCheck.call();
assertThat(response.getStatus())
.isNotNull()
.isEqualTo(HealthCheckResponse.Status.UP);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class VertxTimerServiceSchedulerTest {
private Job job;
private JobContext context;
private Trigger trigger;
private JobDetails jobDetails;

@Mock
private JobExecutorResolver jobExecutorResolver;
Expand All @@ -89,27 +88,32 @@ public void setUp() {

@Test
void testScheduleJob() {
ZonedDateTime time = DateUtil.now().plusSeconds(1);
final ManageableJobHandle handle = schedule(time);
JobDetails jobDetails = JobDetails.builder().build();
doReturn(jobExecutor).when(jobExecutorResolver).get(any());
JobExecutionResponse response = new JobExecutionResponse();
Uni<JobExecutionResponse> result = Uni.createFrom().item(response);
PublisherBuilder<JobDetails> executionSuccessPublisherBuilder = ReactiveStreams.of(jobDetails);
doReturn(executionSuccessPublisherBuilder).when(reactiveJobScheduler).handleJobExecutionSuccess(response);
doReturn(result).when(jobExecutor).execute(jobDetails);
ZonedDateTime time = DateUtil.now().plusSeconds(1);
final ManageableJobHandle handle = schedule(jobDetails, time);
verify(vertx).setTimer(timeCaptor.capture(), any());
assertThat(timeCaptor.getValue()).isGreaterThanOrEqualTo(time.toInstant().minusMillis(System.currentTimeMillis()).toEpochMilli());
given().await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> verify(jobExecutorResolver).get(jobCaptor.capture()));
given().await()
.atMost(2, TimeUnit.SECONDS)
.untilAsserted(() -> verify(reactiveJobScheduler).handleJobExecutionSuccess(response));
assertThat(jobCaptor.getValue()).isEqualTo(jobDetails);
assertThat(handle.isCancel()).isFalse();
assertThat(handle.getScheduledTime()).isNotNull();
}

@Test
void testRemoveScheduleJob() {
final ManageableJobHandle handle = schedule(DateUtil.now().plusHours(1));
JobDetails jobDetails = JobDetails.builder().build();
final ManageableJobHandle handle = schedule(jobDetails, DateUtil.now().plusHours(1));
verify(vertx).setTimer(timeCaptor.capture(), any());
given().await()
.atMost(1, TimeUnit.SECONDS)
Expand All @@ -120,10 +124,9 @@ void testRemoveScheduleJob() {
assertThat(tested.removeJob(handle)).isTrue();
}

private ManageableJobHandle schedule(ZonedDateTime time) {
private ManageableJobHandle schedule(JobDetails jobDetails, ZonedDateTime time) {
final long timestamp = time.toInstant().toEpochMilli();
trigger = new PointInTimeTrigger(timestamp, null, null);
jobDetails = JobDetails.builder().build();
context = new JobDetailsContext(jobDetails);
job = new DelegateJob(jobExecutorResolver, reactiveJobScheduler);
return tested.scheduleJob(job, context, trigger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowIterator;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.SqlClient;
import io.vertx.mutiny.sqlclient.Tuple;
Expand Down Expand Up @@ -99,4 +100,13 @@ public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo info) {
.onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null)
.onItem().invoke(r -> LOGGER.trace("Heartbeat {}", r)));
}

@Override
public Uni<Boolean> release(JobServiceManagementInfo info) {
return client.withTransaction(conn -> conn
.preparedQuery("UPDATE job_service_management SET token = null, last_heartbeat = null WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat")
.execute(Tuple.of(info.getId(), info.getToken()))
.onItem().transform(RowSet::iterator)
.onItem().transform(RowIterator::hasNext));
}
}
Loading

0 comments on commit 8cf73b2

Please sign in to comment.