diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index 74d778bd0..34f1125c7 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -39,6 +39,7 @@ jobs:
         run: |
           cd tests
           export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor
+          export HIVEMIND_DHT_NUM_WORKERS=1
          pytest --durations=0 --durations-min=1.0 -v
   build_and_test_p2pd:
     runs-on: ubuntu-latest
@@ -100,6 +101,7 @@ jobs:
      - name: Test
        run: |
          export HIVEMIND_MEMORY_SHARING_STRATEGY=file_descriptor
+         export HIVEMIND_DHT_NUM_WORKERS=1
          pytest --cov hivemind --cov-config=pyproject.toml -v tests
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v3
diff --git a/requirements.txt b/requirements.txt
index 5a39ffbcd..f32fc94c8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,7 +7,7 @@ msgpack>=0.5.6
 sortedcontainers
 uvloop>=0.14.0
 grpcio-tools>=1.33.2
-protobuf>=3.12.2
+protobuf>=3.12.2,<5.28.0
 configargparse>=1.2.3
 py-multihash>=0.2.3
 multiaddr @ git+https://github.com/multiformats/py-multiaddr.git@e01dbd38f2c0464c0f78b556691d655265018cce
diff --git a/tests/test_allreduce_fault_tolerance.py b/tests/test_allreduce_fault_tolerance.py
index 12e310eba..92c45d47f 100644
--- a/tests/test_allreduce_fault_tolerance.py
+++ b/tests/test_allreduce_fault_tolerance.py
@@ -1,16 +1,18 @@
 from __future__ import annotations
 
+import asyncio
 from enum import Enum, auto
 
 import pytest
+import torch
 
 import hivemind
-from hivemind.averaging.averager import *
+from hivemind.averaging.averager import AllReduceRunner, AveragingMode, GatheredData
 from hivemind.averaging.group_info import GroupInfo
 from hivemind.averaging.load_balancing import load_balance_peers
 from hivemind.averaging.matchmaking import MatchmakingException
 from hivemind.proto import averaging_pb2
-from hivemind.utils.asyncio import aenumerate, as_aiter, azip, enter_asynchronously
+from hivemind.utils.asyncio import AsyncIterator, aenumerate, as_aiter, azip, enter_asynchronously
 from hivemind.utils.logging import get_logger
 
 logger = get_logger(__name__)
@@ -138,6 +140,8 @@ async def _generate_input_for_peer(self, peer_index: int) -> AsyncIterator[avera
     ],
 )
 def test_fault_tolerance(fault0: Fault, fault1: Fault):
+    torch.manual_seed(0)
+
     def _make_tensors():
         return [torch.rand(16, 1024), -torch.rand(3, 8192), 2 * torch.randn(4, 4, 4), torch.randn(1024, 1024)]
 
@@ -149,10 +153,10 @@ def _make_tensors():
             _make_tensors(),
             hivemind.DHT(initial_peers=dht.get_visible_maddrs(), start=True),
             prefix="test",
-            request_timeout=0.3,
-            min_matchmaking_time=1.0,
-            next_chunk_timeout=0.5,
-            allreduce_timeout=5,
+            request_timeout=1.5,
+            min_matchmaking_time=3.0,
+            next_chunk_timeout=2.0,
+            allreduce_timeout=30,
             part_size_bytes=2**16,
             client_mode=(i == 1),
             start=True,
diff --git a/tests/test_averaging.py b/tests/test_averaging.py
index 1059e321b..732a329ce 100644
--- a/tests/test_averaging.py
+++ b/tests/test_averaging.py
@@ -82,7 +82,7 @@ def _test_allreduce_once(n_clients, n_aux):
             tensors,
             dht=dht,
             target_group_size=4,
-            min_matchmaking_time=15,
+            min_matchmaking_time=30,
             prefix="mygroup",
             client_mode=mode == AveragingMode.CLIENT,
             auxiliary=mode == AveragingMode.AUX,
@@ -139,7 +139,7 @@ def test_allreduce_weighted(n_client_mode_peers: int = 2):
             tensors,
             dht=dht,
             target_group_size=4,
-            min_matchmaking_time=15,
+            min_matchmaking_time=30,
             prefix="mygroup",
             client_mode=client_mode,
             start=True,
@@ -225,7 +225,7 @@ def test_allgather(n_averagers=8, target_group_size=4):
             [torch.ones(1)],
             dht=dht,
             target_group_size=target_group_size,
-            min_matchmaking_time=15,
+            min_matchmaking_time=30,
             prefix="mygroup",
             initial_group_bits="000",
             start=True,
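Context for the `min_matchmaking_time` bumps above: this argument is a floor on how long an averager keeps matchmaking open before running all-reduce, so raising it trades test latency for fewer spurious matchmaking failures on loaded CI runners. Below is a minimal two-peer sketch of the API these tests exercise; the kwargs mirror the diff, while the tensor shape, prefix, and group size are illustrative assumptions rather than values from this PR.

```python
# Two-peer sketch of the averaging API exercised above (illustrative values).
import torch
import hivemind
from hivemind.averaging import DecentralizedAverager

dht_root = hivemind.DHT(start=True)
averagers = [
    DecentralizedAverager(
        [torch.randn(16)],  # each peer contributes its own local tensor
        dht=hivemind.DHT(initial_peers=dht_root.get_visible_maddrs(), start=True),
        prefix="demo",
        target_group_size=2,
        min_matchmaking_time=30,  # the generous matchmaking floor used by the updated tests
        start=True,
    )
    for _ in range(2)
]

# step(wait=False) returns a control object; result() blocks until this peer
# has matched with the other one and averaged, and raises if matchmaking fails.
controls = [averager.step(wait=False) for averager in averagers]
for control in controls:
    control.result()
```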
diff --git a/tests/test_cli_scripts.py b/tests/test_cli_scripts.py
index 97c674000..02084e0ed 100644
--- a/tests/test_cli_scripts.py
+++ b/tests/test_cli_scripts.py
@@ -7,7 +7,7 @@
 
 
 def test_dht_connection_successful():
-    dht_refresh_period = 1
+    dht_refresh_period = 3
 
     cloned_env = os.environ.copy()
     # overriding the loglevel to prevent debug print statements
diff --git a/tests/test_dht_experts.py b/tests/test_dht_experts.py
index 0332a1a59..4210ca6ff 100644
--- a/tests/test_dht_experts.py
+++ b/tests/test_dht_experts.py
@@ -47,6 +47,10 @@ def test_store_get_experts(n_peers=10):
     assert all(declare_experts(remaining_peer1, ["new_expert.1"], expiration_time=get_dht_time() + 30))
     assert get_experts(remaining_peer2, ["new_expert.1"])[0].peer_id == remaining_peer1.peer_id
 
+    for peer in peers:
+        if peer.is_alive():
+            peer.shutdown()
+
 
 @pytest.mark.forked
 def test_beam_search(
diff --git a/tests/test_moe.py b/tests/test_moe.py
index f62c2159d..7fc735843 100644
--- a/tests/test_moe.py
+++ b/tests/test_moe.py
@@ -21,48 +21,49 @@
 
 
 @pytest.mark.forked
-def test_moe():
+def test_moe(batch_size=2, hid_dim=4):
     all_expert_uids = [
         f"ffn.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}" for _ in range(10)
     ]
     with background_server(
-        expert_uids=all_expert_uids, device="cpu", expert_cls="ffn", num_handlers=1, hidden_dim=16
+        expert_uids=all_expert_uids, device="cpu", expert_cls="ffn", num_handlers=1, hidden_dim=hid_dim
     ) as server_peer_info:
         dht = DHT(start=True, initial_peers=server_peer_info.addrs)
-        dmoe = RemoteMixtureOfExperts(in_features=16, grid_size=(4, 4, 4), dht=dht, k_best=3, uid_prefix="ffn.")
+        dmoe = RemoteMixtureOfExperts(in_features=hid_dim, grid_size=(4, 4, 4), dht=dht, k_best=3, uid_prefix="ffn.")
 
         for i in range(3):
-            out = dmoe(torch.randn(10, 16))
+            out = dmoe(torch.randn(batch_size, hid_dim))
             out.sum().backward()
 
 
 @pytest.mark.forked
-def test_no_experts():
+def test_no_experts(batch_size=2, hid_dim=4):
     all_expert_uids = [
         f"expert.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}.{np.random.randint(0, 3)}" for _ in range(10)
     ]
     with background_server(
-        expert_uids=all_expert_uids, device="cpu", expert_cls="nop_delay", num_handlers=1, hidden_dim=16
+        expert_uids=all_expert_uids, device="cpu", expert_cls="nop_delay", num_handlers=1, hidden_dim=hid_dim
     ) as server_peer_info:
         dht = DHT(start=True, initial_peers=server_peer_info.addrs)
         dmoe = RemoteSwitchMixtureOfExperts(
-            in_features=16,
+            in_features=hid_dim,
             grid_size=(4, 4, 4),
             dht=dht,
             uid_prefix="expert.",
-            forward_timeout=0.1,
-            backward_timeout=0.1,
+            forward_timeout=0.01,
+            backward_timeout=0.01,
             allow_zero_outputs=True,
         )
 
         for i in range(3):
-            out, balancing_loss = dmoe(torch.randn(10, 16))
+            out, balancing_loss = dmoe(torch.randn(batch_size, hid_dim))
             out.sum().backward()
+        dht.shutdown()
 
 
 @pytest.mark.forked
-def test_call_many(hidden_dim=16):
+def test_call_many(hidden_dim=4):
     k_min = 1
     timeout_after_k_min = None
     backward_k_min = 1
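For reference, the grid arithmetic behind the MoE tests above: with `uid_prefix="ffn."` and `grid_size=(4, 4, 4)`, the router can address 4 * 4 * 4 = 64 experts named `ffn.i.j.k`, while the tests only spawn a random subset whose coordinates are below 3. A standalone sketch of that convention; the `enumerate_grid_uids` helper is hypothetical and not part of hivemind:

```python
# Hypothetical helper illustrating the expert UID convention used above:
# RemoteMixtureOfExperts(grid_size=(4, 4, 4), uid_prefix="ffn.") routes over
# every "ffn.i.j.k" with 0 <= i, j, k < 4; the tests serve a random subset.
import itertools

def enumerate_grid_uids(prefix, grid_size):
    """List every expert UID addressable on the given grid."""
    return [
        prefix + ".".join(str(coord) for coord in point)
        for point in itertools.product(*(range(dim) for dim in grid_size))
    ]

uids = enumerate_grid_uids("ffn.", (4, 4, 4))
assert len(uids) == 64 and uids[0] == "ffn.0.0.0" and uids[-1] == "ffn.3.3.3"
```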
@@ -88,7 +89,7 @@ def test_call_many(hidden_dim=16):
         [ExpertInfo(uid=f"expert.{i}", peer_id=server_peer_info.peer_id) for i in range(5)],
         dht,
     )
-    e5 = RemoteExpert(ExpertInfo(f"thisshouldnotexist", server_peer_info), None)
+    e5 = RemoteExpert(ExpertInfo("thisshouldnotexist", server_peer_info), None)
 
     mask, expert_outputs = _RemoteCallMany.apply(
         DUMMY,
@@ -133,7 +134,7 @@
 
 
 @pytest.mark.forked
-def test_remote_module_call(hidden_dim=16):
+def test_remote_module_call(hidden_dim=4):
     with background_server(
         num_experts=1,
         device="cpu",
@@ -315,9 +316,9 @@ def test_client_anomaly_detection():
     server.shutdown()
 
 
-def _measure_coro_running_time(n_coros, elapsed_fut, counter):
+def _measure_coro_running_time(n_coros, elapsed_fut, counter, coroutine_time):
     async def coro():
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(coroutine_time)
         counter.value += 1
 
     try:
@@ -336,20 +337,21 @@ async def coro():
 
 
 @pytest.mark.forked
-def test_remote_expert_worker_runs_coros_concurrently(n_processes=4, n_coros=10):
+def test_remote_expert_worker_runs_coros_concurrently(n_processes=4, n_coros=10, coroutine_time=0.1):
     processes = []
     counter = mp.Value(ctypes.c_int64)
     for i in range(n_processes):
         elapsed_fut = MPFuture()
         factory = threading.Thread if i % 2 == 0 else mp.Process  # Test both threads and processes
 
-        proc = factory(target=_measure_coro_running_time, args=(n_coros, elapsed_fut, counter))
+        proc = factory(target=_measure_coro_running_time, args=(n_coros, elapsed_fut, counter, coroutine_time))
         proc.start()
         processes.append((proc, elapsed_fut))
 
     for proc, elapsed_fut in processes:
         # Ensure that the coroutines were run concurrently, not sequentially
-        assert elapsed_fut.result() < 0.2
+        expected_time = coroutine_time * 3  # from non-blocking calls + blocking call + some overhead
+        assert elapsed_fut.result() < expected_time
         proc.join()
 
     assert counter.value == n_processes * n_coros  # Ensure all coroutines have finished
diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py
index c859e3879..9c91fbd11 100644
--- a/tests/test_optimizer.py
+++ b/tests/test_optimizer.py
@@ -2,12 +2,14 @@
 import multiprocessing as mp
 import time
 from functools import partial
+from typing import List
 
 import numpy as np
 import pytest
 import torch
 import torch.nn as nn
 import torch.nn.functional as F
+from multiaddr import Multiaddr
 
 import hivemind
 from hivemind.averaging.control import AveragingStage
@@ -227,29 +229,29 @@ def test_progress_tracker():
     finished_evt = mp.Event()
     emas = mp.Array(ctypes.c_double, 5)
 
-    def run_worker(index: int, batch_size: int, period: float, **kwargs):
-        dht = hivemind.DHT(initial_peers=dht_root.get_visible_maddrs(), start=True)
+    root_maddrs = dht_root.get_visible_maddrs()
+
+    def run_worker(index: int, batch_size: int, step_time: float, initial_peers: List[Multiaddr]):
+        dht = hivemind.DHT(initial_peers=initial_peers, start=True)
         tracker = ProgressTracker(
             dht,
             prefix,
             target_batch_size,
             start=True,
-            min_refresh_period=0.1,
-            default_refresh_period=0.2,
-            max_refresh_period=0.5,
-            private_key=RSAPrivateKey(),
-            **kwargs,
+            min_refresh_period=0.01,
+            default_refresh_period=0.02,
+            max_refresh_period=0.05,
         )
+        with tracker.pause_updates():
+            barrier.wait()
+            if index == 4:
+                delayed_start_evt.wait()
 
-        barrier.wait()
-        if index == 4:
-            delayed_start_evt.wait()
-
-        local_epoch = 2 if index == 4 else 0
-        samples_accumulated = 0
+            local_epoch = 2 if index == 4 else 0
+            samples_accumulated = 0
 
         while True:
-            time.sleep(period)
+            time.sleep(step_time)
 
             if finished_evt.is_set():
                 break
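The `run_worker` refactor above moves the startup barrier inside `tracker.pause_updates()`, so a worker cannot publish progress before every peer has been constructed. Here is that gating pattern in isolation; a sketch that assumes only the `ProgressTracker` arguments visible in this diff, with `"demo_run"` and the barrier as placeholder names:

```python
# Sketch of the gating pattern introduced above: keep the tracker paused until
# all workers reach a shared barrier, so nobody reports progress prematurely.
import hivemind
from hivemind.optim.progress_tracker import ProgressTracker

def make_gated_tracker(dht: hivemind.DHT, barrier) -> ProgressTracker:
    tracker = ProgressTracker(dht, "demo_run", target_batch_size=32, start=True)
    with tracker.pause_updates():  # progress reports are suppressed while paused
        barrier.wait()  # rendezvous with the other worker processes
    return tracker  # updates resume once every worker has arrived
```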
@@ -269,23 +271,29 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs):
         tracker.shutdown()
         dht.shutdown()
 
-    workers = [
-        mp.Process(target=run_worker, kwargs=dict(index=1, batch_size=12, period=0.6)),
-        mp.Process(target=run_worker, kwargs=dict(index=2, batch_size=16, period=0.5)),
-        mp.Process(target=run_worker, kwargs=dict(index=3, batch_size=24, period=0.4)),
-        mp.Process(target=run_worker, kwargs=dict(index=4, batch_size=64, period=0.4)),
-    ]
-    for worker in workers:
+    worker_batch_sizes = [12, 16, 24, 64]
+    worker_step_times = [0.6, 0.5, 0.4, 0.4]
+
+    workers = []
+    for i, (peer_batch_size, peer_step_time) in enumerate(zip(worker_batch_sizes, worker_step_times), start=1):
+        peer_kwargs = {
+            "index": i,
+            "batch_size": peer_batch_size,
+            "step_time": peer_step_time,
+            "initial_peers": root_maddrs,
+        }
+        worker = mp.Process(target=run_worker, kwargs=peer_kwargs)
         worker.start()
+        workers.append(worker)
 
     tracker = ProgressTracker(
         dht_root,
         prefix,
         target_batch_size,
         start=True,
-        min_refresh_period=0.1,
-        default_refresh_period=0.2,
-        max_refresh_period=0.5,
+        min_refresh_period=0.01,
+        default_refresh_period=0.02,
+        max_refresh_period=0.05,
     )
     barrier.wait()
@@ -294,7 +302,7 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs):
     step_time_deltas = []
 
     while local_epoch < 6:
-        time.sleep(0.1)
+        time.sleep(0.01)
 
         if tracker.ready_to_update_epoch:
             with tracker.pause_updates():
@@ -319,7 +327,7 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs):
     for i in (0, 1, 5):  # Without the 4th worker (the fastest one)
         assert 1.05 * mean_step_time < step_time_deltas[i] < 2.0 * mean_step_time
     for i in (2, 3, 4):  # With the 4th worker
-        assert 0.5 * mean_step_time < step_time_deltas[i] < 0.95 * mean_step_time
+        assert 0.3 * mean_step_time < step_time_deltas[i] < 0.95 * mean_step_time
 
     assert emas[1] < emas[2] < emas[3] < emas[4]
     assert tracker.performance_ema.samples_per_second < 1e-9
@@ -336,7 +344,7 @@ def run_worker(index: int, batch_size: int, period: float, **kwargs):
         (False, True, True, True, True),
         (False, True, True, False, True),
         (True, False, False, False, False),
-        (True, True, False, False, False,),
+        (True, True, False, False, False),
     ],
     # fmt: on
 )
@@ -359,6 +367,8 @@ def test_optimizer(
 def _test_optimizer(
     num_peers: int = 1,
     num_clients: int = 0,
+    default_batch_size: int = 4,
+    default_batch_time: float = 0.1,
     target_batch_size: int = 32,
     total_epochs: int = 3,
     use_local_updates: bool = False,
@@ -422,20 +432,21 @@ def run_trainer(batch_size: int, batch_time: float, client_mode: bool):
 
             prev_time = time.perf_counter()
 
-        time.sleep(1.0)
         optimizer.shutdown()
         return optimizer
 
     peers = []
 
     for index in range(num_peers):
+        peer_batch_size = default_batch_size + index
+        peer_batch_time = default_batch_time + 0.01 * index
         peers.append(
             mp.Process(
                 target=run_trainer,
                 name=f"trainer-{index}",
                 kwargs=dict(
-                    batch_size=4 + index,
-                    batch_time=0.3 + 0.2 * index,
+                    batch_size=peer_batch_size,
+                    batch_time=peer_batch_time,
                     client_mode=(index >= num_peers - num_clients),
                 ),
             )
@@ -451,7 +462,12 @@ def run_trainer(batch_size: int, batch_time: float, client_mode: bool):
     assert optimizer.local_epoch == optimizer.tracker.global_epoch == total_epochs
     expected_samples_accumulated = target_batch_size * total_epochs
     assert expected_samples_accumulated <= total_samples_accumulated.value <= expected_samples_accumulated * 1.2
-    assert 4 / 0.3 * 0.8 <= optimizer.tracker.performance_ema.samples_per_second <= 4 / 0.3 * 1.2
+    expected_performance = default_batch_size / default_batch_time
+    assert (
+        expected_performance * 0.8
+        <= optimizer.tracker.performance_ema.samples_per_second
+        <= expected_performance * 1.2
+    )
 
     assert not optimizer.state_averager.is_alive()
     assert not optimizer.tracker.is_alive()
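A worked check of the throughput bound introduced in `_test_optimizer` above: with the new `default_batch_size=4` and `default_batch_time=0.1`, the expected rate is 40 samples/s, so the assertion accepts an EMA between 32 and 48 samples/s. The old hard-coded `4 / 0.3` bound assumed roughly 13.3 samples/s and would silently drift whenever the per-peer batch arguments changed. Standalone recomputation:

```python
# Recomputing the bound from _test_optimizer's new defaults, outside the test.
default_batch_size, default_batch_time = 4, 0.1
expected_performance = default_batch_size / default_batch_time  # 40.0 samples/s

lower, upper = expected_performance * 0.8, expected_performance * 1.2
assert (lower, upper) == (32.0, 48.0)  # the +/-20% window the assertion enforces
```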
diff --git a/tests/test_util_modules.py b/tests/test_util_modules.py
index f245b777e..f4371372f 100644
--- a/tests/test_util_modules.py
+++ b/tests/test_util_modules.py
@@ -3,7 +3,8 @@
 import multiprocessing as mp
 import random
 import time
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from threading import Event
 
 import numpy as np
 import pytest
@@ -266,9 +267,10 @@ def _check_result_and_set(future):
     with pytest.raises(RuntimeError):
         future1.add_done_callback(lambda future: (1, 2, 3))
 
+    events[0].wait()
     assert future1.done() and not future1.cancelled()
     assert future2.done() and future2.cancelled()
-    for i in 0, 1, 4:
+    for i in 1, 4:
         events[i].wait(1)
     assert events[0].is_set() and events[1].is_set() and events[2].is_set() and events[4].is_set()
     assert not events[3].is_set()
@@ -524,9 +526,9 @@ async def test_async_context_flooding():
 
     async def coro():
         async with enter_asynchronously(lock1):
-            await asyncio.sleep(1e-2)
+            await asyncio.sleep(1e-6)
             async with enter_asynchronously(lock2):
-                await asyncio.sleep(1e-2)
+                await asyncio.sleep(1e-6)
 
     num_coros = max(33, mp.cpu_count() * 5 + 1)
     await asyncio.wait({asyncio.create_task(coro()) for _ in range(num_coros)})
@@ -556,17 +558,26 @@ def test_performance_ema_threadsafe(
     bias_power: float = 0.7,
     tolerance: float = 0.05,
 ):
-    def run_task(ema):
-        task_size = random.randint(1, 4)
+    def run_task(ema, start_event, task_size):
+        start_event.wait()
         with ema.update_threadsafe(task_size):
             time.sleep(task_size * interval * (0.9 + 0.2 * random.random()))
             return task_size
 
     with ThreadPoolExecutor(max_workers) as pool:
         ema = PerformanceEMA(alpha=alpha)
+        start_event = Event()
+
+        futures = []
+        for _ in range(num_updates):
+            task_size = random.randint(1, 4)
+            future = pool.submit(run_task, ema, start_event, task_size)
+            futures.append(future)
+
+        ema.reset_timer()
+        start_event.set()
         start_time = time.perf_counter()
-        futures = [pool.submit(run_task, ema) for i in range(num_updates)]
-        total_size = sum(future.result() for future in futures)
+        total_size = sum(future.result() for future in as_completed(futures))
         end_time = time.perf_counter()
         target = total_size / (end_time - start_time)
         assert ema.samples_per_second >= (1 - tolerance) * target * max_workers ** (bias_power - 1)
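The `test_performance_ema_threadsafe` rewrite above queues every task before releasing a shared `Event` and only then resets the EMA timer, so the measured window excludes thread-pool submission overhead. The same synchronization pattern, reduced to the standard library; `do_work` is a stand-in task, not code from this PR:

```python
# Start-gate pattern from the rewritten test, in pure stdlib form: queue all
# work first, then open the gate so every worker begins at the same instant.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Event

def do_work(start_event: Event, task_size: int) -> int:
    start_event.wait()  # block until all tasks have been submitted
    time.sleep(0.01 * task_size)  # stand-in for task_size units of real work
    return task_size

with ThreadPoolExecutor(max_workers=4) as pool:
    start_event = Event()
    futures = [pool.submit(do_work, start_event, size) for size in (1, 2, 3, 4)]

    start_event.set()  # release every queued task at once
    start_time = time.perf_counter()
    total_size = sum(future.result() for future in as_completed(futures))
    elapsed = time.perf_counter() - start_time

print(f"processed {total_size} units in {elapsed:.3f}s (concurrent, not sequential)")
```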