"""Async wrapper around :class:`SoftReadWriteLock` for use with ``asyncio``."""
from __future__ import annotations
import asyncio
import functools
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from ._sync import SoftReadWriteLock
if TYPE_CHECKING:
import os
from collections.abc import AsyncGenerator, Callable
from concurrent import futures
from types import TracebackType
[docs]
class AsyncAcquireSoftReadWriteReturnProxy:
"""Async context-aware object that releases an :class:`AsyncSoftReadWriteLock` on exit."""
def __init__(self, lock: AsyncSoftReadWriteLock) -> None:
self.lock = lock
async def __aenter__(self) -> AsyncSoftReadWriteLock:
return self.lock
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
await self.lock.release()
[docs]
class AsyncSoftReadWriteLock:
"""
Async wrapper around :class:`SoftReadWriteLock` for ``asyncio`` applications.
The sync class's blocking filesystem operations run on a thread pool via ``loop.run_in_executor()``.
Reentrancy, upgrade/downgrade rules, fork handling, heartbeat and TTL stale detection, and singleton
behavior are delegated to the underlying :class:`SoftReadWriteLock`.
:param lock_file: path to the lock file; sidecar state/write/readers live next to it
:param timeout: maximum wait time in seconds; ``-1`` means block indefinitely
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately on contention
:param is_singleton: if ``True``, reuse existing :class:`SoftReadWriteLock` instances per resolved path
:param heartbeat_interval: seconds between heartbeat refreshes; default 30 s
:param stale_threshold: seconds of mtime inactivity before a marker is stale; defaults to ``3 * heartbeat_interval``
:param poll_interval: seconds between acquire retries under contention; default 0.25 s
:param loop: event loop for ``run_in_executor``; ``None`` uses the running loop
:param executor: executor for ``run_in_executor``; ``None`` uses the default executor
.. versionadded:: 3.27.0
"""
def __init__( # noqa: PLR0913
self,
lock_file: str | os.PathLike[str],
timeout: float = -1,
*,
blocking: bool = True,
is_singleton: bool = True,
heartbeat_interval: float = 30.0,
stale_threshold: float | None = None,
poll_interval: float = 0.25,
loop: asyncio.AbstractEventLoop | None = None,
executor: futures.Executor | None = None,
) -> None:
self._lock = SoftReadWriteLock(
lock_file,
timeout,
blocking=blocking,
is_singleton=is_singleton,
heartbeat_interval=heartbeat_interval,
stale_threshold=stale_threshold,
poll_interval=poll_interval,
)
self._loop = loop
self._executor = executor
@property
def lock_file(self) -> str:
""":returns: the path to the lock file passed to the constructor."""
return self._lock.lock_file
@property
def timeout(self) -> float:
""":returns: the default timeout applied when ``acquire_read`` / ``acquire_write`` is called without one."""
return self._lock.timeout
@property
def blocking(self) -> bool:
""":returns: whether ``acquire_*`` defaults to blocking; ``False`` makes contention raise immediately."""
return self._lock.blocking
@property
def loop(self) -> asyncio.AbstractEventLoop | None:
""":returns: the event loop used for ``run_in_executor``, or ``None`` for the running loop."""
return self._loop
@property
def executor(self) -> futures.Executor | None:
""":returns: the executor used for ``run_in_executor``, or ``None`` for the default executor."""
return self._executor
[docs]
async def acquire_read(
self, timeout: float | None = None, *, blocking: bool | None = None
) -> AsyncAcquireSoftReadWriteReturnProxy:
"""
Acquire a shared read lock.
See :meth:`SoftReadWriteLock.acquire_read` for the full reentrancy / upgrade / fork semantics. The blocking
work runs inside ``run_in_executor`` so other coroutines on the same loop continue to progress while this
call waits.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
:returns: a proxy usable as an async context manager to release the lock
:raises RuntimeError: if a write lock is already held, if this instance was invalidated by
:func:`os.fork`, or if :meth:`close` was called
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self._run(self._lock.acquire_read, timeout, blocking=blocking)
return AsyncAcquireSoftReadWriteReturnProxy(lock=self)
[docs]
async def acquire_write(
self, timeout: float | None = None, *, blocking: bool | None = None
) -> AsyncAcquireSoftReadWriteReturnProxy:
"""
Acquire an exclusive write lock.
See :meth:`SoftReadWriteLock.acquire_write` for the two-phase writer-preferring semantics. The blocking
work runs inside ``run_in_executor``.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
:returns: a proxy usable as an async context manager to release the lock
:raises RuntimeError: if a read lock is already held, if a write lock is held by a different thread, if
this instance was invalidated by :func:`os.fork`, or if :meth:`close` was called
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self._run(self._lock.acquire_write, timeout, blocking=blocking)
return AsyncAcquireSoftReadWriteReturnProxy(lock=self)
[docs]
async def release(self, *, force: bool = False) -> None:
"""
Release one level of the current lock.
:param force: if ``True``, release the lock completely regardless of the current lock level
:raises RuntimeError: if no lock is currently held and *force* is ``False``
"""
await self._run(self._lock.release, force=force)
[docs]
@asynccontextmanager
async def read_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
"""
Async context manager that acquires and releases a shared read lock.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
:raises RuntimeError: if a write lock is already held on this instance
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self.acquire_read(timeout, blocking=blocking)
try:
yield
finally:
await self.release()
[docs]
@asynccontextmanager
async def write_lock(self, timeout: float | None = None, *, blocking: bool | None = None) -> AsyncGenerator[None]:
"""
Async context manager that acquires and releases an exclusive write lock.
:param timeout: maximum wait time in seconds, or ``None`` to use the instance default
:param blocking: if ``False``, raise :class:`~filelock.Timeout` immediately; ``None`` uses the instance default
:raises RuntimeError: if a read lock is already held, or a write lock is held by a different thread
:raises Timeout: if the lock cannot be acquired within *timeout* seconds
"""
await self.acquire_write(timeout, blocking=blocking)
try:
yield
finally:
await self.release()
[docs]
async def close(self) -> None:
"""Release any held lock and release the underlying filesystem resources. Idempotent."""
await self._run(self._lock.close)
async def _run(self, func: Callable[..., object], *args: object, **kwargs: object) -> object:
loop = self._loop or asyncio.get_running_loop()
return await loop.run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
__all__ = [
"AsyncAcquireSoftReadWriteReturnProxy",
"AsyncSoftReadWriteLock",
]