From 60cb9065adce4e96630d555df64036cc46f35d73 Mon Sep 17 00:00:00 2001 From: bamsumit Date: Tue, 31 Oct 2023 22:11:57 -0400 Subject: [PATCH 1/7] Lava fixes to enable large convolutional networks (#808) * update refport unittest to always wait when it writes to port for consistent behavior Signed-off-by: bamsumit * Removed pyproject changes Signed-off-by: bamsumit * Fix to convolution tests. Fixed imcompatible mnist_pretrained for old python versions. Signed-off-by: bamsumit * Missing moudle parent fix Signed-off-by: bamsumit * Added ConvVarModel * Added iterable callback function Signed-off-by: bamsumit * Fix codacy issues in callback_fx.py * Fix linting in callback_fx.py * Fix codacy sig issue in callback_fx.py * Bugfix to pass the args by keyword * Delay Dense PyModel fix Signed-off-by: bamsumit * Fixed unittests Signed-off-by: bamsumit * Fixed sparse delay Signed-off-by: bamsumit * IO modules fixes Signed-off-by: bamsumit * IO modules fixes Signed-off-by: bamsumit * bypass port splitting Signed-off-by: bamsumit * Delta sparse encoding fix Signed-off-by: bamsumit * Delta encoder fixes Signed-off-by: bamsumit * Removing torch computation of conv until we find the solution to process model hanging Signed-off-by: bamsumit * Updated lava fixes Signed-off-by: bamsumit * Cleaned PR Signed-off-by: bamsumit * Codacy fixes Signed-off-by: bamsumit * Suggestions encorporated Signed-off-by: bamsumit --------- Signed-off-by: bamsumit Co-authored-by: Joyesh Mishra Co-authored-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> --- src/lava/magma/compiler/mapper.py | 2 +- src/lava/proc/conv/utils.py | 15 +- src/lava/proc/io/encoder.py | 69 ++++- src/lava/proc/io/extractor.py | 135 ++++++++- tests/lava/proc/io/test_extractor.py | 421 ++++++++++++++++++++++++++- 5 files changed, 622 insertions(+), 20 deletions(-) diff --git a/src/lava/magma/compiler/mapper.py b/src/lava/magma/compiler/mapper.py index d0d957faf..e05939f89 100644 --- a/src/lava/magma/compiler/mapper.py +++ b/src/lava/magma/compiler/mapper.py @@ -161,7 +161,7 @@ def map_cores(self, executable: Executable, chips = [addr.physical_chip_id for addr in src_addr] address.update(chips) break - if len(address) > 1: + if len(address) > 1 and hasattr(var_model, "address"): raise ValueError("Lava Compiler doesn't support port" "splitting currently. MultiChip " "Not Supported ") diff --git a/src/lava/proc/conv/utils.py b/src/lava/proc/conv/utils.py index 06943c095..3b1c06d0e 100644 --- a/src/lava/proc/conv/utils.py +++ b/src/lava/proc/conv/utils.py @@ -7,12 +7,15 @@ from scipy import signal from enum import IntEnum, unique -try: - import torch - import torch.nn.functional as F - TORCH_IS_AVAILABLE = True -except ModuleNotFoundError: - TORCH_IS_AVAILABLE = False +# NOTE: It is known that torch calls inside Lava PyProcess hangs. +# Disabling torch usage inside a Lava CPU process until a fix is found. +# try: +# import torch +# import torch.nn.functional as F +# TORCH_IS_AVAILABLE = True +# except ModuleNotFoundError: +# TORCH_IS_AVAILABLE = False +TORCH_IS_AVAILABLE = False @unique diff --git a/src/lava/proc/io/encoder.py b/src/lava/proc/io/encoder.py index 3e284ce85..4f09495e1 100644 --- a/src/lava/proc/io/encoder.py +++ b/src/lava/proc/io/encoder.py @@ -58,10 +58,14 @@ class DeltaEncoder(AbstractProcess): Shape of the sigma process. vth: int or float Threshold of the delta encoder. - spike_exp: int + spike_exp: Optional[int] Scaling exponent with base 2 for the spike message. Note: This should only be used for fixed point models. Default is 0. + num_bits: Optional[int] + Precision for spike output. It is applied before spike_exp. If None, + precision is not enforced, i.e. the spike output is unbounded. + Default is None. compression : Compression Data compression mode, by default DENSE compression. """ @@ -71,6 +75,7 @@ def __init__(self, shape: Tuple[int, ...], vth: Union[int, float], spike_exp: Optional[int] = 0, + num_bits: Optional[int] = None, compression: Compression = Compression.DENSE) -> None: super().__init__(shape=shape, vth=vth, cum_error=False, spike_exp=spike_exp, state_exp=0) @@ -84,6 +89,13 @@ def __init__(self, self.act = Var(shape=shape, init=0) self.residue = Var(shape=shape, init=0) self.spike_exp = Var(shape=(1,), init=spike_exp) + if num_bits is not None: + a_min = -(1 << (num_bits - 1)) << spike_exp + a_max = ((1 << (num_bits - 1)) - 1) << spike_exp + else: + a_min = a_max = -1 + self.a_min = Var(shape=(1,), init=a_min) + self.a_max = Var(shape=(1,), init=a_max) self.proc_params['compression'] = compression @property @@ -101,10 +113,14 @@ class AbstractPyDeltaEncoderModel(PyLoihiProcessModel): act: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24) residue: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24) spike_exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=3) + a_min: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24) + a_max: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24) def encode_delta(self, act_new): delta = act_new - self.act + self.residue s_out = np.where(np.abs(delta) >= self.vth, delta, 0) + if self.a_max > 0: + s_out = np.clip(s_out, a_min=self.a_min, a_max=self.a_max) self.residue = delta - s_out self.act = act_new return s_out @@ -188,19 +204,32 @@ def encode_delta_sparse_8(self, s_out): if len(idx) == 0: idx = np.array([0]) data = np.array([0]) + max_idx = 0xFF + if idx[0] > max_idx: + idx = np.concatenate([np.zeros(1, dtype=idx.dtype), + idx.flatten()]) + data = np.concatenate([np.zeros(1, dtype=idx.dtype), + data.flatten()]) # 8 bit index encoding idx[1:] = idx[1:] - idx[:-1] - 1 # default increment of 1 delta_idx = [] delta_data = [] - max_idx = 0xFF start = 0 for i in np.argwhere(idx >= max_idx)[:, 0]: delta_idx.append((idx[start:i].flatten()) % max_idx) delta_data.append(data[start:i].flatten()) - delta_idx.append(np.array([max_idx - 1] * (idx[i] // max_idx))) - delta_data.append(np.array([0] * (idx[i] // max_idx))) - start = i + repeat_data = idx[i] // max_idx + num_repeats = repeat_data // max_idx + delta_idx.append(np.array([max_idx] * (num_repeats + 1)).flatten()) + delta_data.append(np.array([max_idx] * num_repeats + + [repeat_data % max_idx]).flatten()) + delta_idx.append((idx[i:i + 1].flatten()) % max_idx) + delta_data.append(data[i:i + 1].flatten()) + start = i + 1 + delta_idx.append(idx[start:].flatten()) + delta_data.append(data[start:].flatten()) + if len(delta_idx) > 0: delta_idx = np.concatenate(delta_idx) delta_data = np.concatenate(delta_data) @@ -208,11 +237,6 @@ def encode_delta_sparse_8(self, s_out): delta_idx = idx.flatten() delta_data = data.flatten() - # Decoding - # idx = delta_idx - # idx[1:] += 1 - # idx = np.cumsum(idx) - # data = delta_data padded_idx = np.zeros(int(np.ceil(len(delta_idx) / 4) * 4)) padded_data = np.zeros(int(np.ceil(len(delta_data) / 4) * 4)) @@ -232,6 +256,31 @@ def encode_delta_sparse_8(self, s_out): + padded_data[0::4]) return packed_data, packed_idx + def decode_encode_delta_sparse_8(self, packed_data, packed_idx): + """Python decoding script for delta_sparse_8 encoding. It is useful for + debug and verify the encoding.""" + data_list = [] + idx_list = [] + count = 0 + data = 0 + idx = 0 + for p_data, p_idx in zip(packed_data, packed_idx): + for _ in range(4): + data = p_data & 0xFF + idx_1 = p_idx & 0xFF + if idx_1 == 0xFF: + idx += data * 0xFF + data = 0 + else: + idx += idx_1 + int(count > 0) + if data != 0: + data_list.append(data) + idx_list.append(idx) + p_data >>= 8 + p_idx >>= 8 + count += 1 + return np.array(data_list), np.array(idx_list) + def run_spk(self): self.s_out.send(self.data, self.idx) # Receive synaptic input diff --git a/src/lava/proc/io/extractor.py b/src/lava/proc/io/extractor.py index 156370c6a..1f0ca1cf8 100644 --- a/src/lava/proc/io/extractor.py +++ b/src/lava/proc/io/extractor.py @@ -6,13 +6,13 @@ import typing as ty from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import InPort +from lava.magma.core.process.ports.ports import InPort, RefPort, Var from lava.magma.core.resources import CPU from lava.magma.core.decorator import implements, requires from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol from lava.magma.core.model.py.type import LavaPyType -from lava.magma.core.model.py.ports import PyInPort +from lava.magma.core.model.py.ports import PyInPort, PyRefPort from lava.magma.compiler.channels.pypychannel import PyPyChannel from lava.magma.runtime.message_infrastructure.multiprocessing import \ MultiProcessing @@ -47,6 +47,7 @@ class Extractor(AbstractProcess): buffer is full and how the dst_port behaves when the buffer is empty and not empty. """ + def __init__(self, shape: ty.Tuple[int, ...], buffer_size: ty.Optional[int] = 50, @@ -134,3 +135,133 @@ def run_spk(self) -> None: def __del__(self) -> None: self._pm_to_p_src_port.join() + + +class VarWire(AbstractProcess): + """VarWire allows non-Lava code, such as a third-party Python library + to tap data from a Lava Process Variable (Var) while the Lava Runtime is + running, by calling receive. + + Internally, this Process builds a channel from the ProcessModel to the + Process (named pm_to_p, of type PyPyChannel). + The wire_tap (ref_port) of the channel lives in the ProcessModel. + The dst_port of the channel lives in the Process. + + In the ProcessModel, data is received from this Process's RefPort, and + relayed to the pm_to_p.src_port. + When the receive method is called from the external Python script, data is + received from the pm_to_p.dst_port. + + Parameters + ---------- + shape : tuple + Shape of the InPort of the Process, and of the np.ndarrays passed + through the channel between the ProcessModel and the Process. + buffer_size : int, optional + Buffer size (in terms of number of np.ndarrays) of the channel between + the ProcessModel and Process. + channel_config : ChannelConfig, optional + Configuration object specifying how the src_port behaves when the + buffer is full and how the dst_port behaves when the buffer is empty + and not empty. + """ + + def __init__(self, + buffer_size: ty.Optional[int] = 50, + channel_config: ty.Optional[utils.ChannelConfig] = None) -> \ + None: + super().__init__() + + channel_config = channel_config or utils.ChannelConfig() + + utils.validate_buffer_size(buffer_size) + utils.validate_channel_config(channel_config) + + self.shape = None + self.buffer_size = buffer_size + self._multi_processing = None + # Stands for ProcessModel to Process + self._pm_to_p_dst_port = None + + self.proc_params["channel_config"] = channel_config + + self._receive_when_empty = channel_config.get_receive_empty_function() + self._receive_when_not_empty = \ + channel_config.get_receive_not_empty_function() + + self.wire_tap = RefPort((1,)) + + def connect_var(self, var: Var) -> None: + self.shape = var.shape + self.wire_tap = RefPort(self.shape) + self.wire_tap.connect_var(var) + + self._multi_processing = MultiProcessing() + self._multi_processing.start() + + # Stands for ProcessModel to Process + pm_to_p = PyPyChannel(message_infrastructure=self._multi_processing, + src_name="src", + dst_name="dst", + shape=self.shape, + dtype=float, + size=self.buffer_size) + self._pm_to_p_dst_port = pm_to_p.dst_port + self._pm_to_p_dst_port.start() + + self.proc_params["pm_to_p_src_port"] = pm_to_p.src_port + self._post_init() + + def receive(self) -> np.ndarray: + """Receive data from the ProcessModel. + + The data is received from pm_to_p.dst_port. + + Returns + ---------- + data : np.ndarray + Data received. + """ + elements_in_buffer = self._pm_to_p_dst_port._queue.qsize() + + if elements_in_buffer == 0: + data = self._receive_when_empty( + self._pm_to_p_dst_port, + np.zeros(self.shape)) + else: + data = self._receive_when_not_empty( + self._pm_to_p_dst_port, + np.zeros(self.shape), + elements_in_buffer) + + return data + + def __del__(self) -> None: + super().__del__() + + self._multi_processing.stop() + self._pm_to_p_dst_port.join() + + +@implements(proc=VarWire, protocol=LoihiProtocol) +@requires(CPU) +class PyLoihiVarWireModel(PyLoihiProcessModel): + wire_tap: PyRefPort = LavaPyType(PyRefPort.VEC_DENSE, float) + + def __init__(self, proc_params: dict) -> None: + super().__init__(proc_params=proc_params) + + channel_config = self.proc_params["channel_config"] + self._pm_to_p_src_port = self.proc_params["pm_to_p_src_port"] + self._pm_to_p_src_port.start() + + self._send = channel_config.get_send_full_function() + + def post_guard(self) -> None: + return True + + def run_post_mgmt(self) -> None: + self._send(self._pm_to_p_src_port, self.wire_tap.read()) + + def __del__(self) -> None: + self._pm_to_p_src_port.join() diff --git a/tests/lava/proc/io/test_extractor.py b/tests/lava/proc/io/test_extractor.py index 141b0c251..d918c1e10 100644 --- a/tests/lava/proc/io/test_extractor.py +++ b/tests/lava/proc/io/test_extractor.py @@ -11,7 +11,7 @@ from lava.magma.core.model.py.model import PyLoihiProcessModel from lava.magma.core.process.process import AbstractProcess -from lava.magma.core.process.ports.ports import InPort, OutPort +from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort from lava.magma.core.process.variable import Var from lava.magma.core.resources import CPU from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol @@ -23,7 +23,9 @@ from lava.magma.runtime.message_infrastructure.multiprocessing import \ MultiProcessing from lava.magma.compiler.channels.pypychannel import PyPyChannel, CspSendPort +from lava.proc.lif.process import LIF from lava.proc.io.extractor import Extractor, PyLoihiExtractorModel +from lava.proc.io.extractor import VarWire, PyLoihiVarWireModel from lava.proc.io import utils @@ -511,3 +513,420 @@ def test_run_continuous(self): extractor.stop() np.testing.assert_equal(recv_data, send_data) + + +class TestVarWire(unittest.TestCase): + def test_init(self): + """Test that the VarWire Process is instantiated correctly.""" + proc_shape = (5,) + + lif = LIF(shape=proc_shape) + listener = VarWire() + listener.connect_var(lif.u) + + self.assertIsInstance(listener, VarWire) + + config = listener.proc_params["channel_config"] + self.assertIsInstance(config, utils.ChannelConfig) + self.assertEqual(config.send_full, utils.SendFull.BLOCKING) + self.assertEqual(config.receive_empty, utils.ReceiveEmpty.BLOCKING) + self.assertEqual(config.receive_not_empty, utils.ReceiveNotEmpty.FIFO) + + self.assertIsInstance(listener.proc_params["pm_to_p_src_port"], + CspSendPort) + + self.assertIsInstance(listener.wire_tap, RefPort) + self.assertEqual(listener.wire_tap.shape, lif.u.shape) + + def test_invalid_buffer_size(self): + """Test that instantiating the VarWire Process with an invalid + buffer_size parameter raises errors.""" + buffer_size = 0.5 + with self.assertRaises(TypeError): + VarWire(buffer_size=buffer_size) + + buffer_size = -5 + with self.assertRaises(ValueError): + VarWire(buffer_size=buffer_size) + + def test_invalid_channel_config(self): + """Test that instantiating the VarWire Process with an invalid + channel_config parameter raises errors.""" + channel_config = "config" + with self.assertRaises(TypeError): + VarWire(channel_config=channel_config) + + channel_config = utils.ChannelConfig( + send_full=1, + receive_empty=utils.ReceiveEmpty.BLOCKING, + receive_not_empty=utils.ReceiveNotEmpty.FIFO) + with self.assertRaises(TypeError): + VarWire(channel_config=channel_config) + + channel_config = utils.ChannelConfig( + send_full=utils.SendFull.BLOCKING, + receive_empty=1, + receive_not_empty=utils.ReceiveNotEmpty.FIFO) + with self.assertRaises(TypeError): + VarWire(channel_config=channel_config) + + channel_config = utils.ChannelConfig( + send_full=utils.SendFull.BLOCKING, + receive_empty=utils.ReceiveEmpty.BLOCKING, + receive_not_empty=1) + with self.assertRaises(TypeError): + VarWire(channel_config=channel_config) + + +class TestPyLoihiVarWireModel(unittest.TestCase): + def test_init(self): + """Test that the PyLoihiVarWireModel ProcessModel is instantiated + correctly.""" + shape = (1, ) + buffer_size = 10 + + multi_processing = MultiProcessing() + multi_processing.start() + channel = PyPyChannel(message_infrastructure=multi_processing, + src_name="src", + dst_name="dst", + shape=shape, + dtype=float, + size=buffer_size) + + proc_params = {"channel_config": utils.ChannelConfig(), + "pm_to_p_src_port": channel.src_port} + + pm = PyLoihiVarWireModel(proc_params) + + self.assertIsInstance(pm, PyLoihiVarWireModel) + + @staticmethod + def _test_send_full_policy(send_full: utils.SendFull) \ + -> ty.Tuple[int, int, bool]: + """Sets up a simple network involving a Send Process and an VarWire + with buffer_size=1. + + In the main thread, the network is ran a single time (so that the + Runtime gets created), the element present in the channel gets + consumed, then we sleep for 2 seconds. + In a separate thread, the network is ran (and timing is recorded), + triggering the Send Process to send an item to VarWire, two times + in a row. + Finally, the main thread calls receive() on the VarWire so that the + element present in the channel is consumed. + + The first run() will make the VarWire fill the buffer and be fast. + The second send() will make the VarWire trigger the behavior + determined by the SendFull policy passed as argument + (either BLOCKING or NON_BLOCKING_DROP). + + Parameters + ---------- + send_full : SendFull + Enum instance specifying the SendFull policy of the channel. + Returns + ---------- + time_send_1 : int + Time it took to run the first run(). + time_send_2 : int + Time it took to run the second run(). + thread_is_alive : bool + Boolean representing whether or not the separate thread terminated. + """ + data_shape = (1,) + buffer_size = 1 + channel_config = utils.ChannelConfig(send_full=send_full) + num_steps = 1 + data = np.ones((num_steps,) + data_shape) + + send = Send(data=data) + listener = VarWire(buffer_size=buffer_size, + channel_config=channel_config) + listener.connect_var(send.var) + + run_condition = RunSteps(num_steps=num_steps) + run_cfg = Loihi2SimCfg() + + listener.run(condition=run_condition, run_cfg=run_cfg) + listener.receive() + + shared_queue = Queue(2) + + def thread_2_fn(queue: Queue) -> None: + checkpoint_1 = time.perf_counter() + listener.run(condition=run_condition, run_cfg=run_cfg) + checkpoint_2 = time.perf_counter() + listener.run(condition=run_condition, run_cfg=run_cfg) + checkpoint_3 = time.perf_counter() + + queue.put(checkpoint_2 - checkpoint_1) + queue.put(checkpoint_3 - checkpoint_2) + + thread_2 = threading.Thread(target=thread_2_fn, + daemon=True, + args=[shared_queue]) + thread_2.start() + time.sleep(2) + listener.receive() + time.sleep(1) + listener.stop() + thread_2.join() + + time_run_1 = shared_queue.get() + time_run_2 = shared_queue.get() + + return time_run_1, time_run_2, thread_2.is_alive() + + def test_receive_data_send_full_blocking(self): + """Test that running an instance of the VarWire Process with + SendFull.BLOCKING when the channel is full blocks.""" + send_full = utils.SendFull.BLOCKING + + time_run_1, time_run_2, thread_is_alive = \ + self._test_send_full_policy(send_full) + + self.assertLess(time_run_1, 1) + self.assertGreater(time_run_2, 1) + self.assertFalse(thread_is_alive) + + def test_receive_data_send_full_non_blocking_drop(self): + """Test that running an instance of the VarWire Process with + SendFull.NON_BLOCKING_DROP when the channel is full does not block.""" + send_full = utils.SendFull.NON_BLOCKING_DROP + + time_run_1, time_run_2, thread_is_alive = \ + self._test_send_full_policy(send_full) + + self.assertLess(time_run_1, 1) + self.assertLess(time_run_2, 1) + self.assertFalse(thread_is_alive) + + def test_receive_data_receive_empty_blocking(self): + """Test that calling receive on an instance of the VarWire Process + with ReceiveEmpty.BLOCKING blocks when the channel is empty. + + Sets up a simple network involving a Send Process and an VarWire with + (buffer_size=1, receive_empty=ReceiveEmpty.BLOCKING). + + In the main thread, the network is ran a single time (so that the + VarWire channel is not empty), then we sleep for 2 seconds. + In a separate thread, we call receive() on the VarWire (and timing + is recorded). two times in a row. + Finally, the main thread runs the network a single time step, sleeps + a second, and stops the network. + + The first receive() should be fast because an item is already in the + channel. + The second receive() should be slow because no item is in the channel + (main thread is sleeping for 2 seconds). + """ + data_shape = (1,) + buffer_size = 1 + channel_config = utils.ChannelConfig( + receive_empty=utils.ReceiveEmpty.BLOCKING) + num_steps = 1 + data = np.ones((num_steps,) + data_shape) + + send = Send(data=data) + listener = VarWire(buffer_size=buffer_size, + channel_config=channel_config) + listener.connect_var(send.var) + + run_condition = RunSteps(num_steps=num_steps) + run_cfg = Loihi2SimCfg() + + shared_queue = Queue(2) + + def thread_2_fn(queue: Queue) -> None: + checkpoint_1 = time.perf_counter() + listener.receive() + checkpoint_2 = time.perf_counter() + listener.receive() + checkpoint_3 = time.perf_counter() + + queue.put(checkpoint_2 - checkpoint_1) + queue.put(checkpoint_3 - checkpoint_2) + + listener.run(condition=run_condition, run_cfg=run_cfg) + thread_2 = threading.Thread(target=thread_2_fn, + daemon=True, + args=[shared_queue]) + thread_2.start() + time.sleep(2) + listener.run(condition=run_condition, run_cfg=run_cfg) + time.sleep(1) + listener.stop() + thread_2.join() + + time_1 = shared_queue.get() + time_2 = shared_queue.get() + + self.assertFalse(thread_2.is_alive()) + self.assertLess(time_1, 1) + self.assertGreater(time_2, 1) + + @staticmethod + def _test_receive_not_empty_policy(receive_not_empty: utils.ReceiveNotEmpty, + send_data: np.ndarray) -> np.ndarray: + """Sets up a simple network involving a Send Process and an VarWire + with buffer_size=10. + + Runs the network for 2 time steps, making the Send Process send two + data items in a row. + Then, call receive() on the VarWire 2 times in a row. + + Depending on the ReceiveNotEmpty policy passed as argument (either + FIFO or ACCUMULATE), the two consecutive calls to receive() on the + VarWire will either return the two sent items one after the other + or the sum of the two items followed by a 0. + + Parameters + ---------- + receive_not_empty : ReceiveNotEmpty + Enum instance specifying the ReceiveNotEmpty policy of the channel. + + Returns + ---------- + recv_data : np.ndarray + Data returned by the two consecutive calls to receive() on the + VarWire. + """ + buffer_size = 10 + # ReceiveEmpty policy is set to NON_BLOCKING_ZEROS so that the second + # call to receive() does not block when ReceiveNotEmpty policy is set + # to ACCUMULATE + channel_config = utils.ChannelConfig( + receive_empty=utils.ReceiveEmpty.NON_BLOCKING_ZEROS, + receive_not_empty=receive_not_empty) + num_steps = 2 + + send = Send(data=send_data) + listener = VarWire(buffer_size=buffer_size, + channel_config=channel_config) + listener.connect_var(send.var) + + run_condition = RunSteps(num_steps=num_steps) + run_cfg = Loihi2SimCfg() + + listener.run(condition=run_condition, run_cfg=run_cfg) + recv_data = [listener.receive(), listener.receive()] + listener.stop() + + return np.array(recv_data) + + def test_receive_data_receive_not_empty_fifo(self): + """Test that calling receive on an instance of the VarWire Process + with ReceiveNotEmpty.FIFO after having sent two items in a row + has the effect of returning the two sent items one by one.""" + receive_not_empty = utils.ReceiveNotEmpty.FIFO + send_data = np.array([[10], [15]]) + + recv_data = self._test_receive_not_empty_policy(receive_not_empty, + send_data) + + np.testing.assert_equal(recv_data[0], send_data) + + def test_receive_data_receive_not_empty_accumulate(self): + """Test that calling receive on an instance of the VarWire Process + with ReceiveNotEmpty.ACCUMULATE after having sent two items in a row + has the effect of returning the two sent items, accumulated.""" + receive_not_empty = utils.ReceiveNotEmpty.ACCUMULATE + send_data = np.array([[10], [15]]) + + recv_data = self._test_receive_not_empty_policy(receive_not_empty, + send_data) + + np.testing.assert_equal( + recv_data[0], 2 * send_data) # runs for 2 steps + + def test_run_steps_blocking(self): + """Test that running the a Lava network involving the VarWire + Process, with RunSteps(blocking=True), for multiple time steps, with a + separate thread calling receive, runs and terminates.""" + np.random.seed(0) + data_shape = (1,) + buffer_size = 10 + num_steps = 50 + num_send = num_steps + send_data = np.random.random(size=(num_send,) + data_shape) + + send = Send(data=send_data) + listener = VarWire(buffer_size=buffer_size) + listener.connect_var(send.var) + + run_condition = RunSteps(num_steps=num_steps) + run_cfg = Loihi2SimCfg() + + shared_queue = Queue(num_steps) + + def thread_2_fn(queue: Queue) -> None: + for _ in range(num_steps): + queue.put(listener.receive()) + + thread_2 = threading.Thread(target=thread_2_fn, + daemon=True, + args=[shared_queue]) + thread_2.start() + listener.run(condition=run_condition, run_cfg=run_cfg) + listener.stop() + + np.testing.assert_equal(list(shared_queue.queue)[0], send_data) + + def test_run_steps_non_blocking(self): + """Test that running the a Lava network involving the VarWire + Process, with RunSteps(blocking=False), for multiple time steps, with + the main thread calling receive, runs and terminates.""" + np.random.seed(0) + data_shape = (1,) + buffer_size = 10 + num_steps = 50 + num_send = num_steps + send_data = np.random.random(size=(num_send,) + data_shape) + + send = Send(data=send_data) + listener = VarWire(buffer_size=buffer_size) + listener.connect_var(send.var) + + run_condition = RunSteps(num_steps=num_steps, blocking=False) + run_cfg = Loihi2SimCfg() + + listener.run(condition=run_condition, run_cfg=run_cfg) + + recv_data = [] + for _ in range(num_steps): + recv_data.append(listener.receive()) + + listener.wait() + listener.stop() + + np.testing.assert_equal(recv_data[0], send_data) + + def test_run_continuous(self): + """Test that running the a Lava network involving the VarWire + Process, with RunContinuous(), for multiple time steps, with + the main thread calling receive, runs and terminates.""" + np.random.seed(0) + data_shape = (1,) + buffer_size = 10 + num_send = 50 + send_data = np.random.random(size=(num_send,) + data_shape) + + send = Send(data=send_data) + listener = VarWire(buffer_size=buffer_size) + listener.connect_var(send.var) + + run_condition = RunContinuous() + run_cfg = Loihi2SimCfg() + + listener.run(condition=run_condition, run_cfg=run_cfg) + + recv_data = [] + for _ in range(num_send): + recv_data.append(listener.receive()) + + listener.pause() + listener.wait() + listener.stop() + + np.testing.assert_equal(recv_data[0], send_data) From 22825174dabe65320ffc7cf5be1205519e34f905 Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Thu, 9 Nov 2023 08:01:31 -0800 Subject: [PATCH 2/7] Update README.md for release --- README.md | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7317912b2..9c4003a75 100644 --- a/README.md +++ b/README.md @@ -154,12 +154,28 @@ conda install -n lava -c intel numpy scipy conda install -n lava -c conda-forge lava --freeze-installed ``` +## Alternative: Installing Lava from pypi + +If you would like to install Lava as a user you can install via pypi binaries. +Installing in this way does not give you access to run tests. + +Open a Python terminal and run: + +### Windows/MacOS/Linux + +```bash +python -m venv .venv +source .venv/bin/activate ## Or Windows: .venv\Scripts\activate +pip install -U pip +pip install lava-nc +``` + ## Alternative: Installing Lava from binaries -If you only need to install Lava as a user in your python environment, we will -publish Lava releases via +You can also install Lava as a user with published Lava releases via [GitHub Releases](https://github.com/lava-nc/lava/releases). Please download -the package and install it. +the package and install it with the following commands. Installing in this way does not +give you access to run tests. Open a Python terminal and run: @@ -169,7 +185,8 @@ Open a Python terminal and run: python -m venv .venv source .venv/bin/activate ## Or Windows: .venv\Scripts\activate pip install -U pip -pip install lava-nc-0.6.0.tar.gz +# Substitute lava version needed for lava-nc-.tar.gz below +pip install lava-nc-0.8.0.tar.gz ``` ## Linting, testing, documentation and packaging From aedb3cc435cdaecdce07fb0ba3d0ead4ea74c63a Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Tue, 14 Nov 2023 11:52:52 -0800 Subject: [PATCH 3/7] Fix poetry config for publish to pypi (#782) * Try to use poetry config and token to publish to pypi * Temporarily enable dry run publish to pypi in cd.yml * Remove dry run publish to pypi in cd.yml --- .github/workflows/cd.yml | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 57de4efc1..c778bf1dd 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -134,6 +134,8 @@ jobs: name: Upload release artifact runs-on: ubuntu-latest if: github.triggering_actor == 'mgkwill' || github.triggering_actor == 'PhilippPlank' || github.triggering_actor == 'tim-shea' + outputs: + api-token: ${{ steps.mint-token.outputs.api-token}} permissions: contents: write id-token: write @@ -185,27 +187,24 @@ jobs: - name: Mint Github API token id: mint-token run: | - # retrieve the ambient OIDC token + # retrieve OIDC token resp=$(curl -H "Authorization: bearer $ACTIONS_ID_TOKEN_REQUEST_TOKEN" \ "$ACTIONS_ID_TOKEN_REQUEST_URL&audience=pypi") oidc_token=$(jq '.value' <<< "${resp}") - # exchange the OIDC token for an API token + # exchange OIDC token for API token resp=$(curl -X POST https://pypi.org/_/oidc/github/mint-token -d "{\"token\": \"${oidc_token}\"}") api_token=$(jq '.token' <<< "${resp}") - # mask the newly minted API token, so that we don't accidentally leak it + # mask the API token, to prevent leaking it echo "::add-mask::${api_token}" - # see the next step in the workflow for an example of using this step output echo "api-token=${api_token}" >> "${GITHUB_OUTPUT}" - name: Publish to PyPI if: steps.check-version.outputs.prerelease != 'true' - env: - POETRY_HTTP_BASIC_PYPI_USERNAME: __token__ - POETRY_HTTP_BASIC_PYPI_PASSWORD: ${{ steps.mint-token.outputs.api-token }} run: | + poetry config pypi-token.pypi ${{ steps.mint-token.outputs.api-token }} mkdir dist cp lava* dist/. poetry publish From 682edb920fe9bf055839894315ce54b7991c49ad Mon Sep 17 00:00:00 2001 From: Marcus G K Williams Date: Tue, 14 Nov 2023 12:57:43 -0800 Subject: [PATCH 4/7] Release 0.9.0 Signed-off-by: Marcus G K Williams --- .gitmodules | 3 --- README.md | 6 +++--- docs | 1 - pyproject.toml | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) delete mode 160000 docs diff --git a/.gitmodules b/.gitmodules index e11057ce3..e69de29bb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "docs"] - path = docs - url = https://github.com/lava-nc/lava-docs.git diff --git a/README.md b/README.md index 9c4003a75..a6fd40b32 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ cd $HOME curl -sSL https://install.python-poetry.org | python3 - git clone git@github.com:lava-nc/lava.git cd lava -git checkout v0.8.0 +git checkout v0.9.0 ./utils/githook/install-hook.sh poetry config virtualenvs.in-project true poetry install @@ -90,7 +90,7 @@ pytest cd $HOME git clone git@github.com:lava-nc/lava.git cd lava -git checkout v0.8.0 +git checkout v0.9.0 python3 -m venv .venv .venv\Scripts\activate pip install -U pip @@ -186,7 +186,7 @@ python -m venv .venv source .venv/bin/activate ## Or Windows: .venv\Scripts\activate pip install -U pip # Substitute lava version needed for lava-nc-.tar.gz below -pip install lava-nc-0.8.0.tar.gz +pip install lava-nc-0.9.0.tar.gz ``` ## Linting, testing, documentation and packaging diff --git a/docs b/docs deleted file mode 160000 index 9492b5759..000000000 --- a/docs +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9492b5759b081812e1d111e6f188d2ee2bba94d4 diff --git a/pyproject.toml b/pyproject.toml index 9d5f7f851..536b83b0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ packages = [ {include = "tests"} ] include = ["tutorials"] -version = "0.8.0.dev0" +version = "0.9.0" readme = "README.md" description = "A Software Framework for Neuromorphic Computing" homepage = "https://lava-nc.org/" From 5e2fa3dd2559d7b9996f434291aea9e912ee436a Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Tue, 14 Nov 2023 14:33:26 -0800 Subject: [PATCH 5/7] Change how poetry uploads to pypi in cd.yml (#810) --- .github/workflows/cd.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index c778bf1dd..747e0beaf 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -138,7 +138,7 @@ jobs: api-token: ${{ steps.mint-token.outputs.api-token}} permissions: contents: write - id-token: write + id-token: write needs: [build-artifacts, test-artifact-install, test-artifact-use] steps: @@ -204,7 +204,6 @@ jobs: - name: Publish to PyPI if: steps.check-version.outputs.prerelease != 'true' run: | - poetry config pypi-token.pypi ${{ steps.mint-token.outputs.api-token }} mkdir dist cp lava* dist/. - poetry publish + poetry publish -u __token__ -p '${{ steps.mint-token.outputs.api-token }}' From 5ed88f790106e8c508f7dcc44200d05eebcf87e0 Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Tue, 14 Nov 2023 19:04:56 -0800 Subject: [PATCH 6/7] Further fix to pypi upload in cd.yml Set contents to write in publish release --- .github/workflows/cd.yml | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml index 747e0beaf..42c763c96 100644 --- a/.github/workflows/cd.yml +++ b/.github/workflows/cd.yml @@ -134,11 +134,12 @@ jobs: name: Upload release artifact runs-on: ubuntu-latest if: github.triggering_actor == 'mgkwill' || github.triggering_actor == 'PhilippPlank' || github.triggering_actor == 'tim-shea' - outputs: - api-token: ${{ steps.mint-token.outputs.api-token}} + environment: + name: pypi + url: https://pypi.org/p/lava-nc/ permissions: - contents: write - id-token: write + id-token: write + contents: write needs: [build-artifacts, test-artifact-install, test-artifact-use] steps: @@ -184,26 +185,12 @@ jobs: generateReleaseNotes: true makeLatest: true - - name: Mint Github API token - id: mint-token - run: | - # retrieve OIDC token - resp=$(curl -H "Authorization: bearer $ACTIONS_ID_TOKEN_REQUEST_TOKEN" \ - "$ACTIONS_ID_TOKEN_REQUEST_URL&audience=pypi") - oidc_token=$(jq '.value' <<< "${resp}") - - # exchange OIDC token for API token - resp=$(curl -X POST https://pypi.org/_/oidc/github/mint-token -d "{\"token\": \"${oidc_token}\"}") - api_token=$(jq '.token' <<< "${resp}") - - # mask the API token, to prevent leaking it - echo "::add-mask::${api_token}" - - echo "api-token=${api_token}" >> "${GITHUB_OUTPUT}" - - name: Publish to PyPI if: steps.check-version.outputs.prerelease != 'true' run: | mkdir dist cp lava* dist/. - poetry publish -u __token__ -p '${{ steps.mint-token.outputs.api-token }}' + + - name: Publish package distributions to PyPI + if: steps.check-version.outputs.prerelease != 'true' + uses: pypa/gh-action-pypi-publish@release/v1 From 06110649c1d528bf61b6dd2266e34100a813e3fd Mon Sep 17 00:00:00 2001 From: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> Date: Tue, 14 Nov 2023 21:41:59 -0800 Subject: [PATCH 7/7] Set version back to dev in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 536b83b0e..1dc5b8fc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ packages = [ {include = "tests"} ] include = ["tutorials"] -version = "0.9.0" +version = "0.9.0.dev0" readme = "README.md" description = "A Software Framework for Neuromorphic Computing" homepage = "https://lava-nc.org/"