Vraag Hoe een lock met een time-out in Python 2.7 te implementeren


Is er een manier om een ​​lock in Python te implementeren voor multithreading-doeleinden waarvan acquire methode kan een willekeurige time-out hebben? De enige werkende oplossingen die ik tot nu toe heb gevonden gebruiken polling, dat

  • Ik vind onelegant en inefficiënt
  • Bewaart de begrensde wacht / voortgangsgarantie van het slot niet als een oplossing voor het kritieke deelprobleem

Is er een betere manier om dit te implementeren?


20
2017-12-05 22:12


oorsprong


antwoorden:


om nader in te gaan op de opmerkingen van Steven:

import threading
import time

lock = threading.Lock()
cond = threading.Condition(threading.Lock())

def waitLock(timeout):
    with cond:
        current_time = start_time = time.time()
        while current_time < start_time + timeout:
            if lock.acquire(False):
                return True
            else:
                cond.wait(timeout - current_time + start_time)
                current_time = time.time()
    return False

Dingen om op te merken:

  • er zijn er twee threading.Lock() objecten, een is intern in de threading.Condition().
  • bij het manipuleren cond, het is vergrendeld; de wait() de bediening ontgrendelt het echter, zodat een willekeurig aantal threads het kan bekijken.
  • het wachten is ingebed in een for-lus die de tijd bijhoudt. threading.Condition kan worden aangemeld om andere redenen dan time-outs, dus u moet nog steeds de tijd bijhouden als u echt wilt dat deze verloopt.
  • zelfs met de conditie, 'polling' je nog steeds het echte slot, omdat het mogelijk is om meer dan één draad te wekken en voor het slot te racen. als de lock.acquire mislukt, keert de lus terug naar wachten.
  • bellers hiervan waitLock functie moet volgen a lock.release() met een cond.notify() zodat andere draden die erop wachten, op de hoogte worden gebracht dat ze opnieuw moeten proberen de vergrendeling aan te schaffen. Dit wordt niet in het voorbeeld getoond.

18
2017-12-05 22:50



Mijn versie met behulp van thread veilige wachtrijen http://docs.python.org/2/library/queue.html en hun put / get-methoden die time-out ondersteunen.

Tot nu toe werkt het prima, maar als iemand er een peer review op kan doen, zal ik je dankbaar zijn.

"""
Thread-safe lock mechanism with timeout support module.
"""

from threading import ThreadError, current_thread
from Queue import Queue, Full, Empty


class TimeoutLock(object):
    """
    Thread-safe lock mechanism with timeout support.
    """

    def __init__(self, mutex=True):
        """
        Constructor.
        Mutex parameter specifies if the lock should behave like a Mutex, and
        thus use the concept of thread ownership.
        """
        self._queue = Queue(maxsize=1)
        self._owner = None
        self._mutex = mutex

    def acquire(self, timeout=0):
        """
        Acquire the lock.
        Returns True if the lock was succesfully acquired, False otherwise.

        Timeout:
        - < 0 : Wait forever.
        -   0 : No wait.
        - > 0 : Wait x seconds.
        """
        th = current_thread()
        try:
            self._queue.put(
                th, block=(timeout != 0),
                timeout=(None if timeout < 0 else timeout)
            )
        except Full:
            return False

        self._owner = th
        return True

    def release(self):
        """
        Release the lock.
        If the lock is configured as a Mutex, only the owner thread can release
        the lock. If another thread attempts to release the lock a
        ThreadException is raised.
        """
        th = current_thread()
        if self._mutex and th != self._owner:
            raise ThreadError('This lock isn\'t owned by this thread.')

        self._owner = None
        try:
            self._queue.get(False)
            return True
        except Empty:
            raise ThreadError('This lock was released already.')

4
2018-01-04 00:26



Ik twijfel of dit mogelijk is.

Als je dit wilt uitvoeren zonder enige vorm van polling, dan moet het OS weten dat de thread is geblokkeerd en moet het besturingssysteem op de hoogte zijn van de time-out om de discussie na enige tijd te deblokkeren. Daarvoor moet in het OS al ondersteuning bestaan; je kunt dit niet op Python-niveau implementeren.

(U kunt de thread op OS- of app-niveau laten blokkeren en een mechanisme hebben waardoor het op het juiste moment door een andere thread kan worden gewekt, maar dan hebt u die andere thread nodig om effectief te kunnen pollen)

Over het algemeen hebt u echter geen echt begrensde wacht- / voortgangsgarantie voor het slot, omdat uw thread een onbeperkte tijd moet wachten voordat een contextswitch plaatsvindt om te merken dat deze is geblokkeerd. Dus tenzij je een bovengrens kunt stellen aan de hoeveelheid CPU-conflicten, kun je de time-out niet gebruiken om harde realtime deadlines te halen. Maar waarschijnlijk heb je dat niet nodig, anders zou je er niet van dromen sloten te gebruiken die in Python zijn geïmplementeerd.


Vanwege de Python GIL (Global Interpreter Lock) zijn die polling-gebaseerde oplossingen waarschijnlijk niet zo inefficiënt of zo grenzeloos als je denkt (afhankelijk van hoe ze zijn geïmplementeerd) (en ervan uitgaande dat je CPython of PyPy gebruikt) .

Er is slechts één thread tegelijk, en per definitie is er nog een thread die u wilt uitvoeren (degene die het slot bevat waarop u wacht). De GIL wordt een tijdje vastgehouden door een thread om een ​​aantal bytecodes uit te voeren, waarna deze wordt verwijderd en opnieuw wordt opgehaald om iemand anders een kans te geven. Dus als de thread met de time-out geblokkeerd is en zich overgeeft aan andere threads, wordt deze slechts af en toe wakker wanneer deze de GIL ontvangt en laat deze dan bijna onmiddellijk terugvallen naar iemand anders en blokkeert de GIL weer. Omdat deze thread alleen maar wakker kan worden als hij toch de beurt krijgt aan de GIL, zal hij deze controle ook doen zodra de time-out verloopt, omdat hij de uitvoering zou kunnen hervatten, zelfs als de time-out magisch perfect was.

De enige keer dat dit veel inefficiëntie veroorzaakt is als uw thread geblokkeerd is, wachtend op de draad die het slot vasthoudt, die geblokkeerd is in afwachting van iets dat niet veroorzaakt kan worden door een ander Python-thread (bijvoorbeeld geblokkeerd op IO), en daar zijn geen andere uitvoerbare Python-threads. Dan zal je polling-time-out echt gewoon blijven zitten en de tijd herhaaldelijk controleren, wat erg kan zijn als je verwacht dat deze situatie voor lange tijd zal plaatsvinden.


1
2017-12-05 22:43



ik nam SingleNegationElimination's antwoord en creëerde een klasse met kan worden gebruikt in een with-statement op de volgende manier:

global_lock = timeout_lock()
...

with timeout_lock(owner='task_name', lock=global_lock):
    do()
    some.stuff()

Op deze manier alleen waarschuwen als de time-out is verlopen (standaard = 1s) en de eigenaar van het slot voor onderzoek toont.

Gebruik het op deze manier en er zal een uitzondering worden gegenereerd na de time-out:

with timeout_lock(owner='task_name', lock=global_lock, raise_on_timeout=True):
    do()
    some.stuff()

De timeout_lock.lock() instance moet één keer worden gemaakt en kan worden gebruikt in verschillende threads.

Hier is de klas - het werkt voor mij, maar voel je vrij om te reageren en te verbeteren:

class timeout_lock:
    ''' taken from https://stackoverflow.com/a/8393033/1668622
    '''
    class lock:
        def __init__(self):
            self.owner = None
            self.lock = threading.Lock()
            self.cond = threading.Condition()

        def _release(self):
            self.owner = None
            self.lock.release()
            with self.cond:
                self.cond.notify()

    def __init__(self, owner, lock, timeout=1, raise_on_timeout=False):
        self._owner = owner
        self._lock = lock
        self._timeout = timeout
        self._raise_on_timeout = raise_on_timeout

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, tb):
        ''' will only be called if __enter__ did not raise '''
        self.release()

    def acquire(self):
        if self._raise_on_timeout:
            if not self._waitLock():
                raise RuntimeError('"%s" could not aquire lock within %d sec'
                                   % (self._owner, self._timeout))
        else:
            while True:
                if self._waitLock():
                    break
                print('"%s" is waiting for "%s" and is getting bored...'
                      % (self._owner, self._lock.owner))
        self._lock.owner = self._owner

    def release(self):
        self._lock._release()

    def _waitLock(self):
        with self._lock.cond:
            _current_t = _start_t = time.time()
            while _current_t < _start_t + self._timeout:
                if self._lock.lock.acquire(False):
                    return True
                else:
                    self._lock.cond.wait(self._timeout - _current_t + _start_t)
                    _current_t = time.time()
        return False

Om er zeker van te zijn dat de threads echt niet interfereren en niet wachten om zo snel mogelijk op de hoogte te worden gebracht, heb ik een kleine multithreading-test geschreven die de tijd zal verdelen die nodig is om alle threads uit te voeren:

def test_lock_guard():
    import random

    def locking_thread_fn(name, lock, duration, timeout):
        with timeout_lock(name, lock, timeout=timeout):
            print('%x: "%s" begins to work..' % (threading.get_ident(), name))
            time.sleep(duration)
            print('%x: "%s" finished' % (threading.get_ident(), name))

    _lock = timeout_lock.lock()

    _threads = []
    _total_d = 0
    for i in range(3):
        _d = random.random() * 3
        _to = random.random() * 2
        _threads.append(threading.Thread(
            target=locking_thread_fn, args=('thread%d' % i, _lock, _d, _to)))
        _total_d += _d

    _t = time.time()

    for t in _threads: t.start()
    for t in _threads: t.join()

    _t = time.time() - _t

    print('duration: %.2f sec / expected: %.2f (%.1f%%)'
          % (_t, _total_d, 100 / _total_d * _t))

Output is:

7f940fc2d700: "thread0" begins to work..
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
"thread2" is waiting for "thread0" and is getting bored...
7f940fc2d700: "thread0" finished
7f940f42c700: "thread1" begins to work..
"thread2" is waiting for "thread1" and is getting bored...
"thread2" is waiting for "thread1" and is getting bored...
7f940f42c700: "thread1" finished
"thread2" is waiting for "None" and is getting bored...
7f940ec2b700: "thread2" begins to work..
7f940ec2b700: "thread2" finished
duration: 5.20 sec / expected: 5.20 (100.1%)

0
2017-08-14 12:21