Skip to content

Commit

Permalink
Changes to validator
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielzolty committed Oct 25, 2023
1 parent 965ec7b commit 3ba9457
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 95 deletions.
13 changes: 13 additions & 0 deletions load-generator/bin/main/log4j2.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
24 changes: 18 additions & 6 deletions terraform/templates/defaults/ecs_taskdef.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,28 @@
"value": "${sample_app_listen_address}"
},
{
"name": "JAEGER_RECEIVER_ENDPOINT",
"value": "127.0.0.1:${http_port}"
"name": "JAEGER_RECEIVER_ENDPOINT",
"value": "127.0.0.1:${http_port}"
},
{
"name": "ZIPKIN_RECEIVER_ENDPOINT",
"value": "127.0.0.1:${http_port}"
"name": "ZIPKIN_RECEIVER_ENDPOINT",
"value": "127.0.0.1:${http_port}"
},
{
"name": "OTEL_METRICS_EXPORTER",
"value": "otlp"
"name": "OTEL_METRICS_EXPORTER",
"value": "none"
},
{
"name": "OTEL_TRACES_EXPORTER",
"value": "otlp"
},
{
"name": "OTEL_LOGS_EXPORTER",
"value": "otlp"
},
{
"name": "SAMPLE_APP_LOG_LEVEL",
"value": "INFO"
}
],
"dependsOn": [
Expand Down
2 changes: 1 addition & 1 deletion terraform/testcases/otlp_logs/otconfig.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ exporters:

service:
pipelines:
metrics:
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging, awscloudwatchlogs]
Expand Down
2 changes: 1 addition & 1 deletion terraform/validation/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ resource "null_resource" "validator" {
provisioner "local-exec" {
command = <<-EOT
docker-compose -f ${local.docker_compose_path} down
docker-compose -f ${local.docker_compose_path} build
docker-compose -f ${local.docker_compose_path} build --no-cache
docker-compose -f ${local.docker_compose_path} up --abort-on-container-exit
EOT
}
Expand Down
5 changes: 3 additions & 2 deletions validator/src/main/java/com/amazon/aoc/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,15 @@ public Integer call() throws Exception {
context.setLanguage(language);
context.setKubernetesContext(this.buildKubernetesContext());
log.info(context);
log.info("URL is: " + configPath);

// load config
List<ValidationConfig> validationConfigList =
new ConfigLoadHelper().loadConfigFromFile(configPath);

log.info("App - config was loaded");
// run validation
validate(context, validationConfigList);

log.info("App - validation completed");
Instant endTime = Instant.now();
Duration duration = Duration.between(startTime, endTime);
log.info("Validation has completed in {} minutes.", duration.toMinutes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ public HttpCaller(String endpoint, String path) {

@Override
public SampleAppResponse callSampleApp() throws Exception {
log.info("Sample app was called");
OkHttpClient client = new OkHttpClient();
log.info("Sample app - OkHttpClient");
Request request = new Request.Builder().url(url).build();

log.info("Sample app - request builder");
AtomicReference<SampleAppResponse> sampleAppResponseAtomicReference = new AtomicReference<>();
log.info("Sample app - Atomic reference");
RetryHelper.retry(
40,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ public String render(FileConfig fileConfig, Object dataToInject) throws IOExcept
private String render(URL path, Object dataToInject) throws IOException {
log.info("fetch config: {}", path);
String templateContent = IOUtils.toString(path);
log.info("Got template Content");
Mustache mustache = mustacheFactory.compile(new StringReader(templateContent), path.getPath());
log.info("Got mustache");
StringWriter stringWriter = new StringWriter();
log.info("fGot stringWriter");
mustache.execute(stringWriter, dataToInject).flush();
log.info("Mustache executed");
return stringWriter.getBuffer().toString();
}
}
208 changes: 128 additions & 80 deletions validator/src/main/java/com/amazon/aoc/validators/CWLogValidator.java
Original file line number Diff line number Diff line change
@@ -1,98 +1,146 @@
package com.amazon.aoc.validators;

import com.amazon.aoc.callers.ICaller;
import com.amazon.aoc.exception.BaseException;
import com.amazon.aoc.exception.ExceptionCode;
import com.amazon.aoc.fileconfigs.FileConfig;
import com.amazon.aoc.helpers.MustacheHelper;
import com.amazon.aoc.helpers.RetryHelper;
import com.amazon.aoc.models.Context;
import com.amazon.aoc.models.ValidationConfig;
import com.amazonaws.services.logs.CloudWatchLogsClient;
import com.amazonaws.services.logs.model.GetLogEventsRequest;
import com.amazon.aoc.services.CloudWatchService;
import com.amazonaws.services.logs.model.OutputLogEvent;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.awaitility.core.RetryerBuilder;
import org.awaitility.core.StopStrategies;
import org.awaitility.core.WaitStrategies;
import org.opentest4j.AssertionFailedError;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Duration;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;
import com.github.fge.jsonschema.report.ListReportProvider;
import com.github.fge.jsonschema.report.LogLevel;
import com.github.fge.jsonschema.report.ProcessingReport;
import com.github.fge.jsonschema.util.JsonLoader;
import java.time.Instant;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class CWLogValidator implements IValidator {


import java.time.temporal.ChronoUnit;
import java.util.*;
import lombok.extern.log4j.Log4j2;

// String getJsonSchemaMappingKey(JsonNode jsonNode) {
// // Your implementation for getting the JSON schema mapping key
// return null;
// }
@Log4j2
public class CWLogValidator extends AbstractStructuredLogValidator {
protected Map<String, JsonSchema> schemasToValidate = new HashMap<>();
protected Set<String> validatedSchema = new HashSet<>();
protected Set<String> logStreamNames = new HashSet<>();
protected String logGroupName;

@Override
public void init(Context context, ValidationConfig validationConfig, ICaller caller, FileConfig expectedDataTemplate) throws Exception {
protected String logStreamName = "otlp-logs";

}
protected CloudWatchService cloudWatchService;
private static final int CHECK_INTERVAL_IN_MILLI = 30 * 1000;
private static final int CHECK_DURATION_IN_SECONDS = 2 * 60;
private static final int MAX_RETRY_COUNT = 12;
private static final int QUERY_LIMIT = 100;
private JsonSchema schema;

@Override
public void validate() throws Exception {
var lines = new HashSet<String>();
InputStream inputStream = getClass().getResourceAsStream("/logs/testingJSON.log");
private Context context;

try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
lines.add(line);
}
} catch (IOException e) {
throw new RuntimeException("Error reading from the file: " + inputStream, e);
}
@Override
public void init(Context context, FileConfig expectedDataTemplate) throws Exception {
log.info("CWLog init starting");
this.context = context;
logGroupName = String.format("otlp-receiver", context.getCloudWatchContext().getClusterName());
// cloudWatchService = new CloudWatchService(context.getRegion());
MustacheHelper mustacheHelper = new MustacheHelper();
String templateInput = mustacheHelper.render(expectedDataTemplate, context);
JsonNode jsonNode = JsonLoader.fromString(templateInput);
JsonSchemaFactory jsonSchemaFactory =
JsonSchemaFactory.newBuilder()
.setReportProvider(new ListReportProvider(LogLevel.INFO, LogLevel.FATAL))
.freeze();
JsonSchema schema = jsonSchemaFactory.getJsonSchema(jsonNode);
this.schema = schema;
log.info(("CWLog init ending"));
}

var cwClient = CloudWatchLogsClient.builder().build();
var objectMapper = new ObjectMapper();
// @Override
// public void init(
// Context context,
// ValidationConfig validationConfig,
// ICaller caller,
// FileConfig expectedDataTemplate)
// throws Exception {
// log.info("CWLog init starting");
// this.context = context;
// logGroupName = String.format("otlp-receiver",
// context.getCloudWatchContext().getClusterName());
// cloudWatchService = new CloudWatchService(context.getRegion());
// MustacheHelper mustacheHelper = new MustacheHelper();
// String templateInput = mustacheHelper.render(expectedDataTemplate, context);
// JsonNode jsonNode = JsonLoader.fromString(templateInput);
// JsonSchemaFactory jsonSchemaFactory =
// JsonSchemaFactory.newBuilder()
// .setReportProvider(new ListReportProvider(LogLevel.INFO, LogLevel.FATAL))
// .freeze();
// JsonSchema schema = jsonSchemaFactory.getJsonSchema(jsonNode);
// this.schema = schema;
// log.info(("CWLog init ending"));
// }

RetryerBuilder.<Void>newBuilder()
.retryIfException()
.retryIfRuntimeException()
.retryIfExceptionOfType(AssertionFailedError.class)
.withWaitStrategy(WaitStrategies.fixedWait(10, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(5))
.build()
.call(() -> {
var now = Instant.now();
var start = now.minus(Duration.ofMinutes(2));
var end = now.plus(Duration.ofMinutes(2));
var response = cwClient.getLogEvents(GetLogEventsRequest.builder()
.logGroupName("adot-testbed/logs-component-testing/logs")
.logStreamName(testLogStreamName)
.startTime(start.toEpochMilli())
.endTime(end.toEpochMilli())
.build());
@Override
String getJsonSchemaMappingKey(JsonNode jsonNode) {
return jsonNode.get("Type").asText();
}

var events = response.events();
var receivedMessages = events.stream().map(x -> x.message()).collect(Collectors.toSet());
@Override
public void validate() throws Exception {
log.info(("CWLog validate starting"));
RetryHelper.retry(
getMaxRetryCount(),
CHECK_INTERVAL_IN_MILLI,
true,
() -> {
Instant startTime =
Instant.now().minusSeconds(CHECK_DURATION_IN_SECONDS).truncatedTo(ChronoUnit.MINUTES);
log.info("Start time is: " + startTime.toEpochMilli());
fetchAndValidateLogs(startTime);
});
}

// Extract the "body" field from each received message that is received from CloudWatch in JSON Format
var messageToValidate = receivedMessages.stream()
.map(message -> {
try {
JsonNode jsonNode = objectMapper.readTree(message);
return jsonNode.get("body").asText();
} catch (Exception e) {
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
@Override
protected void fetchAndValidateLogs(Instant startTime) throws Exception {
log.info(("CWLog fetch starting"));
List<OutputLogEvent> logEvents =
cloudWatchService.getLogs(
logGroupName, logStreamName, startTime.toEpochMilli(), QUERY_LIMIT);
if (logEvents.isEmpty()) {
throw new BaseException(
ExceptionCode.LOG_FORMAT_NOT_MATCHED,
String.format(
"[StructuredLogValidator] no logs found under log stream %s" + " in log group %s",
logStreamName, logGroupName));
}
for (OutputLogEvent logEvent : logEvents) {
log.info("Log message: " + logEvent.getMessage());
validateJsonSchema(logEvent.getMessage());
}
}

// Validate the body field in JSON-messageToValidate with actual log lines from the log file.
assertThat(messageToValidate.containsAll(lines)).isTrue();
assertThat(messageToValidate).containsExactlyInAnyOrderElementsOf(lines);
return null;
});
@Override
protected void validateJsonSchema(String logEventMsg) throws Exception {
log.info("In validateJsonSchema");
JsonNode logEventNode = mapper.readTree(logEventMsg);
log.info("In validateJsonSchema - post readTree");
if (schema != null) {
log.info("In validateJsonSchema - schema isn't null");
ProcessingReport report = schema.validate(JsonLoader.fromString(logEventNode.toString()));
if (report.isSuccess()) {
// validatedSchema.add(key);
log.info("Report was a success");
} else {
// This will probably generate a lot of extra logs
// may want to log this to a different level in the future.
log.info("[StructuredLogValidator] failed to validate schema \n");
log.info(report.toString() + "\n");
}
}
}

@Override
protected int getMaxRetryCount() {
return MAX_RETRY_COUNT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.amazon.aoc.models.Context;
import com.amazon.aoc.models.ValidationConfig;
import com.amazon.aoc.services.TaskService;
import lombok.extern.log4j.Log4j2;

@Log4j2
public class ValidatorFactory {
private Context context;

Expand All @@ -40,6 +42,7 @@ public ValidatorFactory(Context context) {
*/
public IValidator launchValidator(ValidationConfig validationConfig) throws Exception {
// get validator
log.info("validator factory launch started");
IValidator validator;
FileConfig expectedData = null;
switch (validationConfig.getValidationType()) {
Expand All @@ -51,12 +54,14 @@ public IValidator launchValidator(ValidationConfig validationConfig) throws Exce
validator = new LoadBalancingValidator();
break;
case "cw-metric":
validator = new CWLogValidator();
log.info("cw-metrics got picked");
validator = new CWMetricValidator();
expectedData = validationConfig.getExpectedMetricTemplate();
break;
case "cw-logs":
validator = new CWMetricValidator();
expectedData = validationConfig.getExpectedMetricTemplate();
log.info("cw-logs got picked");
validator = new CWLogValidator();
expectedData = validationConfig.getExpectedLogStructureTemplate();
break;
case "ecs-describe-task":
validator = new ECSHealthCheckValidator(new TaskService(), 10);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"body": "{{body}}",
"severity_number": 9,
"severity_text": "INFO",
"flags": 1,
"trace_id": "{{trace_id}}",
"span_id": "{{span_id}}",
"resource": {
"service.name": "{{resource.service.name}}"
}

}
]
Loading

0 comments on commit 3ba9457

Please sign in to comment.