Skip to content

Commit

Permalink
Merge pull request #900 from HubSpot/mesos-oom
Browse files Browse the repository at this point in the history
Store status update reason field in SingularityTaskHistoryUpdate
  • Loading branch information
Tom Petr committed Feb 22, 2016
2 parents 0eb076e + 2085e7b commit 3ed3eb1
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class SingularityTaskHistoryUpdate extends SingularityTaskIdHolder implem
private final long timestamp;
private final ExtendedTaskState taskState;
private final Optional<String> statusMessage;
private final Optional<String> statusReason;

public enum SimplifiedTaskState {
UNKNOWN, WAITING, RUNNING, DONE
Expand Down Expand Up @@ -50,12 +51,13 @@ public static SimplifiedTaskState getCurrentState(Iterable<SingularityTaskHistor
}

@JsonCreator
public SingularityTaskHistoryUpdate(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("timestamp") long timestamp, @JsonProperty("taskState") ExtendedTaskState taskState, @JsonProperty("statusMessage") Optional<String> statusMessage) {
public SingularityTaskHistoryUpdate(@JsonProperty("taskId") SingularityTaskId taskId, @JsonProperty("timestamp") long timestamp, @JsonProperty("taskState") ExtendedTaskState taskState, @JsonProperty("statusMessage") Optional<String> statusMessage, @JsonProperty("statusReason") Optional<String> statusReason) {
super(taskId);

this.timestamp = timestamp;
this.taskState = taskState;
this.statusMessage = statusMessage;
this.statusReason = statusReason;
}

@Override
Expand All @@ -69,7 +71,7 @@ public int compareTo(SingularityTaskHistoryUpdate o) {

@Override
public int hashCode() {
return Objects.hashCode(getTaskId(), timestamp, taskState, statusMessage);
return Objects.hashCode(getTaskId(), timestamp, taskState, statusMessage, statusReason);
}

@Override
Expand All @@ -86,7 +88,8 @@ public boolean equals(Object other) {
return Objects.equal(this.getTaskId(), that.getTaskId())
&& Objects.equal(this.timestamp, that.timestamp)
&& Objects.equal(this.taskState, that.taskState)
&& Objects.equal(statusMessage, statusMessage);
&& Objects.equal(this.statusMessage, that.statusMessage)
&& Objects.equal(this.statusReason, that.statusReason);
}

public long getTimestamp() {
Expand All @@ -101,9 +104,16 @@ public Optional<String> getStatusMessage() {
return statusMessage;
}

@Override
public String toString() {
return "SingularityTaskHistoryUpdate [taskId=" + getTaskId() + ", timestamp=" + timestamp + ", taskState=" + taskState + ", statusMessage=" + statusMessage + "]";
public Optional<String> getStatusReason() {
return statusReason;
}

@Override public String toString() {
return "SingularityTaskHistoryUpdate[" +
"timestamp=" + timestamp +
", taskState=" + taskState +
", statusMessage=" + statusMessage +
", statusReason=" + statusReason +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task) throws
msg = String.format("%s (%s)", msg, task.getTaskRequest().getPendingTask().getMessage().get());
}

saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(task.getTaskId(), now, ExtendedTaskState.TASK_LAUNCHED, Optional.of(msg)));
saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(task.getTaskId(), now, ExtendedTaskState.TASK_LAUNCHED, Optional.of(msg), Optional.<String>absent()));
saveLastActiveTaskStatus(new SingularityTaskStatusHolder(task.getTaskId(), Optional.<TaskStatus>absent(), now, serverId, Optional.of(task.getOffer().getSlaveId().getValue())));

try {
Expand Down Expand Up @@ -686,7 +686,7 @@ private void saveTaskHistoryUpdate(SingularityTaskCleanup cleanup) {
msg.append(cleanup.getMessage().get());
}

saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(cleanup.getTaskId(), cleanup.getTimestamp(), ExtendedTaskState.TASK_CLEANING, Optional.of(msg.toString())));
saveTaskHistoryUpdate(new SingularityTaskHistoryUpdate(cleanup.getTaskId(), cleanup.getTimestamp(), ExtendedTaskState.TASK_CLEANING, Optional.of(msg.toString()), Optional.<String>absent()));
}

public SingularityCreateResult createTaskCleanup(SingularityTaskCleanup cleanup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
Expand Down Expand Up @@ -298,6 +299,22 @@ private Optional<SingularityTaskId> getTaskId(String taskId) {
}
}

private Optional<String> getStatusMessage(Protos.TaskStatus status, Optional<SingularityTask> task) {
if (status.hasMessage() && !Strings.isNullOrEmpty(status.getMessage())) {
return Optional.of(status.getMessage());
} else if (status.hasReason() && status.getReason() == Protos.TaskStatus.Reason.REASON_MEMORY_LIMIT) {
if (task.isPresent()) {
final double memory = MesosUtils.getMemory(task.get().getMesosTask().getResourcesList());
if (memory > 0) {
return Optional.of(String.format("Task exceeded memory limit of %s MB", MesosUtils.getMemory(task.get().getMesosTask().getResourcesList())));
}
}
return Optional.of("Task exceeded memory limit");
}

return Optional.absent();
}

@Override
@Timed
public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
Expand Down Expand Up @@ -357,8 +374,10 @@ public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
}
}

final Optional<String> statusMessage = getStatusMessage(status, task);

final SingularityTaskHistoryUpdate taskUpdate =
new SingularityTaskHistoryUpdate(taskIdObj, timestamp, taskState, status.hasMessage() ? Optional.of(status.getMessage()) : Optional.<String>absent());
new SingularityTaskHistoryUpdate(taskIdObj, timestamp, taskState, statusMessage, status.hasReason() ? Optional.of(status.getReason().name()) : Optional.<String>absent());
final SingularityCreateResult taskHistoryUpdateCreateResult = taskManager.saveTaskHistoryUpdate(taskUpdate);

logSupport.checkDirectory(taskIdObj);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public void testTaskOrdering() {
final SingularityTaskId taskId = new SingularityTaskId("r", "d", System.currentTimeMillis(), 1, "h", "r");
final Optional<String> msg = Optional.absent();

SingularityTaskHistoryUpdate update1 = new SingularityTaskHistoryUpdate(taskId, 1L, ExtendedTaskState.TASK_LAUNCHED, msg);
SingularityTaskHistoryUpdate update2 = new SingularityTaskHistoryUpdate(taskId, 2L, ExtendedTaskState.TASK_RUNNING, msg);
SingularityTaskHistoryUpdate update3 = new SingularityTaskHistoryUpdate(taskId, 2L, ExtendedTaskState.TASK_FAILED, msg);
SingularityTaskHistoryUpdate update1 = new SingularityTaskHistoryUpdate(taskId, 1L, ExtendedTaskState.TASK_LAUNCHED, msg, Optional.<String>absent());
SingularityTaskHistoryUpdate update2 = new SingularityTaskHistoryUpdate(taskId, 2L, ExtendedTaskState.TASK_RUNNING, msg, Optional.<String>absent());
SingularityTaskHistoryUpdate update3 = new SingularityTaskHistoryUpdate(taskId, 2L, ExtendedTaskState.TASK_FAILED, msg, Optional.<String>absent());

List<SingularityTaskHistoryUpdate> list = Arrays.asList(update2, update1, update3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ public void testSlavePlacementSeparate() {
Assert.assertTrue(taskManager.getPendingTaskIds().size() == 1);
Assert.assertTrue(taskManager.getActiveTaskIds().size() == 1);

eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent()));
eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent(), Optional.<String>absent()));

sms.resourceOffers(driver, Arrays.asList(createOffer(20, 20000, "slave1", "host1")));

Expand Down Expand Up @@ -829,7 +829,7 @@ public void testSlavePlacementOptimistic() {

sms.resourceOffers(driver, Arrays.asList(createOffer(20, 20000, "slave2", "host2")));

eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent()));
eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent(), Optional.<String>absent()));

Assert.assertTrue(taskManager.getPendingTaskIds().isEmpty());
Assert.assertTrue(taskManager.getActiveTaskIds().size() == 3);
Expand All @@ -844,7 +844,7 @@ public void testSlavePlacementOptimisticSingleOffer() {

sms.resourceOffers(driver, Arrays.asList(createOffer(20, 20000, "slave1", "host1"), createOffer(20, 20000, "slave2", "host2")));

eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent()));
eventListener.taskHistoryUpdateEvent(new SingularityTaskHistoryUpdate(taskManager.getActiveTaskIds().get(0), System.currentTimeMillis(), ExtendedTaskState.TASK_CLEANING, Optional.<String>absent(), Optional.<String>absent()));

Assert.assertTrue(taskManager.getPendingTaskIds().isEmpty());
Assert.assertTrue(taskManager.getActiveTaskIds().size() == 3);
Expand Down

0 comments on commit 3ed3eb1

Please sign in to comment.