Source code for being.motion_player

"""Curve / motion player block. Outputs motion samples to the motors.

.. digraph:: motionplayer
    :align: center
    :alt: Motion Player steering multiple motors
    :caption: Motion Player steering multiple motors
    :name: Motion Player steering multiple motors

    rankdir="LR"
    dummy [label="", shape=none, height=0, width=0]
    MP [shape=box, label="Motion Player"];
    A [shape=box, label="Motor 1"];
    B [shape=box, label="Motor 2"];
    C [shape=box, label="Motor 3"];

    dummy -> MP [label="Motion Command"]
    MP -> A [style=dashed, label="Target Position"]
    MP -> B [style=dashed]
    MP -> C [style=dashed]

Todo:
  - Changing playback speed on the fly. As separate input? Phasor?
  - Motion crossover
"""
import json
from typing import NamedTuple, Optional, List

import numpy as np
from scipy.interpolate import BPoly

from being.block import Block, output_neighbors
from being.clock import Clock
from being.connectables import ValueOutput
from being.content import Content
from being.curve import Curve
from being.logging import get_logger
from being.motors.blocks import MotorBlock
from being.utils import filter_by_type


[docs]def constant_spline(position: float = 0.0, duration: float = 1.0) -> BPoly: """Create a constant position spline which extrapolates indefinitely. Args: position: Target position value. duration: Duration of the constant motion. Returns: Constant spline. """ return BPoly(c=[[position]], x=[0., duration], extrapolate=True)
[docs]def constant_curve(positions: float = 0.0, duration: float = 1.0) -> Curve: """Create a constant curve with a single channel. Keeping a constant value indefinitely. Args: position: Target position value. duration: Duration of the constant motion. Returns: Constant curve. """ return Curve([ constant_spline(pos, duration) for pos in np.atleast_1d(positions) ])
[docs]class MotionCommand(NamedTuple): """Message to trigger motion curve playback.""" name: str """Name of the motion.""" loop: bool = False """Looping the motion indefinitely or not. """
[docs]class MotionPlayer(Block): """Motion curve sampler block. Receives motion commands on its message input, looks up motion curve from :class:`being.content.Content` and samples motion curve to position outputs. Supports looping playback. Note: :attr:`MotionPlayer.positionOutputs` attributes for the position outputs only. In order to distinguish them from other outputs in the future. """ def __init__(self, ndim: int = 1, clock: Optional[Clock] = None, content: Optional[Content] = None, **kwargs, ): """ Args: ndim (optional): Number of dimensions / motors / initial number of position outputs. Default is one. clock (optional): Clock instance (DI). content (optional): Content instance (DI). **kwargs: Arbitrary block keyword arguments. """ if clock is None: clock = Clock.single_instance_setdefault() if content is None: content = Content.single_instance_setdefault() super().__init__(**kwargs) self.add_message_input('mcIn') self.positionOutputs: List[ValueOutput] = [] """Position value outputs.""" for _ in range(ndim): self.add_position_output() self.clock = clock self.content = content self.logger = get_logger(str(self)) self.curve: Optional[Curve] = None """Currently playing motion curve.""" self.startTime: float = 0.0 """Start time of current motion curve.""" self.looping: bool = False """If current motion curve is looping.""" @property def playing(self) -> bool: """Playback in progress.""" return self.curve is not None @property def ndim(self) -> int: """Number of position outputs.""" return len(self.positionOutputs)
[docs] def add_position_output(self): """Add an additional position out to the motion player.""" newOutput = self.add_value_output() self.positionOutputs.append(newOutput)
[docs] def stop(self): """Stop spline playback.""" self.curve = None self.startTime = 0.0 self.looping = False
[docs] def play_curve(self, curve: Curve, loop: bool = False, offset: float = 0.0) -> float: """Play a curve directly. Args: curve: Curve to play. loop: Loop playback. offset: Start time offset. Returns: Scheduled start time. """ self.logger.info('Playing curve') self.curve = curve self.startTime = self.clock.now() - offset self.looping = loop return self.startTime
[docs] def process_mc(self, mc: MotionCommand) -> float: """Process new motion command and schedule next curve to play. Args: mc: Motion command. Returns: Scheduled start time. """ try: self.logger.info('Playing motion %r', mc.name) curve = self.content.load_curve(mc.name) except FileNotFoundError: self.logger.error('Motion %r does not exist!', mc.name) currentVals = [out.value for out in self.positionOutputs] curve = constant_curve(currentVals, duration=5.) except json.JSONDecodeError: self.logger.error('Could not decode %r!', mc.name) currentVals = [out.value for out in self.positionOutputs] curve = constant_curve(currentVals, duration=5.) if curve.n_channels != self.ndim: msg = ( f'Motion {mc.name} is not compatible with connected motors' f'({curve.n_channels} != {self.ndim})!' ) self.logger.error(msg) return self.play_curve(curve, mc.loop)
[docs] def update(self): for mc in self.input.receive(): self.process_mc(mc) if self.playing: now = self.clock.now() t = now - self.startTime if not self.looping and t >= self.curve.end: return self.stop() samples = self.curve.sample(t, loop=self.looping) for val, out in zip(samples, self.positionOutputs): out.value = val
[docs] def neighboring_motors(self): """Iterate over neighboring blocks at the position outputs. Yields: Motor blocks. """ for out in self.positionOutputs: input_ = next(iter(out.connectedInputs)) if input_.owner: yield input_.owner
[docs] def to_dict(self): dct = super().to_dict() dct['ndim'] = self.ndim neighbors = output_neighbors(self) dct['motors'] = list(filter_by_type(neighbors, MotorBlock)) return dct
def __str__(self): return '%s()' % type(self).__name__