Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PeerForwardingProcessorDecorator to process records locally when exclude identification keys is set #5178

Merged
merged 2 commits into from
Nov 8, 2024

Conversation

kkondaka
Copy link
Collaborator

@kkondaka kkondaka commented Nov 8, 2024

Description

Fix PeerForwardingProcessorDecorator to process records locally when exclude identification keys is set.
The changes in PR 5127 missed a step in execute() which is supposed to process records locally if peer forwarding is disabled for a processor

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [X ] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X ] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…exclude identification keys is set

Signed-off-by: Krishna Kondaka <[email protected]>
@@ -101,7 +101,12 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
final Collection<Record<Event>> recordsSkipped = new ArrayList<>();
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
recordsToProcess.add(record);
if (isPeerForwardingDisabled()) {
System.out.println("=====PROCESSING LOCALLY====");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have a println here. I expect the build to fail as a result.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. sorry

}
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following this test. It seems that you mock this above on line 190 and then assert it here.

I'm not 100% confident that this test is covering the situation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I think I see now. The processors are the decorated processors under test.

Maybe rename to objectsUnderTest.

}
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I think I see now. The processors are the decorated processors under test.

Maybe rename to objectsUnderTest.

result = processors.get(1).execute(testData);
assertThat(result.size(), equalTo(testData.size()));
assertThat(result, equalTo(testData));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also have some form of verify(processor1).execute? and verify(processor2).execute?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do verify when I am doing processors.get(1).execute(testData);?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that processor1 != processors.get(0). The value processor1 is the inner processor. And processors.get(0) is the actual processor. So you can't say for sure in the current tests that the data reached processor1.

when(requiresPeerForwarding.isApplicableEventForPeerForwarding(event)).thenReturn(true);
when(requiresPeerForwardingCopy.isApplicableEventForPeerForwarding(event)).thenReturn(true);

Processor processor1 = (Processor)requiresPeerForwarding;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename these to innerProcessor1 and innerProcessor2 to provide clarity.

Signed-off-by: Krishna Kondaka <[email protected]>
@@ -101,7 +101,11 @@ public Collection<Record<Event>> execute(final Collection<Record<Event>> records
final Collection<Record<Event>> recordsSkipped = new ArrayList<>();
for (Record<Event> record: records) {
if (((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData())) {
recordsToProcess.add(record);
if (isPeerForwardingDisabled()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This condition could just be a part of the above if statement

if (!isPeerForwardingDisabled && ((RequiresPeerForwarding)innerProcessor).isApplicableEventForPeerForwarding(record.getData()))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it but the next condition has isForLocalProcessingOnly(record.getData())){ which will fail. This is more clean

@sb2k16 sb2k16 dismissed dlvenable’s stale review November 8, 2024 20:44

To expedite testing.

@sb2k16 sb2k16 merged commit 347a803 into opensearch-project:main Nov 8, 2024
47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants