Skip to content

Commit

Permalink
Bugifx set timeout in mongo job repository
Browse files Browse the repository at this point in the history
  • Loading branch information
peterfouquet0001 committed Jan 10, 2019
1 parent 2e31075 commit 2d58dd7
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,39 @@ public JobStatus findStatus(final String jobId) {
return JobStatus.valueOf(collection()
.find(eq(ID, jobId))
.projection(new Document(JobStructure.STATUS.key(), true))
.maxTime(50, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.first().getString(JobStructure.STATUS.key()));
}

@Override
public void removeIfStopped(final String id) {
findOne(id).ifPresent(jobInfo -> {
if (jobInfo.isStopped()) {
collectionWithWriteTimeout(50, TimeUnit.MILLISECONDS).deleteOne(eq(ID, id));
collectionWithWriteTimeout(mongoProperties.getDefaultWriteTimeout(), TimeUnit.MILLISECONDS).deleteOne(eq(ID, id));
}
});
}

@Override
public void appendMessage(final String jobId, final JobMessage jobMessage) {
collectionWithWriteTimeout(250, TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), combine(push(JobStructure.MESSAGES.key(), encodeJobMessage(jobMessage)), set(JobStructure.LAST_UPDATED.key(), Date.from(jobMessage.getTimestamp().toInstant()))));
collectionWithWriteTimeout(mongoProperties.getDefaultWriteTimeout(), TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), combine(push(JobStructure.MESSAGES.key(), encodeJobMessage(jobMessage)), set(JobStructure.LAST_UPDATED.key(), Date.from(jobMessage.getTimestamp().toInstant()))));
}

@Override
public void setJobStatus(final String jobId, final JobStatus jobStatus) {
collectionWithWriteTimeout(250, TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), set(JobStructure.STATUS.key(), jobStatus.name()));
collectionWithWriteTimeout(mongoProperties.getDefaultWriteTimeout(), TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), set(JobStructure.STATUS.key(), jobStatus.name()));
}

@Override
public void setLastUpdate(final String jobId, final OffsetDateTime lastUpdate) {
collectionWithWriteTimeout(250, TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), set(JobStructure.LAST_UPDATED.key(), Date.from(lastUpdate.toInstant())));
collectionWithWriteTimeout(mongoProperties.getDefaultWriteTimeout(), TimeUnit.MILLISECONDS).updateOne(eq(ID, jobId), set(JobStructure.LAST_UPDATED.key(), Date.from(lastUpdate.toInstant())));
}

@Override
public List<JobInfo> findLatest(final int maxCount) {
return collection()
.find()
.maxTime(500, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.sort(orderByStarted(DESCENDING))
.limit(maxCount)
.map(this::decode)
Expand All @@ -100,7 +100,7 @@ public List<JobInfo> findLatestJobsDistinct() {
final List<String> allJobIds = findAllJobIdsDistinct();
return collection()
.find(in(ID, allJobIds))
.maxTime(500, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.map(this::decode)
.into(new ArrayList<>());
}
Expand All @@ -113,7 +113,7 @@ public List<String> findAllJobIdsDistinct() {
put("_id", "$type");
put("latestJobId", new Document("$first", "$_id"));
}})))
.maxTime(500, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.map(doc -> doc.getString("latestJobId"))
.into(new ArrayList<>()).stream()
.filter(Objects::nonNull)
Expand All @@ -124,7 +124,7 @@ public List<String> findAllJobIdsDistinct() {
public List<JobInfo> findLatestBy(final String type, final int maxCount) {
return collection()
.find(byType(type))
.maxTime(250, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.sort(orderByStarted(DESCENDING))
.limit(maxCount)
.map(this::decode)
Expand All @@ -135,7 +135,7 @@ public List<JobInfo> findLatestBy(final String type, final int maxCount) {
public List<JobInfo> findByType(final String type) {
return collection()
.find(byType(type))
.maxTime(250, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.sort(orderByStarted(DESCENDING))
.map(this::decode)
.into(new ArrayList<>());
Expand All @@ -147,7 +147,7 @@ public List<JobInfo> findRunningWithoutUpdateSince(final OffsetDateTime timeOffs
.find(new Document()
.append(JobStructure.STOPPED.key(), singletonMap("$exists", false))
.append(JobStructure.LAST_UPDATED.key(), singletonMap("$lt", from(timeOffset.toInstant()))))
.maxTime(500, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.map(this::decode)
.into(new ArrayList<>());
}
Expand Down Expand Up @@ -246,7 +246,7 @@ private Document orderByStarted(final int order) {
public List<JobInfo> findAllJobInfoWithoutMessages() {
return collection()
.find()
.maxTime(500, TimeUnit.MILLISECONDS)
.maxTime(mongoProperties.getDefaultReadTimeout(), TimeUnit.MILLISECONDS)
.projection(new Document(getJobInfoWithoutMessagesProjection()))
.map(this::decode)
.into(new ArrayList<>());
Expand Down

0 comments on commit 2d58dd7

Please sign in to comment.