From 3ba94579abdbb64c8ec6f3032d5ea3852f590cd5 Mon Sep 17 00:00:00 2001 From: Daniel Zolty Date: Wed, 25 Oct 2023 16:46:30 -0700 Subject: [PATCH] Changes to validator --- load-generator/bin/main/log4j2.xml | 13 ++ terraform/templates/defaults/ecs_taskdef.tpl | 24 +- terraform/testcases/otlp_logs/otconfig.tpl | 2 +- terraform/validation/main.tf | 2 +- .../src/main/java/com/amazon/aoc/App.java | 5 +- .../com/amazon/aoc/callers/HttpCaller.java | 5 +- .../amazon/aoc/helpers/MustacheHelper.java | 4 + .../amazon/aoc/validators/CWLogValidator.java | 208 +++++++++++------- .../aoc/validators/ValidatorFactory.java | 11 +- .../otlpExpectedLog.mustache | 14 ++ .../validations/spark-otel-log-validation.yml | 2 +- 11 files changed, 195 insertions(+), 95 deletions(-) create mode 100644 load-generator/bin/main/log4j2.xml create mode 100644 validator/src/main/resources/expected-data-template/otlpExpectedLog.mustache diff --git a/load-generator/bin/main/log4j2.xml b/load-generator/bin/main/log4j2.xml new file mode 100644 index 000000000..c52aaeaf2 --- /dev/null +++ b/load-generator/bin/main/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/terraform/templates/defaults/ecs_taskdef.tpl b/terraform/templates/defaults/ecs_taskdef.tpl index 72321e290..0c14d9f77 100644 --- a/terraform/templates/defaults/ecs_taskdef.tpl +++ b/terraform/templates/defaults/ecs_taskdef.tpl @@ -42,16 +42,28 @@ "value": "${sample_app_listen_address}" }, { - "name": "JAEGER_RECEIVER_ENDPOINT", - "value": "127.0.0.1:${http_port}" + "name": "JAEGER_RECEIVER_ENDPOINT", + "value": "127.0.0.1:${http_port}" }, { - "name": "ZIPKIN_RECEIVER_ENDPOINT", - "value": "127.0.0.1:${http_port}" + "name": "ZIPKIN_RECEIVER_ENDPOINT", + "value": "127.0.0.1:${http_port}" }, { - "name": "OTEL_METRICS_EXPORTER", - "value": "otlp" + "name": "OTEL_METRICS_EXPORTER", + "value": "none" + }, + { + "name": "OTEL_TRACES_EXPORTER", + "value": "otlp" + }, + { + "name": "OTEL_LOGS_EXPORTER", + "value": "otlp" + }, + { + "name": "SAMPLE_APP_LOG_LEVEL", + "value": "INFO" } ], "dependsOn": [ diff --git a/terraform/testcases/otlp_logs/otconfig.tpl b/terraform/testcases/otlp_logs/otconfig.tpl index 729e86f52..647f59101 100644 --- a/terraform/testcases/otlp_logs/otconfig.tpl +++ b/terraform/testcases/otlp_logs/otconfig.tpl @@ -20,7 +20,7 @@ exporters: service: pipelines: - metrics: + logs: receivers: [otlp] processors: [batch] exporters: [logging, awscloudwatchlogs] diff --git a/terraform/validation/main.tf b/terraform/validation/main.tf index c8e4d4ad1..fbee1a396 100644 --- a/terraform/validation/main.tf +++ b/terraform/validation/main.tf @@ -73,7 +73,7 @@ resource "null_resource" "validator" { provisioner "local-exec" { command = <<-EOT docker-compose -f ${local.docker_compose_path} down - docker-compose -f ${local.docker_compose_path} build + docker-compose -f ${local.docker_compose_path} build --no-cache docker-compose -f ${local.docker_compose_path} up --abort-on-container-exit EOT } diff --git a/validator/src/main/java/com/amazon/aoc/App.java b/validator/src/main/java/com/amazon/aoc/App.java index 36c0b550e..72a2bcdd9 100644 --- a/validator/src/main/java/com/amazon/aoc/App.java +++ b/validator/src/main/java/com/amazon/aoc/App.java @@ -149,14 +149,15 @@ public Integer call() throws Exception { context.setLanguage(language); context.setKubernetesContext(this.buildKubernetesContext()); log.info(context); + log.info("URL is: " + configPath); // load config List validationConfigList = new ConfigLoadHelper().loadConfigFromFile(configPath); - + log.info("App - config was loaded"); // run validation validate(context, validationConfigList); - + log.info("App - validation completed"); Instant endTime = Instant.now(); Duration duration = Duration.between(startTime, endTime); log.info("Validation has completed in {} minutes.", duration.toMinutes()); diff --git a/validator/src/main/java/com/amazon/aoc/callers/HttpCaller.java b/validator/src/main/java/com/amazon/aoc/callers/HttpCaller.java index 1640fe5fb..1a70ebec2 100644 --- a/validator/src/main/java/com/amazon/aoc/callers/HttpCaller.java +++ b/validator/src/main/java/com/amazon/aoc/callers/HttpCaller.java @@ -47,10 +47,13 @@ public HttpCaller(String endpoint, String path) { @Override public SampleAppResponse callSampleApp() throws Exception { + log.info("Sample app was called"); OkHttpClient client = new OkHttpClient(); + log.info("Sample app - OkHttpClient"); Request request = new Request.Builder().url(url).build(); - + log.info("Sample app - request builder"); AtomicReference sampleAppResponseAtomicReference = new AtomicReference<>(); + log.info("Sample app - Atomic reference"); RetryHelper.retry( 40, () -> { diff --git a/validator/src/main/java/com/amazon/aoc/helpers/MustacheHelper.java b/validator/src/main/java/com/amazon/aoc/helpers/MustacheHelper.java index ca9a8953c..83115430f 100644 --- a/validator/src/main/java/com/amazon/aoc/helpers/MustacheHelper.java +++ b/validator/src/main/java/com/amazon/aoc/helpers/MustacheHelper.java @@ -45,9 +45,13 @@ public String render(FileConfig fileConfig, Object dataToInject) throws IOExcept private String render(URL path, Object dataToInject) throws IOException { log.info("fetch config: {}", path); String templateContent = IOUtils.toString(path); + log.info("Got template Content"); Mustache mustache = mustacheFactory.compile(new StringReader(templateContent), path.getPath()); + log.info("Got mustache"); StringWriter stringWriter = new StringWriter(); + log.info("fGot stringWriter"); mustache.execute(stringWriter, dataToInject).flush(); + log.info("Mustache executed"); return stringWriter.getBuffer().toString(); } } diff --git a/validator/src/main/java/com/amazon/aoc/validators/CWLogValidator.java b/validator/src/main/java/com/amazon/aoc/validators/CWLogValidator.java index a51271d3f..51d74006a 100644 --- a/validator/src/main/java/com/amazon/aoc/validators/CWLogValidator.java +++ b/validator/src/main/java/com/amazon/aoc/validators/CWLogValidator.java @@ -1,98 +1,146 @@ package com.amazon.aoc.validators; -import com.amazon.aoc.callers.ICaller; +import com.amazon.aoc.exception.BaseException; +import com.amazon.aoc.exception.ExceptionCode; import com.amazon.aoc.fileconfigs.FileConfig; +import com.amazon.aoc.helpers.MustacheHelper; +import com.amazon.aoc.helpers.RetryHelper; import com.amazon.aoc.models.Context; -import com.amazon.aoc.models.ValidationConfig; -import com.amazonaws.services.logs.CloudWatchLogsClient; -import com.amazonaws.services.logs.model.GetLogEventsRequest; +import com.amazon.aoc.services.CloudWatchService; +import com.amazonaws.services.logs.model.OutputLogEvent; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.awaitility.core.RetryerBuilder; -import org.awaitility.core.StopStrategies; -import org.awaitility.core.WaitStrategies; -import org.opentest4j.AssertionFailedError; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.time.Duration; +import com.github.fge.jsonschema.main.JsonSchema; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import com.github.fge.jsonschema.report.ListReportProvider; +import com.github.fge.jsonschema.report.LogLevel; +import com.github.fge.jsonschema.report.ProcessingReport; +import com.github.fge.jsonschema.util.JsonLoader; import java.time.Instant; -import java.util.HashSet; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -public class CWLogValidator implements IValidator { - - +import java.time.temporal.ChronoUnit; +import java.util.*; +import lombok.extern.log4j.Log4j2; -// String getJsonSchemaMappingKey(JsonNode jsonNode) { -// // Your implementation for getting the JSON schema mapping key -// return null; -// } +@Log4j2 +public class CWLogValidator extends AbstractStructuredLogValidator { + protected Map schemasToValidate = new HashMap<>(); + protected Set validatedSchema = new HashSet<>(); + protected Set logStreamNames = new HashSet<>(); + protected String logGroupName; - @Override - public void init(Context context, ValidationConfig validationConfig, ICaller caller, FileConfig expectedDataTemplate) throws Exception { + protected String logStreamName = "otlp-logs"; - } + protected CloudWatchService cloudWatchService; + private static final int CHECK_INTERVAL_IN_MILLI = 30 * 1000; + private static final int CHECK_DURATION_IN_SECONDS = 2 * 60; + private static final int MAX_RETRY_COUNT = 12; + private static final int QUERY_LIMIT = 100; + private JsonSchema schema; - @Override - public void validate() throws Exception { - var lines = new HashSet(); - InputStream inputStream = getClass().getResourceAsStream("/logs/testingJSON.log"); + private Context context; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { - String line; - while ((line = reader.readLine()) != null) { - lines.add(line); - } - } catch (IOException e) { - throw new RuntimeException("Error reading from the file: " + inputStream, e); - } + @Override + public void init(Context context, FileConfig expectedDataTemplate) throws Exception { + log.info("CWLog init starting"); + this.context = context; + logGroupName = String.format("otlp-receiver", context.getCloudWatchContext().getClusterName()); + // cloudWatchService = new CloudWatchService(context.getRegion()); + MustacheHelper mustacheHelper = new MustacheHelper(); + String templateInput = mustacheHelper.render(expectedDataTemplate, context); + JsonNode jsonNode = JsonLoader.fromString(templateInput); + JsonSchemaFactory jsonSchemaFactory = + JsonSchemaFactory.newBuilder() + .setReportProvider(new ListReportProvider(LogLevel.INFO, LogLevel.FATAL)) + .freeze(); + JsonSchema schema = jsonSchemaFactory.getJsonSchema(jsonNode); + this.schema = schema; + log.info(("CWLog init ending")); + } - var cwClient = CloudWatchLogsClient.builder().build(); - var objectMapper = new ObjectMapper(); + // @Override + // public void init( + // Context context, + // ValidationConfig validationConfig, + // ICaller caller, + // FileConfig expectedDataTemplate) + // throws Exception { + // log.info("CWLog init starting"); + // this.context = context; + // logGroupName = String.format("otlp-receiver", + // context.getCloudWatchContext().getClusterName()); + // cloudWatchService = new CloudWatchService(context.getRegion()); + // MustacheHelper mustacheHelper = new MustacheHelper(); + // String templateInput = mustacheHelper.render(expectedDataTemplate, context); + // JsonNode jsonNode = JsonLoader.fromString(templateInput); + // JsonSchemaFactory jsonSchemaFactory = + // JsonSchemaFactory.newBuilder() + // .setReportProvider(new ListReportProvider(LogLevel.INFO, LogLevel.FATAL)) + // .freeze(); + // JsonSchema schema = jsonSchemaFactory.getJsonSchema(jsonNode); + // this.schema = schema; + // log.info(("CWLog init ending")); + // } - RetryerBuilder.newBuilder() - .retryIfException() - .retryIfRuntimeException() - .retryIfExceptionOfType(AssertionFailedError.class) - .withWaitStrategy(WaitStrategies.fixedWait(10, TimeUnit.SECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(5)) - .build() - .call(() -> { - var now = Instant.now(); - var start = now.minus(Duration.ofMinutes(2)); - var end = now.plus(Duration.ofMinutes(2)); - var response = cwClient.getLogEvents(GetLogEventsRequest.builder() - .logGroupName("adot-testbed/logs-component-testing/logs") - .logStreamName(testLogStreamName) - .startTime(start.toEpochMilli()) - .endTime(end.toEpochMilli()) - .build()); + @Override + String getJsonSchemaMappingKey(JsonNode jsonNode) { + return jsonNode.get("Type").asText(); + } - var events = response.events(); - var receivedMessages = events.stream().map(x -> x.message()).collect(Collectors.toSet()); + @Override + public void validate() throws Exception { + log.info(("CWLog validate starting")); + RetryHelper.retry( + getMaxRetryCount(), + CHECK_INTERVAL_IN_MILLI, + true, + () -> { + Instant startTime = + Instant.now().minusSeconds(CHECK_DURATION_IN_SECONDS).truncatedTo(ChronoUnit.MINUTES); + log.info("Start time is: " + startTime.toEpochMilli()); + fetchAndValidateLogs(startTime); + }); + } - // Extract the "body" field from each received message that is received from CloudWatch in JSON Format - var messageToValidate = receivedMessages.stream() - .map(message -> { - try { - JsonNode jsonNode = objectMapper.readTree(message); - return jsonNode.get("body").asText(); - } catch (Exception e) { - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); + @Override + protected void fetchAndValidateLogs(Instant startTime) throws Exception { + log.info(("CWLog fetch starting")); + List logEvents = + cloudWatchService.getLogs( + logGroupName, logStreamName, startTime.toEpochMilli(), QUERY_LIMIT); + if (logEvents.isEmpty()) { + throw new BaseException( + ExceptionCode.LOG_FORMAT_NOT_MATCHED, + String.format( + "[StructuredLogValidator] no logs found under log stream %s" + " in log group %s", + logStreamName, logGroupName)); + } + for (OutputLogEvent logEvent : logEvents) { + log.info("Log message: " + logEvent.getMessage()); + validateJsonSchema(logEvent.getMessage()); + } + } - // Validate the body field in JSON-messageToValidate with actual log lines from the log file. - assertThat(messageToValidate.containsAll(lines)).isTrue(); - assertThat(messageToValidate).containsExactlyInAnyOrderElementsOf(lines); - return null; - }); + @Override + protected void validateJsonSchema(String logEventMsg) throws Exception { + log.info("In validateJsonSchema"); + JsonNode logEventNode = mapper.readTree(logEventMsg); + log.info("In validateJsonSchema - post readTree"); + if (schema != null) { + log.info("In validateJsonSchema - schema isn't null"); + ProcessingReport report = schema.validate(JsonLoader.fromString(logEventNode.toString())); + if (report.isSuccess()) { + // validatedSchema.add(key); + log.info("Report was a success"); + } else { + // This will probably generate a lot of extra logs + // may want to log this to a different level in the future. + log.info("[StructuredLogValidator] failed to validate schema \n"); + log.info(report.toString() + "\n"); + } } + } + + @Override + protected int getMaxRetryCount() { + return MAX_RETRY_COUNT; + } } diff --git a/validator/src/main/java/com/amazon/aoc/validators/ValidatorFactory.java b/validator/src/main/java/com/amazon/aoc/validators/ValidatorFactory.java index 7b348625d..1f668fa7b 100644 --- a/validator/src/main/java/com/amazon/aoc/validators/ValidatorFactory.java +++ b/validator/src/main/java/com/amazon/aoc/validators/ValidatorFactory.java @@ -23,7 +23,9 @@ import com.amazon.aoc.models.Context; import com.amazon.aoc.models.ValidationConfig; import com.amazon.aoc.services.TaskService; +import lombok.extern.log4j.Log4j2; +@Log4j2 public class ValidatorFactory { private Context context; @@ -40,6 +42,7 @@ public ValidatorFactory(Context context) { */ public IValidator launchValidator(ValidationConfig validationConfig) throws Exception { // get validator + log.info("validator factory launch started"); IValidator validator; FileConfig expectedData = null; switch (validationConfig.getValidationType()) { @@ -51,12 +54,14 @@ public IValidator launchValidator(ValidationConfig validationConfig) throws Exce validator = new LoadBalancingValidator(); break; case "cw-metric": - validator = new CWLogValidator(); + log.info("cw-metrics got picked"); + validator = new CWMetricValidator(); expectedData = validationConfig.getExpectedMetricTemplate(); break; case "cw-logs": - validator = new CWMetricValidator(); - expectedData = validationConfig.getExpectedMetricTemplate(); + log.info("cw-logs got picked"); + validator = new CWLogValidator(); + expectedData = validationConfig.getExpectedLogStructureTemplate(); break; case "ecs-describe-task": validator = new ECSHealthCheckValidator(new TaskService(), 10); diff --git a/validator/src/main/resources/expected-data-template/otlpExpectedLog.mustache b/validator/src/main/resources/expected-data-template/otlpExpectedLog.mustache new file mode 100644 index 000000000..8be43fb9e --- /dev/null +++ b/validator/src/main/resources/expected-data-template/otlpExpectedLog.mustache @@ -0,0 +1,14 @@ +[ + { + "body": "{{body}}", + "severity_number": 9, + "severity_text": "INFO", + "flags": 1, + "trace_id": "{{trace_id}}", + "span_id": "{{span_id}}", + "resource": { + "service.name": "{{resource.service.name}}" + } + + } +] \ No newline at end of file diff --git a/validator/src/main/resources/validations/spark-otel-log-validation.yml b/validator/src/main/resources/validations/spark-otel-log-validation.yml index 8dd609033..f7f2b32e4 100644 --- a/validator/src/main/resources/validations/spark-otel-log-validation.yml +++ b/validator/src/main/resources/validations/spark-otel-log-validation.yml @@ -3,4 +3,4 @@ httpPath: "/outgoing-http-call" httpMethod: "get" callingType: "http" - expectedMetricTemplate: "DEFAULT_EXPECTED_LOG" \ No newline at end of file + expectedLogStructureTemplate: "DEFAULT_EXPECTED_LOG" \ No newline at end of file