Skip to content

Commit

Permalink
Merge pull request #2 from opsdis/batch_update
Browse files Browse the repository at this point in the history
Refactor to use nodegraph-provider's graphs endpoint
  • Loading branch information
thenodon authored Apr 17, 2022
2 parents d99327b + 14d2bc5 commit 812008f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
2 changes: 1 addition & 1 deletion tempo_trace_aggregation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def argument_parser() -> Dict[str, Any]:
nodeprovider = NodeGraphAPI(graph=conf['graph']['name'], connection=nodegraph_provider_con)

if nodes and edges:
nodeprovider.update_nodes(nodes=nodes, edges=edges)
nodeprovider.batch_update_nodes(nodes=nodes, edges=edges)
else:
nodeprovider.delete_graph()

Expand Down
38 changes: 36 additions & 2 deletions tempo_trace_aggregation/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ def execute(self,

def _api_call(self, url_path: str) -> Dict[str, Any]:
try:
r = requests.get(url=f"{self._connection.url}{url_path}", headers=self._connection.headers, timeout=self._connection.timeout)
r = requests.get(url=f"{self._connection.url}{url_path}", headers=self._connection.headers,
timeout=self._connection.timeout)
if r.status_code == 200:
response = r.json()
if response:
Expand All @@ -264,14 +265,22 @@ def __init__(self, graph: str, connection: RestConnection):

def delete_graph(self):
try:
requests.post(f"{self._connection.url}/api/controller/{self.graph}/delete-all",
requests.delete(f"{self._connection.url}/api/graphs/{self.graph}",
headers=self._connection.headers, timeout=self._connection.timeout)
#requests.post(f"{self._connection.url}/api/controller/{self.graph}/delete-all",
#headers=self._connection.headers, timeout=self._connection.timeout)
except Exception as err:
log.error_fmt(
{'graph': self.graph, 'operation': 'delete-all', 'error': err.__str__()},
"Connection to nodegraph_provider failed")

def update_nodes(self, nodes: List[Node], edges: List[Edge]):
"""
This method is deprecated
:param nodes:
:param edges:
:return:
"""
self.delete_graph()

start = time.time()
Expand Down Expand Up @@ -318,3 +327,28 @@ def update_nodes(self, nodes: List[Node], edges: List[Edge]):
log.info_fmt(
{'graph': self.graph, 'nodes': len(nodes), 'edges': len(edges), 'time': time.time() - start},
"Update nodegraph_provider")

def batch_update_nodes(self, nodes: List[Node], edges: List[Edge]):

start = time.time()

batch = {'nodes': [], 'edges': []}

for node in nodes:
batch['nodes'].append(node.to_params_id())
for edge in edges:
batch['edges'].append(edge.to_params())

try:
r = requests.post(f"{self._connection.url}/api/graphs/{self.graph}", headers=self._connection.headers,
timeout=self._connection.timeout,
data=json.dumps(batch))
if r.status_code != 201:
log.warn_fmt({'graph': self.graph, 'object': 'graph', 'operation': 'create',
'status_code': r.status_code}, "Failed to create graph")
except Exception as err:
log.error_fmt({'graph': self.graph, 'object': 'graph', 'operation': 'create', 'error': err.__str__()},
"Connection to nodegraph_provider failed")
log.info_fmt(
{'graph': self.graph, 'nodes': len(nodes), 'edges': len(edges), 'time': time.time() - start},
"Update nodegraph_provider")

0 comments on commit 812008f

Please sign in to comment.