File size: 1,419 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# mypy: allow-untyped-defs
import os
import time
class FileBaton:
"""A primitive, file-based synchronization utility."""
def __init__(self, lock_file_path, wait_seconds=0.1):
"""
Create a new :class:`FileBaton`.
Args:
lock_file_path: The path to the file used for locking.
wait_seconds: The seconds to periodically sleep (spin) when
calling ``wait()``.
"""
self.lock_file_path = lock_file_path
self.wait_seconds = wait_seconds
self.fd = None
def try_acquire(self):
"""
Try to atomically create a file under exclusive access.
Returns:
True if the file could be created, else False.
"""
try:
self.fd = os.open(self.lock_file_path, os.O_CREAT | os.O_EXCL)
return True
except FileExistsError:
return False
def wait(self):
"""
Periodically sleeps for a certain amount until the baton is released.
The amount of time slept depends on the ``wait_seconds`` parameter
passed to the constructor.
"""
while os.path.exists(self.lock_file_path):
time.sleep(self.wait_seconds)
def release(self):
"""Release the baton and removes its file."""
if self.fd is not None:
os.close(self.fd)
os.remove(self.lock_file_path)
|