Coverage for torxtools/locktools.py: 0%
26 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-03 22:01 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-03 22:01 +0000
1import base64
2import functools
3import tempfile
5from filelock import FileLock, Timeout
7__all__ = []
def critical_section(fn=None, *, name=None, timeout=0, callback=None):
    """Run the decorated function under an inter-process file lock.

    Works both bare (``@critical_section``) and with keyword options
    (``@critical_section(name="job", timeout=5)``).

    The lock is a ``filelock.FileLock`` on the file
    ``<tempfile.gettempdir()>/filelock-<key>.lock``. ``<key>`` is the Base64
    form of ``name`` or, when no name is given, of the wrapped function's
    ``"<module>.<qualname>"``.

    NOTE(review): the indentation of the original source was not
    recoverable; this assumes the Base64 step applied to an explicit
    ``name`` as well as to the derived one -- confirm against the upstream
    file.

    Args:
        fn: The function to wrap when the decorator is used bare. Leave
            unset when passing keyword options.
        name: Optional lock name. Functions decorated with the same name
            share one lock file.
        timeout: Forwarded to ``FileLock.acquire(timeout=...)``. Per the
            filelock documentation a negative value blocks indefinitely;
            the default ``0`` presumably means a single attempt -- verify
            against the installed filelock version.
        callback: Optional zero-argument callable invoked when the lock
            cannot be acquired in time; its return value is returned in
            place of the wrapped function's result.

    Returns:
        The wrapped function when used bare, otherwise a decorator.

    Raises:
        PermissionError: Raised by the wrapped function when the lock could
            not be acquired and no (truthy) ``callback`` was supplied.
    """

    def _critical_section(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Build the key that identifies the lock file.
            key = name if name else f"{fn.__module__}.{fn.__qualname__}"
            # The standard Base64 alphabet contains "/", which would turn the
            # lock file name into a path inside a non-existent directory.
            # Only "/" is swapped (for "_"); "+" is kept so that every key
            # that already produced a valid file name maps to the same file.
            key = base64.b64encode(key.encode("UTF-8"), altchars=b"+_").decode("UTF-8")

            filename = tempfile.gettempdir() + "/filelock-" + key + ".lock"

            lock = FileLock(filename)
            # Guard only the acquisition: a Timeout raised by fn itself must
            # propagate unchanged instead of being reported as "locked".
            try:
                lock.acquire(timeout=timeout)
            except Timeout:
                if callback:
                    return callback()
                raise PermissionError("Function is already in use and is locked") from None

            try:
                return fn(*args, **kwargs)
            finally:
                lock.release()

        return wrapper

    # Bare use (@critical_section) receives the function directly.
    if fn:
        return _critical_section(fn)
    # Parameterized use (@critical_section(...)) returns the decorator.
    return _critical_section