+DEFAULT_TIMEOUT = 1000
+    def _lock_many(self, requirements, timeout_s, retry_interval=1) -> [Allocation]:
+        """ Lock multiple resources """
self._lock_some(requirements, local_resources, timeout_s, retry_interval) @staticmethod @@ -146,7 +184,7 @@ def _get_requirements(requirements, hostname): MODULE_LOGGER.debug('hostname: %s', hostname) return merge(dict(hostname=hostname, online=True), requirements) - def lock(self, requirements: (str or dict), timeout_s: int = 1000) -> Allocation: + def lock(self, requirements: (str or dict), timeout_s: int = DEFAULT_TIMEOUT) -> Allocation: """ Lock resource :param requirements: resource requirements @@ -163,10 +201,31 @@ def lock(self, requirements: (str or dict), timeout_s: int = 1000) -> Allocation MODULE_LOGGER.debug("Requirements: %s", json.dumps(predicate)) MODULE_LOGGER.debug("Resource list: %s", json.dumps(self.resource_list)) allocation = self._lock(predicate, timeout_s) - self._allocations[allocation.resource_id] = allocation allocation.allocation_queue_time = datetime.now() - begin return allocation + def lock_many(self, requirements: list, timeout_s: int = DEFAULT_TIMEOUT) -> list: + """ + Lock many resources + :param requirements: resource requirements, list of string or dicts + :param timeout_s: max duration to try to lock + :return: List of allocation contexts + """ + assert isinstance(self.resource_list, list), "resources list is not loaded" + predicates = [] + for req in requirements: + predicates.append(self._get_requirements(self.parse_requirements(req), self._hostname)) + self._provider.reload() + begin = datetime.now() + MODULE_LOGGER.debug("Use lock folder: %s", self._lock_folder) + MODULE_LOGGER.debug("Requirements: %s", json.dumps(predicates)) + MODULE_LOGGER.debug("Resource list: %s", json.dumps(self.resource_list)) + + allocations = self._lock_many(predicates, timeout_s) + for allocation in allocations: + allocation.allocation_queue_time = datetime.now() - begin + return allocations + def unlock(self, allocation: Allocation) -> None: """ Method to release resource diff --git a/tests/test_Lockable.py b/tests/test_Lockable.py index 
+            self.assertLess(allocations[0].allocation_queue_time, timedelta(seconds=1))
+            self.assertLess(allocations[1].allocation_queue_time, timedelta(seconds=1))
+            self.assertTrue(1 < end - start < 2)