Skip to content

Commit

Permalink
Transaction-level lock support (#13)
Browse files Browse the repository at this point in the history
* Added acquire/release methods for manual control

* Add transaction-level advisory lock support

* Bump version and add release notes

* Fix docs
  • Loading branch information
wesleykendall authored Aug 25, 2024
1 parent fa50a81 commit 195e390
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 36 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 1.6.0 (2024-08-24)

#### Features

- Support transaction-level locks by [@wesleykendall](https://github.com/wesleykendall) in [#13](https://github.com/Opus10/django-pglock/pull/13).

Use `pglock.advisory(xact=True)` for transaction-level advisory locks. Both context manager and functional invocations are supported.

#### Changes

- Django 5.1 support, drop Django 3.2 support by [@wesleykendall](https://github.com/wesleykendall) in [#12](https://github.com/Opus10/django-pglock/pull/12).

## 1.5.1 (2024-04-06)

#### Trivial
Expand Down
40 changes: 37 additions & 3 deletions docs/advisory.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def my_exclusive_function():

When creating an advisory lock, remember that the lock ID is a global name across the entire database. Be sure to choose meaningful names, ideally with namespaces, when serializing code with [pglock.advisory][].

!!! warning

[Session-based locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) are used by default and released when the context manager exits or the database connection is terminated. If connections are pooled (e.g., [pgbouncer](https://www.pgbouncer.org)) and code is killed without raising exceptions (e.g., out-of-memory errors), locks will be held until the connection is terminated. See [transaction-level locks](#transaction) for an alternative.

## Configuring Lock Wait Time

By default, [pglock.advisory][] will wait forever until the lock can be acquired. Use the `timeout` argument to change this behavior. For example, `timeout=0` will avoid waiting for the lock:
Expand Down Expand Up @@ -51,7 +55,8 @@ The `side_effect` argument adjusts runtime characteristics when using a timeout.

```python
with pglock.advisory(timeout=0, side_effect=pglock.Raise):
# A django.db.utils.OperationalError will be thrown if the lock cannot be acquired.
# A django.db.utils.OperationalError will be thrown if the lock
# cannot be acquired.
```

!!! note
Expand All @@ -63,9 +68,38 @@ Use `side_effect=pglock.Skip` to skip the function entirely if the lock cannot b
```python
@pglock.advisory(timeout=0, side_effect=pglock.Skip)
def one_function_at_a_time():
# This function runs once at a time. If this function runs anywhere else, it will be skipped.
# This function runs once at a time. If this function runs anywhere
# else, it will be skipped.
```

## Shared Locks

Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information.
Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information.

<a id="transaction"></a>

## Transaction-Level Locks

Use `pglock.advisory(xact=True)` to create a transaction-level advisory lock, which are released at the end of a transaction.

When using the decorator or context manager, a transaction will be opened. A `RuntimeError` will be raised if a transaction is already open.

Use the functional interface to acquire a lock if already in a transaction:

```python
import pglock
from django.db import transaction

with transaction.atomic():
...
acquired = pglock.advisory("lock_id", xact=True).acquire()
...

# The lock is released at the end of the transaction.
```

Remember that once acquired, a transaction-level lock cannot be manually released. It will only be released when the transaction is over.

!!! danger

The functional interface is only intended for transaction-level locks. Use the context manager or decorator for other use cases.
95 changes: 64 additions & 31 deletions pglock/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,17 @@ class advisory(contextlib.ContextDecorator):
When using the default side effect, returns `True` if the lock was acquired or `False` if not.
Consult the
`Postgres docs <https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS>`__
for more information on shared and transactional locks.
Args:
lock_id (Union[str, int], default=None): The ID of the lock. When
using the decorator, it defaults to the full module path and
function name of the wrapped function. It must be supplied to
the context manager.
shared (bool, default=False): When `True`, creates a shared
advisory lock. Consult the
`Postgres docs <https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS>`__
for more information.
the context manager or function calls.
shared (bool, default=False): When `True`, creates a shared lock.
xact (bool, default=False): When `True`, creates a transactional-level lock.
using (str, default="default"): The database to use.
timeout (Union[int, float, datetime.timedelta, None]): Set a timeout when waiting
for the lock. This timeout only applies to the lock acquisition statement and not the
Expand Down Expand Up @@ -223,6 +225,7 @@ def __init__(
lock_id=None,
*,
shared=False,
xact=False,
using=DEFAULT_DB_ALIAS,
timeout=_unset,
side_effect=None,
Expand All @@ -232,6 +235,7 @@ def __init__(
self.using = using
self.side_effect = side_effect
self.shared = shared
self.xact = xact
self.timeout = _cast_timeout(timeout)

# Use pg_try_advisory.. when a timeout of 0 has been applied.
Expand All @@ -244,21 +248,16 @@ def __init__(
def int_lock_id(self):
return _cast_lock_id(self.lock_id)

@property
def acquire(self):
return f'pg{"_try" if self.nowait else ""}_advisory_lock{"_shared" if self.shared else ""}'

@property
def release(self):
return f'pg_advisory_unlock{"_shared" if self.shared else ""}'
def in_transaction(self) -> bool:
return connections[self.using].in_atomic_block

def __call__(self, func):
self._func = func

@functools.wraps(func)
def inner(*args, **kwargs):
with self._recreate_cm():
if self.acquired or self.side_effect != Skip:
if self._acquired or self.side_effect != Skip:
return func(*args, **kwargs)

return inner
Expand Down Expand Up @@ -300,50 +299,84 @@ def _process_runtime_parameters(self):
" Use it as a context manager instead."
)

def __enter__(self):
def acquire(self) -> bool:
self._process_runtime_parameters()

sql = f"SELECT {self.acquire}({self.int_lock_id})"
if self.xact and not self.in_transaction():
raise RuntimeError("Must be in a transaction to use xact=True.")

acquire_sql = (
f'pg{"_try" if self.nowait else ""}_advisory'
f'{"_xact" if self.xact else ""}_lock'
f'{"_shared" if self.shared else ""}'
)
sql = f"SELECT {acquire_sql}({self.int_lock_id})"

with connections[self.using].cursor() as cursor:
try:
with contextlib.ExitStack() as stack:
if self.timeout is not _unset and not self.nowait:
stack.enter_context(lock_timeout(self.timeout, using=self.using))

if self.side_effect != Raise and connections[self.using].in_atomic_block:
if self.side_effect != Raise and self.in_transaction():
# If returning True/False, create a savepoint so that
# the transaction isn't in an errored state when returning.
stack.enter_context(transaction.atomic(using=self.using))

cursor.execute(sql)
self.acquired = cursor.fetchone()[0] if self.nowait else True
acquired = cursor.fetchone()[0] if self.nowait else True
except OperationalError:
# This block only happens when the lock times out
if self.side_effect != Raise:
self.acquired = False
acquired = False
else:
raise

if not self.acquired and self.side_effect == Raise:
raise OperationalError(f'Could not acquire lock "{self.lock_id}"')
if not acquired and self.side_effect == Raise:
raise OperationalError(f'Could not acquire lock "{self.lock_id}"')

return acquired

def release(self) -> None:
if self.xact:
raise RuntimeError("Advisory locks with xact=True cannot be manually released.")

with connections[self.using].cursor() as cursor:
release_sql = f'pg_advisory_unlock{"_shared" if self.shared else ""}'
cursor.execute(f"SELECT {release_sql}({self.int_lock_id})")

def __enter__(self):
self._transaction_ctx = contextlib.ExitStack()
if self.xact:
if self.in_transaction():
raise RuntimeError(
"Advisory locks with xact=True cannot run inside a transaction."
" Use the functional interface, i.e. pglock.advisory(...).acquire()"
)

# Transactional locks always create a durable transaction
self._transaction_ctx.enter_context(transaction.atomic(using=self.using, durable=True))

self._transaction_ctx.__enter__()

self._acquired = self.acquire()

self.stack = contextlib.ExitStack()
if self.acquired and connections[self.using].in_atomic_block:
# Create a savepoint so that we can successfully release
# the lock if the transaction errors
self.stack.enter_context(transaction.atomic(using=self.using))
self._savepoint_ctx = contextlib.ExitStack()
if self._acquired and not self.xact and self.in_transaction():
# Create a savepoint so that we can successfully release
# the lock if the transaction errors
self._savepoint_ctx.enter_context(transaction.atomic(using=self.using))

self.stack.__enter__()
self._savepoint_ctx.__enter__()

return self.acquired
return self._acquired

def __exit__(self, exc_type, exc_value, traceback):
self.stack.__exit__(exc_type, exc_value, traceback)
self._savepoint_ctx.__exit__(exc_type, exc_value, traceback)
self._transaction_ctx.__exit__(exc_type, exc_value, traceback)

if self.acquired:
with connections[self.using].cursor() as cursor:
cursor.execute(f"SELECT {self.release}({self.int_lock_id})")
if self._acquired and not self.xact:
self.release()


def model(
Expand Down
64 changes: 63 additions & 1 deletion pglock/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def assert_lock_acquired():


@pytest.mark.django_db(transaction=True)
def test_advisory_transaction(reraise):
def test_advisory_inside_transaction(reraise):
"""Test errored transaction behavior for advisory locks"""
barrier = threading.Barrier(2)
rand_val = str(random.random())
Expand Down Expand Up @@ -284,6 +284,68 @@ def hold_lock_and_error():
acquired.join()


@pytest.mark.django_db(transaction=True)
def test_advisory_xact(reraise):
"""Test basic transactional advisory lock behavior"""
barrier = threading.Barrier(2)
rand_val = str(random.random())

@reraise.wrap
def assert_lock_not_acquired():
barrier.wait(timeout=5)

with pglock.advisory(rand_val, xact=True, timeout=0) as acquired:
assert not acquired

barrier.wait(timeout=5)
barrier.wait(timeout=5)

with transaction.atomic():
assert not pglock.advisory(rand_val, xact=True, timeout=0).acquire()

barrier.wait(timeout=5)

@reraise.wrap
def assert_lock_acquired():
with pglock.advisory(rand_val, xact=True) as acquired:
assert acquired
barrier.wait(timeout=5)
barrier.wait(timeout=5)

with transaction.atomic():
assert pglock.advisory(rand_val, xact=True).acquire()
barrier.wait(timeout=5)
barrier.wait(timeout=5)

not_acquired = threading.Thread(target=assert_lock_not_acquired)
acquired = threading.Thread(target=assert_lock_acquired)
not_acquired.start()
acquired.start()
not_acquired.join()
acquired.join()


@pytest.mark.django_db(transaction=True)
def test_advisory_xact_usage():
"""Test basic error handling scenarios with pglock.advisory(xact=True)."""
lock_id = str(random.random())

with pytest.raises(RuntimeError, match="Must be in a transaction"):
pglock.advisory(lock_id, xact=True).acquire()

with transaction.atomic():
with pytest.raises(RuntimeError, match="cannot run inside a transaction"):
with pglock.advisory(lock_id, xact=True):
pass

with transaction.atomic():
lock = pglock.advisory(lock_id, xact=True)
lock.acquire()

with pytest.raises(RuntimeError, match="cannot be manually released."):
lock.release()


def test_advsiory_id():
assert pglock.advisory_id(9223372036854775807) == (2147483647, 4294967295)
assert pglock.advisory_id("hello") == (245608543, 3125670444)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ packages = [
exclude = [
"*/tests/"
]
version = "1.5.1"
version = "1.6.0"
description = "Postgres locking routines and lock table access."
authors = ["Wes Kendall"]
classifiers = [
Expand Down

0 comments on commit 195e390

Please sign in to comment.