diff --git a/endpointmanager/pkg/endpointmanager/postgresql/historypruningstore.go b/endpointmanager/pkg/endpointmanager/postgresql/historypruningstore.go index 67fd8ef47..0525379ad 100644 --- a/endpointmanager/pkg/endpointmanager/postgresql/historypruningstore.go +++ b/endpointmanager/pkg/endpointmanager/postgresql/historypruningstore.go @@ -10,20 +10,116 @@ import ( var pruningStatementQueryInterval *sql.Stmt var pruningStatementNoQueryInterval *sql.Stmt +var pruningStatementCustomQueryInterval *sql.Stmt var pruningDeleteStatement *sql.Stmt var pruningDeleteValStatement *sql.Stmt var pruningDeleteValResStatement *sql.Stmt +var distinctURLStatementQueryInterval *sql.Stmt +var distinctURLStatementNoQueryInterval *sql.Stmt +var distinctURLStatementCustomQueryInterval *sql.Stmt +var addPruningMetadataStatementQueryInterval *sql.Stmt +var addPruningMetadataStatementNoQueryInterval *sql.Stmt +var addPruningMetadataStatementCustomQueryInterval *sql.Stmt +var updatePruningMetadataStatement *sql.Stmt +var pruningMetadataCountStatement *sql.Stmt +var lastPruneStatement *sql.Stmt + +// GetDistinctURLsFromHistory gets a list of ordered distinct URLs from the history table +func (s *Store) GetDistinctURLs(ctx context.Context, queryInterval bool, lastPruneSuccessful bool, lastPruneQueryIntStartDate string, lastPruneQueryIntEndDate string) (*sql.Rows, error) { + + var err error + var rows *sql.Rows + + if queryInterval { + if lastPruneQueryIntStartDate != "" && lastPruneQueryIntEndDate != "" { + if lastPruneSuccessful { + rows, err = distinctURLStatementCustomQueryInterval.QueryContext(ctx, lastPruneQueryIntEndDate) + } else { + rows, err = distinctURLStatementCustomQueryInterval.QueryContext(ctx, lastPruneQueryIntStartDate) + } + } else { + rows, err = distinctURLStatementQueryInterval.QueryContext(ctx) + } + } else { + rows, err = distinctURLStatementNoQueryInterval.QueryContext(ctx) + } + + return rows, err +} + +func (s *Store) GetPruningMetadataCount(ctx context.Context) (*sql.Rows, error) { + + var err error + var rows *sql.Rows + + rows, err = pruningMetadataCountStatement.QueryContext(ctx) + + return rows, err +} + +func (s *Store) GetLastPruneEntryDate(ctx context.Context) (*sql.Rows, error) { + + var err error + var rows *sql.Rows + + rows, err = lastPruneStatement.QueryContext(ctx) + + return rows, err +} + +func (s *Store) AddPruningMetadata(ctx context.Context, queryInterval bool, lastPruneSuccessful bool, lastPruneQueryIntStartDate string, lastPruneQueryIntEndDate string) (int, error) { + + var err error + var row *sql.Row + var id int + + if queryInterval { + if lastPruneQueryIntStartDate != "" && lastPruneQueryIntEndDate != "" { + if lastPruneSuccessful { + row = addPruningMetadataStatementCustomQueryInterval.QueryRowContext(ctx, lastPruneQueryIntEndDate) + } else { + row = addPruningMetadataStatementCustomQueryInterval.QueryRowContext(ctx, lastPruneQueryIntStartDate) + } + } else { + row = addPruningMetadataStatementQueryInterval.QueryRowContext(ctx) + } + } else { + row = addPruningMetadataStatementNoQueryInterval.QueryRowContext(ctx) + } + + err = row.Scan(&id) + + return id, err +} + +// GetInfoHistoryCountBeforeThreshold gets the count of rows in the info history table during the pruning query interval and threshold time frame +func (s *Store) UpdatePruningMetadata(ctx context.Context, pruningMetadataId int, successful bool, numRowsProcessed int, numRowsPruned int) error { + + var err error + + _, err = updatePruningMetadataStatement.ExecContext(ctx, pruningMetadataId, successful, numRowsProcessed, numRowsPruned) + + return err +} // PruningGetInfoHistory gets info history entries for pruning -func (s *Store) PruningGetInfoHistory(ctx context.Context, queryInterval bool) (*sql.Rows, error) { +func (s *Store) PruningGetInfoHistory(ctx context.Context, queryInterval bool, url string, lastPruneSuccessful bool, lastPruneQueryIntStartDate string, lastPruneQueryIntEndDate string) (*sql.Rows, error) { var rows *sql.Rows var err error if queryInterval { - rows, err = pruningStatementQueryInterval.QueryContext(ctx) + if lastPruneQueryIntStartDate != "" && lastPruneQueryIntEndDate != "" { + if lastPruneSuccessful { + rows, err = pruningStatementCustomQueryInterval.QueryContext(ctx, url, lastPruneQueryIntEndDate) + } else { + rows, err = pruningStatementCustomQueryInterval.QueryContext(ctx, url, lastPruneQueryIntStartDate) + } + } else { + rows, err = pruningStatementQueryInterval.QueryContext(ctx, url) + } } else { - rows, err = pruningStatementNoQueryInterval.QueryContext(ctx) + rows, err = pruningStatementNoQueryInterval.QueryContext(ctx, url) } return rows, err @@ -71,20 +167,116 @@ func prepareHistoryPruningStatements(s *Store) error { thresholdString := strconv.Itoa(pruningThreshold) queryIntString := strconv.Itoa(pruningThreshold * 2) + distinctURLStatementQueryInterval, err = s.DB.Prepare(` + select DISTINCT(url) FROM fhir_endpoints_info_history + WHERE (operation='U' OR operation='I') + AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + AND (date_trunc('minute', entered_at) >= date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute')) + ORDER BY url;`) + if err != nil { + return err + } + distinctURLStatementCustomQueryInterval, err = s.DB.Prepare(` + select DISTINCT(url) FROM fhir_endpoints_info_history + WHERE (operation='U' OR operation='I') + AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + AND (date_trunc('minute', entered_at) >= LEAST(date_trunc('minute', date($1)), + date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute'))) + ORDER BY url;`) + if err != nil { + return err + } + distinctURLStatementNoQueryInterval, err = s.DB.Prepare(` + select DISTINCT(url) FROM fhir_endpoints_info_history + WHERE (operation='U' OR operation='I') + AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + ORDER BY url;`) + if err != nil { + return err + } + pruningMetadataCountStatement, err = s.DB.Prepare(` + SELECT COUNT(*) FROM info_history_pruning_metadata;`) + if err != nil { + return err + } + lastPruneStatement, err = s.DB.Prepare(` + SELECT successful, query_int_start_date, query_int_end_date FROM info_history_pruning_metadata + ORDER BY started_on DESC LIMIT 1;`) + if err != nil { + return err + } + addPruningMetadataStatementQueryInterval, err = s.DB.Prepare(` + INSERT INTO info_history_pruning_metadata (query_int_start_date, query_int_end_date) + VALUES (date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute'), + date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + RETURNING id`) + if err != nil { + return err + } + addPruningMetadataStatementCustomQueryInterval, err = s.DB.Prepare(` + INSERT INTO info_history_pruning_metadata (query_int_start_date, query_int_end_date) + VALUES (LEAST(date_trunc('minute', date($1)), + date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute')), + date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + RETURNING id`) + if err != nil { + return err + } + addPruningMetadataStatementNoQueryInterval, err = s.DB.Prepare(` + INSERT INTO info_history_pruning_metadata (query_int_start_date, query_int_end_date) + SELECT date_trunc('minute', entered_at), + date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute') + FROM fhir_endpoints_info_history + ORDER BY entered_at ASC + LIMIT 1 + RETURNING id`) + if err != nil { + return err + } + updatePruningMetadataStatement, err = s.DB.Prepare(` + UPDATE info_history_pruning_metadata + SET successful = $2, + num_rows_processed = $3, + num_rows_pruned = $4, + ended_on = now() + WHERE id = $1`) + if err != nil { + return err + } pruningStatementQueryInterval, err = s.DB.Prepare(` SELECT operation, url, capability_statement, entered_at, tls_version, mime_types, smart_response, validation_result_id, requested_fhir_version FROM fhir_endpoints_info_history - WHERE (operation='U' OR operation='I') + WHERE (operation='U' OR operation='I') + AND url = $1 AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) AND (date_trunc('minute', entered_at) >= date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute')) - ORDER BY url, entered_at ASC;`) + ORDER BY entered_at ASC;`) + if err != nil { + return err + } + pruningStatementCustomQueryInterval, err = s.DB.Prepare(` + SELECT operation, url, capability_statement, entered_at, tls_version, mime_types, smart_response, validation_result_id, requested_fhir_version FROM fhir_endpoints_info_history + WHERE (operation='U' OR operation='I') + AND url = $1 + AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + AND (date_trunc('minute', entered_at) >= COALESCE((SELECT date_trunc('minute', entered_at) + FROM fhir_endpoints_info_history + WHERE (operation='U' OR operation='I') + AND (date_trunc('minute', entered_at) < LEAST(date_trunc('minute', date($2)), + date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute'))) + AND url = $1 + ORDER BY entered_at DESC LIMIT 1), + LEAST(date_trunc('minute', date($2)), + date_trunc('minute', current_date - INTERVAL '` + queryIntString + ` minute')))) + ORDER BY entered_at ASC;`) if err != nil { return err } pruningStatementNoQueryInterval, err = s.DB.Prepare(` SELECT operation, url, capability_statement, entered_at, tls_version, mime_types, smart_response, validation_result_id, requested_fhir_version FROM fhir_endpoints_info_history WHERE (operation='U' OR operation='I') - AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) - ORDER BY url, entered_at ASC;`) + AND (date_trunc('minute', entered_at) <= date_trunc('minute', current_date - INTERVAL '` + thresholdString + ` minute')) + AND url = $1 + ORDER BY entered_at ASC;`) if err != nil { return err } diff --git a/endpointmanager/pkg/historypruning/historypruning.go b/endpointmanager/pkg/historypruning/historypruning.go index a698eb46a..cb3ed4a13 100644 --- a/endpointmanager/pkg/historypruning/historypruning.go +++ b/endpointmanager/pkg/historypruning/historypruning.go @@ -10,77 +10,189 @@ import ( "github.com/onc-healthit/lantern-back-end/endpointmanager/pkg/endpointmanager/postgresql" "github.com/onc-healthit/lantern-back-end/endpointmanager/pkg/helpers" "github.com/onc-healthit/lantern-back-end/endpointmanager/pkg/smartparser" + log "github.com/sirupsen/logrus" ) // PruneInfoHistory checks info table and prunes any repetitive entries func PruneInfoHistory(ctx context.Context, store *postgresql.Store, queryInterval bool) { + + var distinctURLrows *sql.Rows + var pruningMetadataCountRows *sql.Rows + var lastPruneRows *sql.Rows + var pruningMetadataId int var rows *sql.Rows var err error - rows, err = store.PruningGetInfoHistory(ctx, queryInterval) + var lastPruneSuccessful bool + var lastPruneQueryIntStartDate string + var lastPruneQueryIntEndDate string + + // LANTERN-724: Check whether data is present in the info history pruning metadata table + pruningMetadataCountRows, err = store.GetPruningMetadataCount(ctx) helpers.FailOnError("", err) - if !rows.Next() { - return + if !pruningMetadataCountRows.Next() { + log.Fatal("Error fetching the count of info history pruning metadata table") } - _, fhirURL1, _, capStat1, tlsVersion1, mimeTypes1, smartResponse1, _, requestedFhirVersion1 := getRowInfo(rows) + pruningMetadataCount := getPruningMetadataCountRowInfo(pruningMetadataCountRows) + + if pruningMetadataCount > 0 { + // LANTERN-724: Fetch the last pruned row's entered_at date from the latest entry in the info history pruning metadata table + lastPruneRows, err = store.GetLastPruneEntryDate(ctx) + helpers.FailOnError("", err) + + if !lastPruneRows.Next() { + log.Fatal("Error fetching latest entry from the pruning metadata table") + } + + lastPruneSuccessful, lastPruneQueryIntStartDate, lastPruneQueryIntEndDate = getLastPruneRowInfo(lastPruneRows) + } - for rows.Next() { + // LANTERN-724: Insert an entry in the info history pruning metadata table + pruningMetadataId, err = store.AddPruningMetadata(ctx, queryInterval, lastPruneSuccessful, lastPruneQueryIntStartDate, lastPruneQueryIntEndDate) + helpers.FailOnError("", err) - operation2, fhirURL2, entryDate2, capStat2, tlsVersion2, mimeTypes2, smartResponse2, valResID2, requestedFhirVersion2 := getRowInfo(rows) + // LANTERN-724: Initialize counters to determine the number of rows processed and number of rows pruned during history pruning + numRowsProcessed := 0 + numRowsPruned := 0 - equalFhirEntries := fhirURL1 == fhirURL2 + // LANTERN-724: Get distinct URLs from the info history table + distinctURLrows, err = store.GetDistinctURLs(ctx, queryInterval, lastPruneSuccessful, lastPruneQueryIntStartDate, lastPruneQueryIntEndDate) - if equalFhirEntries { - equalFhirEntries = (requestedFhirVersion1 == requestedFhirVersion2) + // LANTERN-724: Update the pruning metadata entry anytime an error is thrown + if err != nil { + Update(ctx, store, queryInterval, pruningMetadataId, false, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) + } + + for distinctURLrows.Next() { + + url := getDistinctRowInfo(distinctURLrows) + + rows, err = store.PruningGetInfoHistory(ctx, queryInterval, url, lastPruneSuccessful, lastPruneQueryIntStartDate, lastPruneQueryIntEndDate) + + if err != nil { + Update(ctx, store, queryInterval, pruningMetadataId, false, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) + } + + if !rows.Next() { + return + } + + _, fhirURL1, _, capStat1, tlsVersion1, mimeTypes1, smartResponse1, _, requestedFhirVersion1 := getRowInfo(rows) + numRowsProcessed++ + + for rows.Next() { + + operation2, fhirURL2, entryDate2, capStat2, tlsVersion2, mimeTypes2, smartResponse2, valResID2, requestedFhirVersion2 := getRowInfo(rows) + + equalFhirEntries := fhirURL1 == fhirURL2 if equalFhirEntries { - equalFhirEntries = (tlsVersion1 == tlsVersion2) + equalFhirEntries = (requestedFhirVersion1 == requestedFhirVersion2) if equalFhirEntries { - equalFhirEntries = helpers.StringArraysEqual(mimeTypes1, mimeTypes2) + equalFhirEntries = (tlsVersion1 == tlsVersion2) if equalFhirEntries { - // If capstat is not null check if current entry that was passed in has capstat equal to capstat of old entry being checked from history table, otherwise check they are both null - if capStat1 != nil { - equalFhirEntries = capStat1.EqualIgnore(capStat2) - } else { - equalFhirEntries = (capStat2 == nil) - } + equalFhirEntries = helpers.StringArraysEqual(mimeTypes1, mimeTypes2) if equalFhirEntries { - // If smartresponse is not null check if current entry that was passed in has smartresponse equal to smartresponse of old entry being checked from history table, otherwise check they are both null - if smartResponse1 != nil { - ignoredFields := []string{} - equalFhirEntries = smartResponse1.EqualIgnore(smartResponse2, ignoredFields) + // If capstat is not null check if current entry that was passed in has capstat equal to capstat of old entry being checked from history table, otherwise check they are both null + if capStat1 != nil { + equalFhirEntries = capStat1.EqualIgnore(capStat2) } else { - equalFhirEntries = (smartResponse2 == nil) + equalFhirEntries = (capStat2 == nil) + } + + if equalFhirEntries { + // If smartresponse is not null check if current entry that was passed in has smartresponse equal to smartresponse of old entry being checked from history table, otherwise check they are both null + if smartResponse1 != nil { + ignoredFields := []string{} + equalFhirEntries = smartResponse1.EqualIgnore(smartResponse2, ignoredFields) + } else { + equalFhirEntries = (smartResponse2 == nil) + } } } } } } - } - if equalFhirEntries && operation2 == "U" { - err := store.PruningDeleteInfoHistory(ctx, fhirURL1, entryDate2, requestedFhirVersion1) - helpers.FailOnError("", err) - // Delete the validation table entries for the history table row - err = store.PruningDeleteValidationTable(ctx, valResID2) - helpers.FailOnError("", err) - err = store.PruningDeleteValidationResultEntry(ctx, valResID2) - helpers.FailOnError("", err) - } else { - fhirURL1 = fhirURL2 - capStat1 = capStat2 - tlsVersion1 = tlsVersion2 - mimeTypes1 = mimeTypes2 - smartResponse1 = smartResponse2 - requestedFhirVersion1 = requestedFhirVersion2 - continue + if equalFhirEntries && operation2 == "U" { + err := store.PruningDeleteInfoHistory(ctx, fhirURL1, entryDate2, requestedFhirVersion1) + + if err != nil { + Update(ctx, store, queryInterval, pruningMetadataId, false, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) + } + + // Delete the validation table entries for the history table row + err = store.PruningDeleteValidationTable(ctx, valResID2) + + if err != nil { + Update(ctx, store, queryInterval, pruningMetadataId, false, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) + } + + err = store.PruningDeleteValidationResultEntry(ctx, valResID2) + + if err != nil { + Update(ctx, store, queryInterval, pruningMetadataId, false, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) + } + + numRowsPruned++ + } else { + fhirURL1 = fhirURL2 + capStat1 = capStat2 + tlsVersion1 = tlsVersion2 + mimeTypes1 = mimeTypes2 + smartResponse1 = smartResponse2 + requestedFhirVersion1 = requestedFhirVersion2 + continue + } + numRowsProcessed++ } } + + Update(ctx, store, queryInterval, pruningMetadataId, true, numRowsProcessed, numRowsPruned) +} + +func Update(ctx context.Context, store *postgresql.Store, queryInterval bool, pruningMetadataId int, successful bool, numRowsProcessed int, numRowsPruned int) { + err := store.UpdatePruningMetadata(ctx, pruningMetadataId, successful, numRowsProcessed, numRowsPruned) + helpers.FailOnError("", err) +} + +func getDistinctRowInfo(rows *sql.Rows) string { + var url string + + err := rows.Scan(&url) + helpers.FailOnError("", err) + + return url +} + +func getPruningMetadataCountRowInfo(rows *sql.Rows) int { + var count int + + err := rows.Scan(&count) + helpers.FailOnError("", err) + + return count +} + +func getLastPruneRowInfo(rows *sql.Rows) (bool, string, string) { + var successful bool + var lastPruneEntryDate string + var queryIntEndDate string + + err := rows.Scan(&successful, &lastPruneEntryDate, &queryIntEndDate) + helpers.FailOnError("", err) + + return successful, lastPruneEntryDate, queryIntEndDate } func getRowInfo(rows *sql.Rows) (string, string, string, capabilityparser.CapabilityStatement, string, []string, smartparser.SMARTResponse, int, string) {