Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DLQ-ing events that trigger an conditional evaluation error. #16423

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/static/dead-letter-queues.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ loss in this situation, you can <<configuring-dlq,configure Logstash>> to write
unsuccessful events to a dead letter queue instead of dropping them.

NOTE: The dead letter queue is currently supported only for the
<<plugins-outputs-elasticsearch,{es} output>>. The dead letter queue is used for
documents with response codes of 400 or 404, both of which indicate an event
<<plugins-outputs-elasticsearch,{es} output>> and <<conditionals, conditional statements evaluation>>.
The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event
that cannot be retried.
It's also used when a conditional evaluation encounter an error.

Each event written to the dead letter queue includes the original event,
metadata that describes the reason the event could not be processed, information
Expand Down Expand Up @@ -57,7 +58,12 @@ status code per entry to indicate why the action could not be performed.
If the DLQ is configured, individual indexing failures are routed there.

Even if you regularly process events, events remain in the dead letter queue.
The dead letter queue requires <<dlq-clear,manual intervention>> to clear it.
The dead letter queue requires <<dlq-clear,manual intervention>> to clear it.

[[conditionals-dlq]]
==== Conditional statements and the dead letter queue
When a conditional statement reaches an error in processing an event, such as comparing string and integer values,
the event, as it is at the time of evaluation, is inserted into the dead letter queue.

[[configuring-dlq]]
==== Configuring {ls} to use dead letter queues
Expand Down
50 changes: 44 additions & 6 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,8 @@ def flush(options)
LogStash::PLUGIN_REGISTRY.add(:output, "spec_sampler_output", PipelineHelpers::SpecSamplerOutput)
end

describe "given a pipeline executing an event that would trigger an evaluation error" do
context "given a pipeline executing an event that would trigger an evaluation error" do
let(:pipeline) do
settings.set_value("queue.drain", true)
LogStash::JavaPipeline.new(
org.logstash.config.ir.PipelineConfig.new(
LogStash::Config::Source::Local, :main,
Expand All @@ -470,11 +469,50 @@ def flush(options)
pipeline.close
end

subject {results.length > 1 ? results : results.first}
describe "when DLQ is disabled" do
let(:settings) do
s = super()
s.set_value("queue.drain", true)
s
end

it "should raise an error without killing the pipeline" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
subject {results.length > 1 ? results : results.first}

it "should raise an error without killing the pipeline" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
end
end

describe "when DLQ is enabled" do
let(:dlq_path) { Dir.mktmpdir }

let(:settings) do
s = super()
s.set_value("queue.drain", true)
s.set_value("pipeline.id", "test_dlq")
s.set_value("dead_letter_queue.enable", true)
s.set_value("path.dead_letter_queue", dlq_path)
s
end

after do
FileUtils.rm_rf(settings.get_value("path.dead_letter_queue"))
end

subject {results.length > 1 ? results : results.first}

it "should raise an error without killing the pipeline and insert the event into DLQ" do
expect(subject).to be nil
expect(pipeline.last_error_evaluation_received).to match(/no implicit conversion of nil into Integer/)
dlq_path = java.nio.file.Paths.get(settings.get_value("path.dead_letter_queue"), "test_dlq")
dlq_reader = org.logstash.common.io.DeadLetterQueueReader.new(dlq_path)
entry = dlq_reader.pollEntry(40)
expect(entry).to_not be_nil
expect(entry.reason).to match(/condition evaluation error.*no implicit conversion of nil into Integer/)
expect(entry.plugin_id).to eq("if-statement")
expect(entry.plugin_type).to eq("if-statement")
end
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
private @SuppressWarnings("rawtypes") RubyArray outputs;

private String lastErrorEvaluationReceived = "";
private DeadLetterQueueWriter javaDlqWriter;

public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -180,10 +181,32 @@ public final class LogErrorEvaluationListener implements ConditionalEvaluationLi
@Override
public void notify(ConditionalEvaluationError err) {
lastErrorEvaluationReceived = err.getCause().getMessage();
LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload.", lastErrorEvaluationReceived);
if (isDLQEnabled()) {
LOGGER.warn("{}. Failing event was sent to dead letter queue", lastErrorEvaluationReceived);
jsvd marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.warn("{}. Event was dropped, enable debug logging to see the event's payload.", lastErrorEvaluationReceived);
}
LOGGER.debug("Event generating the fault: {}", err.failedEvent().toMap().toString());

// logs the exception at debug level
if (LOGGER.isDebugEnabled()) {
debugLogStackTrace(err);
}

if (isDLQEnabled()) {
try {
javaDlqWriter.writeEntry(err.failedEvent(), "if-statement", "if-statement", "condition evaluation error, " + lastErrorEvaluationReceived);
andsel marked this conversation as resolved.
Show resolved Hide resolved
} catch (IOException ioex) {
LOGGER.error("Can't write in DLQ", ioex);
}
}
}

private boolean isDLQEnabled() {
return javaDlqWriter != null;
}

private void debugLogStackTrace(ConditionalEvaluationError err) {
try (StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) {
err.printStackTrace(pw);
LOGGER.debug("{}", sw);
Expand Down Expand Up @@ -372,7 +395,7 @@ public IRubyObject lirExecution(final ThreadContext context) {
public final IRubyObject dlqWriter(final ThreadContext context) {
if (dlqWriter == null) {
if (dlqEnabled(context).isTrue()) {
final DeadLetterQueueWriter javaDlqWriter = createDeadLetterQueueWriterFromSettings(context);
javaDlqWriter = createDeadLetterQueueWriterFromSettings(context);
dlqWriter = JavaUtil.convertJavaToUsableRubyObject(context.runtime, javaDlqWriter);
} else {
dlqWriter = RubyUtil.DUMMY_DLQ_WRITER_CLASS.callMethod(context, "new");
Expand Down
Loading