From de4c875f7761bc644176abfcbdc8de64e43a1b7f Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 9 Oct 2024 09:48:12 -0700 Subject: [PATCH] Feature: health report api (#16520) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [health] bootstrap HealthObserver from agent to API (#16141) * [health] bootstrap HealthObserver from agent to API * specs: mocked agent needs health observer * add license headers * Merge `main` into `feature/health-report-api` (#16397) * Add GH vault plugin bot to allowed list (#16301) * regenerate webserver test certificates (#16331) * correctly handle stack overflow errors during pipeline compilation (#16323) This commit improves error handling when pipelines that are too big hit the Xss limit and throw a StackOverflowError. Currently the exception is printed outside of the logger, and doesn’t even show if log.format is json, leaving the user to wonder what happened. A couple of thoughts on the way this is implemented: * There should be a first barrier to handle pipelines that are too large based on the PipelineIR compilation. The barrier would use the detection of Xss to determine how big a pipeline could be. This however doesn't reduce the need to still handle a StackOverflow if it happens. * The catching of StackOverflowError could also be done on the WorkerLoop. However I'd suggest that this is unrelated to the Worker initialization itself, it just so happens that compiledPipeline.buildExecution is computed inside the WorkerLoop class for performance reasons. So I'd prefer logging to not come from the existing catch, but from a dedicated catch clause. Solves #16320 * Doc: Reposition worker-utilization in doc (#16335) * settings: add support for observing settings after post-process hooks (#16339) Because logging configuration occurs after loading the `logstash.yml` settings, deprecation logs from `LogStash::Settings::DeprecatedAlias#set` are effectively emitted to a null logger and lost. By re-emitting after the post-process hooks, we can ensure that they make their way to the deprecation log. This change adds support for any setting that responds to `Object#observe_post_process` to receive it after all post-processing hooks have been executed. Resolves: elastic/logstash#16332 * fix line used to determine ES is up (#16349) * add retries to snyk buildkite job (#16343) * Fix 8.13.1 release notes (#16363) make a note of the fix that went to 8.13.1: #16026 Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> * Update logstash_releases.json (#16347) * [Bugfix] Resolve the array and char (single | double quote) escaped values of ${ENV} (#16365) * Properly resolve the values from ENV vars if literal array string provided with ENV var. * Docker acceptance test for persisting keys and use actual values in docker container. * Review suggestion. Simplify the code by stripping whitespace before `gsub`, no need to check comma and split. Co-authored-by: João Duarte --------- Co-authored-by: João Duarte * Doc: Add SNMP integration to breaking changes (#16374) * deprecate java less-than 17 (#16370) * Exclude substitution refinement on pipelines.yml (#16375) * Exclude substitution refinement on pipelines.yml (applies on ENV vars and logstash.yml where env2yaml saves vars) * Safety integration test for pipeline config.string contains ENV . * Doc: Forwardport 8.15.0 release notes to main (#16388) * Removing 8.14 from ci/branches.json as we have 8.15. (#16390) --------- Co-authored-by: ev1yehor <146825775+ev1yehor@users.noreply.github.com> Co-authored-by: João Duarte Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> Co-authored-by: Andrea Selva Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * Squashed merge from 8.x * Failure injector plugin implementation. (#16466) * Test purpose only failure injector integration (filter and output) plugins implementation. Add unit tests and include license notes. * Fix the degrate method name typo. Co-authored-by: Andrea Selva * Add explanation to the config params and rebuild plugin gem. --------- Co-authored-by: Andrea Selva * Health report integration tests bootstrapper and initial tests implementation (#16467) * Health Report integration tests bootstrapper and initial slow start scenario implementation. * Apply suggestions from code review Renaming expectation check method name. Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> * Changed to branch concept, YAML structure simplified as changed to Dict. * Apply suggestions from code review Reflect `help_url` to the integration test. --------- Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> * health api: expose `GET /_health_report` with pipelines/*/status probe (#16398) Adds a `GET /_health_report` endpoint with per-pipeline status probes, and wires the resulting report status into the other API responses, replacing their hard-coded `green` with a meaningful status indication. --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * docs: health report API, and diagnosis links (feature-targeted) (#16518) * docs: health report API, and diagnosis links * Remove plus-for-passthrough markers Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> --------- Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * merge 8.x into feature branch... (#16519) * Add GH vault plugin bot to allowed list (#16301) * regenerate webserver test certificates (#16331) * correctly handle stack overflow errors during pipeline compilation (#16323) This commit improves error handling when pipelines that are too big hit the Xss limit and throw a StackOverflowError. Currently the exception is printed outside of the logger, and doesn’t even show if log.format is json, leaving the user to wonder what happened. A couple of thoughts on the way this is implemented: * There should be a first barrier to handle pipelines that are too large based on the PipelineIR compilation. The barrier would use the detection of Xss to determine how big a pipeline could be. This however doesn't reduce the need to still handle a StackOverflow if it happens. * The catching of StackOverflowError could also be done on the WorkerLoop. However I'd suggest that this is unrelated to the Worker initialization itself, it just so happens that compiledPipeline.buildExecution is computed inside the WorkerLoop class for performance reasons. So I'd prefer logging to not come from the existing catch, but from a dedicated catch clause. Solves #16320 * Doc: Reposition worker-utilization in doc (#16335) * settings: add support for observing settings after post-process hooks (#16339) Because logging configuration occurs after loading the `logstash.yml` settings, deprecation logs from `LogStash::Settings::DeprecatedAlias#set` are effectively emitted to a null logger and lost. By re-emitting after the post-process hooks, we can ensure that they make their way to the deprecation log. This change adds support for any setting that responds to `Object#observe_post_process` to receive it after all post-processing hooks have been executed. Resolves: elastic/logstash#16332 * fix line used to determine ES is up (#16349) * add retries to snyk buildkite job (#16343) * Fix 8.13.1 release notes (#16363) make a note of the fix that went to 8.13.1: #16026 Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> * Update logstash_releases.json (#16347) * [Bugfix] Resolve the array and char (single | double quote) escaped values of ${ENV} (#16365) * Properly resolve the values from ENV vars if literal array string provided with ENV var. * Docker acceptance test for persisting keys and use actual values in docker container. * Review suggestion. Simplify the code by stripping whitespace before `gsub`, no need to check comma and split. Co-authored-by: João Duarte --------- Co-authored-by: João Duarte * Doc: Add SNMP integration to breaking changes (#16374) * deprecate java less-than 17 (#16370) * Exclude substitution refinement on pipelines.yml (#16375) * Exclude substitution refinement on pipelines.yml (applies on ENV vars and logstash.yml where env2yaml saves vars) * Safety integration test for pipeline config.string contains ENV . * Doc: Forwardport 8.15.0 release notes to main (#16388) * Removing 8.14 from ci/branches.json as we have 8.15. (#16390) * Increase Jruby -Xmx to avoid OOM during zip task in DRA (#16408) Fix: #16406 * Generate Dataset code with meaningful fields names (#16386) This PR is intended to help Logstash developers or users that want to better understand the code that's autogenerated to model a pipeline, assigning more meaningful names to the Datasets subclasses' fields. Updates `FieldDefinition` to receive the name of the field from construction methods, so that it can be used during the code generation phase, instead of the existing incremental `field%n`. Updates `ClassFields` to propagate the explicit field name down to the `FieldDefinitions`. Update the `DatasetCompiler` that add fields to `ClassFields` to assign a proper name to generated Dataset's fields. * Implements safe evaluation of conditional expressions, logging the error without killing the pipeline (#16322) This PR protects the if statements against expression evaluation errors, cancel the event under processing and log it. This avoids to crash the pipeline which encounter a runtime error during event condition evaluation, permitting to debug the root cause reporting the offending event and removing from the current processing batch. Translates the `org.jruby.exceptions.TypeError`, `IllegalArgumentException`, `org.jruby.exceptions.ArgumentError` that could happen during `EventCodition` evaluation into a custom `ConditionalEvaluationError` which bubbles up on AST tree nodes. It's catched in the `SplitDataset` node. Updates the generation of the `SplitDataset `so that the execution of `filterEvents` method inside the compute body is try-catch guarded and defer the execution to an instance of `AbstractPipelineExt.ConditionalEvaluationListener` to handle such error. In this particular case the error management consist in just logging the offending Event. --------- Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> * Update logstash_releases.json (#16426) * Release notes for 8.15.1 (#16405) (#16427) * Update release notes for 8.15.1 * update release note --------- Co-authored-by: logstashmachine <43502315+logstashmachine@users.noreply.github.com> Co-authored-by: Kaise Cheng (cherry picked from commit 2fca7e39e87c20fcfcd934e984720173ce3417e8) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * Fix ConditionalEvaluationError to do not include the event that errored in its serialiaxed form, because it's not expected that this class is ever serialized. (#16429) (#16430) Make inner field of ConditionalEvaluationError transient to be avoided during serialization. (cherry picked from commit bb7ecc203f698a56f341fa538bdc1cd4da15b28c) Co-authored-by: Andrea Selva * use gnu tar compatible minitar to generate tar artifact (#16432) (#16434) Using VERSION_QUALIFIER when building the tarball distribution will fail since Ruby's TarWriter implements the older POSIX88 version of tar and paths will be longer than 100 characters. For the long paths being used in Logstash's plugins, mainly due to nested folders from jar-dependencies, we need the tarball to follow either the 2001 ustar format or gnu tar, which is implemented by the minitar gem. (cherry picked from commit 69f0fa54ca07cb3f822846745fdbdd1504175cfb) Co-authored-by: João Duarte * account for the 8.x in DRA publishing task (#16436) (#16440) the current DRA publishing task computes the branch from the version contained in the version.yml This is done by taking the major.minor and confirming that a branch exists with that name. However this pattern won't be applicable for 8.x, as that branch currently points to 8.16.0 and there is no 8.16 branch. This commit falls back to reading the buildkite injected BUILDKITE_BRANCH variable. (cherry picked from commit 17dba9f829a2514aba295ed7a8fa21655b55c86b) Co-authored-by: João Duarte * Fixes the issue where LS wipes out all quotes from docker env variables. (#16456) (#16459) * Fixes the issue where LS wipes out all quotes from docker env variables. This is an issue when running LS on docker with CONFIG_STRING, needs to keep quotes with env variable. * Add a docker acceptance integration test. (cherry picked from commit 7c64c7394bf47e8b5316710876ed55350df46d61) Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * Known issue for 8.15.1 related to env vars references (#16455) (#16469) (cherry picked from commit b54caf3fd8e907c526ab2b8897ce4de4656c2fd5) Co-authored-by: Luca Belluccini * bump .ruby_version to jruby-9.4.8.0 (#16477) (#16480) (cherry picked from commit 51cca7320e5c54865ab3fe2d4101496bd69cacca) Co-authored-by: João Duarte * Release notes for 8.15.2 (#16471) (#16478) Co-authored-by: andsel Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> (cherry picked from commit 01dc76f3b55333f0c49d7190c0cd4ca14b74a7c0) * Change LogStash::Util::SubstitutionVariables#replace_placeholders refine argument to optional (#16485) (#16488) (cherry picked from commit 8368c00367cac0c5f5e0090c26be8795b2e8c7d2) Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com> * Use jruby-9.4.8.0 in exhaustive CIs. (#16489) (#16491) (cherry picked from commit fd1de39005cf4646d8faa3f89b1963c716ec6088) Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * Don't use an older JRuby with oraclelinux-7 (#16499) (#16501) A recent PR (elastic/ci-agent-images/pull/932) modernized the VM images and removed JRuby 9.4.5.0 and some older versions. This ended up breaking exhaustive test on Oracle Linux 7 that hard coded JRuby 9.4.5.0. PR https://github.com/elastic/logstash/pull/16489 worked around the problem by pinning to the new JRuby, but actually we don't need the conditional anymore since the original issue https://github.com/jruby/jruby/issues/7579#issuecomment-1425885324 has been resolved and none of our releasable branches (apart from 7.17 which uses `9.2.20.1`) specify `9.3.x.y` in `/.ruby-version`. Therefore, this commit removes conditional setting of JRuby for OracleLinux 7 agents in exhaustive tests (and relies on whatever `/.ruby-version` defines). (cherry picked from commit 07c01f8231daf14113b2ce57791712ec74365799) Co-authored-by: Dimitrios Liappis * Improve pipeline bootstrap error logs (#16495) (#16504) This PR adds the cause errors details on the pipeline converge state error logs (cherry picked from commit e84fb458ce2f092e065c63df649222f8cbda8c44) Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com> * Logstash Health Report Tests Buildkite pipeline setup. (#16416) (#16511) (cherry picked from commit 5195332bc6a758198cae70fea7d88dfddf0fa15a) Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * Make health report test runner script executable. (#16446) (#16512) (cherry picked from commit 2ebf2658ff86678125b04c8826958b468ee0da1f) Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> * Backport PR #16423 to 8.x: DLQ-ing events that trigger an conditional evaluation error. (#16493) * DLQ-ing events that trigger an conditional evaluation error. (#16423) When a conditional evaluation encounter an error in the expression the event that triggered the issue is sent to pipeline's DLQ, if enabled for the executing pipeline. This PR engage with the work done in #16322, the `ConditionalEvaluationListener` that is receives notifications about if-statements evaluation failure, is improved to also send the event to DLQ (if enabled in the pipeline) and not just logging it. (cherry picked from commit b69d993d718dfd639603cdb5d340947b09a6687a) * Fixed warning about non serializable field DeadLetterQueueWriter in serializable AbstractPipelineExt --------- Co-authored-by: Andrea Selva * add deprecation log for `--event_api.tags.illegal` (#16507) (#16515) - move `--event_api.tags.illegal` from option to deprecated_option - add deprecation log when the flag is explicitly used relates: #16356 Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> (cherry picked from commit a4eddb8a2a79c7e1eb7696140795580427792cb1) Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> --------- Co-authored-by: ev1yehor <146825775+ev1yehor@users.noreply.github.com> Co-authored-by: João Duarte Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> Co-authored-by: Andrea Selva Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Luca Belluccini Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com> Co-authored-by: Dimitrios Liappis --------- Co-authored-by: ev1yehor <146825775+ev1yehor@users.noreply.github.com> Co-authored-by: João Duarte Co-authored-by: Karen Metts <35154725+karenzone@users.noreply.github.com> Co-authored-by: Andrea Selva Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Luca Belluccini Co-authored-by: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com> Co-authored-by: Dimitrios Liappis (cherry picked from commit 7eb5185b4e75061cfa0a091a8bafb622eab5a2f2) --- .../scripts/health-report-tests/README.md | 18 ++ .../scripts/health-report-tests/__init__.py | 0 .../scripts/health-report-tests/bootstrap.py | 101 +++++++++ .../health-report-tests/config/pipelines.yml | 1 + .../health-report-tests/config_validator.py | 69 ++++++ .../logstash_health_report.py | 16 ++ .../scripts/health-report-tests/main.py | 87 ++++++++ .../scripts/health-report-tests/main.sh | 8 +- .../health-report-tests/requirements.txt | 2 + .../health-report-tests/scenario_executor.py | 65 ++++++ .../tests/abnormal-termination.yaml | 31 +++ .../tests/normal-termination.yaml | 29 +++ .../health-report-tests/tests/slow-start.yaml | 30 +++ .../scripts/health-report-tests/util.py | 35 +++ .../monitoring/monitoring-apis.asciidoc | 156 ++++++++++++- docs/static/releasenotes.asciidoc | 2 +- .../health-pipeline-status.asciidoc | 37 ++++ .../troubleshoot/troubleshooting.asciidoc | 1 + logstash-core/build.gradle | 18 ++ logstash-core/lib/logstash/agent.rb | 30 +++ .../lib/logstash/api/command_factory.rb | 2 + .../logstash/api/commands/default_metadata.rb | 24 +- .../logstash/api/commands/health_report.rb | 31 +++ .../lib/logstash/api/modules/health_report.rb | 49 +++++ logstash-core/lib/logstash/api/rack_app.rb | 2 + logstash-core/lib/logstash/java_pipeline.rb | 11 + .../lib/logstash/pipeline_action/create.rb | 8 + .../lib/logstash/pipeline_action/delete.rb | 5 + .../pipeline_action/stop_and_delete.rb | 5 + .../lib/logstash/pipelines_registry.rb | 16 ++ .../spec/logstash/java_pipeline_spec.rb | 3 + .../logstash/pipeline_action/create_spec.rb | 6 + .../logstash/pipeline_action/delete_spec.rb | 78 +++++++ .../pipeline_action/stop_and_delete_spec.rb | 79 +++++++ logstash-core/spec/logstash/webserver_spec.rb | 8 +- .../src/main/java/org/logstash/Logstash.java | 20 ++ .../ext/JrubyWrappedSynchronousQueueExt.java | 1 + .../org/logstash/health/ApiHealthReport.java | 60 +++++ .../java/org/logstash/health/Diagnosis.java | 113 ++++++++++ .../org/logstash/health/HealthObserver.java | 64 ++++++ .../java/org/logstash/health/HelpUrl.java | 68 ++++++ .../main/java/org/logstash/health/Impact.java | 111 ++++++++++ .../java/org/logstash/health/ImpactArea.java | 43 ++++ .../java/org/logstash/health/Indicator.java | 31 +++ .../org/logstash/health/MultiIndicator.java | 164 ++++++++++++++ .../logstash/health/PipelineIndicator.java | 195 +++++++++++++++++ .../main/java/org/logstash/health/Probe.java | 93 ++++++++ .../org/logstash/health/ProbeIndicator.java | 205 ++++++++++++++++++ .../org/logstash/health/ReportContext.java | 58 +++++ .../main/java/org/logstash/health/Status.java | 67 ++++++ .../logstash/health/ProbeIndicatorTest.java | 128 +++++++++++ .../java/org/logstash/health/StatusTest.java | 106 +++++++++ .../README.md | 10 + .../lib/logstash/filters/failure_injector.rb | 91 ++++++++ .../lib/logstash/outputs/failure_injector.rb | 91 ++++++++ .../logstash/utils/failure_injector_util.rb | 37 ++++ ...ash-integration-failure_injector-0.0.1.gem | Bin 0 -> 7168 bytes ...stash-integration-failure_injector.gemspec | 26 +++ .../spec/unit/failure_injector_spec.rb | 129 +++++++++++ 59 files changed, 2955 insertions(+), 19 deletions(-) create mode 100644 .buildkite/scripts/health-report-tests/README.md create mode 100644 .buildkite/scripts/health-report-tests/__init__.py create mode 100644 .buildkite/scripts/health-report-tests/bootstrap.py create mode 100644 .buildkite/scripts/health-report-tests/config/pipelines.yml create mode 100644 .buildkite/scripts/health-report-tests/config_validator.py create mode 100644 .buildkite/scripts/health-report-tests/logstash_health_report.py create mode 100644 .buildkite/scripts/health-report-tests/main.py create mode 100644 .buildkite/scripts/health-report-tests/requirements.txt create mode 100644 .buildkite/scripts/health-report-tests/scenario_executor.py create mode 100644 .buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml create mode 100644 .buildkite/scripts/health-report-tests/tests/normal-termination.yaml create mode 100644 .buildkite/scripts/health-report-tests/tests/slow-start.yaml create mode 100644 .buildkite/scripts/health-report-tests/util.py create mode 100644 docs/static/troubleshoot/health-pipeline-status.asciidoc create mode 100644 logstash-core/lib/logstash/api/commands/health_report.rb create mode 100644 logstash-core/lib/logstash/api/modules/health_report.rb create mode 100644 logstash-core/spec/logstash/pipeline_action/delete_spec.rb create mode 100644 logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb create mode 100644 logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java create mode 100644 logstash-core/src/main/java/org/logstash/health/Diagnosis.java create mode 100644 logstash-core/src/main/java/org/logstash/health/HealthObserver.java create mode 100644 logstash-core/src/main/java/org/logstash/health/HelpUrl.java create mode 100644 logstash-core/src/main/java/org/logstash/health/Impact.java create mode 100644 logstash-core/src/main/java/org/logstash/health/ImpactArea.java create mode 100644 logstash-core/src/main/java/org/logstash/health/Indicator.java create mode 100644 logstash-core/src/main/java/org/logstash/health/MultiIndicator.java create mode 100644 logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java create mode 100644 logstash-core/src/main/java/org/logstash/health/Probe.java create mode 100644 logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java create mode 100644 logstash-core/src/main/java/org/logstash/health/ReportContext.java create mode 100644 logstash-core/src/main/java/org/logstash/health/Status.java create mode 100644 logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java create mode 100644 logstash-core/src/test/java/org/logstash/health/StatusTest.java create mode 100644 qa/support/logstash-integration-failure_injector/README.md create mode 100644 qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb create mode 100644 qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb create mode 100644 qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb create mode 100644 qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem create mode 100644 qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec create mode 100644 qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb diff --git a/.buildkite/scripts/health-report-tests/README.md b/.buildkite/scripts/health-report-tests/README.md new file mode 100644 index 00000000000..99bd198a823 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/README.md @@ -0,0 +1,18 @@ +## Description +This package for integration tests of the Health Report API. +Export `LS_BRANCH` to run on a specific branch. By default, it uses the main branch. + +## How to run the Health Report Integration test? +### Prerequisites +Make sure you have python installed. Install the integration test dependencies with the following command: +```shell +python3 -mpip install -r .buildkite/scripts/health-report-tests/requirements.txt +``` + +### Run the integration tests +```shell +python3 .buildkite/scripts/health-report-tests/main.py +``` + +### Troubleshooting +- If you get `WARNING: pip is configured with locations that require TLS/SSL,...` warning message, make sure you have python >=3.12.4 installed. \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/__init__.py b/.buildkite/scripts/health-report-tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/.buildkite/scripts/health-report-tests/bootstrap.py b/.buildkite/scripts/health-report-tests/bootstrap.py new file mode 100644 index 00000000000..180592b4e53 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/bootstrap.py @@ -0,0 +1,101 @@ +""" +Health Report Integration test bootstrapper with Python script + - A script to resolve Logstash version if not provided + - Download LS docker image and spin up + - When tests finished, teardown the Logstash +""" +import os +import subprocess +import util +import yaml + + +class Bootstrap: + ELASTIC_STACK_VERSIONS_URL = "https://artifacts-api.elastic.co/v1/versions" + + def __init__(self) -> None: + f""" + A constructor of the {Bootstrap}. + Returns: + Resolves Logstash branch considering provided LS_BRANCH + Checks out git branch + """ + logstash_branch = os.environ.get("LS_BRANCH") + if logstash_branch is None: + # version is not specified, use the main branch, no need to git checkout + print(f"LS_BRANCH is not specified, using main branch.") + else: + # LS_BRANCH accepts major latest as a major.x or specific branch as X.Y + if logstash_branch.find(".x") == -1: + print(f"Using specified branch: {logstash_branch}") + util.git_check_out_branch(logstash_branch) + else: + major_version = logstash_branch.split(".")[0] + if major_version and major_version.isnumeric(): + resolved_version = self.__resolve_latest_stack_version_for(major_version) + minor_version = resolved_version.split(".")[1] + branch = major_version + "." + minor_version + print(f"Using resolved branch: {branch}") + util.git_check_out_branch(branch) + else: + raise ValueError(f"Invalid value set to LS_BRANCH. Please set it properly (ex: 8.x or 9.0) and " + f"rerun again") + + def __resolve_latest_stack_version_for(self, major_version: str) -> str: + resolved_version = "" + response = util.call_url_with_retry(self.ELASTIC_STACK_VERSIONS_URL) + release_versions = response.json()["versions"] + for release_version in reversed(release_versions): + if release_version.find("SNAPSHOT") > 0: + continue + if release_version.split(".")[0] == major_version: + print(f"Resolved latest version for {major_version} is {release_version}.") + resolved_version = release_version + break + + if resolved_version == "": + raise ValueError(f"Cannot resolve latest version for {major_version} major") + return resolved_version + + def install_plugin(self, plugin_path: str) -> None: + util.run_or_raise_error( + ["bin/logstash-plugin", "install", plugin_path], + f"Failed to install {plugin_path}") + + def build_logstash(self): + print(f"Building Logstash.") + util.run_or_raise_error( + ["./gradlew", "clean", "bootstrap", "assemble", "installDefaultGems"], + "Failed to build Logstash") + print(f"Logstash has successfully built.") + + def apply_config(self, config: dict) -> None: + with open(os.getcwd() + "/.buildkite/scripts/health-report-tests/config/pipelines.yml", 'w') as pipelines_file: + yaml.dump(config, pipelines_file) + + def run_logstash(self, full_start_required: bool) -> subprocess.Popen: + # --config.reload.automatic is to make instance active + # it is helpful when testing crash pipeline cases + config_path = os.getcwd() + "/.buildkite/scripts/health-report-tests/config" + process = subprocess.Popen(["bin/logstash", "--config.reload.automatic", "--path.settings", config_path, + "-w 1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=False) + if process.poll() is not None: + print(f"Logstash failed to run, check the the config and logs, then rerun.") + return None + + # Read stdout and stderr in real-time + logs = [] + for stdout_line in iter(process.stdout.readline, ""): + logs.append(stdout_line.strip()) + # we don't wait for Logstash fully start as we also test slow pipeline start scenarios + if full_start_required is False and "Starting pipeline" in stdout_line: + break + if full_start_required is True and "Pipeline started" in stdout_line: + break + if "Logstash shut down" in stdout_line or "Logstash stopped" in stdout_line: + print(f"Logstash couldn't spin up.") + print(logs) + return None + + print(f"Logstash is running with PID: {process.pid}.") + return process diff --git a/.buildkite/scripts/health-report-tests/config/pipelines.yml b/.buildkite/scripts/health-report-tests/config/pipelines.yml new file mode 100644 index 00000000000..cfa2d9632f9 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/config/pipelines.yml @@ -0,0 +1 @@ +# Intentionally left blank \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/config_validator.py b/.buildkite/scripts/health-report-tests/config_validator.py new file mode 100644 index 00000000000..a0b6df9b72d --- /dev/null +++ b/.buildkite/scripts/health-report-tests/config_validator.py @@ -0,0 +1,69 @@ +import yaml +from typing import Any, List, Dict + + +class ConfigValidator: + REQUIRED_KEYS = { + "root": ["name", "config", "conditions", "expectation"], + "config": ["pipeline.id", "config.string"], + "conditions": ["full_start_required"], + "expectation": ["status", "symptom", "indicators"], + "indicators": ["pipelines"], + "pipelines": ["status", "symptom", "indicators"], + "DYNAMIC": ["status", "symptom", "diagnosis", "impacts", "details"], + "details": ["status"], + "status": ["state"] + } + + def __init__(self): + self.yaml_content = None + + def __has_valid_keys(self, data: any, key_path: str, repeated: bool) -> bool: + if isinstance(data, str) or isinstance(data, bool): # we reached values + return True + + # we have two indicators section and for the next repeated ones, we go deeper + first_key = next(iter(data)) + data = data[first_key] if repeated and key_path == "indicators" else data + + if isinstance(data, dict): + # pipeline-id is a DYNAMIC + required = self.REQUIRED_KEYS.get("DYNAMIC" if repeated and key_path == "indicators" else key_path, []) + repeated = not repeated if key_path == "indicators" else repeated + for key in required: + if key not in data: + print(f"Missing key '{key}' in '{key_path}'") + return False + else: + dic_keys_result = self.__has_valid_keys(data[key], key, repeated) + if dic_keys_result is False: + return False + elif isinstance(data, list): + for item in data: + list_keys_result = self.__has_valid_keys(item, key_path, repeated) + if list_keys_result is False: + return False + return True + + def load(self, file_path: str) -> None: + """Load the YAML file content into self.yaml_content.""" + self.yaml_content: [Dict[str, Any]] = None + try: + with open(file_path, 'r') as file: + self.yaml_content = yaml.safe_load(file) + except yaml.YAMLError as exc: + print(f"Error in YAML file: {exc}") + self.yaml_content = None + + def is_valid(self) -> bool: + """Validate the entire YAML structure.""" + if self.yaml_content is None: + print(f"YAML content is empty.") + return False + + if not isinstance(self.yaml_content, dict): + print(f"YAML structure is not as expected, it should start with a Dict.") + return False + + result = self.__has_valid_keys(self.yaml_content, "root", False) + return True if result is True else False diff --git a/.buildkite/scripts/health-report-tests/logstash_health_report.py b/.buildkite/scripts/health-report-tests/logstash_health_report.py new file mode 100644 index 00000000000..6b00cff07bb --- /dev/null +++ b/.buildkite/scripts/health-report-tests/logstash_health_report.py @@ -0,0 +1,16 @@ +""" +A class to provide information about Logstash node stats. +""" + +import util + + +class LogstashHealthReport: + LOGSTASH_HEALTH_REPORT_URL = "http://localhost:9600/_health_report" + + def __init__(self): + pass + + def get(self): + response = util.call_url_with_retry(self.LOGSTASH_HEALTH_REPORT_URL) + return response.json() diff --git a/.buildkite/scripts/health-report-tests/main.py b/.buildkite/scripts/health-report-tests/main.py new file mode 100644 index 00000000000..bccfe7fe0c5 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/main.py @@ -0,0 +1,87 @@ +""" + Main entry point of the LS health report API integration test suites +""" +import glob +import os +import time +import traceback +import yaml +from bootstrap import Bootstrap +from scenario_executor import ScenarioExecutor +from config_validator import ConfigValidator + + +class BootstrapContextManager: + + def __init__(self): + pass + + def __enter__(self): + print(f"Starting Logstash Health Report Integration test.") + self.bootstrap = Bootstrap() + self.bootstrap.build_logstash() + + plugin_path = os.getcwd() + "/qa/support/logstash-integration-failure_injector/logstash-integration" \ + "-failure_injector-*.gem" + matching_files = glob.glob(plugin_path) + if len(matching_files) == 0: + raise ValueError(f"Could not find logstash-integration-failure_injector plugin.") + + self.bootstrap.install_plugin(matching_files[0]) + print(f"logstash-integration-failure_injector successfully installed.") + return self.bootstrap + + def __exit__(self, exc_type, exc_value, exc_traceback): + if exc_type is not None: + print(traceback.format_exception(exc_type, exc_value, exc_traceback)) + + +def main(): + with BootstrapContextManager() as bootstrap: + scenario_executor = ScenarioExecutor() + config_validator = ConfigValidator() + + working_dir = os.getcwd() + scenario_files_path = working_dir + "/.buildkite/scripts/health-report-tests/tests/*.yaml" + scenario_files = glob.glob(scenario_files_path) + + for scenario_file in scenario_files: + print(f"Validating {scenario_file} scenario file.") + config_validator.load(scenario_file) + if config_validator.is_valid() is False: + print(f"{scenario_file} scenario file is not valid.") + return + else: + print(f"Validation succeeded.") + + has_failed_scenario = False + for scenario_file in scenario_files: + with open(scenario_file, 'r') as file: + # scenario_content: Dict[str, Any] = None + scenario_content = yaml.safe_load(file) + print(f"Testing `{scenario_content.get('name')}` scenario.") + scenario_name = scenario_content['name'] + + is_full_start_required = next(sub.get('full_start_required') for sub in + scenario_content.get('conditions') if 'full_start_required' in sub) + config = scenario_content['config'] + if config is not None: + bootstrap.apply_config(config) + expectations = scenario_content.get("expectation") + process = bootstrap.run_logstash(is_full_start_required) + if process is not None: + try: + scenario_executor.on(scenario_name, expectations) + except Exception as e: + print(e) + has_failed_scenario = True + process.terminate() + time.sleep(5) # leave some window to terminate the process + + if has_failed_scenario: + # intentionally fail due to visibility + raise Exception("Some of scenarios failed, check the log for details.") + + +if __name__ == "__main__": + main() diff --git a/.buildkite/scripts/health-report-tests/main.sh b/.buildkite/scripts/health-report-tests/main.sh index c31a4b120e3..8b0dd00cd5f 100755 --- a/.buildkite/scripts/health-report-tests/main.sh +++ b/.buildkite/scripts/health-report-tests/main.sh @@ -1,9 +1,5 @@ #!/usr/bin/env bash set -eo pipefail -# TODO: -# if branch is specified with X.Y, pull branches from ACTIVE_BRANCHES_URL="https://raw.githubusercontent.com/elastic/logstash/main/ci/branches.json", parse and use -# build Logstash from specificed (ex: 8.x -> translates to 8.latest, 8.16) branch, defaults to main -# install requirements of the python package and run main.py - - +python3 -mpip install -r .buildkite/scripts/health-report-tests/requirements.txt +python3 .buildkite/scripts/health-report-tests/main.py diff --git a/.buildkite/scripts/health-report-tests/requirements.txt b/.buildkite/scripts/health-report-tests/requirements.txt new file mode 100644 index 00000000000..c48e502117b --- /dev/null +++ b/.buildkite/scripts/health-report-tests/requirements.txt @@ -0,0 +1,2 @@ +requests==2.32.3 +pyyaml==6.0.2 \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/scenario_executor.py b/.buildkite/scripts/health-report-tests/scenario_executor.py new file mode 100644 index 00000000000..2db8a31d850 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/scenario_executor.py @@ -0,0 +1,65 @@ +""" +A class to execute the given scenario for Logstash Health Report integration test +""" +import time +from logstash_health_report import LogstashHealthReport + + +class ScenarioExecutor: + logstash_health_report_api = LogstashHealthReport() + + def __init__(self): + pass + + def __has_intersection(self, expects, results): + # we expect expects to be existing in results + for expect in expects: + for result in results: + if result.get('help_url') and "health-report-pipeline-status.html#" not in result.get('help_url'): + return False + if not all(key in result and result[key] == value for key, value in expect.items()): + return False + return True + + def __get_difference(self, differences: list, expectations: dict, reports: dict) -> dict: + for key in expectations.keys(): + + if type(expectations.get(key)) != type(reports.get(key)): + differences.append(f"Scenario expectation and Health API report structure differs for {key}.") + return differences + + if isinstance(expectations.get(key), str): + if expectations.get(key) != reports.get(key): + differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}}) + continue + elif isinstance(expectations.get(key), dict): + self.__get_difference(differences, expectations.get(key), reports.get(key)) + elif isinstance(expectations.get(key), list): + if not self.__has_intersection(expectations.get(key), reports.get(key)): + differences.append({key: {"expected": expectations.get(key), "got": reports.get(key)}}) + return differences + + def __is_expected(self, expectations: dict) -> None: + reports = self.logstash_health_report_api.get() + differences = self.__get_difference([], expectations, reports) + if differences: + print("Differences found in 'expectation' section between YAML content and stats:") + for diff in differences: + print(f"Difference: {diff}") + return False + else: + return True + + def on(self, scenario_name: str, expectations: dict) -> None: + # retriable check the expectations + attempts = 5 + while self.__is_expected(expectations) is False: + attempts = attempts - 1 + if attempts == 0: + break + time.sleep(1) + + if attempts == 0: + raise Exception(f"{scenario_name} failed.") + else: + print(f"Scenario `{scenario_name}` expectaion meets the health report stats.") diff --git a/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml b/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml new file mode 100644 index 00000000000..219d8e93b7b --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/abnormal-termination.yaml @@ -0,0 +1,31 @@ +name: "Abnormally terminated pipeline" +config: + - pipeline.id: abnormally-terminated-pp + config.string: | + input { heartbeat { interval => 1 } } + filter { failure_injector { crash_at => filter } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: true +expectation: + status: "red" + symptom: "1 indicator is unhealthy (`pipelines`)" + indicators: + pipelines: + status: "red" + symptom: "1 indicator is unhealthy (`abnormally-terminated-pp`)" + indicators: + abnormally-terminated-pp: + status: "red" + symptom: "The pipeline is unhealthy; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline is not running, likely because it has encountered an error" + - action: "view logs to determine the cause of abnormal pipeline shutdown" + impacts: + - description: "the pipeline is not currently processing" + - impact_areas: ["pipeline_execution"] + details: + status: + state: "TERMINATED" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml b/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml new file mode 100644 index 00000000000..86a05deb5fa --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/normal-termination.yaml @@ -0,0 +1,29 @@ +name: "Successfully terminated pipeline" +config: + - pipeline.id: normally-terminated-pp + config.string: | + input { generator { count => 1 } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: true +expectation: + status: "yellow" + symptom: "1 indicator is concerning (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`normally-terminated-pp`)" + indicators: + normally-terminated-pp: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline has finished running because its inputs have been closed and events have been processed" + - action: "if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events" + impacts: + - impact_areas: ["pipeline_execution"] + details: + status: + state: "FINISHED" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/tests/slow-start.yaml b/.buildkite/scripts/health-report-tests/tests/slow-start.yaml new file mode 100644 index 00000000000..d036391a9c3 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/tests/slow-start.yaml @@ -0,0 +1,30 @@ +name: "Slow start pipeline" +config: + - pipeline.id: slow-start-pp + config.string: | + input { heartbeat {} } + filter { failure_injector { degrade_at => [register] } } + output { stdout {} } + pipeline.workers: 1 + pipeline.batch.size: 1 +conditions: + - full_start_required: false +expectation: + status: "yellow" + symptom: "1 indicator is concerning (`pipelines`)" + indicators: + pipelines: + status: "yellow" + symptom: "1 indicator is concerning (`slow-start-pp`)" + indicators: + slow-start-pp: + status: "yellow" + symptom: "The pipeline is concerning; 1 area is impacted and 1 diagnosis is available" + diagnosis: + - cause: "pipeline is loading" + - action: "if pipeline does not come up quickly, you may need to check the logs to see if it is stalled" + impacts: + - impact_areas: ["pipeline_execution"] + details: + status: + state: "LOADING" \ No newline at end of file diff --git a/.buildkite/scripts/health-report-tests/util.py b/.buildkite/scripts/health-report-tests/util.py new file mode 100644 index 00000000000..c7e840f06f4 --- /dev/null +++ b/.buildkite/scripts/health-report-tests/util.py @@ -0,0 +1,35 @@ +import requests +import subprocess +from requests.adapters import HTTPAdapter, Retry + + +def call_url_with_retry(url: str, max_retries: int = 5, delay: int = 1) -> requests.Response: + f""" + Calls the given {url} with maximum of {max_retries} retries with {delay} delay. + """ + schema = "https://" if "https://" in url else "http://" + session = requests.Session() + # retry on most common failures such as connection timeout(408), etc... + retries = Retry(total=max_retries, backoff_factor=delay, status_forcelist=[408, 502, 503, 504]) + session.mount(schema, HTTPAdapter(max_retries=retries)) + return session.get(url) + + +def git_check_out_branch(branch_name: str) -> None: + f""" + Checks out specified branch or fails with error if checkout operation fails. + """ + run_or_raise_error(["git", "checkout", branch_name], + "Error occurred while checking out the " + branch_name + " branch") + + +def run_or_raise_error(commands: list, error_message): + f""" + Executes the {list} commands and raises an {Exception} if opration fails. + """ + result = subprocess.run(commands, universal_newlines=True, stdout=subprocess.PIPE) + if result.returncode != 0: + full_error_message = (error_message + ", output: " + result.stdout.decode('utf-8')) \ + if result.stdout else error_message + raise Exception(f"{full_error_message}") + diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index 897507d1e22..68b4a0b8378 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -2,13 +2,13 @@ [[monitoring]] == APIs for monitoring {ls} -{ls} provides monitoring APIs for retrieving runtime metrics -about {ls}: +{ls} provides monitoring APIs for retrieving runtime information about {ls}: * <> * <> * <> * <> +* <> You can use the root resource to retrieve general information about the Logstash instance, including @@ -1184,3 +1184,155 @@ Example of a human-readable response: org.jruby.internal.runtime.NativeThread.join(NativeThread.java:75) -------------------------------------------------- + + +[[logstash-health-report-api]] +=== Health report API + +An API that reports the health status of Logstash. + +[source,js] +-------------------------------------------------- +curl -XGET 'localhost:9600/_health_report?pretty' +-------------------------------------------------- + +==== Description + +The health API returns a report with the health status of Logstash and the pipelines that are running inside of it. +The report contains a list of indicators that compose Logstash functionality. + +Each indicator has a health status of: `green`, `unknown`, `yellow`, or `red`. +The indicator will provide an explanation and metadata describing the reason for its current health status. + +The top-level status is controlled by the worst indicator status. + +In the event that an indicator's status is non-green, a list of impacts may be present in the indicator result which detail the functionalities that are negatively affected by the health issue. +Each impact carries with it a severity level, an area of the system that is affected, and a simple description of the impact on the system. + +Some health indicators can determine the root cause of a health problem and prescribe a set of steps that can be performed in order to improve the health of the system. +The root cause and remediation steps are encapsulated in a `diagnosis`. +A diagnosis contains a cause detailing a root cause analysis, an action containing a brief description of the steps to take to fix the problem, and the URL for detailed troubleshooting help. + +NOTE: The health indicators perform root cause analysis of non-green health statuses. + This can be computationally expensive when called frequently. + +==== Response body + +`status`:: +(Optional, string) Health status of {ls}, based on the aggregated status of all indicators. Statuses are: + +`green`::: +{ls} is healthy. + +`unknown`::: +The health of {ls} could not be determined. + +`yellow`::: +The functionality of {ls} is in a degraded state and may need remediation to avoid the health becoming `red`. + +`red`::: +{ls} is experiencing an outage or certain features are unavailable for use. + +`indicators`:: +(object) Information about the health of the {ls} indicators. + ++ +.Properties of `indicators` +[%collapsible%open] +==== +``:: +(object) Contains health results for an indicator. ++ +.Properties of `` +[%collapsible%open] +======= +`status`:: +(string) Health status of the indicator. Statuses are: + +`green`::: +The indicator is healthy. + +`unknown`::: +The health of the indicator could not be determined. + +`yellow`::: +The functionality of an indicator is in a degraded state and may need remediation to avoid the health becoming `red`. + +`red`::: +The indicator is experiencing an outage or certain features are unavailable for use. + +`symptom`:: +(string) A message providing information about the current health status. + +`details`:: +(Optional, object) An object that contains additional information about the indicator that has lead to the current health status result. +Each indicator has <>. + +`impacts`:: +(Optional, array) If a non-healthy status is returned, indicators may include a list of impacts that this health status will have on {ls}. ++ +.Properties of `impacts` +[%collapsible%open] +======== +`severity`:: +(integer) How important this impact is to the functionality of {ls}. +A value of 1 is the highest severity, with larger values indicating lower severity. + +`description`:: +(string) A description of the impact on {ls}. + +`impact_areas`:: +(array of strings) The areas {ls} functionality that this impact affects. +Possible values are: ++ +-- +* `pipeline_execution` +-- + +======== + +`diagnosis`:: +(Optional, array) If a non-healthy status is returned, indicators may include a list of diagnosis that encapsulate the cause of the health issue and an action to take in order to remediate the problem. ++ +.Properties of `diagnosis` +[%collapsible%open] +======== +`cause`:: +(string) A description of a root cause of this health problem. + +`action`:: +(string) A brief description the steps that should be taken to remediate the problem. +A more detailed step-by-step guide to remediate the problem is provided by the `help_url` field. + +`help_url`:: +(string) A link to the troubleshooting guide that'll fix the health problem. +======== +======= +==== + +[role="child_attributes"] +[[logstash-health-api-response-details]] +==== Indicator Details + +Each health indicator in the health API returns a set of details that further explains the state of the system. +The details have contents and a structure that is unique to each indicator. + +[[logstash-health-api-response-details-pipeline]] +===== Pipeline Indicator Details + +`pipelines/indicators//details`:: +(object) Information about the specified pipeline. ++ +.Properties of `pipelines/indicators//details` +[%collapsible%open] +==== +`status`:: +(object) Details related to the pipeline's current status and run-state. ++ +.Properties of `status` +[%collapsible%open] +======== +`state`:: +(string) The current state of the pipeline, including whether it is `loading`, `running`, `finished`, or `terminated`. +======== +==== diff --git a/docs/static/releasenotes.asciidoc b/docs/static/releasenotes.asciidoc index 72d3dcbab72..0151b278af7 100644 --- a/docs/static/releasenotes.asciidoc +++ b/docs/static/releasenotes.asciidoc @@ -2517,4 +2517,4 @@ We have added another flag to the Benchmark CLI to allow passing a data file wit This feature allows users to run the Benchmark CLI in a custom test case with a custom config and a custom dataset. https://github.com/elastic/logstash/pull/12437[#12437] ==== Plugin releases -Plugins align with release 7.14.0 \ No newline at end of file +Plugins align with release 7.14.0 diff --git a/docs/static/troubleshoot/health-pipeline-status.asciidoc b/docs/static/troubleshoot/health-pipeline-status.asciidoc new file mode 100644 index 00000000000..095ef85f950 --- /dev/null +++ b/docs/static/troubleshoot/health-pipeline-status.asciidoc @@ -0,0 +1,37 @@ +[[health-report-pipeline-status]] +=== Health Report Pipeline Status + +The Pipeline indicator has a `status` probe that is capable of producing one of several diagnoses about the pipeline's lifecycle, indicating whether the pipeline is currently running. + +[[health-report-pipeline-status-diagnosis-loading]] +==== [[loading]]Loading Pipeline + +A pipeline that is loading is not yet processing data, and is considered a temporarily-degraded pipeline state. +Some plugins perform actions or pre-validation that can delay the starting of the pipeline, such as when a plugin pre-establishes a connection to an external service before allowing the pipeline to start. +When these plugins take significant time to start up, the whole pipeline can remain in a loading state for an extended time. + +If your pipeline does not come up in a reasonable amount of time, consider checking the Logstash logs to see if the plugin shows evidence of being caught in a retry loop. + +[[health-report-pipeline-status-diagnosis-finished]] +==== [[finished]]Finished Pipeline + +A logstash pipeline whose input plugins have all completed will be shut down once events have finished processing. + +Many plugins can be configured to run indefinitely, either by listening for new inbound events or by polling for events on a schedule. +A finished pipeline will not produce or process any more events until it is restarted, which will occur if the pipeline's definition is changed and pipeline reloads are enabled. +If you wish to keep your pipeline runing, consider configuring its input to run on a schedule or otherwise listen for new events. + +[[health-report-pipeline-status-diagnosis-terminated]] +==== [[terminated]]Terminated Pipeline + +When a Logstash pipeline's filter or output plugins crash, the entire pipeline is terminated and intervention is required. + +A terminated pipeline will not produce or process any more events until it is restarted, which will occur if the pipeline's definition is changed and pipeline reloads are enabled. +Check the logs to determine the cause of the crash, and report the issue to the plugin maintainers. + +[[health-report-pipeline-status-diagnosis-unknown]] +==== [[unknown]]Unknown Pipeline + +When a Logstash pipeline either cannot be created or has recently been deleted the health report doesn't know enough to produce a meaningful status. + +Check the logs to determine if the pipeline crashed during creation, and report the issue to the plugin maintainers. diff --git a/docs/static/troubleshoot/troubleshooting.asciidoc b/docs/static/troubleshoot/troubleshooting.asciidoc index b4c8ee7a0d7..66bb60f45e5 100644 --- a/docs/static/troubleshoot/troubleshooting.asciidoc +++ b/docs/static/troubleshoot/troubleshooting.asciidoc @@ -28,3 +28,4 @@ include::ts-logstash.asciidoc[] include::ts-plugins-general.asciidoc[] include::ts-plugins.asciidoc[] include::ts-other-issues.asciidoc[] +include::health-pipeline-status.asciidoc[] diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 8cb89141809..831f0fefde5 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -57,6 +57,7 @@ def versionMap = (Map) (new Yaml()).load(new File("$projectDir/../versions.yml") description = """Logstash Core Java""" +String logstashCoreVersion = versionMap['logstash-core'] String jacksonVersion = versionMap['jackson'] String jacksonDatabindVersion = versionMap['jackson-databind'] String jrubyVersion = versionMap['jruby']['version'] @@ -183,6 +184,23 @@ artifacts { } } +task generateVersionInfoResources(type: DefaultTask) { + ext.outDir = layout.buildDirectory.dir("generated-resources/version-info").get() + + inputs.property("version-info:logstash-core", logstashCoreVersion) + outputs.dir(ext.outDir) + + doLast { + mkdir outDir; + def resourceFile = outDir.file('version-info.properties').asFile + resourceFile.text = "logstash-core: ${logstashCoreVersion}" + } +} +sourceSets { + main { output.dir(generateVersionInfoResources.outputs.files) } +} +processResources.dependsOn generateVersionInfoResources + configurations { provided } diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 7f99abe224e..d1f8e006995 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -40,6 +40,8 @@ class LogStash::Agent attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus attr_accessor :logger + attr_reader :health_observer + # initialize method for LogStash::Agent # @param params [Hash] potential parameters are: # :name [String] - identifier for the agent @@ -51,6 +53,9 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil) @auto_reload = setting("config.reload.automatic") @ephemeral_id = SecureRandom.uuid + java_import("org.logstash.health.HealthObserver") + @health_observer ||= HealthObserver.new + # Mutex to synchronize in the exclusive method # Initial usage for the Ruby pipeline initialization which is not thread safe @webserver_control_lock = Mutex.new @@ -151,6 +156,31 @@ def execute transition_to_stopped end + include org.logstash.health.PipelineIndicator::PipelineDetailsProvider + def pipeline_details(pipeline_id) + logger.trace("fetching pipeline details for `#{pipeline_id}`") + pipeline_id = pipeline_id.to_sym + + java_import org.logstash.health.PipelineIndicator + + pipeline_state = @pipelines_registry.states.get(pipeline_id) + if pipeline_state.nil? + return PipelineIndicator::Details.new(PipelineIndicator::Status::UNKNOWN) + end + + status = pipeline_state.synchronize do |sync_state| + case + when sync_state.loading? then PipelineIndicator::Status::LOADING + when sync_state.crashed? then PipelineIndicator::Status::TERMINATED + when sync_state.running? then PipelineIndicator::Status::RUNNING + when sync_state.finished? then PipelineIndicator::Status::FINISHED + else PipelineIndicator::Status::UNKNOWN + end + end + + return PipelineIndicator::Details.new(status) + end + def auto_reload? @auto_reload end diff --git a/logstash-core/lib/logstash/api/command_factory.rb b/logstash-core/lib/logstash/api/command_factory.rb index 6ed9e570142..0b246f14034 100644 --- a/logstash-core/lib/logstash/api/command_factory.rb +++ b/logstash-core/lib/logstash/api/command_factory.rb @@ -18,6 +18,7 @@ require "logstash/api/service" require "logstash/api/commands/system/basicinfo_command" require "logstash/api/commands/system/plugins_command" +require "logstash/api/commands/health_report" require "logstash/api/commands/stats" require "logstash/api/commands/node" require "logstash/api/commands/default_metadata" @@ -34,6 +35,7 @@ def initialize(service) :plugins_command => ::LogStash::Api::Commands::System::Plugins, :stats => ::LogStash::Api::Commands::Stats, :node => ::LogStash::Api::Commands::Node, + :health_report => ::LogStash::Api::Commands::HealthReport, :default_metadata => ::LogStash::Api::Commands::DefaultMetadata } end diff --git a/logstash-core/lib/logstash/api/commands/default_metadata.rb b/logstash-core/lib/logstash/api/commands/default_metadata.rb index 26f4af31337..635e3e5f43a 100644 --- a/logstash-core/lib/logstash/api/commands/default_metadata.rb +++ b/logstash-core/lib/logstash/api/commands/default_metadata.rb @@ -22,20 +22,14 @@ module Api module Commands class DefaultMetadata < Commands::Base def all - res = {:host => host, - :version => version, - :http_address => http_address, - :id => service.agent.id, - :name => service.agent.name, - :ephemeral_id => service.agent.ephemeral_id, - :status => "green", # This is hard-coded to mirror x-pack behavior - :snapshot => ::BUILD_INFO["build_snapshot"], + res = base_info.merge({ + :status => service.agent.health_observer.status, :pipeline => { :workers => LogStash::SETTINGS.get("pipeline.workers"), :batch_size => LogStash::SETTINGS.get("pipeline.batch.size"), :batch_delay => LogStash::SETTINGS.get("pipeline.batch.delay"), }, - } + }) monitoring = {} if enabled_xpack_monitoring? monitoring = monitoring.merge({ @@ -49,6 +43,18 @@ def all res.merge(monitoring.empty? ? {} : {:monitoring => monitoring}) end + def base_info + { + :host => host, + :version => version, + :http_address => http_address, + :id => service.agent.id, + :name => service.agent.name, + :ephemeral_id => service.agent.ephemeral_id, + :snapshot => ::BUILD_INFO["build_snapshot"], + } + end + def host @@host ||= Socket.gethostname end diff --git a/logstash-core/lib/logstash/api/commands/health_report.rb b/logstash-core/lib/logstash/api/commands/health_report.rb new file mode 100644 index 00000000000..d53a313b499 --- /dev/null +++ b/logstash-core/lib/logstash/api/commands/health_report.rb @@ -0,0 +1,31 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/api/commands/base" + +module LogStash + module Api + module Commands + class HealthReport < Commands::Base + + def all(selected_fields = []) + service.agent.health_observer.report + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/modules/health_report.rb b/logstash-core/lib/logstash/api/modules/health_report.rb new file mode 100644 index 00000000000..ff5728d94e7 --- /dev/null +++ b/logstash-core/lib/logstash/api/modules/health_report.rb @@ -0,0 +1,49 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module LogStash + module Api + module Modules + class HealthReport < ::LogStash::Api::Modules::Base + + get "/" do + payload = health_report.all.then do |health_report_pojo| + # The app_helper needs a ruby-hash. + # Manually creating a map of properties works around the issue. + base_metadata.merge({ + status: health_report_pojo.status, + symptom: health_report_pojo.symptom, + indicators: health_report_pojo.indicators, + }) + end + + respond_with(payload, {exclude_default_metadata: true}) + end + + private + + def health_report + @health_report ||= factory.build(:health_report) + end + + def base_metadata + @factory.build(:default_metadata).base_info + end + end + end + end +end \ No newline at end of file diff --git a/logstash-core/lib/logstash/api/rack_app.rb b/logstash-core/lib/logstash/api/rack_app.rb index c14bdf26a23..ee3e409e95e 100644 --- a/logstash-core/lib/logstash/api/rack_app.rb +++ b/logstash-core/lib/logstash/api/rack_app.rb @@ -18,6 +18,7 @@ require "rack" require "sinatra/base" require "logstash/api/modules/base" +require "logstash/api/modules/health_report" require "logstash/api/modules/node" require "logstash/api/modules/node_stats" require "logstash/api/modules/plugins" @@ -123,6 +124,7 @@ def self.app(logger, agent, environment) def self.rack_namespaces(agent) { + "/_health_report" => LogStash::Api::Modules::HealthReport, "/_node" => LogStash::Api::Modules::Node, "/_stats" => LogStash::Api::Modules::Stats, "/_node/stats" => LogStash::Api::Modules::NodeStats, diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 9cec566ccf0..b30d11e2be7 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -65,6 +65,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @flushing = java.util.concurrent.atomic.AtomicBoolean.new(false) @flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) @shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false) + @crash_detected = Concurrent::AtomicBoolean.new(false) @outputs_registered = Concurrent::AtomicBoolean.new(false) # @finished_execution signals that the pipeline thread has finished its execution @@ -87,6 +88,10 @@ def finished_execution? @finished_execution.true? end + def finished_run? + @finished_run.true? + end + def ready? @ready.value end @@ -229,6 +234,10 @@ def stopped? @running.false? end + def crashed? + @crash_detected.true? + end + # register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins # @param plugins [Array[Plugin]] the list of plugins to register def register_plugins(plugins) @@ -305,6 +314,7 @@ def start_workers rescue => e # WorkerLoop.run() catches all Java Exception class and re-throws as IllegalStateException with the # original exception as the cause + @crash_detected.make_true @logger.error( "Pipeline worker error, the pipeline will be stopped", default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace) @@ -319,6 +329,7 @@ def start_workers begin start_inputs rescue => e + @crash_detected.make_true # if there is any exception in starting inputs, make sure we shutdown workers. # exception will already by logged in start_inputs shutdown_workers diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index ffd09777733..6f0fff00119 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -46,13 +46,21 @@ def execution_priority # The execute assume that the thread safety access of the pipeline # is managed by the caller. def execute(agent, pipelines_registry) + attach_health_indicator(agent) new_pipeline = LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do new_pipeline.start # block until the pipeline is correctly started or crashed end + LogStash::ConvergeResult::ActionResult.create(self, success) end + def attach_health_indicator(agent) + health_observer = agent.health_observer + health_observer.detach_pipeline_indicator(pipeline_id) # just in case ... + health_observer.attach_pipeline_indicator(pipeline_id, agent) + end + def to_s "PipelineAction::Create<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/delete.rb b/logstash-core/lib/logstash/pipeline_action/delete.rb index 1a19509ba2f..c072e70bf38 100644 --- a/logstash-core/lib/logstash/pipeline_action/delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/delete.rb @@ -27,10 +27,15 @@ def initialize(pipeline_id) def execute(agent, pipelines_registry) success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::Delete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb index c627087ed42..4c8e6ded037 100644 --- a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb +++ b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb @@ -31,10 +31,15 @@ def execute(agent, pipelines_registry) end success = pipelines_registry.delete_pipeline(@pipeline_id) + detach_health_indicator(agent) if success LogStash::ConvergeResult::ActionResult.create(self, success) end + def detach_health_indicator(agent) + agent.health_observer.detach_pipeline_indicator(pipeline_id) + end + def to_s "PipelineAction::StopAndDelete<#{pipeline_id}>" end diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 3810201e8bb..3752003477c 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -28,6 +28,7 @@ def initialize(pipeline_id, pipeline) @lock = Monitor.new end + # a terminated pipeline has either crashed OR finished normally def terminated? @lock.synchronize do # a loading pipeline is never considered terminated @@ -35,6 +36,20 @@ def terminated? end end + # a finished pipeline finished _normally_ without exception + def finished? + @lock.synchronize do + # a loading pipeline is never considered terminated + @loading.false? && @pipeline.finished_run? + end + end + + def crashed? + @lock.synchronize do + @pipeline&.crashed? + end + end + def running? @lock.synchronize do # not terminated and not loading @@ -104,6 +119,7 @@ def size end end + def empty? @lock.synchronize do @states.empty? diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 24bcb3adc0c..c64d9275470 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -587,6 +587,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -614,6 +615,7 @@ def flush(options) # wait until there is no more worker thread since we have a single worker that should have died wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey + expect(subject.crashed?).to be true # at this point the input plugin should have been asked to stop wait(5).for {dummyinput.stop?}.to be_truthy @@ -779,6 +781,7 @@ def flush(options) expect(input).to receive(:do_close).once pipeline.start pipeline.shutdown + expect(pipeline.crashed?).to be false end end end diff --git a/logstash-core/spec/logstash/pipeline_action/create_spec.rb b/logstash-core/spec/logstash/pipeline_action/create_spec.rb index 16cabacb086..553576b543a 100644 --- a/logstash-core/spec/logstash/pipeline_action/create_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/create_spec.rb @@ -30,6 +30,7 @@ before do clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) end subject { described_class.new(pipeline_config, metric) } @@ -66,6 +67,11 @@ it "returns a successful execution status" do expect(subject.execute(agent, pipelines)).to be_truthy end + + it "attached an indicator to the agent's health observer" do + expect(agent.health_observer).to receive(:attach_pipeline_indicator).with(:main, agent) + subject.execute(agent, pipelines) + end end context "when the pipeline doesn't start" do diff --git a/logstash-core/spec/logstash/pipeline_action/delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb new file mode 100644 index 00000000000..73193389ae0 --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/delete_spec.rb @@ -0,0 +1,78 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::Delete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + + it 'fails to delete the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to_not be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to_not be_nil + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb new file mode 100644 index 00000000000..a32ed5eb0fa --- /dev/null +++ b/logstash-core/spec/logstash/pipeline_action/stop_and_delete_spec.rb @@ -0,0 +1,79 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "spec_helper" +require_relative "../../support/helpers" +require_relative "../../support/matchers" +require "logstash/pipelines_registry" +require "logstash/pipeline_action/delete" +require "logstash/inputs/generator" + + +describe LogStash::PipelineAction::StopAndDelete do + let(:pipeline_config) { "input { dummyblockinginput {} } output { null {} }" } + let(:pipeline_id) { :main } + let(:pipeline) { mock_java_pipeline_from_string(pipeline_config) } + let(:pipelines) do + LogStash::PipelinesRegistry.new.tap do |chm| + chm.create_pipeline(pipeline_id, pipeline) { true } + end + end + let(:agent) { double("agent") } + + subject { described_class.new(pipeline_id) } + + before do + clear_data_dir + allow(agent).to receive(:health_observer).and_return(double("HealthObserver").as_null_object) + pipeline.start + end + + after do + pipeline.shutdown + end + + it "returns the pipeline_id" do + expect(subject.pipeline_id).to eq(:main) + end + + context "when the pipeline is still running" do + it 'stops and deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end + + context "when the pipeline has completed" do + let(:pipeline_config) { "input { generator { count => 1 } } output { null {} }"} + + before(:each) do + sleep(0.1) until pipelines.non_running_pipelines.keys.include?(pipeline_id) + end + + it 'deletes the pipeline' do + action_result = subject.execute(agent, pipelines) + expect(action_result).to be_successful + + expect(pipelines.get_pipeline(pipeline_id)).to be_nil + + expect(agent.health_observer).to have_received(:detach_pipeline_indicator).with(pipeline_id) + end + end +end \ No newline at end of file diff --git a/logstash-core/spec/logstash/webserver_spec.rb b/logstash-core/spec/logstash/webserver_spec.rb index 50c90911eeb..e766be329be 100644 --- a/logstash-core/spec/logstash/webserver_spec.rb +++ b/logstash-core/spec/logstash/webserver_spec.rb @@ -55,7 +55,13 @@ def free_ports(servers) end let(:logger) { LogStash::Logging::Logger.new("testing") } - let(:agent) { OpenStruct.new({:webserver => webserver_block, :http_address => "127.0.0.1", :id => "myid", :name => "myname"}) } + let(:agent) { OpenStruct.new({ + webserver: webserver_block, + http_address: "127.0.0.1", + id: "myid", + name: "myname", + health_observer: org.logstash.health.HealthObserver.new, + }) } let(:webserver_block) { OpenStruct.new({}) } subject(:webserver) { LogStash::WebServer.new(logger, agent, webserver_options) } diff --git a/logstash-core/src/main/java/org/logstash/Logstash.java b/logstash-core/src/main/java/org/logstash/Logstash.java index eb9c823f397..04ce1b1820f 100644 --- a/logstash-core/src/main/java/org/logstash/Logstash.java +++ b/logstash-core/src/main/java/org/logstash/Logstash.java @@ -26,6 +26,7 @@ import java.io.PrintStream; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Properties; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,25 @@ */ public final class Logstash implements Runnable, AutoCloseable { + public static final String VERSION_FULL; + public static final String VERSION_MAJOR; + public static final String VERSION_MINOR; + public static final String VERSION_PATCH; + + static { + final Properties properties = new Properties(); + try { + properties.load(Logstash.class.getResourceAsStream("/version-info.properties")); + VERSION_FULL = properties.getProperty("logstash-core"); + final String[] versions = VERSION_FULL.split("\\."); + VERSION_MAJOR = versions[0]; + VERSION_MINOR = versions[1]; + VERSION_PATCH = versions[2]; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static final Logger LOGGER = LogManager.getLogger(Logstash.class); /** diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java index e905d922c5b..dbbfb97de5b 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java @@ -22,6 +22,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyNumeric; diff --git a/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java new file mode 100644 index 00000000000..d2d1cbf6889 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ApiHealthReport.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Map; + +@JsonSerialize(using = ApiHealthReport.JsonSerializer.class) +public class ApiHealthReport { + private final MultiIndicator.Report delegate; + + public ApiHealthReport(final MultiIndicator.Report delegate) { + this.delegate = delegate; + } + + public Status getStatus() { + return delegate.status(); + } + + public String getSymptom() { + return delegate.symptom(); + } + + public Map getIndicators() { + return delegate.indicators(); + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final ApiHealthReport apiHealthReport, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", apiHealthReport.getStatus()); + jsonGenerator.writeObjectField("symptom", apiHealthReport.getSymptom()); + jsonGenerator.writeObjectField("indicators", apiHealthReport.getIndicators()); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Diagnosis.java b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java new file mode 100644 index 00000000000..7e64f555638 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Diagnosis.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.UnaryOperator; + +@JsonSerialize(using = Diagnosis.JsonSerializer.class) +public final class Diagnosis { + public final String id; + public final String cause; + public final String action; + public final String helpUrl; + + private Diagnosis(final Builder builder) { + this.id = builder.id; + this.cause = builder.cause; + this.action = builder.action; + this.helpUrl = builder.helpUrl; + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final String id; + private final String cause; + private final String action; + private final String helpUrl; + + public Builder() { + this(null, null, null, null); + } + + Builder(final String id, + final String cause, + final String action, + final String helpUrl) { + this.id = id; + this.cause = cause; + this.action = action; + this.helpUrl = helpUrl; + } + + public Builder withId(final String id) { + if (Objects.equals(id, this.id)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + + public Builder withCause(final String cause) { + if (Objects.equals(cause, this.cause)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withAction(final String action) { + if (Objects.equals(action, this.action)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder withHelpUrl(final String helpUrl) { + if (Objects.equals(helpUrl, this.helpUrl)) { + return this; + } + return new Builder(id, cause, action, helpUrl); + } + public Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + public synchronized Diagnosis build() { + return new Diagnosis(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Diagnosis diagnosis, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (diagnosis.id != null) { + jsonGenerator.writeStringField("id", diagnosis.id); + } + jsonGenerator.writeStringField("cause", diagnosis.cause); + jsonGenerator.writeStringField("action", diagnosis.action); + jsonGenerator.writeStringField("help_url", diagnosis.helpUrl); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/HealthObserver.java b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java new file mode 100644 index 00000000000..63a461a74e6 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/HealthObserver.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class HealthObserver { + + private static final Logger LOGGER = LogManager.getLogger(); + + private final MultiIndicator rootIndicator = new MultiIndicator(); + private final MultiIndicator pipelinesIndicator = new MultiIndicator(); + + public HealthObserver() { + this.rootIndicator.attachIndicator("pipelines", this.pipelinesIndicator); + } + + public final Status getStatus() { + return getReport().getStatus(); + } + + public MultiIndicator getIndicator() { + return this.rootIndicator; + } + + public ApiHealthReport getReport() { + return new ApiHealthReport(this.rootIndicator.report()); + } + + public void attachPipelineIndicator(final String pipelineId, final PipelineIndicator.PipelineDetailsProvider detailsProvider) { + try { + this.pipelinesIndicator.attachIndicator(pipelineId, PipelineIndicator.forPipeline(pipelineId, detailsProvider)); + LOGGER.debug(String.format("attached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to attach pipeline indicator [%s]", pipelineId), e); + } + } + + public void detachPipelineIndicator(final String pipelineId) { + try { + this.pipelinesIndicator.detachIndicator(pipelineId, null); + LOGGER.debug(String.format("detached pipeline indicator [%s]", pipelineId)); + } catch (final Exception e) { + LOGGER.warn(String.format("failed to detach pipeline indicator [%s]", pipelineId), e); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/HelpUrl.java b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java new file mode 100644 index 00000000000..db2586765d1 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/HelpUrl.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import org.logstash.Logstash; + +import java.util.Objects; + +public class HelpUrl { + static final String BASE_URL; + static { + final String versionAnchor; + if (Integer.parseInt(Logstash.VERSION_MAJOR) >= 9) { + versionAnchor = "master"; + } else { + versionAnchor = String.format("%s.%s", Logstash.VERSION_MAJOR, Logstash.VERSION_MINOR); + } + BASE_URL = String.format("https://www.elastic.co/guide/en/logstash/%s/", versionAnchor); + } + + public HelpUrl(final String page) { + this(page, null); + } + + public HelpUrl withAnchor(final String anchor) { + return new HelpUrl(this.page, anchor); + } + + private HelpUrl(final String page, final String anchor) { + Objects.requireNonNull(page, "page cannot be null"); + this.page = page; + this.anchor = anchor; + } + + private final String page; + private final String anchor; + + private transient String resolved; + + @Override + public String toString() { + if (resolved == null) { + final StringBuilder sb = new StringBuilder(BASE_URL); + sb.append(page).append(".html"); + if (anchor != null) { + sb.append("#").append(anchor); + } + resolved = sb.toString(); + } + return resolved; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Impact.java b/logstash-core/src/main/java/org/logstash/health/Impact.java new file mode 100644 index 00000000000..3ccda5de2af --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Impact.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.*; +import java.util.function.UnaryOperator; + +@JsonSerialize(using=Impact.JsonSerializer.class) +public final class Impact { + public final String id; + public final int severity; + public final String description; + public final Set impactAreas; + + public Impact(final Builder builder) { + this.id = builder.id; + this.severity = Objects.requireNonNullElse(builder.severity, 0); + this.description = builder.description; + this.impactAreas = Set.copyOf(builder.impactAreas); + } + + static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String id; + private Integer severity; + private String description; + private Set impactAreas; + + public Builder() { + this.impactAreas = Set.of(); + } + + private Builder(String id, Integer severity, String description, Set impactAreas) { + this.id = id; + this.severity = severity; + this.description = description; + this.impactAreas = Set.copyOf(impactAreas); + } + + public synchronized Builder withId(final String id) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withSeverity(int severity) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withDescription(String description) { + return new Builder(id, severity, description, impactAreas); + } + + public synchronized Builder withAdditionalImpactArea(ImpactArea impactArea) { + final Set mergedImpactAreas = new HashSet<>(impactAreas); + if (!mergedImpactAreas.add(impactArea)) { + return this; + } else { + return this.withImpactAreas(mergedImpactAreas); + } + } + + public synchronized Builder withImpactAreas(Collection impactAreas) { + return new Builder(id, severity, description, Set.copyOf(impactAreas)); + } + + public synchronized Builder transform(final UnaryOperator configurator) { + return configurator.apply(this); + } + + public synchronized Impact build() { + return new Impact(this); + } + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Impact impact, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + if (impact.id != null) { + jsonGenerator.writeStringField("id", impact.id); + } + jsonGenerator.writeNumberField("severity", impact.severity); + jsonGenerator.writeStringField("description", impact.description); + jsonGenerator.writeObjectField("impact_areas", impact.impactAreas); + jsonGenerator.writeEndObject(); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ImpactArea.java b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java new file mode 100644 index 00000000000..94679c79ad9 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ImpactArea.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.annotation.JsonValue; + +import java.util.Objects; + +public enum ImpactArea { + PIPELINE_EXECUTION, + ; + + private final String externalValue; + + ImpactArea(final String externalValue) { + this.externalValue = Objects.requireNonNullElseGet(externalValue, () -> name().toLowerCase()); + } + + ImpactArea() { + this(null); + } + + @JsonValue + public String externalValue() { + return this.externalValue; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Indicator.java b/logstash-core/src/main/java/org/logstash/health/Indicator.java new file mode 100644 index 00000000000..df646e13584 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Indicator.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +public interface Indicator { + REPORT report(ReportContext reportContext); + + default REPORT report() { + return report(ReportContext.EMPTY); + } + + interface Report { + Status status(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java new file mode 100644 index 00000000000..d0699f7687a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/MultiIndicator.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A {@code MultiIndicator} is an {@link Indicator} that combines multiple sub-{@link Indicator}s and produces a + * summarized {@link Report}. + */ +public class MultiIndicator implements Indicator { + private static final Logger LOGGER = LogManager.getLogger(); + + private final Map> indicators = new ConcurrentHashMap<>(); + + public void attachIndicator(final String name, + final Indicator indicatorToAttach) { + final Indicator existing = indicators.putIfAbsent(name, indicatorToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, indicatorToAttach)) { + throw new IllegalArgumentException(String.format("Cannot attach indicator %s (%s) because a different one of the same name is already attached (%s).", name, indicatorToAttach, existing)); + } + LOGGER.debug("attached indicator {}=>{} (res:{})", name, indicatorToAttach, this); + } + + public void detachIndicator(final String name, + final Indicator indicatorToDetach) { + final Indicator remaining = indicators.computeIfPresent(name, (key, existing) -> Objects.isNull(indicatorToDetach) || Objects.equals(indicatorToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach indicator " + name + " because a different one of the same name is attached."); + } + LOGGER.debug("detached indicator {}<={} (res:{})", name, indicatorToDetach, this); + } + + public > Optional getIndicator(final String name, + final Class indicatorClass) { + return getIndicator(name).map(indicatorClass::cast); + } + + public Optional> getIndicator(final String name) { + return Optional.ofNullable(indicators.get(name)); + } + + @Override + public Report report(final ReportContext reportContext) { + LOGGER.debug("report starting with indicators {} for {}", this.indicators, reportContext); + final Status.Holder combinedStatus = new Status.Holder(); + + final Map reports = new HashMap<>(); + final Map> indicatorNamesByStatus = new HashMap<>(); + + this.indicators.forEach((indicatorName, indicator) -> { + if (reportContext.isMuted(indicatorName)) { + LOGGER.trace("sub-indicator {} is muted for {}", indicatorName, reportContext); + } else { + reportContext.descend(indicatorName, (scopedContext) -> { + final Indicator.Report report = indicator.report(scopedContext); + + combinedStatus.reduce(report.status()); + reports.put(indicatorName, report); + indicatorNamesByStatus.computeIfAbsent(report.status(), k -> new HashSet<>()).add(indicatorName); + }); + } + }); + + final StringBuilder symptom = new StringBuilder(); + // to highlight indicators by most-degraded status, we summarize in reverse-order + final List summaryByStatus = new ArrayList<>(indicatorNamesByStatus.size()); + for (int i = Status.values().length - 1; i >= 0; i--) { + final Status summarizingStatus = Status.values()[i]; + if (indicatorNamesByStatus.containsKey(summarizingStatus)) { + final Set indicatorNames = indicatorNamesByStatus.get(summarizingStatus); + summaryByStatus.add(String.format("%s "+(indicatorNames.size()==1 ? "indicator is" : "indicators are")+" %s (`%s`)", + indicatorNames.size(), + summarizingStatus.descriptiveValue(), + String.join("`, `", indicatorNames))); + } + } + if (summaryByStatus.isEmpty()) { + symptom.append("no indicators"); + } else if (summaryByStatus.size() == 1) { + symptom.append(summaryByStatus.get(0)); + } else if (summaryByStatus.size() == 2) { + symptom.append(summaryByStatus.get(0)).append(" and ").append(summaryByStatus.get(1)); + } else { + final int lastIndex = summaryByStatus.size() - 1; + symptom.append(String.join(", ", summaryByStatus.subList(0, lastIndex))) + .append(", and ").append(summaryByStatus.get(lastIndex)); + } + + return new Report(combinedStatus.value(), symptom.toString(), reports); + } + + @Override + public String toString() { + return "MultiIndicator{" + + "indicators=" + indicators + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report implements Indicator.Report { + private final Status status; + private final String symptom; + private final Map indicators; + + Report(final Status status, + final String symptom, + final Map indicators) { + this.status = status; + this.symptom = symptom; + this.indicators = Map.copyOf(indicators); + } + + @Override + public Status status() { + return this.status; + } + + public String symptom() { + return this.symptom; + } + + public Map indicators() { + return this.indicators; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", report.status()); + jsonGenerator.writeStringField("symptom", report.symptom); + jsonGenerator.writeObjectField("indicators", report.indicators()); + jsonGenerator.writeEndObject(); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java new file mode 100644 index 00000000000..3f6821fec4f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/PipelineIndicator.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.Objects; + +import static org.logstash.health.Status.*; + +/** + * A {@code PipelineIndicator} is a specialized {@link ProbeIndicator} that is meant for assessing the health of + * an individual pipeline. + */ +public class PipelineIndicator extends ProbeIndicator { + + public static PipelineIndicator forPipeline(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + PipelineIndicator pipelineIndicator = new PipelineIndicator(new DetailsSupplier(pipelineId, pipelineDetailsProvider)); + pipelineIndicator.attachProbe("status", new StatusProbe()); + return pipelineIndicator; + } + + private PipelineIndicator(final DetailsSupplier detailsSupplier) { + super("pipeline", detailsSupplier::get); + } + + @JsonSerialize(using = Status.JsonSerializer.class) + public static class Status { + public enum State { + UNKNOWN, + LOADING, + RUNNING, + FINISHED, + TERMINATED, + } + + public static final Status UNKNOWN = new Status(State.UNKNOWN); + public static final Status LOADING = new Status(State.LOADING); + public static final Status RUNNING = new Status(State.RUNNING); + public static final Status FINISHED = new Status(State.FINISHED); + public static final Status TERMINATED = new Status(State.TERMINATED); + + private final State state; + public Status(final State state) { + this.state = state; + } + public State getState() { + return state; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + @Override + public void serialize(Status value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + gen.writeStringField("state", value.getState().toString()); + gen.writeEndObject(); + } + } + } + + @JsonSerialize(using = Details.JsonSerializer.class) + public static class Details implements Observation { + private final Status status; + + public Details(final Status status) { + this.status = Objects.requireNonNull(status, "status cannot be null"); + } + public Status getStatus() { + return this.status; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer
{ + @Override + public void serialize(final Details details, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("status", details.getStatus()); + jsonGenerator.writeEndObject(); + } + } + } + + /** + * This interface is implemented by the ruby-Agent + */ + @FunctionalInterface + public interface PipelineDetailsProvider { + Details pipelineDetails(final String pipelineId); + } + + public static class DetailsSupplier { + private final String pipelineId; + private final PipelineDetailsProvider pipelineDetailsProvider; + DetailsSupplier(final String pipelineId, + final PipelineDetailsProvider pipelineDetailsProvider) { + this.pipelineId = pipelineId; + this.pipelineDetailsProvider = pipelineDetailsProvider; + } + + public Details get() { + return this.pipelineDetailsProvider.pipelineDetails(pipelineId); + } + } + + static class StatusProbe implements Probe
{ + static final Impact.Builder NOT_PROCESSING = Impact.builder() + .withId(impactId("not_processing")) + .withDescription("the pipeline is not currently processing") + .withAdditionalImpactArea(ImpactArea.PIPELINE_EXECUTION); + + static final HelpUrl HELP_URL = new HelpUrl("health-report-pipeline-status"); + + @Override + public Analysis analyze(final Details details) { + switch (details.getStatus().getState()) { + case LOADING: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("loading")) + .withCause("pipeline is loading") + .withAction("if pipeline does not come up quickly, you may need to check the logs to see if it is stalled") + .withHelpUrl(HELP_URL.withAnchor("loading").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).withDescription("pipeline is loading").build()) + .build(); + case RUNNING: + return Analysis.builder() + .withStatus(GREEN) + .build(); + case FINISHED: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("finished")) + .withCause("pipeline has finished running because its inputs have been closed and events have been processed") + .withAction("if you expect this pipeline to run indefinitely, you will need to configure its inputs to continue receiving or fetching events") + .withHelpUrl(HELP_URL.withAnchor("finished").toString())) + .withImpact(NOT_PROCESSING.withSeverity(10).withDescription("pipeline has finished running").build()) + .build(); + case TERMINATED: + return Analysis.builder() + .withStatus(RED) + .withDiagnosis(db -> db + .withId(diagnosisId("terminated")) + .withCause("pipeline is not running, likely because it has encountered an error") + .withAction("view logs to determine the cause of abnormal pipeline shutdown") + .withHelpUrl(HELP_URL.withAnchor("terminated").toString())) + .withImpact(NOT_PROCESSING.withSeverity(1).build()) + .build(); + case UNKNOWN: + default: + return Analysis.builder() + .withStatus(YELLOW) + .withDiagnosis(db -> db + .withId(diagnosisId("unknown")) + .withCause("pipeline is not known; it may have been recently deleted or failed to start") + .withAction("view logs to determine if the pipeline failed to start") + .withHelpUrl(HELP_URL.withAnchor("unknown").toString())) + .withImpact(NOT_PROCESSING.withSeverity(2).build()) + .build(); + } + } + + static String diagnosisId(final String state) { + return String.format("logstash:health:pipeline:status:diagnosis:%s", state); + } + + static String impactId(final String state) { + return String.format("logstash:health:pipeline:status:impact:%s", state); + } + + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Probe.java b/logstash-core/src/main/java/org/logstash/health/Probe.java new file mode 100644 index 00000000000..ec9cf71d2b4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Probe.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.Objects; +import java.util.function.UnaryOperator; + +public interface Probe { + Analysis analyze(OBSERVATION observation); + + final class Analysis { + public final Status status; + public final Diagnosis diagnosis; + public final Impact impact; + + Analysis(final Builder builder) { + this.status = builder.status; + this.diagnosis = builder.diagnosis; + this.impact = builder.impact; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private final Status status; + private final Diagnosis diagnosis; + private final Impact impact; + + public Builder() { + this(Status.UNKNOWN, null, null); + } + + public Builder(final Status status, + final Diagnosis diagnosis, + final Impact impact) { + this.status = status; + this.diagnosis = diagnosis; + this.impact = impact; + } + + public Builder withStatus(final Status status) { + if (Objects.equals(this.status, status)) { + return this; + } + return new Builder(status, this.diagnosis, this.impact); + } + + public Builder withDiagnosis(final Diagnosis diagnosis) { + if (Objects.equals(this.diagnosis, diagnosis)) { + return this; + } + return new Builder(status, diagnosis, impact); + } + + public Builder withDiagnosis(final UnaryOperator diagnosisConfigurator) { + return this.withDiagnosis(Diagnosis.builder().transform(diagnosisConfigurator).build()); + } + + public synchronized Builder withImpact(final Impact impact) { + if (Objects.equals(this.impact, impact)) { + return this; + } + return new Builder(status, this.diagnosis, impact); + } + + public Builder withImpact(final UnaryOperator impactConfigurator) { + return this.withImpact(Impact.builder().transform(impactConfigurator).build()); + } + + public synchronized Analysis build() { + return new Analysis(this); + } + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java new file mode 100644 index 00000000000..ddb4fce6c4b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ProbeIndicator.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * A {@code ProbeIndicator} is an {@link Indicator} that has one or more {@link Probe}s attached, and can be used + * to produce a {@link Report}. + */ +public class ProbeIndicator implements Indicator> { + private static final Logger LOGGER = LogManager.getLogger(); + + // Marker Interface + public interface Observation {} + + @FunctionalInterface + public interface Observer extends Supplier {} + + private final String subject; + private final Observer observer; + + private final Map> probes = new ConcurrentHashMap<>(); + + public ProbeIndicator(final String subject, + final Observer observer, + final Map> probes) { + this(subject, observer); + probes.forEach(this::attachProbe); + } + + public ProbeIndicator(final String subject, + final Observer observer) { + this.subject = subject; + this.observer = observer; + } + + public final void attachProbe(final String name, + final Probe probeToAttach) { + final Probe existing = probes.putIfAbsent(name, probeToAttach); + if (Objects.nonNull(existing) && !Objects.equals(existing, probeToAttach)) { + throw new IllegalArgumentException("Cannot attach probe " + name + " because a different one of the same name is already attached."); + } + } + + public final void detachProbe(final String name, + final Probe probeToDetach) { + Probe remaining = probes.computeIfPresent(name, (key, existing) -> Objects.equals(probeToDetach, existing) ? null : existing); + if (Objects.nonNull(remaining)) { + throw new IllegalArgumentException("Cannot detach probe " + name + " because a different one of the same name is attached."); + } + } + + public final void detachProbe(final String name) { + probes.remove(name); + } + + Probe getProbe(final String name) { + return probes.get(name); + } + + @Override + public Report report(final ReportContext reportContext) { + + LOGGER.debug("report starting with {} probes {}", this.probes.keySet(), reportContext); + final OBSERVATION observation = observer.get(); + + final Status.Holder combinedStatus = new Status.Holder(); + final List diagnoses = new ArrayList<>(); + final List impacts = new ArrayList<>(); + final Set distinctImpactAreas = new HashSet<>(); + + for (Map.Entry> probeEntry : this.probes.entrySet()) { + final String probeName = probeEntry.getKey(); + final Probe.Analysis probeAnalysis = probeEntry.getValue().analyze(observation); + LOGGER.trace("probe {}: {}", probeName, probeAnalysis); + + if (reportContext.isMuted(probeName)) { + LOGGER.trace("probe {} is muted", probeName); + } else { + combinedStatus.reduce(probeAnalysis.status); + Optional.ofNullable(probeAnalysis.diagnosis) + .ifPresent(diagnoses::add); + Optional.ofNullable(probeAnalysis.impact) + .filter(impacts::add) + .map(impact -> impact.impactAreas) + .ifPresent(distinctImpactAreas::addAll); + } + } + + final Status status = combinedStatus.value(); + final StringBuilder symptomBuilder = new StringBuilder(); + symptomBuilder.append(String.format("The %s is %s", this.subject, status.descriptiveValue())); + if (distinctImpactAreas.size() + diagnoses.size() > 0) { + symptomBuilder.append("; ") + .append(String.format(distinctImpactAreas.size() == 1 ? "%s area is impacted" : "%s areas are impacted", distinctImpactAreas.size())) + .append(" and ") + .append(String.format(diagnoses.size() == 1 ? "%s diagnosis is available" : "%s diagnoses are available", diagnoses.size())); + } + final String symptom = symptomBuilder.toString(); + + return new Report<>(status, observation, symptom, diagnoses, impacts); + } + + @Override + public String toString() { + return "ProbeIndicator{" + + "observer=" + observer + + ", probes=" + probes + + '}'; + } + + @JsonSerialize(using=Report.JsonSerializer.class) + public static class Report
implements Indicator.Report { + private final Status status; + private final DETAILS details; + private final String symptom; + private final List diagnosis; + + private final List impacts; + + public Report(final Status status, + final DETAILS details, + final String symptom, + final List diagnosis, + final List impacts) { + this.status = status; + this.details = details; + this.symptom = symptom; + this.diagnosis = List.copyOf(diagnosis); + this.impacts = List.copyOf(impacts); + } + public Status status() { + return status; + } + public DETAILS details() { + return details; + } + public String symptom() { + return symptom; + } + public List diagnosis() { + return diagnosis; + } + public List impacts() { + return impacts; + } + + public static class JsonSerializer extends com.fasterxml.jackson.databind.JsonSerializer> { + @Override + public void serialize(final Report report, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException { + jsonGenerator.writeStartObject(); + + jsonGenerator.writeObjectField("status", report.status); + jsonGenerator.writeStringField("symptom", report.symptom); + + if (Objects.nonNull(report.diagnosis) && !report.diagnosis.isEmpty()) { + jsonGenerator.writeObjectField("diagnosis", report.diagnosis); + } + + if (Objects.nonNull(report.impacts) && !report.impacts.isEmpty()) { + jsonGenerator.writeObjectField("impacts", report.impacts); + } + + jsonGenerator.writeObjectField("details", report.details); + + jsonGenerator.writeEndObject(); + } + } + + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/ReportContext.java b/logstash-core/src/main/java/org/logstash/health/ReportContext.java new file mode 100644 index 00000000000..9dc2970fcd4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/ReportContext.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +/** + * A {@code ReportContext} is used when building an {@link Indicator.Report} to provide contextual configuration + * for a specific {@link Indicator} that is being reported on. + */ +public class ReportContext { + private final List path; + + public static final ReportContext EMPTY = new ReportContext(List.of()); + + ReportContext(final List path) { + this.path = List.copyOf(path); + } + + public ReportContext descend(final String node) { + final ArrayList newPath = new ArrayList<>(path); + newPath.add(node); + return new ReportContext(newPath); + } + + public void descend(final String node, final Consumer consumer) { + consumer.accept(this.descend(node)); + } + + public boolean isMuted(final String childNodeName) { + return false; + } + + @Override + public String toString() { + return "ReportContext{" + + "path=" + path + + '}'; + } +} diff --git a/logstash-core/src/main/java/org/logstash/health/Status.java b/logstash-core/src/main/java/org/logstash/health/Status.java new file mode 100644 index 00000000000..ba0ce68a3fd --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/health/Status.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.logstash.health; + +import com.fasterxml.jackson.annotation.JsonValue; + +public enum Status { + GREEN("healthy"), + UNKNOWN("unknown"), + YELLOW("concerning"), + RED("unhealthy"), + ; + + private final String externalValue = name().toLowerCase(); + private final String descriptiveValue; + + Status(String descriptiveValue) { + this.descriptiveValue = descriptiveValue; + } + + @JsonValue + public String externalValue() { + return externalValue; + } + + public String descriptiveValue() { return descriptiveValue; } + + /** + * Combine this status with another status. + * This method is commutative. + * @param status the other status + * @return the more-degraded of the two statuses. + */ + public Status reduce(Status status) { + if (compareTo(status) >= 0) { + return this; + } else { + return status; + } + } + + public static class Holder { + private Status status = Status.GREEN; + public synchronized Status reduce(Status status) { + return this.status = this.status.reduce(status); + } + public synchronized Status value() { + return this.status; + } + } +} diff --git a/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java new file mode 100644 index 00000000000..0ec8601112a --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/ProbeIndicatorTest.java @@ -0,0 +1,128 @@ +package org.logstash.health; + +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class ProbeIndicatorTest { + + @Test + public void attachProbeWhenNotExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + probeIndicator.attachProbe("new", probeToAdd); + + assertThat(probeIndicator.getProbe("new"), is(probeToAdd)); + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenExists() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + final Probe probeToAdd = new ProbeImplementation(); + assertThrows(IllegalArgumentException.class, () -> probeIndicator.attachProbe("existing", probeToAdd)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + @Test + public void attachProbeWhenAttached() throws Exception { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + // attach it again + probeIndicator.attachProbe("existing", existingProbe); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + } + + + @Test + public void detachProbeByNameWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByNameWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + + probeIndicator.detachProbe("to_remove"); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenAttached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + probeIndicator.attachProbe("to_remove", existingProbeToRemove); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenDetached() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe existingProbeToRemove = new ProbeImplementation(); + + probeIndicator.detachProbe("to_remove", existingProbeToRemove); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void detachProbeByValueWhenConflict() { + final ProbeIndicator probeIndicator = new ProbeIndicator<>("subject", ProbeSubject::new); + final Probe existingProbe = new ProbeImplementation(); + probeIndicator.attachProbe("existing", existingProbe); + final Probe anotherProbeToRemove = new ProbeImplementation(); + + assertThrows(IllegalArgumentException.class, () -> probeIndicator.detachProbe("existing", anotherProbeToRemove)); + + assertThat(probeIndicator.getProbe("existing"), is(existingProbe)); + assertThat(probeIndicator.getProbe("to_remove"), is(nullValue())); + } + + @Test + public void report() { + } + + private static class ProbeSubject implements ProbeIndicator.Observation {} + + private static class ProbeImplementation implements Probe { + @Override + public Analysis analyze(ProbeSubject observation) { + return Analysis.builder().build(); + } + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/health/StatusTest.java b/logstash-core/src/test/java/org/logstash/health/StatusTest.java new file mode 100644 index 00000000000..32eca41d593 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/health/StatusTest.java @@ -0,0 +1,106 @@ +package org.logstash.health; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collectors; + +import static com.google.common.collect.Collections2.orderedPermutations; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.logstash.health.Status.*; + +@RunWith(Enclosed.class) +public class StatusTest { + + public static class Tests { + @Test + public void testReduceUnknown() { + assertThat(UNKNOWN.reduce(UNKNOWN), is(UNKNOWN)); + assertThat(UNKNOWN.reduce(GREEN), is(UNKNOWN)); + assertThat(UNKNOWN.reduce(YELLOW), is(YELLOW)); + assertThat(UNKNOWN.reduce(RED), is(RED)); + } + + @Test + public void testReduceGreen() { + assertThat(GREEN.reduce(UNKNOWN), is(UNKNOWN)); + assertThat(GREEN.reduce(GREEN), is(GREEN)); + assertThat(GREEN.reduce(YELLOW), is(YELLOW)); + assertThat(GREEN.reduce(RED), is(RED)); + } + + @Test + public void testReduceYellow() { + assertThat(YELLOW.reduce(UNKNOWN), is(YELLOW)); + assertThat(YELLOW.reduce(GREEN), is(YELLOW)); + assertThat(YELLOW.reduce(YELLOW), is(YELLOW)); + assertThat(YELLOW.reduce(RED), is(RED)); + } + + @Test + public void testReduceRed() { + assertThat(RED.reduce(UNKNOWN), is(RED)); + assertThat(RED.reduce(GREEN), is(RED)); + assertThat(RED.reduce(YELLOW), is(RED)); + assertThat(RED.reduce(RED), is(RED)); + } + } + + @RunWith(Parameterized.class) + public static class JacksonSerialization { + @Parameters(name = "{0}") + public static Iterable data() { + return EnumSet.allOf(Status.class); + } + + @Parameter + public Status status; + + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testSerialization() throws Exception { + assertThat(mapper.writeValueAsString(status), is(equalTo('"' + status.name().toLowerCase() + '"'))); + } + } + + @RunWith(Parameterized.class) + public static class ReduceCommutativeSpecification { + @Parameters(name = "{0}<=>{1}") + public static Collection data() { + return getCombinations(EnumSet.allOf(Status.class), 2); + } + + @Parameter(0) + public Status statusA; + @Parameter(1) + public Status statusB; + + @Test + public void testReduceCommutative() { + assertThat(statusA.reduce(statusB), is(statusB.reduce(statusA))); + } + + private static > List getCombinations(Collection source, int count) { + return orderedPermutations(source).stream() + .map((l) -> l.subList(0, count)) + .map(ArrayList::new).peek(Collections::sort) + .collect(Collectors.toSet()) + .stream() + .map(List::toArray) + .collect(Collectors.toList()); + } + } +} \ No newline at end of file diff --git a/qa/support/logstash-integration-failure_injector/README.md b/qa/support/logstash-integration-failure_injector/README.md new file mode 100644 index 00000000000..bfa4a01977b --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/README.md @@ -0,0 +1,10 @@ +# About the plugin + +This plugin is for Logstash pipelines test purpose _only_. + +# How to apply changes + +When you change the source code, make sure to build a gem file. +```shell +gem build logstash-integration-failure_injector.gemspec +``` diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb new file mode 100644 index 00000000000..919118a0f6b --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/filters/failure_injector.rb @@ -0,0 +1,91 @@ +# encoding: utf-8 + +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require 'logstash/inputs/base' +require 'logstash/namespace' + +require_relative '../utils/failure_injector_util' + +class LogStash::Filters::FailureInjector < LogStash::Filters::Base + + config_name "failure_injector" + + # Defines the phases where plugin needs to make a pipeline degraded state. + # Accepts one or any of [register, filter, close] + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # degrade_at => ['register', 'filter'] + # } + config :degrade_at, :validate => :array, :default => [] + + # Defines the phases where plugin needs to be crashed, causes pipeline terminated. + # Accepts one of [register, filter, close] + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # crash_at => 'register' + # } + # + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Plugin doesn't validate the order, if `crash_at` is combined with `degrade_at`, and plugin cannot simulate degraded state after crash phase + # Example, + # failure_injector { + # crash_at => 'register' + # degrade_at => ['filter'] + # } + config :crash_at, :validate => :string + + def initialize(params) + FailureInjectorUtils.validate_config('filter', params) + super + end + + def register + @logger.debug("Registering plugin") + degrade_or_crash_if_required('register') + end + + def filter(event) + @logger.trace("Received the event to filter: #{event}") + degrade_or_crash_if_required('filter') + end + + def close + @logger.debug("Plugin is closing") + degrade_or_crash_if_required('close') + end + + def degrade_or_crash_if_required(phase) + degrade(phase) if @degrade_at.include?(phase) + crash(phase) if @crash_at && @crash_at == phase + end + + def degrade(phase) + @logger.debug("Degraded at #{phase} phase") + (1..100).each { |i| + sleep(i * 0.005) + } + end + + def crash(phase) + @logger.debug("Crashing at #{phase} phase") + raise "`logstash-filter-failure_injector` is crashing at #{phase} phase" + end + +end diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb new file mode 100644 index 00000000000..5e836a93f2e --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/outputs/failure_injector.rb @@ -0,0 +1,91 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +require 'logstash/inputs/base' +require 'logstash/namespace' + +require_relative '../utils/failure_injector_util' + +class LogStash::Outputs::FailureInjector < LogStash::Outputs::Base + + config_name "failure_injector" + + # Defines the phases where plugin needs to make a pipeline degraded state. + # Accepts one or any of [register, receive, close] + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # degrade_at => ['register', 'receive'] + # } + config :degrade_at, :validate => :array, :default => [] + + # Defines the phases where plugin needs to be crashed, causes pipeline terminated. + # Accepts one of [register, receive, close] + # Example config to degrade the pipeline status at register and receive phase: + # failure_injector { + # crash_at => 'register' + # } + # + # Note that, order of the phases doesn't matter as it obeys on plugin phase + # Plugin doesn't validate the order, if `crash_at` is combined with `degrade_at`, and plugin cannot simulate degraded state after crash phase + # Example, + # failure_injector { + # crash_at => 'register' + # degrade_at => ['receive'] + # } + config :crash_at, :validate => :string + + def initialize(params) + FailureInjectorUtils.validate_config('output', params) + super + end + + def register + @logger.debug("Registering plugin") + degrade_or_crash_if_required('register') + end + + def multi_receive(events) + @logger.trace("Received #{events.size} size of events") + degrade_or_crash_if_required('receive') + end + + def close + @logger.debug("Plugin is closing") + degrade_or_crash_if_required('close') + end + + def degrade_or_crash_if_required(phase) + degrade(phase) if @degrade_at.include?(phase) + crash(phase) if @crash_at && @crash_at == phase + end + + def degrade(phase) + @logger.debug("Degraded at #{phase} phase") + (1..100).each { |i| + sleep(i * 0.01) + } + end + + def crash(phase) + @logger.debug("Crashing at #{phase} phase") + raise "`logstash-output-failure_injector` is crashing at #{phase} phase" + end + +end diff --git a/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb b/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb new file mode 100644 index 00000000000..7a2f0af9c13 --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/lib/logstash/utils/failure_injector_util.rb @@ -0,0 +1,37 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +class FailureInjectorUtils + + def self.validate_config(type, params) + type_error_message = "`logstash-integration-failure_injector` accepts 'filter' or 'output' type." + raise type_error_message unless type + raise type_error_message unless %w(filter output).include?(type) + + plugin_phase = type == 'output' ? 'receive' : 'filter' + accepted_configs = ['register', "#{plugin_phase}", 'close'] + config_error_message = "failure_injector #{type} plugin accepts #{accepted_configs} configs but received" + params['degrade_at']&.each do | degrade_phase | + raise "#{config_error_message} #{degrade_phase}" unless accepted_configs.include?(degrade_phase) + end + + crash_at = params['crash_at'] + raise "#{config_error_message} #{crash_at}" if crash_at && !accepted_configs.include?(crash_at) + end +end \ No newline at end of file diff --git a/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector-0.0.1.gem new file mode 100644 index 0000000000000000000000000000000000000000..e241aaf94208b7e964965854f865c25539304c5e GIT binary patch literal 7168 zcmeHLS5y;Nw+;vr0Yd@-l_p3KDMEnIMJdun0-*#!TIe9X2q;B5QX~SQ4OK$Npmda` zG^I*#s1ChLCvt^3VO}!Tqw&S^J#5_W8~^-&)^(WaDLNZRuqxZ08U7 zyA1xr5C{a|cmKow>_x>TqyVA_Nhv9DxVWgqk9DHr2nk657yR!k=wI`CdwN-V{3yxS z-p0oHUju&{|6lR{IoltH`_uM+>h$V!Aiz1|)RvMwY^dc*goB%ojZ$eHMCK|T!ihj| za?unr>@I``9K&uWDJosFmH+)LphH9QxJ0v^dgNjEYFhuSU!VpSwi|tMPj+a}cdF?L z-BV*Twka!K=^Z7(JTwDua)WnPy)fjHvJ;KL8!M&rbjipqLe1RyGG!INgu(v&7^H0I zHbMvYQGLN^jKmvf2c&Bn`kD;O#bISc9zz}I?t~JM*atDX|(xP`~?JQkIf-f zP3QSLX{{7L>?K71=V{zR2E(ZmD?Eo4Y1-oUxw6z%GD^Z?Ez99R4Xbt zgl^~2Krhp=z~GTH%r8MT)MX^A(|*q#*ZtOO&4vCKC#XbTji+4X<*H80Jn+@J^ueLU zT`=}4RkrN3J)L}e_+Tl&l5EYw!ghqewo{H$uA%0HQ}{LIy_h3mRjI47aoTZku5e1$ zOytd{{@xGYA8($CDKB&_!0sE}lgwuj{mdW2SzmtlP_sEAF;cJM!UP3F)-Yo|j~-~k z4=GyY=VWZd)?1o9V$l&aVyQ>mX}_W=zrZ6mH$eOviR5^9;Y@cRmpeY#0IIXpx%oK8 z!R-L7@Wk{@PS_nuR)azN!T@+#yYT8esfwOA$4)^^x z>FYDvJH{+ZE8!!8-_E1g93Pa1Mi-==Q~2KJBD>lRnsIg!0sv zm%cO$Qx4f1Xv9Ppg39T4ChY#M(f5mE8=cOsT_n>emD5s}d(KQd z-gA8p{^>0J(M;uSmgRvh4KSB|d22d(9^qa2x2ssQsaHHqo4_c9TMIkLFT`(5)S-y! zX^T``7gd?zK~pPizAG+XvNaF!#-4^4x+Bc%UcAuy^dtC$lK&0Qy5@y{>L>gU*7e`q z|APVlgZwWch4|n64;TN*|NkG);t&2`V{`Hx@6gRY6s$mEu$-(gUK5v*8#T^?arg>i z<(#z`EAwqiwQVb~f*Z|XNe}tkWI!+gsOvS{<)isnD6yr<*VWaRf%cHV)`8(JPW4i| zl_bPG37NM{5lc{O7BlZC&H&Y}yX_sl|MqRR(?tHMLC~Gi@zK$hJKqA1ylQL5gDBnP zPJiJ$z4>LlAyV+%FbZrVX(?n%;XA%l;$hxht&N`Rq)6yEhwUEBZ$_|=&q?eX!_W%W z_dv=5uzl+M;yc_T6WOL%6$&j4c2UMcYu3Wp19uqeb`O^!K%Tf17LV${3t`)M3u7TN zmU9PlYfJOoA?g^|loE!b@CuGeA(=_IOX&V@FChbX#TMQK9#%%q);RulSC|bZJ+G@p zs$TQ(@ihvGEERAgQ&yd~7+YG{Le5C2hmkwJTDRwGO67E%xl zIezh~h$5tHl$W6jp!|9!R5v(Q|R) zO-tMugX>@6#_cb^O|;G8^h>hV^NQ3b*!Ipw-A@D5wkRJHwTyxHgeiKw8`I{dJv{a{ zC36nIRnOkVcNVHYf*&aIVGeR2@$|P2D?m%sz+ro^X*ADm5|U*`-&h7%sclvXNg(Q3 zz|xSW%r7PtaIl^ZFg1iKtR*V-%RcxCZ2AHnMJSbX<`=*OJ*^f2!YH>X`XyYO2B9|? z0CIx1JuBeIlS&3{eLxfF!adowcDa)|6m;vY^`Y0J(xTECN3>dC`e0$p5Q5;GRk%JO zW^9^L`)5&(r8dD`m0rc^qPn;xfY)4MKa_fQDNYK_}7sTxWL>4AtRgK?APiuTU-s zXwe@5>!>A~8hSPM?F|cpMOWJJ_1v{VkvZIR*+(hupE*Ji%%PxZp6$L*BoK-Nw!?j+ zyjF1AWqQE(hH zQwdekS~U#4W|ZCtk40#3C)u_<4DTf{b`hqpq3bF-Sf3JGXj~;k2Jz>`(CdjmvC_3j zL)#E{7GIcm3VENC?CH3<80UxgXn0eTzl^rw&UG|6=z1$>?qr$mla|a<+%BA}>3}=v zhKk8-qukkx2uO=MM#NaKvw5k>PPH|B#R!HJthkS2%D(IkpggK*znfUPjgHQ{Mz}jR z(Y6`X6vd5G3rwY6GlnQ93brlk9!@Dve?AN3T>5OqylmjS`AL{Pr}Am})IvH|RJC-Y zVc=oa!}Igt`s-H@J0n1W5V4dhX+L#;;oZxuPs}QxDjvy_M7Cs9$F$c_pK>YPzm2j^ zC6c!CSK5ZVXKft~g-+a?_kz%CB( z43_7hC+X5_bm}N$;d}>7%DTUqqFBC1YKuQ6_gav%{1rJ?%pZ%OSICCY8@>({uFGVf zW4Rv%?0ao=HQGdZAj3K=R??+Z4#gj;V zugUr1n{>ohzPidqth~vtvh@qP*T`yBAvgKc@Nwy2RLO z`k8*CAyJz4_=~;)wewQzL{nh8k>lZzr`oSJ=n?5lwCdJF{MxT$U?46`BsW3v3sT{| zBxRbZ2r((@o_*>FB-XJe0`fJPdu#9A4HOSm85N8PS?8EWdHp1b?~CTewLZevP?cb2 zaJiTX(bWGk%^ENDw0;O|7ACSOl%8PVbzA;3;TOK3z^5xe*f!zA`U^EbWl|R$4%Maf zib#BE>%my~4Eo3}#u6hfNsUXAk1W{p#OIpc`I>ZnD~Y%6x$qR>2#P;34PL}LyyJff zb-dcp(rDPI&3QHJO8*qJJJF#d^FFmjW`}WJ_9d(CXUr~F^pska;;bac#_yYzT1X-0 zY0JK!vF;Sjd+Dp)alOG&*>;JSt!ZYtKJ@9%hseoDKy2DzV4yv!U<=TZY{XUR%HM#D z<|}vovKmwkUf`K|!Y(SHC@{F*H=ZtsP%%qa!Rn-4B2Y=9i%_p=>A;xoux_4ox1!|C zjs4v4<;GKvlWmT`Lz0P|qOb){D3-~2ctwLUn3YO8ds^Dxb8@HPnbh+-Q_kb5e7UTX z*Kf)D0fapx64@-sSDZZB6C}Bl-Fve5Hk-W2hWXVcc2eI)`(#=h4V=hiUCKC1#e+!r zORp`)%{V)SY#D6t+Tt=L~h z*r6@Q^{!BZtPDcJ@70{Q^--;uHg{jkW9W3hY;DK(nC}^_Zw1Jcz9A6DU{;!*`ln^Pbv8}brd)PVtfftjLpNC$2f@o{6qR9QH&qQyKz zupI$tqJ3Ia+;w%Hbmdg2d$2jWR8*YDVV}^X8R@9~wx{F$Fzw)UtH(~XzYKjmqObY}u`A6TDDpsvJ-@gBVV2pmi zpJL`BB^naO0x0Ys}JuA2`%&XY!@HZ=eg;^THCNUWbyzE!p{?29-d{&HLmcVD)kgfmn7+X z~9ar`au|e=mR-*q+ zMUNMsH8Q*EFGt9-cnCYvqiE}8d2SvSI>20nWdwrsVyg;}<#Is+H}pN}tG9Q% zSqB_~tP~dc9OzzpE?>OjQrt#&7*Nw^IKtnIG_+_}lR7Nct&!}q;y}~l$f_w~k-U3O z)?ze?pT*`ovA!%NSR*4Dm2C5G!#wGq;_lIFMqhL&@y!l%sf_N|8}?V~Z<|%9&U#^{ zRfE>Gx}k?0)fBNF@Txs8N?d<_fVeQ()NODQT4?eev--NDSA%z>D7nDV+H8%#Kx%Nk zsu&vH#~{O&QM|M;W}30O+I+_IyL2Fv&jF(HG2H;wzYr1_dmD_Cr}raIVL!`9&VP>z z{Jk~y7yUmtLPG3M{Xc}HsMJsW_ZQdpZwvjQ|7>1}8dAZ5caP=UeGFH5z+5AvqXCpa z3!B;TPeU2i#K41{$noz!en9~$DY(yjN5;N7&Br^P`=6O7uPc>og0{oogKak2cyE?D zeRRI&i%wdWq<<9^(FQk4fAYAwtTE_xI}4DgQz8Vh0@pZuU)pMQ#3CD+^xO13cNss- zu;|h;(RUR_@-v*+J-DCQ0ZhiMM%hqTOE$LnaM&kT^~ZAsPcBKwDj_3t#X(N(lewkU z4^$M>@mcxPFCX~FP@m`A=5!2R6r`~jHD*@K3&MeD)Dy%p9gbD8?-{9MdE$)_vk%_; z?FrnfM~UMNZp~fqk1qV^hP>M-$vBd7%RED!v}9UmTVYj_R-O@Gt@g+|_;$pTLyA|X ZC7NF;{%wH&O!O0hp9uU!;2(m(e*r}?7_k5V literal 0 HcmV?d00001 diff --git a/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec new file mode 100644 index 00000000000..5525dcf4f51 --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/logstash-integration-failure_injector.gemspec @@ -0,0 +1,26 @@ + +Gem::Specification.new do |s| + s.name = "logstash-integration-failure_injector" + s.version = "0.0.1" + s.licenses = ["Apache-2.0"] + s.summary = "A collection of Logstash plugins that halp simulating abnormal cases during the tests." + s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname." + s.authors = ["Elastic"] + s.email = "info@elastic.co" + s.homepage = "https://www.elastic.co/logstash" + s.metadata = { + "logstash_plugin" => "true", + "logstash_group" => "integration", + "integration_plugins" => %w( + logstash-filter-failure_injector + logstash-output-failure_injector + ).join(",") + } + + s.files = Dir["lib/**/*","spec/**/*","*.gemspec"] + s.test_files = s.files.grep(%r{^(test|spec|features)/}) + + s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.1.12", "<= 2.99" + + s.add_development_dependency "logstash-devutils" +end diff --git a/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb b/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb new file mode 100644 index 00000000000..d455d8b66cc --- /dev/null +++ b/qa/support/logstash-integration-failure_injector/spec/unit/failure_injector_spec.rb @@ -0,0 +1,129 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# encoding: utf-8 + +require "logstash/devutils/rspec/spec_helper" +require "logstash/filters/failure_injector" +require "logstash/outputs/failure_injector" + +%w(filter output).each do | plugin_type | + instance = plugin_type == 'filter' ? LogStash::Filters::FailureInjector : LogStash::Outputs::FailureInjector + phase = plugin_type == 'filter' ? 'filter' : 'receive' + describe instance do + let(:params) { { 'degrade_at' => [], 'crash_at' => nil } } + let(:event) { LogStash::Event.new } + let(:plugin) { described_class.new(params) } + + before do + allow(plugin).to receive(:@logger).and_return(double('logger', :debug => nil, :trace => nil)) + end + + describe 'plugin base' do + subject { described_class } + it { is_expected.to be_a_kind_of Class } + it { is_expected.to be <= (plugin_type == 'filter' ? LogStash::Filters::Base : LogStash::Outputs::Base) } + it { is_expected.to have_attributes(:config_name => "failure_injector") } + end + + shared_examples 'a phase that can degrade or crash' do |phase| + context "when degrades at #{phase}" do + let(:params) { { 'degrade_at' => [phase] } } + + it 'calls the degrade method' do + expect(plugin).to receive(:degrade).with(phase) + case phase + when 'filter' + plugin.filter(event) + when 'receive' + plugin.multi_receive([event]) + else + plugin.send(phase) + end + end + end + + context "when crashes at #{phase}" do + let(:params) { { 'crash_at' => phase } } + + it 'raises a crash error' do + case phase + when 'filter' + expect { plugin.filter(event) }.to raise_error(RuntimeError, /crashing at #{phase}/) + when 'receive' + expect { plugin.multi_receive([event]) }.to raise_error(RuntimeError, /crashing at #{phase}/) + else + expect { plugin.send(phase) }.to raise_error(RuntimeError, /crashing at #{phase}/) + end + end + end + end + + describe '#initialize' do + context 'when valid params are passed' do + let(:params) { { 'degrade_at' => [], 'crash_at' => nil } } + + it 'does not raise any error' do + expect { described_class.new(params) }.not_to raise_error + end + end + + context 'when invalid params are passed' do + it 'raises an error on invalid config' do + configs = ["register", plugin_type == 'filter' ? "filter" : "receive", "close"] + expect { + described_class.new('degrade_at' => ['invalid'], 'crash_at' => 'invalid') + }.to raise_error("failure_injector #{plugin_type} plugin accepts #{configs} configs but received invalid") + end + end + end + + describe '#register' do + it_behaves_like 'a phase that can degrade or crash', 'register' + end + + if plugin_type == 'filter' + describe '#filter' do + it_behaves_like 'a phase that can degrade or crash', 'filter' + end + end + + if plugin_type == 'output' + describe '#receive' do + it_behaves_like 'a phase that can degrade or crash', 'receive' + end + end + + describe '#close' do + it_behaves_like 'a phase that can degrade or crash', 'close' + end + + describe '#degrade' do + it 'sleeps for a certain period of time' do + expect(plugin).to receive(:sleep).at_least(:once) + plugin.degrade('filter') + end + end + + describe '#crash' do + it 'raises an error with the phase' do + expect { plugin.crash(phase) }.to raise_error(RuntimeError, /crashing at #{phase}/) + end + end + end +end +