Skip to content

Commit

Permalink
Merge pull request #889 from HubSpot/max-tasks-no-zk
Browse files Browse the repository at this point in the history
add support for max numbers of objects in ZK when no database is configured
  • Loading branch information
Tom Petr committed Feb 16, 2016
2 parents 4468c7d + 02ef24d commit 678e95e
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 34 deletions.
3 changes: 3 additions & 0 deletions Docs/reference/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ These settings are less likely to be changed, but were included in the configura
| newTaskCheckerBaseDelaySeconds | 1 | Added to the the amount of deploy to wait before checking a new task | long |
| allowTestResourceCalls | false | If true, allows calls to be made to the test resource, which can test internal methods | boolean |
| deleteDeploysFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete deploys from zk when they are older than this if we are not using a database | long |
| maxStaleDeploysPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest deploys from zk when there are more than this number for a given request, if we're not already persisting them to a database | int |
| deleteStaleRequestsFromZkWhenNoDatabaseAfterHours | 336 (14 days) | Delete stale requests after this amount of time if we are not using a database | long |
| maxRequestsWithHistoryInZkWhenNoDatabase | infinite (disabled) | Delete history of oldest requests from zk when there are more than this number of requests, if we're not already persisting them to a database | int |
| deleteTasksFromZkWhenNoDatabaseAfterHours | 168 (7 days) | Delete old tasks from zk after this amount of time if we are not using a database | long |
| maxStaleTasksPerRequestInZkWhenNoDatabase | infinite (disabled) | Delete oldest tasks from zk when there are more than this number for a given request, if we're not already persisting them to a database | int |
| deleteDeadSlavesAfterHours | 168 (7 days) | Remove dead slaves from the list after this amount of time | long |
| deleteUndeliverableWebhooksAfterHours | 168 (7 days) | Delete (and stop retrying) failed webhooks after this amount of time | long |
| waitForListeners | true | If true, the event system waits for all listeners having processed an event. | boolean |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.google.common.base.Optional;
import com.google.common.collect.ComparisonChain;

public class SingularityRequestHistory implements Comparable<SingularityRequestHistory> {
public class SingularityRequestHistory implements Comparable<SingularityRequestHistory>, SingularityHistoryItem {

private final long createdAt;
private final Optional<String> user;
Expand Down Expand Up @@ -86,6 +86,12 @@ public Optional<String> getMessage() {
return message;
}

@Override
@JsonIgnore
public long getCreateTimestampForCalculatingHistoryAge() {
return createdAt;
}

@Override
public String toString() {
return "SingularityRequestHistory [createdAt=" + createdAt + ", user=" + user + ", eventType=" + eventType + ", request=" + request + ", message=" + message + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ public int compare(SingularityTaskId o1, SingularityTaskId o2) {

};

public static Comparator<SingularityTaskId> STARTED_AT_COMPARATOR_DESC = new Comparator<SingularityTaskId>() {

@Override
public int compare(SingularityTaskId o1, SingularityTaskId o2) {
return Long.compare(o2.startedAt, o1.startedAt);
}

};

public static Predicate<SingularityTaskId> notIn(Collection<SingularityTaskId> exclude) {
return Predicates.not(Predicates.in(exclude));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,18 @@ public class SingularityConfiguration extends Configuration {

private long deleteDeploysFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14);

private Optional<Integer> maxStaleDeploysPerRequestInZkWhenNoDatabase = Optional.absent();

private long deleteDeadSlavesAfterHours = TimeUnit.DAYS.toHours(7);

private long deleteStaleRequestsFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(14);

private Optional<Integer> maxRequestsWithHistoryInZkWhenNoDatabase = Optional.absent();

private long deleteTasksFromZkWhenNoDatabaseAfterHours = TimeUnit.DAYS.toHours(7);

private Optional<Integer> maxStaleTasksPerRequestInZkWhenNoDatabase = Optional.absent();

private long deleteUndeliverableWebhooksAfterHours = TimeUnit.DAYS.toHours(7);

private long deltaAfterWhichTasksAreLateMillis = TimeUnit.SECONDS.toMillis(30);
Expand Down Expand Up @@ -925,4 +931,28 @@ public boolean isDeleteRemovedRequestsFromLoadBalancer() {
public void setDeleteRemovedRequestsFromLoadBalancer(boolean deleteRemovedRequestsFromLoadBalancer) {
this.deleteRemovedRequestsFromLoadBalancer = deleteRemovedRequestsFromLoadBalancer;
}

public Optional<Integer> getMaxStaleDeploysPerRequestInZkWhenNoDatabase() {
return maxStaleDeploysPerRequestInZkWhenNoDatabase;
}

public void setMaxStaleDeploysPerRequestInZkWhenNoDatabase(Optional<Integer> maxStaleDeploysPerRequestInZkWhenNoDatabase) {
this.maxStaleDeploysPerRequestInZkWhenNoDatabase = maxStaleDeploysPerRequestInZkWhenNoDatabase;
}

public Optional<Integer> getMaxRequestsWithHistoryInZkWhenNoDatabase() {
return maxRequestsWithHistoryInZkWhenNoDatabase;
}

public void setMaxRequestsWithHistoryInZkWhenNoDatabase(Optional<Integer> maxRequestsWithHistoryInZkWhenNoDatabase) {
this.maxRequestsWithHistoryInZkWhenNoDatabase = maxRequestsWithHistoryInZkWhenNoDatabase;
}

public Optional<Integer> getMaxStaleTasksPerRequestInZkWhenNoDatabase() {
return maxStaleTasksPerRequestInZkWhenNoDatabase;
}

public void setMaxStaleTasksPerRequestInZkWhenNoDatabase(Optional<Integer> maxStaleTasksPerRequestInZkWhenNoDatabase) {
this.maxStaleTasksPerRequestInZkWhenNoDatabase = maxStaleTasksPerRequestInZkWhenNoDatabase;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.hubspot.singularity.data.history;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -10,6 +11,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.collect.TreeMultimap;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
Expand Down Expand Up @@ -43,6 +45,7 @@ public void runActionOnPoll() {

final List<SingularityDeployKey> allDeployIds = deployManager.getAllDeployIds();
final Map<String, SingularityRequestDeployState> byRequestId = deployManager.getAllRequestDeployStatesByRequestId();
final TreeMultimap<String, SingularityDeployHistory> deployHistoryByRequestId = TreeMultimap.create();

int numTotal = 0;
int numTransferred = 0;
Expand All @@ -56,16 +59,22 @@ public void runActionOnPoll() {

Optional<SingularityDeployHistory> deployHistory = deployManager.getDeployHistory(deployKey.getRequestId(), deployKey.getDeployId(), true);

if (!deployHistory.isPresent()) {
if (deployHistory.isPresent()) {
deployHistoryByRequestId.put(deployKey.getRequestId(), deployHistory.get());
} else {
LOG.info("Deploy history for key {} not found", deployKey);
continue;
}
}

if (moveToHistoryOrCheckForPurge(deployHistory.get())) {
numTransferred++;
}
for (Collection<SingularityDeployHistory> deployHistoryForRequest : deployHistoryByRequestId.asMap().values()) {
int i=0;
for (SingularityDeployHistory deployHistory : deployHistoryForRequest) {
if (moveToHistoryOrCheckForPurge(deployHistory, i++)) {
numTransferred++;
}

numTotal++;
numTotal++;
}
}

LOG.info("Transferred {} out of {} deploys in {}", numTransferred, numTotal, JavaUtils.duration(start));
Expand All @@ -76,6 +85,11 @@ protected long getMaxAgeInMillisOfItem() {
return TimeUnit.HOURS.toMillis(configuration.getDeleteDeploysFromZkWhenNoDatabaseAfterHours());
}

@Override
protected Optional<Integer> getMaxNumberOfItems() {
return configuration.getMaxStaleDeploysPerRequestInZkWhenNoDatabase();
}

private boolean shouldTransferDeploy(SingularityRequestDeployState deployState, SingularityDeployKey deployKey) {
if (deployState == null) {
LOG.warn("Missing request deploy state for deployKey {}", deployKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
import com.hubspot.singularity.SingularityHistoryItem;
Expand Down Expand Up @@ -34,19 +35,21 @@ protected boolean persistsHistoryInsteadOfPurging() {

@Override
protected boolean isEnabled() {
return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0;
return persistsHistoryInsteadOfPurging() || getMaxAgeInMillisOfItem() > 0 || getMaxNumberOfItems().isPresent();
}

protected abstract long getMaxAgeInMillisOfItem();

protected abstract Optional<Integer> getMaxNumberOfItems();

protected abstract boolean moveToHistory(T object);

protected abstract SingularityDeleteResult purgeFromZk(T object);

protected boolean moveToHistoryOrCheckForPurge(T object) {
protected boolean moveToHistoryOrCheckForPurge(T object, int index) {
final long start = System.currentTimeMillis();

if (moveToHistoryOrCheckForPurgeAndShouldDelete(object)) {
if (moveToHistoryOrCheckForPurgeAndShouldDelete(object, index)) {
SingularityDeleteResult deleteResult = purgeFromZk(object);
LOG.debug("{} {} (deleted: {}) in {}", persistsHistoryInsteadOfPurging() ? "Persisted" : "Purged", object, deleteResult, JavaUtils.duration(start));
return true;
Expand All @@ -55,7 +58,7 @@ protected boolean moveToHistoryOrCheckForPurge(T object) {
return false;
}

private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) {
private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object, int index) {
if (persistsHistoryInsteadOfPurging()) {
return moveToHistory(object);
}
Expand All @@ -67,6 +70,11 @@ private boolean moveToHistoryOrCheckForPurgeAndShouldDelete(T object) {
return true;
}

if (getMaxNumberOfItems().isPresent() && index >= getMaxNumberOfItems().get()) {
LOG.trace("Deleting {} because it is item number {} (max: {})", object, index, getMaxNumberOfItems().get());
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.hubspot.singularity.data.history;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.SingularityDeleteResult;
Expand All @@ -33,7 +38,7 @@ public SingularityRequestHistoryPersister(SingularityConfiguration configuration
this.historyManager = historyManager;
}

public static class SingularityRequestHistoryParent implements SingularityHistoryItem {
public static class SingularityRequestHistoryParent implements SingularityHistoryItem, Comparable<SingularityRequestHistoryParent> {

private final List<SingularityRequestHistory> history;
private final String requestId;
Expand All @@ -59,6 +64,38 @@ public long getCreateTimestampForCalculatingHistoryAge() {
return createTime;
}

@Override
public int compareTo(SingularityRequestHistoryParent o) {
return Longs.compare(this.getCreateTimestampForCalculatingHistoryAge(), o.getCreateTimestampForCalculatingHistoryAge());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SingularityRequestHistoryParent that = (SingularityRequestHistoryParent) o;
return createTime == that.createTime &&
Objects.equals(history, that.history) &&
Objects.equals(requestId, that.requestId);
}

@Override
public int hashCode() {
return Objects.hash(history, requestId, createTime);
}

@Override
public String toString() {
return "SingularityRequestHistoryParent[" +
"history=" + history +
", requestId='" + requestId + '\'' +
", createTime=" + createTime +
']';
}
}

@Override
Expand All @@ -67,30 +104,36 @@ public void runActionOnPoll() {

final long start = System.currentTimeMillis();

final List<String> requestIdsWithHistory = requestManager.getRequestIdsWithHistory();
final List<SingularityRequestHistoryParent> requestHistoryParents = new ArrayList();

int numHistoryTransferred = 0;
int numRequests = 0;

for (String requestId : requestIdsWithHistory) {
numRequests++;
for (String requestId : requestManager.getRequestIdsWithHistory()) {
requestHistoryParents.add(new SingularityRequestHistoryParent(requestManager.getRequestHistory(requestId), requestId));
}

List<SingularityRequestHistory> historyForRequestId = requestManager.getRequestHistory(requestId);
SingularityRequestHistoryParent requestHistoryParent = new SingularityRequestHistoryParent(historyForRequestId, requestId);
Collections.sort(requestHistoryParents, Collections.<SingularityRequestHistoryParent>reverseOrder()); // createdAt descending

if (moveToHistoryOrCheckForPurge(requestHistoryParent)) {
int i=0;
for (SingularityRequestHistoryParent requestHistoryParent : requestHistoryParents) {
if (moveToHistoryOrCheckForPurge(requestHistoryParent, i++)) {
numHistoryTransferred += requestHistoryParent.history.size();
}
}

LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, numRequests, JavaUtils.duration(start));
LOG.info("Transferred {} history updates for {} requests in {}", numHistoryTransferred, requestHistoryParents.size(), JavaUtils.duration(start));
}

@Override
protected long getMaxAgeInMillisOfItem() {
return TimeUnit.HOURS.toMillis(configuration.getDeleteStaleRequestsFromZkWhenNoDatabaseAfterHours());
}

@Override
protected Optional<Integer> getMaxNumberOfItems() {
return configuration.getMaxRequestsWithHistoryInZkWhenNoDatabase();
}

@Override
protected boolean moveToHistory(SingularityRequestHistoryParent object) {
for (SingularityRequestHistory requestHistory : object.history) {
Expand Down
Loading

0 comments on commit 678e95e

Please sign in to comment.