Skip to content

Commit

Permalink
Fixes the loading of peer-forwarders when using multiple workers. Thi…
Browse files Browse the repository at this point in the history
…s fixes a bug where the service_map processor would not load in a pipeline with multiple workers. Resolves opensearch-project#4660. (opensearch-project#4661)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
dlvenable authored and Krishna Kondaka committed Jul 23, 2024
1 parent 78fd03c commit 6f11497
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ public static List<Processor> decorateProcessors(
"Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream()
.map(processor -> {
PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, processor, pluginId, identificationKeys, pipelineWorkerThreads);
return new PeerForwardingProcessorDecorator(peerForwarder, processor);
})
.collect(Collectors.toList());
return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -51,18 +52,6 @@ class PeerForwardingProcessingDecoratorTest {
@Mock
private Processor processor;

@Mock
private Processor processor1;

@Mock
private Processor processor2;

@Mock(extraInterfaces = Processor.class)
private RequiresPeerForwarding requiresPeerForwarding1;

@Mock(extraInterfaces = Processor.class)
private RequiresPeerForwarding requiresPeerForwarding2;

@Mock(extraInterfaces = Processor.class)
private RequiresPeerForwarding requiresPeerForwarding;

Expand All @@ -82,13 +71,13 @@ record = mock(Record.class);
pluginId = UUID.randomUUID().toString();
}

private List<Processor> createObjectUnderTesDecoratedProcessors(final List<Processor> processors) {
private List<Processor> createObjectUnderTestDecoratedProcessors(final List<Processor> processors) {
return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, PIPELINE_WORKER_THREADS);
}

@Test
void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_not_an_instance_of_RequiresPeerForwarding() {
assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor)));
assertThrows(UnsupportedPeerForwarderPluginException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor)));

verifyNoInteractions(peerForwarderProvider);
}
Expand All @@ -97,7 +86,7 @@ void PeerForwardingProcessingDecorator_should_not_have_any_interactions_if_its_n
void PeerForwardingProcessingDecorator_execute_with_empty_identification_keys_should_throw() {
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Collections.emptySet());

assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)));
assertThrows(EmptyPeerForwarderPluginIdentificationKeysException.class, () -> createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)));
}

@Test
Expand All @@ -109,12 +98,12 @@ void decorateProcessors_with_different_identification_key_should_throw() {
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString()));
when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(Set.of(UUID.randomUUID().toString()));

assertThrows(RuntimeException.class, () -> createObjectUnderTesDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy)));
assertThrows(RuntimeException.class, () -> createObjectUnderTestDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy)));
}

@Test
void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.emptyList());
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.emptyList());
assertThat(processors.size(), equalTo(0));
}

Expand All @@ -136,9 +125,22 @@ void setUp() {

@Test
void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys() {
createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
verify(requiresPeerForwarding, times(2)).getIdentificationKeys();
verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
}

@Test
void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentificationKeys_when_list_of_processors() {
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys);

createObjectUnderTestDecoratedProcessors(List.of((Processor) requiresPeerForwarding, (Processor) requiresPeerForwardingCopy));

verify(requiresPeerForwarding, times(2)).getIdentificationKeys();
verify(peerForwarderProvider).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
}

@Test
Expand All @@ -148,16 +150,15 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() {
processorList.add((Processor) requiresPeerForwardingCopy);

LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class);
lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
lenient().when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwardingCopy, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
List<Record<Event>> testData = Collections.singletonList(record);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false);
when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(false);

processor1 = (Processor)requiresPeerForwarding;
processor2 = (Processor)requiresPeerForwardingCopy;
Processor processor1 = (Processor)requiresPeerForwarding;
Processor processor2 = (Processor)requiresPeerForwardingCopy;
when(processor1.execute(testData)).thenReturn(testData);
when(processor2.execute(testData)).thenReturn(testData);

Expand All @@ -167,9 +168,10 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() {
when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true);
when(requiresPeerForwardingCopy.isForLocalProcessingOnly(any())).thenReturn(true);

final List<Processor> processors = createObjectUnderTesDecoratedProcessors(processorList);
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(processorList);
assertThat(processors.size(), equalTo(2));
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
Expand All @@ -189,7 +191,7 @@ void PeerForwardingProcessingDecorator_execute_should_forwardRecords_with_correc

when(processor.execute(testData)).thenReturn(testData);

final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
assertThat(processors.size(), equalTo(1));
final Collection<Record<Event>> records = processors.get(0).execute(testData);

Expand All @@ -215,7 +217,7 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() {

when(((Processor) requiresPeerForwarding).execute(anyCollection())).thenReturn(expectedRecordsToProcessLocally);

final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding));
assertThat(processors.size(), equalTo(1));
final Collection<Record<Event>> records = processors.get(0).execute(forwardTestData);

Expand All @@ -232,7 +234,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
Collection<Record<Event>> testData = Collections.singletonList(record);

assertThat(processors.size(), equalTo(1));
Expand All @@ -248,7 +250,7 @@ void PeerForwardingProcessingDecorator_execute_will_call_inner_processors_execut
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(false);
when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true);

final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
Collection<Record<Event>> testData = Collections.singletonList(record);

assertThat(processors.size(), equalTo(1));
Expand All @@ -272,7 +274,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(false);
when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(true);
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
when(record1.getData()).thenReturn(event1);
when(record2.getData()).thenReturn(event2);
when(record3.getData()).thenReturn(event3);
Expand Down Expand Up @@ -303,7 +305,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event2)).thenReturn(false);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event3)).thenReturn(true);
when(requiresPeerForwarding.isForLocalProcessingOnly(any())).thenReturn(false);
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));
when(record1.getData()).thenReturn(event1);
when(record2.getData()).thenReturn(event2);
when(record3.getData()).thenReturn(event3);
Expand All @@ -322,7 +324,7 @@ void PeerForwardingProcessingDecorator_inner_processor_with_is_applicable_event_

@Test
void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_processors_prepareForShutdown() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));

assertThat(processors.size(), equalTo(1));
processors.get(0).prepareForShutdown();
Expand All @@ -331,7 +333,7 @@ void PeerForwardingProcessingDecorator_prepareForShutdown_will_call_inner_proces

@Test
void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_processors_isReadyForShutdown() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));

assertThat(processors.size(), equalTo(1));
processors.get(0).isReadyForShutdown();
Expand All @@ -340,7 +342,7 @@ void PeerForwardingProcessingDecorator_isReadyForShutdown_will_call_inner_proces

@Test
void PeerForwardingProcessingDecorator_shutdown_will_call_inner_processors_shutdown() {
final List<Processor> processors = createObjectUnderTesDecoratedProcessors(Collections.singletonList(processor));
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList(processor));

assertThat(processors.size(), equalTo(1));
processors.get(0).shutdown();
Expand Down

0 comments on commit 6f11497

Please sign in to comment.