Add P2P distributed optimization to advanced examples (#3189)
### Description

This PR adds a new set of advanced examples in
`examples/advanced/distributed_optimization`, showing how to use the
lower-level APIs to build P2P distributed optimization algorithms.

### Types of changes
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).

---------

Co-authored-by: Holger Roth <[email protected]>
Co-authored-by: Chester Chen <[email protected]>
3 people authored Feb 3, 2025
1 parent 0329d12 commit 14f40cc
Showing 43 changed files with 3,182 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -91,6 +91,7 @@ venv/
ENV/
env.bak/
venv.bak/
.mise.toml

# Spyder project settings
.spyderproject
4 changes: 4 additions & 0 deletions examples/advanced/README.md
@@ -44,6 +44,10 @@ Please also install "./requirements.txt" in each example folder.
* [Swarm Learning](./swarm_learning/README.md)
* Example of swarm learning with NVIDIA FLARE using PyTorch with the CIFAR-10 dataset.

## Distributed Optimization / P2P algorithms
* [Distributed Optimization](./distributed_optimization/README.md)
* Example of using the low-level NVFlare APIs to implement and run P2P distributed optimization algorithms.

## Vertical Federated Learning
* [Vertical Federated Learning](./vertical_federated_learning/README.md)
* Example of running split learning using the CIFAR-10 dataset.
2 changes: 2 additions & 0 deletions examples/advanced/distributed_optimization/.gitignore
@@ -0,0 +1,2 @@
tmp
data
4 changes: 4 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/README.md
@@ -0,0 +1,4 @@
# Consensus algorithm
In this example, we show how to run the consensus algorithm. You can find a detailed walkthrough in the [tutorial](tutorial.ipynb), or simply run the provided [script](launcher.py) via `python launcher.py`.
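
At each iteration, every client replaces its local value with a weighted average of its own value and its neighbors' values. The snippet below is a minimal sketch of one such step, assuming row-stochastic mixing weights with the self-weight set to one minus the neighbors' weights; the actual exchange and bookkeeping are handled internally by `ConsensusExecutor`.

```
# Illustrative only - not the ConsensusExecutor internals.
# `neighbor_values` and `neighbor_weights` map a neighbor id to its current
# value and mixing weight; the self-weight is assumed to make the row sum to 1.
def consensus_step(local_value, neighbor_values, neighbor_weights):
    self_weight = 1.0 - sum(neighbor_weights.values())
    new_value = self_weight * local_value
    for neighbor, value in neighbor_values.items():
        new_value += neighbor_weights[neighbor] * value
    return new_value
```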

![consensus](consensus.png)
Binary file not shown (examples/advanced/distributed_optimization/1-consensus/consensus.png).
55 changes: 55 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/launcher.py
@@ -0,0 +1,55 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random

import matplotlib.pyplot as plt
import torch

from nvflare.app_opt.p2p.controllers import DistOptController
from nvflare.app_opt.p2p.executors import ConsensusExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob

if __name__ == "__main__":
    # Create job
    job = FedJob(name="consensus")

    # generate random config
    num_clients = 6
    network, _ = generate_random_network(num_clients=num_clients)
    config = Config(network=network, extra={"iterations": 50})

    # send controller to server
    controller = DistOptController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(num_clients):
        executor = ConsensusExecutor(random.randint(0, 10))
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run("./tmp/runs/consensus")

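    # Load the sequence of local values each site saved during the run and plot their evolution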
    history = {
        f"site-{i + 1}": torch.load(f"tmp/runs/consensus/site-{i + 1}/value_sequence.pt") for i in range(num_clients)
    }
    plt.figure()
    for i in range(num_clients):
        plt.plot(history[f"site-{i + 1}"], label=f"site-{i + 1}")
    plt.legend()
    plt.title("Evolution of local values")
    plt.show()
760 changes: 760 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/tutorial.ipynb

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/README.md
@@ -0,0 +1,28 @@
# Distributed classification - two moons dataset

In this example, we consider the simple [two moons](https://scikit-learn.org/dev/modules/generated/sklearn.datasets.make_moons.html) classification problem and compare several distributed optimization algorithms:
- Distributed gradient descent
- Gradient tracking
- GTAdam

We run all the algorithms with 6 clients for 1000 iterations, with a stepsize of 0.01. These common parameters can be changed in the `config.py` file.
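
For intuition, the snippet below is a rough, self-contained sketch of the per-client updates behind distributed gradient descent and gradient tracking, run on a toy quadratic problem so it executes standalone; the ring topology, mixing weights, and stepsize are assumptions and this is not the NVFlare executors' code. GTAdam follows the same gradient-tracking scheme but replaces the plain `-stepsize * y` step with an Adam-style update driven by the tracked gradient.

```
# Illustrative only - not the NVFlare implementation.
import numpy as np

rng = np.random.default_rng(0)
n_clients, dim = 6, 2
targets = rng.normal(size=(n_clients, dim))  # each client's local optimum


def local_grad(i, x):
    # gradient of the local cost f_i(x) = 0.5 * ||x - targets[i]||^2
    return x - targets[i]


# doubly-stochastic mixing matrix on a ring (each client talks to 2 neighbors)
W = np.zeros((n_clients, n_clients))
for i in range(n_clients):
    W[i, i] = W[i, (i - 1) % n_clients] = W[i, (i + 1) % n_clients] = 1 / 3

stepsize = 0.1
x_dgd = np.zeros((n_clients, dim))
x_gt = np.zeros((n_clients, dim))
y = np.array([local_grad(i, x_gt[i]) for i in range(n_clients)])  # gradient trackers

for _ in range(1000):
    # Distributed gradient descent: average neighbors, then step along the local gradient
    x_dgd = W @ x_dgd - stepsize * np.array([local_grad(i, x_dgd[i]) for i in range(n_clients)])
    # Gradient tracking: step along the tracked (estimated global) gradient y,
    # then update the tracker with the change in the local gradient
    x_next = W @ x_gt - stepsize * y
    y = W @ y + np.array([local_grad(i, x_next[i]) - local_grad(i, x_gt[i]) for i in range(n_clients)])
    x_gt = x_next

print("global optimum:", targets.mean(axis=0))
print("GT iterate (site 1):", x_gt[0])
print("DGD iterate (site 1):", x_dgd[0])
```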

The models and datasets are stored in `utils.py` and are the same for all algorithms.
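
The `utils.py` module itself is not shown in this excerpt; a plausible minimal version of the pieces the launchers import (`NeuralNetwork` and `get_dataloaders`) might look like the sketch below. The architecture, sample counts, and splits are assumptions, and `plot_results` is omitted; the actual file may differ.

```
# Hypothetical sketch of utils.py - the real file in this folder may differ.
import torch
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset


class NeuralNetwork(torch.nn.Module):
    """Small MLP classifier for the 2D two-moons data."""

    def __init__(self, hidden_size: int = 32):
        super().__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(2, hidden_size),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_size, 2),
        )

    def forward(self, x):
        return self.layers(x)


def _to_dataset(features, labels):
    return TensorDataset(
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.long),
    )


def get_dataloaders(seed: int | None = None, batch_size: int = 32):
    """Generate a client-specific two-moons split, keyed by `seed`."""
    features, labels = make_moons(n_samples=1000, noise=0.2, random_state=seed)
    x_train, x_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=seed)
    train_loader = DataLoader(_to_dataset(x_train, y_train), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(_to_dataset(x_test, y_test), batch_size=batch_size)
    return train_loader, test_loader
```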

## Distributed gradient descent
```
python launcher_dgd.py
```
![dgd](dgd_results.png)

## Gradient tracking
```
python launcher_gt.py
```
![gt](gt_results.png)

## GTAdam
```
python launcher_gtadam.py
```
![gtadam](gtadam_results.png)
16 changes: 16 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/config.py
@@ -0,0 +1,16 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
NUM_CLIENTS = 6
ITERATIONS = 1000
STEPSIZE = 0.01
Binary files not shown (results plots: dgd_results.png, gt_results.png, gtadam_results.png).
63 changes: 63 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_dgd.py
@@ -0,0 +1,63 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import DistOptController
from nvflare.app_opt.p2p.executors import DGDExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomDGDExecutor(DGDExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "dgd"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
    config = Config(
        network=network,
        extra={"iterations": ITERATIONS, "stepsize": STEPSIZE},
    )

    # send controller to server
    controller = DistOptController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomDGDExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    # plot and save results
    plot_results(job_name, NUM_CLIENTS)
59 changes: 59 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_gt.py
@@ -0,0 +1,59 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import DistOptController
from nvflare.app_opt.p2p.executors import GTExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomGTExecutor(GTExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "gt"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
    config = Config(network=network, extra={"iterations": ITERATIONS, "stepsize": STEPSIZE})

    # send controller to server
    controller = DistOptController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomGTExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    plot_results(job_name, NUM_CLIENTS)
68 changes: 68 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_gtadam.py
@@ -0,0 +1,68 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import DistOptController
from nvflare.app_opt.p2p.executors import GTADAMExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomGTADAMExecutor(GTADAMExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "gtadam"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
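    # beta1, beta2 and epsilon follow the usual Adam hyperparameters (moment decay rates and stability term)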
    config = Config(
        network=network,
        extra={
            "iterations": ITERATIONS,
            "stepsize": STEPSIZE,
            "beta1": 0.9,
            "beta2": 0.999,
            "epsilon": 1e-8,
        },
    )

    # send controller to server
    controller = DistOptController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomGTADAMExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    plot_results(job_name, NUM_CLIENTS)