Coverage for torxtools/locktools.py: 0%

26 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-03 22:01 +0000

1import base64 

2import functools 

3import tempfile 

4 

5from filelock import FileLock, Timeout 

6 

7__all__ = [] 

8 

9 

def critical_section(fn=None, *, name=None, timeout=0, callback=None):
    """Decorate a function so it runs while holding a file lock.

    The lock file is created in the directory returned by
    ``tempfile.gettempdir()``. The decorator can be applied bare
    (``@critical_section``) or with keyword options
    (``@critical_section(name=..., timeout=..., callback=...)``).

    Args:
        fn: The function to wrap when the decorator is applied bare;
            ``None`` when the decorator is called with options.
        name: Optional lock name. When falsy, the lock is keyed on the
            wrapped function's ``__module__`` and ``__qualname__``.
        timeout: Forwarded as ``timeout`` to ``FileLock.acquire``.
        callback: Optional zero-argument callable. When the lock cannot be
            acquired and ``callback`` is truthy, its return value is
            returned instead of raising.

    Returns:
        The wrapped function when ``fn`` is given, otherwise a decorator.

    Raises:
        PermissionError: If acquiring the lock times out and no callback
            was supplied.
    """

    def _critical_section(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Derive a lock key and turn it into a file name.
            lock_key = name if name else f"{fn.__module__}.{fn.__qualname__}"
            # NOTE(review): the reviewed listing had lost its indentation;
            # this assumes the base64 step applies to explicit names as well
            # as derived ones -- confirm against the original file.
            lock_key = base64.b64encode(lock_key.encode("UTF-8")).decode("UTF-8")

            filename = tempfile.gettempdir() + "/filelock-" + lock_key + ".lock"

            lock = FileLock(filename)
            try:
                held = lock.acquire(timeout=timeout)
            except Timeout:
                if callback:
                    return callback()

                raise PermissionError("Function is already in use and is locked") from None

            # Run fn outside the try block: a Timeout raised by fn itself must
            # propagate unchanged rather than be mistaken for contention on
            # this lock. Leaving the with-block releases the lock.
            with held:
                return fn(*args, **kwargs)

        return wrapper

    if fn:
        return _critical_section(fn)

    return _critical_section