some cleanup and improvements of the zeek logstash pipeline idaholab#592
mmguero committed Oct 11, 2024
1 parent 47d06b2 commit 2e2fce1
Showing 73 changed files with 6,675 additions and 6,429 deletions.
8 changes: 4 additions & 4 deletions docs/contributing-logstash.md
@@ -34,12 +34,12 @@ Finally, in the [`./config/logstash.env` file](malcolm-config.md#MalcolmConfigEn

The following modifications must be made in order for Malcolm to parse new Zeek log files:

-1. Add a parsing section to [`logstash/pipelines/zeek/11_zeek_parse.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/11_zeek_parse.conf)
+1. Add a parsing filter file named so that it sorts after [`logstash/pipelines/zeek/1001_zeek_parse.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/1001_zeek_parse.conf) but before [`logstash/pipelines/zeek/1199_zeek_unknown.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/1199_zeek_unknown.conf)
 * Follow patterns for existing log files as an example
 * For common Zeek fields such as the `id` four-tuple, timestamp, etc., use the same convention used by existing Zeek logs in that file (e.g., `ts`, `uid`, `orig_h`, `orig_p`, `resp_h`, `resp_p`)
-* Take care, especially when copy-pasting filter code, the Zeek delimiter isn't modified from a tab character to a space character (see "*zeek's default delimiter is a literal tab, MAKE SURE YOUR EDITOR DOESN'T SCREW IT UP*" warnings in that file)
-1. If necessary, perform log normalization in [`logstash/pipelines/zeek/13_zeek_normalize.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/13_zeek_normalize.conf) for values such as action (`event.action`), result (`event.result`), application protocol version (`network.protocol_version`), etc.
-1. If necessary, define conversions for floating point or integer values in [`logstash/pipelines/zeek/14_zeek_convert.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/14_zeek_convert.conf)
+* The [`logstash/scripts/logstash-start.sh`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/scripts/logstash-start.sh) Logstash container startup script should automatically fix any issues with parsing the Zeek tab delimiter (e.g., converting spaces in the `dissect` and `split` filters to tabs)
+1. If necessary, perform log normalization in [`logstash/pipelines/zeek/1300_zeek_normalize.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/1300_zeek_normalize.conf) for values such as action (`event.action`), result (`event.result`), application protocol version (`network.protocol_version`), etc.
+1. If necessary, define conversions for floating point or integer values in [`logstash/pipelines/zeek/1400_zeek_convert.conf`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/logstash/pipelines/zeek/1400_zeek_convert.conf)
1. Identify the new fields and add them as described in [Adding new log fields](contributing-new-log-fields.md#NewFields)

The script [`scripts/zeek_script_to_malcolm_boilerplate.py`]({{ site.github.repository_url }}/blob/{{ site.github.build_revision }}/scripts/zeek_script_to_malcolm_boilerplate.py) may help by autogenerating these filters.
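
For illustration, a minimal parsing filter for a hypothetical `mynewlog.log` (with fields `ts`, `uid`, the `id` four-tuple, and a single `msg` field) might look like the sketch below, mirroring the per-log filter files added in this commit; a name like `logstash/pipelines/zeek/1021_zeek_mynewlog.conf` (hypothetical) would make it sort into the range described in step 1. Remember that the delimiters in the `dissect` and `split` filters must be literal tab characters (shown here as spaces):

filter {
  if ([log_source] == "mynewlog") {
    # mynewlog.log (hypothetical example)
    if ("_jsonparsesuccess" not in [tags]) {
      dissect {
        id => "dissect_zeek_mynewlog"
        mapping => {
          "[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][msg]}"
        }
      }
      if ("_dissectfailure" in [tags]) {
        mutate {
          id => "mutate_split_zeek_mynewlog"
          split => { "[message]" => " " }
        }
        ruby {
          id => "ruby_zip_zeek_mynewlog"
          init => "@zeek_mynewlog_field_names = [ 'ts', 'uid', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'msg' ]"
          code => "event.set('[zeek_cols]', @zeek_mynewlog_field_names.zip(event.get('[message]')).to_h)"
        }
      }
    }
  }
}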
2 changes: 1 addition & 1 deletion logstash/maps/malcolm_severity.yaml
@@ -1,4 +1,4 @@
-# keys should match the event.severity_tags values set in 19_severity.conf
+# keys should match the event.severity_tags values set in 1900_severity.conf
"Cross-segment traffic": 20
"External traffic": 20
"Inbound traffic": 50
File renamed without changes.
File renamed without changes.
82 changes: 82 additions & 0 deletions logstash/pipelines/zeek/1001_zeek_parse.conf
@@ -0,0 +1,82 @@
########################
# zeek -> arkime session creation and enrichment
#
# see https://docs.zeek.org/en/stable/script-reference/log-files.html for Zeek logfile documentation
#
# see source.zeeklogs.js for the Arkime code that turns these into UI fields
#
# to profile, debug:
# - get filters sorted by execution time (where in > 0)
# $ docker compose exec logstash curl -XGET http://localhost:9600/_node/stats/pipelines | jq -r '.. | .filters? // empty | .[] | objects | select (.events.in > 0) | [.id, .events.in, .events.out, .events.duration_in_millis] | join (";")' | sort -n -t ';' -k4
# - get filters where in != out
# $ docker compose exec logstash curl -XGET http://localhost:9600/_node/stats/pipelines | jq -r '.. | .filters? // empty | .[] | objects | select (.events.in != .events.out) | [.id, .events.in, .events.out, .events.duration_in_millis] | join (";")'
#
# Copyright (c) 2024 Battelle Energy Alliance, LLC. All rights reserved.
#######################

filter {

# handle JSON-formatted Zeek logs right out of the gate; we'll do the field renaming below
if ([message] =~ /^{.*}$/) { json {
id => "json_zeek_message_parse"
source => "[message]"
target => "[zeek_cols]"
add_tag => [ "_jsonparsesuccess" ]
} }


# for JSON logs, do some global renaming of common fields to make them match the names we'd
# be assigning to them if we were reading TSV
if ("_jsonparsesuccess" in [tags]) {

# some of the ICSNPP parsers do an interesting thing to handle source and destination fields
# (see https://github.com/cisagov/icsnpp-bacnet/?tab=readme-ov-file#source-and-destination-fields)
# so check for and handle those first
ruby {
id => "ruby_zeek_json_determine_source_destination_fields"
code => "
if ![event.get('[zeek_cols][source_h]').to_s,
event.get('[zeek_cols][source_p]').to_s,
event.get('[zeek_cols][destination_h]').to_s,
event.get('[zeek_cols][destination_p]').to_s].reject{ |e| e.nil? || e.empty? || (e == '0') }.empty? then
event.set('[@metadata][icsnpp_source_dest_fields]', 'true')
end
"
}
if ([@metadata][icsnpp_source_dest_fields]) {
mutate {
id => "mutate_rename_zeek_json_common_reversed_direction_fields"
rename => { "[zeek_cols][id.orig_h]" => "[zeek_cols][drop_orig_h]" }
rename => { "[zeek_cols][id.orig_p]" => "[zeek_cols][drop_orig_p]" }
rename => { "[zeek_cols][id.resp_h]" => "[zeek_cols][drop_resp_h]" }
rename => { "[zeek_cols][id.resp_p]" => "[zeek_cols][drop_resp_p]" }
rename => { "[zeek_cols][source_h]" => "[zeek_cols][orig_h]" }
rename => { "[zeek_cols][source_p]" => "[zeek_cols][orig_p]" }
rename => { "[zeek_cols][destination_h]" => "[zeek_cols][resp_h]" }
rename => { "[zeek_cols][destination_p]" => "[zeek_cols][resp_p]" }
}

} else {
mutate {
id => "mutate_rename_zeek_json_common_fields"
rename => { "[zeek_cols][id.orig_h]" => "[zeek_cols][orig_h]" }
rename => { "[zeek_cols][id.orig_p]" => "[zeek_cols][orig_p]" }
rename => { "[zeek_cols][id.resp_h]" => "[zeek_cols][resp_h]" }
rename => { "[zeek_cols][id.resp_p]" => "[zeek_cols][resp_p]" }
}
} # icsnpp_source_dest_fields or not
} # _jsonparsesuccess in tags
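
# as a concrete (hypothetical) example of the renaming above: an ICSNPP JSON record carrying
# both the IP-level four-tuple and protocol-level source/destination fields, e.g.
#   { "id.orig_h": "10.0.0.2", "id.orig_p": 47808, "source_h": "192.168.1.5", "source_p": 47808, ... }
# ends up with [zeek_cols][orig_h] == "192.168.1.5" (the protocol-level source), while the
# IP-level values are moved to [zeek_cols][drop_*] fields, whose naming suggests they are
# discarded later in the pipeline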

# for non-JSON:
# Dissect is WAY faster than CSV, and quite a bit faster than mutate.split. However, it
# is not as flexible when it comes to missing or extra columns
# (see https://github.com/logstash-plugins/logstash-filter-dissect/issues/62)
#
# So, if the dissect filter fails, we fall back to a split-then-zip solution.
# This should be a good tradeoff between performance (in the case where the Zeek logs
# match what we think they should look like) and flexibility (when they don't).
#
# The one drawback: if you change the fields in a dissect mapping, make sure you make
# the corresponding change in the ruby init code.
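#
# A minimal sketch of that pattern (illustrative only; the per-log filter files that
# follow implement it for real, and the delimiters must be literal tabs):
#
#   dissect {
#     mapping => { "[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][orig_h]} ..." }
#   }
#   if ("_dissectfailure" in [tags]) {
#     mutate { split => { "[message]" => " " } }
#     ruby {
#       init => "@field_names = [ 'ts', 'uid', 'orig_h', ... ]"
#       code => "event.set('[zeek_cols]', @field_names.zip(event.get('[message]')).to_h)"
#     }
#   }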

} # end Filter
146 changes: 146 additions & 0 deletions logstash/pipelines/zeek/1011_zeek_bacnet.conf
@@ -0,0 +1,146 @@
########################
# Copyright (c) 2024 Battelle Energy Alliance, LLC. All rights reserved.
#######################

filter {


if ([log_source] == "bacnet") {
#############################################################################################################################
# bacnet.log
# https://github.com/cisagov/ICSNPP

if ("_jsonparsesuccess" not in [tags]) {
dissect {
id => "dissect_zeek_bacnet"
mapping => {
"[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][drop_orig_h]} %{[zeek_cols][drop_orig_p]} %{[zeek_cols][drop_resp_h]} %{[zeek_cols][drop_resp_p]} %{[zeek_cols][is_orig]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][bvlc_function]} %{[zeek_cols][pdu_type]} %{[zeek_cols][pdu_service]} %{[zeek_cols][invoke_id]} %{[zeek_cols][result_code]}"
}
}
if ("_dissectfailure" in [tags]) {
mutate {
id => "mutate_split_zeek_bacnet"
split => { "[message]" => " " }
}
ruby {
id => "ruby_zip_zeek_bacnet"
init => "@zeek_bacnet_field_names = [ 'ts', 'uid', 'drop_orig_h', 'drop_orig_p', 'drop_resp_h', 'drop_resp_p', 'is_orig', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'bvlc_function', 'pdu_type', 'pdu_service', 'invoke_id', 'result_code' ]"
code => "event.set('[zeek_cols]', @zeek_bacnet_field_names.zip(event.get('[message]')).to_h)"
}
}
}

mutate {
id => "mutate_add_fields_zeek_bacnet"
add_field => {
"[zeek_cols][proto]" => "udp"
"[zeek_cols][service]" => "bacnet"
}
add_tag => [ "ics" ]
}

} else if ([log_source] == "bacnet_device_control") {
#############################################################################################################################
# bacnet_device_control.log
# https://github.com/cisagov/ICSNPP

if ("_jsonparsesuccess" not in [tags]) {
dissect {
id => "dissect_zeek_bacnet_device_control"
mapping => {
"[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][drop_orig_h]} %{[zeek_cols][drop_orig_p]} %{[zeek_cols][drop_resp_h]} %{[zeek_cols][drop_resp_p]} %{[zeek_cols][is_orig]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][invoke_id]} %{[zeek_cols][pdu_service]} %{[zeek_cols][time_duration]} %{[zeek_cols][device_state]} %{[zeek_cols][password]} %{[zeek_cols][result]} %{[zeek_cols][result_code]}"
}
}
if ("_dissectfailure" in [tags]) {
mutate {
id => "mutate_split_zeek_bacnet_device_control"
split => { "[message]" => " " }
}
ruby {
id => "ruby_zip_zeek_bacnet_device_control"
init => "@zeek_bacnet_device_control_field_names = [ 'ts', 'uid', 'drop_orig_h', 'drop_orig_p', 'drop_resp_h', 'drop_resp_p', 'is_orig', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'invoke_id', 'pdu_service', 'time_duration', 'device_state', 'password', 'result', 'result_code' ]"
code => "event.set('[zeek_cols]', @zeek_bacnet_device_control_field_names.zip(event.get('[message]')).to_h)"
}
}
}

mutate {
id => "mutate_add_fields_zeek_bacnet_device_control"
add_field => {
"[zeek_cols][proto]" => "udp"
"[zeek_cols][service]" => "bacnet"
}
add_tag => [ "ics" ]
}

} else if ([log_source] == "bacnet_discovery") {
#############################################################################################################################
# bacnet_discovery.log
# https://github.com/cisagov/ICSNPP

if ("_jsonparsesuccess" not in [tags]) {
dissect {
id => "dissect_zeek_bacnet_discovery"
mapping => {
"[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][drop_orig_h]} %{[zeek_cols][drop_orig_p]} %{[zeek_cols][drop_resp_h]} %{[zeek_cols][drop_resp_p]} %{[zeek_cols][is_orig]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][pdu_service]} %{[zeek_cols][device_id_type]} %{[zeek_cols][device_id_number]} %{[zeek_cols][object_type]} %{[zeek_cols][instance_number]} %{[zeek_cols][vendor]} %{[zeek_cols][range]} %{[zeek_cols][object_name]}"
}
}
if ("_dissectfailure" in [tags]) {
mutate {
id => "mutate_split_zeek_bacnet_discovery"
split => { "[message]" => " " }
}
ruby {
id => "ruby_zip_zeek_bacnet_discovery"
init => "@zeek_bacnet_discovery_field_names = [ 'ts', 'uid', 'drop_orig_h', 'drop_orig_p', 'drop_resp_h', 'drop_resp_p', 'is_orig', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'pdu_service', 'device_id_type', 'device_id_number', 'object_type', 'instance_number', 'vendor', 'range', 'object_name' ]"
code => "event.set('[zeek_cols]', @zeek_bacnet_discovery_field_names.zip(event.get('[message]')).to_h)"
}
}
}

mutate {
id => "mutate_add_fields_zeek_bacnet_discovery"
add_field => {
"[zeek_cols][proto]" => "udp"
"[zeek_cols][service]" => "bacnet"
}
add_tag => [ "ics" ]
}

} else if ([log_source] == "bacnet_property") {
#############################################################################################################################
# bacnet_property.log
# https://github.com/cisagov/ICSNPP

if ("_jsonparsesuccess" not in [tags]) {
dissect {
id => "dissect_zeek_bacnet_property"
mapping => {
"[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][drop_orig_h]} %{[zeek_cols][drop_orig_p]} %{[zeek_cols][drop_resp_h]} %{[zeek_cols][drop_resp_p]} %{[zeek_cols][is_orig]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][invoke_id]} %{[zeek_cols][pdu_service]} %{[zeek_cols][object_type]} %{[zeek_cols][instance_number]} %{[zeek_cols][property]} %{[zeek_cols][array_index]} %{[zeek_cols][value]}"
}
}
if ("_dissectfailure" in [tags]) {
mutate {
id => "mutate_split_zeek_bacnet_property"
split => { "[message]" => " " }
}
ruby {
id => "ruby_zip_zeek_bacnet_property"
init => "@zeek_bacnet_property_field_names = [ 'ts', 'uid', 'drop_orig_h', 'drop_orig_p', 'drop_resp_h', 'drop_resp_p', 'is_orig', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'invoke_id', 'pdu_service', 'object_type', 'instance_number', 'property', 'array_index', 'value' ]"
code => "event.set('[zeek_cols]', @zeek_bacnet_property_field_names.zip(event.get('[message]')).to_h)"
}
}
}

mutate {
id => "mutate_add_fields_zeek_bacnet_property"
add_field => {
"[zeek_cols][proto]" => "udp"
"[zeek_cols][service]" => "bacnet"
}
add_tag => [ "ics" ]
}

}

} # end Filter
37 changes: 37 additions & 0 deletions logstash/pipelines/zeek/1012_zeek_bestguess.conf
@@ -0,0 +1,37 @@
########################
# Copyright (c) 2024 Battelle Energy Alliance, LLC. All rights reserved.
#######################

filter {


if ([log_source] == "bestguess") {
#############################################################################################################################
# bestguess.log

if ("_jsonparsesuccess" not in [tags]) {
dissect {
id => "dissect_zeek_bestguess"
mapping => {
"[message]" => "%{[zeek_cols][ts]} %{[zeek_cols][uid]} %{[zeek_cols][orig_h]} %{[zeek_cols][orig_p]} %{[zeek_cols][resp_h]} %{[zeek_cols][resp_p]} %{[zeek_cols][proto]} %{[zeek_cols][name]} %{[zeek_cols][category]}"
}
}
if ("_dissectfailure" in [tags]) {
mutate {
id => "mutate_split_zeek_bestguess"
split => { "[message]" => " " }
}
ruby {
id => "ruby_zip_zeek_bestguess"
init => "@zeek_bestguess_field_names = [ 'ts', 'uid', 'orig_h', 'orig_p', 'resp_h', 'resp_p', 'proto', 'name', 'category' ]"
code => "event.set('[zeek_cols]', @zeek_bestguess_field_names.zip(event.get('[message]')).to_h)"
}
}
}

mutate {
id => "mutate_add_tag_ics_best_guess_log"
add_tag => [ "ics_best_guess" ]
}

}

} # end Filter
