Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New health check #1580

Merged
merged 6 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,21 @@ public final class Messages {
public static final String UNSUPPORTED_ALGORITHM_PROVIDED = "Unsupported algorithm: \"{0}\"";
public static final String NO_TOKEN_PARSER_FOUND_FOR_THE_CURRENT_TOKEN = "No token parser found for the current token";
public static final String RESOURCE_0_CANNOT_BE_CREATED_DUE_TO_UNRESOLVED_DYNAMIC_PARAMETER = "Resouce \"{0}\" cannot be created due to unresolved dynamic parameter. Please specify \"{1}\" in the processed-after section!";
public static final String ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING = "Error occurred during object store health checking";
public static final String ERROR_OCCURRED_DURING_DATABASE_HEALTH_CHECKING = "Error occurred during database health checking";
public static final String ERROR_OCCURRED_WHILE_CHECKING_FOR_INCREASED_LOCKS = "Error occurred while checking for increased locks";
public static final String THREAD_WAS_INTERRUPTED_WHILE_WAITING_FOR_THE_RESULT_OF_A_FUTURE = "Thread was interrupted while waiting for the result of a future";
public static final String ERROR_OCCURRED_DURING_HEALTH_CHECKING_FOR_INSTANCE_0_MESSAGE_1 = "Error occurred during health checking for instance: \"{0}\". Message: \"{1}\"";
public static final String OBJECT_STORE_FILE_STORAGE_HEALTH_DATABASE_HEALTH = "Object store file storage health: \"{0}\", Database health: \"{1}\"";
public static final String ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING_FOR_INSTANCE = "Error occurred during object store health checking for instance: \"{0}\"";
public static final String ERROR_OCCURRED_WHILE_CHECKING_DATABASE_INSTANCE_0 = "Error occurred while checking database instance: \"{0}\"";

// Warning messages
public static final String ENVIRONMENT_VARIABLE_IS_NOT_SET_USING_DEFAULT = "Environment variable \"{0}\" is not set. Using default \"{1}\"...";
public static final String OPTIONAL_RESOURCE_IS_NOT_SERVICE = "Optional resource \"{0}\" it will be not created because it''s not a service";
public static final String SERVICE_IS_NOT_ACTIVE = "Service \"{0}\" is inactive and will not be processed";
public static final String DETECTED_INCREASED_NUMBER_OF_PROCESSES_WAITING_FOR_LOCKS_FOR_INSTANCE = "Detected increased number of processes waiting for locks: \"{0}\" for instance: \"{1}\"";
public static final String DETECTED_INCREASED_NUMBER_OF_PROCESSES_WAITING_FOR_LOCKS_FOR_INSTANCE_0_GETTING_THE_LOCKS = "Detected increased number of processes waiting for locks for instance {0}. Getting the locks...";

public static final String INVALID_VCAP_APPLICATION = "Invalid VCAP_APPLICATION \"{0}\"";
public static final String IGNORING_LABEL_FOR_USER_PROVIDED_SERVICE = "Ignoring label \"{0}\" for service \"{1}\", as user-provided services do not support labels!";
Expand Down Expand Up @@ -170,6 +180,7 @@ public final class Messages {
public static final String THREADS_FOR_FILE_UPLOAD_TO_CONTROLLER_0 = "Threads for file upload to controller: {0}";
public static final String THREADS_FOR_FILE_STORAGE_UPLOAD_0 = "Threads for file storage upload: {0}";
public static final String DELETED_ORPHANED_MTA_DESCRIPTORS_COUNT = "Deleted orphaned mta descriptors count: {0}";
public static final String IS_HEALTH_CHECK_ENABLED = "Is health check enabled: {0}";

// Debug messages
public static final String DEPLOYMENT_DESCRIPTOR = "Deployment descriptor: {0}";
Expand All @@ -180,6 +191,11 @@ public final class Messages {
public static final String PARSED_TOKEN_TYPE_0 = "Parsed token type: {0}";
public static final String PARSED_TOKEN_EXPIRES_IN_0 = "Parsed token expires in: {0}";
public static final String PARSER_CHAIN_0 = "Parser chain: {0}";
public static final String VALUES_IN_INSTANCE_IN_THE_WAITING_FOR_LOCKS_SAMPLES = "Values in instance: \"{0}\" in the waiting for locks samples: {1}";
public static final String INCREASING_OR_EQUAL_INDEX_0_1_2 = "Increasing or equal index: {0} / {1} = {2}";
public static final String DECREASING_INDEX_0_1_2 = "Decreasing index: {0} / {1} = {2}";
public static final String OBJECT_STORE_FILE_STORAGE_IS_NOT_AVAILABLE_FOR_INSTANCE = "Object store file storage is not available for instance: \"{0}\"";
public static final String NOT_ENOUGH_SAMPLES_TO_DETECT_ANOMALY_0_1 = "Not enough samples to detect anomaly: {0} / {1}";

// Audit log

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package org.cloudfoundry.multiapps.controller.core.application.health;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.cloudfoundry.multiapps.controller.client.util.ResilientOperationExecutor;
import org.cloudfoundry.multiapps.controller.core.Messages;
import org.cloudfoundry.multiapps.controller.core.application.health.database.DatabaseWaitingLocksAnalyzer;
import org.cloudfoundry.multiapps.controller.core.application.health.model.ApplicationHealthResult;
import org.cloudfoundry.multiapps.controller.core.application.health.model.ImmutableApplicationHealthResult;
import org.cloudfoundry.multiapps.controller.core.model.CachedObject;
import org.cloudfoundry.multiapps.controller.core.util.ApplicationConfiguration;
import org.cloudfoundry.multiapps.controller.core.util.ApplicationInstanceNameUtil;
import org.cloudfoundry.multiapps.controller.persistence.services.DatabaseHealthService;
import org.cloudfoundry.multiapps.controller.persistence.services.DatabaseMonitoringService;
import org.cloudfoundry.multiapps.controller.persistence.services.ObjectStoreFileStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;

import jakarta.inject.Named;

@Named
public class ApplicationHealthCalculator {

private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationHealthCalculator.class);

private static final int UPDATE_HEALTH_CHECK_STATUS_PERIOD_IN_SECONDS = 10;
private static final int TIMEOUT_FOR_TASK_EXECUTION_IN_SECONDS = 50;

private final ObjectStoreFileStorage objectStoreFileStorage;
private final ApplicationConfiguration applicationConfiguration;
private final DatabaseHealthService databaseHealthService;
private final DatabaseMonitoringService databaseMonitoringService;
private final DatabaseWaitingLocksAnalyzer databaseWaitingLocksAnalyzer;

private final CachedObject<Boolean> objectStoreFileStorageHealthCache = new CachedObject<>(Duration.ofMinutes(1));
private final CachedObject<Boolean> dbHealthServiceCache = new CachedObject<>(Duration.ofMinutes(1));
private final CachedObject<Boolean> hasIncreasedLocksCache = new CachedObject<>(false, Duration.ofMinutes(1));

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final ExecutorService executor = Executors.newFixedThreadPool(3);

private final ResilientOperationExecutor resilientOperationExecutor = getResilienceExecutor();

@Autowired
public ApplicationHealthCalculator(@Autowired(required = false) ObjectStoreFileStorage objectStoreFileStorage,
ApplicationConfiguration applicationConfiguration, DatabaseHealthService databaseHealthService,
DatabaseMonitoringService databaseMonitoringService,
DatabaseWaitingLocksAnalyzer databaseWaitingLocksAnalyzer) {
this.objectStoreFileStorage = objectStoreFileStorage;
this.applicationConfiguration = applicationConfiguration;
this.databaseHealthService = databaseHealthService;
this.databaseMonitoringService = databaseMonitoringService;
this.databaseWaitingLocksAnalyzer = databaseWaitingLocksAnalyzer;
scheduleRegularHealthUpdate();
}

protected void scheduleRegularHealthUpdate() {
scheduler.scheduleAtFixedRate(this::updateHealthStatus, 0, UPDATE_HEALTH_CHECK_STATUS_PERIOD_IN_SECONDS, TimeUnit.SECONDS);
}

protected void updateHealthStatus() {
List<Callable<Boolean>> tasks = List.of(this::isObjectStoreFileStorageHealthy, this::isDatabaseHealthy,
databaseWaitingLocksAnalyzer::hasIncreasedDbLocks);
try {
List<Future<Boolean>> completedFutures = executor.invokeAll(tasks, TIMEOUT_FOR_TASK_EXECUTION_IN_SECONDS, TimeUnit.SECONDS);
executeFuture(completedFutures.get(0), isHealthy -> objectStoreFileStorageHealthCache.refresh(() -> isHealthy), false,
Messages.ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING);
executeFuture(completedFutures.get(1), isHealthy -> dbHealthServiceCache.refresh(() -> isHealthy), false,
Messages.ERROR_OCCURRED_DURING_DATABASE_HEALTH_CHECKING);
executeFuture(completedFutures.get(2), hasIncreasedLocks -> hasIncreasedLocksCache.refresh(() -> hasIncreasedLocks), true,
Messages.ERROR_OCCURRED_WHILE_CHECKING_FOR_INCREASED_LOCKS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
LOGGER.error(Messages.THREAD_WAS_INTERRUPTED_WHILE_WAITING_FOR_THE_RESULT_OF_A_FUTURE, e);
dbHealthServiceCache.refresh(() -> false);
objectStoreFileStorageHealthCache.refresh(() -> false);
hasIncreasedLocksCache.refresh(() -> false);
}
}

private void executeFuture(Future<Boolean> future, Consumer<Boolean> consumer, boolean onErrorValue, String errorMessage) {
IvanBorislavovDimitrov marked this conversation as resolved.
Show resolved Hide resolved
try {
Boolean result = future.get();
consumer.accept(result);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
LOGGER.error(Messages.THREAD_WAS_INTERRUPTED_WHILE_WAITING_FOR_THE_RESULT_OF_A_FUTURE, e);
consumer.accept(onErrorValue);
} catch (Exception e) {
LOGGER.error(MessageFormat.format(Messages.ERROR_OCCURRED_DURING_HEALTH_CHECKING_FOR_INSTANCE_0_MESSAGE_1,
applicationConfiguration.getApplicationInstanceIndex(), errorMessage),
e);
consumer.accept(onErrorValue);
}
}

public ResponseEntity<ApplicationHealthResult> calculateApplicationHealth() {
if (!applicationConfiguration.isHealthCheckEnabled()) {
return ResponseEntity.ok(ImmutableApplicationHealthResult.builder()
.status(ApplicationHealthResult.Status.UP)
.hasIncreasedLocks(false)
.build());
}
boolean isObjectStoreFileStorageHealthy = objectStoreFileStorageHealthCache.getOrRefresh(() -> false);
boolean isDbHealthy = dbHealthServiceCache.getOrRefresh(() -> false);

if (!isObjectStoreFileStorageHealthy || !isDbHealthy) {
LOGGER.error(MessageFormat.format(Messages.OBJECT_STORE_FILE_STORAGE_HEALTH_DATABASE_HEALTH, isObjectStoreFileStorageHealthy,
isDbHealthy));
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE)
.body(ImmutableApplicationHealthResult.builder()
.status(ApplicationHealthResult.Status.DOWN)
.hasIncreasedLocks(false)
.build());
}
boolean hasIncreasedDbLocks = hasIncreasedLocksCache.getOrRefresh(() -> true);
if (hasIncreasedDbLocks) {
LOGGER.warn(Messages.DETECTED_INCREASED_NUMBER_OF_PROCESSES_WAITING_FOR_LOCKS_FOR_INSTANCE_0_GETTING_THE_LOCKS,
applicationConfiguration.getApplicationInstanceIndex());
long countOfProcessesWaitingForLocks = resilientOperationExecutor.execute((Supplier<Long>) () -> databaseMonitoringService.getProcessesWaitingForLocks(ApplicationInstanceNameUtil.buildApplicationInstanceTemplate(applicationConfiguration)));
LOGGER.warn(MessageFormat.format(Messages.DETECTED_INCREASED_NUMBER_OF_PROCESSES_WAITING_FOR_LOCKS_FOR_INSTANCE,
countOfProcessesWaitingForLocks, applicationConfiguration.getApplicationInstanceIndex()));
return ResponseEntity.ok(ImmutableApplicationHealthResult.builder() // TODO: Make this return 503 instead of 200 when the
IvanBorislavovDimitrov marked this conversation as resolved.
Show resolved Hide resolved
// detection is trustworthy
.status(ApplicationHealthResult.Status.DOWN)
.hasIncreasedLocks(true)
.countOfProcessesWaitingForLocks(countOfProcessesWaitingForLocks)
.build());
}
return ResponseEntity.ok(ImmutableApplicationHealthResult.builder()
.status(ApplicationHealthResult.Status.UP)
.hasIncreasedLocks(false)
.build());
}

private boolean isObjectStoreFileStorageHealthy() {
if (objectStoreFileStorage == null) {
LOGGER.debug(MessageFormat.format(Messages.OBJECT_STORE_FILE_STORAGE_IS_NOT_AVAILABLE_FOR_INSTANCE,
applicationConfiguration.getApplicationInstanceIndex()));
return true;
}
try {
resilientOperationExecutor.execute(objectStoreFileStorage::testConnection);
} catch (Exception e) {
LOGGER.error(MessageFormat.format(Messages.ERROR_OCCURRED_DURING_OBJECT_STORE_HEALTH_CHECKING_FOR_INSTANCE,
applicationConfiguration.getApplicationInstanceIndex()),
e);
return false;
}
return true;
}

private boolean isDatabaseHealthy() {
try {
resilientOperationExecutor.execute(databaseHealthService::testDatabaseConnection);
return true;
} catch (Exception e) {
LOGGER.error(MessageFormat.format(Messages.ERROR_OCCURRED_WHILE_CHECKING_DATABASE_INSTANCE_0,
applicationConfiguration.getApplicationInstanceIndex()),
e);
return false;
}
}

protected ResilientOperationExecutor getResilienceExecutor() {
return new ResilientOperationExecutor();
IvanBorislavovDimitrov marked this conversation as resolved.
Show resolved Hide resolved
}

IvanBorislavovDimitrov marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading