feat: Dynamo DB Over SQS
Signed-off-by: Chirag Soni <[email protected]>
chirag-ibm committed Feb 1, 2024
1 parent f34a803 commit b936e22
Showing 10 changed files with 327 additions and 120 deletions.
@@ -6,19 +6,16 @@
 input {
   s3 {
     #Mandatory arguments:
     #Insert the access key and secret that has access to the Bucket
-    access_key_id => "<ACCESS-KEY>"
-    secret_access_key => "<SECRET-KEY>"
-    region => "<REGION>" #Region that has the DB, Default value: us-east-1
-    bucket => "<BUCKET_NAME>" #This is the Bucket configured while creating CloudTrail
+    access_key_id => "<Enter the access key id>"
+    secret_access_key => "<Enter the secret access key id>"
+    region => "<Enter AWS region value>" #Region that has the DB, Default value: us-east-1
+    bucket => "<Enter the S3 bucket name>" #This is the Bucket configured while creating CloudTrail
     interval => 5
-    type => 'CloudTrail'
+    type => 'CloudTrail'
     codec => cloudtrail {}
     #Insert the account id of the AWS account where dynamodb is present.
-    add_field => {"account_id" => "<ACCOUNT_ID>"}
-    #Prefix is an optional parameter to speed up the data fetching from S3. If all the tables related to the CloudTrail are in the same Region, then add the value as prefix => "AWSLogs/399998888666/CloudTrail/ap-south-1/". The value can be confirmed on the S3 buckets page in AWS.
-    prefix => "<PREFIX>" # for e.g., "AWSLogs/<Account_ID>/<Tables_Region>/"
+    add_field => {"account_id" => "<Enter the aws account id>"}
+    #Prefix is an optional parameter to speed up the data fetching from S3. If all the tables related to the CloudTrail are in the same Region, then add the value as prefix => "AWSLogs/399998888666/CloudTrail/ap-south-1/". The value can be confirmed on the S3 buckets page in AWS.
+    prefix => "<Enter the prefix of filenames in the bucket>" # for e.g., "AWSLogs/<Account_ID>/<Tables_Region>/"
   }
 }
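The prefix comment in the hunk above suggests confirming the value on the S3 buckets page; a quick boto3 listing does the same check from a terminal. A minimal sketch, assuming AWS credentials are configured; the bucket name is a hypothetical placeholder and the prefix simply reuses the example from that comment:

# Sketch: verify the CloudTrail prefix exists in the bucket before
# hard-coding it in the s3 input. Bucket name below is illustrative.
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="my-cloudtrail-bucket",                          # hypothetical bucket
    Prefix="AWSLogs/399998888666/CloudTrail/ap-south-1/",   # example prefix from the comment above
    MaxKeys=5,
)
for obj in resp.get("Contents", []):
    print(obj["Key"])  # a few matching object keys confirm the prefix is right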

@@ -62,12 +59,12 @@ filter {
   }

   if [errorMessage] {
-    mutate {add_field => {"[message][errorMessage]" => "%{errorMessage}"}}
-  }
+    mutate {add_field => {"[message][errorMessage]" => "%{errorMessage}"}}
+  }

-  if [errorCode] {
-    mutate {add_field => {"[message][errorCode]" => "%{errorCode}"}}
-  }
+  if [errorCode] {
+    mutate {add_field => {"[message][errorCode]" => "%{errorCode}"}}
+  }

   if [eventSource] {
     if [eventSource] =~ "dynamodb.amazonaws.com" {
Binary file not shown.
@@ -0,0 +1,63 @@
#
# Copyright 2022-2023 IBM Inc. All rights reserved
# SPDX-License-Identifier: Apache-2.0
#

input {
  cloudwatch_logs {
    #Mandatory arguments:
    log_group => ["<Enter the aws log group>"] #e.g., ["aws-dynamodb-logs"]
    access_key_id => "<Enter the access key id>"
    secret_access_key => "<Enter the secret access key id>"
    region => "<Enter aws region>" #Region that has the DB, Default value: us-east-1
    start_position => "end"
    interval => 5
    event_filter => ""
    type => "dynamodb"
    #Insert the account id of the AWS account where dynamodb is present.
    add_field => {"account_id" => "<Enter Aws account Id>"}
  }
}

filter {
  if [type] == 'dynamodb' {
    json {
      source => "message"
      target => "parsed_json"
    }

    if [parsed_json][detail] {
      mutate {
        add_field => {
          "new_event_source" => "%{[parsed_json][detail][eventSource]}"
        }
        replace => {
          "message" => "%{[parsed_json][detail]}"
        }
      }
    } else {
      mutate {
        add_field => {
          "new_event_source" => "%{[parsed_json][eventSource]}"
        }
      }
    }

    ruby { code => 'event.set("message", event.get("message").to_s)' }

    if [new_event_source] {
      if [new_event_source] =~ "dynamodb.amazonaws.com" {
        dynamodb_guardium_plugin_filter {}
      } else {
        drop {}
      }
    }

    prune { whitelist_names => [ "GuardRecord" ] }
  }
}
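Before pointing the cloudwatch_logs input at a log group, it can help to confirm that the group actually receives DynamoDB CloudTrail records. A minimal boto3 sketch, assuming AWS credentials are configured; the log group name and time window are illustrative, not taken from this commit:

# Sketch: check that a CloudWatch Logs group contains DynamoDB events.
import boto3
from datetime import datetime, timedelta, timezone

logs = boto3.client("logs")
start = datetime.now(timezone.utc) - timedelta(minutes=30)

resp = logs.filter_log_events(
    logGroupName="aws-dynamodb-logs",          # hypothetical log group
    startTime=int(start.timestamp() * 1000),   # CloudWatch expects epoch milliseconds
    filterPattern='"dynamodb.amazonaws.com"',  # quoted term matches the eventSource string
    limit=5,
)
for e in resp["events"]:
    print(e["message"][:200])                  # peek at the raw CloudTrail JSON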

This file was deleted.

Binary file not shown.
@@ -0,0 +1,48 @@
#
# Copyright 2022-2023 IBM Inc. All rights reserved
# SPDX-License-Identifier: Apache-2.0
#

input {
  sqs {
    access_key_id => "<Enter the access key id>"
    secret_access_key => "<Enter the secret access key id>"
    region => "<Enter aws region>" #Region that has the Queue, Default value: us-east-1
    queue => "<Enter the sqs queue name>" #This parameter simply holds the Queue name and not the URL
    codec => plain
    type => "dynamodb"
    #Insert the account id of the AWS account
    add_field => {"account_id" => "<Enter aws account id>"}
  }
}

filter {
  if [type] == "dynamodb" {
    mutate { gsub => [ "message", "'", '"' ] }
    mutate { gsub => [ "message", '\"', '"' ] }
    mutate { gsub => [ "message", '"{', '{' ] }
    mutate { gsub => [ "message", '}"', '}' ] }
    json {
      source => "message"
      target => "parsed_json"
    }
    mutate {
      add_field => {
        "new_event_source" => "%{[parsed_json][message][eventSource]}"
      }
      replace => {
        "message" => "%{[parsed_json][message]}"
      }
    }
    if [new_event_source] {
      if [new_event_source] =~ "dynamodb.amazonaws.com" {
        dynamodb_guardium_plugin_filter {}
      } else {
        drop {}
      }
    }

    mutate { remove_field => [ "parsed_json", "new_event_source", "message", "cloudwatch_logs", "@timestamp", "@version", "type", "host", "sequence" ] }
  }
}
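The mutate/gsub chain at the top of this filter exists because the Lambda added later in this commit publishes each log event with MessageBody=str(event), i.e. a single-quoted Python dict repr rather than JSON, so the quotes have to be rewritten before the json filter can parse the message. A small sketch of the difference; the field names and values below are illustrative only:

# Sketch: str() of a dict is not JSON, which is what the gsub rules repair.
import json

log_event = {"timestamp": 1706745600000,
             "message": '{"eventSource": "dynamodb.amazonaws.com"}'}

print(str(log_event))         # single-quoted repr: {'timestamp': ..., 'message': '{...}'}
print(json.dumps(log_event))  # valid JSON: {"timestamp": ..., "message": "{...}"}

# Had the Lambda sent json.dumps(event) instead of str(event), most of the
# quote rewriting in the filter above would be unnecessary.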
@@ -0,0 +1,48 @@
#
# Copyright 2022-2023 IBM Inc. All rights reserved
# SPDX-License-Identifier: Apache-2.0
#

import boto3
import os
from datetime import datetime, timedelta

# GROUP_NAME is the CloudWatch Logs group to read from; QUEUE_NAME must hold
# the full SQS queue URL, because send_message expects a QueueUrl.
GROUP_NAME = os.environ['GROUP_NAME']
QUEUE_NAME = os.environ['QUEUE_NAME']

def lambda_handler(event, context):

    # Look back over the last two minutes of log activity.
    currentTime = datetime.now()
    StartDate = currentTime - timedelta(minutes=2)
    EndDate = currentTime

    # CloudWatch Logs expects epoch timestamps in milliseconds.
    fromDate = int(StartDate.timestamp() * 1000)
    toDate = int(EndDate.timestamp() * 1000)

    client = boto3.client('logs')
    sqs = boto3.client('sqs')

    # List the group's log streams, most recently active first.
    logGroupDetails = client.describe_log_streams(
        logGroupName=GROUP_NAME,
        orderBy='LastEventTime',
        descending=True
    )

    for logStream in logGroupDetails['logStreams']:
        logStreamNameObt = logStream['logStreamName']
        response = client.get_log_events(
            logGroupName=GROUP_NAME,
            logStreamName=logStreamNameObt,
            startTime=fromDate,
            endTime=toDate,
            startFromHead=True,
        )

        # Forward each log event to SQS; log_event avoids shadowing the
        # handler's own event argument.
        for log_event in response['events']:
            sqs.send_message(
                QueueUrl=QUEUE_NAME,
                MessageBody=str(log_event)
            )

        print("New response", response)