Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Sink draft code for issue #1744. #3103

Merged

Conversation

mallikagogoi7
Copy link
Contributor

Description

Prometheus Sink draft code for issue #1744.

Issues Resolved

#1744

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Member

@dinujoh dinujoh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General comments:

  • Use final on local and method variables.
  • Add metrics for success and failure scenarios.


- `url` The http/https endpoint url which can bee backed by prometheus.

- `encoding` Default is snappy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support other encoding and content-type ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. No, currently only snappy is required to support Prometheus.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't support any other configuration, this configuration parameter is not adding any value unless we have a followup plan to support another encoding. Can you followup and check what is the plan ?

@BeforeEach
void setUp() throws JsonProcessingException{
this.urlString = System.getProperty("tests.prometheus.sink.http.endpoint");
String[] values = { urlString,"unauthenticated","ap-south-1","arn:aws:iam::524239988944:role/app-test" };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the role and region needs to be configurable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. It should be configurable and should be read from system properties. Will update the code accordingly.

* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
*/
public final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class code is duplicated. We need to move this to common plugin module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved to aws-plugin-api

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. Will update the code accordingly.

* This class consist logic for downloading the SSL certificates from S3/ACM/Local file.
*
*/
public class CertificateProviderFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is duplicated as well. Need to move this to common module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to common certificate module. This class would need refactoring to remove direct dependency on PrometheusSinkConfiguration and set the required parameters in the constructor or create common object for them.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. Will update the code accordingly.

Comment on lines 135 to 155
if (event instanceof JacksonGauge) {
JacksonGauge jacksonGauge = (JacksonGauge) event;
message = buildRemoteWriteRequest(jacksonGauge.getTime(),
jacksonGauge.getStartTime(), jacksonGauge.getValue(), jacksonGauge.getAttributes(),jacksonGauge.getName());
} else if (event instanceof JacksonSum) {
JacksonSum jacksonSum = (JacksonSum) event;
message = buildRemoteWriteRequest(jacksonSum.getTime(),
jacksonSum.getStartTime(), jacksonSum.getValue(), jacksonSum.getAttributes(), jacksonSum.getName());
} else if (event instanceof JacksonSummary) {
JacksonSummary jacksonSummary = (JacksonSummary) event;
message = buildRemoteWriteRequest(jacksonSummary.getTime(),
jacksonSummary.getStartTime(), jacksonSummary.getSum(), jacksonSummary.getAttributes(), jacksonSummary.getName());
} else if (event instanceof JacksonHistogram) {
JacksonHistogram jacksonHistogram = (JacksonHistogram) event;
message = buildRemoteWriteRequest(jacksonHistogram.getTime(),
jacksonHistogram.getStartTime(), jacksonHistogram.getSum(), jacksonHistogram.getAttributes(), jacksonHistogram.getName());
} else if (event instanceof JacksonExponentialHistogram) {
JacksonExponentialHistogram jacksonExpHistogram = (JacksonExponentialHistogram) event;
message = buildRemoteWriteRequest(jacksonExpHistogram.getTime(),
jacksonExpHistogram.getStartTime(), jacksonExpHistogram.getSum(), jacksonExpHistogram.getAttributes(), jacksonExpHistogram.getName());
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see getSum method is not part of interface. Can we avoid this multiple if else statement and use reflection to call the getSum method ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. Will update the code accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Can we please elaborate on that?

try {
failedHttpEndPointResponses = pushToEndPoint(bytes);
} catch (IOException e) {
LOG.info("Error while pushing to the end point");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add metric when this failure occurs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. Will update the code accordingly.

if (failedHttpEndPointResponses != null) {
logFailedData(failedHttpEndPointResponses);
releaseEventHandles(Boolean.FALSE);
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true when exception occurs at line 166.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkarmmore95 and @mallikagogoi7 Could you please have a look and clarify?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh The code has been updated accordingly

final ClassicRequestBuilder classicHttpRequestBuilder =
httpAuthOptions.get(prometheusSinkConfiguration.getUrl()).getClassicHttpRequestBuilder();

final byte[] compressedBufferData = Snappy.compress(data);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the data be always Snappy compressed ? Should this be configurable ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. Yes, only snappy compression is required to send data to Prometheus.

classicHttpRequestBuilder.addHeader("X-Prometheus-Remote-Write-Version", prometheusSinkConfiguration.getRemoteWriteVersion());

try {
if(AuthTypeOptions.BEARER_TOKEN.equals(prometheusSinkConfiguration.getAuthType()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about other Auth support ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkarmmore95 and @mallikagogoi7 Could you please clarify?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , we have unauthenticated, http_basic, and bearer_token auth just like Http Sink plugin


- `scope` (Optional) : This scope limit an application's access to a user's account.

- `aws_sigv4`: A boolean flag to sign the HTTP request with AWS credentials. Default to `false`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use aws: property for sigv4 and not use aws_.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for your review. We are following the other plugins (opensearch, httpsink etc.) key naming conventions to define the aws_sigv4 property.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new convention is to move away from aws_ for the new sources and sinks.. We should only use aws: for new sources and sinks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved

@mallikagogoi7
Copy link
Contributor Author

General comments:

  • Use final on local and method variables.
  • Add metrics for success and failure scenarios.

@dinujoh Above comments has been addressed


- `scope` (Optional) : This scope limit an application's access to a user's account.

- `aws_sigv4`: A boolean flag to sign the HTTP request with AWS credentials. Default to `false`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new convention is to move away from aws_ for the new sources and sinks.. We should only use aws: for new sources and sinks.


- `url` The http/https endpoint url which can bee backed by prometheus.

- `encoding` Default is snappy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't support any other configuration, this configuration parameter is not adding any value unless we have a followup plan to support another encoding. Can you followup and check what is the plan ?

Comment on lines 49 to 52
" url: {0}\n" +
" http_method: POST\n" +
" auth_type: {1}\n" +
" ssl: false\n";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about other configurations for Integration test ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dino, Due to less time for this plugin, integration test was not in scope

//TODO: implementation
public void process(final HttpResponse response, final EntityDetails entity, final HttpContext context) throws IOException {
if (response.getCode() != STATUS_CODE_200) {
throw new IOException(String.format("url: %s , status code: %s", url,response.getCode()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this IOException ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dino, Thanks for review,For failed response we are throwing IOException, and on IOException, we are sending Failed data to DLQ

}catch (Exception e) {
LOG.info("Exception : "+ e.getMessage() );
}
return bearerTokenOptions.getAccessToken();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the exception is thrown in line 37, what would be this return value ? Will this cause NPE ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , Thanks for the review, Resolved

Comment on lines 63 to 64
Certificate trustedCa;
trustedCa = factory.generateCertificate(certificate);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be single line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , Thanks for the review, Resolved

StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
dlqFileWriter.write(objectWriter.writeValueAsString(failedData)+"\n");
} catch (IOException e) {
LOG.error("Exception while writing failed data to DLQ file Exception: ",e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should emit metrics on this exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , Thanks for the review, Resolved


dlqWriter.write(Arrays.asList(dlqObject), pluginSetting.getPipelineName(), pluginId);
} catch (final IOException e) {
LOG.error("Exception while writing failed data to DLQ, Exception : ", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Emit metric on this exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , Thanks for the review, Resolved

Comment on lines 65 to 70
if(dlqFile != null) {
this.dlqFile = dlqFile;
this.objectWriter = new ObjectMapper().writer();
}else{
this.dlqProvider = getDlqProvider(pluginFactory,bucket,stsRoleArn,awsRegion,dlqPathPrefix);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be injected instead of being constructed here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh , Thanks for the review, Resolved

} else {
LOG.error("No valid Event type found");
}
bytes = message.toByteArray();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will cause NPE if line 171 is executed. No valid event type found.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dinujoh Thanks for the review, Resolved

@@ -12,7 +14,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 1.0
minimum = 0.1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This project has 100% coverage and can retain that. Dropping this to 10% will prevent us from ensuring that code coverage exists within the project. We should test the AwsRequestSigningApacheInterceptor. I believe that a different PR may be doing something similar here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Resolved

@JsonProperty("encoding")
private String encoding = DEFAULT_ENCODING;

@JsonProperty("content-type")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This project uses underscores for YAML configuration property names. Please change to content_type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Resolved

@JsonProperty("content-type")
private String contentType = DEFAULT_CONTENT_TYPE;

@JsonProperty("remote-write-version")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change to remote_write_version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Resolved

@@ -8,10 +8,10 @@

public class AuthenticationOptions {

@JsonProperty("http_basic")
@JsonProperty("http-basic")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this back to http_basic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Resolved

private BasicAuthCredentials httpBasic;

@JsonProperty("bearer_token")
@JsonProperty("bearer-token")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this back to bearer_token.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dlvenable Resolved

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature is incomplete, but having the code in the repo would be easier than keeping it in this PR. Somebody could pick up from the merged code.

@dlvenable dlvenable merged commit 073419f into opensearch-project:main Dec 11, 2023
22 of 24 checks passed
dlvenable added a commit to dlvenable/data-prepper that referenced this pull request Dec 11, 2023
dlvenable added a commit to dlvenable/data-prepper that referenced this pull request Dec 11, 2023
dlvenable added a commit that referenced this pull request Dec 12, 2023
Cleaning up some unnecessary code and dependencies from the recent merge of PR #3103.
Adds missing certificate and key files to fix failures from recent merge of PR #3103.

Signed-off-by: David Venable <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants