-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcheck_flow_logs.py
51 lines (42 loc) · 1.36 KB
/
check_flow_logs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import boto3
import time
import click
@click.command()
@click.argument('vpc-id', type=str)
@click.argument('log-group-name', type=str)
@click.option('--region', type=str, default='eu-west-2')
@click.option('--max-query-time', type=int, default=60)
def main(vpc_id, log_group_name, region, max_query_time):
# set start_time to now (seconds since epoch)
start_time = int(time.time())
end_time = start_time + max_query_time
query_string = 'QUERY_STRING'
# Initialize the CloudWatch Logs client
client = boto3.client('logs')
# Start the query
response = client.start_query(
logGroupName=log_group_name,
startTime=start_time,
endTime=end_time,
queryString=query_string
)
query_id = response['queryId']
# Wait for the query to complete
while True:
query_status = client.get_query_results(
queryId=query_id
)['status']
if query_status == 'Complete':
break
elif query_status == 'Failed' or query_status == 'Cancelled':
print(f"Query failed or was cancelled. Status: {query_status}")
exit(1)
time.sleep(1)
# Retrieve and print the query results
results = client.get_query_results(
queryId=query_id
)['results']
for result in results:
print('\t'.join(result))
if __name__ == '__main__':
main()