# Source code for being.pacemaker

"""Pacemaker thread."""
import contextlib
import threading
from typing import Any

from being.backends import CanBackend
from being.configuration import CONFIG
from being.logging import get_logger


# Global interval value read from the 'General' section of the configuration.
# Serves as the base for the default maximum wait duration of the Pacemaker
# (1.2 * INTERVAL).
INTERVAL = CONFIG['General']['INTERVAL']


class Once:
    """Change detector which remembers the most recently seen value."""

    def __init__(self, initial: Any):
        """
        Args:
            initial: Value to compare the first checked value against.
        """
        self.prev = initial

    def changed(self, value: Any) -> bool:
        """Report whether `value` differs from the remembered value.

        The remembered value is only replaced when a change is detected.

        Args:
            value: Value to check.

        Returns:
            True if value is not equal to the remembered one. False otherwise.
        """
        isDifferent = not (value == self.prev)
        if isDifferent:
            self.prev = value

        return isDifferent
class Pacemaker(contextlib.AbstractContextManager):
    """Pacemaker / watchdog / dead man's switch daemon thread.

    Can step in to trigger SYNC messages and PDO transmission if main thread is
    not on time. In order to prevent RPDO timeouts.

    Note:
        Does not start by default (:meth:`Pacemaker.start`). Can be used as
        dummy if unstarted. This also holds when used as a context manager:
        entering does not start the thread and exiting only stops it if it is
        running.
    """

    def __init__(self, network: CanBackend, maxWait: float = 1.2 * INTERVAL):
        """
        Args:
            network: CanBackend network instance to trigger PDO transmits /
                SYNC messages.
            maxWait: Maximum wait duration before stepping in. Some portion
                larger than global :attr:`INTERVAL`.
        """
        self.network = network
        self.maxWait = maxWait
        self.logger = get_logger('Pacemaker')
        self.pulseEvent = threading.Event()
        self.running = False
        self.thread = None
        # Starts out as True ("pulse arrived on time") so that only the first
        # actual takeover / recovery transition gets logged.
        self.once = Once(initial=True)

    def tick(self):
        """Push the dead man's switch."""
        self.pulseEvent.set()

    def _run(self):
        """Watchdog loop executed inside the daemon thread.

        Waits up to ``maxWait`` for a pulse from :meth:`tick`. If no pulse
        arrives in time, the RPDOs and a SYNC message are sent on behalf of the
        main thread. Transitions between the two states are logged once.
        """
        while self.running:
            if self.pulseEvent.wait(timeout=self.maxWait):
                # Pulse arrived in time -> pacemaker does not have to step in
                if self.once.changed(True):
                    self.logger.debug('Off')
            else:
                # Timeout -> step in for the main thread
                self.network.transmit_all_rpdos()
                self.network.send_sync()
                if self.once.changed(False):
                    self.logger.debug('On')

            # Re-arm for the next cycle
            self.pulseEvent.clear()

    def start(self):
        """Start watchdog daemon thread.

        Raises:
            RuntimeError: If the watchdog thread is already running.
        """
        if self.running:
            raise RuntimeError('Watchdog thread already running!')

        self.logger.info('Starting watchdog thread')
        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop watchdog daemon thread.

        Raises:
            RuntimeError: If no watchdog thread is running.
        """
        if not self.running:
            raise RuntimeError('No watchdog thread running!')

        self.logger.info('Stopping watchdog thread')
        self.running = False
        # Wake up the thread from its wait so that it notices the cleared
        # running flag right away instead of after maxWait.
        self.tick()
        self.thread.join()

    def __enter__(self):
        """Enter context. Does intentionally not start the watchdog thread."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave context and stop the watchdog thread if it is running.

        An unstarted pacemaker is left untouched. Calling :meth:`stop`
        unconditionally would raise a RuntimeError here and mask any exception
        coming from the with-block. Exceptions are not suppressed.
        """
        if self.running:
            self.stop()