-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfunc.py
98 lines (84 loc) · 3.44 KB
/
func.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import io
import oci
import json
import logging
from datetime import datetime
from fdk import response
from onug_decorator import onug
from write_to_oci_log import write_onug_to_log
## to be moved
LOGGER = logging.getLogger()
LOGGER.setLevel(level=logging.WARNING)
LOGGER.info("Inside Event Logging Function")
def get_signer():
signer = None
try:
signer = oci.auth.signers.get_resource_principals_signer()
except Exception as ex:
LOGGER.error('ERROR: Could not get signer from resource principal', ex, flush=True)
raise
return signer
def get_function_config(ctx):
config = dict(ctx.Config())
# Getting LOG_LEVEL from function config
try:
log_level = getattr(logging,config["LOG_LEVEL"].upper(),None)
if isinstance(log_level, int):
LOGGER.setLevel(level=log_level)
else:
LOGGER.warning("Invalid LOG_LEVEL in function configuration.")
except KeyError:
LOGGER.warning("LOG_LEVEL not defined in function configuration.")
# Getting LOG_LEVEL from function config
global provider_json
try:
provider_json = config["PROVIDER_JSON_URL"]
LOGGER.info("get_function_config: Provider JSON URL is set to: " + config["PROVIDER_JSON_URL"])
except KeyError:
LOGGER.warning("Provider JSON URL is not defined in function configuration.")
provider_json = 'https://objectstorage.us-ashburn-1.oraclecloud.com/n/orasenatdpltsecitom01/b/HammerPublic/o/oci_output.json'
# Getting LOGGING_OCID from function config
global LOGGING_OCID
try:
LOGGER.info("get_function_config: Logging OCID set to: " + config["LOGGING_OCID"])
LOGGING_OCID = config["LOGGING_OCID"]
except:
LOGGER.info("get_function_config: No LOGGING OCID Provided")
LOGGING_OCID = None
def handler(ctx, data: io.BytesIO = None):
global LOGGER
LOGGER = logging.getLogger()
#LOGGER.setLevel(level=logging.DEBUG)
LOGGER.info("Inside Event Logging Function")
# Getting function configuration
get_function_config(ctx)
# Getting Event Message
try:
message = json.loads(data.getvalue())
LOGGER.debug("Handler: message is: " + str(message))
except (Exception) as ex:
raise Exception("Event type not properly formatted." + str(ex))
# Getting Resource principal
signer = get_signer()
try:
my_onug = onug(provider_json, message)
payload = my_onug.get_finding()
del my_onug
logging.debug("Handler: My onug object is: " + str(payload))
if LOGGING_OCID:
try:
LOGGER.info("Handler: writing ONUG finding to log OCID: " + str(LOGGING_OCID))
LOGGER.info("Handler: writing ONUG finding to log")
write_onug_to_log(signer, {}, payload, LOGGING_OCID)
except (Exception) as ex:
raise Exception("Handler: Failed to write event to log: " + str(ex))
return response.Response(
ctx, response_data=json.dumps(payload),
headers={"Content-Type": "application/json"})
else:
LOGGER.info("Handler: returning mapped ONUG finding")
return response.Response(
ctx, response_data=json.dumps(payload),
headers={"Content-Type": "application/json"})
except (Exception) as ex:
raise Exception("Handler: Event type not properly formatted." + str(ex))