diff --git a/CHANGELOG.md b/CHANGELOG.md
index 294a4c4..afbfca1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,17 @@
# Changelog
+## 1.6.0 (2024-08-24)
+
+#### Features
+
+- Support transaction-level locks by [@wesleykendall](https://github.com/wesleykendall) in [#13](https://github.com/Opus10/django-pglock/pull/13).
+
+ Use `pglock.advisory(xact=True)` for transaction-level advisory locks. Both context manager and functional invocations are supported.
+
+#### Changes
+
+- Django 5.1 support, drop Django 3.2 support by [@wesleykendall](https://github.com/wesleykendall) in [#12](https://github.com/Opus10/django-pglock/pull/12).
+
## 1.5.1 (2024-04-06)
#### Trivial
diff --git a/docs/advisory.md b/docs/advisory.md
index a706c3a..a751dfe 100644
--- a/docs/advisory.md
+++ b/docs/advisory.md
@@ -22,6 +22,10 @@ def my_exclusive_function():
When creating an advisory lock, remember that the lock ID is a global name across the entire database. Be sure to choose meaningful names, ideally with namespaces, when serializing code with [pglock.advisory][].
+!!! warning
+
+ [Session-based locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) are used by default and released when the context manager exits or the database connection is terminated. If connections are pooled (e.g., [pgbouncer](https://www.pgbouncer.org)) and code is killed without raising exceptions (e.g., out-of-memory errors), locks will be held until the connection is terminated. See [transaction-level locks](#transaction) for an alternative.
+
## Configuring Lock Wait Time
By default, [pglock.advisory][] will wait forever until the lock can be acquired. Use the `timeout` argument to change this behavior. For example, `timeout=0` will avoid waiting for the lock:
@@ -51,7 +55,8 @@ The `side_effect` argument adjusts runtime characteristics when using a timeout.
```python
with pglock.advisory(timeout=0, side_effect=pglock.Raise):
- # A django.db.utils.OperationalError will be thrown if the lock cannot be acquired.
+ # A django.db.utils.OperationalError will be thrown if the lock
+ # cannot be acquired.
```
!!! note
@@ -63,9 +68,38 @@ Use `side_effect=pglock.Skip` to skip the function entirely if the lock cannot b
```python
@pglock.advisory(timeout=0, side_effect=pglock.Skip)
def one_function_at_a_time():
- # This function runs once at a time. If this function runs anywhere else, it will be skipped.
+ # This function runs once at a time. If this function runs anywhere
+ # else, it will be skipped.
```
## Shared Locks
-Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information.
\ No newline at end of file
+Advisory locks can be acquired in shared mode using `shared=True`. Shared locks do not conflict with other shared locks. They only conflict with other exclusive locks of the same lock ID. See the [Postgres docs](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE) for more information.
+
+
+
+## Transaction-Level Locks
+
+Use `pglock.advisory(xact=True)` to create a transaction-level advisory lock, which are released at the end of a transaction.
+
+When using the decorator or context manager, a transaction will be opened. A `RuntimeError` will be raised if a transaction is already open.
+
+Use the functional interface to acquire a lock if already in a transaction:
+
+```python
+import pglock
+from django.db import transaction
+
+with transaction.atomic():
+ ...
+ acquired = pglock.advisory("lock_id", xact=True).acquire()
+ ...
+
+# The lock is released at the end of the transaction.
+```
+
+Remember that once acquired, a transaction-level lock cannot be manually released. It will only be released when the transaction is over.
+
+!!! danger
+
+ The functional interface is only intended for transaction-level locks. Use the context manager or decorator for other use cases.
diff --git a/pglock/core.py b/pglock/core.py
index 36bf202..8a2417e 100644
--- a/pglock/core.py
+++ b/pglock/core.py
@@ -187,15 +187,17 @@ class advisory(contextlib.ContextDecorator):
When using the default side effect, returns `True` if the lock was acquired or `False` if not.
+ Consult the
+ `Postgres docs `__
+ for more information on shared and transactional locks.
+
Args:
lock_id (Union[str, int], default=None): The ID of the lock. When
using the decorator, it defaults to the full module path and
function name of the wrapped function. It must be supplied to
- the context manager.
- shared (bool, default=False): When `True`, creates a shared
- advisory lock. Consult the
- `Postgres docs `__
- for more information.
+ the context manager or function calls.
+ shared (bool, default=False): When `True`, creates a shared lock.
+ xact (bool, default=False): When `True`, creates a transactional-level lock.
using (str, default="default"): The database to use.
timeout (Union[int, float, datetime.timedelta, None]): Set a timeout when waiting
for the lock. This timeout only applies to the lock acquisition statement and not the
@@ -223,6 +225,7 @@ def __init__(
lock_id=None,
*,
shared=False,
+ xact=False,
using=DEFAULT_DB_ALIAS,
timeout=_unset,
side_effect=None,
@@ -232,6 +235,7 @@ def __init__(
self.using = using
self.side_effect = side_effect
self.shared = shared
+ self.xact = xact
self.timeout = _cast_timeout(timeout)
# Use pg_try_advisory.. when a timeout of 0 has been applied.
@@ -244,13 +248,8 @@ def __init__(
def int_lock_id(self):
return _cast_lock_id(self.lock_id)
- @property
- def acquire(self):
- return f'pg{"_try" if self.nowait else ""}_advisory_lock{"_shared" if self.shared else ""}'
-
- @property
- def release(self):
- return f'pg_advisory_unlock{"_shared" if self.shared else ""}'
+ def in_transaction(self) -> bool:
+ return connections[self.using].in_atomic_block
def __call__(self, func):
self._func = func
@@ -258,7 +257,7 @@ def __call__(self, func):
@functools.wraps(func)
def inner(*args, **kwargs):
with self._recreate_cm():
- if self.acquired or self.side_effect != Skip:
+ if self._acquired or self.side_effect != Skip:
return func(*args, **kwargs)
return inner
@@ -300,10 +299,18 @@ def _process_runtime_parameters(self):
" Use it as a context manager instead."
)
- def __enter__(self):
+ def acquire(self) -> bool:
self._process_runtime_parameters()
- sql = f"SELECT {self.acquire}({self.int_lock_id})"
+ if self.xact and not self.in_transaction():
+ raise RuntimeError("Must be in a transaction to use xact=True.")
+
+ acquire_sql = (
+ f'pg{"_try" if self.nowait else ""}_advisory'
+ f'{"_xact" if self.xact else ""}_lock'
+ f'{"_shared" if self.shared else ""}'
+ )
+ sql = f"SELECT {acquire_sql}({self.int_lock_id})"
with connections[self.using].cursor() as cursor:
try:
@@ -311,39 +318,65 @@ def __enter__(self):
if self.timeout is not _unset and not self.nowait:
stack.enter_context(lock_timeout(self.timeout, using=self.using))
- if self.side_effect != Raise and connections[self.using].in_atomic_block:
+ if self.side_effect != Raise and self.in_transaction():
# If returning True/False, create a savepoint so that
# the transaction isn't in an errored state when returning.
stack.enter_context(transaction.atomic(using=self.using))
cursor.execute(sql)
- self.acquired = cursor.fetchone()[0] if self.nowait else True
+ acquired = cursor.fetchone()[0] if self.nowait else True
except OperationalError:
# This block only happens when the lock times out
if self.side_effect != Raise:
- self.acquired = False
+ acquired = False
else:
raise
- if not self.acquired and self.side_effect == Raise:
- raise OperationalError(f'Could not acquire lock "{self.lock_id}"')
+ if not acquired and self.side_effect == Raise:
+ raise OperationalError(f'Could not acquire lock "{self.lock_id}"')
+
+ return acquired
+
+ def release(self) -> None:
+ if self.xact:
+ raise RuntimeError("Advisory locks with xact=True cannot be manually released.")
+
+ with connections[self.using].cursor() as cursor:
+ release_sql = f'pg_advisory_unlock{"_shared" if self.shared else ""}'
+ cursor.execute(f"SELECT {release_sql}({self.int_lock_id})")
+
+ def __enter__(self):
+ self._transaction_ctx = contextlib.ExitStack()
+ if self.xact:
+ if self.in_transaction():
+ raise RuntimeError(
+ "Advisory locks with xact=True cannot run inside a transaction."
+ " Use the functional interface, i.e. pglock.advisory(...).acquire()"
+ )
+
+ # Transactional locks always create a durable transaction
+ self._transaction_ctx.enter_context(transaction.atomic(using=self.using, durable=True))
+
+ self._transaction_ctx.__enter__()
+
+ self._acquired = self.acquire()
- self.stack = contextlib.ExitStack()
- if self.acquired and connections[self.using].in_atomic_block:
- # Create a savepoint so that we can successfully release
- # the lock if the transaction errors
- self.stack.enter_context(transaction.atomic(using=self.using))
+ self._savepoint_ctx = contextlib.ExitStack()
+ if self._acquired and not self.xact and self.in_transaction():
+ # Create a savepoint so that we can successfully release
+ # the lock if the transaction errors
+ self._savepoint_ctx.enter_context(transaction.atomic(using=self.using))
- self.stack.__enter__()
+ self._savepoint_ctx.__enter__()
- return self.acquired
+ return self._acquired
def __exit__(self, exc_type, exc_value, traceback):
- self.stack.__exit__(exc_type, exc_value, traceback)
+ self._savepoint_ctx.__exit__(exc_type, exc_value, traceback)
+ self._transaction_ctx.__exit__(exc_type, exc_value, traceback)
- if self.acquired:
- with connections[self.using].cursor() as cursor:
- cursor.execute(f"SELECT {self.release}({self.int_lock_id})")
+ if self._acquired and not self.xact:
+ self.release()
def model(
diff --git a/pglock/tests/test_core.py b/pglock/tests/test_core.py
index 17bb4c5..da1413d 100644
--- a/pglock/tests/test_core.py
+++ b/pglock/tests/test_core.py
@@ -245,7 +245,7 @@ def assert_lock_acquired():
@pytest.mark.django_db(transaction=True)
-def test_advisory_transaction(reraise):
+def test_advisory_inside_transaction(reraise):
"""Test errored transaction behavior for advisory locks"""
barrier = threading.Barrier(2)
rand_val = str(random.random())
@@ -284,6 +284,68 @@ def hold_lock_and_error():
acquired.join()
+@pytest.mark.django_db(transaction=True)
+def test_advisory_xact(reraise):
+ """Test basic transactional advisory lock behavior"""
+ barrier = threading.Barrier(2)
+ rand_val = str(random.random())
+
+ @reraise.wrap
+ def assert_lock_not_acquired():
+ barrier.wait(timeout=5)
+
+ with pglock.advisory(rand_val, xact=True, timeout=0) as acquired:
+ assert not acquired
+
+ barrier.wait(timeout=5)
+ barrier.wait(timeout=5)
+
+ with transaction.atomic():
+ assert not pglock.advisory(rand_val, xact=True, timeout=0).acquire()
+
+ barrier.wait(timeout=5)
+
+ @reraise.wrap
+ def assert_lock_acquired():
+ with pglock.advisory(rand_val, xact=True) as acquired:
+ assert acquired
+ barrier.wait(timeout=5)
+ barrier.wait(timeout=5)
+
+ with transaction.atomic():
+ assert pglock.advisory(rand_val, xact=True).acquire()
+ barrier.wait(timeout=5)
+ barrier.wait(timeout=5)
+
+ not_acquired = threading.Thread(target=assert_lock_not_acquired)
+ acquired = threading.Thread(target=assert_lock_acquired)
+ not_acquired.start()
+ acquired.start()
+ not_acquired.join()
+ acquired.join()
+
+
+@pytest.mark.django_db(transaction=True)
+def test_advisory_xact_usage():
+ """Test basic error handling scenarios with pglock.advisory(xact=True)."""
+ lock_id = str(random.random())
+
+ with pytest.raises(RuntimeError, match="Must be in a transaction"):
+ pglock.advisory(lock_id, xact=True).acquire()
+
+ with transaction.atomic():
+ with pytest.raises(RuntimeError, match="cannot run inside a transaction"):
+ with pglock.advisory(lock_id, xact=True):
+ pass
+
+ with transaction.atomic():
+ lock = pglock.advisory(lock_id, xact=True)
+ lock.acquire()
+
+ with pytest.raises(RuntimeError, match="cannot be manually released."):
+ lock.release()
+
+
def test_advsiory_id():
assert pglock.advisory_id(9223372036854775807) == (2147483647, 4294967295)
assert pglock.advisory_id("hello") == (245608543, 3125670444)
diff --git a/pyproject.toml b/pyproject.toml
index 0ec76f8..7e3da87 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,7 +27,7 @@ packages = [
exclude = [
"*/tests/"
]
-version = "1.5.1"
+version = "1.6.0"
description = "Postgres locking routines and lock table access."
authors = ["Wes Kendall"]
classifiers = [