Skip to content

Commit

Permalink
pagination params
Browse files Browse the repository at this point in the history
  • Loading branch information
MVarshini committed Nov 9, 2024
1 parent 4baf573 commit 9385645
Show file tree
Hide file tree
Showing 24 changed files with 318 additions and 210 deletions.
11 changes: 6 additions & 5 deletions backend/app/api/v1/commons/hce.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from app.services.search import ElasticService


async def getData(start_datetime: date, end_datetime: date, configpath: str):
async def getData(start_datetime: date, end_datetime: date, size:int, offset:int, configpath: str):
query = {
"size": size,
"from": offset,
"query": {
"bool": {
"filter": {
Expand All @@ -17,14 +19,13 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str):
}
}
}

es = ElasticService(configpath=configpath)
response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='date')
response = await es.post(query=query, size=size, start_date=start_datetime, end_date=end_datetime, timestamp_field='date')
await es.close()
tasks = [item['_source'] for item in response]
tasks = [item['_source'] for item in response['data']]
jobs = pd.json_normalize(tasks)
jobs[['group']] = jobs[['group']].fillna(0)
jobs.fillna('', inplace=True)
if len(jobs) == 0:
return jobs
return jobs
return ({'data':jobs, 'total': response['total']})
10 changes: 6 additions & 4 deletions backend/app/api/v1/commons/ocm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from app.services.search import ElasticService


async def getData(start_datetime: date, end_datetime: date, configpath: str):
async def getData(start_datetime: date, end_datetime: date, size:int, offset:int, configpath: str):
query = {
"size": size,
"from": offset,
"query": {
"bool": {
"filter": {
Expand All @@ -19,9 +21,9 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str):
}

es = ElasticService(configpath=configpath)
response = await es.post(query=query, start_date=start_datetime, end_date=end_datetime, timestamp_field='metrics.earliest')
response = await es.post(query=query, size=size, start_date=start_datetime, end_date=end_datetime, timestamp_field='metrics.earliest')
await es.close()
tasks = [item['_source'] for item in response]
tasks = [item['_source'] for item in response['data']]
jobs = pd.json_normalize(tasks)
if len(jobs) == 0:
return jobs
Expand All @@ -32,7 +34,7 @@ async def getData(start_datetime: date, end_datetime: date, configpath: str):
jobs.insert(len(jobs.columns), "ciSystem", "")
jobs.fillna('', inplace=True)
jobs['jobStatus'] = jobs.apply(convertJobStatus, axis=1)
return jobs
return {"data":jobs,"total": response["total"]}


def fillCiSystem(row):
Expand Down
1 change: 0 additions & 1 deletion backend/app/api/v1/commons/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ async def getData(start_datetime: date, end_datetime: date, size:int, offset:int
jobs = pd.json_normalize(tasks)
if len(jobs) == 0:
return jobs

jobs[['masterNodesCount', 'workerNodesCount',
'infraNodesCount', 'totalNodesCount']] = jobs[['masterNodesCount', 'workerNodesCount', 'infraNodesCount', 'totalNodesCount']].fillna(0)
jobs.fillna('', inplace=True)
Expand Down
1 change: 0 additions & 1 deletion backend/app/api/v1/commons/quay.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@ async def getData(start_datetime: date, end_datetime: date, size, offset, config
cleanJobs = jobs[jobs['platform'] != ""]

jbs = cleanJobs

return ({'data':jbs, 'total': response['total']})
4 changes: 2 additions & 2 deletions backend/app/api/v1/commons/telco.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def getData(start_datetime: date, end_datetime: date, size: int, offset: i
response = await splunk.query(query=query, size=size, offset=offset, searchList=searchList)
mapped_list = []

for each_response in response:
for each_response in response['data']:
end_timestamp = int(each_response['timestamp'])
test_data = each_response['data']
threshold = await telcoGraphs.process_json(test_data, True)
Expand Down Expand Up @@ -68,4 +68,4 @@ async def getData(start_datetime: date, end_datetime: date, size: int, offset: i
if len(jobs) == 0:
return jobs

return jobs
return {'data':jobs, 'total':response['total']}
2 changes: 1 addition & 1 deletion backend/app/api/v1/commons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def getMetadata(uuid: str, configpath: str) :
es = ElasticService(configpath=configpath)
response = await es.post(query=query)
await es.close()
meta = [item['_source'] for item in response]
meta = [item['_source'] for item in response['data']]
return meta[0]

def updateStatus(job):
Expand Down
63 changes: 53 additions & 10 deletions backend/app/api/v1/endpoints/cpt/cptJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ async def jobs(start_date: date = Query(None, description="Start date for search
end_date: date = Query(None, description="End date for searching jobs, format: 'YYYY-MM-DD'", examples=["2020-11-15"]),
pretty: bool = Query(False, description="Output contet in pretty format."),
size: int = Query(None, description="Number of jobs to fetch"),
offset: int = Query(None, description="Offset Number to fetch jobs from"),):
offset: int = Query(None, description="Offset Number to fetch jobs from"),
totalJobs: int = Query(None, description="Offset Number to fetch jobs from")
):
if start_date is None:
start_date = datetime.utcnow().date()
start_date = start_date - timedelta(days=5)
Expand All @@ -51,20 +53,31 @@ async def jobs(start_date: date = Query(None, description="Start date for search
return Response(content=json.dumps({'error': "invalid date format, start_date must be less than end_date"}), status_code=422)

results_df = pd.DataFrame()
total_dict = {}
total = 0
with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
futures = {executor.submit(fetch_product, product, start_date, end_date): product for product in products}
futures = {executor.submit(fetch_product, product, start_date, end_date, size, offset): product for product in products}
for future in as_completed(futures):
product = futures[future]
try:
result = future.result()
results_df = pd.concat([results_df, result])
total_dict[product] = result['total']
results_df = pd.concat([results_df, result['data']])
except Exception as e:
print(f"Error fetching data for product {product}: {e}")


num = 0 if totalJobs is None else int(totalJobs)
if totalJobs == 0:
for product in total_dict:
total += int(total_dict[product])

totalJobs =max(total,num)
response = {
'startDate': start_date.__str__(),
'endDate': end_date.__str__(),
'results': results_df.to_dict('records')
'results': results_df.to_dict('records'),
'total': totalJobs,
'offset': offset + size
}

if pretty:
Expand All @@ -75,16 +88,46 @@ async def jobs(start_date: date = Query(None, description="Start date for search
return jsonstring


async def fetch_product_async(product, start_date, end_date, size, offset):
    """Fetch one product's jobs and normalize them to the shared CPT columns.

    Args:
        product: key into the ``products`` mapper registry.
        start_date, end_date: inclusive date window for the search.
        size: page size requested from the backend.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so the
        caller can index ``result['total']`` / ``result['data']`` without
        type checks. On error or no data, ``data`` is an empty DataFrame
        and ``total`` is 0.
    """
    # Shared CPT schema expected by the /jobs aggregation endpoint.
    columns = ["ciSystem", "uuid", "releaseStream", "jobStatus", "buildUrl",
               "startDate", "endDate", "product", "version", "testName"]
    try:
        response = await products[product](start_date, end_date, size, offset)
        # Guard with isinstance: some mappers may still return a bare
        # DataFrame, and bool(DataFrame) raises ValueError.
        if isinstance(response, dict) and response:
            df = response["data"]
            data = df.loc[:, columns] if len(df) != 0 else df
            return {"data": data, "total": response["total"]}
    except ConnectionError:
        print("Connection Error in mapper for product " + product)
    except Exception as e:
        print(f"Error in mapper for product {product}: {e}")
    # Keep the dict contract on the failure path as well; the original
    # returned pd.DataFrame(), which broke result['total'] in the caller.
    return {"data": pd.DataFrame(), "total": 0}


def fetch_product(product, start_date, end_date, size, offset):
    """Synchronous bridge used by the ProcessPoolExecutor workers.

    Runs the async mapper for a single product on a fresh event loop and
    returns its result unchanged.
    """
    coro = fetch_product_async(product, start_date, end_date, size, offset)
    return asyncio.run(coro)

def is_requested_size_available(total_count, offset, requested_size):
    """
    Check if the requested size of data is available starting from a given offset.
    Args:
        total_count (int): Total number of available records.
        offset (int): The starting position in the dataset.
        requested_size (int): The number of records requested.
    Returns:
        bool: True if the requested size is available, False otherwise.
    """
    remaining = total_count - offset
    return requested_size <= remaining
def calculate_remaining_data(total_count, offset, requested_size):
    """
    Calculate the remaining number of data items that can be fetched based on the requested size.
    Args:
        total_count (int): Total number of available records.
        offset (int): The starting position in the dataset.
        requested_size (int): The number of records requested.
    Returns:
        int: The number of records that can be fetched, which may be less than or equal to requested_size.
    """
    # Whichever is smaller: what the caller asked for, or what is left
    # in the dataset past the offset.
    return min(total_count - offset, requested_size)
41 changes: 22 additions & 19 deletions backend/app/api/v1/endpoints/cpt/maps/hce.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from datetime import date

################################################################
# This will return a DataFrame from HCE required by the CPT
# endpoint, it contians the following columns:
# This will return a Dictionary from HCE required by the CPT
# endpoint; it contains totalJobs and a DataFrame with the following columns:
# "ciSystem"
# "uuid"
# "releaseStream"
Expand All @@ -15,23 +15,26 @@
# "version"
# "testName"
################################################################
import pandas as pd


async def hceMapper(start_datetime: date, end_datetime: date, size: int, offset: int):
    """Map HCE Elasticsearch results into the common CPT job schema.

    Args:
        start_datetime, end_datetime: inclusive date window for the search.
        size: page size to request.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so
        callers can rely on a single return shape.
    """
    response = await getData(start_datetime, end_datetime, size, offset, 'hce.elasticsearch')
    # getData may return a bare (empty) DataFrame; isinstance guard is
    # required because bool(DataFrame) raises ValueError.
    if not isinstance(response, pd.DataFrame) and response:
        df = response["data"]
        if len(df) == 0:
            # Preserve the dict contract even when the page is empty.
            return {"data": df, "total": response["total"]}
        df["releaseStream"] = "Nightly"
        df["ciSystem"] = "Jenkins"
        df["testName"] = df["product"] + ":" + df["test"]
        df["product"] = df["group"]
        df["jobStatus"] = df['result'].apply(lambda x: "SUCCESS" if x == 'PASS' else "FAILURE")
        # Version may be "stream:sha..."; keep only the short sha when present.
        df["version"] = df['version'].apply(lambda x: x if len(x.split(":")) == 1 else x.split(":")[1][:7])
        df["uuid"] = df["result_id"]
        df["buildUrl"] = df["link"]
        df["startDate"] = df["date"]
        df["endDate"] = df["date"]
        df = dropColumns(df)
        return {"data": df, "total": response["total"]}
    return {"data": pd.DataFrame(), "total": 0}

def dropColumns(df):
df = df.drop(columns=["group","test","result","result_id","link","date","release"])
Expand Down
25 changes: 14 additions & 11 deletions backend/app/api/v1/endpoints/cpt/maps/ocm.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from ....commons.ocm import getData
from datetime import date

import pandas as pd

################################################################
# This will return a DataFrame from OCM required by the CPT endpoint
################################################################
async def ocmMapper(start_datetime: date, end_datetime: date, size: int, offset: int):
    """Map OCM Elasticsearch results into the common CPT job schema.

    Args:
        start_datetime, end_datetime: inclusive date window for the search.
        size: page size to request.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so
        callers can rely on a single return shape.
    """
    response = await getData(start_datetime, end_datetime, size, offset, 'ocm.elasticsearch')
    # getData may return a bare (empty) DataFrame; isinstance guard is
    # required because bool(DataFrame) raises ValueError.
    if not isinstance(response, pd.DataFrame) and response:
        df = response["data"]
        if len(df) == 0:
            # Preserve the dict contract even when the page is empty.
            return {"data": df, "total": response["total"]}
        df.insert(len(df.columns), "product", "ocm")
        df.insert(len(df.columns), "releaseStream", "Nightly")
        df["testName"] = df["attack"]
        df["startDate"] = df["metrics.earliest"]
        df["endDate"] = df["metrics.end"]
        return {"data": df, "total": response["total"]}
    return {"data": pd.DataFrame(), "total": 0}
24 changes: 14 additions & 10 deletions backend/app/api/v1/endpoints/cpt/maps/ocp.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from ....commons.ocp import getData
from ....commons.utils import getReleaseStream
from datetime import date

import pandas as pd

################################################################
# This will return a DataFrame from OCP required by the CPT endpoint
################################################################
async def ocpMapper(start_datetime: date, end_datetime: date, size: int, offset: int):
    """Map OCP Elasticsearch results into the common CPT job schema.

    Args:
        start_datetime, end_datetime: inclusive date window for the search.
        size: page size to request.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so
        callers can rely on a single return shape.
    """
    response = await getData(start_datetime, end_datetime, size, offset, 'ocp.elasticsearch')
    # getData may return a bare (empty) DataFrame; isinstance guard is
    # required because bool(DataFrame) raises ValueError.
    if not isinstance(response, pd.DataFrame) and response:
        df = response["data"]
        if len(df) == 0:
            # Preserve the dict contract even when the page is empty.
            return {"data": df, "total": response["total"]}
        df.insert(len(df.columns), "product", "ocp")
        df["releaseStream"] = df.apply(getReleaseStream, axis=1)
        df["version"] = df["shortVersion"]
        df["testName"] = df["benchmark"]
        return {"data": df, "total": response["total"]}
    # response is falsy or a DataFrame here, so it cannot be indexed with
    # ["total"] (the original did, raising TypeError/KeyError); report 0.
    return {"data": pd.DataFrame(), "total": 0}

27 changes: 15 additions & 12 deletions backend/app/api/v1/endpoints/cpt/maps/quay.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
from ....commons.quay import getData
from datetime import date
import pandas as pd


#####################################################################
# This will return a DataFrame from Quay required by the CPT endpoint
#####################################################################
#####################################################################################
# This will return a DataFrame from Quay required by the CPT endpoint with Total jobs
#####################################################################################
async def quayMapper(start_datetime: date, end_datetime: date, size: int, offset: int):
    """Map Quay Elasticsearch results into the common CPT job schema.

    Args:
        start_datetime, end_datetime: inclusive date window for the search.
        size: page size to request.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so
        callers can rely on a single return shape.
    """
    response = await getData(start_datetime, end_datetime, size, offset, 'quay.elasticsearch')
    # getData may return a bare (empty) DataFrame; isinstance guard is
    # required because bool(DataFrame) raises ValueError.
    if not isinstance(response, pd.DataFrame) and response:
        df = response["data"]
        if len(df) == 0:
            # Preserve the dict contract even when the page is empty.
            return {"data": df, "total": response["total"]}
        df.insert(len(df.columns), "product", "quay")
        df["version"] = df["releaseStream"]
        df["testName"] = df["benchmark"]
        return {"data": df, "total": response["total"]}
    return {"data": pd.DataFrame(), "total": 0}
23 changes: 13 additions & 10 deletions backend/app/api/v1/endpoints/cpt/maps/telco.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from ....commons.telco import getData
from ....commons.utils import getReleaseStream
from datetime import date

import pandas as pd

#####################################################################
# This will return a DataFrame from Telco required by the CPT endpoint
#####################################################################
async def telcoMapper(start_datetime: date, end_datetime: date, size: int, offset: int):
    """Map Telco Splunk results into the common CPT job schema.

    Args:
        start_datetime, end_datetime: inclusive date window for the search.
        size: page size to request.
        offset: starting record for the page.

    Returns:
        dict: ``{"data": DataFrame, "total": int}`` on every path, so
        callers can rely on a single return shape.
    """
    response = await getData(start_datetime, end_datetime, size, offset, 'telco.splunk')
    # getData may return a bare (empty) DataFrame; isinstance guard is
    # required because bool(DataFrame) raises ValueError.
    if not isinstance(response, pd.DataFrame) and response:
        df = response["data"]
        if len(df) == 0:
            # Preserve the dict contract even when the page is empty.
            return {"data": df, "total": response["total"]}
        df.insert(len(df.columns), "product", "telco")
        df["releaseStream"] = df.apply(getReleaseStream, axis=1)
        df["version"] = df["shortVersion"]
        df["testName"] = df["benchmark"]
        return {"data": df, "total": response["total"]}
    return {"data": pd.DataFrame(), "total": 0}
Loading

0 comments on commit 9385645

Please sign in to comment.