diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index b48a3625b763..c47c40152e7d 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -296,6 +296,7 @@ eventMonitoringConfiguration: batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10} pathPattern: ${EVENT_MONITOR_PATH_PATTERN:-["/api/v1/tables/*", "/api/v1/health-check"]} latency: ${EVENT_MONITOR_LATENCY:-[0.99, 0.90]} # For value p99=0.99, p90=0.90, p50=0.50 etc. + servicesHealthCheckInterval: ${EVENT_MONITOR_SERVICES_HEALTH_CHECK_INTERVAL:-300} # it will use the default auth provider for AWS services if parameters are not set # parameters: # region: ${OM_MONITOR_REGION:-""} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index ee00e54cf121..900b34fcf0dd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -73,7 +73,7 @@ import org.openmetadata.service.events.EventFilter; import org.openmetadata.service.events.EventPubSub; import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler; -import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler; +import org.openmetadata.service.events.scheduled.ServicesStatusJobHandler; import org.openmetadata.service.exception.CatalogGenericExceptionMapper; import org.openmetadata.service.exception.ConstraintViolationExceptionMapper; import org.openmetadata.service.exception.JsonMappingExceptionMapper; @@ -264,16 +264,23 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ // Asset Servlet Registration registerAssetServlet(catalogConfig.getWebConfiguration(), environment); - // Handle Pipeline Service Client Status job - PipelineServiceStatusJobHandler pipelineServiceStatusJobHandler = - PipelineServiceStatusJobHandler.create( - catalogConfig.getPipelineServiceClientConfiguration(), catalogConfig.getClusterName()); - pipelineServiceStatusJobHandler.addPipelineServiceStatusJob(); + // Handle Services Jobs + registerHealthCheckJobs(catalogConfig); // Register Auth Handlers registerAuthServlets(catalogConfig, environment); } + private void registerHealthCheckJobs(OpenMetadataApplicationConfig catalogConfig) { + ServicesStatusJobHandler healthCheckStatusHandler = + ServicesStatusJobHandler.create( + catalogConfig.getEventMonitorConfiguration(), + catalogConfig.getPipelineServiceClientConfiguration(), + catalogConfig.getClusterName()); + healthCheckStatusHandler.addPipelineServiceStatusJob(); + healthCheckStatusHandler.addDatabaseAndSearchStatusJobs(); + } + private void registerAuthServlets(OpenMetadataApplicationConfig config, Environment environment) { if (config.getAuthenticationConfiguration() != null && config diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/DatabseAndSearchServiceStatusJob.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/DatabseAndSearchServiceStatusJob.java new file mode 100644 index 000000000000..c27d91df2a1b --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/DatabseAndSearchServiceStatusJob.java @@ -0,0 +1,55 @@ +package org.openmetadata.service.events.scheduled; + +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.Entity; +import org.openmetadata.service.search.SearchHealthStatus; +import org.quartz.Job; +import org.quartz.JobExecutionContext; + +@Slf4j +public class DatabseAndSearchServiceStatusJob implements Job { + private static final String SERVICE_COUNTER = "omd_service_unreachable"; + private static final String SERVICE_NAME = "service_name"; + private static final String SEARCH_SERVICE_NAME = "search"; + private static final String DATABASE_SERVICE_NAME = "database"; + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + PrometheusMeterRegistry meterRegistry = + (PrometheusMeterRegistry) + jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_METER_REGISTRY); + checkDatabaseStatus(meterRegistry); + checkElasticSearchStatus(meterRegistry); + } + + private void checkElasticSearchStatus(PrometheusMeterRegistry meterRegistry) { + try { + SearchHealthStatus status = + Entity.getSearchRepository().getSearchClient().getSearchHealthStatus(); + if (status.getStatus().equals(UNHEALTHY_STATUS)) { + publishUnhealthyCounter(meterRegistry, SERVICE_NAME, SEARCH_SERVICE_NAME); + } + } catch (Exception ex) { + LOG.error("Elastic Search encountering exception.", ex); + publishUnhealthyCounter(meterRegistry, SERVICE_NAME, SEARCH_SERVICE_NAME); + } + } + + private void checkDatabaseStatus(PrometheusMeterRegistry meterRegistry) { + try { + Entity.getCollectionDAO().systemDAO().testConnection(); + } catch (Exception ex) { + LOG.error("Database encountering exception.", ex); + publishUnhealthyCounter(meterRegistry, SERVICE_NAME, DATABASE_SERVICE_NAME); + } + } + + private void publishUnhealthyCounter(PrometheusMeterRegistry meterRegistry, String... tags) { + Counter.builder(SERVICE_COUNTER).tags(tags).register(meterRegistry).increment(); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java index e2c368c965f0..bc45a4bc0570 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJob.java @@ -2,9 +2,9 @@ import static org.openmetadata.sdk.PipelineServiceClientInterface.HEALTHY_STATUS; import static org.openmetadata.sdk.PipelineServiceClientInterface.STATUS_KEY; -import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME; -import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY; -import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT; import io.micrometer.core.instrument.Counter; import io.micrometer.prometheus.PrometheusMeterRegistry; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/ServicesStatusJobHandler.java similarity index 55% rename from openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java rename to openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/ServicesStatusJobHandler.java index 2e12d51764ff..c48b8df90eeb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/PipelineServiceStatusJobHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/ServicesStatusJobHandler.java @@ -5,7 +5,9 @@ import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration; import org.openmetadata.sdk.PipelineServiceClientInterface; import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.monitoring.EventMonitorConfiguration; import org.openmetadata.service.util.MicrometerBundleSingleton; +import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDataMap; import org.quartz.JobDetail; @@ -17,11 +19,14 @@ import org.quartz.impl.StdSchedulerFactory; @Slf4j -public class PipelineServiceStatusJobHandler { - +public class ServicesStatusJobHandler { + public static final String HEALTHY_STATUS = "healthy"; + public static final String UNHEALTHY_STATUS = "unhealthy"; + public static final String DATABASE_SEARCH_STATUS_JOB = "databaseAndSearchServiceStatusJob"; public static final String PIPELINE_SERVICE_STATUS_JOB = "pipelineServiceStatusJob"; public static final String STATUS_GROUP = "status"; - public static final String STATUS_CRON_TRIGGER = "statusTrigger"; + public static final String PIPELINE_STATUS_CRON_TRIGGER = "pipelineStatusTrigger"; + public static final String DATABSE_SEARCH_STATUS_CRON_TRIGGER = "databaseAndSearchStatusTrigger"; public static final String JOB_CONTEXT_PIPELINE_SERVICE_CLIENT = "pipelineServiceClient"; public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry"; public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName"; @@ -32,51 +37,55 @@ public class PipelineServiceStatusJobHandler { private final String clusterName; private final Integer healthCheckInterval; private final Scheduler scheduler = new StdSchedulerFactory().getScheduler(); + private final int servicesHealthCheckInterval; + private static ServicesStatusJobHandler instance; - private static PipelineServiceStatusJobHandler instance; - - private PipelineServiceStatusJobHandler( - PipelineServiceClientConfiguration config, String clusterName) throws SchedulerException { + private ServicesStatusJobHandler( + EventMonitorConfiguration monitorConfiguration, + PipelineServiceClientConfiguration config, + String clusterName) + throws SchedulerException { this.config = config; this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config); this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry; this.clusterName = clusterName; this.healthCheckInterval = config.getHealthCheckInterval(); + this.servicesHealthCheckInterval = monitorConfiguration.getServicesHealthCheckInterval(); this.scheduler.start(); } - public static PipelineServiceStatusJobHandler create( - PipelineServiceClientConfiguration config, String clusterName) { + public static ServicesStatusJobHandler create( + EventMonitorConfiguration eventMonitorConfiguration, + PipelineServiceClientConfiguration config, + String clusterName) { if (instance != null) return instance; try { - instance = new PipelineServiceStatusJobHandler(config, clusterName); + instance = new ServicesStatusJobHandler(eventMonitorConfiguration, config, clusterName); } catch (Exception ex) { LOG.error("Failed to initialize the Pipeline Service Status Handler"); } return instance; } - private JobDetail jobBuilder() { + private JobDetail jobBuilder(Class clazz, String jobName, String group) { JobDataMap dataMap = new JobDataMap(); dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient); dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry); dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName); JobBuilder jobBuilder = - JobBuilder.newJob(PipelineServiceStatusJob.class) - .withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP) - .usingJobData(dataMap); + JobBuilder.newJob(clazz).withIdentity(jobName, group).usingJobData(dataMap); return jobBuilder.build(); } - private Trigger getTrigger() { + private Trigger getTrigger(int checkInterval, String identity, String group) { return TriggerBuilder.newTrigger() - .withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP) + .withIdentity(identity, group) .withSchedule( SimpleScheduleBuilder.simpleSchedule() - .withIntervalInSeconds(healthCheckInterval) + .withIntervalInSeconds(checkInterval) .repeatForever()) .build(); } @@ -86,12 +95,27 @@ public void addPipelineServiceStatusJob() { // enabled if (config.getEnabled().equals(Boolean.TRUE)) { try { - JobDetail jobDetail = jobBuilder(); - Trigger trigger = getTrigger(); + JobDetail jobDetail = + jobBuilder(PipelineServiceStatusJob.class, PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP); + Trigger trigger = + getTrigger(healthCheckInterval, PIPELINE_STATUS_CRON_TRIGGER, STATUS_GROUP); scheduler.scheduleJob(jobDetail, trigger); } catch (Exception ex) { LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); } } } + + public void addDatabaseAndSearchStatusJobs() { + try { + JobDetail jobDetail = + jobBuilder( + DatabseAndSearchServiceStatusJob.class, DATABASE_SEARCH_STATUS_JOB, STATUS_GROUP); + Trigger trigger = + getTrigger(servicesHealthCheckInterval, DATABSE_SEARCH_STATUS_CRON_TRIGGER, STATUS_GROUP); + scheduler.scheduleJob(jobDetail, trigger); + } catch (Exception ex) { + LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java index 4e3f17b84c7b..552d34db3daf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/EventMonitorConfiguration.java @@ -31,4 +31,6 @@ public class EventMonitorConfiguration { private String[] pathPattern; private double[] latency; + + private int servicesHealthCheckInterval = 300; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index b739ebcb9a73..d42fab349264 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -392,4 +392,6 @@ static boolean shouldApplyRbacConditions( && !subjectContext.isBot() && rbacConditionEvaluator != null; } + + SearchHealthStatus getSearchHealthStatus() throws IOException; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java new file mode 100644 index 000000000000..f3d2b5cf0430 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchHealthStatus.java @@ -0,0 +1,12 @@ +package org.openmetadata.service.search; + +import lombok.Getter; + +@Getter +public class SearchHealthStatus { + public SearchHealthStatus(String status) { + this.status = status; + } + + String status; +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index cf9e7557d80e..69ebeafd86e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -13,6 +13,8 @@ import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -39,6 +41,8 @@ import com.fasterxml.jackson.databind.JsonNode; import es.org.elasticsearch.ElasticsearchStatusException; import es.org.elasticsearch.action.ActionListener; +import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import es.org.elasticsearch.action.bulk.BulkRequest; @@ -62,6 +66,7 @@ import es.org.elasticsearch.client.indices.GetMappingsRequest; import es.org.elasticsearch.client.indices.GetMappingsResponse; import es.org.elasticsearch.client.indices.PutMappingRequest; +import es.org.elasticsearch.cluster.health.ClusterHealthStatus; import es.org.elasticsearch.cluster.metadata.MappingMetadata; import es.org.elasticsearch.common.lucene.search.function.CombineFunction; import es.org.elasticsearch.common.lucene.search.function.FieldValueFactorFunction; @@ -168,6 +173,7 @@ import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.SearchHealthStatus; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; import org.openmetadata.service.search.SearchSortFilter; @@ -2878,6 +2884,18 @@ public Object getLowLevelClient() { return client.getLowLevelClient(); } + @Override + public SearchHealthStatus getSearchHealthStatus() throws IOException { + ClusterHealthRequest request = new ClusterHealthRequest(); + ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); + if (response.getStatus().equals(ClusterHealthStatus.GREEN) + || response.getStatus().equals(ClusterHealthStatus.YELLOW)) { + return new SearchHealthStatus(HEALTHY_STATUS); + } else { + return new SearchHealthStatus(UNHEALTHY_STATUS); + } + } + private void buildSearchRBACQuery( SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) { if (SearchClient.shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index 64308639f05f..e9d8da5fce2e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -12,6 +12,8 @@ import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; import static org.openmetadata.service.Entity.TABLE; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS; +import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -86,6 +88,7 @@ import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.SearchHealthStatus; import org.openmetadata.service.search.SearchIndexUtils; import org.openmetadata.service.search.SearchRequest; import org.openmetadata.service.search.SearchSortFilter; @@ -136,6 +139,8 @@ import os.org.opensearch.OpenSearchException; import os.org.opensearch.OpenSearchStatusException; import os.org.opensearch.action.ActionListener; +import os.org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import os.org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import os.org.opensearch.action.bulk.BulkRequest; @@ -157,6 +162,7 @@ import os.org.opensearch.client.indices.GetMappingsRequest; import os.org.opensearch.client.indices.GetMappingsResponse; import os.org.opensearch.client.indices.PutMappingRequest; +import os.org.opensearch.cluster.health.ClusterHealthStatus; import os.org.opensearch.cluster.metadata.MappingMetadata; import os.org.opensearch.common.lucene.search.function.CombineFunction; import os.org.opensearch.common.lucene.search.function.FieldValueFactorFunction; @@ -2870,4 +2876,16 @@ private static void buildSearchSourceFilter( } } } + + @Override + public SearchHealthStatus getSearchHealthStatus() throws IOException { + ClusterHealthRequest request = new ClusterHealthRequest(); + ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT); + if (response.getStatus().equals(ClusterHealthStatus.GREEN) + || response.getStatus().equals(ClusterHealthStatus.YELLOW)) { + return new SearchHealthStatus(HEALTHY_STATUS); + } else { + return new SearchHealthStatus(UNHEALTHY_STATUS); + } + } }