diff --git a/services/harmony/app/backends/workflow-orchestration/work-item-updates.ts b/services/harmony/app/backends/workflow-orchestration/work-item-updates.ts index d3c2ed194..dec76922a 100644 --- a/services/harmony/app/backends/workflow-orchestration/work-item-updates.ts +++ b/services/harmony/app/backends/workflow-orchestration/work-item-updates.ts @@ -655,10 +655,7 @@ export async function processWorkItem( let jobSaveStartTime; let didCreateWorkItem = false; - // TODO HARMONY-1995 add status === WorkItemStatus.WARNING here - if (status === WorkItemStatus.SUCCESSFUL) { - logger.info(`Updating work item ${workItemID} to ${status}`); - } + logger.info(`Received update for work item ${workItemID} to ${status}`); try { // lock the work item so we can update it - need to do this after locking jobs table above @@ -911,18 +908,22 @@ export async function processWorkItems( 'HWIUWJI.Job.byJobID', logger))(tx, jobID, false, false, true); - const thisStep: WorkflowStep = await (await logAsyncExecutionTime( - getWorkflowStepByJobIdStepIndex, - 'HWIUWJI.getWorkflowStepByJobIdStepIndex', - logger))(tx, jobID, workflowStepIndex); - - const lastIndex = items.length - 1; - for (let index = 0; index < items.length; index++) { - const { preprocessResult, update } = items[index]; - if (index < lastIndex) { - await processWorkItem(tx, preprocessResult, job, update, logger, false, thisStep); - } else { - await processWorkItem(tx, preprocessResult, job, update, logger, true, thisStep); + if (job.hasTerminalStatus()) { + logger.warn(`Ignoring work item updates for job ${jobID} in terminal state ${job.status}.`); + } else { + const thisStep: WorkflowStep = await (await logAsyncExecutionTime( + getWorkflowStepByJobIdStepIndex, + 'HWIUWJI.getWorkflowStepByJobIdStepIndex', + logger))(tx, jobID, workflowStepIndex); + + const lastIndex = items.length - 1; + for (let index = 0; index < items.length; index++) { + const { preprocessResult, update } = items[index]; + if (index < lastIndex) { + await processWorkItem(tx, preprocessResult, job, update, logger, false, thisStep); + } else { + await processWorkItem(tx, preprocessResult, job, update, logger, true, thisStep); + } } } }); diff --git a/services/harmony/app/models/job.ts b/services/harmony/app/models/job.ts index e6c4f8e0c..0ad4f90d9 100644 --- a/services/harmony/app/models/job.ts +++ b/services/harmony/app/models/job.ts @@ -10,7 +10,7 @@ import { createPublicPermalink } from '../frontends/service-results'; import { CmrPermission, CmrPermissionsMap, CmrTagKeys, getCollectionsByIds, getPermissions, } from '../util/cmr'; -import { Transaction } from '../util/db'; +import db, { Transaction } from '../util/db'; import env from '../util/env'; import { ConflictError } from '../util/errors'; import { removeEmptyProperties } from '../util/object'; @@ -337,6 +337,21 @@ async function getUniqueProviderIds(tx: Transaction): Promise { return results.map((job) => job.provider_id); } +/** + * Get the job status for the given job ID + * + * @param jobID - the job ID + * @returns the job status for the given job ID + */ +export async function getJobStatusForJobID(jobID: string): Promise { + return ( + await db('jobs') + .select('status') + .where({ jobID }) + .first() + )?.status; +} + /** * Sets the fields on the where clauses (see JobQuery) to be prefixed with a table name to avoid * ambiguities when joining with other tables diff --git a/services/harmony/test/geojson-normalizer.ts b/services/harmony/test/geojson-normalizer.ts index 5dc938167..ae0b24b76 100644 --- a/services/harmony/test/geojson-normalizer.ts +++ b/services/harmony/test/geojson-normalizer.ts @@ -514,7 +514,6 @@ describe('convertPointsToPolygons', () => { const expectedOutputJson = fs.readFileSync(path.resolve(__dirname, `resources/${expectedOutputFile}.geojson`), 'utf8'); const normalizedGeoJson = convertPointsToPolygons(testGeoJson); - console.log(`${JSON.stringify(normalizedGeoJson, null, 2)}`); it('should convert Points to Polygons', function () { expect(normalizedGeoJson.features[0].geometry.type).to.equal('Polygon'); expect(JSON.stringify(normalizedGeoJson, null, 2)).to.eql(expectedOutputJson); @@ -530,7 +529,6 @@ describe('convertPointsToPolygons', () => { const expectedOutputJson = fs.readFileSync(path.resolve(__dirname, `resources/${expectedOutputFile}.geojson`), 'utf8'); const normalizedGeoJson = convertPointsToPolygons(testGeoJson); - console.log(`${JSON.stringify(normalizedGeoJson, null, 2)}`); it('should convert MultiPoints to MultiPolygons', function () { expect(normalizedGeoJson.features[0].geometry.type).to.equal('MultiPolygon'); expect(JSON.stringify(normalizedGeoJson, null, 2)).to.eql(expectedOutputJson); diff --git a/services/work-updater/app/workers/updater.ts b/services/work-updater/app/workers/updater.ts index fa44ec8b4..0dfd16d69 100644 --- a/services/work-updater/app/workers/updater.ts +++ b/services/work-updater/app/workers/updater.ts @@ -1,19 +1,19 @@ import { Logger } from 'winston'; + import { - WorkItemUpdateQueueItem, - handleWorkItemUpdate, - preprocessWorkItem, - processWorkItems } from '../../../harmony/app/backends/workflow-orchestration/work-item-updates'; + handleWorkItemUpdate, preprocessWorkItem, processWorkItems, WorkItemUpdateQueueItem, +} from '../../../harmony/app/backends/workflow-orchestration/work-item-updates'; +import { getJobStatusForJobID, terminalStates } from '../../../harmony/app/models/job'; import { getJobIdForWorkItem } from '../../../harmony/app/models/work-item'; +import { getWorkflowStepByJobIdStepIndex } from '../../../harmony/app/models/workflow-steps'; +import db from '../../../harmony/app/util/db'; import { default as defaultLogger } from '../../../harmony/app/util/log'; +import { logAsyncExecutionTime } from '../../../harmony/app/util/log-execution'; import { WorkItemQueueType } from '../../../harmony/app/util/queue/queue'; import { getQueueForType } from '../../../harmony/app/util/queue/queue-factory'; import sleep from '../../../harmony/app/util/sleep'; import { Worker } from '../../../harmony/app/workers/worker'; import env from '../util/env'; -import { logAsyncExecutionTime } from '../../../harmony/app/util/log-execution'; -import { getWorkflowStepByJobIdStepIndex } from '../../../harmony/app/models/workflow-steps'; -import db from '../../../harmony/app/util/db'; /** * Group work item updates by its workflow step and return the grouped work item updates @@ -105,11 +105,16 @@ export async function handleBatchWorkItemUpdates( }, {}); // process each job's updates for (const jobID in jobUpdates) { - const startTime = Date.now(); - logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID}`); - await handleBatchWorkItemUpdatesWithJobId(jobID, jobUpdates[jobID], logger); - const endTime = Date.now(); - logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID} took ${endTime - startTime} ms`); + const jobStatus = await getJobStatusForJobID(jobID); + if (terminalStates.includes(jobStatus)) { + logger.warn(`Ignoring work item updates for job ${jobID} in terminal state ${jobStatus}.`); + } else { + const startTime = Date.now(); + logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID}`); + await handleBatchWorkItemUpdatesWithJobId(jobID, jobUpdates[jobID], logger); + const endTime = Date.now(); + logger.debug(`Processing ${jobUpdates[jobID].length} work item updates for job ${jobID} took ${endTime - startTime} ms`); + } } }