Skip to content

Commit

Permalink
Merge pull request #1071 from HubSpot/rfc_sched
Browse files Browse the repository at this point in the history
Reimplementation of RFC5545 Schedule
  • Loading branch information
ssalinas authored Jun 16, 2016
2 parents b2e845b + 2a69772 commit 7af386d
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

public enum ScheduleType {

CRON, QUARTZ;
CRON, QUARTZ, RFC5545;

}
15 changes: 15 additions & 0 deletions SingularityService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,21 @@
<artifactId>api-all</artifactId>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>lib-recur</artifactId>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>rfc5545-datetime</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.hubspot.singularity.data;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.hubspot.singularity.WebExceptions.badRequest;
import static com.hubspot.singularity.WebExceptions.checkBadRequest;

import java.net.URI;
Expand All @@ -14,7 +15,8 @@
import javax.inject.Singleton;

import org.apache.commons.lang3.StringUtils;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.dmfs.rfc5545.recur.RecurrenceRule;
import org.quartz.CronExpression;

import com.google.common.base.Joiner;
Expand Down Expand Up @@ -136,22 +138,26 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op
if (request.isScheduled()) {
checkBadRequest(request.getQuartzSchedule().isPresent() || request.getSchedule().isPresent(), "Specify at least one of schedule or quartzSchedule");

final String originalSchedule = request.getQuartzScheduleSafe();
String originalSchedule = request.getQuartzScheduleSafe();

if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) {
checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank");
}
if (request.getScheduleType().or(ScheduleType.QUARTZ) != ScheduleType.RFC5545) {
if (request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent()) {
checkBadRequest(request.getScheduleType().or(ScheduleType.QUARTZ) == ScheduleType.QUARTZ, "If using quartzSchedule specify scheduleType QUARTZ or leave it blank");
}

if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) {
quartzSchedule = originalSchedule;
} else {
checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank");
checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule");
if (request.getQuartzSchedule().isPresent() || (request.getScheduleType().isPresent() && request.getScheduleType().get() == ScheduleType.QUARTZ)) {
quartzSchedule = originalSchedule;
} else {
checkBadRequest(request.getScheduleType().or(ScheduleType.CRON) == ScheduleType.CRON, "If not using quartzSchedule specify scheduleType CRON or leave it blank");
checkBadRequest(!request.getQuartzSchedule().isPresent(), "If using schedule type CRON do not specify quartzSchedule");

quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule);
}
quartzSchedule = getQuartzScheduleFromCronSchedule(originalSchedule);
}

checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule);
checkBadRequest(isValidCronSchedule(quartzSchedule), "Schedule %s (from: %s) was not valid", quartzSchedule, originalSchedule);
} else {
checkForValidRFC5545Schedule(request.getSchedule().get());
}
} else {
checkBadRequest(!request.getQuartzSchedule().isPresent() && !request.getSchedule().isPresent(), "Non-scheduled requests can not specify a schedule");
checkBadRequest(!request.getScheduleType().isPresent(), "ScheduleType can only be set for scheduled requests");
Expand All @@ -174,6 +180,14 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op
return request.toBuilder().setQuartzSchedule(Optional.fromNullable(quartzSchedule)).build();
}

private void checkForValidRFC5545Schedule(String schedule) {
try {
new RecurrenceRule(schedule);
} catch (InvalidRecurrenceRuleException ex) {
badRequest("Schedule %s was not a valid RFC5545 schedule, error was: %s", schedule, ex);
}
}

public SingularityWebhook checkSingularityWebhook(SingularityWebhook webhook) {
checkNotNull(webhook, "Webhook is null");
checkNotNull(webhook.getUri(), "URI is null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.hubspot.singularity.helpers;

import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.dmfs.rfc5545.recur.RecurrenceRule;
import org.dmfs.rfc5545.DateTime;
import org.dmfs.rfc5545.recur.RecurrenceRule.Part;
import org.dmfs.rfc5545.recur.RecurrenceRuleIterator;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class RFC5545Schedule {
public static final int MAX_ITERATIONS = 10000;
private final RecurrenceRule recurrenceRule;
private final org.joda.time.DateTime dtStart;

public RFC5545Schedule(String schedule) throws InvalidRecurrenceRuleException {
// DTSTART is RFC5545 but NOT in the recur string, but its a nice to have? :)
Pattern pattern = Pattern.compile("DTSTART=([0-9]{8}T[0-9]{6})");
Matcher matcher = pattern.matcher(schedule);

if (matcher.find()) {
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd'T'HHmmss");
if (schedule.contains("REPEAT") || schedule.contains("COUNT")) {
this.dtStart = formatter.parseDateTime(matcher.group(1));
} else {
org.joda.time.DateTime start = formatter.parseDateTime(matcher.group(1));
org.joda.time.DateTime now = org.joda.time.DateTime.now().withSecondOfMinute(0);
if (now.getMillis() > start.getMillis()) {
start = now;
}
this.dtStart = start;
}
this.recurrenceRule = new RecurrenceRule(matcher.replaceAll("").replace("RRULE:", ""));
} else {
this.recurrenceRule = new RecurrenceRule(schedule);
this.dtStart = org.joda.time.DateTime.now().withSecondOfMinute(0);
}
}

public org.joda.time.DateTime getStartDateTime() {
return dtStart;
}

public Date getNextValidTime() {
final long now = System.currentTimeMillis();
DateTime startDateTime = new DateTime(dtStart.getYear(), (dtStart.getMonthOfYear() - 1), dtStart.getDayOfMonth(),
dtStart.getHourOfDay(), dtStart.getMinuteOfHour(), dtStart.getSecondOfMinute());
RecurrenceRuleIterator timeIterator = recurrenceRule.iterator(startDateTime);

int count = 0;
while (timeIterator.hasNext() && (count < MAX_ITERATIONS || (recurrenceRule.hasPart(Part.COUNT) && count < recurrenceRule.getCount()))) {
count ++;
long nextRunAtTimestamp = timeIterator.nextMillis();
if (nextRunAtTimestamp >= now) {
return new Date(nextRunAtTimestamp);
}
}
return null;
}

public RecurrenceRule getRecurrenceRule() {
return recurrenceRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import javax.inject.Singleton;

import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,13 +19,15 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.hubspot.singularity.ScheduleType;
import com.hubspot.singularity.SingularityDeployStatistics;
import com.hubspot.singularity.SingularityRequestWithState;
import com.hubspot.singularity.SingularityTaskId;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.RequestManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.helpers.RFC5545Schedule;
import com.hubspot.singularity.sentry.SingularityExceptionNotifier;
import com.hubspot.singularity.smtp.SingularityMailer;

Expand Down Expand Up @@ -110,23 +113,29 @@ private Optional<Long> getExpectedRuntime(SingularityRequestWithState request, S
return deployStatistics.get().getAverageRuntimeMillis();
}

final CronExpression cronExpression;
String scheduleExpression = request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545 ? request.getRequest().getSchedule().get() : request.getRequest().getQuartzScheduleSafe();
Date nextRunAtDate;

try {
cronExpression = new CronExpression(request.getRequest().getQuartzScheduleSafe());
} catch (ParseException e) {
LOG.warn("Unable to parse cron for {} ({})", taskId, request.getRequest().getQuartzScheduleSafe(), e);
exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString()));
return Optional.absent();
}

final Date startDate = new Date(taskId.getStartedAt());
final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate);

if (nextRunAtDate == null) {
String msg = String.format("No next run date found for %s (%s)", taskId, request.getRequest().getQuartzScheduleSafe());
LOG.warn(msg);
exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString()));
if (request.getRequest().getScheduleTypeSafe() == ScheduleType.RFC5545) {
final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(scheduleExpression);
nextRunAtDate = rfc5545Schedule.getNextValidTime();
} else {
final CronExpression cronExpression = new CronExpression(scheduleExpression);
final Date startDate = new Date(taskId.getStartedAt());
nextRunAtDate = cronExpression.getNextValidTimeAfter(startDate);
}

if (nextRunAtDate == null) {
String msg = String.format("No next run date found for %s (%s)", taskId, scheduleExpression);
LOG.warn(msg);
exceptionNotifier.notify(msg, ImmutableMap.of("taskId", taskId.toString()));
return Optional.absent();
}

} catch (ParseException|InvalidRecurrenceRuleException e) {
LOG.warn("Unable to parse schedule of type {} for expression {} (taskId: {}, err: {})", request.getRequest().getScheduleTypeSafe(), scheduleExpression, taskId, e);
exceptionNotifier.notify(e, ImmutableMap.of("taskId", taskId.toString(), "scheduleExpression", scheduleExpression, "scheduleType", request.getRequest().getScheduleTypeSafe().toString()));
return Optional.absent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import org.apache.mesos.Protos;
import org.apache.mesos.Protos.TaskStatus.Reason;
import org.dmfs.rfc5545.recur.InvalidRecurrenceRuleException;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,7 @@
import com.hubspot.singularity.MachineState;
import com.hubspot.singularity.RequestState;
import com.hubspot.singularity.RequestType;
import com.hubspot.singularity.ScheduleType;
import com.hubspot.singularity.SingularityCreateResult;
import com.hubspot.singularity.SingularityDeployMarker;
import com.hubspot.singularity.SingularityDeployStatistics;
Expand Down Expand Up @@ -65,6 +67,7 @@
import com.hubspot.singularity.data.SlaveManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.TaskRequestManager;
import com.hubspot.singularity.helpers.RFC5545Schedule;
import com.hubspot.singularity.smtp.SingularityMailer;

@Singleton
Expand Down Expand Up @@ -682,10 +685,18 @@ private Optional<Long> getNextRunAt(SingularityRequest request, RequestState sta
LOG.info("Scheduling requested immediate run of {}", request.getId());
} else {
try {
final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe());

final Date scheduleFrom = new Date(now);
final Date nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom);
Date nextRunAtDate = null;
Date scheduleFrom = null;

if (request.getScheduleTypeSafe() == ScheduleType.RFC5545) {
final RFC5545Schedule rfc5545Schedule = new RFC5545Schedule(request.getSchedule().get());
nextRunAtDate = rfc5545Schedule.getNextValidTime();
scheduleFrom = new Date(rfc5545Schedule.getStartDateTime().getMillis());
} else {
scheduleFrom = new Date(now);
final CronExpression cronExpression = new CronExpression(request.getQuartzScheduleSafe());
nextRunAtDate = cronExpression.getNextValidTimeAfter(scheduleFrom);
}

if (nextRunAtDate == null) {
return Optional.absent();
Expand All @@ -696,7 +707,7 @@ private Optional<Long> getNextRunAt(SingularityRequest request, RequestState sta
nextRunAt = Math.max(nextRunAtDate.getTime(), now); // don't create a schedule that is overdue as this is used to indicate that singularity is not fulfilling requests.

LOG.trace("Scheduling next run of {} (schedule: {}) at {} (from: {})", request.getId(), request.getSchedule(), nextRunAtDate, scheduleFrom);
} catch (ParseException pe) {
} catch (ParseException | InvalidRecurrenceRuleException pe) {
throw Throwables.propagate(pe);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.hubspot.singularity.helpers;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.dmfs.rfc5545.DateTime;
import org.junit.Assert;
import org.junit.Test;

public class RFC5545ScheduleTest {
@Test
public void testEveryWeekdayRFC5545Schdule() throws Exception {
String schedule = "FREQ=DAILY;BYDAY=MO,TU,WE,TH,FR";
Date nextValidTime = new RFC5545Schedule(schedule).getNextValidTime();
Assert.assertTrue(nextValidTime.after(new Date()));
Assert.assertTrue(nextValidTime.before(new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3))));
}

@Test
public void testMaxIterations() throws Exception {
String schedule = "DTSTART=19970902T090000\nRRULE:FREQ=HOURLY;COUNT=10001";
Assert.assertEquals(new RFC5545Schedule(schedule).getStartDateTime(), new org.joda.time.DateTime(1997, 9, 2, 9, 0, 0));
Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null);
}

@Test
public void testRecurInPastDoesNotGiveNext() throws Exception {
String schedule = "FREQ=YEARLY;INTERVAL=4;BYMONTH=11;BYDAY=TU;BYMONTHDAY=2,3,4,5,6,7,8;COUNT=1";
Assert.assertEquals(new RFC5545Schedule(schedule).getNextValidTime(), null);
}
}
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@
<artifactId>metrics-graphite</artifactId>
<version>${dep.metrics.version}</version><!-- TOOD: add this to HubSpot's basepom -->
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>lib-recur</artifactId>
<version>0.9.4</version>
</dependency>

<dependency>
<groupId>org.dmfs</groupId>
<artifactId>rfc5545-datetime</artifactId>
<version>0.2.2</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit 7af386d

Please sign in to comment.