Skip to content

Commit

Permalink
Quartz queue monitoring - Job counting fix
Browse files Browse the repository at this point in the history
  • Loading branch information
comunacho authored and aurambaj committed Jul 29, 2020
1 parent a0bf0f6 commit 73c2377
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.box.l10n.mojito.monitoring;

import io.micrometer.core.instrument.Tags;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Profile;
Expand All @@ -8,11 +9,16 @@
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

@Profile("!disablescheduling")
@Component
Expand All @@ -21,29 +27,48 @@ public class QuartzPendingJobsReportingTask {

private JdbcTemplate jdbcTemplate;
private MeterRegistry meterRegistry;
private Map<String, AtomicLong> queueSizes;

public QuartzPendingJobsReportingTask(@Autowired DataSource dataSource,
@Autowired MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.jdbcTemplate = new JdbcTemplate(dataSource);
this.queueSizes = new ConcurrentHashMap<>();
}

@Scheduled(fixedRateString = "${l10n.management.metrics.quartz.sql-queue-monitoring.execution-rate}")
public void reportPendingJobs() {
fetchResults().forEach(this::reportResults);
Map<String, PendingJob> results = fetchResults();
updateQueueSizes(results);
results.forEach(this::registerJobQueueSize);
}

private void registerJobQueueSize(String key, PendingJob pendingJob) {
queueSizes.computeIfAbsent(key, k -> createGauge(pendingJob))
.set(pendingJob.count);
}

private AtomicLong createGauge(PendingJob pendingJob) {
return meterRegistry.gauge("quartz.pending.jobs",
Tags.of("jobClass", pendingJob.jobClass, "jobGroup", pendingJob.jobGroup),
new AtomicLong(pendingJob.count));
}

private void reportResults(PendingJob pendingJob){
meterRegistry.gauge("quartz.pending.jobs",
Tags.of("jobClass", pendingJob.jobClass, "jobGroup", pendingJob.jobGroup),
pendingJob.count);
private void updateQueueSizes(Map<String, PendingJob> pendingJobs) {
queueSizes.forEach((key, val) -> {
Long size = pendingJobs.containsKey(key) ? pendingJobs.get(key).count : 0L;
// If the list of yielded results doesn't contains the pendingjob, then we update its value to be zero
queueSizes.get(key).set(size);
});
}

List<PendingJob> fetchResults(){
return jdbcTemplate.query(
"SELECT job_class_name, job_group, COUNT(*) FROM QRTZ_JOB_DETAILS GROUP BY job_class_name, job_group",
(rs, num) -> new PendingJob(extractClassName(rs.getString(1)), rs.getString(2), rs.getLong(3))
Map<String, PendingJob> fetchResults() {
List<PendingJob> result = jdbcTemplate.query(
"SELECT job_class_name, job_group, COUNT(*) FROM QRTZ_JOB_DETAILS GROUP BY job_class_name, job_group",
(rs, num) -> new PendingJob(extractClassName(rs.getString(1)), rs.getString(2), rs.getLong(3))
);

return result.stream().collect(Collectors.toMap(PendingJob::getKey, Function.identity()));
}

static String extractClassName(String input) {
Expand All @@ -52,18 +77,21 @@ static String extractClassName(String input) {
}

/*
* This class represents data associated with a group of jobs pending to be executed in our Quartz instance
* */
* This class represents data associated with a group of jobs pending to be executed in our Quartz instance
* */
static class PendingJob {
public String jobClass;
public String jobGroup;
public Long count;
public Long count;

public PendingJob(String jobClass, String jobGroup, Long count) {
this.jobClass = jobClass;
this.jobGroup = jobGroup;
this.count = count;
}
}

public String getKey() {
return jobClass + "-" + jobGroup;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import org.quartz.TriggerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.TestPropertySource;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

Expand All @@ -34,6 +34,7 @@
import static com.box.l10n.mojito.monitoring.QuartzPendingJobsReportingTask.extractClassName;
import static com.box.l10n.mojito.quartz.QuartzConfig.DYNAMIC_GROUP_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.awaitility.Awaitility.await;
import static org.quartz.JobBuilder.newJob;

Expand Down Expand Up @@ -74,7 +75,7 @@ public class QuartzPendingJobsReportingTaskTest extends ServiceTestBase {
@Before
public void setUp() throws SchedulerException {


Assume.assumeTrue(dbUtils.isMysql() && monitoringEnabled);
task = new QuartzPendingJobsReportingTask(dataSource, meterRegistry);

scheduler.clear();
Expand All @@ -83,8 +84,6 @@ public void setUp() throws SchedulerException {
@Test
public void testGaugesCountPendingJobsForNonDynamicGroup() throws Exception {

Assume.assumeTrue(dbUtils.isMysql() && monitoringEnabled);

JobDetail job;
TriggerBuilder<Trigger> builder;

Expand Down Expand Up @@ -117,11 +116,11 @@ public void testGaugesCountPendingJobsForNonDynamicGroup() throws Exception {
@Test
public void testGaugesCountPendingJobsForDynamicGroup() throws Exception {

Assume.assumeTrue(dbUtils.isMysql() && monitoringEnabled);

JobDetail job;
TriggerBuilder<Trigger> builder;

// First run: We have 5 jobs of each type

for (int i = 1; i <= 5; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
.build();
Expand Down Expand Up @@ -150,17 +149,79 @@ public void testGaugesCountPendingJobsForDynamicGroup() throws Exception {

await.untilAsserted(() -> assertThat(test1Gauges.gauges()).isNotEmpty());
await.untilAsserted(() -> assertThat(test2Gauges.gauges()).isNotEmpty());
await.untilAsserted(() -> assertThat(test1Gauges.gauge().value()).isEqualTo(5));
await.untilAsserted(() -> assertThat(test2Gauges.gauge().value()).isEqualTo(5));
assertThat(test1Gauges.gauge().value()).isEqualTo(5);
assertThat(test2Gauges.gauge().value()).isEqualTo(5);

// Second run: We have 4 new jobs of each type

for (int i = 6; i <= 9; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());

job = newJob(EmptyTestJob.class).withIdentity(testIdWatcher.getEntityName("EmptyJob_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

task.reportPendingJobs();

assertThat(test1Gauges.gauge().value()).isEqualTo(9);
assertThat(test2Gauges.gauge().value()).isEqualTo(9);

// Third run: We add 11 new jobs of the Test1Job class

for (int i = 11; i <= 21; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

task.reportPendingJobs();

assertThat(test1Gauges.gauge().value()).isEqualTo(20);
assertThat(test2Gauges.gauge().value()).isEqualTo(9);

// Fourth run:
// a) All jobs that were scheduled previously have completed (simulated through scheduler.clear())
// b) We add 5 jobs of type EmptyJob

scheduler.clear();

for (int i = 1; i <= 5; i++) {
job = newJob(EmptyTestJob.class).withIdentity(testIdWatcher.getEntityName("EmptyJob_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

task.reportPendingJobs();

assertThat(test1Gauges.gauge().value()).isEqualTo(0);
assertThat(test2Gauges.gauge().value()).isEqualTo(5);
}

@Test
public void testFetchResults() throws Exception {

Assume.assumeTrue(dbUtils.isMysql() && monitoringEnabled);

JobDetail job;
TriggerBuilder<Trigger> builder;
QuartzPendingJobsReportingTask.PendingJob pendingJob1, pendingJob2;
String key1 = "QuartzPendingJobsReportingTaskTest$Test1Job-" + DYNAMIC_GROUP_NAME;
String key2 = "EmptyTestJob-" + DYNAMIC_GROUP_NAME;

// First run: We have 5 jobs of each type

for (int i = 1; i <= 5; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
Expand All @@ -178,15 +239,105 @@ public void testFetchResults() throws Exception {
scheduler.scheduleJob(job, builder.build());
}

List<QuartzPendingJobsReportingTask.PendingJob> pendingJobs = task.fetchResults();
Map<String, QuartzPendingJobsReportingTask.PendingJob> pendingJobs = task.fetchResults();

assertThat(pendingJobs).hasSize(2);
assertThat(pendingJobs).containsKey(key1);
assertThat(pendingJobs).containsKey(key2);

pendingJob1 = pendingJobs.get(key1);
pendingJob2 = pendingJobs.get(key2);

assertThat(pendingJob1.jobClass).isEqualTo("QuartzPendingJobsReportingTaskTest$Test1Job");
assertThat(pendingJob1.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob1.count).isEqualTo(5L);
assertThat(pendingJob2.jobClass).isEqualTo("EmptyTestJob");
assertThat(pendingJob2.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob2.count).isEqualTo(5L);

// Second run: We have 4 new jobs of each type

for (int i = 6; i <= 9; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());

job = newJob(EmptyTestJob.class).withIdentity(testIdWatcher.getEntityName("EmptyJob_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

pendingJobs = task.fetchResults();

assertThat(pendingJobs).hasSize(2);
assertThat(pendingJobs).containsKey(key1);
assertThat(pendingJobs).containsKey(key2);

pendingJob1 = pendingJobs.get(key1);
pendingJob2 = pendingJobs.get(key2);

assertThat(pendingJob1.jobClass).isEqualTo("QuartzPendingJobsReportingTaskTest$Test1Job");
assertThat(pendingJob1.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob1.count).isEqualTo(9L);
assertThat(pendingJob2.jobClass).isEqualTo("EmptyTestJob");
assertThat(pendingJob2.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob2.count).isEqualTo(9L);

// Third run: We add 11 new jobs of the Test1Job class

for (int i = 11; i <= 21; i++) {
job = newJob(Test1Job.class).withIdentity(testIdWatcher.getEntityName("Test1Job_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

pendingJobs = task.fetchResults();

assertThat(pendingJobs).hasSize(2);
assertThat(pendingJobs).extracting("jobClass")
.containsExactlyInAnyOrder("QuartzPendingJobsReportingTaskTest$Test1Job", "EmptyTestJob");
assertThat(pendingJobs).extracting("jobGroup")
.contains(DYNAMIC_GROUP_NAME);
assertThat(pendingJobs).extracting("count")
.containsExactlyInAnyOrder((long) 5, (long) 5);
assertThat(pendingJobs).containsKey(key1);
assertThat(pendingJobs).containsKey(key2);

pendingJob1 = pendingJobs.get(key1);
pendingJob2 = pendingJobs.get(key2);

assertThat(pendingJob1.jobClass).isEqualTo("QuartzPendingJobsReportingTaskTest$Test1Job");
assertThat(pendingJob1.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob1.count).isEqualTo(20L);
assertThat(pendingJob2.jobClass).isEqualTo("EmptyTestJob");
assertThat(pendingJob2.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob2.count).isEqualTo(9L);

scheduler.clear();

for (int i = 1; i <= 5; i++) {
job = newJob(EmptyTestJob.class).withIdentity(testIdWatcher.getEntityName("EmptyJob_" + i), DYNAMIC_GROUP_NAME)
.build();
builder = TriggerBuilder.newTrigger()
.forJob(job)
.startAt(Date.from(ZonedDateTime.now().plusHours(i).toInstant()));
scheduler.scheduleJob(job, builder.build());
}

pendingJobs = task.fetchResults();

assertThat(pendingJobs).hasSize(1);
assertThat(pendingJobs).doesNotContainKey(key1);
assertThat(pendingJobs).containsKey(key2);

pendingJob2 = pendingJobs.get(key2);

assertThat(pendingJob2.jobClass).isEqualTo("EmptyTestJob");
assertThat(pendingJob2.jobGroup).isEqualTo(DYNAMIC_GROUP_NAME);
assertThat(pendingJob2.count).isEqualTo(5L);
}

@Test
Expand Down

0 comments on commit 73c2377

Please sign in to comment.