Source code for being.blocks

"""Collection of miscellaneous blocks."""
import math
import sys
import time
from typing import ForwardRef, Sequence

from being.backends import AudioBackend
from being.block import Block
from being.clock import Clock
from being.configuration import CONFIG
from being.constants import TAU
from being.math import linear_mapping
from being.resources import register_resource
from being.sensors import Sensor


# Look before you leap
INTERVAL = CONFIG['General']['INTERVAL']


[docs]class Sine(Block): """Sine generator. Outputs sine wave for a given frequency.""" def __init__(self, frequency: float = 1., startPhase: float = 0., **kwargs): """ Args: frequency: Initial frequency value. startPhase: Inital phase. """ super().__init__(**kwargs) self.phase = startPhase self.add_value_input('frequency') self.add_value_output() self.frequency.value = frequency
[docs] def update(self): self.output.value = math.sin(self.phase) self.phase += TAU * self.frequency.value * INTERVAL self.phase %= TAU
def __str__(self): return '%s()' % type(self).__name__
Trafo = ForwardRef('Trafo')
[docs]class Trafo(Block): """Transforms input signal (by some fixed scale and offset): .. math:: y = a \cdot x + b. """ def __init__(self, scale: float = 1., offset: float = 0., **kwargs): """ Args: scale: Scaling factor. offset: Offset factor. **kwargs: Arbitrary block keyword arguments. """ super().__init__(**kwargs) self.scale = scale self.offset = offset self.add_value_input() self.add_value_output()
[docs] @classmethod def from_ranges(cls, inRange: Sequence = (0., 1.), outRange: Sequence = (0., 1.)) -> Trafo: """Construct :class:Trafo. instance for a given input and output range. Args: inRange: Lower and upper value of input range. outRange: Lower and upper value of output range. Returns: Trafo instance. Example: >>> # This trafo block will map input values from [1, 2] -> [30, 40] >>> trafo = Trafo.from_ranges([1, 2], [30, 40]) """ scale, offset = linear_mapping(inRange, outRange) return cls(scale, offset)
[docs] def update(self): self.output.value = self.scale * self.input.value + self.offset
def __str__(self): return f'{type(self).__name__}(scale={self.scale:.3f}, offset={self.offset:.3f})'
[docs]class Mic(Block): """Microphone block. Warning: Make me! """ def __init__(self, audioBackend=None, **kwargs): """Intentionally left blank.""" if audioBackend is None: audioBackend = AudioBackend.single_instance_setdefault() register_resource(audioBackend, duplicates=False) self.audioBackend = audioBackend
[docs]class Printer(Block): """Print input values to stdout.""" def __init__(self, prefix: str = '', carriageReturn: bool = False, **kwargs): """ Args: prefix (optional): Prefix string to prepend. carriageReturn (optional). Prepend carriage return character to each output. """ super().__init__(**kwargs) self.prefix = prefix self.carriageReturn = carriageReturn self.add_value_input()
[docs] def update(self): if self.carriageReturn: print('\r\033c', self.prefix, self.input.value, end='') sys.stdout.flush() else: print(self.prefix, self.input.value)
[docs]def sine_pulse(phase: float) -> float: """Cosine pulse from [0., 1.].""" return .5 * (1 - math.cos(phase))
[docs]def ranged_sine_pulse(phase: float, lower: float = 0.0, upper: float = 1.0) -> float: """Shifted and scaled cosine pulse in interval [lower, upper].""" return (upper - lower) * sine_pulse(phase) + lower
[docs]def ranged_sine_pulse_integrated(phase: float, lower: float = 0.0, upper: float = 1.0) -> float: """Integrated shifted and scaled cosine pulse for derivative range in [lower, upper]. """ return .5 * (lower - upper) * math.sin(phase) + .5 * (lower + upper) * phase
[docs]class Pendulum(Block): """Outputs scaled / shifted sine pulses. Can be used to sway back and forth between two extremes. """ def __init__(self, frequency: float = 1., lower: float = 0., upper: float = 1., **kwargs): """ Args: frequency (optional): Frequency of output signal. lower (optional): Lower output value. upper (optional): Upper output value. **kwargs: Arbitrary block keyword arguments. """ super().__init__(**kwargs) self.add_value_output() self.frequency = frequency self.lower = lower self.upper = upper self.clock = Clock.single_instance_setdefault()
[docs] def update(self): phase = TAU * self.frequency * self.clock.now() self.output.value = ranged_sine_pulse(phase, self.lower, self.upper)