diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index 6d80818d5..cbe6e3821 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -82,15 +82,19 @@ public PostgresIndexDAO( @Override public void indexWorkflow(WorkflowSummary workflow) { String INSERT_WORKFLOW_INDEX_SQL = - "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)" - + "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + "INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)" + + "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n" + "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, " - + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data"; + + "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data " + + "WHERE EXCLUDED.update_time > workflow_index.update_time"; if (onlyIndexOnStatusChange) { - INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status"; + INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status"; } + TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime()); + Timestamp updateTime = Timestamp.from(Instant.from(updateTa)); + TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime()); Timestamp startTime = Timestamp.from(Instant.from(ta)); @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) { .addParameter(workflow.getCorrelationId()) .addParameter(workflow.getWorkflowType()) .addParameter(startTime) + .addParameter(updateTime) .addParameter(workflow.getStatus().toString()) .addJsonParameter(workflow) .executeUpdate()); @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) { "INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)" + "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) " + "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, " - + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data"; + + "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data " + + "WHERE EXCLUDED.update_time > task_index.update_time"; if (onlyIndexOnStatusChange) { - INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status"; + INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status"; } TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime()); diff --git a/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql b/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql new file mode 100644 index 000000000..c57c03f4e --- /dev/null +++ b/postgres-persistence/src/main/resources/db/migration_postgres/V13__workflow_index_columns.sql @@ -0,0 +1,8 @@ +ALTER TABLE workflow_index +ADD update_time TIMESTAMP WITH TIME ZONE NULL; + +UPDATE workflow_index +SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone; + +ALTER TABLE workflow_index +ALTER COLUMN update_time SET NOT NULL; \ No newline at end of file diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java index 80811e80e..e3c819362 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOStatusChangeOnlyTest.java @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) { wfs.setCorrelationId("correlation-id"); wfs.setWorkflowType("workflow-type"); wfs.setStartTime("2023-02-07T08:42:45Z"); + wfs.setUpdateTime("2023-02-07T08:43:45Z"); wfs.setStatus(Workflow.WorkflowStatus.RUNNING); return wfs; } @@ -142,6 +143,7 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException { // Change the record, but not the status, and re-index wfs.setCorrelationId("new-correlation-id"); + wfs.setUpdateTime("2023-02-07T08:44:45Z"); indexDAO.indexWorkflow(wfs); // retrieve the record, make sure it hasn't changed @@ -149,6 +151,7 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException { // Change the status and re-index wfs.setStatus(Workflow.WorkflowStatus.FAILED); + wfs.setUpdateTime("2023-02-07T08:45:45Z"); indexDAO.indexWorkflow(wfs); // retrieve the record, make sure it has changed @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException { // Change the status and re-index ts.setStatus(Task.Status.FAILED); + ts.setUpdateTime("2023-02-07T10:43:45Z"); indexDAO.indexTask(ts); // retrieve the record, make sure it has changed - checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0"); + checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0"); } } diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java index 3d7c80d99..5db5ba046 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/postgres/dao/PostgresIndexDAOTest.java @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) { wfs.setCorrelationId("correlation-id"); wfs.setWorkflowType("workflow-type"); wfs.setStartTime("2023-02-07T08:42:45Z"); + wfs.setUpdateTime("2023-02-07T08:43:45Z"); wfs.setStatus(Workflow.WorkflowStatus.COMPLETED); return wfs; } @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException { @Test public void testIndexNewWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new"); indexDAO.indexWorkflow(wfs); @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException { @Test public void testIndexExistingWorkflow() throws SQLException { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + + wfs.setStatus(Workflow.WorkflowStatus.FAILED); + wfs.setUpdateTime("2023-02-07T08:44:45Z"); + + indexDAO.indexWorkflow(wfs); + + compareWorkflowSummary(wfs); + } + + @Test + public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed() + throws SQLException { + + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index"); indexDAO.indexWorkflow(wfs); compareWorkflowSummary(wfs); + // Set the update time to the past + wfs.setUpdateTime("2023-02-07T08:42:45Z"); wfs.setStatus(Workflow.WorkflowStatus.FAILED); indexDAO.indexWorkflow(wfs); + // Reset the workflow to check it's not been updated + wfs = getMockWorkflowSummary("workflow-id-existing-no-index"); compareWorkflowSummary(wfs); } @Test public void testIndexNewTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); + TaskSummary ts = getMockTaskSummary("task-id-new"); indexDAO.indexTask(ts); @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException { @Test public void testIndexExistingTask() throws SQLException { - TaskSummary ts = getMockTaskSummary("task-id"); + TaskSummary ts = getMockTaskSummary("task-id-existing"); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + + ts.setUpdateTime("2023-02-07T09:43:45Z"); + ts.setStatus(Task.Status.FAILED); + + indexDAO.indexTask(ts); + + compareTaskSummary(ts); + } + + @Test + public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException { + TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update"); indexDAO.indexTask(ts); compareTaskSummary(ts); + // Set the update time to the past + ts.setUpdateTime("2023-02-07T09:41:45Z"); ts.setStatus(Task.Status.FAILED); indexDAO.indexTask(ts); + // Reset the task to check it's not been updated + ts = getMockTaskSummary("task-id-exiting-no-update"); compareTaskSummary(ts); } @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() { @Test public void testJsonSearchWorkflowSummary() { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id"); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary"); wfs.setVersion(3); indexDAO.indexWorkflow(wfs); @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() { @Test public void testSearchWorkflowSummaryPagination() { for (int i = 0; i < 5; i++) { - WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i); + WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i); indexDAO.indexWorkflow(wfs); } List orderBy = Arrays.asList(new String[] {"workflowId:DESC"}); SearchResult results = - indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy); + indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy); assertEquals("Wrong totalHits returned", 3, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-4", + "workflow-id-pagination-4", results.getResults().get(0).getWorkflowId()); assertEquals( "Results returned in wrong order", - "workflow-id-3", + "workflow-id-pagination-3", results.getResults().get(1).getWorkflowId()); results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy); assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-2", + "workflow-id-pagination-2", results.getResults().get(0).getWorkflowId()); assertEquals( "Results returned in wrong order", - "workflow-id-1", + "workflow-id-pagination-1", results.getResults().get(1).getWorkflowId()); results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy); assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "workflow-id-0", + "workflow-id-pagination-0", results.getResults().get(0).getWorkflowId()); } @@ -351,7 +394,7 @@ public void testSearchTaskSummary() { @Test public void testSearchTaskSummaryPagination() { for (int i = 0; i < 5; i++) { - TaskSummary ts = getMockTaskSummary("task-id-" + i); + TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i); indexDAO.indexTask(ts); } @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() { assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-4", + "task-id-pagination-4", results.getResults().get(0).getTaskId()); assertEquals( "Results returned in wrong order", - "task-id-3", + "task-id-pagination-3", results.getResults().get(1).getTaskId()); results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy); assertEquals("Wrong totalHits returned", 5, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-2", + "task-id-pagination-2", results.getResults().get(0).getTaskId()); assertEquals( "Results returned in wrong order", - "task-id-1", + "task-id-pagination-1", results.getResults().get(1).getTaskId()); results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy); assertEquals("Wrong totalHits returned", 7, results.getTotalHits()); assertEquals("Wrong number of results returned", 2, results.getResults().size()); assertEquals( "Results returned in wrong order", - "task-id-0", + "task-id-pagination-0", results.getResults().get(0).getTaskId()); }