""" Backend resources are wrapped as context managers and can be used with
Example:
>>> with SomeResource() as resource:
... # Do something with resource
... pass
.. Context Manager:
https://docs.python.org/3/glossary.html#term-context-manager
Handling dynamic context managers can be done with
:class:`contextlib.contextlib.ExitStack`. This functionality accessed in the
:mod:`being.resources`.
"""
import contextlib
import sys
import time
import warnings
from typing import List, Generator, Set, Union, Optional
from logging import Logger
try:
import pyaudio
except ImportError:
pyaudio = None
warnings.warn('PyAudio is not installed!')
import numpy as np
import can
import canopen
from canopen.pdo.base import Map
from being.can.cia_402 import CiA402Node
from being.can.nmt import PRE_OPERATIONAL, OPERATIONAL, RESET_COMMUNICATION
from being.configuration import CONFIG
from being.logging import get_logger
from being.math import linear_mapping
from being.rpi_gpio import GPIO
from being.utils import SingleInstanceCache, filter_by_type
# Look before you leap
_DEFAULT_CAN_BITRATE = CONFIG['Can']['DEFAULT_CAN_BITRATE']
_INTERVAL = CONFIG['General']['INTERVAL']
# Default system dependent CAN bus parameters
if sys.platform.startswith('darwin'):
_DEFAULT_BUS_TYPE = 'pcan'
_DEFAULT_CHANNEL = 'PCAN_USBBUS1'
else:
_DEFAULT_BUS_TYPE = 'socketcan'
_DEFAULT_CHANNEL = 'can0'
_CAN_SYNC_MSG: can.Message = can.Message(
is_extended_id=False,
arbitration_id=0x80,
data=[],
is_remote_frame=False,
)
"""Ready to send CAN SYNC message."""
[docs]class CanBackend(canopen.Network, SingleInstanceCache, contextlib.AbstractContextManager):
"""CANopen network wrapper.
Holds the actual CAN bus. Connects to the CAN interface during `__enter__`,
shutdown drives and disconnects from interface during `__exit__` phase.
Controls the global NMT state and PDO. RPDO maps from the nodes have to
register them self with the network. Transmitting all RPDOs can than be done
with :meth:`CanBackend.transmit_all_rpdos`.
"""
def __init__(self,
bitrate: int =_DEFAULT_CAN_BITRATE,
bustype: str =_DEFAULT_BUS_TYPE,
channel: str =_DEFAULT_CHANNEL,
):
"""
Args:
bitrate (optional): Bitrate of CAN bus.
bustype (optional): CAN bus type. Default value is system dependent.
channel (optional): CAN bus channel. Default value is system dependent.
"""
super().__init__(bus=None)
self.bitrate: str = bitrate
"""CAN bus bit rate."""
self.bustype: str = bustype
"""CAN bus type."""
self.channel: str = channel
"""CAN bus channel."""
self.logger: Logger = get_logger('CanBackend')
"""Dedicated logger."""
self.rpdos: Set[Map] = set()
"""All registered RPDO maps."""
@property
def drives(self) -> Generator[CiA402Node, None, None]:
"""Iterate over all known drive nodes.
Returns:
Drives generator.
"""
return filter_by_type(self.values(), CiA402Node)
[docs] def turn_off_motors(self):
"""Turn off all registered drives."""
for node in self.drives:
try:
node.disable()
except TimeoutError as err:
self.logger.exception(err)
[docs] def enable_pdo_communication(self):
"""Enable PDO communication by setting NMT state to OPERATIONAL."""
self.logger.debug('Global NMT -> OPERATIONAL')
self.nmt.state = OPERATIONAL
[docs] def disable_pdo_communication(self):
"""Disable PDO communication by setting NMT state to PRE-OPERATIONAL."""
self.logger.debug('Global NMT -> PRE-OPERATIONAL')
self.nmt.state = PRE_OPERATIONAL
[docs] def reset_communication(self):
"""Reset NMT communication."""
self.nmt.state = RESET_COMMUNICATION
[docs] def send_sync(self):
"""Send SYNC message over CAN network."""
# self.send_message(0x80, []) # send_message() has a lock inside
self.bus.send(_CAN_SYNC_MSG)
[docs] def register_rpdo(self, rx: Map):
"""Register RPDO map to be transmitted repeatedly by sender thread."""
self.rpdos.add(rx)
[docs] def transmit_all_rpdos(self):
"""Transmit all current values of all registered RPDO maps."""
for rx in self.rpdos:
#self.pdo_node.network.send_message(rx.cob_id, rx.data) # Lock inside
msg = can.Message(
is_extended_id=rx.cob_id > 0x7FF,
arbitration_id=rx.cob_id,
data=rx.data,
is_remote_frame=False,
)
self.bus.send(msg)
[docs] def scan_for_node_ids(self) -> List[int]:
"""Scan for node ids which are online.
Returns:
Ascending list of all detected CAN ids.
"""
self.scanner.search()
time.sleep(0.1)
return sorted(self.scanner.nodes)
def __enter__(self):
self.connect(bitrate=self.bitrate, bustype=self.bustype, channel=self.channel)
self.check()
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
self.disable_pdo_communication()
self.turn_off_motors()
finally:
self.disconnect()
[docs]class SpectralFlux:
"""Spectral flux filter."""
def __init__(self, bufferSize):
self.bufferSize = bufferSize
freqs = np.fft.rfftfreq(bufferSize)
self.prevMag = np.ones(len(freqs))
def __call__(self, samples):
transformed = np.fft.rfft(samples)
mag = np.abs(transformed)
delta = mag - self.prevMag
self.prevMag = mag
# Half-wave rectification
rectified = (delta + np.abs(delta)) / 2
# Normalized spectral flux
return np.sum(rectified) / self.bufferSize**1.5
[docs]class AudioBackend(SingleInstanceCache, contextlib.AbstractContextManager):
"""Sound card connection. Collect audio samples with PortAudio / PyAudio."""
def __init__(self,
input_device_index: Optional[int] = None,
frames_per_buffer: int = 1024,
dtype: Union[str, type, np.dtype] = np.uint8,
):
"""
Args:
input_device_index: Input device index for given host api.
Unspecified (or None) uses default input device.
frames_per_buffer: Audio buffer size.
dtype: Data type for samples. Not all data types are supported for
audio. uint8, int16, int32 and float32 should works.
"""
self.dtype = np.dtype(dtype)
# Prepare sample normalization
if np.issubdtype(self.dtype, np.integer):
iinfo = np.iinfo(self.dtype)
xRange = (iinfo.min, iinfo.max)
else:
xRange = (-1.0, 1.0) # Float samples are already normalized
self.scale, self.offset = linear_mapping(xRange, yRange=(-1.0, 1.0))
# Input device
self.pa: pyaudio.PyAudio = pyaudio.PyAudio()
if input_device_index is None:
device = self.pa.get_default_input_device_info()
else:
device = self.pa.get_device_info_by_index(input_device_index)
self.deviceName = device['name']
self.stream = self.pa.open(
rate=int(device['defaultSampleRate']),
channels=1,
format=pyaudio_format(self.dtype),
input=True,
output=False,
input_device_index=input_device_index,
frames_per_buffer=frames_per_buffer,
stream_callback=self.stream_callback,
)
self.flux = SpectralFlux(frames_per_buffer)
self.microphones = []
[docs] def stream_callback(self, in_data, frame_count, time_info, status):
"""pyaudio audio stream callback function."""
samples = np.frombuffer(in_data, dtype=self.dtype)
normalized = self.scale * samples + self.offset
sf = self.flux(normalized)
for mic in self.microphones:
mic.new_spectral_flux_value(sf)
return (None, pyaudio.paContinue)
[docs] def subscribe_microphone(self, mic):
"""Subscribe Mic block to audio backend."""
self.microphones.append(mic)
def __exit__(self, exc_type, exc_value, traceback):
self.stream.stop_stream()
self.stream.close()
self.pa.terminate()
[docs]class Rpi(SingleInstanceCache, contextlib.AbstractContextManager):
"""Raspberry Pi GPIO backend. This only works running on a Raspberry Pi. On
non Raspberry Pi system a dummy backend will be loaded (see
:mod:`being.rpi_gpio`).
"""
def __init__(self):
self.logger = get_logger(str(self))
self.gpio = GPIO
def __enter__(self):
GPIO.setmode(GPIO.BCM)
def __exit__(self, exc_type, exc_value, traceback):
GPIO.cleanup()