From 319d4bb03f3d69821e138a455f03d0d3c4956701 Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Fri, 21 Jul 2023 11:54:15 -0700 Subject: [PATCH] Add workflow, refactor techniques, added s/n methods Signed-off-by: Martin Gaievski --- .../processor/NormalizationProcessor.java | 64 ++----- .../NormalizationProcessorWorkflow.java | 107 ++++++++++++ .../processor/ScoreCombinationTechnique.java | 43 ----- .../neuralsearch/processor/ScoreCombiner.java | 95 ----------- .../processor/ScoreNormalizationRequest.java | 20 --- .../ScoreNormalizationTechnique.java | 43 ----- .../processor/ScoreNormalizer.java | 77 --------- .../ArithmeticMeanScoreCombinationMethod.java | 46 +++++ .../combination/ScoreCombinationMethod.java | 15 ++ .../ScoreCombinationTechnique.java | 27 +++ .../processor/combination/ScoreCombiner.java | 161 ++++++++++++++++++ .../NormalizationProcessorFactory.java | 8 +- .../MinMaxScoreNormalizationMethod.java | 123 +++++++++++++ .../ScoreNormalizationMethod.java | 22 +++ .../ScoreNormalizationTechnique.java | 32 ++++ .../normalization/ScoreNormalizer.java | 29 ++++ .../common/BaseNeuralSearchIT.java | 4 +- .../NormalizationProcessorTests.java | 53 ++---- .../NormalizationProcessorWorkflowTests.java | 63 +++++++ ....java => ScoreCombinationMethodTests.java} | 8 +- ...ava => ScoreNormalizationMethodTests.java} | 24 +-- .../NormalizationProcessorFactoryTests.java | 4 +- 22 files changed, 675 insertions(+), 393 deletions(-) create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java delete mode 100644 src/main/java/org/opensearch/neuralsearch/processor/ScoreCombinationTechnique.java delete mode 100644 src/main/java/org/opensearch/neuralsearch/processor/ScoreCombiner.java delete mode 100644 src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationRequest.java delete mode 100644 src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationTechnique.java delete mode 100644 src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizer.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/combination/ArithmeticMeanScoreCombinationMethod.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationMethod.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationTechnique.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombiner.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/normalization/MinMaxScoreNormalizationMethod.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationMethod.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationTechnique.java create mode 100644 src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizer.java create mode 100644 src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflowTests.java rename src/test/java/org/opensearch/neuralsearch/processor/{ScoreCombinerTests.java => ScoreCombinationMethodTests.java} (92%) rename src/test/java/org/opensearch/neuralsearch/processor/{ScoreNormalizerTests.java => ScoreNormalizationMethodTests.java} (91%) diff --git a/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java index 7315d1a0b..16c2d9fab 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java @@ -5,11 +5,10 @@ package org.opensearch.neuralsearch.processor; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.AllArgsConstructor; @@ -20,7 +19,8 @@ import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchPhaseResults; -import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; import org.opensearch.neuralsearch.search.CompoundTopDocs; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.internal.SearchContext; @@ -48,6 +48,7 @@ public class NormalizationProcessor implements SearchPhaseResultsProcessor { final ScoreNormalizationTechnique normalizationTechnique; @Getter(AccessLevel.PACKAGE) final ScoreCombinationTechnique combinationTechnique; + final NormalizationProcessorWorkflow normalizationWorkflow; /** * Method abstracts functional aspect of score normalization and score combination. Exact methods for each processing stage @@ -64,17 +65,8 @@ public void process( if (shouldSearchResultsBeIgnored(searchPhaseResult)) { return; } - - TopDocsAndMaxScore[] topDocsAndMaxScores = getCompoundQueryTopDocsFromSearchPhaseResult(searchPhaseResult); - List queryTopDocs = Arrays.stream(topDocsAndMaxScores) - .map(td -> td != null ? (CompoundTopDocs) td.topDocs : null) - .collect(Collectors.toList()); - - ScoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique); - - List combinedMaxScores = ScoreCombiner.combineScores(queryTopDocs, combinationTechnique); - - updateOriginalQueryResults(searchPhaseResult, queryTopDocs, topDocsAndMaxScores, combinedMaxScores); + List querySearchResults = getQuerySearchResults(searchPhaseResult); + normalizationWorkflow.execute(querySearchResults, normalizationTechnique, combinationTechnique); } @Override @@ -128,46 +120,16 @@ private boolean isNotHybridQuery(final Optional maybeResult) || !(maybeResult.get().queryResult().topDocs().topDocs instanceof CompoundTopDocs); } - private TopDocsAndMaxScore[] getCompoundQueryTopDocsFromSearchPhaseResult( - final SearchPhaseResults results - ) { - List preShardResultList = results.getAtomicArray().asList(); - TopDocsAndMaxScore[] result = new TopDocsAndMaxScore[preShardResultList.size()]; - for (int idx = 0; idx < preShardResultList.size(); idx++) { - Result shardResult = preShardResultList.get(idx); + private List getQuerySearchResults(final SearchPhaseResults results) { + List resultsPerShard = results.getAtomicArray().asList(); + List querySearchResults = new ArrayList<>(); + for (Result shardResult : resultsPerShard) { if (shardResult == null) { + querySearchResults.add(null); continue; } - QuerySearchResult querySearchResult = shardResult.queryResult(); - TopDocsAndMaxScore topDocsAndMaxScore = querySearchResult.topDocs(); - if (!(topDocsAndMaxScore.topDocs instanceof CompoundTopDocs)) { - continue; - } - result[idx] = topDocsAndMaxScore; - } - return result; - } - - @VisibleForTesting - protected void updateOriginalQueryResults( - final SearchPhaseResults results, - final List queryTopDocs, - TopDocsAndMaxScore[] topDocsAndMaxScores, - List combinedMaxScores - ) { - List preShardResultList = results.getAtomicArray().asList(); - for (int i = 0; i < preShardResultList.size(); i++) { - CompoundTopDocs updatedTopDocs = queryTopDocs.get(i); - if (Objects.isNull(updatedTopDocs)) { - continue; - } - float maxScore = updatedTopDocs.totalHits.value > 0 ? updatedTopDocs.scoreDocs[0].score : 0.0f; - TopDocsAndMaxScore topDocsAndMaxScore = new TopDocsAndMaxScore(updatedTopDocs, maxScore); - QuerySearchResult querySearchResult = preShardResultList.get(i).queryResult(); - querySearchResult.topDocs(topDocsAndMaxScore, null); - if (topDocsAndMaxScores[i] != null) { - topDocsAndMaxScores[i].maxScore = combinedMaxScores.get(i); - } + querySearchResults.add(shardResult.queryResult()); } + return querySearchResults; } } diff --git a/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java b/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java new file mode 100644 index 000000000..13838d30b --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.combination.ScoreCombiner; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer; +import org.opensearch.neuralsearch.search.CompoundTopDocs; +import org.opensearch.search.query.QuerySearchResult; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Class abstracts steps required for score normalization and combination, this includes pre-processing of income data + * and post-processing for final results + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class NormalizationProcessorWorkflow { + + /** + * Return instance of workflow class. Making default constructor private for now + * as we may use singleton pattern here and share one instance among processors + * @return instance of NormalizationProcessorWorkflow + */ + public static NormalizationProcessorWorkflow create() { + return new NormalizationProcessorWorkflow(); + } + + /** + * Start execution of this workflow + * @param querySearchResults input data with QuerySearchResult from multiple shards + * @param normalizationTechnique technique for score normalization + * @param combinationTechnique technique for score combination + */ + public void execute( + final List querySearchResults, + final ScoreNormalizationTechnique normalizationTechnique, + final ScoreCombinationTechnique combinationTechnique + ) { + // pre-process data + List queryTopDocs = getQueryTopDocs(querySearchResults); + + // normalize + ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); + scoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique); + + // combine + ScoreCombiner scoreCombiner = new ScoreCombiner(); + List combinedMaxScores = scoreCombiner.combineScores(queryTopDocs, combinationTechnique); + + // post-process data + updateOriginalQueryResults(querySearchResults, queryTopDocs, combinedMaxScores); + } + + private List getQueryTopDocs(List querySearchResults) { + List queryTopDocs = querySearchResults.stream() + .filter(searchResult -> searchResult.topDocs().topDocs instanceof CompoundTopDocs) + .map(searchResult -> (CompoundTopDocs) searchResult.topDocs().topDocs) + .collect(Collectors.toList()); + return queryTopDocs; + } + + @VisibleForTesting + protected void updateOriginalQueryResults( + List querySearchResults, + final List queryTopDocs, + List combinedMaxScores + ) { + TopDocsAndMaxScore[] topDocsAndMaxScores = new TopDocsAndMaxScore[querySearchResults.size()]; + for (int idx = 0; idx < querySearchResults.size(); idx++) { + QuerySearchResult querySearchResult = querySearchResults.get(idx); + TopDocsAndMaxScore topDocsAndMaxScore = querySearchResult.topDocs(); + if (!(topDocsAndMaxScore.topDocs instanceof CompoundTopDocs)) { + continue; + } + topDocsAndMaxScores[idx] = topDocsAndMaxScore; + } + for (int i = 0; i < querySearchResults.size(); i++) { + QuerySearchResult querySearchResult = querySearchResults.get(i); + CompoundTopDocs updatedTopDocs = queryTopDocs.get(i); + if (Objects.isNull(updatedTopDocs)) { + continue; + } + float maxScore = updatedTopDocs.totalHits.value > 0 ? updatedTopDocs.scoreDocs[0].score : 0.0f; + TopDocsAndMaxScore topDocsAndMaxScore = new TopDocsAndMaxScore(updatedTopDocs, maxScore); + if (querySearchResult == null) { + continue; + } + querySearchResult.topDocs(topDocsAndMaxScore, null); + if (topDocsAndMaxScores[i] != null) { + topDocsAndMaxScores[i].maxScore = combinedMaxScores.get(i); + } + } + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombinationTechnique.java b/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombinationTechnique.java deleted file mode 100644 index a96b472c3..000000000 --- a/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombinationTechnique.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.neuralsearch.processor; - -/** - * Collection of techniques for score combination - */ -public enum ScoreCombinationTechnique { - - /** - * Arithmetic mean method for combining scores. - * cscore = (score1 + score2 +...+ scoreN)/N - * - * Zero (0.0) scores are excluded from number of scores N - */ - ARITHMETIC_MEAN { - - @Override - public float combine(final float[] scores) { - float combinedScore = 0.0f; - int count = 0; - for (float score : scores) { - if (score >= 0.0) { - combinedScore += score; - count++; - } - } - return combinedScore / count; - } - }; - - public static final ScoreCombinationTechnique DEFAULT = ARITHMETIC_MEAN; - - /** - * Defines combination function specific to this technique - * @param scores array of collected original scores - * @return combined score - */ - abstract float combine(final float[] scores); -} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombiner.java b/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombiner.java deleted file mode 100644 index 151a3e0a1..000000000 --- a/src/main/java/org/opensearch/neuralsearch/processor/ScoreCombiner.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.neuralsearch.processor; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.PriorityQueue; -import java.util.stream.Collectors; - -import lombok.extern.log4j.Log4j2; - -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.apache.lucene.search.TotalHits; -import org.opensearch.neuralsearch.search.CompoundTopDocs; - -/** - * Abstracts combination of scores in query search results. - */ -@Log4j2 -public class ScoreCombiner { - - private static final Float ZERO_SCORE = 0.0f; - - /** - * Performs score combination based on input combination technique. Mutates input object by updating combined scores - * @param queryTopDocs query results that need to be normalized, mutated by method execution - * @param combinationTechnique exact combination method that should be applied - * @return list of max combined scores for each shard - */ - public static List combineScores( - final List queryTopDocs, - final ScoreCombinationTechnique combinationTechnique - ) { - List maxScores = new ArrayList<>(); - for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { - if (Objects.isNull(compoundQueryTopDocs) || compoundQueryTopDocs.totalHits.value == 0) { - maxScores.add(ZERO_SCORE); - continue; - } - List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); - int shardId = compoundQueryTopDocs.scoreDocs[0].shardIndex; - Map normalizedScoresPerDoc = new HashMap<>(); - int maxHits = 0; - TotalHits.Relation totalHits = TotalHits.Relation.EQUAL_TO; - for (int j = 0; j < topDocsPerSubQuery.size(); j++) { - TopDocs topDocs = topDocsPerSubQuery.get(j); - int hits = 0; - for (ScoreDoc scoreDoc : topDocs.scoreDocs) { - normalizedScoresPerDoc.putIfAbsent(scoreDoc.doc, normalizedScoresPerDoc.computeIfAbsent(scoreDoc.doc, key -> { - float[] scores = new float[topDocsPerSubQuery.size()]; - // we initialize with -1.0, as after normalization it's possible that score is 0.0 - Arrays.fill(scores, -1.0f); - return scores; - })); - normalizedScoresPerDoc.get(scoreDoc.doc)[j] = scoreDoc.score; - hits++; - } - maxHits = Math.max(maxHits, hits); - } - if (topDocsPerSubQuery.stream() - .anyMatch(topDocs -> topDocs.totalHits.relation == TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)) { - totalHits = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; - } - Map combinedNormalizedScoresByDocId = normalizedScoresPerDoc.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> combinationTechnique.combine(entry.getValue()))); - // create priority queue, make it max heap by the score - PriorityQueue pq = new PriorityQueue<>( - (a, b) -> Float.compare(combinedNormalizedScoresByDocId.get(b), combinedNormalizedScoresByDocId.get(a)) - ); - // we're merging docs with normalized and combined scores. we need to have only maxHits results - pq.addAll(normalizedScoresPerDoc.keySet()); - - ScoreDoc[] finalScoreDocs = new ScoreDoc[maxHits]; - float maxScore = combinedNormalizedScoresByDocId.get(pq.peek()); - - for (int j = 0; j < maxHits; j++) { - int docId = pq.poll(); - finalScoreDocs[j] = new ScoreDoc(docId, combinedNormalizedScoresByDocId.get(docId), shardId); - } - compoundQueryTopDocs.scoreDocs = finalScoreDocs; - compoundQueryTopDocs.totalHits = new TotalHits(maxHits, totalHits); - maxScores.add(maxScore); - } - return maxScores; - } -} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationRequest.java b/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationRequest.java deleted file mode 100644 index b530e5dac..000000000 --- a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationRequest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.neuralsearch.processor; - -import lombok.Builder; -import lombok.Data; - -/** - * DTO for normalize method request - */ -@Data -@Builder -public class ScoreNormalizationRequest { - private final float score; - private final float minScore; - private final float maxScore; -} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationTechnique.java b/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationTechnique.java deleted file mode 100644 index 1f047c10c..000000000 --- a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizationTechnique.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.neuralsearch.processor; - -import com.google.common.primitives.Floats; - -/** - * Collection of techniques for score normalization - */ -public enum ScoreNormalizationTechnique { - - /** - * Min-max normalization method. - * nscore = (score - min_score)/(max_score - min_score) - */ - MIN_MAX { - @Override - float normalize(final ScoreNormalizationRequest request) { - // edge case when there is only one score and min and max scores are same - if (Floats.compare(request.getMaxScore(), request.getMinScore()) == 0 - && Floats.compare(request.getMaxScore(), request.getScore()) == 0) { - return SINGLE_RESULT_SCORE; - } - float normalizedScore = (request.getScore() - request.getMinScore()) / (request.getMaxScore() - request.getMinScore()); - return normalizedScore == 0.0f ? MIN_SCORE : normalizedScore; - } - }; - - public static final ScoreNormalizationTechnique DEFAULT = MIN_MAX; - - static final float MIN_SCORE = 0.001f; - static final float SINGLE_RESULT_SCORE = 1.0f; - - /** - * Defines normalization function specific to this technique - * @param request complex request DTO that defines parameters like min/max scores etc. - * @return normalized score - */ - abstract float normalize(final ScoreNormalizationRequest request); -} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizer.java b/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizer.java deleted file mode 100644 index 845ffff5c..000000000 --- a/src/main/java/org/opensearch/neuralsearch/processor/ScoreNormalizer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.neuralsearch.processor; - -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -import lombok.extern.log4j.Log4j2; - -import org.apache.lucene.search.ScoreDoc; -import org.apache.lucene.search.TopDocs; -import org.opensearch.neuralsearch.search.CompoundTopDocs; - -/** - * Abstracts normalization of scores in query search results. - */ -@Log4j2 -public class ScoreNormalizer { - - /** - * Performs score normalization based on input normalization technique. Mutates input object by updating normalized scores. - * @param queryTopDocs original query results from multiple shards and multiple sub-queries - * @param normalizationTechnique exact normalization method that should be applied - */ - public static void normalizeScores(final List queryTopDocs, final ScoreNormalizationTechnique normalizationTechnique) { - Optional maybeCompoundQuery = queryTopDocs.stream() - .filter(Objects::nonNull) - .filter(topDocs -> topDocs.getCompoundTopDocs().size() > 0) - .findAny(); - if (maybeCompoundQuery.isEmpty()) { - return; - } - - // init scores per sub-query - float[][] minMaxScores = new float[maybeCompoundQuery.get().getCompoundTopDocs().size()][]; - for (int i = 0; i < minMaxScores.length; i++) { - minMaxScores[i] = new float[] { Float.MAX_VALUE, Float.MIN_VALUE }; - } - - for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { - if (compoundQueryTopDocs == null) { - continue; - } - List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); - for (int j = 0; j < topDocsPerSubQuery.size(); j++) { - TopDocs subQueryTopDoc = topDocsPerSubQuery.get(j); - // get min and max scores - for (ScoreDoc scoreDoc : subQueryTopDoc.scoreDocs) { - minMaxScores[j][0] = Math.min(minMaxScores[j][0], scoreDoc.score); - minMaxScores[j][1] = Math.max(minMaxScores[j][1], scoreDoc.score); - } - } - } - // do the normalization - for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { - if (compoundQueryTopDocs == null) { - continue; - } - List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); - for (int j = 0; j < topDocsPerSubQuery.size(); j++) { - TopDocs subQueryTopDoc = topDocsPerSubQuery.get(j); - for (ScoreDoc scoreDoc : subQueryTopDoc.scoreDocs) { - ScoreNormalizationRequest normalizationRequest = ScoreNormalizationRequest.builder() - .score(scoreDoc.score) - .minScore(minMaxScores[j][0]) - .maxScore(minMaxScores[j][1]) - .build(); - scoreDoc.score = normalizationTechnique.normalize(normalizationRequest); - } - } - } - } -} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/combination/ArithmeticMeanScoreCombinationMethod.java b/src/main/java/org/opensearch/neuralsearch/processor/combination/ArithmeticMeanScoreCombinationMethod.java new file mode 100644 index 000000000..74417e5a2 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/combination/ArithmeticMeanScoreCombinationMethod.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.combination; + +import java.util.Objects; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +/** + * Abstracts combination of scores based on arithmetic mean method + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ArithmeticMeanScoreCombinationMethod implements ScoreCombinationMethod { + + private static ArithmeticMeanScoreCombinationMethod INSTANCE = new ArithmeticMeanScoreCombinationMethod(); + + public static ArithmeticMeanScoreCombinationMethod getInstance() { + if (Objects.isNull(INSTANCE)) { + INSTANCE = new ArithmeticMeanScoreCombinationMethod(); + } + return INSTANCE; + } + + /** + * Arithmetic mean method for combining scores. + * cscore = (score1 + score2 +...+ scoreN)/N + * + * Zero (0.0) scores are excluded from number of scores N + */ + @Override + public float combine(final float[] scores) { + float combinedScore = 0.0f; + int count = 0; + for (float score : scores) { + if (score >= 0.0) { + combinedScore += score; + count++; + } + } + return combinedScore / count; + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationMethod.java b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationMethod.java new file mode 100644 index 000000000..39911cd51 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationMethod.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.combination; + +public interface ScoreCombinationMethod { + /** + * Defines combination function specific to this technique + * @param scores array of collected original scores + * @return combined score + */ + float combine(final float[] scores); +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationTechnique.java b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationTechnique.java new file mode 100644 index 000000000..b564d4c2a --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombinationTechnique.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.combination; + +import lombok.AllArgsConstructor; + +/** + * Collection of techniques for score combination + */ +@AllArgsConstructor +public enum ScoreCombinationTechnique { + + /** + * Arithmetic mean method for combining scores. + */ + ARITHMETIC_MEAN(ArithmeticMeanScoreCombinationMethod.getInstance()); + + public static final ScoreCombinationTechnique DEFAULT = ARITHMETIC_MEAN; + private final ScoreCombinationMethod method; + + public float combine(final float[] scores) { + return method.combine(scores); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombiner.java b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombiner.java new file mode 100644 index 000000000..d999f2535 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/combination/ScoreCombiner.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.combination; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.stream.Collectors; + +import lombok.extern.log4j.Log4j2; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.neuralsearch.search.CompoundTopDocs; + +/** + * Abstracts combination of scores in query search results. + */ +@Log4j2 +public class ScoreCombiner { + + private static final Float ZERO_SCORE = 0.0f; + + /** + * Performs score combination based on input combination technique. Mutates input object by updating combined scores + * Main steps we're doing for combination: + * - create map of normalized scores per doc id + * - using normalized scores create another map of combined scores per doc id + * - count max number of hits among sub-queries + * - sort documents by scores and take first "max number" of docs + * - update query search results with normalized scores + * Different score combination techniques are different in step 2, where we create map of "doc id" - "combined score", + * other steps are same for all techniques. + * @param queryTopDocs query results that need to be normalized, mutated by method execution + * @param scoreCombinationTechnique exact combination technique that should be applied + * @return list of max combined scores for each shard + */ + public List combineScores(final List queryTopDocs, final ScoreCombinationTechnique scoreCombinationTechnique) { + // iterate over results from each shard. Every CompoundTopDocs object has results from + // multiple sub queries, doc ids may repeat for each sub query results + return queryTopDocs.stream() + .map(compoundQueryTopDocs -> combineShardScores(scoreCombinationTechnique, compoundQueryTopDocs)) + .collect(Collectors.toList()); + } + + private float combineShardScores(ScoreCombinationTechnique scoreCombinationTechnique, CompoundTopDocs compoundQueryTopDocs) { + if (Objects.isNull(compoundQueryTopDocs) || compoundQueryTopDocs.totalHits.value == 0) { + return ZERO_SCORE; + } + List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); + // - create map of normalized scores results returned from the single shard + Map normalizedScoresPerDoc = getNormalizedScoresPerDocument(topDocsPerSubQuery); + + // - create map of combined scores per doc id + Map combinedNormalizedScoresByDocId = combineScoresAndGetCombinedNormilizedScoresPerDocument( + normalizedScoresPerDoc, + scoreCombinationTechnique + ); + + // - sort documents by scores and take first "max number" of docs + // create a priority queue of doc ids that are sorted by their combined scores + PriorityQueue scoreQueue = getPriorityQueueOfDocIds(normalizedScoresPerDoc, combinedNormalizedScoresByDocId); + // store max score to resulting list, call it now as priority queue will change after combining scores + float maxScore = combinedNormalizedScoresByDocId.get(scoreQueue.peek()); + + // - update query search results with normalized scores + updateQueryTopDocsWithCombinedScores(compoundQueryTopDocs, topDocsPerSubQuery, combinedNormalizedScoresByDocId, scoreQueue); + return maxScore; + } + + private PriorityQueue getPriorityQueueOfDocIds( + Map normalizedScoresPerDoc, + Map combinedNormalizedScoresByDocId + ) { + PriorityQueue pq = new PriorityQueue<>( + (a, b) -> Float.compare(combinedNormalizedScoresByDocId.get(b), combinedNormalizedScoresByDocId.get(a)) + ); + // we're merging docs with normalized and combined scores. we need to have only maxHits results + pq.addAll(normalizedScoresPerDoc.keySet()); + return pq; + } + + private ScoreDoc[] getCombinedScoreDocs( + final CompoundTopDocs compoundQueryTopDocs, + final Map combinedNormalizedScoresByDocId, + final PriorityQueue scoreQueue, + final int maxHits + ) { + ScoreDoc[] finalScoreDocs = new ScoreDoc[maxHits]; + + int shardId = compoundQueryTopDocs.scoreDocs[0].shardIndex; + for (int j = 0; j < maxHits; j++) { + int docId = scoreQueue.poll(); + finalScoreDocs[j] = new ScoreDoc(docId, combinedNormalizedScoresByDocId.get(docId), shardId); + } + return finalScoreDocs; + } + + private Map getNormalizedScoresPerDocument(List topDocsPerSubQuery) { + Map normalizedScoresPerDoc = new HashMap<>(); + for (int j = 0; j < topDocsPerSubQuery.size(); j++) { + TopDocs topDocs = topDocsPerSubQuery.get(j); + for (ScoreDoc scoreDoc : topDocs.scoreDocs) { + normalizedScoresPerDoc.putIfAbsent(scoreDoc.doc, normalizedScoresPerDoc.computeIfAbsent(scoreDoc.doc, key -> { + float[] scores = new float[topDocsPerSubQuery.size()]; + // we initialize with -1.0, as after normalization it's possible that score is 0.0 + Arrays.fill(scores, -1.0f); + return scores; + })); + normalizedScoresPerDoc.get(scoreDoc.doc)[j] = scoreDoc.score; + } + } + return normalizedScoresPerDoc; + } + + private Map combineScoresAndGetCombinedNormilizedScoresPerDocument( + Map normalizedScoresPerDocument, + final ScoreCombinationTechnique scoreCombinationTechnique + ) { + return normalizedScoresPerDocument.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> scoreCombinationTechnique.combine(entry.getValue()))); + } + + private void updateQueryTopDocsWithCombinedScores( + CompoundTopDocs compoundQueryTopDocs, + List topDocsPerSubQuery, + Map combinedNormalizedScoresByDocId, + PriorityQueue scoreQueue + ) { + // - count max number of hits among sub-queries + int maxHits = getMaxHits(topDocsPerSubQuery); + // - update query search results with normalized scores + compoundQueryTopDocs.scoreDocs = getCombinedScoreDocs(compoundQueryTopDocs, combinedNormalizedScoresByDocId, scoreQueue, maxHits); + compoundQueryTopDocs.totalHits = getTotalHits(topDocsPerSubQuery, maxHits); + } + + private int getMaxHits(List topDocsPerSubQuery) { + int maxHits = 0; + for (TopDocs topDocs : topDocsPerSubQuery) { + int hits = topDocs.scoreDocs.length; + maxHits = Math.max(maxHits, hits); + } + return maxHits; + } + + private TotalHits getTotalHits(List topDocsPerSubQuery, int maxHits) { + TotalHits.Relation totalHits = TotalHits.Relation.EQUAL_TO; + if (topDocsPerSubQuery.stream().anyMatch(topDocs -> topDocs.totalHits.relation == TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)) { + totalHits = TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO; + } + return new TotalHits(maxHits, totalHits); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactory.java b/src/main/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactory.java index 8f486f2b3..9fffb06f8 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactory.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactory.java @@ -13,8 +13,9 @@ import org.apache.commons.lang3.EnumUtils; import org.apache.commons.lang3.StringUtils; import org.opensearch.neuralsearch.processor.NormalizationProcessor; -import org.opensearch.neuralsearch.processor.ScoreCombinationTechnique; -import org.opensearch.neuralsearch.processor.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; @@ -58,7 +59,8 @@ public SearchPhaseResultsProcessor create( tag, description, ScoreNormalizationTechnique.valueOf(normalizationTechnique), - ScoreCombinationTechnique.valueOf(combinationTechnique) + ScoreCombinationTechnique.valueOf(combinationTechnique), + NormalizationProcessorWorkflow.create() ); } diff --git a/src/main/java/org/opensearch/neuralsearch/processor/normalization/MinMaxScoreNormalizationMethod.java b/src/main/java/org/opensearch/neuralsearch/processor/normalization/MinMaxScoreNormalizationMethod.java new file mode 100644 index 000000000..7006eb2fb --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/normalization/MinMaxScoreNormalizationMethod.java @@ -0,0 +1,123 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.normalization; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.opensearch.neuralsearch.search.CompoundTopDocs; + +import com.google.common.primitives.Floats; + +/** + * Abstracts normalization of scores based on min-max method + */ +public class MinMaxScoreNormalizationMethod implements ScoreNormalizationMethod { + static final float MIN_SCORE = 0.001f; + static final float SINGLE_RESULT_SCORE = 1.0f; + + private static MinMaxScoreNormalizationMethod INSTANCE = new MinMaxScoreNormalizationMethod(); + + public static MinMaxScoreNormalizationMethod getInstance() { + if (Objects.isNull(INSTANCE)) { + INSTANCE = new MinMaxScoreNormalizationMethod(); + } + return INSTANCE; + } + + /** + * Min-max normalization method. + * nscore = (score - min_score)/(max_score - min_score) + * Main algorithm steps: + * - calculate min and max scores for each sub query + * - iterate over each result and update score as per formula above where "score" is raw score returned by Hybrid query + */ + @Override + public void normalize(final List queryTopDocs) { + Optional optionalCompoundTopDocs = queryTopDocs.stream() + .filter(Objects::nonNull) + .filter(topDocs -> topDocs.getCompoundTopDocs().size() > 0) + .findAny(); + if (optionalCompoundTopDocs.isEmpty()) { + return; + } + int numOfSubqueries = optionalCompoundTopDocs.get().getCompoundTopDocs().size(); + // get min scores for each sub query + float[] minScoresPerSubquery = getMinScores(queryTopDocs, numOfSubqueries); + + // get max scores for each sub query + float[] maxScoresPerSubquery = getMaxScores(queryTopDocs, numOfSubqueries); + + // do normalization using actual score and min and max scores for corresponding sub query + for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { + if (Objects.isNull(compoundQueryTopDocs)) { + continue; + } + List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); + for (int j = 0; j < topDocsPerSubQuery.size(); j++) { + TopDocs subQueryTopDoc = topDocsPerSubQuery.get(j); + for (ScoreDoc scoreDoc : subQueryTopDoc.scoreDocs) { + scoreDoc.score = normalizeSingleScore(scoreDoc.score, minScoresPerSubquery[j], maxScoresPerSubquery[j]); + } + } + } + } + + private float[] getMaxScores(List queryTopDocs, int numOfSubqueries) { + float[] maxScores = new float[numOfSubqueries]; + Arrays.fill(maxScores, Float.MIN_VALUE); + for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { + if (Objects.isNull(compoundQueryTopDocs)) { + continue; + } + List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); + for (int j = 0; j < topDocsPerSubQuery.size(); j++) { + maxScores[j] = Math.max( + maxScores[j], + Arrays.stream(topDocsPerSubQuery.get(j).scoreDocs) + .map(scoreDoc -> scoreDoc.score) + .max(Float::compare) + .orElse(Float.MIN_VALUE) + ); + } + } + return maxScores; + } + + private float[] getMinScores(List queryTopDocs, int numOfScores) { + float[] minScores = new float[numOfScores]; + Arrays.fill(minScores, Float.MAX_VALUE); + for (CompoundTopDocs compoundQueryTopDocs : queryTopDocs) { + if (Objects.isNull(compoundQueryTopDocs)) { + continue; + } + List topDocsPerSubQuery = compoundQueryTopDocs.getCompoundTopDocs(); + for (int j = 0; j < topDocsPerSubQuery.size(); j++) { + minScores[j] = Math.min( + minScores[j], + Arrays.stream(topDocsPerSubQuery.get(j).scoreDocs) + .map(scoreDoc -> scoreDoc.score) + .min(Float::compare) + .orElse(Float.MAX_VALUE) + ); + } + } + return minScores; + } + + private float normalizeSingleScore(float score, float minScore, float maxScore) { + // edge case when there is only one score and min and max scores are same + if (Floats.compare(maxScore, minScore) == 0 && Floats.compare(maxScore, score) == 0) { + return SINGLE_RESULT_SCORE; + } + float normalizedScore = (score - minScore) / (maxScore - minScore); + return normalizedScore == 0.0f ? MIN_SCORE : normalizedScore; + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationMethod.java b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationMethod.java new file mode 100644 index 000000000..61165ed12 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationMethod.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.normalization; + +import java.util.List; + +import org.opensearch.neuralsearch.search.CompoundTopDocs; + +/** + * Abstracts normalization of scores in query search results. + */ +public interface ScoreNormalizationMethod { + + /** + * Performs score normalization based on input normalization technique. Mutates input object by updating normalized scores. + * @param queryTopDocs original query results from multiple shards and multiple sub-queries + */ + void normalize(final List queryTopDocs); +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationTechnique.java b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationTechnique.java new file mode 100644 index 000000000..317d830f8 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizationTechnique.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.normalization; + +import java.util.List; + +import lombok.AllArgsConstructor; + +import org.opensearch.neuralsearch.search.CompoundTopDocs; + +/** + * Collection of techniques for score normalization + */ +@AllArgsConstructor +public enum ScoreNormalizationTechnique { + + /** + * Min-max normalization method. + * nscore = (score - min_score)/(max_score - min_score) + */ + MIN_MAX(MinMaxScoreNormalizationMethod.getInstance()); + + public static final ScoreNormalizationTechnique DEFAULT = MIN_MAX; + private final ScoreNormalizationMethod method; + + public void normalize(final List queryTopDocs) { + method.normalize(queryTopDocs); + } +} diff --git a/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizer.java b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizer.java new file mode 100644 index 000000000..acdef8d46 --- /dev/null +++ b/src/main/java/org/opensearch/neuralsearch/processor/normalization/ScoreNormalizer.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor.normalization; + +import java.util.List; +import java.util.Objects; + +import org.opensearch.neuralsearch.search.CompoundTopDocs; + +public class ScoreNormalizer { + + /** + * Performs score normalization based on input normalization technique. Mutates input object by updating normalized scores. + * @param queryTopDocs original query results from multiple shards and multiple sub-queries + * @param scoreNormalizationTechnique exact normalization technique that should be applied + */ + public void normalizeScores(final List queryTopDocs, final ScoreNormalizationTechnique scoreNormalizationTechnique) { + if (canQueryResultsBeNormalized(queryTopDocs)) { + scoreNormalizationTechnique.normalize(queryTopDocs); + } + } + + private boolean canQueryResultsBeNormalized(List queryTopDocs) { + return queryTopDocs.stream().filter(Objects::nonNull).anyMatch(topDocs -> topDocs.getCompoundTopDocs().size() > 0); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/common/BaseNeuralSearchIT.java b/src/test/java/org/opensearch/neuralsearch/common/BaseNeuralSearchIT.java index c6a3d722d..4bdcc8313 100644 --- a/src/test/java/org/opensearch/neuralsearch/common/BaseNeuralSearchIT.java +++ b/src/test/java/org/opensearch/neuralsearch/common/BaseNeuralSearchIT.java @@ -46,8 +46,8 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.knn.index.SpaceType; import org.opensearch.neuralsearch.OpenSearchSecureRestTestCase; -import org.opensearch.neuralsearch.processor.ScoreCombinationTechnique; -import org.opensearch.neuralsearch.processor.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; import com.google.common.collect.ImmutableList; diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorTests.java index 509ebf592..340e2b970 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorTests.java @@ -7,11 +7,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -27,20 +25,19 @@ import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchPhaseName; -import org.opensearch.action.search.SearchPhaseResults; import org.opensearch.action.search.SearchProgressListener; import org.opensearch.action.search.SearchRequest; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.breaker.NoopCircuitBreaker; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.util.BigArrays; -import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; import org.opensearch.neuralsearch.search.CompoundTopDocs; import org.opensearch.search.DocValueFormat; -import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; @@ -94,44 +91,14 @@ public void cleanup() { terminate(threadPool); } - public void testSearchResultTypes_whenNotCompoundDocsOrEmptyResults_thenNoProcessing() { - NormalizationProcessor normalizationProcessor = spy( - new NormalizationProcessor( - PROCESSOR_TAG, - DESCRIPTION, - ScoreNormalizationTechnique.MIN_MAX, - ScoreCombinationTechnique.ARITHMETIC_MEAN - ) - ); - - assertEquals(SearchPhaseName.FETCH, normalizationProcessor.getAfterPhase()); - assertEquals(SearchPhaseName.QUERY, normalizationProcessor.getBeforePhase()); - assertEquals(DESCRIPTION, normalizationProcessor.getDescription()); - assertEquals(PROCESSOR_TAG, normalizationProcessor.getTag()); - assertEquals(true, normalizationProcessor.isIgnoreFailure()); - assertEquals("normalization-processor", normalizationProcessor.getType()); - - SearchPhaseResults searchPhaseResults = mock(SearchPhaseResults.class); - SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); - normalizationProcessor.process(searchPhaseResults, searchPhaseContext); - - verify(normalizationProcessor, never()).updateOriginalQueryResults(any(), any(), any(), any()); - - AtomicArray resultAtomicArray = new AtomicArray<>(1); - when(searchPhaseResults.getAtomicArray()).thenReturn(resultAtomicArray); - normalizationProcessor.process(searchPhaseResults, searchPhaseContext); - - verify(normalizationProcessor, never()).updateOriginalQueryResults(any(), any(), any(), any()); - } - public void testSearchResultTypes_whenCompoundDocs_thenDoNormalizationCombination() { - NormalizationProcessor normalizationProcessor = spy( - new NormalizationProcessor( - PROCESSOR_TAG, - DESCRIPTION, - ScoreNormalizationTechnique.MIN_MAX, - ScoreCombinationTechnique.ARITHMETIC_MEAN - ) + NormalizationProcessorWorkflow normalizationProcessorWorkflow = spy(NormalizationProcessorWorkflow.create()); + NormalizationProcessor normalizationProcessor = new NormalizationProcessor( + PROCESSOR_TAG, + DESCRIPTION, + ScoreNormalizationTechnique.MIN_MAX, + ScoreCombinationTechnique.ARITHMETIC_MEAN, + normalizationProcessorWorkflow ); assertEquals(SearchPhaseName.FETCH, normalizationProcessor.getAfterPhase()); @@ -181,6 +148,6 @@ public void testSearchResultTypes_whenCompoundDocs_thenDoNormalizationCombinatio SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); normalizationProcessor.process(queryPhaseResultConsumer, searchPhaseContext); - verify(normalizationProcessor, times(1)).updateOriginalQueryResults(any(), any(), any(), any()); + verify(normalizationProcessorWorkflow, times(1)).updateOriginalQueryResults(any(), any(), any()); } } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflowTests.java b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflowTests.java new file mode 100644 index 000000000..0741b3a31 --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflowTests.java @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.neuralsearch.processor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.opensearch.action.OriginalIndices; +import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.search.CompoundTopDocs; +import org.opensearch.search.DocValueFormat; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.OpenSearchTestCase; + +public class NormalizationProcessorWorkflowTests extends OpenSearchTestCase { + + public void testSearchResultTypes_whenCompoundDocs_thenDoNormalizationCombination() { + NormalizationProcessorWorkflow normalizationProcessorWorkflow = spy(NormalizationProcessorWorkflow.create()); + + List querySearchResults = new ArrayList<>(); + for (int shardId = 0; shardId < 4; shardId++) { + SearchShardTarget searchShardTarget = new SearchShardTarget( + "node", + new ShardId("index", "uuid", shardId), + null, + OriginalIndices.NONE + ); + QuerySearchResult querySearchResult = new QuerySearchResult(); + CompoundTopDocs topDocs = new CompoundTopDocs( + new TotalHits(4, TotalHits.Relation.EQUAL_TO), + List.of( + new TopDocs( + new TotalHits(4, TotalHits.Relation.EQUAL_TO), + new ScoreDoc[] { new ScoreDoc(0, 0.5f), new ScoreDoc(2, 0.3f), new ScoreDoc(4, 0.25f), new ScoreDoc(10, 0.2f) } + ) + ) + ); + querySearchResult.topDocs(new TopDocsAndMaxScore(topDocs, 0.5f), new DocValueFormat[0]); + querySearchResult.setSearchShardTarget(searchShardTarget); + querySearchResult.setShardIndex(shardId); + querySearchResults.add(querySearchResult); + } + + normalizationProcessorWorkflow.execute(querySearchResults, ScoreNormalizationTechnique.DEFAULT, ScoreCombinationTechnique.DEFAULT); + + verify(normalizationProcessorWorkflow, times(1)).updateOriginalQueryResults(any(), any(), any()); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinerTests.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationMethodTests.java similarity index 92% rename from src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinerTests.java rename to src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationMethodTests.java index 513ed04b7..ee692454f 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinerTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationMethodTests.java @@ -10,14 +10,16 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.combination.ScoreCombiner; import org.opensearch.neuralsearch.search.CompoundTopDocs; import org.opensearch.test.OpenSearchTestCase; -public class ScoreCombinerTests extends OpenSearchTestCase { +public class ScoreCombinationMethodTests extends OpenSearchTestCase { public void testEmptyResults_whenEmptyResultsAndDefaultMethod_thenNoProcessing() { ScoreCombiner scoreCombiner = new ScoreCombiner(); - List maxScores = scoreCombiner.combineScores(List.of(), ScoreCombinationTechnique.DEFAULT); + List maxScores = scoreCombiner.combineScores(List.of(), ScoreCombinationTechnique.ARITHMETIC_MEAN); assertNotNull(maxScores); assertEquals(0, maxScores.size()); } @@ -58,7 +60,7 @@ public void testCombination_whenMultipleSubqueriesResultsAndDefaultMethod_thenSc ) ); - List combinedMaxScores = scoreCombiner.combineScores(queryTopDocs, ScoreCombinationTechnique.DEFAULT); + List combinedMaxScores = scoreCombiner.combineScores(queryTopDocs, ScoreCombinationTechnique.ARITHMETIC_MEAN); assertNotNull(queryTopDocs); assertEquals(3, queryTopDocs.size()); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizerTests.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationMethodTests.java similarity index 91% rename from src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizerTests.java rename to src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationMethodTests.java index 78674c3d2..61be2270e 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizerTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationMethodTests.java @@ -13,26 +13,28 @@ import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer; import org.opensearch.neuralsearch.search.CompoundTopDocs; import org.opensearch.test.OpenSearchTestCase; -public class ScoreNormalizerTests extends OpenSearchTestCase { +public class ScoreNormalizationMethodTests extends OpenSearchTestCase { public void testEmptyResults_whenEmptyResultsAndDefaultMethod_thenNoProcessing() { - ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); - scoreNormalizer.normalizeScores(List.of(), ScoreNormalizationTechnique.DEFAULT); + ScoreNormalizer scoreNormalizationMethod = new ScoreNormalizer(); + scoreNormalizationMethod.normalizeScores(List.of(), ScoreNormalizationTechnique.DEFAULT); } @SneakyThrows public void testNormalization_whenOneSubqueryAndOneShardAndDefaultMethod_thenScoreNormalized() { - ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); + ScoreNormalizer scoreNormalizationMethod = new ScoreNormalizer(); final List queryTopDocs = List.of( new CompoundTopDocs( new TotalHits(1, TotalHits.Relation.EQUAL_TO), List.of(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(1, 2.0f) })) ) ); - scoreNormalizer.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); + scoreNormalizationMethod.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); assertNotNull(queryTopDocs); assertEquals(1, queryTopDocs.size()); CompoundTopDocs resultDoc = queryTopDocs.get(0); @@ -50,7 +52,7 @@ public void testNormalization_whenOneSubqueryAndOneShardAndDefaultMethod_thenSco @SneakyThrows public void testNormalization_whenOneSubqueryMultipleHitsAndOneShardAndDefaultMethod_thenScoreNormalized() { - ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); + ScoreNormalizer scoreNormalizationMethod = new ScoreNormalizer(); final List queryTopDocs = List.of( new CompoundTopDocs( new TotalHits(3, TotalHits.Relation.EQUAL_TO), @@ -62,7 +64,7 @@ public void testNormalization_whenOneSubqueryMultipleHitsAndOneShardAndDefaultMe ) ) ); - scoreNormalizer.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); + scoreNormalizationMethod.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); assertNotNull(queryTopDocs); assertEquals(1, queryTopDocs.size()); CompoundTopDocs resultDoc = queryTopDocs.get(0); @@ -82,7 +84,7 @@ public void testNormalization_whenOneSubqueryMultipleHitsAndOneShardAndDefaultMe } public void testNormalization_whenMultipleSubqueriesMultipleHitsAndOneShardAndDefaultMethod_thenScoreNormalized() { - ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); + ScoreNormalizer scoreNormalizationMethod = new ScoreNormalizer(); final List queryTopDocs = List.of( new CompoundTopDocs( new TotalHits(3, TotalHits.Relation.EQUAL_TO), @@ -98,7 +100,7 @@ public void testNormalization_whenMultipleSubqueriesMultipleHitsAndOneShardAndDe ) ) ); - scoreNormalizer.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); + scoreNormalizationMethod.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); assertNotNull(queryTopDocs); assertEquals(1, queryTopDocs.size()); @@ -130,7 +132,7 @@ public void testNormalization_whenMultipleSubqueriesMultipleHitsAndOneShardAndDe } public void testNormalization_whenMultipleSubqueriesMultipleHitsMultipleShardsAndDefaultMethod_thenScoreNormalized() { - ScoreNormalizer scoreNormalizer = new ScoreNormalizer(); + ScoreNormalizer scoreNormalizationMethod = new ScoreNormalizer(); final List queryTopDocs = List.of( new CompoundTopDocs( new TotalHits(3, TotalHits.Relation.EQUAL_TO), @@ -163,7 +165,7 @@ public void testNormalization_whenMultipleSubqueriesMultipleHitsMultipleShardsAn ) ) ); - scoreNormalizer.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); + scoreNormalizationMethod.normalizeScores(queryTopDocs, ScoreNormalizationTechnique.DEFAULT); assertNotNull(queryTopDocs); assertEquals(3, queryTopDocs.size()); // shard one diff --git a/src/test/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactoryTests.java b/src/test/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactoryTests.java index 7304557f6..158bcd517 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactoryTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/factory/NormalizationProcessorFactoryTests.java @@ -13,8 +13,8 @@ import lombok.SneakyThrows; import org.opensearch.neuralsearch.processor.NormalizationProcessor; -import org.opensearch.neuralsearch.processor.ScoreCombinationTechnique; -import org.opensearch.neuralsearch.processor.ScoreNormalizationTechnique; +import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; +import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; import org.opensearch.test.OpenSearchTestCase;