Source code for tradingmate.utils.task_thread

# Source: http://code.activestate.com/recipes/65222-run-a-task-every-few-seconds/

import threading
import time


class TaskThread(threading.Thread):
    """Daemon thread that executes :meth:`task` every N seconds.

    Subclasses override :meth:`task`. The loop in :meth:`run` is driven by
    three events:

    * ``_timeout``  - set to cut the wait between two executions short.
    * ``_finished`` - set by :meth:`shutdown` to make :meth:`run` return.
    * ``_enabled``  - while cleared, :meth:`run` blocks before calling
      :meth:`task`.
    """

    _timeout: threading.Event
    _finished: threading.Event
    _enabled: threading.Event
    _interval: float = 1
    _singleRun: bool = False
    _start_delay: float = 0

    def __init__(self) -> None:
        # ``daemon=True`` replaces ``setDaemon(True)``, which is deprecated
        # since Python 3.10.
        super().__init__(daemon=True)
        self._timeout = threading.Event()
        self._finished = threading.Event()
        self._enabled = threading.Event()
        self._enabled.set()  # threads start enabled
        self._interval = 1  # default value
        self._singleRun = False
        self._start_delay = 0

    def setInterval(self, interval: float) -> None:
        """Set the number of seconds we sleep between executing our task."""
        self._interval = interval

    def shutdown(self) -> None:
        """Stop this thread.

        All three events are set so that :meth:`run` wakes up whether it is
        waiting for the interval to elapse or waiting to be enabled.
        """
        self._timeout.set()
        self._enabled.set()
        self._finished.set()

    def enable(self, enabled: bool) -> None:
        """Enable (``True``) or disable (``False``) execution of the task."""
        if enabled:
            self._enabled.set()
        else:
            self._enabled.clear()

    def is_enabled(self) -> bool:
        """Return ``True`` if the thread is currently enabled."""
        return self._enabled.is_set()

    def cancel_timeout(self) -> None:
        """Cancel the timeout and run the task."""
        self._timeout.set()

    def force_single_run(self) -> None:
        """Request one execution of the task, then disable the thread again.

        The thread is enabled and the pending timeout is cancelled;
        :meth:`run` disables the thread once that execution has completed.
        """
        self._singleRun = True
        self.enable(True)
        self.cancel_timeout()

    def run(self) -> None:
        """Run the task loop until :meth:`shutdown` is called.

        Exceptions raised by :meth:`task` are not caught here.
        """
        # Wait on the shutdown event instead of sleeping so that shutdown()
        # can interrupt the start delay.
        self._finished.wait(self._start_delay)
        while True:
            # reset timeout
            self._timeout.clear()
            # check shutdown flag
            if self._finished.is_set():
                return
            # sleep until enabled or return immediately
            self._enabled.wait()
            # shutdown() sets ``_enabled`` to wake a disabled thread; check
            # the flag again so the task is not run after shutdown.
            if self._finished.is_set():
                return
            # perform task
            self.task()
            # Check if it was a single run
            if self._singleRun:
                self.enable(False)
                self._singleRun = False
            # sleep for interval or until shutdown
            self._timeout.wait(self._interval)

    def task(self) -> None:
        """The task done by this thread - override in subclasses.

        Raises:
            NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError(
            "TaskThread: task function not overridden by children class!"
        )

    def start_delayed(self, delay_s: float) -> None:
        """Start the thread, waiting ``delay_s`` seconds before the first run."""
        self._start_delay = delay_s
        self.start()