diff --git a/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb b/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb index fe5383f993..4076ac52cd 100644 --- a/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb +++ b/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb @@ -103,7 +103,7 @@ "id": "bd0713e2-e393-41c0-9da0-392535cf8a54", "metadata": {}, "source": [ - "## 4. Run simulated kmeans experiment\n", + "## 3. Run simulated kmeans experiment\n", "We run the federated training using NVFlare Simulator via [JobAPI](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html):" ] }, @@ -124,7 +124,7 @@ "id": "913e9ee2-e993-442d-a525-d2baf92af539", "metadata": {}, "source": [ - "## 5. Result visualization\n", + "## 4. Result visualization\n", "Model accuracy is computed as the homogeneity score between the cluster formed and the ground truth label, which can be visualized in tensorboard." ] }, @@ -140,14 +140,6 @@ "%load_ext tensorboard\n", "%tensorboard --logdir /tmp/nvflare/workspace/works/kmeans/sklearn_kmeans_uniform_3_clients" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bea9ebcd-96f5-45c8-a490-0559fab9991f", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png new file mode 100644 index 0000000000..6c18b663a3 Binary files /dev/null and b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png differ diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py new file mode 100644 index 0000000000..be7a31795a --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py @@ -0,0 +1,240 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import os +from enum import Enum +from typing import List + +import numpy as np +from src.kmeans_assembler import KMeansAssembler +from src.kmeans_learner import KMeansLearner + +from nvflare import FedJob +from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator +from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator +from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather +from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor +from nvflare.app_opt.sklearn.sklearn_executor import SKLearnExecutor + + +class SplitMethod(Enum): + UNIFORM = "uniform" + LINEAR = "linear" + SQUARE = "square" + EXPONENTIAL = "exponential" + + +def get_split_ratios(site_num: int, split_method: SplitMethod): + if split_method == SplitMethod.UNIFORM: + ratio_vec = np.ones(site_num) + elif split_method == SplitMethod.LINEAR: + ratio_vec = np.linspace(1, site_num, num=site_num) + elif split_method == SplitMethod.SQUARE: + ratio_vec = np.square(np.linspace(1, site_num, num=site_num)) + elif split_method == SplitMethod.EXPONENTIAL: + ratio_vec = np.exp(np.linspace(1, site_num, num=site_num)) + else: + raise ValueError(f"Split method {split_method.name} not implemented!") + + return ratio_vec + + +def split_num_proportion(n, site_num, split_method: SplitMethod) -> List[int]: + split = [] + ratio_vec = get_split_ratios(site_num, split_method) + total = sum(ratio_vec) + left = n + for site in range(site_num - 1): + x = int(n * ratio_vec[site] / total) + left = left - x + split.append(x) + split.append(left) + return split + + +def assign_data_index_to_sites( + data_size: int, + valid_fraction: float, + num_sites: int, + split_method: SplitMethod = SplitMethod.UNIFORM, +) -> dict: + if valid_fraction > 1.0: + raise ValueError("validation percent should be less than or equal to 100% of the total data") + elif valid_fraction < 1.0: + valid_size = int(round(data_size * valid_fraction, 0)) + train_size = data_size - valid_size + else: + valid_size = data_size + train_size = data_size + + site_sizes = split_num_proportion(train_size, num_sites, split_method) + split_data_indices = { + "valid": {"start": 0, "end": valid_size}, + } + for site in range(num_sites): + site_id = site + 1 + if valid_fraction < 1.0: + idx_start = valid_size + sum(site_sizes[:site]) + idx_end = valid_size + sum(site_sizes[: site + 1]) + else: + idx_start = sum(site_sizes[:site]) + idx_end = sum(site_sizes[: site + 1]) + split_data_indices[site_id] = {"start": idx_start, "end": idx_end} + + return split_data_indices + + +def get_file_line_count(input_path: str) -> int: + count = 0 + with open(input_path, "r") as fp: + for i, _ in enumerate(fp): + count += 1 + return count + + +def split_data( + data_path: str, + num_clients: int, + valid_frac: float, + split_method: SplitMethod = SplitMethod.UNIFORM, +): + size_total_file = get_file_line_count(data_path) + site_indices = assign_data_index_to_sites(size_total_file, valid_frac, num_clients, split_method) + return site_indices + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--workspace_dir", + type=str, + default="/tmp/nvflare/workspace/works/kmeans", + help="work directory, default to '/tmp/nvflare/workspace/works/kmeans'", + ) + parser.add_argument( + "--job_dir", + type=str, + 
default="/tmp/nvflare/workspace/jobs/kmeans", + help="directory for job export, default to '/tmp/nvflare/workspace/jobs/kmeans'", + ) + parser.add_argument( + "--data_path", + type=str, + default="/tmp/nvflare/dataset/sklearn_iris.csv", + help="work directory, default to '/tmp/nvflare/dataset/sklearn_iris.csv'", + ) + parser.add_argument( + "--num_clients", + type=int, + default=3, + help="number of clients to simulate, default to 3", + ) + parser.add_argument( + "--num_rounds", + type=int, + default=5, + help="number of rounds, default to 5", + ) + parser.add_argument( + "--split_mode", + type=str, + default="uniform", + choices=["uniform", "linear", "square", "exponential"], + help="how to split data among clients", + ) + parser.add_argument( + "--valid_frac", + type=float, + default=1, + help="fraction of data to use for validation, default to perform validation on all data", + ) + return parser.parse_args() + + +def main(): + args = define_parser() + # Get args + data_path = args.data_path + num_clients = args.num_clients + num_rounds = args.num_rounds + split_mode = args.split_mode + valid_frac = args.valid_frac + job_name = f"sklearn_kmeans_{split_mode}_{num_clients}_clients" + + # Set the output workspace and job directories + workspace_dir = os.path.join(args.workspace_dir, job_name) + job_dir = args.job_dir + + # Create the FedJob + job = FedJob(name=job_name, min_clients=num_clients) + + # Define the controller workflow and send to server + controller = ScatterAndGather( + min_clients=num_clients, + num_rounds=num_rounds, + aggregator_id="aggregator", + persistor_id="persistor", + shareable_generator_id="shareable_generator", + train_task_name="train", + ) + job.to_server(controller, id="scatter_and_gather") + + # Define other server components + assembler = KMeansAssembler() + job.to_server(assembler, id="kmeans_assembler") + aggregator = CollectAndAssembleAggregator(assembler_id="kmeans_assembler") + job.to_server(aggregator, id="aggregator") + shareable_generator = FullModelShareableGenerator() + job.to_server(shareable_generator, id="shareable_generator") + persistor = JoblibModelParamPersistor( + initial_params={"n_clusters": 3}, + ) + job.to_server(persistor, id="persistor") + + # Get the data split numbers and send to each client + # generate data split + site_indices = split_data( + data_path, + num_clients, + valid_frac, + SplitMethod(split_mode), + ) + + for i in range(1, num_clients + 1): + # Define the executor and send to clients + runner = SKLearnExecutor(learner_id="kmeans_learner") + job.to(runner, f"site-{i}", tasks=["train"]) + + learner = KMeansLearner( + data_path=data_path, + train_start=site_indices[i]["start"], + train_end=site_indices[i]["end"], + valid_start=site_indices["valid"]["start"], + valid_end=site_indices["valid"]["end"], + random_state=0, + ) + job.to(learner, f"site-{i}", id="kmeans_learner") + + # Export the job + print("job_dir=", job_dir) + job.export_job(job_dir) + + # Run the job + print("workspace_dir=", workspace_dir) + job.simulator_run(workspace_dir) + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt 
b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt new file mode 100644 index 0000000000..b72d5c2798 --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt @@ -0,0 +1,4 @@ +pandas +scikit-learn +joblib +tensorboard diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py new file mode 100644 index 0000000000..23e6fdc62e --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py @@ -0,0 +1,75 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
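For intuition about what the `KMeansAssembler` below does in rounds after the first, here is a minimal standalone numpy sketch of the count-weighted center update for a single cluster index. It is illustrative only: the centers and counts are made-up toy values, not part of the example code.

```python
import numpy as np

# Toy running state for one global cluster center (2 features) and its sample count.
global_center = np.array([0.5, 1.0])
global_count = 10.0

# Assumed submissions from two clients for the same cluster index: (local center, local count).
client_updates = [
    (np.array([0.6, 1.2]), 4.0),
    (np.array([0.4, 0.9]), 6.0),
]

# Rescale the global center by its count, add each client's count-weighted center,
# then divide by the new total count: the mean over old and new points combined.
weighted_sum = global_center * global_count
for center, count in client_updates:
    weighted_sum += center * count
    global_count += count
global_center = weighted_sum / global_count

print(global_center, global_count)  # updated global center and running count
```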
+ +from typing import Dict + +import numpy as np +from sklearn.cluster import KMeans + +from nvflare.apis.dxo import DXO, DataKind +from nvflare.apis.fl_context import FLContext +from nvflare.app_common.aggregators.assembler import Assembler +from nvflare.app_common.app_constant import AppConstants + + +class KMeansAssembler(Assembler): + def __init__(self): + super().__init__(data_kind=DataKind.WEIGHTS) + # Aggregator needs to keep record of historical + # center and count information for mini-batch kmeans + self.center = None + self.count = None + self.n_cluster = 0 + + def get_model_params(self, dxo: DXO): + data = dxo.data + return {"center": data["center"], "count": data["count"]} + + def assemble(self, data: Dict[str, dict], fl_ctx: FLContext) -> DXO: + current_round = fl_ctx.get_prop(AppConstants.CURRENT_ROUND) + if current_round == 0: + # First round, collect the information regarding n_feature and n_cluster + # Initialize the aggregated center and count to all zero + client_0 = list(self.collection.keys())[0] + self.n_cluster = self.collection[client_0]["center"].shape[0] + n_feature = self.collection[client_0]["center"].shape[1] + self.center = np.zeros([self.n_cluster, n_feature]) + self.count = np.zeros([self.n_cluster]) + # perform one round of KMeans over the submitted centers + # to be used as the original center points + # no count for this round + center_collect = [] + for _, record in self.collection.items(): + center_collect.append(record["center"]) + centers = np.concatenate(center_collect) + kmeans_center_initial = KMeans(n_clusters=self.n_cluster) + kmeans_center_initial.fit(centers) + self.center = kmeans_center_initial.cluster_centers_ + else: + # Mini-batch k-Means step to assemble the received centers + for center_idx in range(self.n_cluster): + centers_global_rescale = self.center[center_idx] * self.count[center_idx] + # Aggregate center, add new center to previous estimate, weighted by counts + for _, record in self.collection.items(): + centers_global_rescale += record["center"][center_idx] * record["count"][center_idx] + self.count[center_idx] += record["count"][center_idx] + # Rescale to compute mean of all points (old and new combined) + alpha = 1 / self.count[center_idx] + centers_global_rescale *= alpha + # Update the global center + self.center[center_idx] = centers_global_rescale + params = {"center": self.center} + dxo = DXO(data_kind=self.expected_data_kind, data=params) + + return dxo diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py new file mode 100644 index 0000000000..61c96a5abe --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py @@ -0,0 +1,116 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Tuple + +from sklearn.cluster import KMeans, MiniBatchKMeans, kmeans_plusplus +from sklearn.metrics import homogeneity_score + +from nvflare.apis.fl_context import FLContext +from nvflare.app_common.abstract.learner_spec import Learner +from nvflare.app_opt.sklearn.data_loader import load_data_for_range + + +class KMeansLearner(Learner): + def __init__( + self, + data_path: str, + train_start: int, + train_end: int, + valid_start: int, + valid_end: int, + random_state: int = None, + max_iter: int = 1, + n_init: int = 1, + reassignment_ratio: int = 0, + ): + super().__init__() + self.data_path = data_path + self.train_start = train_start + self.train_end = train_end + self.valid_start = valid_start + self.valid_end = valid_end + + self.random_state = random_state + self.max_iter = max_iter + self.n_init = n_init + self.reassignment_ratio = reassignment_ratio + self.train_data = None + self.valid_data = None + self.n_samples = None + self.n_clusters = None + + def load_data(self) -> dict: + train_data = load_data_for_range(self.data_path, self.train_start, self.train_end) + valid_data = load_data_for_range(self.data_path, self.valid_start, self.valid_end) + return {"train": train_data, "valid": valid_data} + + def initialize(self, parts: dict, fl_ctx: FLContext): + data = self.load_data() + self.train_data = data["train"] + self.valid_data = data["valid"] + # train data size, to be used for setting + # NUM_STEPS_CURRENT_ROUND for potential use in aggregation + self.n_samples = data["train"][-1] + # note that the model needs to be created every round + # due to the available API for center initialization + + def train(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: + # get training data, note that clustering is unsupervised + # so only x_train will be used + (x_train, y_train, train_size) = self.train_data + if curr_round == 0: + # first round, compute initial center with kmeans++ method + # model will be None for this round + self.n_clusters = global_param["n_clusters"] + center_local, _ = kmeans_plusplus(x_train, n_clusters=self.n_clusters, random_state=self.random_state) + kmeans = None + params = {"center": center_local, "count": None} + else: + center_global = global_param["center"] + # following rounds, local training starting from global center + kmeans = MiniBatchKMeans( + n_clusters=self.n_clusters, + batch_size=self.n_samples, + max_iter=self.max_iter, + init=center_global, + n_init=self.n_init, + reassignment_ratio=self.reassignment_ratio, + random_state=self.random_state, + ) + kmeans.fit(x_train) + center_local = kmeans.cluster_centers_ + count_local = kmeans._counts + params = {"center": center_local, "count": count_local} + return params, kmeans + + def validate(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: + # local validation with global center + # fit a standalone KMeans with just the given center + center_global = global_param["center"] + kmeans_global = KMeans(n_clusters=self.n_clusters, init=center_global, n_init=1) + 
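+        # note: fitting on the k global centers themselves (init=center_global, n_init=1) converges right away,
+        # so cluster_centers_ matches center_global and predict() below assigns each validation sample to its
+        # nearest global center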
kmeans_global.fit(center_global) + # get validation data, both x and y will be used + (x_valid, y_valid, valid_size) = self.valid_data + y_pred = kmeans_global.predict(x_valid) + homo = homogeneity_score(y_valid, y_pred) + self.log_info(fl_ctx, f"Homogeneity {homo:.4f}") + metrics = {"Homogeneity": homo} + return metrics, kmeans_global + + def finalize(self, fl_ctx: FLContext) -> None: + # freeing resources in finalize + del self.train_data + del self.valid_data + self.log_info(fl_ctx, "Freed training resources") diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py new file mode 100644 index 0000000000..cfc12462d1 --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py @@ -0,0 +1,84 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
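As a reference for the local step that `KMeansLearner.train` above performs once global centers exist, here is a minimal standalone sketch with toy data: warm-start `MiniBatchKMeans` from the global centers, run a single pass over the local samples, and read back the updated centers and per-center counts. The data and centers are invented for illustration; `_counts` is a private scikit-learn attribute, used here only because the learner above relies on it.

```python
import numpy as np
from sklearn.cluster import MiniBatchKMeans

rng = np.random.default_rng(0)
x_local = rng.normal(size=(60, 4))        # toy local training data (60 samples, 4 features)
center_global = rng.normal(size=(3, 4))   # stand-in for the centers received from the server

kmeans = MiniBatchKMeans(
    n_clusters=3,
    batch_size=len(x_local),   # one mini-batch covers the whole local set, as in the learner
    max_iter=1,                # a single local pass per federated round
    init=center_global,        # warm start from the global centers
    n_init=1,
    reassignment_ratio=0,
    random_state=0,
)
kmeans.fit(x_local)

center_local = kmeans.cluster_centers_
count_local = kmeans._counts  # per-center sample counts (private attribute)
print(center_local.shape, count_local)
```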
+ +import argparse +import os +from typing import Optional + +import numpy as np +import pandas as pd +from sklearn import datasets + + +def load_data(dataset_name: str = "iris"): + if dataset_name == "iris": + dataset = datasets.load_iris() + elif dataset_name == "cancer": + dataset = datasets.load_breast_cancer() + else: + raise ValueError("Dataset unknown!") + return dataset + + +def prepare_data( + output_dir: str, + dataset_name: str = "iris", + randomize: bool = False, + filename: Optional[str] = None, + file_format="csv", +): + # Load data + dataset = load_data(dataset_name) + x = dataset.data + y = dataset.target + if randomize: + np.random.seed(0) + idx_random = np.random.permutation(len(y)) + x = x[idx_random, :] + y = y[idx_random] + + data = np.column_stack((y, x)) + df = pd.DataFrame(data=data) + + # Check if the target folder exists, + # If not, create + + if os.path.exists(output_dir) and not os.path.isdir(output_dir): + os.rmdir(output_dir) + os.makedirs(output_dir, exist_ok=True) + + # Save to csv file + filename = filename if filename else f"{dataset_name}.csv" + if file_format == "csv": + file_path = os.path.join(output_dir, filename) + + df.to_csv(file_path, sep=",", index=False, header=False) + else: + raise NotImplementedError + + +def main(): + parser = argparse.ArgumentParser(description="Load sklearn data and save to csv") + parser.add_argument("--dataset_name", type=str, choices=["iris", "cancer"], help="Dataset name") + parser.add_argument("--randomize", type=int, help="Whether to randomize data sequence") + parser.add_argument("--out_path", type=str, help="Path to output data file") + args = parser.parse_args() + + output_dir = os.path.dirname(args.out_path) + filename = os.path.basename(args.out_path) + prepare_data(output_dir, args.dataset_name, args.randomize, filename) + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb index 930a53239b..f25d8d2af1 100644 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb @@ -1,19 +1,162 @@ { "cells": [ + { + "cell_type": "markdown", + "id": "7d7767c9", + "metadata": {}, + "source": [ + "# Federated K-Means Clustering with Scikit-learn on Iris Dataset" + ] + }, + { + "cell_type": "markdown", + "id": "f635ea04", + "metadata": {}, + "source": [ + "## Introduction to Scikit-learn, tabular data, and federated k-Means\n", + "### Scikit-learn\n", + "This example shows how to use [NVIDIA FLARE](https://nvflare.readthedocs.io/en/main/index.html) on tabular data.\n", + "It uses [Scikit-learn](https://scikit-learn.org/),\n", + "a widely used open-source machine learning library that 
supports supervised \n", + "and unsupervised learning.\n", + "### Tabular data\n", + "The data used in this example is tabular in a format that can be handled by [pandas](https://pandas.pydata.org/), such that:\n", + "- rows correspond to data samples\n", + "- the first column represents the label \n", + "- the other columns cover the features. \n", + "\n", + "Each client is expected to have one local data file containing both training \n", + "and validation samples. To load the data for each client, the following \n", + "parameters are expected by the local learner:\n", + "- data_file_path: string, the full path to the client's data file \n", + "- train_start: int, start row index for the training set\n", + "- train_end: int, end row index for the training set\n", + "- valid_start: int, start row index for the validation set\n", + "- valid_end: int, end row index for the validation set\n", + "\n", + "### Federated k-Means clustering\n", + "The machine learning algorithm in this example is [k-Means clustering](https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html).\n", + "\n", + "The aggregation follows the scheme defined in [Mini-batch k-Means](https://scikit-learn.org/stable/modules/generated/sklearn.cluster.MiniBatchKMeans.html). \n", + "\n", + "Under this setting, each round of federated learning can be formulated as follows:\n", + "- local training: starting from global centers, each client trains a local MiniBatchKMeans model with their own data\n", + "- global aggregation: server collects the cluster center, \n", + " counts information from all clients, aggregates them by considering \n", + " each client's results as a mini-batch, and updates the global center and per-center counts.\n", + "\n", + "For center initialization, at the first round, each client generates its initial centers with the k-means++ method. Then, the server collects all initial centers and performs one round of k-means to generate the initial global center.\n", + "\n", + "Below we listed steps to run this example." + ] + }, + { + "cell_type": "markdown", + "id": "ce92018e", + "metadata": {}, + "source": [ + "## Install requirements\n", + "First, install the required packages:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e08b25db", + "metadata": {}, + "outputs": [], + "source": [ + "% pip install -r code/requirements.txt" + ] + }, + { + "cell_type": "markdown", + "id": "31c22f7d", + "metadata": {}, + "source": [ + "## Download and prepare data\n", + "This example uses the Iris dataset available from Scikit-learn's dataset API. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e6c3b765", + "metadata": {}, + "outputs": [], + "source": [ + "%env DATASET_PATH=/tmp/nvflare/dataset/sklearn_iris.csv\n", + "! python3 ./code/utils/prepare_data.py --dataset_name iris --out_path ${DATASET_PATH}" + ] + }, + { + "cell_type": "markdown", + "id": "6a1fefd8", + "metadata": {}, + "source": [ + "This will load the data, format it properly by removing the header, order \n", + "the label and feature columns, randomize the dataset, and save it to a CSV file with comma separation. \n", + "The default path is `/tmp/nvflare/dataset/sklearn_iris.csv`. \n", + "\n", + "Note that the dataset contains a label for each sample, which will not be \n", + "used for training since k-Means clustering is an unsupervised method. 
\n", + "The entire dataset with labels will be used for performance evaluation \n", + "based on [homogeneity_score](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.homogeneity_score.html)." + ] + }, + { + "cell_type": "markdown", + "id": "cf161c43", + "metadata": {}, + "source": [ + "## Run simulated kmeans experiment\n", + "We can run the federated training using the NVFlare Simulator with the JobAPI:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a2a8f0ee", + "metadata": {}, + "outputs": [], + "source": [ + "! python code/kmeans_job.py --num_clients 3 --split_mode uniform" + ] + }, + { + "cell_type": "markdown", + "id": "7b9fdb72", + "metadata": {}, + "source": [ + "With the default arguments, [kmeans_job.py](code/kmeans_job.py) will export the job to `/tmp/nvflare/workspace/jobs/kmeans` and then the job will be run with a workspace directory of `/tmp/nvflare/workspace/works/kmeans`." + ] + }, + { + "cell_type": "markdown", + "id": "fb48af70", + "metadata": {}, + "source": [ + "## Result visualization\n", + "Model accuracy is computed as the homogeneity score between the cluster formed and the ground truth label, which can be visualized in tensorboard." + ] + }, { "cell_type": "code", "execution_count": null, - "id": "94a3e985-bc57-4973-b43f-867cf94ced6c", + "id": "88d9f366", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "%load_ext tensorboard\n", + "%tensorboard --logdir /tmp/nvflare/workspace/works/kmeans/sklearn_kmeans_uniform_3_clients" + ] } ], "metadata": { "kernelspec": { - "display_name": "nvflare_example", + "display_name": ".venv", "language": "python", - "name": "nvflare_example" + "name": "python3" }, "language_info": { "codemirror_mode": {