-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for lambda sink #4292
Conversation
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); | ||
private String stsRoleArn; | ||
|
||
final String LAMBDA_SINK_CONFIG_YAML = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this runtime input instead of hardcoded value?
import static org.mockito.Mockito.verify; | ||
|
||
@ExtendWith(MockitoExtension.class) | ||
class LambdaSinkServiceIT { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How to run this test should be included in README but can come in a separate PR.
private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; | ||
|
||
@JsonProperty("sync") | ||
private Boolean sync = DEFAULT_INVOCATION; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QUES: is there reason we want async invocation as default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async provides a better way to communicate to external services
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* A buffer can hold in memory data and flushing it to S3. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush to Lambda?
* Upload accumulated data to s3 bucket. | ||
*/ | ||
@Override | ||
public void flushToLambda() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e2e acknowledgment seems missing in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e2e ack is handled. refer releaseEventHandles in the pr.
} | ||
|
||
/** | ||
* Upload accumulated data to s3 bucket. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s3 -> lambda
//reset buffer after flush | ||
currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); | ||
} catch (final IOException e) { | ||
LOG.error("Exception while completing codec", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should do releaseEventHandles(false)
in case of exceptionl
private int maxRetries = 0; | ||
private final Counter numberOfRecordsSuccessCounter; | ||
private final Counter numberOfRecordsFailedCounter; | ||
private final String SYNC_INVOCATION_TYPE = "RequestResponse"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we discussed that sync doesn't make sense in case of lambda as sink. So, invocation type is not needed.
dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); | ||
} | ||
//release even if failed | ||
releaseEventHandles(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be released with false
if failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is considered successful if it is uploaded to dlq?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what if DLQ is not configured?
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should move this to common directory so that other sinks can use this future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can refactor be part of another PR?
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
public class JsonCodec implements OutputCodec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, let's rename it to be more specific to lambda
c63dd00
to
dddb205
Compare
aws: | ||
region: "us-east-1" | ||
sts_role_arn: "<arn>" | ||
function_name: "uploadToS3Lambda" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we not discuss about passing arguments to lambda function? Do you not see any need for that option?
reentrantLock.lock(); | ||
try { | ||
for (Record<Event> record : records) { | ||
final Event event = record.getData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to support sending event metadata as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we could, we can add this incrementally?
dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); | ||
} | ||
//release even if failed | ||
releaseEventHandles(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what if DLQ is not configured?
* A buffer can hold in memory data and flushing it. | ||
*/ | ||
public class InMemoryBuffer implements Buffer { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not possible to use existing BufferAccumulator ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interface definitions are different. Will not be able to use the existing one
import java.util.Objects; | ||
|
||
public class LambdaJsonCodec implements OutputCodec { | ||
private final ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this different from Json Output Codec we already have? Can we re-use it somehow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i tried to use existing one but it is not possible without making a whole bunch of changes specific to lambda, so i chose to have a new one instead.
Signed-off-by: srigovs <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
4e4aeb8
to
48a17fb
Compare
Hi @srikanthjg, I know that the chance of catching you on this merged PR are slim, but I wanted to ask if you had a pointer to any documentation of the lambda event type and return type that's used for a lambda sink. https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aws-lambda/#aws-lambda-processor-configuration doesn't really convey the shape of the lambda event or response unless I'm missing something. If there's something that can be pointed at, I'd be happy to ask for it to be included in https://github.com/DefinitelyTyped/DefinitelyTyped/tree/master/types/aws-lambda/trigger to help those of us using TS for these. Thanks much for any help, Dave |
Description
Add support for lambda sink
Issues Resolved
Resolves #4170
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.