Skip to content

Commit

Permalink
Add Lockable.lock_many API (#34)
Browse files Browse the repository at this point in the history
* Add Lockable.lock_many API
* Add tests
  • Loading branch information
juhhov authored Nov 11, 2021
1 parent 23dc64a commit d4a39f7
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 19 deletions.
97 changes: 78 additions & 19 deletions lockable/lockable.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
""" lockable library """
import random
from contextlib import contextmanager
from datetime import datetime
import json
import socket
import os
import random
import socket
import time
import tempfile
from datetime import datetime
from contextlib import contextmanager

from pydash import filter_, merge
from pid import PidFile, PidFileError
from lockable.provider_helpers import create as create_provider
from lockable.logger import get_logger

from lockable.allocation import Allocation
from lockable.logger import get_logger
from lockable.provider_helpers import create as create_provider

MODULE_LOGGER = get_logger()
DEFAULT_TIMEOUT=1000


class ResourceNotFound(Exception):
Expand All @@ -31,7 +34,8 @@ class Lockable:
Base class for Lockable. It handle low-level functionality.
"""

def __init__(self, hostname=socket.gethostname(),
def __init__(self,
hostname=socket.gethostname(),
resource_list_file=None,
resource_list=None,
lock_folder=tempfile.gettempdir()):
Expand Down Expand Up @@ -109,35 +113,69 @@ def _lock_some(self, requirements, candidates, timeout_s, retry_interval):
""" Contextmanager that lock some candidate that is free and release it finally """
MODULE_LOGGER.debug('Total match local resources: %d, timeout: %d',
len(candidates), timeout_s)
if not isinstance(requirements, list):
requirements = [requirements]
abort_after = timeout_s
start = time.time()

current_allocations = []
fulfilled_requirement_indexes = []
while True:
for candidate in candidates:
try:
allocation = self._try_lock(requirements, candidate)
MODULE_LOGGER.debug('resource %s allocated (%s), alloc_id: (%s)',
allocation.resource_id,
json.dumps(allocation.resource_info),
allocation.alloc_id)
return allocation
except AssertionError:
pass
for index, req in enumerate(requirements):
if index in fulfilled_requirement_indexes:
continue
for candidate in candidates:
try:
allocation = self._try_lock(req, candidate)
MODULE_LOGGER.debug('resource %s allocated (%s), alloc_id: (%s)',
allocation.resource_id,
json.dumps(allocation.resource_info),
allocation.alloc_id)
self._allocations[allocation.resource_id] = allocation
current_allocations.append(allocation)
fulfilled_requirement_indexes.append(index)
break
except AssertionError:
pass

# All resources allocated
if len(requirements) == len(current_allocations):
break

# Check if timeout occurs. No need to be high resolution timeout.
# in first loop we should first check before giving up.
delta = time.time() - start
if delta >= abort_after:
# Unlock all already done allocations
# pylint: disable=expression-not-assigned
[allocation.unlock() for allocation in current_allocations]
MODULE_LOGGER.warning('Allocation timeout')
raise TimeoutError(f'Allocation timeout ({timeout_s}s)')

MODULE_LOGGER.debug('trying to lock after short period')
time.sleep(retry_interval)

return current_allocations

def _lock(self, requirements, timeout_s, retry_interval=1) -> Allocation:
""" Lock resource """
local_resources = filter_(self.resource_list, requirements)
random.shuffle(local_resources)
ResourceNotFound.invariant(local_resources, "Suitable resource not available")
return self._lock_some(requirements, local_resources, timeout_s, retry_interval)[0]

def _lock_many(self, requirements, timeout_s, retry_interval=1) -> [Allocation]:
""" Lock resource """
local_resources = []
for req in requirements:
resources = filter_(self.resource_list, req)
ResourceNotFound.invariant(resources, "Suitable resource not available")
local_resources += resources
# Unique resources by id
local_resources = list({v['id']:v for v in local_resources}.values())
ResourceNotFound.invariant(
len(local_resources) >= len(requirements), "Suitable resource not available")
random.shuffle(local_resources)
return self._lock_some(requirements, local_resources, timeout_s, retry_interval)

@staticmethod
Expand All @@ -146,7 +184,7 @@ def _get_requirements(requirements, hostname):
MODULE_LOGGER.debug('hostname: %s', hostname)
return merge(dict(hostname=hostname, online=True), requirements)

def lock(self, requirements: (str or dict), timeout_s: int = 1000) -> Allocation:
def lock(self, requirements: (str or dict), timeout_s: int = DEFAULT_TIMEOUT) -> Allocation:
"""
Lock resource
:param requirements: resource requirements
Expand All @@ -163,10 +201,31 @@ def lock(self, requirements: (str or dict), timeout_s: int = 1000) -> Allocation
MODULE_LOGGER.debug("Requirements: %s", json.dumps(predicate))
MODULE_LOGGER.debug("Resource list: %s", json.dumps(self.resource_list))
allocation = self._lock(predicate, timeout_s)
self._allocations[allocation.resource_id] = allocation
allocation.allocation_queue_time = datetime.now() - begin
return allocation

def lock_many(self, requirements: list, timeout_s: int = DEFAULT_TIMEOUT) -> list:
"""
Lock many resources
:param requirements: resource requirements, list of string or dicts
:param timeout_s: max duration to try to lock
:return: List of allocation contexts
"""
assert isinstance(self.resource_list, list), "resources list is not loaded"
predicates = []
for req in requirements:
predicates.append(self._get_requirements(self.parse_requirements(req), self._hostname))
self._provider.reload()
begin = datetime.now()
MODULE_LOGGER.debug("Use lock folder: %s", self._lock_folder)
MODULE_LOGGER.debug("Requirements: %s", json.dumps(predicates))
MODULE_LOGGER.debug("Resource list: %s", json.dumps(self.resource_list))

allocations = self._lock_many(predicates, timeout_s)
for allocation in allocations:
allocation.allocation_queue_time = datetime.now() - begin
return allocations

def unlock(self, allocation: Allocation) -> None:
"""
Method to release resource
Expand Down
61 changes: 61 additions & 0 deletions tests/test_Lockable.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataclasses
from datetime import timedelta
import json
import logging
import mock
Expand Down Expand Up @@ -200,3 +201,63 @@ def test_auto_lock(self):
self.assertEqual(resource, resource_info)
self.assertTrue(os.path.exists(lock_file))
self.assertFalse(os.path.exists(lock_file))

def test_lock_many(self):
with TemporaryDirectory() as tmpdirname:
resources = [
{"id": "1", "hostname": "myhost", "online": True},
{"id": "2", "hostname": "myhost", "online": True}
]
lockable = Lockable(hostname='myhost', resource_list=resources, lock_folder=tmpdirname)
self.assertFalse(os.path.exists(os.path.join(tmpdirname, '1.pid')))
self.assertFalse(os.path.exists(os.path.join(tmpdirname, '2.pid')))
allocations = lockable.lock_many(['id=1', 'id=2'], timeout_s=0)
self.assertTrue(allocations[0].allocation_queue_time < timedelta(seconds=1))
self.assertTrue(allocations[1].allocation_queue_time < timedelta(seconds=1))
self.assertTrue(os.path.exists(os.path.join(tmpdirname, '1.pid')))
self.assertTrue(os.path.exists(os.path.join(tmpdirname, '2.pid')))
lockable.unlock(allocations[0])
lockable.unlock(allocations[1])
self.assertFalse(os.path.exists(os.path.join(tmpdirname, '1.pid')))
self.assertFalse(os.path.exists(os.path.join(tmpdirname, '2.pid')))

def test_lock_many_can_not_fulfill(self):
with TemporaryDirectory() as tmpdirname:
resources = [
{"id": "a", "hostname": "myhost", "online": True},
{"id": "b", "hostname": "myhost", "online": True},
{"id": "c", "hostname": "myhost", "online": True},
{"id": "d", "hostname": "myhost", "online": True},
{"id": "e", "hostname": "myhost", "online": True}
]
lockable = Lockable(hostname='myhost', resource_list=resources, lock_folder=tmpdirname)
with self.assertRaises(ResourceNotFound):
lockable.lock_many(['id=a', 'id=x'], timeout_s=0)
self.assertFalse(os.path.exists(os.path.join(tmpdirname, 'a.pid')))
self.assertFalse(os.path.exists(os.path.join(tmpdirname, 'x.pid')))

with self.assertRaises(ResourceNotFound):
lockable.lock_many(['id=a', 'id=a'], timeout_s=0)
self.assertFalse(os.path.exists(os.path.join(tmpdirname, 'a.pid')))

def test_lock_many_existing_allocation(self):
with TemporaryDirectory() as tmpdirname:
resources = [
{"id": "a", "hostname": "myhost", "online": True},
{"id": "b", "hostname": "myhost", "online": True},
{"id": "c", "hostname": "myhost", "online": True},
{"id": "d", "hostname": "myhost", "online": True},
{"id": "e", "hostname": "myhost", "online": True}
]

lockable = Lockable(hostname='myhost', resource_list=resources, lock_folder=tmpdirname)
allocations = lockable.lock_many(['id=a'], timeout_s=0)
self.assertTrue(os.path.exists(os.path.join(tmpdirname, 'a.pid')))

start = time.time()
with self.assertRaises(TimeoutError):
lockable.lock_many(['id=a', 'id=b'], timeout_s=1)
end = time.time()
self.assertTrue(end - start < 2 and end - start > 1)
self.assertTrue(os.path.exists(os.path.join(tmpdirname, 'a.pid')))
self.assertFalse(os.path.exists(os.path.join(tmpdirname, 'b.pid')))

0 comments on commit d4a39f7

Please sign in to comment.