Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Nov 8, 2024
1 parent ae8279f commit 0f2d297
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
if (isPeerForwardingDisabled()) {
System.out.println("=====PROCESSING LOCALLY====");
recordsToProcessLocally.add(record);
} else {
recordsToProcess.add(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,11 @@ void PeerForwardingProcessingDecorator_should_have_interaction_with_getIdentific

@Test
void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdentificationKeys() {
List<Processor> processorList = new ArrayList<>();
processorList.add((Processor) requiresPeerForwarding);
processorList.add((Processor) requiresPeerForwardingCopy);
List<Processor> objectsUnderTest = new ArrayList<>();
Processor innerProcessor1 = (Processor)requiresPeerForwarding;
Processor innerProcessor2 = (Processor)requiresPeerForwardingCopy;
objectsUnderTest.add(innerProcessor1);
objectsUnderTest.add(innerProcessor2);

LocalPeerForwarder localPeerForwarder = mock(LocalPeerForwarder.class);
when(peerForwarderProvider.register(pipelineName, (Processor) requiresPeerForwarding, pluginId, identificationKeys, PIPELINE_WORKER_THREADS)).thenReturn(localPeerForwarder);
Expand All @@ -185,16 +187,14 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnlyWithExcludeIdenti
when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(true);

Processor processor1 = (Processor)requiresPeerForwarding;
Processor processor2 = (Processor)requiresPeerForwardingCopy;
when(processor1.execute(testData)).thenReturn(testData);
when(processor2.execute(testData)).thenReturn(testData);
when(innerProcessor1.execute(testData)).thenReturn(testData);
when(innerProcessor2.execute(testData)).thenReturn(testData);

when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
when(requiresPeerForwardingCopy.getIdentificationKeys()).thenReturn(identificationKeys);

final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(processorList, Set.of(identificationKeys));
final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(objectsUnderTest, Set.of(identificationKeys));
assertThat(processors.size(), equalTo(2));
for (final Processor processor: processors) {
assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
Expand Down

0 comments on commit 0f2d297

Please sign in to comment.