diff --git a/examples/pytorch-keyworddetection-api/.gitignore b/examples/pytorch-keyworddetection-api/.gitignore
new file mode 100644
index 000000000..580cf03a5
--- /dev/null
+++ b/examples/pytorch-keyworddetection-api/.gitignore
@@ -0,0 +1,3 @@
+data
+*.npz
+*.yaml
\ No newline at end of file
diff --git a/examples/pytorch-keyworddetection-api/README.rst b/examples/pytorch-keyworddetection-api/README.rst
new file mode 100644
index 000000000..a1e2626b4
--- /dev/null
+++ b/examples/pytorch-keyworddetection-api/README.rst
@@ -0,0 +1,62 @@
+FEDn Project: Keyword Detection (PyTorch)
+-----------------------------------------
+
+This is an example to showcase how to set up FEDnClient and use APIClient to set up and manage a training from Python.
+The machine learning project is based on the Speech Commands dataset from Google, https://huggingface.co/datasets/google/speech_commands.
+
+The example is intended as a minimalistic quickstart to learn how to use FEDn.
+
+
+ **Note: It is recommended to complete the example in https://docs.scaleoutsystems.com/en/stable/quickstart.html before starting this example.**
+
+Prerequisites
+-------------
+
+- `Python >=3.9, <=3.12 `__
+- `A project in FEDn Studio `__
+
+Installing prerequisites and creating the seed model
+----------------------------------------------------
+
+There are two alternatives to install the required packages, either using conda or pip.
+
+.. code-block::
+
+   conda env create -n keyword --file env.yaml
+
+Or if you would rather use pip to install the packages:
+
+.. code-block::
+
+   pip install -r requirements.txt
+
+Note that in the case of installing with pip you need to install either sox (macOS or Linux) or soundfile (Windows) depending on your platform, as this is a requirement for the torchaudio package.
+
+
+Clone this repository, then navigate into this directory:
+
+.. code-block::
+
+   git clone https://github.com/scaleoutsystems/fedn.git
+   cd fedn/examples/pytorch-keyworddetection-api
+
+Next we need to set up the APIClient. This link https://docs.scaleoutsystems.com/en/stable/apiclient.html helps you to get the hostname and access token. Edit the file fedn_api.py and insert your HOST and TOKEN.
+
+Next, generate the seed model:
+
+.. code-block::
+
+   python fedn_api.py --init-seed
+
+This will create a model file 'seed.npz' in the root of the project and upload it to the server.
+
+
+Now we need to start the clients, download a set of client configurations following the quickstart tutorial: https://fedn.readthedocs.io/en/stable/quickstart.html#start-clients.
+
+Start the clients with the following command:
+.. code-block::
+
+   python client_sc.py --dataset-split-idx 0 --client-yaml client0.yaml
+
+where each client is started with a different dataset split index and client yaml file.
+
diff --git a/examples/pytorch-keyworddetection-api/client_sc.py b/examples/pytorch-keyworddetection-api/client_sc.py
new file mode 100644
index 000000000..5a7c256c7
--- /dev/null
+++ b/examples/pytorch-keyworddetection-api/client_sc.py
@@ -0,0 +1,215 @@
+"""Client SC Example for PyTorch Keyword Detection API.
+
+This module contains the implementation of the client for the federated learning
+network using PyTorch for keyword detection.
+""" + +import argparse +import io +from pathlib import Path + +from data import get_dataloaders +from model import compile_model, load_parameters, model_hyperparams, save_parameters +from settings import BATCHSIZE_VALID, DATASET_PATH, DATASET_TOTAL_SPLITS, KEYWORDS +from torch import nn +from torch.optim import Adam +from torcheval.metrics import MulticlassAccuracy +from util import construct_api_url, read_settings + +from fedn.network.clients.fedn_client import ConnectToApiResult, FednClient + + +def main() -> None: + """Parse arguments and start the client.""" + parser = argparse.ArgumentParser(description="Client SC Example") + parser.add_argument("--client-yaml", type=str, required=False, help="Settings specfic for the client (default: client.yaml)") + parser.add_argument("--dataset-split-idx", type=int, required=True, help="Setting for which dataset split this client uses") + + parser.set_defaults(client_yaml="client.yaml") + args = parser.parse_args() + + start_client(args.client_yaml, args.dataset_split_idx) + + +def start_client(client_yaml: str, dataset_split_idx: int) -> None: + """Start the client with the given configuration and dataset split index. + + Args: + client_yaml (str): Path to the client configuration YAML file. + dataset_split_idx (int): Index of the dataset split to use. 
+ + """ + DATASET_SPLIT_IDX = dataset_split_idx + + cfg = load_client_config(client_yaml) + url = construct_api_url(cfg["api_url"], cfg.get("api_port", None)) + + fedn_client = FednClient( + train_callback=lambda params, settings: on_train(params, settings, DATASET_SPLIT_IDX), + validate_callback=lambda params: on_validate(params, DATASET_SPLIT_IDX), + predict_callback=lambda params: on_predict(params, DATASET_SPLIT_IDX), + ) + + configure_fedn_client(fedn_client, cfg) + + result, combiner_config = fedn_client.connect_to_api(url, cfg["token"], get_controller_config(fedn_client)) + + if result != ConnectToApiResult.Assigned: + print("Failed to connect to API, exiting.") + return + + if not fedn_client.init_grpchandler(config=combiner_config, client_name=fedn_client.client_id, token=cfg["token"]): + return + + fedn_client.run() + + +def load_client_config(client_yaml: str) -> dict: + """Load the client configuration from a YAML file. + + Args: + client_yaml (str): Path to the client configuration YAML file. + + Returns: + dict: The client configuration as a dictionary. + + """ + if Path(client_yaml).exists(): + cfg = read_settings(client_yaml) + else: + raise Exception(f"Client yaml file not found: {client_yaml}") + + if "discover_host" in cfg: + cfg["api_url"] = cfg["discover_host"] + + return cfg + + +def configure_fedn_client(fedn_client: FednClient, cfg: dict) -> None: + """Configure the FednClient with the given settings. + + Args: + fedn_client (FednClient): The FednClient instance to configure. + cfg (dict): The configuration dictionary containing client settings. + + """ + fedn_client.set_name(cfg["name"]) + fedn_client.set_client_id(cfg["client_id"]) + + +def get_controller_config(fedn_client: FednClient) -> dict: + """Get the controller configuration for the FednClient. + + Args: + fedn_client (FednClient): The FednClient instance. + + Returns: + dict: The controller configuration dictionary. 
+ + """ + return { + "name": fedn_client.name, + "client_id": fedn_client.client_id, + "package": "local", + "preferred_combiner": "", + } + + +def on_train(model_params, settings, dataset_split_idx) -> tuple: + """Train the model with the given parameters and settings. + + Args: + model_params: The model parameters. + settings: The training settings. + dataset_split_idx: The index of the dataset split to use. + + Returns: + tuple: The trained model parameters and metadata. + + """ + training_metadata = {"batchsize_train": 64, "lr": 1e-3, "n_epochs": 1} + + dataloader_train, _, _ = get_dataloaders( + DATASET_PATH, KEYWORDS, dataset_split_idx, DATASET_TOTAL_SPLITS, training_metadata["batchsize_train"], BATCHSIZE_VALID + ) + + sc_model = compile_model(**model_hyperparams(dataloader_train.dataset)) + load_parameters(sc_model, model_params) + optimizer = Adam(sc_model.parameters(), lr=training_metadata["lr"]) + loss_fn = nn.CrossEntropyLoss() + n_epochs = training_metadata["n_epochs"] + + for epoch in range(n_epochs): + sc_model.train() + for idx, (y_labels, x_spectrograms) in enumerate(dataloader_train): + optimizer.zero_grad() + _, logits = sc_model(x_spectrograms) + + loss = loss_fn(logits, y_labels) + loss.backward() + + optimizer.step() + + if idx % 100 == 0: + print(f"Epoch {epoch + 1}/{n_epochs} | Batch: {idx + 1}/{len(dataloader_train)} | Loss: {loss.item()}") + + out_model = save_parameters(sc_model, io.BytesIO()) + + metadata = {"training_metadata": {"num_examples": len(dataloader_train.dataset)}} + + return out_model, metadata + + +def on_validate(model_params, dataset_split_idx) -> dict: + """Validate the model with the given parameters and dataset split index. + + Args: + model_params: The model parameters. + dataset_split_idx: The index of the dataset split to use. + + Returns: + dict: The validation report containing training and validation accuracy. 
+ + """ + dataloader_train, dataloader_valid, dataloader_test = get_dataloaders( + DATASET_PATH, KEYWORDS, dataset_split_idx, DATASET_TOTAL_SPLITS, BATCHSIZE_VALID, BATCHSIZE_VALID + ) + + n_labels = dataloader_train.dataset.n_labels + + sc_model = compile_model(**model_hyperparams(dataloader_train.dataset)) + load_parameters(sc_model, model_params) + + def evaluate(dataloader) -> float: + sc_model.eval() + metric = MulticlassAccuracy(num_classes=n_labels) + for y_labels, x_spectrograms in dataloader: + probs, _ = sc_model(x_spectrograms) + + y_pred = probs.argmax(-1) + metric.update(y_pred, y_labels) + return metric.compute().item() + + return {"training_acc": evaluate(dataloader_train), "validation_acc": evaluate(dataloader_valid)} + + +def on_predict(model_params, dataset_split_idx) -> dict: + """Generate predictions using the model parameters and dataset split index. + + Args: + model_params: The model parameters. + dataset_split_idx: The index of the dataset split to use. + + Returns: + dict: The prediction results. 
+ + """ + dataloader_train, _, _ = get_dataloaders(DATASET_PATH, KEYWORDS, dataset_split_idx, DATASET_TOTAL_SPLITS, BATCHSIZE_VALID, BATCHSIZE_VALID) + sc_model = compile_model(**model_hyperparams(dataloader_train.dataset)) + load_parameters(sc_model, model_params) + + return {} + + +if __name__ == "__main__": + main() diff --git a/examples/pytorch-keyworddetection-api/data.py b/examples/pytorch-keyworddetection-api/data.py new file mode 100644 index 000000000..efff78a95 --- /dev/null +++ b/examples/pytorch-keyworddetection-api/data.py @@ -0,0 +1,275 @@ +import hashlib +import json +from pathlib import Path + +import numpy as np +import pyloudnorm as pyln +import torch +import torchaudio +from torch.utils.data import DataLoader, Dataset + +SAMPLERATE = 16000 + + +class BackgroundNoise(Dataset): + """Dataset for background noise samples using all *.wav in the given path.""" + + def __init__(self, path: str, dataset_split_idx: int, dataset_total_splits: int) -> "BackgroundNoise": + """Initialize the dataset.""" + super().__init__() + self._path = path + self._walker = sorted(str(p) for p in Path(self._path).glob("*.wav")) + + self._dataset_split_idx = dataset_split_idx + self._dataset_total_splits = dataset_total_splits + + self._start_idx = int(self._dataset_split_idx * len(self._walker) / self._dataset_total_splits) + self._end_idx = int((self._dataset_split_idx + 1) * len(self._walker) / self._dataset_total_splits) + + self._loudness_meter = pyln.Meter(SAMPLERATE) + + self._loudness = [self._loudness_meter.integrated_loudness(self._load_audio(file)[0].numpy()) for file in self._walker] + + def _load_audio(self, filename: str) -> tuple[torch.Tensor, int]: + data, sr = torchaudio.load(filename) + data.squeeze_() + return data, sr + + def __getitem__(self, index: int) -> torch.Tensor: + """Get the audio sample at the given index.""" + index = index + self._start_idx + filename = self._walker[index] + audio, sr = self._load_audio(filename) + loudness = 
self._loudness[index] + + audio_np = audio.numpy() + audio_np = pyln.normalize.loudness(audio_np, loudness, -27) + audio = torch.from_numpy(audio_np) + + if sr != SAMPLERATE: + raise ValueError(f"sample rate should be {SAMPLERATE}, but got {sr}") + return audio + + def __len__(self) -> int: + """Get the number of samples in the dataset.""" + return self._end_idx - self._start_idx + + +class FedSCDataset(Dataset): + """Dataset for the Federated Speech Commands dataset.""" + + NEGATIVE_KEYWORD = "" + SEED = 1 + + def __init__( # noqa: PLR0913 + self, path: str, keywords: list[str], subset: str, dataset_split_idx: int, dataset_total_splits: int, data_augmentation: bool = False + ) -> "FedSCDataset": + """Initialize the dataset. + + Args: + path (str): Path to the dataset. + keywords (list[str]): List of keywords to detect. + subset (str): Subset of the dataset to use. One of "training", "validation", or "testing". + dataset_split_idx (int): Index of the dataset split to use. + dataset_total_splits (int): Total number of dataset splits. + data_augmentation (bool): Whether to apply data augmentation. 
+ + """ + super(FedSCDataset, self).__init__() + self._path = path + self._subset = subset + self._labels = keywords + [self.NEGATIVE_KEYWORD] + + self._dataset_split_idx = dataset_split_idx + self._dataset_total_splits = dataset_total_splits + self._dataset = torchaudio.datasets.SPEECHCOMMANDS(path, subset=subset, download=True) + self._start_idx = int(dataset_split_idx * len(self._dataset) / self._dataset_total_splits) + self._end_idx = int((dataset_split_idx + 1) * len(self._dataset) / self._dataset_total_splits) + + if data_augmentation: + self._noise_prob = 0.5 + self._noise_mag = 0.9 + self._noise_dataset = BackgroundNoise( + Path(self._dataset._path).joinpath("_background_noise_").as_posix(), + dataset_split_idx=self._dataset_split_idx, + dataset_total_splits=self._dataset_total_splits, + ) + else: + self._noise_prob = 0.0 + self._noise_mag = 0.0 + self._noise_dataset = None + + # All splits use the same rng to shuffle the dataset + self._rng = np.random.RandomState(self.SEED) + self._shuffle_order = self._rng.permutation(len(self._dataset)) + + self._n_mels = 64 + self._hop_length = 160 + self._white_noise_mag = 0.0015 + self._transform = self._get_spectogram_transform(self._n_mels, self._hop_length, SAMPLERATE, data_augmentation) + + self._spectrogram_size = (self._n_mels, SAMPLERATE // self._hop_length) + + # Reinitialize rng with different seeds fot the different splits + self._rng = np.random.RandomState(self.SEED + self._dataset_split_idx) + + @property + def labels(self) -> list[str]: + return self._labels + + @property + def n_mels(self) -> int: + return self._n_mels + + @property + def n_labels(self) -> int: + return len(self._labels) + + @property + def spectrogram_size(self) -> tuple[int, int]: + return (self.n_mels, 100) + + def __getitem__(self, index: int) -> tuple[int, str, torch.Tensor, torch.Tensor]: + shuffled_index = self._shuffle_order[index] + sample, sr, label, _, _ = self._dataset[shuffled_index] + sample.squeeze_() + if 
sample.shape[-1] < SAMPLERATE: + sample = torch.nn.functional.pad(sample, (0, SAMPLERATE - sample.shape[-1])) + + if sr != SAMPLERATE: + raise Exception("Samplerate from sample: " + str(shuffled_index) + " is not equal to: " + str(SAMPLERATE)) + if sample.shape[-1] != SAMPLERATE: + raise Exception("Sample: " + str(shuffled_index) + " is not one second " + str(sample.shape)) + + if self._noise_dataset and len(self._noise_dataset) and self._noise_prob > self._rng.rand(): + noise_idx = self._rng.randint(len(self._noise_dataset)) + waveform = self._noise_dataset[noise_idx] + sub_start_idx = self._rng.randint(waveform.shape[-1] - SAMPLERATE) + noise = waveform[sub_start_idx : sub_start_idx + SAMPLERATE] + sample += self._noise_mag * noise * self._rng.rand() + + sample += self._rng.normal(scale=self._white_noise_mag, size=sample.shape).astype(np.float32) + + y = self.get_label_from_text(label) + spectrogram = self.get_spectrogram(sample) + + return y, label, spectrogram, sample + + def __len__(self) -> int: + return self._end_idx - self._start_idx + + def get_label_from_text(self, text_label: str) -> int: + if text_label in self._labels: + y = self._labels.index(text_label) + else: + y = len(self._labels) - 1 + return y + + def get_spectrogram(self, sample: int) -> torch.Tensor: + start_idx = self._rng.randint(0, self._hop_length) + length = sample.shape[0] - self._hop_length + return self._transform(sample[start_idx : start_idx + length]) + + def get_stats(self) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """Get the mean and std of the training data. 
If the stats are already calculated, they are loaded from disk.""" + sha1 = hashlib.sha1() # noqa:S324 + for word in self.labels: + sha1.update(str.encode(word)) + sha1.update(str.encode(str(self._n_mels))) + sha1.update(str.encode(str(self._dataset_split_idx))) + sha1.update(str.encode(str(self._dataset_total_splits))) + sha1.update(str.encode(str(self._hop_length))) + sha1.update(str.encode(str(self.SEED))) + guid = sha1.hexdigest() + filepath = Path(self._path) + filepath = filepath.joinpath(guid + ".stats") + if filepath.exists(): + with open(filepath, "r") as file: + data = json.load(file) + if ( + data["n_mels"] == self._n_mels + and data["labels"] == self.labels + and data["split_index"] == self._dataset_split_idx + and data["dataset_total_splits"] == self._dataset_total_splits + and data["hop_length"] == self._hop_length + and data["white_noise_mag"] == self._white_noise_mag + and data["SEED"] == self.SEED + ): + return torch.tensor(data["label_mean"]), torch.tensor(data["spectrogram_mean"])[:, None], torch.tensor(data["spectrogram_std"])[:, None] + + dataset = FedSCDataset(self._path, [], subset="training", dataset_split_idx=self._dataset_split_idx, dataset_total_splits=self._dataset_total_splits) + label_count = np.zeros(len(self.labels)) + spectrogram_sum = torch.zeros(self._n_mels) + spectrogram_square_sum = torch.zeros(self._n_mels) + + print("Calculating training data statistics...") # noqa:T201 + n_samples = len(dataset) + n_spectrogram_cols = 0 + for i in range(n_samples): + _, label, spectrogram, _ = dataset[i] + spectrogram_sum += spectrogram.sum(-1) + spectrogram_square_sum += spectrogram.square().sum(-1) + + n_spectrogram_cols += spectrogram.shape[-1] + + if label in self.labels: + idx = self.labels.index(label) + label_count[idx] += 1 + else: + label_count[-1] += 1 + + label_mean = label_count / n_samples + spectrogram_mean = spectrogram_sum / n_spectrogram_cols + spectrogram_std = (spectrogram_square_sum - spectrogram_mean.square()) / 
(n_spectrogram_cols - 1) + spectrogram_std.sqrt_() + with open(filepath, "w") as file: + d = { + "labels": self.labels, + "n_mels": self._n_mels, + "white_noise_mag": self._white_noise_mag, + "SEED": self.SEED, + "split_index": self._dataset_split_idx, + "dataset_total_splits": self._dataset_total_splits, + "hop_length": self._hop_length, + "label_mean": label_mean.tolist(), + "spectrogram_mean": spectrogram_mean.numpy().tolist(), + "spectrogram_std": spectrogram_std.numpy().tolist(), + } + json.dump(d, file) + return torch.tensor(label_mean), spectrogram_mean[:, None], spectrogram_std[:, None] + + def get_collate_fn(self) -> callable: + def collate_fn(batch: tuple[int, str, torch.Tensor, torch.Tensor]) -> tuple[torch.Tensor, torch.Tensor]: + ys, _, spectrogram, _ = zip(*batch) + return torch.tensor(ys, dtype=torch.long), torch.stack(spectrogram) + + return collate_fn + + def _get_spectogram_transform(self, n_mels: int, hop_length: int, sr: int, data_augmentation: bool = False) -> torch.nn.Sequential: + if data_augmentation: + return torch.nn.Sequential( + torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_fft=320, hop_length=hop_length, n_mels=n_mels), + torchaudio.transforms.FrequencyMasking(freq_mask_param=int(n_mels * 0.2)), + torchaudio.transforms.TimeMasking(time_mask_param=int(0.2 * 16000 / 160)), + torchaudio.transforms.AmplitudeToDB(stype="power", top_db=80), + ) + return torch.nn.Sequential( + torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_fft=320, hop_length=hop_length, n_mels=n_mels), + torchaudio.transforms.AmplitudeToDB(stype="power", top_db=80), + ) + + +def get_dataloaders( + path: str, keywords: list[str], dataset_split_idx: int, dataset_total_splits: int, batchsize_train: int, batchsize_valid: int +) -> tuple[DataLoader, DataLoader, DataLoader]: + """Get the dataloaders for the training, validation, and testing datasets.""" + dataset_train = FedSCDataset(path, keywords, "training", dataset_split_idx, dataset_total_splits, 
data_augmentation=True) + dataloader_train = DataLoader(dataset=dataset_train, batch_size=batchsize_train, collate_fn=dataset_train.get_collate_fn(), shuffle=True, drop_last=True) + + dataset_valid = FedSCDataset(path, keywords, "validation", dataset_split_idx, dataset_total_splits) + dataloader_valid = DataLoader(dataset=dataset_valid, batch_size=batchsize_valid, collate_fn=dataset_valid.get_collate_fn(), shuffle=False, drop_last=False) + + dataset_test = FedSCDataset(path, keywords, "testing", dataset_split_idx, dataset_total_splits) + dataloader_test = DataLoader(dataset=dataset_test, batch_size=batchsize_valid, collate_fn=dataset_test.get_collate_fn(), shuffle=False, drop_last=False) + + return dataloader_train, dataloader_valid, dataloader_test diff --git a/examples/pytorch-keyworddetection-api/env.yaml b/examples/pytorch-keyworddetection-api/env.yaml new file mode 100644 index 000000000..e175e2aa6 --- /dev/null +++ b/examples/pytorch-keyworddetection-api/env.yaml @@ -0,0 +1,16 @@ +name: keyword +channels: + - pytorch + - conda-forge +dependencies: + - python=3.12 + - pip + - pytorch=2.5.1 + - torchaudio=2.5.1 + - torchvision=0.20.1 + - sox=14.4.2 (sys_platform != "Windows") + - pip: + - fedn + - torcheval==0.0.7 + - pyloudnorm==0.1.1 + - soundfile==0.13.1; (sys_platform == "Windows") diff --git a/examples/pytorch-keyworddetection-api/fedn_api.py b/examples/pytorch-keyworddetection-api/fedn_api.py new file mode 100644 index 000000000..72a95de34 --- /dev/null +++ b/examples/pytorch-keyworddetection-api/fedn_api.py @@ -0,0 +1,47 @@ +import argparse + +from data import get_dataloaders +from model import compile_model, model_hyperparams, save_parameters +from settings import BATCHSIZE_VALID, DATASET_PATH, KEYWORDS + +from fedn import APIClient + +HOST = "" ## INSERT HOST +TOKEN = "" ## INSERT TOKEN + + +def init_seedmodel(api_client: APIClient) -> dict: + """Used to send a seed model to the server. 
The seed model is normalized with all training data.""" + dataloader_train, _, _ = get_dataloaders(DATASET_PATH, KEYWORDS, 0, 1, BATCHSIZE_VALID, BATCHSIZE_VALID) + sc_model = compile_model(**model_hyperparams(dataloader_train.dataset)) + seed_path = "seed.npz" + save_parameters(sc_model, seed_path) + + return api_client.set_active_model(seed_path) + + +def main(): + parser = argparse.ArgumentParser(description="API example") + parser.add_argument("--init-seed", action="store_true", required=False, help="Use this flag to send a seed model to the server") + parser.add_argument("--start-session", action="store_true", required=False, help="Use this flag to start session") + args = parser.parse_args() + + if HOST == "" and TOKEN == "": + print("Please insert HOST and TOKEN in fedn_api.py") + return + + api_client = APIClient(host=HOST, secure=True, verify=True, token=TOKEN) + + if args.init_seed: + response = init_seedmodel(api_client) + print(response) + elif args.start_session: + # Depending on the computer hosting the clients this round_timeout might need to increase + response = api_client.start_session(round_timeout=600) + print(response) + else: + print("No flag passed") + + +if __name__ == "__main__": + main() diff --git a/examples/pytorch-keyworddetection-api/model.py b/examples/pytorch-keyworddetection-api/model.py new file mode 100644 index 000000000..4844d3dbf --- /dev/null +++ b/examples/pytorch-keyworddetection-api/model.py @@ -0,0 +1,161 @@ +import torch +from torch import nn + +import math + +import collections + +from fedn.utils.helpers.helpers import get_helper + +HELPER_MODULE = "numpyhelper" +helper = get_helper(HELPER_MODULE) + +class NormalizerNormal(nn.Module): + def __init__(self, mean, scale): + super(NormalizerNormal, self).__init__() + if not isinstance(mean, torch.Tensor): + mean = torch.tensor(mean, dtype=torch.float32) + if not isinstance(scale, torch.Tensor): + scale = torch.tensor(scale, dtype=torch.float32) + + 
self.register_buffer("mean", mean) + self.register_buffer("scale", scale) + + def normalize(self, x): + return (x-self.mean)/self.scale + + def unnormalize(self, x): + return x*self.scale + self.mean + +class NormalizerBernoulli(nn.Module): + _epsilon = 1e-3 + def __init__(self, mean): + super(NormalizerBernoulli, self).__init__() + self.register_buffer("mean", torch.minimum(torch.maximum(mean, torch.tensor(self._epsilon)), torch.tensor(1-self._epsilon))) + + def normalize_samples(self, sample): + return sample - self.mean + + def unnormalize_logits(self, logits): + return logits + torch.log(self.mean / (1 - self.mean)) + + +class ConvLayerSetting: + def __init__(self, out_channels, kernel_size, max_pool_kernel_size, activation_fn): + self.out_channels = out_channels + self.kernel_size = kernel_size + if max_pool_kernel_size: + if isinstance(max_pool_kernel_size, int): + max_pool_kernel_size = (max_pool_kernel_size, max_pool_kernel_size) + self.max_pool_kernel_size = max_pool_kernel_size + self.activation_fn = activation_fn + + +class FCLayerSetting: + def __init__(self, out_features, dropout, activation_fn): + self.out_features = out_features + self.activation_fn = activation_fn + self.dropout = dropout + + + + +class NeuralNetworkModel(nn.Module): + def __init__(self, in_channels, out_features, in_image_size, conv_layer_settings= (), fc_layer_settings = ()): + super(NeuralNetworkModel, self).__init__() + self.in_channels = in_channels + self.conv_layers = nn.Sequential() + self.fc_layers = nn.Sequential() + next_layer_in_channels = self.in_channels + c_image_size = in_image_size + + for conv_layer_setting in conv_layer_settings: + conv = nn.Conv2d(in_channels=next_layer_in_channels, out_channels=conv_layer_setting.out_channels, + kernel_size=conv_layer_setting.kernel_size, padding="same") + self.conv_layers.append(conv) + if conv_layer_setting.max_pool_kernel_size: + pooling = nn.MaxPool2d(conv_layer_setting.max_pool_kernel_size, 
stride=conv_layer_setting.max_pool_kernel_size) + self.conv_layers.append(pooling) + c_image_size = (int((dim_size-1)/stride +1) for dim_size, stride in zip(c_image_size, conv_layer_setting.max_pool_kernel_size)) + self.conv_layers.append(conv_layer_setting.activation_fn) + next_layer_in_channels = conv_layer_setting.out_channels + + self.fc_in_features = next_layer_in_channels*math.prod(c_image_size) + next_fc_in = self.fc_in_features + + for fc_layer_setting in fc_layer_settings: + fc = nn.Linear(next_fc_in, fc_layer_setting.out_features) + self.fc_layers.append(fc) + self.fc_layers.append(nn.Dropout(fc_layer_setting.dropout)) + self.fc_layers.append(fc_layer_setting.activation_fn) + next_fc_in = fc_layer_setting.out_features + + self.final_layer = nn.Linear(next_fc_in, out_features) + + + def forward(self, x): + out = x + out = self.conv_layers(out) + out = out.view(-1, self.fc_in_features) + out = self.fc_layers(out) + out = self.final_layer(out) + + return out + + +class SCModel(nn.Module): + def __init__(self, nn_model: nn.Module, label_norm: NormalizerBernoulli, spectrogram_norm: NormalizerNormal): + super(SCModel, self).__init__() + self.nn_model = nn_model + self.label_norm = label_norm + self.spectrogram_norm = spectrogram_norm + + def forward(self, spectrograms): + spectrograms_normalized = self.spectrogram_norm.normalize(spectrograms)[:, None, ...] 
# Bx1xWxH + + logits_normalized = self.nn_model(spectrograms_normalized) # B x out_features + + logits = self.label_norm.unnormalize_logits(logits_normalized) + + prob = torch.nn.functional.softmax(logits, -1) + + return prob, logits + + + +def model_hyperparams(dataset): + n_labels = dataset.n_labels + spectrogram_size = dataset.spectrogram_size + label_mean, spectrogram_mean, spectrogram_std = dataset.get_stats() + return {"n_labels":n_labels, "spectrogram_size":spectrogram_size, "label_mean":label_mean, + "spectrogram_mean":spectrogram_mean, "spectrogram_std": spectrogram_std} + + +def compile_model(n_labels, spectrogram_size, label_mean, spectrogram_mean, spectrogram_std): + spectrogram_normalizer = NormalizerNormal(spectrogram_mean, spectrogram_std) + label_normalizer = NormalizerBernoulli(label_mean) + + conv_layers = [ConvLayerSetting(4, 3, None, nn.ReLU()), ConvLayerSetting(8, 3, 2, nn.ReLU()), + ConvLayerSetting(16, 3, 2, nn.ReLU()), ConvLayerSetting(32, 5, (4, 5), nn.ReLU())] + + fc_layers = [FCLayerSetting(128, 0.1, nn.ReLU()), FCLayerSetting(32, 0.1, nn.ReLU())] + + nn_model = NeuralNetworkModel(1, n_labels, spectrogram_size, conv_layers, fc_layers) + sc_model = SCModel(nn_model, label_normalizer, spectrogram_normalizer) + + return sc_model + + +def load_parameters(model, parameters_stream): + parameters_stream.seek(0) + weights = helper.load(parameters_stream) + params_dict = zip(model.state_dict().keys(), weights) + state_dict = collections.OrderedDict({key: torch.tensor(x) for key, x in params_dict}) + model.load_state_dict(state_dict, strict=True) + return model + +def save_parameters(model, path=None): + parameters_np = [val.cpu().numpy() for _, val in model.state_dict().items()] + return helper.save(parameters_np, path) + + diff --git a/examples/pytorch-keyworddetection-api/requirements.txt b/examples/pytorch-keyworddetection-api/requirements.txt new file mode 100644 index 000000000..371f8d803 --- /dev/null +++ 
b/examples/pytorch-keyworddetection-api/requirements.txt @@ -0,0 +1,5 @@ +fedn +torch==2.5.1 +torchaudio==2.5.1 +torcheval==0.0.7 +pyloudnorm==0.1.1 \ No newline at end of file diff --git a/examples/pytorch-keyworddetection-api/settings.py b/examples/pytorch-keyworddetection-api/settings.py new file mode 100644 index 000000000..60a508f05 --- /dev/null +++ b/examples/pytorch-keyworddetection-api/settings.py @@ -0,0 +1,6 @@ + + +KEYWORDS = ["forward", "backward", "left", "right"] +DATASET_PATH = "data" +DATASET_TOTAL_SPLITS = 5 +BATCHSIZE_VALID=64 diff --git a/examples/pytorch-keyworddetection-api/util.py b/examples/pytorch-keyworddetection-api/util.py new file mode 100644 index 000000000..2feffa594 --- /dev/null +++ b/examples/pytorch-keyworddetection-api/util.py @@ -0,0 +1,42 @@ +import yaml + + +def construct_api_url(api_url: str, api_port: int = None, secure: bool = None) -> str: + """Constructs a valid API URL from the input parameters.""" + api_url = api_url.strip(" ") + + if "://" in api_url: + scheme, api_url = api_url.split("://") + + if scheme not in ["http", "https"]: + raise Exception("API requires http(s)") + if secure is not None and secure != (scheme == "http"): + raise Exception("Scheme is supplied but security flag does not match scheme") + else: + if secure is None: + secure = "localhost" not in api_url and "127.0.0.1" not in api_url + scheme = "https" if secure else "http" + + if "/" in api_url: + host, path = api_url.split("/") + if not path.endswith("/"): + path += "/" + else: + host = api_url + path = "" + + if api_port is not None: + if ":" in host: + # Overriding port + hostname, port = host.split(":") + host = f"{hostname}:{api_port}" + else: + host = f"{host}:{api_port}" + + return f"{scheme}://{host}/{path}" + + +def read_settings(file_path: str) -> dict: + """Reads a YAML file and returns the content as a dictionary.""" + with open(file_path, "rb") as config_file: + return yaml.safe_load(config_file.read())