Skip to content

Commit

Permalink
filter code update
Browse files Browse the repository at this point in the history
  • Loading branch information
MVarshini committed Oct 7, 2024
1 parent 0eab681 commit fd6055e
Show file tree
Hide file tree
Showing 17 changed files with 291 additions and 441 deletions.
240 changes: 99 additions & 141 deletions backend/app/api/v1/commons/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,57 @@
from app.services.search import ElasticService
import json

def buildFilterQuery(filter: dict, query: dict):
    """Add the user-selected filters to an Elasticsearch bool query.

    Args:
        filter: The selected filters, either as a JSON-encoded string or as
            an already-decoded dict that maps a filter key to the list of
            selected values. ``None``, ``""`` and ``{}`` mean "no filters".
        query: Query body to extend in place. Clauses are appended to the
            ``filter``, ``should`` and ``must_not`` lists under
            ``query["query"]["bool"]``; a missing list is created on demand.

    Returns:
        The same ``query`` object, for call-site convenience.

    Filter keys are handled as follows:
        * ``workerNodesCount`` becomes a ``terms`` clause in ``filter``.
        * ``build`` and ``jobType`` are matched against the ``ocpVersion``
          and ``upstreamJob`` fields respectively.
        * ``isRehearse`` adds a "rehearse" phrase match on ``upstreamJob`` to
          ``should`` (when ``True`` is selected) and/or ``must_not`` (when
          ``False`` is selected).
        * any other key is matched against the field of the same name.
    """
    if not filter:
        return query

    # Accept both the raw JSON text and a decoded mapping.
    if isinstance(filter, (str, bytes, bytearray)):
        filter_dict = json.loads(filter)
    else:
        filter_dict = filter
    if not filter_dict:
        return query

    # Filter keys whose Elasticsearch field has a different name.
    field_aliases = {"build": "ocpVersion", "jobType": "upstreamJob"}

    bool_query = query["query"]["bool"]
    should = bool_query.setdefault("should", [])
    minimum_match = 0

    for key, values in filter_dict.items():
        if key == "workerNodesCount":
            bool_query.setdefault("filter", []).append(
                {"terms": {"workerNodesCount": values}}
            )
            continue

        if key == "isRehearse":
            rehearse_clause = {"match_phrase": {"upstreamJob": "rehearse"}}
            if True in values:
                should.append(rehearse_clause)
                minimum_match += 1
            if False in values:
                bool_query.setdefault("must_not", []).append(rehearse_clause)
            continue

        field = field_aliases.get(key, key)
        clauses = [{"match_phrase": {field: value}} for value in values]
        if clauses:
            should.extend(clauses)
            # Count one required match per filter key, not per selected
            # value: the values of a single key are alternatives, and a
            # document cannot match all of them at once.
            minimum_match += 1

    if should:
        bool_query["minimum_should_match"] = minimum_match
    return query

def buildAggregateQuery(size: int = 10):
    """Build one ``terms`` aggregation per filterable key.

    Args:
        size: Maximum number of buckets requested for every aggregation.
            Defaults to 10, the value that used to be hard-coded.

    Returns:
        A dict mapping each aggregation name to
        ``{"terms": {"field": <index field>, "size": size}}``, ready to be
        merged into the ``aggs`` section of a query body.
    """
    # Aggregation name -> index field it is computed on. "ocpVersion" and
    # "build" deliberately share a field, and "upstream" reads upstreamJob.
    fields_by_key = {
        "ciSystem": "ciSystem.keyword",
        "platform": "platform.keyword",
        "benchmark": "benchmark.keyword",
        "releaseStream": "releaseStream.keyword",
        "networkType": "networkType.keyword",
        "workerNodesCount": "workerNodesCount",
        "jobStatus": "jobStatus.keyword",
        "controlPlaneArch": "controlPlaneArch.keyword",
        "publish": "publish.keyword",
        "fips": "fips.keyword",
        "encrypted": "encrypted.keyword",
        "ipsec": "ipsec.keyword",
        "ocpVersion": "ocpVersion.keyword",
        "build": "ocpVersion.keyword",
        "upstream": "upstreamJob.keyword",
    }
    return {
        name: {"terms": {"field": field, "size": size}}
        for name, field in fields_by_key.items()
    }

async def getData(start_datetime: date, end_datetime: date, size:int, offset:int, sort:str, filter:str, configpath: str):
try:
query = {
Expand All @@ -23,42 +74,12 @@ async def getData(start_datetime: date, end_datetime: date, size:int, offset:int
}
}
es = ElasticService(configpath=configpath)
minimum_match = 0
filter_dict={}

if sort:
query.update({"sort": json.loads(sort)})
if filter:
filter_dict = json.loads(filter)
print("filter")
if bool(filter_dict):
for key,val in filter_dict.items():
if key == "workerNodesCount":
query["query"]["bool"]["filter"].append({"terms":{"workerNodesCount":val}})
elif key == "build":
buildObj = getMatchPhrase("ocpVersion", filter_dict["build"])
query["query"]["bool"]["should"].append(buildObj)
minimum_match+=1
elif key == "jobType":
obj = getMatchPhrase("upstreamJob", filter_dict["jobType"])
query["query"]["bool"]["should"].append(obj)
minimum_match+=1
elif key == "isRehearse":
rehearseObj = {"match_phrase": {"upstreamJob":"rehearse"}}
if True in filter_dict["isRehearse"]:
query["query"]["bool"]["should"].append(rehearseObj)
minimum_match+=1
if False in filter_dict["isRehearse"]:
query["query"]["bool"]["must_not"].append(rehearseObj)
else:
queryObj = getMatchPhrase(key, filter_dict[key])
query["query"]["bool"]["should"].append(queryObj)
minimum_match+=1
print(len(query["query"]["bool"]["should"]))
print("my match")
if len(query["query"]["bool"]["should"]) >= 1:
query["query"]["bool"].update({"minimum_should_match": minimum_match})
print(query)

query=buildFilterQuery(filter, query)

response = await es.post(query=query, size=size, offset=offset, start_date=start_datetime, end_date=end_datetime, timestamp_field='timestamp')
await es.close()
tasks = [item['_source'] for item in response["data"]]
Expand Down Expand Up @@ -90,147 +111,84 @@ async def getData(start_datetime: date, end_datetime: date, size:int, offset:int
except Exception as err:
print(f"{type(err).__name__} was raised14: {err}")

def getMatchPhrase(key, items):
for item in items:
buildObj = {"match_phrase": {key: item}}
def getMatchPhrase(key, item):
    """Return a ``match_phrase`` clause that matches ``item`` on field ``key``."""
    return {"match_phrase": {key: item}}


def getSummary(jobStatus, isFilterReset):
    """Map every job-status bucket key to its count.

    Args:
        jobStatus: Iterable of bucket dicts, each with a ``key`` entry and,
            when ``isFilterReset`` is false, a ``doc_count`` entry.
        isFilterReset: When true, every key is reported with a count of 0
            and ``doc_count`` is not read.

    Returns:
        A dict of bucket key -> count.
    """
    summary = {}
    for bucket in jobStatus:
        summary[bucket['key']] = 0 if isFilterReset else bucket['doc_count']
    return summary

async def getFilterData(start_datetime: date, end_datetime: date, size:int, offset:int, sort:str, filter:str, configpath: str):
try:
query = {
"aggs":{
"min_timestamp":{
"min":{
"field": start_datetime.strftime('%Y-%m-%d') if start_datetime else (datetime.utcnow().date() - timedelta(days=5).strftime('%Y-%m-%d'))
}
},
"max_timestamp": {
"max": {
"field": end_datetime.strftime('%Y-%m-%d') if end_datetime else datetime.utcnow().strftime('%Y-%m-%d')
}
}
},
"query":{
"bool": {
"filter":[{
"range": {
"timestamp": {
"format": "yyyy-MM-dd"
}
}

}],
"should":[],
"must_not":[]
}
}
}
keysDict = {"ciSystem":"ciSystem.keyword","platform":"platform.keyword","benchmark":"benchmark.keyword",
"releaseStream":"releaseStream.keyword","networkType":"networkType.keyword", "workerNodesCount":"workerNodesCount","jobStatus":"jobStatus.keyword",
"controlPlaneArch":"controlPlaneArch.keyword","publish":"publish.keyword","fips":"fips.keyword","encrypted":"encrypted.keyword",
"ipsec":"ipsec.keyword", "ocpVersion":"ocpVersion.keyword", "build":"ocpVersion.keyword",
"upstream":"upstreamJob.keyword"
}

aggregate = {}
start_date = start_datetime.strftime('%Y-%m-%d') if start_datetime else (datetime.utcnow().date() - timedelta(days=5).strftime('%Y-%m-%d'))
end_date = end_datetime.strftime('%Y-%m-%d') if end_datetime else datetime.utcnow().strftime('%Y-%m-%d')
query = {"aggs":{"min_timestamp":{"min":{"field":start_date}},"max_timestamp":{"max":{"field":end_date}}},"query":{"bool":{"filter":[{"range":{"timestamp":{"format":"yyyy-MM-dd","lte":end_date,"gte":start_date}}}],"should":[],"must_not":[]}}}

es = ElasticService(configpath=configpath)
if sort:
if bool(sort):
query.update({"sort": json.loads(sort)})

if filter:
filter_dict = json.loads(filter)
minimum_match = 0
if bool(filter_dict):
for key,val in filter_dict.items():
if key == "workerNodesCount":
query["query"]["bool"]["filter"].append({"terms":{"workerNodesCount":val}})
elif key == "build":
buildObj = getMatchPhrase("ocpVersion", filter_dict["build"])
query["query"]["bool"]["should"].append(buildObj)
minimum_match+=1
elif key == "jobType":
obj = getMatchPhrase("upstreamJob", filter_dict["jobType"])
query["query"]["bool"]["should"].append(obj)
minimum_match+=1
elif key == "isRehearse":
rehearseObj = {"match_phrase": {"upstreamJob":"rehearse"}}
if True in filter_dict["isRehearse"]:
query["query"]["bool"]["should"].append(rehearseObj)
minimum_match+=1
if False in filter_dict["isRehearse"]:
query["query"]["bool"]["must_not"].append(rehearseObj)
else:
queryObj = getMatchPhrase(key, filter_dict[key])
query["query"]["bool"]["should"].append(queryObj)
minimum_match+=1
if len(query["query"]["bool"]["should"]) >= 1:
query["query"]["bool"].update({"minimum_should_match": minimum_match})

for x,y in keysDict.items():
obj = {x:{"terms":{"field":y,"size":10}}}
aggregate.update(obj)
if bool(filter):
query = buildFilterQuery(filter, query)

aggregate = buildAggregateQuery()
query["aggs"].update(aggregate)

isFilterReset = False
response = await es.filterPost(query=query)
metrics = {'total': response["total"]}

if(response["total"] == 0):
query.pop('query', None)
response = await es.filterPost(query=query)
isFilterReset = True
await es.close()

filterK=[]
metrics = [{"key":"total", "count":0 if isFilterReset else response["total"]}]


await es.close()
filter_=[]

if bool(response) and bool(response["filter_"]):
for k,v in response["filter_"].items():
if k!= "max_timestamp" and k!= "min_timestamp":
obj={"key":k}
for x in v:
values=[]
buildVal=[]
if x == "buckets":
buck = v[x]
if k == "jobStatus":
print(buck)
metrics.update(getSummary(buck, isFilterReset))
for m in buck:
if k == "ocpVersion":
values.append(m["key"][0:4])
elif k == "build":
values.append("-".join(m["key"].split("-")[-4:]))
elif k == "jobStatus":
print("filter reset")
print(isFilterReset)
if isFilterReset:
metricsObj = {"key": str(m["key"]).lower(), "count":0}
else:
metricsObj = {"key": str(m["key"]).lower(), "count":m["doc_count"]}
metrics.append(metricsObj)
print("mterics")
print(metrics)
values.append(str(m["key"]).lower())
values.append("-".join(m["key"].split("-")[-4:]))
else:
values.append(str(m["key"]).lower())
obj.update({"value": values})
filterK.append(obj)
filter_.append(obj)

upstreamData = response["filter_"]["upstream"] and response["filter_"]["upstream"]["buckets"]
jobType = []
isRehearse = []
if len(upstreamData) > 0:
for i in upstreamData:
if i["key"].find("periodic"):
jobType.append("periodic")
else:
jobType.append("pull request")
if i["key"].find("rehearse"):
isRehearse.append("true")
else:
isRehearse.append("false")
filterK.append({"key":"jobType", "value":list(set(jobType))})
filterK.append({"key":"isRehearse","value": list(set(isRehearse))})
if response["filter_"]["upstream"] and response["filter_"]["upstream"]["buckets"]:
upstreamData = response["filter_"]["upstream"]["buckets"]
jobType = []
isRehearse = []
if len(upstreamData) > 0:
for i in upstreamData:
if i["key"].find("periodic"):
jobType.append("periodic")
else:
jobType.append("pull request")
if i["key"].find("rehearse"):
isRehearse.append("true")
else:
isRehearse.append("false")
filter_.append({"key":"jobType", "value":list(set(jobType))})
filter_.append({"key":"isRehearse","value": list(set(isRehearse))})

filterResponse = [d for d in filterK if d['key']!="upstream"]
filterResponse = [d for d in filter_ if d['key']!="upstream"]

return ({"filterData":filterResponse, "summary":metrics})
return({"filterData":[], "summary":[]})
Expand Down
7 changes: 5 additions & 2 deletions backend/app/api/v1/endpoints/ocm/ocmJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ async def jobs(start_date: date = Query(None, description="Start date for search

if start_date > end_date:
return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)

results = await getData(start_date, end_date, 'ocm.elasticsearch')
sort = {}
filter = {}
offset = 0
size = 75
results = await getData(start_date, end_date, sort, filter, offset, 'ocm.elasticsearch')

if len(results) >= 1:
response = {
Expand Down
8 changes: 4 additions & 4 deletions backend/app/api/v1/endpoints/ocp/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def jobSummary(uuids: list):
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response]
runs = [item['_source'] for item in response["data"]]
return runs

async def processBurner(data: dict) :
Expand Down Expand Up @@ -346,7 +346,7 @@ async def getBurnerResults(uuid: str, uuids: list, index: str ):
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response]
runs = [item['_source'] for item in response["data"]]
return runs

async def getResults(uuid: str, uuids: list, index: str ):
Expand All @@ -366,7 +366,7 @@ async def getResults(uuid: str, uuids: list, index: str ):
es = ElasticService(configpath="ocp.elasticsearch",index=index)
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response]
runs = [item['_source'] for item in response["data"]]
return runs

async def getMatchRuns(meta: dict, workerCount: False):
Expand Down Expand Up @@ -416,7 +416,7 @@ async def getMatchRuns(meta: dict, workerCount: False):
es = ElasticService(configpath="ocp.elasticsearch")
response = await es.post(query=query)
await es.close()
runs = [item['_source'] for item in response]
runs = [item['_source'] for item in response["data"]]
uuids = []
for run in runs :
uuids.append(run["uuid"])
Expand Down
2 changes: 1 addition & 1 deletion backend/app/api/v1/endpoints/ocp/ocpJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def jobs(start_date: date = Query(None, description="Start date for search

if start_date > end_date:
return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)

results = await getData(start_date, end_date, size, offset, sort, filter, 'ocp.elasticsearch')


Expand Down
2 changes: 1 addition & 1 deletion backend/app/api/v1/endpoints/ocp/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ async def results_for_job(
es = ElasticService(configpath="ocp.elasticsearch")
response = await es.post(query=query)
await es.close()
tasks = [item['_source'] for item in response]
tasks = [item['_source'] for item in response['data']]
return tasks
Loading

0 comments on commit fd6055e

Please sign in to comment.