From 0f2d297a139b80eef8b83e5cdce55fa60663314a Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 8 Nov 2024 16:45:36 +0000 Subject: [PATCH] Addressed review comments Signed-off-by: Krishna Kondaka --- .../PeerForwardingProcessorDecorator.java | 1 - .../PeerForwardingProcessingDecoratorTest.java | 16 ++++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java index 2ee6035d9b..3f80813cf0 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java @@ -102,7 +102,6 @@ public Collection> execute(final Collection> records for (Record record: records) { if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) { if (isPeerForwardingDisabled()) { - System.out.println("=====PROCESSING LOCALLY===="); recordsToProcessLocally.add(record); } else { recordsToProcess.add(record); diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java index 01bd2fade5..8d9ac072a2 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -173,9 +173,11 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific @Test void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdentificationKeys() { - List processorList = new ArrayList<>(); - processorList.add((Processor) requiresPeerForwarding); - processorList.add((Processor) requiresPeerForwardingCopy); + List objectsUnderTest = new ArrayList<>(); + Processor innerProcessor1 = (Processor)requiresPeerForwarding; + Processor innerProcessor2 = (Processor)requiresPeerForwardingCopy; + objectsUnderTest.add(innerProcessor1); + objectsUnderTest.add(innerProcessor2); LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class); when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); @@ -185,16 +187,14 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdenti when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(true); - Processor processor1 = (Processor)requiresPeerForwarding; - Processor processor2 = (Processor)requiresPeerForwardingCopy; - when(processor1.execute(testData)).thenReturn(testData); - when(processor2.execute(testData)).thenReturn(testData); + when(innerProcessor1.execute(testData)).thenReturn(testData); + when(innerProcessor2.execute(testData)).thenReturn(testData); when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys); - final List processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(processorList, Set.of(identificationKeys)); + final List processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(objectsUnderTest, Set.of(identificationKeys)); assertThat(processors.size(), equalTo(2)); for (final Processor processor: processors) { assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());