Add Prometheus counter for search and database #19561

Merged
merged 5 commits on Jan 28, 2025
Changes from 3 commits
1 change: 1 addition & 0 deletions conf/openmetadata.yaml
@@ -296,6 +296,7 @@ eventMonitoringConfiguration:
batchSize: ${EVENT_MONITOR_BATCH_SIZE:-10}
pathPattern: ${EVENT_MONITOR_PATH_PATTERN:-["/api/v1/tables/*", "/api/v1/health-check"]}
latency: ${EVENT_MONITOR_LATENCY:-[0.99, 0.90]} # For value p99=0.99, p90=0.90, p50=0.50 etc.
servicesHealthCheckInterval: ${EVENT_MONITOR_SERVICES_HEALTH_CHECK_INTERVAL:-300}
# it will use the default auth provider for AWS services if parameters are not set
# parameters:
# region: ${OM_MONITOR_REGION:-""}

@@ -73,7 +73,7 @@
import org.openmetadata.service.events.EventFilter;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler;
import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler;
import org.openmetadata.service.events.scheduled.ServicesStatusJobHandler;
import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.service.exception.JsonMappingExceptionMapper;
@@ -264,16 +264,23 @@ public void run(OpenMetadataApplicationConfig catalogConfig, Environment environ
// Asset Servlet Registration
registerAssetServlet(catalogConfig.getWebConfiguration(), environment);

// Handle Pipeline Service Client Status job
PipelineServiceStatusJobHandler pipelineServiceStatusJobHandler =
PipelineServiceStatusJobHandler.create(
catalogConfig.getPipelineServiceClientConfiguration(), catalogConfig.getClusterName());
pipelineServiceStatusJobHandler.addPipelineServiceStatusJob();
// Handle Services Jobs
registerHealthCheckJobs(catalogConfig);

// Register Auth Handlers
registerAuthServlets(catalogConfig, environment);
}

private void registerHealthCheckJobs(OpenMetadataApplicationConfig catalogConfig) {
ServicesStatusJobHandler pipelineServiceStatusJobHandler =
ServicesStatusJobHandler.create(
catalogConfig.getEventMonitorConfiguration(),
catalogConfig.getPipelineServiceClientConfiguration(),
catalogConfig.getClusterName());
pipelineServiceStatusJobHandler.addPipelineServiceStatusJob();
pipelineServiceStatusJobHandler.addDatabaseAndSearchStatusJobs();
}

private void registerAuthServlets(OpenMetadataApplicationConfig config, Environment environment) {
if (config.getAuthenticationConfiguration() != null
&& config

@@ -0,0 +1,55 @@
package org.openmetadata.service.events.scheduled;

import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.SearchHealthStatus;
import org.quartz.Job;
import org.quartz.JobExecutionContext;

@Slf4j
public class DatabseAndSearchServiceStatusJob implements Job {
private static final String SERVICE_COUNTER = "omd_service_unreachable";
private static final String SERVICE_NAME = "service_name";
private static final String SEARCH_SERVICE_NAME = "search";
private static final String DATABASE_SERVICE_NAME = "database";

@Override
public void execute(JobExecutionContext jobExecutionContext) {
PrometheusMeterRegistry meterRegistry =
(PrometheusMeterRegistry)
jobExecutionContext.getJobDetail().getJobDataMap().get(JOB_CONTEXT_METER_REGISTRY);
checkDatabaseStatus(meterRegistry);
checkElasticSearchStatus(meterRegistry);
}

private void checkElasticSearchStatus(PrometheusMeterRegistry meterRegistry) {
try {
SearchHealthStatus status =
Entity.getSearchRepository().getSearchClient().getSearchHealthStatus();
if (status.getStatus().equals(UNHEALTHY_STATUS)) {
publishUnhealthyCounter(meterRegistry, SERVICE_NAME, SEARCH_SERVICE_NAME);
}
} catch (Exception ex) {
LOG.error("Elastic Search encountering exception.", ex);
publishUnhealthyCounter(meterRegistry, SERVICE_NAME, SEARCH_SERVICE_NAME);
}
}

private void checkDatabaseStatus(PrometheusMeterRegistry meterRegistry) {
try {
Entity.getCollectionDAO().systemDAO().testConnection();
} catch (Exception ex) {
LOG.error("Database encountering exception.", ex);
publishUnhealthyCounter(meterRegistry, SERVICE_NAME, DATABASE_SERVICE_NAME);
}
}

private void publishUnhealthyCounter(PrometheusMeterRegistry meterRegistry, String... tags) {
Counter.builder(SERVICE_COUNTER).tags(tags).register(meterRegistry).increment();
}
}
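
For reference, below is a minimal, self-contained Micrometer sketch of the same Counter.builder(...).tags(...).register(...).increment() pattern this job uses, and roughly what the scraped output looks like. It assumes only micrometer-registry-prometheus on the classpath; the class name and main harness are illustrative and not part of this change.

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class UnreachableCounterDemo {
  public static void main(String[] args) {
    PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    // Same pattern as publishUnhealthyCounter: one counter per service_name tag value.
    Counter.builder("omd_service_unreachable")
        .tags("service_name", "database")
        .register(registry)
        .increment();
    // scrape() renders the Prometheus exposition format; the counter typically
    // appears as: omd_service_unreachable_total{service_name="database",} 1.0
    System.out.println(registry.scrape());
  }
}

Registering the counter lazily inside publishUnhealthyCounter works because Counter.builder(...).register(...) returns the existing meter when the name and tags already match, so repeated failures keep incrementing the same time series.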

@@ -2,9 +2,9 @@

import static org.openmetadata.sdk.PipelineServiceClientInterface.HEALTHY_STATUS;
import static org.openmetadata.sdk.PipelineServiceClientInterface.STATUS_KEY;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_CLUSTER_NAME;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_METER_REGISTRY;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.JOB_CONTEXT_PIPELINE_SERVICE_CLIENT;

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusMeterRegistry;

@@ -5,7 +5,9 @@
import org.openmetadata.schema.api.configuration.pipelineServiceClient.PipelineServiceClientConfiguration;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.monitoring.EventMonitorConfiguration;
import org.openmetadata.service.util.MicrometerBundleSingleton;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
@@ -17,11 +19,14 @@
import org.quartz.impl.StdSchedulerFactory;

@Slf4j
public class PipelineServiceStatusJobHandler {

public class ServicesStatusJobHandler {
public static final String HEALTHY_STATUS = "healthy";
public static final String UNHEALTHY_STATUS = "unhealthy";
public static final String DATABASE_SEARCH_STATUS_JOB = "databaseAndSearchServiceStatusJob";
public static final String PIPELINE_SERVICE_STATUS_JOB = "pipelineServiceStatusJob";
public static final String STATUS_GROUP = "status";
public static final String STATUS_CRON_TRIGGER = "statusTrigger";
public static final String PIPELINE_STATUS_CRON_TRIGGER = "pipelineStatusTrigger";
public static final String DATABSE_SEARCH_STATUS_CRON_TRIGGER = "databaseAndSearchStatusTrigger";
public static final String JOB_CONTEXT_PIPELINE_SERVICE_CLIENT = "pipelineServiceClient";
public static final String JOB_CONTEXT_METER_REGISTRY = "meterRegistry";
public static final String JOB_CONTEXT_CLUSTER_NAME = "clusterName";
@@ -32,51 +37,55 @@ public class PipelineServiceStatusJobHandler {
private final String clusterName;
private final Integer healthCheckInterval;
private final Scheduler scheduler = new StdSchedulerFactory().getScheduler();
private final int servicesHealthCheckInterval;
private static ServicesStatusJobHandler instance;

private static PipelineServiceStatusJobHandler instance;

private PipelineServiceStatusJobHandler(
PipelineServiceClientConfiguration config, String clusterName) throws SchedulerException {
private ServicesStatusJobHandler(
EventMonitorConfiguration monitorConfiguration,
PipelineServiceClientConfiguration config,
String clusterName)
throws SchedulerException {
this.config = config;
this.pipelineServiceClient = PipelineServiceClientFactory.createPipelineServiceClient(config);
this.meterRegistry = MicrometerBundleSingleton.prometheusMeterRegistry;
this.clusterName = clusterName;
this.healthCheckInterval = config.getHealthCheckInterval();
this.servicesHealthCheckInterval = monitorConfiguration.getServicesHealthCheckInterval();
this.scheduler.start();
}

public static PipelineServiceStatusJobHandler create(
PipelineServiceClientConfiguration config, String clusterName) {
public static ServicesStatusJobHandler create(
EventMonitorConfiguration eventMonitorConfiguration,
PipelineServiceClientConfiguration config,
String clusterName) {
if (instance != null) return instance;

try {
instance = new PipelineServiceStatusJobHandler(config, clusterName);
instance = new ServicesStatusJobHandler(eventMonitorConfiguration, config, clusterName);
} catch (Exception ex) {
LOG.error("Failed to initialize the Pipeline Service Status Handler");
}
return instance;
}

private JobDetail jobBuilder() {
private JobDetail jobBuilder(Class<? extends Job> clazz, String jobName, String group) {
JobDataMap dataMap = new JobDataMap();
dataMap.put(JOB_CONTEXT_PIPELINE_SERVICE_CLIENT, pipelineServiceClient);
dataMap.put(JOB_CONTEXT_METER_REGISTRY, meterRegistry);
dataMap.put(JOB_CONTEXT_CLUSTER_NAME, clusterName);

JobBuilder jobBuilder =
JobBuilder.newJob(PipelineServiceStatusJob.class)
.withIdentity(PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP)
.usingJobData(dataMap);
JobBuilder.newJob(clazz).withIdentity(jobName, group).usingJobData(dataMap);

return jobBuilder.build();
}

private Trigger getTrigger() {
private Trigger getTrigger(int checkInterval, String identity, String group) {
return TriggerBuilder.newTrigger()
.withIdentity(STATUS_CRON_TRIGGER, STATUS_GROUP)
.withIdentity(identity, group)
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(healthCheckInterval)
.withIntervalInSeconds(checkInterval)
.repeatForever())
.build();
}
@@ -86,12 +95,27 @@ public void addPipelineServiceStatusJob() {
// enabled
if (config.getEnabled().equals(Boolean.TRUE)) {
try {
JobDetail jobDetail = jobBuilder();
Trigger trigger = getTrigger();
JobDetail jobDetail =
jobBuilder(PipelineServiceStatusJob.class, PIPELINE_SERVICE_STATUS_JOB, STATUS_GROUP);
Trigger trigger =
getTrigger(healthCheckInterval, PIPELINE_STATUS_CRON_TRIGGER, STATUS_GROUP);
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception ex) {
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
}
}
}

public void addDatabaseAndSearchStatusJobs() {
try {
JobDetail jobDetail =
jobBuilder(
DatabseAndSearchServiceStatusJob.class, DATABASE_SEARCH_STATUS_JOB, STATUS_GROUP);
Trigger trigger =
getTrigger(servicesHealthCheckInterval, DATABSE_SEARCH_STATUS_CRON_TRIGGER, STATUS_GROUP);
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception ex) {
LOG.error("Failed in setting up job Scheduler for Pipeline Service Status", ex);
}
}
}
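
For readers less familiar with Quartz, here is a stripped-down sketch of the schedule-a-job-on-a-fixed-interval pattern the handler relies on (StdSchedulerFactory, JobBuilder, TriggerBuilder with a simple repeating schedule). The job body, identities, and the 300-second interval are illustrative stand-ins, not code from this PR.

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class IntervalStatusJobDemo {
  // Quartz instantiates jobs reflectively, so the class needs a public no-arg constructor.
  public static class PingJob implements Job {
    @Override
    public void execute(JobExecutionContext context) {
      System.out.println("status check tick");
    }
  }

  public static void main(String[] args) throws SchedulerException {
    Scheduler scheduler = new StdSchedulerFactory().getScheduler();
    scheduler.start();
    JobDetail job =
        JobBuilder.newJob(PingJob.class).withIdentity("demoStatusJob", "status").build();
    Trigger trigger =
        TriggerBuilder.newTrigger()
            .withIdentity("demoStatusTrigger", "status")
            .withSchedule(
                SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(300).repeatForever())
            .build();
    // Equivalent to what addPipelineServiceStatusJob/addDatabaseAndSearchStatusJobs do,
    // minus the JobDataMap that carries the meter registry and pipeline service client.
    scheduler.scheduleJob(job, trigger);
  }
}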

@@ -31,4 +31,6 @@ public class EventMonitorConfiguration {
private String[] pathPattern;

private double[] latency;

private int servicesHealthCheckInterval = 300;
}

@@ -392,4 +392,6 @@ static boolean shouldApplyRbacConditions(
&& !subjectContext.isBot()
&& rbacConditionEvaluator != null;
}

SearchHealthStatus getSearchHealthStatus() throws IOException;
}

@@ -0,0 +1,12 @@
package org.openmetadata.service.search;

import lombok.Getter;

@Getter
public class SearchHealthStatus {
public SearchHealthStatus(String status) {
this.status = status;
}

String status;
}

@@ -13,6 +13,8 @@
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD;
@@ -39,6 +41,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import es.org.elasticsearch.ElasticsearchStatusException;
import es.org.elasticsearch.action.ActionListener;
import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import es.org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import es.org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import es.org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import es.org.elasticsearch.action.bulk.BulkRequest;
@@ -62,6 +66,7 @@
import es.org.elasticsearch.client.indices.GetMappingsRequest;
import es.org.elasticsearch.client.indices.GetMappingsResponse;
import es.org.elasticsearch.client.indices.PutMappingRequest;
import es.org.elasticsearch.cluster.health.ClusterHealthStatus;
import es.org.elasticsearch.cluster.metadata.MappingMetadata;
import es.org.elasticsearch.common.lucene.search.function.CombineFunction;
import es.org.elasticsearch.common.lucene.search.function.FieldValueFactorFunction;
@@ -168,6 +173,7 @@
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
import org.openmetadata.service.search.SearchAggregation;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchHealthStatus;
import org.openmetadata.service.search.SearchIndexUtils;
import org.openmetadata.service.search.SearchRequest;
import org.openmetadata.service.search.SearchSortFilter;
@@ -2878,6 +2884,18 @@ public Object getLowLevelClient() {
return client.getLowLevelClient();
}

@Override
public SearchHealthStatus getSearchHealthStatus() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
if (response.getStatus().equals(ClusterHealthStatus.GREEN)
|| response.getStatus().equals(ClusterHealthStatus.YELLOW)) {
return new SearchHealthStatus(HEALTHY_STATUS);
} else {
return new SearchHealthStatus(UNHEALTHY_STATUS);
}
}

private void buildSearchRBACQuery(
SubjectContext subjectContext, SearchSourceBuilder searchSourceBuilder) {
if (SearchClient.shouldApplyRbacConditions(subjectContext, rbacConditionEvaluator)) {

@@ -12,6 +12,8 @@
import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.TABLE;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.HEALTHY_STATUS;
import static org.openmetadata.service.events.scheduled.ServicesStatusJobHandler.UNHEALTHY_STATUS;
import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD;
import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD;
@@ -86,6 +88,7 @@
import org.openmetadata.service.jdbi3.TestCaseResultRepository;
import org.openmetadata.service.search.SearchAggregation;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchHealthStatus;
import org.openmetadata.service.search.SearchIndexUtils;
import org.openmetadata.service.search.SearchRequest;
import org.openmetadata.service.search.SearchSortFilter;
@@ -136,6 +139,8 @@
import os.org.opensearch.OpenSearchException;
import os.org.opensearch.OpenSearchStatusException;
import os.org.opensearch.action.ActionListener;
import os.org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import os.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import os.org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import os.org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import os.org.opensearch.action.bulk.BulkRequest;
@@ -157,6 +162,7 @@
import os.org.opensearch.client.indices.GetMappingsRequest;
import os.org.opensearch.client.indices.GetMappingsResponse;
import os.org.opensearch.client.indices.PutMappingRequest;
import os.org.opensearch.cluster.health.ClusterHealthStatus;
import os.org.opensearch.cluster.metadata.MappingMetadata;
import os.org.opensearch.common.lucene.search.function.CombineFunction;
import os.org.opensearch.common.lucene.search.function.FieldValueFactorFunction;
@@ -2870,4 +2876,16 @@ private static void buildSearchSourceFilter(
}
}
}

@Override
public SearchHealthStatus getSearchHealthStatus() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
if (response.getStatus().equals(ClusterHealthStatus.GREEN)
|| response.getStatus().equals(ClusterHealthStatus.YELLOW)) {
return new SearchHealthStatus(HEALTHY_STATUS);
} else {
return new SearchHealthStatus(UNHEALTHY_STATUS);
}
}
}