From 6f114976311e6372d165bd6b92218bb1d2b05ddc Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 25 Jun 2024 10:07:40 -0500 Subject: [PATCH] Fixes the loading of peer-forwarders when using multiple workers. This fixes a bug where the service_map processor would not load in a pipeline with multiple workers. Resolves #4660. (#4661) Signed-off-by: David Venable Signed-off-by: Krishna Kondaka --- .../PeerForwardingProcessorDecorator.java | 9 +-- ...PeerForwardingProcessingDecoratorTest.java | 66 ++++++++++--------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java index 097b2b6552..038bdb28c5 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessorDecorator.java @@ -67,13 +67,10 @@ public static List decorateProcessors( "Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId); } + final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads); - return processors.stream() - .map(processor -> { - PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads); - return new PeerForwardingProcessorDecorator(peerForwarder, processor); - }) - .collect(Collectors.toList()); + return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor)) + .collect(Collectors.toList()); } private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java index ceb424b0cb..7a85033842 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -39,6 +39,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -51,18 +52,6 @@ class PeerForwardingProcessingDecoratorTest { @Mock private Processor processor; - @Mock - private Processor processor1; - - @Mock - private Processor processor2; - - @Mock(extraInterfaces = Processor.class) - private RequiresPeerForwarding requiresPeerForwarding1; - - @Mock(extraInterfaces = Processor.class) - private RequiresPeerForwarding requiresPeerForwarding2; - @Mock(extraInterfaces = Processor.class) private RequiresPeerForwarding requiresPeerForwarding; @@ -82,13 +71,13 @@ record = mock(Record.class); pluginId = UUID.randomUUID().toString(); } - private List createObjectUnderTesDecoratedProcessors(final List processors) { + private List createObjectUnderTestDecoratedProcessors(final List processors) { return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, PIPELINE_WORKER_THREADS); } @Test void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_not_an_instance_of_RequiresPeerForwarding() { - assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor))); + assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor))); verifyNoInteractions(peerForwarderProvider); } @@ -97,7 +86,7 @@ void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_n void PeerForwardingProcessingDecorator_execute_with_empty_identification_keys_should_throw() { when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Collections.emptySet()); - assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding))); + assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding))); } @Test @@ -109,12 +98,12 @@ void decorateProcessors_with_different_identification_key_should_throw() { when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString())); when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString())); - assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); + assertThrows(RuntimeException.class, () -> createObjectUnderTestDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); } @Test void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList()); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.emptyList()); assertThat(processors.size(), equalTo(0)); } @@ -136,9 +125,22 @@ void setUp() { @Test void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys() { - createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); + verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); + verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); + } + + @Test + void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys_when_list_of_processors() { + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); + when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys); + + createObjectUnderTestDecoratedProcessors(List.of((Processor) requiresPeerForwarding, (Processor) requiresPeerForwardingCopy)); + verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); } @Test @@ -148,16 +150,15 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() { processorList.add((Processor) requiresPeerForwardingCopy); LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class); - lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); - lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwardingCopy, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); + when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder); Event event = mock(Event.class); when(record.getData()).thenReturn(event); List> testData = Collections.singletonList(record); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false); when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(false); - processor1 = (Processor)requiresPeerForwarding; - processor2 = (Processor)requiresPeerForwardingCopy; + Processor processor1 = (Processor)requiresPeerForwarding; + Processor processor2 = (Processor)requiresPeerForwardingCopy; when(processor1.execute(testData)).thenReturn(testData); when(processor2.execute(testData)).thenReturn(testData); @@ -167,9 +168,10 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() { when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); when(requiresPeerForwardingCopy.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(processorList); + final List processors = createObjectUnderTestDecoratedProcessors(processorList); assertThat(processors.size(), equalTo(2)); verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); + verifyNoMoreInteractions(peerForwarderProvider); Collection> result = processors.get(0).execute(testData); assertThat(result.size(), equalTo(testData.size())); assertThat(result, equalTo(testData)); @@ -189,7 +191,7 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc when(processor.execute(testData)).thenReturn(testData); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); final Collection> records = processors.get(0).execute(testData); @@ -215,7 +217,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { when(((Processor) requiresPeerForwarding).execute(anyCollection())).thenReturn(expectedRecordsToProcessLocally); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)); assertThat(processors.size(), equalTo(1)); final Collection> records = processors.get(0).execute(forwardTestData); @@ -232,7 +234,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut Event event = mock(Event.class); when(record.getData()).thenReturn(event); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); assertThat(processors.size(), equalTo(1)); @@ -248,7 +250,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); Collection> testData = Collections.singletonList(record); assertThat(processors.size(), equalTo(1)); @@ -272,7 +274,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(false); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); when(record1.getData()).thenReturn(event1); when(record2.getData()).thenReturn(event2); when(record3.getData()).thenReturn(event3); @@ -303,7 +305,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false); when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true); when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(false); - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); when(record1.getData()).thenReturn(event1); when(record2.getData()).thenReturn(event2); when(record3.getData()).thenReturn(event3); @@ -322,7 +324,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_ @Test void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).prepareForShutdown(); @@ -331,7 +333,7 @@ void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_proces @Test void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_processors_isReadyForShutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).isReadyForShutdown(); @@ -340,7 +342,7 @@ void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_proces @Test void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutdown() { - final List processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)); + final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)); assertThat(processors.size(), equalTo(1)); processors.get(0).shutdown();