Source code for being.behavior
"""Simple behavior three state behavior engine.
A behavior processes sensor inputs and outputs motion commands to motion players
to trigger motion playback.
"""
import enum
import itertools
import os
import random
import warnings
from typing import Optional, ForwardRef
from being.block import Block, output_neighbors
from being.clock import Clock
from being.constants import INF
from being.content import Content
from being.logging import get_logger
from being.motion_player import MotionPlayer, MotionCommand
from being.pubsub import PubSub
from being.serialization import register_enum, loads, dumps
from being.utils import read_file, write_file, filter_by_type
[docs]class State(enum.Enum):
"""Behavior states."""
STATE_I = 0
STATE_II = 1
STATE_III = 2
register_enum(State)
# For comforts / de-clutter
STATE_I = State.STATE_I
STATE_II = State.STATE_II
STATE_III = State.STATE_III
BEHAVIOR_CHANGED: str = 'BEHAVIOR_CHANGED'
"""PubSub event string literal. Triggered when something inside behavior has
changed (behavior state change but also if another motion gets played).
"""
Behavior = ForwardRef('Behavior')
[docs]def create_params(attentionSpan: float = 10., motions: Optional[list] = None) -> dict:
"""Create default behavior params dictionary.
Args:
attentionSpan (optional): Initial attention span duration (default is 10
seconds).
motions (optional): Initial motion lists. List of motion names for each
behavior state. Lookup happens via index...
Returns:
Behavior params dictionary.
"""
if motions is None:
motions = [[] for _ in State]
return {
'attentionSpan': attentionSpan,
'motions': motions,
}
[docs]class Behavior(Block, PubSub):
"""Simple 3x state finite state machine behavior engine. Originally build
for the ÉCAL workshop in March 2021.
There are three states (:class:`being.behavior.State`):
1) STATE_I
2) STATE_II
3) STATE_III
Each state has its own repertoire of motions. The sensor input
(:class:`being.connectables.MessageInput`) triggers a state transition to
STATE_III and fires one single animation playback for STATE_III. When this
motion finishes the behavior transitions to STATE_II where it will stay for
at least ``params.attentionSpan`` many seconds and play motions for this
state. Afterwards it transitions back to STATE_I.
If provided with a filepath the current behavior params get stored / loaded
from a JSON file (inside current working directory).
Notes:
- By setting the ``attentionSpan`` to zero STATE_II can be skipped.
- Animations are chosen randomly from supplied animation from params.
- A new sensor trigger will always interrupt STATE_I / STATE_II and jump
immediately to STATE_III
"""
FREE_NUMBERS = itertools.count(1)
"""Behavior number counter for default behavior names."""
def __init__(self,
params: Optional[dict] = None,
clock: Optional[Clock] = None,
content: Optional[Content] = None,
name: Optional[str] = None,
):
"""
Args:
params (optional): Behavior params dictionary
clock (optional): Being clock (DI).
content (optional): Being content (DI).
name (optional): Block name.
"""
if params is None:
params = create_params()
if clock is None:
clock = Clock.single_instance_setdefault()
if content is None:
content = Content.single_instance_setdefault()
if name is None:
name = 'Behavior %d' % next(self.FREE_NUMBERS)
super().__init__(name=name)
PubSub.__init__(self, events=[BEHAVIOR_CHANGED])
self.add_message_input('sensorIn')
self.add_message_output('mcOut')
self._params = params
self.clock = clock
self.content = content
self.active: bool = True
"""If behavior engine is running."""
self.state: State = State.STATE_I
"""Current behavior state."""
self.lastChanged: float = -INF
"""Duration since last behavior state change."""
self.lastPlayed: str = ''
"""Name of the last played motion."""
self.playingUntil: float = -INF
"""Timestamp until current motion is playing."""
self.filepath: str = ''
"""Associated behavior config file for storing params."""
self.logger = get_logger(self.name)
[docs] @classmethod
def from_config(cls, filepath: str, *args, **kwargs) -> Behavior:
"""
Construct behavior instance with an associated parms JSON file.
Remembers ``filepath`` and save each change of params to disk.
Args:
filepath: JSON config file.
*args: Behavior variable length argument list.
**kwargs: Arbitrary Behavior keyword arguments.
Returns:
New Behavior instance.
"""
if os.path.exists(filepath):
params = loads(read_file(filepath))
else:
params = create_params()
self = cls(*args, **kwargs)
self.filepath = filepath
self.params = params
return self
@property
def params(self) -> dict:
"""Get current behavior params."""
return self._params
@params.setter
def params(self, params: dict):
"""Update behavior params. If ``filepath`` attribute is defined also
save it to disk.
"""
self._params = params
self._purge_params()
if self.filepath:
write_file(self.filepath, dumps(self._params, indent=4))
[docs] def associate(self, motionPlayer: MotionPlayer):
"""Associate behavior engine with motion player block (connect
bi-directional).
Args:
motionPlayer: To couple with behavior.
Warning:
Deprecated! Behavior runs now in "open loop" and keeps track how
long motions run for.
"""
msg = (
'Behavior.associate(motionPlayer) is deprecated. Just connect'
' motion player normally behavior.mcOut.connect(motionPlayer).'
)
warnings.warn(msg, DeprecationWarning, stacklevel=2)
[docs] def reset(self):
"""Reset behavior states and attributes. Jump back to STATE_I."""
self.active = True
self.state = State.STATE_I
self.lastChanged = 0.
self.lastPlayed = ''
self.playingUntil = 0.
[docs] def play(self):
"""Start behavior playback."""
self.active = True
self.publish(BEHAVIOR_CHANGED)
[docs] def pause(self):
"""Pause behavior playback."""
self.reset()
self.active = False
self.publish(BEHAVIOR_CHANGED)
outputNeighbors = output_neighbors(self)
for mp in filter_by_type(outputNeighbors, MotionPlayer):
mp.stop()
[docs] def sensor_triggered(self) -> bool:
"""Check if sensor got triggered. This will drain all input messages.
Returns:
True if we received `any` message at the sensor input.
"""
triggered = False
for _ in self.sensorIn.receive(): # Consume all trigger messages
triggered = True
return triggered
def _purge_params(self):
"""Check with content and remove all outdated motions from params."""
existing = self.content.list_curve_names()
for stateNr, names in enumerate(self._params['motions']):
self._params['motions'][stateNr] = [
name
for name in names
if name in existing
]
[docs] def motion_duration(self, name: str) -> float:
"""Get duration of motion."""
motion = self.content.load_curve(name)
return motion.end
[docs] def play_random_motion_for_current_state(self):
"""Pick a random motion name from motions and fire a non-looping motion
command.
"""
names = self._params['motions'][self.state.value]
if len(names) == 0:
return
name = random.choice(names)
self.lastPlayed = name
self.logger.info('Playing motion %r', name)
duration = self.motion_duration(name)
until = self.clock.now() + duration
self.playingUntil = until
mc = MotionCommand(name)
self.mcOut.send(mc)
self.publish(BEHAVIOR_CHANGED)
[docs] def change_state(self, newState: State):
"""Change behavior state.
Args:
newState: New behavior state to change to.
"""
if newState is self.state:
return
self.logger.info('Changed to state %s', newState.name)
self.state = newState
self.lastChanged = self.clock.now()
self.publish(BEHAVIOR_CHANGED)
[docs] def update(self):
triggered = self.sensor_triggered() # Consume trigger events also when not active
if not self.active:
return
now = self.clock.now()
playing = now <= self.playingUntil
passed = now - self.lastChanged
attentionLost = (passed >= self._params['attentionSpan'])
if self.state is STATE_I:
if triggered:
self.change_state(STATE_III)
self.play_random_motion_for_current_state()
elif self.state is STATE_II:
if triggered:
self.change_state(STATE_III)
self.play_random_motion_for_current_state()
elif attentionLost and not playing:
self.change_state(STATE_I)
elif self.state is STATE_III:
if not playing:
if triggered:
self.change_state(STATE_III)
elif self._params['attentionSpan'] > 0:
self.change_state(STATE_II)
else:
self.change_state(STATE_I)
if not playing:
self.play_random_motion_for_current_state()
[docs] def to_dict(self):
dct = super().to_dict()
dct['active'] = self.active
dct['lastPlayed'] = self.lastPlayed
dct['params'] = self._params
dct['state'] = self.state
return dct