Skip to content

Commit

Permalink
Fix PeerForwardingProcessorDecorator to process records locally when …
Browse files Browse the repository at this point in the history
…exclude identification keys is set

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 8, 2024
1 parent 60990cd commit ae8279f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
final Collection<Record<Event>> recordsSkipped = new ArrayList<>();
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
recordsToProcess.add(record);
if (isPeerForwardingDisabled()) {
System.out.println("=====PROCESSING LOCALLY====");
recordsToProcessLocally.add(record);
} else {
recordsToProcess.add(record);
}
} else if (((RequiresPeerForwarding)innerProcessor).isForLocalProcessingOnly(record.getData())){
recordsToProcessLocally.add(record);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,45 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific
verifyNoMoreInteractions(peerForwarderProvider);
}

@Test
void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdentificationKeys() {
List<Processor> processorList = new ArrayList<>();
processorList.add((Processor) requiresPeerForwarding);
processorList.add((Processor) requiresPeerForwardingCopy);

LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class);
when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
Event event = mock(Event.class);
when(record.getData()).thenReturn(event);
List<Record<Event>> testData = Collections.singletonList(record);
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(true);

Processor processor1 = (Processor)requiresPeerForwarding;
Processor processor2 = (Processor)requiresPeerForwardingCopy;
when(processor1.execute(testData)).thenReturn(testData);
when(processor2.execute(testData)).thenReturn(testData);

when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys);

final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(processorList, Set.of(identificationKeys));
assertThat(processors.size(), equalTo(2));
for (final Processor processor: processors) {
assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
result = processors.get(1).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
}


@Test
void PeerForwardingProcessingDecorator_with_localProcessingOnly() {
List<Processor> processorList = new ArrayList<>();
Expand Down

0 comments on commit ae8279f

Please sign in to comment.