Skip to content

Commit

Permalink
Moved tracer to statestore
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Hellander committed Dec 5, 2023
1 parent a5ba3d0 commit 19eb1fc
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 225 deletions.
32 changes: 0 additions & 32 deletions .devcontainer/devcontainer.json.tpl

This file was deleted.

4 changes: 3 additions & 1 deletion fedn/cli/run_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,13 @@ def dashboard_cmd(ctx, host, port, secret_key, local_package, name, init):
statestore_config = fedn_config['statestore']
if statestore_config['type'] == 'MongoDB':
statestore = MongoStateStore(
network_id, statestore_config['mongo_config'], fedn_config['storage'])
network_id, statestore_config['mongo_config'])
else:
print("Unsupported statestore type, exiting. ", flush=True)
exit(-1)

statestore.set_storage_backend(fedn_config['storage'])

# Enable JWT token authentication.
if config['secret_key']:
# If we already have a valid token in statestore config, use that one.
Expand Down
3 changes: 0 additions & 3 deletions fedn/fedn/common/tracer/__init__.py

This file was deleted.

125 changes: 0 additions & 125 deletions fedn/fedn/common/tracer/mongotracer.py

This file was deleted.

11 changes: 0 additions & 11 deletions fedn/fedn/common/tracer/tracer.py

This file was deleted.

3 changes: 2 additions & 1 deletion fedn/fedn/network/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
network_id = get_network_config()
modelstorage_config = get_modelstorage_config()
statestore = MongoStateStore(
network_id, statestore_config["mongo_config"], modelstorage_config
network_id, statestore_config["mongo_config"]
)
statestore.set_storage_backend(modelstorage_config)
control = Control(statestore=statestore)
api = API(statestore, control)
app = Flask(__name__)
Expand Down
2 changes: 1 addition & 1 deletion fedn/fedn/network/combiner/round.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def run(self, polling_interval=1.0):
tic
round_meta['status'] = "Success"
round_meta['name'] = self.server.id
self.server.tracer.set_round_combiner_data(round_meta)
self.server.statestore.set_round_combiner_data(round_meta)
elif round_config['task'] == 'validation' or round_config['task'] == 'inference':
self.execute_validation_round(round_config)
else:
Expand Down
24 changes: 14 additions & 10 deletions fedn/fedn/network/combiner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import fedn.common.net.grpc.fedn_pb2 as fedn
import fedn.common.net.grpc.fedn_pb2_grpc as rpc
from fedn.common.config import (get_controller_config, get_modelstorage_config,
get_network_config, get_statestore_config)
from fedn.common.net.grpc.server import Server
from fedn.common.storage.s3.s3repo import S3ModelRepository
from fedn.common.tracer.mongotracer import MongoTracer
from fedn.network.combiner.connect import ConnectorCombiner, Status
from fedn.network.combiner.modelservice import ModelService
from fedn.network.combiner.round import RoundController
from fedn.network.statestore.mongostatestore import MongoStateStore

VALID_NAME_REGEX = '^[a-zA-Z0-9_-]*$'

Expand Down Expand Up @@ -118,17 +120,19 @@ def __init__(self, config):
'certificate': cert,
'key': key}

print(announce_config, flush=True)

# Set up model repository
self.repository = S3ModelRepository(
announce_config['storage']['storage_config'])

self.statestore = MongoStateStore(
announce_config['statestore']['network_id'],
announce_config['statestore']['mongo_config']
)
# Create gRPC server
self.server = Server(self, self.modelservice, grpc_config)

# Set up tracer for statestore
self.tracer = MongoTracer(
announce_config['statestore']['mongo_config'], announce_config['statestore']['network_id'])

# Set up round controller
self.control = RoundController(config['aggregator'], self.repository, self, self.modelservice)

Expand Down Expand Up @@ -359,13 +363,13 @@ def __route_request_to_client(self, request, client, queue_name):
raise

def _send_status(self, status):
""" Report a status to tracer.
""" Report a status to backend db.
:param status: the status to report
:type status: :class:`fedn.common.net.grpc.fedn_pb2.Status`
"""

self.tracer.report_status(status)
self.statestore.report_status(status)

def __register_heartbeat(self, client):
""" Register a client if first time connecting. Update heartbeat timestamp.
Expand Down Expand Up @@ -714,15 +718,15 @@ def ModelUpdateRequestStream(self, response, context):

self._send_status(status)

self.tracer.update_client_status(client.name, "online")
self.statestore.update_client_status(client.name, "online")

while context.is_active():
try:
yield q.get(timeout=1.0)
except queue.Empty:
pass

self.tracer.update_client_status(client.name, "offline")
self.statestore.update_client_status(client.name, "offline")

def ModelValidationStream(self, update, context):
""" Model validation stream RPC endpoint. Update status for client is connecting to stream.
Expand Down Expand Up @@ -838,7 +842,7 @@ def register_model_validation(self, validation):
:type validation: :class:`fedn.common.net.grpc.fedn_pb2.ModelValidation`
"""

self.tracer.report_validation(validation)
self.statestore.report_validation(validation)

def SendModelValidation(self, request, context):
""" Send a model validation response.
Expand Down
19 changes: 6 additions & 13 deletions fedn/fedn/network/controller/controlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import fedn.utils.helpers
from fedn.common.storage.s3.s3repo import S3ModelRepository
from fedn.common.tracer.mongotracer import MongoTracer
from fedn.network.api.network import Network
from fedn.network.combiner.interfaces import CombinerUnavailableError
from fedn.network.state import ReducerState
Expand Down Expand Up @@ -77,12 +76,6 @@ def __init__(self, statestore):
)
raise UnsupportedStorageBackend()

# The tracer is a helper that manages state in the database backend
statestore_config = statestore.get_config()
self.tracer = MongoTracer(
statestore_config["mongo_config"], statestore_config["network_id"]
)

if self.statestore.is_inited():
self._state = ReducerState.idle

Expand Down Expand Up @@ -205,13 +198,13 @@ def create_session(self, config):
else:
session_id = config["session_id"]

self.tracer.create_session(id=session_id)
self.tracer.set_session_config(session_id, config)
self.statestore.create_session(id=session_id)
self.statestore.set_session_config(session_id, config)

def create_round(self, round_data):
"""Initialize a new round in backend db. """

self.tracer.create_round(round_data)
self.statestore.create_round(round_data)

def set_round_data(self, round_id, round_data):
""" Set round data.
Expand All @@ -221,7 +214,7 @@ def set_round_data(self, round_id, round_data):
:param round_data: The status
:type status: dict
"""
self.tracer.set_round_data(round_id, round_data)
self.statestore.set_round_data(round_id, round_data)

def set_round_status(self, round_id, status):
""" Set the round round stats.
Expand All @@ -231,7 +224,7 @@ def set_round_status(self, round_id, status):
:param status: The status
:type status: str
"""
self.tracer.set_round_status(round_id, status)
self.statestore.set_round_status(round_id, status)

def set_round_config(self, round_id, round_config):
""" Upate round in backend db.
Expand All @@ -241,7 +234,7 @@ def set_round_config(self, round_id, round_config):
:param round_config: The round configuration
:type round_config: dict
"""
self.tracer.set_round_config(round_id, round_config)
self.statestore.set_round_config(round_id, round_config)

def request_model_updates(self, combiners):
"""Ask Combiner server to produce a model update.
Expand Down
6 changes: 0 additions & 6 deletions fedn/fedn/network/dashboard/restservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
url_for)
from werkzeug.utils import secure_filename

from fedn.common.tracer.mongotracer import MongoTracer
from fedn.network.combiner.interfaces import CombinerInterface
from fedn.network.dashboard.plots import Plot
from fedn.network.state import ReducerState, ReducerStateToString
Expand Down Expand Up @@ -584,11 +583,6 @@ def delete_model_trail():
:return:
"""
if request.method == "POST":
statestore_config = self.control.statestore.get_config()
self.tracer = MongoTracer(
statestore_config["mongo_config"],
statestore_config["network_id"],
)
try:
self.control.drop_models()
except Exception:
Expand Down
4 changes: 1 addition & 3 deletions fedn/fedn/network/statestore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
#
# Scaleout Systems AB
# __author__ = 'Morgan Ekmefjord [email protected]'
"""Module handling storing state in the backend database (currently MongoDB)."""
Loading

0 comments on commit 19eb1fc

Please sign in to comment.