being package¶
PATHOS being core.
Subpackages¶
Submodules¶
being.awakening module¶
Awake being to life. Main entry point runner. Define a block network and run it with the being.awakening.awake() function.
Example
>>> from being.block import Block
>>> from being.awakening import awake
>>> block = Block()
>>> awake(block) # Will start web server and periodically call block.update()
The interval rate can be configured inside being.configuration.
- awake(*blocks: Iterable[being.block.Block], web: bool = True, enableMotors: bool = True, homeMotors: bool = True, usePacemaker: bool = True, clock: Optional[being.clock.Clock] = None, network: Optional[being.backends.CanBackend] = None)[source]¶
Run being block network.
- Parameters
blocks – Some blocks of the network. Remaining blocks will be auto discovered.
web – Run with web server.
enableMotors – Enable motors on startup.
homeMotors – Home motors on startup.
usePacemaker – Whether to use an extra pacemaker thread.
clock – Clock instance.
network – CanBackend instance.
being.backends module¶
Backend resources are wrapped as context managers and can be used with the with statement.
Example
>>> with SomeResource() as resource:
... # Do something with resource
... pass
Handling dynamic context managers can be done with contextlib.ExitStack. This functionality is available in being.resources.
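As an illustration of the dynamic case, here is a minimal sketch using contextlib.ExitStack from the standard library. DummyResource and run_with_resources are hypothetical stand-ins invented for this example. They are not part of being and do not show how being.resources is implemented.

```python
from contextlib import AbstractContextManager, ExitStack


class DummyResource(AbstractContextManager):
    """Hypothetical stand-in for a backend resource (not part of being)."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __enter__(self):
        self.log.append(f'open {self.name}')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.log.append(f'close {self.name}')
        return False  # Do not swallow exceptions


def run_with_resources(names):
    """Open a number of resources only known at runtime, then release them."""
    log = []
    with ExitStack() as stack:
        # enter_context() calls __enter__ and registers __exit__ for later
        resources = [stack.enter_context(DummyResource(n, log)) for n in names]
        log.append(f'using {len(resources)} resources')
    return log


log = run_with_resources(['can', 'audio'])
```

ExitStack releases the registered resources in reverse order of acquisition, so the resource opened last is closed first.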
- class CanBackend(bitrate: int = 1000000, bustype: str = 'socketcan', channel: str = 'can0')[source]¶
Bases:
canopen.network.Network
,being.utils.SingleInstanceCache
,contextlib.AbstractContextManager
CANopen network wrapper.
Holds the actual CAN bus. Connects to the CAN interface during __enter__, shuts down the drives and disconnects from the interface during the __exit__ phase. Controls the global NMT state and PDO. RPDO maps from the nodes have to register themselves with the network. Transmitting all RPDOs can then be done with CanBackend.transmit_all_rpdos().
- Parameters
bitrate (optional) – Bitrate of CAN bus.
bustype (optional) – CAN bus type. Default value is system dependent.
channel (optional) – CAN bus channel. Default value is system dependent.
- logger: logging.Logger¶
Dedicated logger.
- rpdos: Set[canopen.pdo.base.Map]¶
All registered RPDO maps.
- property drives: Generator[being.can.cia_402.CiA402Node, None, None]¶
Iterate over all known drive nodes.
- Returns
Drives generator.
- disable_pdo_communication()[source]¶
Disable PDO communication by setting NMT state to PRE-OPERATIONAL.
- pyaudio_format(dtype: Union[str, numpy.dtype, type]) int [source]¶
Determine pyaudio format number for data type.
- Parameters
dtype – Datatype.
- Returns
Audio format number.
- class AudioBackend(input_device_index: typing.Optional[int] = None, frames_per_buffer: int = 1024, dtype: typing.Union[str, type, numpy.dtype] = <class 'numpy.uint8'>)[source]¶
Bases:
being.utils.SingleInstanceCache
,contextlib.AbstractContextManager
Sound card connection. Collect audio samples with PortAudio / PyAudio.
- Parameters
input_device_index – Input device index for given host api. Unspecified (or None) uses default input device.
frames_per_buffer – Audio buffer size.
dtype – Data type for samples. Not all data types are supported for audio. uint8, int16, int32 and float32 should work.
- class Rpi[source]¶
Bases:
being.utils.SingleInstanceCache
,contextlib.AbstractContextManager
Raspberry Pi GPIO backend. This only works when running on a Raspberry Pi. On non-Raspberry Pi systems a dummy backend will be loaded (see being.rpi_gpio).
being.behavior module¶
Simple three-state behavior engine.
A behavior processes sensor inputs and outputs motion commands to motion players to trigger motion playback.
- class State(value)[source]¶
Bases:
enum.Enum
Behavior states.
- STATE_I = 0¶
- STATE_II = 1¶
- STATE_III = 2¶
- BEHAVIOR_CHANGED: str = 'BEHAVIOR_CHANGED'¶
PubSub event string literal. Triggered when something inside behavior has changed (behavior state change but also if another motion gets played).
- create_params(attentionSpan: float = 10.0, motions: Optional[list] = None) dict [source]¶
Create default behavior params dictionary.
- Parameters
attentionSpan (optional) – Initial attention span duration (default is 10 seconds).
motions (optional) – Initial motion lists. List of motion names for each behavior state. Lookup happens via index…
- Returns
Behavior params dictionary.
- class Behavior(params: Optional[dict] = None, clock: Optional[being.clock.Clock] = None, content: Optional[being.content.Content] = None, name: Optional[str] = None)[source]¶
Bases:
being.block.Block
,being.pubsub.PubSub
Simple three-state finite state machine behavior engine. Originally built for the ÉCAL workshop in March 2021.
- There are three states (being.behavior.State):
STATE_I
STATE_II
STATE_III
Each state has its own repertoire of motions. The sensor input (being.connectables.MessageInput) triggers a state transition to STATE_III and fires one single animation playback for STATE_III. When this motion finishes, the behavior transitions to STATE_II where it will stay for at least params.attentionSpan many seconds and play motions for this state. Afterwards it transitions back to STATE_I.
If provided with a filepath, the current behavior params get stored / loaded from a JSON file (inside current working directory).
Notes
By setting the attentionSpan to zero STATE_II can be skipped.
Animations are chosen randomly from the animations supplied in params.
A new sensor trigger will always interrupt STATE_I / STATE_II and jump immediately to STATE_III.
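The transitions described above can be sketched as a small state machine. This is a simplified model under stated assumptions, not the library's implementation. BehaviorSketch and the update() arguments now, triggered and motionFinished are hypothetical names. Motion playback is left out entirely.

```python
import enum


class State(enum.Enum):
    """Mirror of the three documented behavior states."""
    STATE_I = 0
    STATE_II = 1
    STATE_III = 2


class BehaviorSketch:
    """Hypothetical, simplified model of the documented transitions."""

    def __init__(self, attentionSpan=10.0):
        self.attentionSpan = attentionSpan
        self.state = State.STATE_I
        self.enteredAt = 0.0  # Timestamp of the last state change

    def change_state(self, newState, now):
        self.state = newState
        self.enteredAt = now

    def update(self, now, triggered=False, motionFinished=False):
        if triggered and self.state is not State.STATE_III:
            # A sensor trigger interrupts STATE_I / STATE_II
            self.change_state(State.STATE_III, now)
        elif self.state is State.STATE_III and motionFinished:
            # A zero attention span skips STATE_II
            if self.attentionSpan > 0:
                self.change_state(State.STATE_II, now)
            else:
                self.change_state(State.STATE_I, now)
        elif self.state is State.STATE_II:
            if now - self.enteredAt >= self.attentionSpan:
                self.change_state(State.STATE_I, now)
        return self.state


behavior = BehaviorSketch(attentionSpan=10.0)
trace = [
    behavior.update(0.0, triggered=True),        # Sensor fires
    behavior.update(1.0, motionFinished=True),   # STATE_III motion is done
    behavior.update(5.0),                        # Attention span still running
    behavior.update(11.0),                       # Attention span elapsed
]
```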
- Parameters
params (optional) – Behavior params dictionary
clock (optional) – Being clock (DI).
content (optional) – Being content (DI).
name (optional) – Block name.
- FREE_NUMBERS = count(1)¶
Behavior number counter for default behavior names.
- classmethod from_config(filepath: str, *args, **kwargs) being.behavior.Behavior [source]¶
Construct behavior instance with an associated params JSON file. Remembers filepath and saves each change of params to disk.
- Parameters
filepath – JSON config file.
*args – Behavior variable length argument list.
**kwargs – Arbitrary Behavior keyword arguments.
- Returns
New Behavior instance.
- associate(motionPlayer: being.motion_player.MotionPlayer)[source]¶
Associate behavior engine with motion player block (connect bi-directional).
- Parameters
motionPlayer – To couple with behavior.
Warning
Deprecated! Behavior now runs in “open loop” and keeps track of how long motions run for.
- sensor_triggered() bool [source]¶
Check if sensor got triggered. This will drain all input messages.
- Returns
True if we received any message at the sensor input.
- play_random_motion_for_current_state()[source]¶
Pick a random motion name from motions and fire a non-looping motion command.
- change_state(newState: being.behavior.State)[source]¶
Change behavior state.
- Parameters
newState – New behavior state to change to.
being.being module¶
Being application core object. Encapsulates the various blocks for a given program and defines the single cycle.
- value_outputs(blocks: Iterable[being.block.Block]) Iterator[being.connectables.ValueOutput] [source]¶
Collect all value outputs from blocks.
- Parameters
blocks – Blocks to traverse.
- Yields
All ValueOutputs.
- message_outputs(blocks: Iterable[being.block.Block]) Iterator[being.connectables.MessageOutput] [source]¶
Collect all message outputs from blocks.
- Parameters
blocks – Blocks to traverse.
- Yields
All MessageOutputs.
- class Being(blocks: List[being.block.Block], clock: being.clock.Clock, pacemaker: being.pacemaker.Pacemaker, network: Optional[being.backends.CanBackend] = None)[source]¶
Bases:
object
Being core.
Main application-like object. Container for being components: the block network graph and additional components (some back ends, clock, motors…). Defines the single cycle which will be called repeatedly. Also provides some helper shortcut methods.
- Parameters
blocks – Blocks (forming a block network) to execute.
clock – Being clock instance.
pacemaker – Pacemaker instance. Thread will not be started but will be used as dummy.
network – CanBackend instance (if any, DI).
- clock: being.clock.Clock¶
Being clock.
- pacemaker: being.pacemaker.Pacemaker¶
Being pacemaker.
- network: being.backends.CanBackend¶
Being CAN backend / network.
- graph: being.graph.Graph¶
Block network for running program.
- execOrder: List[being.block.Block]¶
Block execution order.
- valueOutputs: List[being.connectables.ValueOutput]¶
All value outputs.
- messageOutputs: List[being.connectables.MessageOutput]¶
All message outputs.
- behaviors: List[being.behavior.Behavior]¶
All behavior blocks.
- motionPlayers: List[being.motion_player.MotionPlayer]¶
All motion player blocks.
- motors: List[being.motors.blocks.MotorBlock]¶
All motor blocks.
- params: List[being.params.Parameter]¶
All parameter blocks.
being.bitmagic module¶
Bit operation helpers. Since integers are immutable, most of these do not work in-place but return a new value instead.
- check_bit(value: int, bit: int) int [source]¶
Check n-th bit of value.
- Parameters
value – Number to check.
bit – Bit number.
- Returns
N-th bit value
Examples
>>> check_bit(0b1000, 0)
0
>>> bin(check_bit(0b1000, 3))
'0b1000'
- set_bit(value: int, bit: int) int [source]¶
Set n-th bit of value.
- Parameters
value – Number to set bit.
bit – Bit number.
- Returns
Value with set n-th bit.
Example
>>> bin(set_bit(0b11011, 2))
'0b11111'
- clear_bit(value: int, bit: int) int [source]¶
Clear n-th bit of value.
- Parameters
value – Number to clear bit.
bit – Bit number.
- Returns
Value with cleared n-th bit
Example
>>> clear_bit(0b1000, 3)
0
- toggle_bit(value: int, bit: int) int [source]¶
Toggle n-th bit of value.
- Parameters
value – Number to toggle bit.
bit – Bit number.
- Returns
Value with toggled n-th bit.
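For reference, the four helpers can be written with plain bit masks. The bodies below are a sketch chosen to agree with the documented examples (note that check_bit returns the masked value, not 0 or 1). They are an assumption about how such helpers are commonly implemented, not the module's source.

```python
def check_bit(value: int, bit: int) -> int:
    """Mask out everything but the n-th bit."""
    return value & (1 << bit)


def set_bit(value: int, bit: int) -> int:
    """Return value with the n-th bit set."""
    return value | (1 << bit)


def clear_bit(value: int, bit: int) -> int:
    """Return value with the n-th bit cleared."""
    return value & ~(1 << bit)


def toggle_bit(value: int, bit: int) -> int:
    """Return value with the n-th bit flipped."""
    return value ^ (1 << bit)


flipped_on = toggle_bit(0b1000, 0)   # Bit 0 was clear and is now set
flipped_off = toggle_bit(0b1001, 0)  # Bit 0 was set and is now clear
```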
being.block module¶
Block base class and some block related helpers.
Todo
Should input_connections(), output_connections(), input_neighbors() and output_neighbors() become Block methods?
- input_connections(block: being.block.Block) Generator[Tuple[being.connectables.OutputBase, being.connectables.InputBase], None, None] [source]¶
Iterate over all incoming connections.
- Parameters
block – Block to inspect.
- Yields
tuple – Src -> block input connections.
- output_connections(block: being.block.Block) Generator[Tuple[being.connectables.OutputBase, being.connectables.InputBase], None, None] [source]¶
Iterate over all outgoing connections.
- Parameters
block – Block to inspect.
- Yields
tuple – Block output -> dst.
- collect_connections(block: being.block.Block) Generator[Tuple[being.connectables.OutputBase, being.connectables.InputBase], None, None] [source]¶
Get all in- and outgoing connections of a block ((output, input) tuples). Exclude loop-around connections (block connected to itself).
- Parameters
block – Block to inspect.
- Yields
tuple – Output -> input connections
- input_neighbors(block: being.block.Block) Generator[Tuple[being.connectables.OutputBase, being.connectables.InputBase], None, None] [source]¶
Get input neighbors of block.
- Parameters
block – Block to inspect
- Yields
Block – ValueInput neighbors / source owners.
- output_neighbors(block: being.block.Block) Generator[Tuple[being.connectables.OutputBase, being.connectables.InputBase], None, None] [source]¶
Get output neighbors of block.
- Parameters
block (Block) – Block to inspect
- Yields
Block – ValueOutput neighbors / destination owner.
- fetch_input(blockOrInput: Union[being.block.Block, being.connectables.InputBase]) being.connectables.InputBase [source]¶
Fetch primary input.
- fetch_output(blockOrOutput: Union[being.block.Block, being.connectables.OutputBase]) being.connectables.OutputBase [source]¶
Fetch primary output.
- pipe_operator(left: Union[being.block.Block, being.connectables.OutputBase], right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block [source]¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by being.block.Block.__or__() and being.block.Block.__ror__() for the shorthand
>>> a | b | c # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with being.connectables.OutputBase and being.connectables.InputBase instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
being.block.Block
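The chaining in a | b | c works because each pipe returns the owner of the rightmost incoming connection, which then becomes the left operand of the next pipe. The sketch below demonstrates this with a hypothetical Pipeable class that has a single input and output slot. It is not being.block.Block and it ignores the InputBase / OutputBase operand variants.

```python
class Pipeable:
    """Hypothetical mini block with one input and one output slot."""

    def __init__(self, name):
        self.name = name
        self.source = None       # Upstream block feeding this block's input
        self.destinations = []   # Downstream blocks fed by this block's output

    def connect(self, other):
        """Connect own output to the input of other."""
        self.destinations.append(other)
        other.source = self

    def __or__(self, right):
        self.connect(right)
        return right  # Owner of the rightmost incoming connection

    def __repr__(self):
        return f'Pipeable({self.name!r})'


a, b, c = Pipeable('a'), Pipeable('b'), Pipeable('c')
last = a | b | c  # Evaluated as (a | b) | c
```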
- class Block(name: Optional[str] = None)[source]¶
Bases:
object
Blocks are the main building blocks (pun intended) of a being program. They hold their own state and can communicate with each other via value or message connections. Each block has a Block.update() method which will be called once per execution cycle. This method should be overridden by child classes.
- New connections can be added with the helper methods:
Block.add_value_input()
Block.add_message_input()
Block.add_value_output()
Block.add_message_output()
These methods also take an additional name argument which can be used to store the newly created connection as an attribute.
Example
>>> class MyBlock(Block):
...     def __init__(self):
...         super().__init__()  # Initialize the Block base class first
...         self.add_message_output(name='mouth')
...
...     def update(self):
...         self.mouth.send('I am alive!')
Note
Not an ABC so that we can use the base class for testing.
- Parameters
name (optional) – Block name for UI. Block type name by default.
- __or__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block ¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by being.block.Block.__or__() and being.block.Block.__ror__() for the shorthand
>>> a | b | c # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with being.connectables.OutputBase and being.connectables.InputBase instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
being.block.Block
- __ror__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block [source]¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by being.block.Block.__or__() and being.block.Block.__ror__() for the shorthand
>>> a | b | c # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with being.connectables.OutputBase and being.connectables.InputBase instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
being.block.Block
- ID_COUNTER = count(0)¶
Counter used for id assignment.
- inputs: List[being.connectables.InputBase]¶
Input connections.
- outputs: List[being.connectables.OutputBase]¶
Output connections.
- property input: being.connectables.InputBase¶
Primary input.
- property output: being.connectables.OutputBase¶
Primary output.
- add_value_input(name: Optional[str] = None) being.connectables.ValueInput [source]¶
Add new value input to block.
- Parameters
name – Attribute name.
- Returns
Newly created value input.
- add_message_input(name: Optional[str] = None) being.connectables.MessageInput [source]¶
Add new message input to block.
- Parameters
name – Attribute name.
- Returns
Newly created message input.
- add_value_output(name: Optional[str] = None) being.connectables.ValueOutput [source]¶
Add new value output to block.
- Parameters
name – Attribute name.
- Returns
Newly created value output.
- add_message_output(name: Optional[str] = None) being.connectables.MessageOutput [source]¶
Add new message output to block.
- Parameters
name – Attribute name.
- Returns
Newly created message output.
- to_dict() collections.OrderedDict [source]¶
Convert block to dictionary representation which can be used for dumping as JSON.
- Returns
Block’s dictionary representation.
being.blocks module¶
Collection of miscellaneous blocks.
- class Sine(frequency: float = 1.0, startPhase: float = 0.0, **kwargs)[source]¶
Bases:
being.block.Block
Sine generator. Outputs sine wave for a given frequency.
- Parameters
frequency – Initial frequency value.
startPhase – Initial phase.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class Trafo(scale: float = 1.0, offset: float = 0.0, **kwargs)[source]¶
Bases:
being.block.Block
Transforms input signal (by some fixed scale and offset):
\[y = a \cdot x + b.\]
- Parameters
scale – Scaling factor.
offset – Offset factor.
**kwargs – Arbitrary block keyword arguments.
- classmethod from_ranges(inRange: Sequence = (0.0, 1.0), outRange: Sequence = (0.0, 1.0)) being.blocks.Trafo [source]¶
Construct a Trafo instance for a given input and output range.
- Parameters
inRange – Lower and upper value of input range.
outRange – Lower and upper value of output range.
- Returns
Trafo instance.
Example
>>> # This trafo block will map input values from [1, 2] -> [30, 40]
>>> trafo = Trafo.from_ranges([1, 2], [30, 40])
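The scale and offset for such a range mapping follow from requiring that y = a · x + b maps the lower input bound onto the lower output bound and the upper onto the upper. The helper below is a sketch of that arithmetic. range_mapping is a hypothetical name and not part of being.blocks.

```python
def range_mapping(inRange=(0.0, 1.0), outRange=(0.0, 1.0)):
    """Compute (scale, offset) of the linear map from inRange onto outRange."""
    inLower, inUpper = inRange
    outLower, outUpper = outRange
    scale = (outUpper - outLower) / (inUpper - inLower)
    offset = outLower - scale * inLower
    return scale, offset


scale, offset = range_mapping([1, 2], [30, 40])
midpoint = scale * 1.5 + offset  # The middle of [1, 2] lands in the middle of [30, 40]
```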
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class Mic(audioBackend=None, **kwargs)[source]¶
Bases:
being.block.Block
Microphone block.
Warning
Make me!
Intentionally left blank.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class Printer(prefix: str = '', carriageReturn: bool = False, **kwargs)[source]¶
Bases:
being.block.Block
Print input values to stdout.
- Parameters
prefix (optional) – Prefix string to prepend.
carriageReturn (optional) –
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- ranged_sine_pulse(phase: float, lower: float = 0.0, upper: float = 1.0) float [source]¶
Shifted and scaled cosine pulse in interval [lower, upper].
- ranged_sine_pulse_integrated(phase: float, lower: float = 0.0, upper: float = 1.0) float [source]¶
Integrated shifted and scaled cosine pulse for derivative range in [lower, upper].
- class Pendulum(frequency: float = 1.0, lower: float = 0.0, upper: float = 1.0, **kwargs)[source]¶
Bases:
being.block.Block
Outputs scaled / shifted sine pulses. Can be used to sway back and forth between two extremes.
- Parameters
frequency (optional) – Frequency of output signal.
lower (optional) – Lower output value.
upper (optional) – Upper output value.
**kwargs – Arbitrary block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
being.choreo module¶
Helpers for converting choreo motion format to splines.
Choreos are ini files where each motion curve is defined under its own section. Section name corresponds to motor id. Example:
[7]
# format time[s]=pos,vel,acc,dec [all in SI]
0.175=0.0865, 0.1504, 0.3568, 0.3568
1.019=0.0329, 0.1037, 0.2006, 0.2006
2.053=0.0969, 0.1498, 0.3502, 0.3502
2.908=0.0270, 0.1048, 0.1569, 0.1569
[8]
# format time[s]=pos,vel,acc,dec [all in SI]
0.870=0.0904, 0.1277, 0.2495, 0.2495
1.894=0.0315, 0.0847, 0.1219, 0.1219
3.283=0.0806, 0.1286, 0.3372, 0.3372
These values correspond to target or set-point values and do not incorporate an initial value. being.spline.optimal_trajectory_spline() is used to perform a kinematic simulation.
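Since choreos are plain ini files, the standard library configparser can read them. The sketch below only parses the rows into tuples. parse_choreo is a hypothetical helper, and the kinematic simulation that turns the rows into splines is not shown.

```python
import configparser

# Shortened version of the choreo format shown above
CHOREO = """
[7]
# format time[s]=pos,vel,acc,dec [all in SI]
0.175=0.0865, 0.1504, 0.3568, 0.3568
1.019=0.0329, 0.1037, 0.2006, 0.2006
[8]
# format time[s]=pos,vel,acc,dec [all in SI]
0.870=0.0904, 0.1277, 0.2495, 0.2495
"""


def parse_choreo(text):
    """Map motor id -> list of (time, pos, vel, acc, dec) rows sorted by time."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    motions = {}
    for name in parser.sections():
        rows = []
        for time, values in parser[name].items():
            pos, vel, acc, dec = (float(v) for v in values.split(','))
            rows.append((float(time), pos, vel, acc, dec))
        motions[int(name)] = sorted(rows)
    return motions


motions = parse_choreo(CHOREO)
```

Lines starting with # are treated as comments by ConfigParser, so the format hint inside each section is skipped automatically.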
- Choreo¶
Ini choreo represented by a ConfigParser instance.
alias of configparser.ConfigParser
- collect_segments_from_section(section) Generator[Segment, None, None] [source]¶
Collect motion segments from section.
- Parameters
section – ConfigParser section representing a single motion curve.
- Yields
Motion segments.
- collect_segments_from_choreo(choreo: Choreo) Generator[Generator[Segment, None, None], None, None] [source]¶
Collect motion segments for each motion channel from choreo.
- Parameters
choreo – Choreo ConfigParser instance.
- Yields
Segments generator for each motion channel.
- convert_segments_to_splines(segments, start=State(position=0.0, velocity=0.0, acceleration=0.0)) Generator[scipy.interpolate.interpolate.PPoly, None, None] [source]¶
Convert motion segments to multiple disjoint optimal trajectory splines. Each optimal trajectory spline represents a choreo segment (best effort). They can overlap but do not have to.
- Parameters
segments – Motion segments. Each row with [time, targetPosition, maxSpeed, maxAcceleration].
- Yields
Optimal trajectory spline for each segment.
- combine_splines_in_time(splines: Iterable[scipy.interpolate.interpolate.PPoly]) scipy.interpolate.interpolate.PPoly [source]¶
Combine multiple one dimensional splines in time. Takes care of overlap situation. Assumes that input splines can be extrapolated.
- Parameters
splines – Splines to merge into one single spline.
- Returns
Combined spline.
- combine_splines_in_dimensions(splines: Sequence[scipy.interpolate.interpolate.PPoly]) scipy.interpolate.interpolate.PPoly [source]¶
Pack / stack multiple single dimensional splines into one. Inserts missing knots if the single dimensional splines do not align up.
- Parameters
splines – PPoly splines to combine along dimension.
- Returns
New combined spline.
- convert_choreo_to_spline(choreo: Choreo) scipy.interpolate.interpolate.PPoly [source]¶
Convert choreo to spline. If there are multiple motion curves defined in choreo return multi dimensional spline.
- Parameters
choreo – Configparser for choreo file.
- Returns
Extracted simulated motion curves.
Todo
Make it possible to provide initial state of the kinematic simulation.
being.clock module¶
Being internal time giver. For a given main loop cycle the time should stay constant and not depend on external factors (kind of rendering time).
- class Clock(interval: float = 0.01)[source]¶
Bases:
being.utils.SingleInstanceCache
Clock giver. Returns current timestamp and needs to be ticked during the main loop.
Important
Needs to be ticked once (and only once) by the end of each cycle.
Note
Uses an int internally to count up. This avoids jitter caused by floating-point precision. Pure Python integers are unbounded so overflow is not an issue (see Arbitrary-precision arithmetic).
- Parameters
interval (optional) – Interval duration in seconds. Default is taken from the CONFIG object from being.configuration.
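The benefit of counting cycles with an integer can be seen in a small experiment. TickClock below is a hypothetical sketch of the idea and not the Clock class itself. It derives the timestamp from the cycle count instead of repeatedly adding the interval to a float.

```python
class TickClock:
    """Hypothetical clock deriving time from an integer cycle counter."""

    def __init__(self, interval=0.01):
        self.interval = interval
        self.cycle = 0  # Unbounded Python int, no overflow

    def now(self):
        """Current timestamp in seconds."""
        return self.cycle * self.interval

    def step(self):
        """Advance by one cycle. Call once at the end of each cycle."""
        self.cycle += 1


clock = TickClock(interval=0.1)
accumulated = 0.0
for _ in range(10):
    clock.step()
    accumulated += 0.1  # Naive float accumulation for comparison

drift = abs(accumulated - clock.now())  # Rounding error of the naive approach
```

After ten cycles the integer-based timestamp is exactly 1.0, while the accumulated float is slightly off because each addition rounds.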
being.configs module¶
- Different config formats. Currently supported are:
JSON
INI
YAML
TOML
Round-trip preservation keeps comments in config files (depending on third-party libraries). Internally all config formats get mapped onto a being.utils.NestedDict data structure which can be accessed through a path-like syntax.
Example
>>> c = Config()
>>> c.store('this/is/it', 1234)
>>> print(c.data)
{'this': {'is': {'it': 1234}}}
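The path-like access can be pictured as walking a nested dictionary one key at a time. The two functions below are a sketch on plain dicts. They are hypothetical helpers and do not reproduce being.utils.NestedDict or the Config methods.

```python
def store(data, name, value, sep='/'):
    """Store value under a path like 'this/is/it', creating intermediate dicts."""
    *parents, last = name.split(sep)
    node = data
    for key in parents:
        node = node.setdefault(key, {})
    node[last] = value


def retrieve(data, name='', sep='/'):
    """Retrieve the entry for a path. The empty path returns the root."""
    node = data
    for key in filter(None, name.split(sep)):
        node = node[key]
    return node


data = {}
store(data, 'this/is/it', 1234)
intermediate = retrieve(data, 'this/is')  # Intermediate entries work as well
```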
- split_name(name: str) Tuple[str, str] [source]¶
Split name into (head, tail) where tail is everything after the final separator (like splitting a filepath into the directory and filename parts).
- Parameters
name – Name to split.
- Returns
head and tail tuple
Example
>>> split_name('this/is/it')
('this/is', 'it')
- guess_config_format(filepath: str) str [source]¶
Guess config format from file extension.
- Parameters
filepath – Path to guess from.
- Returns
Config format.
Example
>>> guess_config_format('this/is/it.json')
'json'
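Both helpers above can be approximated with standard string and path functions. The bodies below are a sketch that matches the documented examples. How the real functions treat names without a separator or unknown extensions is not stated here, so that behavior is an assumption.

```python
import os


def split_name(name, sep='/'):
    """Split at the final separator. Assumption: head is '' if there is none."""
    head, _, tail = name.rpartition(sep)
    return head, tail


def guess_config_format(filepath):
    """Use the lowercased file extension without the dot as format name."""
    _, ext = os.path.splitext(filepath)
    return ext.lstrip('.').lower()


parts = split_name('this/is/it')
configFormat = guess_config_format('this/is/it.json')
```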
- class Config(data: Optional[dict] = None, configFormat: Optional[str] = None)[source]¶
Bases:
being.configs._ConfigImpl
,collections.abc.MutableMapping
Configuration object. Proxy for _ConfigImpl (depending on config format).
- Parameters
data (optional) – Initial data.
configFormat (optional) – Config format (if any).
- impl: _ConfigImpl¶
Private config implementation.
- property data: Dict¶
Underlying dict-like data container.
- store(name: str, value: Any)[source]¶
Store value to config.
- Parameters
name – Config path name to store value under.
- retrieve(name: str = '') Any [source]¶
Retrieve config value for a given name. This can also be an intermediate entry.
- Parameters
name (optional) – Config path name to retrieve value for. Root path by default.
- Returns
Config value.
- erase(name: str)[source]¶
Erase config value for a given name.
- Parameters
name – Config path name to erase.
- storedefault(name: str, default: Optional[Any] = None)[source]¶
Fetch config value while providing a default value if entry does not exist. Similar to dict.setdefault.
- Parameters
name – Config path name to store value under.
default – Default value to insert if config entry does not exist.
- Returns
Config value.
- loads(string: str)[source]¶
Load data from string to being.configs._ConfigImpl instance.
- Parameters
string – String to parse.
- class ConfigFile(filepath: str)[source]¶
Bases:
being.configs.Config
Config instance which can be loaded / saved to a config file on disk.
- Parameters
filepath – Associated config file.
- impl: _ConfigImpl¶
Private config implementation.
- default_factory: Callable¶
Default factory for intermediate dictionaries.
being.configuration module¶
Being configuration and default values. Searches the current working directory for a being.yaml configuration file. If present, default configuration values get updated.
Todo
Rename CONFIG to DEFAULT_CONFIG? Should not overwrite defaults but instead update a copy.
Notes
being.configuration.CONFIG is not a being.configs.Config instance because it is read-only.
being.configuration.CONFIG is not a being.utils.NestedDict instance in order to catch KeyError pre-runtime.
- CONFIG: Dict[str, Any] = {'Can': {'DEFAULT_CAN_BITRATE': 1000000}, 'General': {'CONTENT_DIRECTORY': 'content', 'INTERVAL': 0.01, 'PARAMETER_CONFIG_FILEPATH': 'being_params.yaml'}, 'Logging': {'DIRECTORY': None, 'FILENAME': 'being.log', 'LEVEL': 30}, 'Web': {'API_PREFIX': '/api', 'HOST': None, 'INTERVAL': 0.05, 'PORT': 8080, 'WEB_SOCKET_ADDRESS': '/stream'}}¶
Global being default configuration.
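The idea from the Todo, updating a copy instead of overwriting the defaults, could look like the following sketch. DEFAULTS is a shortened excerpt of the values listed above and updated_copy is a hypothetical helper. The module's actual loading of being.yaml is not shown.

```python
import copy

# Excerpt of the documented default values
DEFAULTS = {
    'General': {'CONTENT_DIRECTORY': 'content', 'INTERVAL': 0.01},
    'Web': {'PORT': 8080},
}


def updated_copy(defaults, overrides):
    """Return a deep copy of defaults with section-wise overrides applied."""
    result = copy.deepcopy(defaults)
    for section, values in overrides.items():
        result.setdefault(section, {}).update(values)
    return result


# Overrides as they might come from a user configuration file
config = updated_copy(DEFAULTS, {'General': {'INTERVAL': 0.02}})
```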
being.connectables module¶
Block output and input connectables. In general it is always possible to connect one output to multiple inputs but not the other way round.
A connection is an (OutputBase, InputBase) tuple.
- There are two types of connections:
Value: Propagate some value through the connections in every tick (corresponds to continuous data stream).
Message: Send discrete messages from an output to all connected inputs.
Note
This code is directly taken from the Klang project.
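The difference between the two connection types can be sketched with four small classes. These are hypothetical, stripped-down stand-ins (no owner, no base classes) and not the classes of this module. A value input looks up the current value of its source on demand, while a message output pushes each message into the queue of every connected input.

```python
from collections import deque


class ValueOut:
    """Holds the current value of a continuous signal."""

    def __init__(self, value=0.0):
        self.value = value


class ValueIn:
    """Reads from the connected output, falls back to its own value."""

    def __init__(self, value=0.0):
        self.fallback = value
        self.source = None

    def connect(self, output):
        self.source = output

    @property
    def value(self):
        if self.source is None:
            return self.fallback
        return self.source.value


class MessageIn:
    """Collects discrete messages in a queue."""

    def __init__(self):
        self.queue = deque()

    def receive(self):
        """Drain and yield all queued messages."""
        while self.queue:
            yield self.queue.popleft()


class MessageOut:
    """Sends each message to all connected inputs."""

    def __init__(self):
        self.inputs = []

    def connect(self, input_):
        self.inputs.append(input_)

    def send(self, message):
        for input_ in self.inputs:
            input_.queue.append(message)


out = ValueOut()
connected, unconnected = ValueIn(), ValueIn(value=-1.0)
connected.connect(out)
out.value = 3.5

mouth = MessageOut()
ear1, ear2 = MessageIn(), MessageIn()
mouth.connect(ear1)
mouth.connect(ear2)
mouth.send('hello')
```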
- class ValueInput(owner: Optional[Block] = None, value: Any = 0.0)[source]¶
Bases:
being.connectables.InputBase
,being.connectables._ValueContainer
Value input. Will fetch value from connected output. Also has its own _value attribute as a fallback when not connected.
- Parameters
owner – Parent block owning this input.
- property value¶
Try to fetch value from connected output.
- class ValueOutput(owner: Optional[Block] = None, value=0.0)[source]¶
Bases:
being.connectables.OutputBase
,being.connectables._ValueContainer
Value output. Will propagate its value to connected inputs.
- Parameters
owner – Parent block owning this output.
- class MessageInput(owner: Optional[Block] = None)[source]¶
Bases:
being.connectables.InputBase
,being.connectables._MessageQueue
Message input. Has its own message queue where it receives messages from a connected MessageOutput.
- Parameters
owner – Parent block owning this input.
being.constants module¶
Math constants and other literals.
- YOTTA = 1e+24¶
Yotta metric SI prefix
- ZETTA = 1e+21¶
Zetta metric SI prefix
- EXA = 1e+18¶
Exa metric SI prefix
- PETA = 1000000000000000.0¶
Peta metric SI prefix
- TERA = 1000000000000.0¶
Tera metric SI prefix
- GIGA = 1000000000.0¶
Giga metric SI prefix
- MEGA = 1000000.0¶
Mega metric SI prefix
- KILO = 1000.0¶
Kilo metric SI prefix
- HECTO = 100.0¶
Hecto metric SI prefix
- DECA = 10.0¶
Deca metric SI prefix
- ONE = 1.0¶
One metric SI prefix
- DECI = 0.1¶
Deci metric SI prefix
- CENTI = 0.01¶
Centi metric SI prefix
- MILLI = 0.001¶
Milli metric SI prefix
- MICRO = 1e-06¶
Micro metric SI prefix
- NANO = 1e-09¶
Nano metric SI prefix
- PICO = 1e-12¶
Pico metric SI prefix
- FEMTO = 1e-15¶
Femto metric SI prefix
- ATTO = 1e-18¶
Atto metric SI prefix
- ZEPTO = 1e-21¶
Zepto metric SI prefix
- YOCTO = 1e-24¶
Yocto metric SI prefix
being.content module¶
Content manager. Manages motions inside content directory.
- DEFAULT_DIRECTORY: str = 'content'¶
Default content directory. Taken from being.configuration.CONFIG.
- stripext(p: str) str [source]¶
Strip file extension from path.
- Parameters
p – Input path.
- Returns
Input path without extension part.
Example
>>> stripext('this/is/a_file.ext')
'this/is/a_file'
- removeprefix(string: str, prefix: str) str [source]¶
Remove string prefix for Python < 3.9. If string does not start with prefix leave as is.
- Parameters
string – Input string.
prefix – Prefix to remove.
- Returns
Trimmed string.
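Both path helpers are small enough to sketch with the standard library. The versions below are re-implementations written for illustration and may differ in detail from the module's source. On Python 3.9 and later, str.removeprefix() offers the same behavior as the second helper.

```python
import os


def stripext(p):
    """Drop the file extension, keep directories and base name."""
    root, _ext = os.path.splitext(p)
    return root


def removeprefix(string, prefix):
    """Remove prefix if present, otherwise return string unchanged."""
    if prefix and string.startswith(prefix):
        return string[len(prefix):]
    return string


name = removeprefix(stripext('content/Untitled.json'), 'content/')
```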
- upgrade_splines_to_curves(directory, logger=None)[source]¶
Go through each JSON file inside directory and upgrade every serialized spline to a curve.
- Parameters
directory – Folder to check.
logger (optional) – Logger to write information to (e.g. the instance logger of the calling Content instance).
- class Files(directory: str, loads: typing.Callable = <function loads>, dumps: typing.Callable = <function dumps>)[source]¶
Bases:
collections.abc.MutableMapping
Wrap files inside directory on disk as dictionary. Iteration order is most recently modified.
- Parameters
directory – Directory to manage.
loads (optional) – Serialization loader function. Default is being.serialization.loads().
dumps (optional) – Serialization dumper function. Default is being.serialization.dumps().
- loads: Callable¶
Serialization loader function.
- dumps: Callable¶
Serialization dumper function.
- class Content(directory: str = 'content', data: Optional[being.content.Files] = None, ext: str = '.json')[source]¶
Bases:
being.pubsub.PubSub
,being.utils.SingleInstanceCache
Content manager. For now only motions and splines. Motion name is the basename without file extension.
Todo
Extend for all kinds of files (multiple subfolders). Is being.utils.NestedDict appropriate?
- Parameters
directory (optional) – Directory to manage. Default content directory from being.configuration.CONFIG.
data (optional) – Data container.
ext (optional) – File extensions. name + ext = path.
- curve_exists(name: str) bool [source]¶
Check if motion curve exists.
- Parameters
name – Curve name.
- Returns
If curve exists.
- load_curve(name: str) scipy.interpolate.interpolate.BPoly [source]¶
Load motion curve from disk.
- Parameters
name – Motion name.
- Returns
Spline
- save_curve(name: str, curve: scipy.interpolate.interpolate.BPoly)[source]¶
Save motion curve to disk.
- Parameters
name – Curve name.
curve – Motion curve to save.
- rename_curve(oldName: str, newName: str)[source]¶
Rename motion curve.
- Parameters
oldName – Old curve name.
newName – New curve name.
- find_free_name(wishName='Untitled')[source]¶
Find free name. Append numbers starting from 1 if name is already taken.
- Parameters
wishName – Wish name.
- Returns
Available version of wish name.
- Raises
RuntimeError – If we can not find any available name (after x tries…)
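The search for a free name can be sketched as a bounded loop over numbered candidates. The function below works on a plain set of taken names. The parameters taken and maxTries as well as the 'name number' format with a space are assumptions, since neither the retry limit nor the exact naming scheme is documented here.

```python
def find_free_name(taken, wishName='Untitled', maxTries=1000):
    """Return wishName or the first numbered variant which is not taken."""
    if wishName not in taken:
        return wishName

    for number in range(1, maxTries + 1):
        candidate = f'{wishName} {number}'  # Assumed naming scheme
        if candidate not in taken:
            return candidate

    raise RuntimeError(f'No free name for {wishName!r} after {maxTries} tries')


first = find_free_name(set())
second = find_free_name({'Untitled'})
third = find_free_name({'Untitled', 'Untitled 1'})
```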
- forge_message() collections.OrderedDict [source]¶
Forge content / motions message.
being.curve module¶
Curve set. Spline-like which can contain multiple splines.
- class Curve(splines: List[Union[scipy.interpolate.interpolate.PPoly, scipy.interpolate.interpolate.BPoly]])[source]¶
Bases:
object
Curve container aka. curve set. Contains multiple curves as splines.
- Parameters
splines – List of splines.
- property n_channels: int¶
Number of channels. Sum of all spline dimensions.
Tip
This is not the same as the number of splines, Curve.n_splines().
being.error module¶
being.execution module¶
Block execution. Given some interconnected blocks find a suitable order to execute them in, i.e. calling the being.block.Block.update() methods in an appropriate order.
Caution
If the block network graph has cycles it is not possible to resolve a proper execution order. In this case blocks will have to work with out of date data.
This graph has a cycle between b and d and the resulting execution order is [a, b, c, d] (c and d could be swapped depending on the insertion order).
- ExecOrder¶
List of topologically sorted blocks.
alias of List[being.block.Block]
- block_network_graph(blocks: Iterable[being.block.Block]) being.graph.Graph [source]¶
Traverse block network and build block network graph.
- Parameters
blocks – Starting blocks for graph traversal.
- Returns
Block network graph.
- determine_execution_order(blocks: Iterable[being.block.Block]) List[being.block.Block] [source]¶
Determine execution order for blocks.
- Parameters
blocks – Interconnected blocks.
- Returns
Execution order.
- execute(execOrder: List[being.block.Block])[source]¶
Execute execution order.
- Parameters
execOrder – Blocks to execute.
being.graph module¶
Graphing and topological sorting.
- unique_elements(iterable: Iterable[Hashable]) Generator[Hashable, None, None] [source]¶
Iterate over unique elements in iterable.
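A minimal sketch of such a generator, assuming that the first occurrence of each element is kept and that the input order is preserved (neither is stated above):

```python
def unique_elements(iterable):
    """Yield each hashable element once, in order of first appearance."""
    seen = set()
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item


print(list(unique_elements([3, 1, 3, 2, 1])))  # [3, 1, 2]
```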
- find_back_edges(graph) Generator[Tuple[Hashable, Hashable], None, None] [source]¶
Find back edges of graph.
- Parameters
graph – Input graph.
- Yields
Source to destination back edges.
See also
- remove_back_edges(graph: being.graph.Graph) being.graph.Graph [source]¶
Remove back edges from graph and return new DAG.
- Parameters
graph – Input graph.
- Returns
DAG
- topological_sort(graph: being.graph.Graph) List[Hashable] [source]¶
Topological sorting of directed graph vertices.
- Parameters
graph – Input graph.
- Returns
Topological sorting.
- class Graph(vertices: Optional[Iterable] = None, edges: Optional[Sequence[Tuple[Hashable, Hashable]]] = None)[source]¶
Bases:
being.graph.Graph
Immutable graph. Graph vertices can be any hashable Python object. An edge is a source -> destination tuple. A graph is a tuple of a vertex set and an edge set.
Note
Tuples instead of sets to preserve order.
- successors¶
Source -> destinations relationships. For each source vertex get a list of destination vertices.
- predecessors¶
Destination -> sources relationships. For each destination vertex get a list of source vertices.
- Parameters
vertices – Initial vertices. Final vertices will be auto updated from the ones present in the edges.
edges – Source -> destination edge tuples.
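How back edge removal and topological sorting fit together can be sketched without the library. The following standalone example uses plain lists instead of being.graph.Graph, a depth-first search to find back edges and Kahn's algorithm for the sort; the algorithms actually used by the module are not stated here, and the edge list is a hypothetical network with a cycle between b and d.

```python
from collections import deque


def find_back_edges(vertices, edges):
    """Yield edges that point back to a vertex on the current DFS path."""
    successors = {v: [] for v in vertices}
    for src, dst in edges:
        successors[src].append(dst)

    visited, on_path = set(), set()

    def visit(vertex):
        visited.add(vertex)
        on_path.add(vertex)
        for dst in successors[vertex]:
            if dst in on_path:
                yield (vertex, dst)  # Closes a cycle -> back edge
            elif dst not in visited:
                yield from visit(dst)
        on_path.discard(vertex)

    for vertex in vertices:
        if vertex not in visited:
            yield from visit(vertex)


def topological_sort(vertices, edges):
    """Kahn's algorithm on a DAG, keeping insertion order among ready vertices."""
    in_degree = {v: 0 for v in vertices}
    successors = {v: [] for v in vertices}
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1

    ready = deque(v for v in vertices if in_degree[v] == 0)
    order = []
    while ready:
        vertex = ready.popleft()
        order.append(vertex)
        for dst in successors[vertex]:
            in_degree[dst] -= 1
            if in_degree[dst] == 0:
                ready.append(dst)

    return order


# Hypothetical network: a -> b -> c, b -> d and d -> b (cycle between b and d)
vertices = ['a', 'b', 'c', 'd']
edges = [('a', 'b'), ('b', 'c'), ('b', 'd'), ('d', 'b')]
back_edges = list(find_back_edges(vertices, edges))
dag_edges = [edge for edge in edges if edge not in back_edges]
print(back_edges)                             # [('d', 'b')]
print(topological_sort(vertices, dag_edges))  # ['a', 'b', 'c', 'd']
```

Dropping the back edge is what forces the block at its destination to work with out of date data from the block at its source.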
being.kinematics module¶
Optimal trajectory and kinematic filtering.
- class State(position: float = 0.0, velocity: float = 0.0, acceleration: float = 0.0)[source]¶
Bases:
NamedTuple
Kinematic state.
Create new instance of State(position, velocity, acceleration)
- optimal_trajectory(initial: being.kinematics.State, target: being.kinematics.State, maxSpeed: float = 1.0, maxAcc: float = 1.0) list [source]¶
Calculate acceleration bang profiles for the optimal trajectory from the initial state to the target state while respecting the kinematic limits. Bang profiles are given by their duration and acceleration values.
Both target and start velocities have to be below the maxSpeed limit. Error otherwise.
- Parameters
initial – Initial state.
target – Target state.
maxSpeed (optional) – Maximum speed value. 1.0 by default.
maxAcc (optional) – Maximum acceleration (and deceleration) value. 1.0 by default.
- Returns
List of bang speed profiles. A bang segment is a (duration, acceleration) tuple. 1-3 bang profiles depending on resulting speed profile (critical, triangular or trapezoidal).
Example
>>> optimal_trajectory(initial=(0, 0, 0), target=(1, 0, 0), maxSpeed=0.5)
[(0.5, 1.0), (1.5, 0.0), (0.5, -1.0)]
- Resources:
Francisco Ramos, Mohanarajah Gajamohan, Nico Huebel and Raffaello D’Andrea: Time-Optimal Online Trajectory Generator for Robotic Manipulators. http://webarchiv.ethz.ch/roboearth/wp-content/uploads/2013/02/OMG.pdf
- sequencable(func)[source]¶
Filter function for a sequence of input values. Carry on state between single filter calls.
- step(state: being.kinematics.State, dt: float) being.kinematics.State [source]¶
Evolve state for some time interval.
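A minimal sketch of such a step, assuming constant acceleration over the interval (the library's integration scheme is not stated here). The local State tuple and the replay helper are stand-ins defined for this example; replay is hypothetical and only serves to replay the bang profiles from the optimal_trajectory example.

```python
from typing import NamedTuple


class State(NamedTuple):
    """Local stand-in for the kinematic state tuple."""
    position: float = 0.0
    velocity: float = 0.0
    acceleration: float = 0.0


def step(state, dt):
    """Advance state by dt assuming constant acceleration."""
    position = state.position + state.velocity * dt + 0.5 * state.acceleration * dt ** 2
    velocity = state.velocity + state.acceleration * dt
    return State(position, velocity, state.acceleration)


def replay(initial, profiles):
    """Apply (duration, acceleration) bang segments one after the other."""
    state = initial
    for duration, acceleration in profiles:
        state = step(state._replace(acceleration=acceleration), duration)
    return state


final = replay(State(), [(0.5, 1.0), (1.5, 0.0), (0.5, -1.0)])
print(final.position, final.velocity)  # 1.0 0.0
```

The three segments accelerate to the speed limit of 0.5, cruise, and decelerate, ending at rest on the target position.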
- kinematic_filter(targetPosition: float, dt: float, initial: being.kinematics.State = State(position=0.0, velocity=0.0, acceleration=0.0), targetVelocity: float = 0.0, maxSpeed: float = 1.0, maxAcc: float = 1.0, lower: float = - inf, upper: float = inf) being.kinematics.State [source]¶
Filter target position with respect to the kinematic limits (maximum speed and maximum acceleration / deceleration). Online optimal trajectory.
- Parameters
targetPosition – Target position value.
dt – Time interval.
initial – Initial / current state.
targetVelocity – Target velocity value. Use with care! Steady state with non-zero targetVelocity leads to oscillation.
maxSpeed – Maximum speed value.
maxAcc – Maximum acceleration (and deceleration) value.
lower – Lower clipping value for target value.
upper – Upper clipping value for target value.
- Returns
The next state.
being.logging module¶
Being logging.
- BEING_LOGGER = <Logger being (WARNING)>¶
Being root logger.
- get_logger(name: typing.Optional[str] = None, parent: typing.Optional[logging.Logger] = <Logger being (WARNING)>) logging.Logger [source]¶
Get being logger. Wraps
logging.getLogger()
. Keeps track of all created being loggers (child loggers of
being.logging.BEING_LOGGER
).
- Parameters
name – Logger name. None for root logger if not parent logger.
parent – Parent logger. BEING_LOGGER by default.
- Returns
Requested logger for given name.
being.math module¶
Mathematical helper functions.
- clip(number: float, lower: float, upper: float) float [source]¶
Clip number to the closed interval [lower, upper].
\[\begin{split}f_\mathrm{clip}(x; a, b) = \begin{cases} a & x < a \\ x & a \leq x \leq b \\ b & b < x \\ \end{cases}\end{split}\]
- Parameters
number – Input value.
lower – Lower bound.
upper – Upper bound.
- Returns
Clipped value.
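The piecewise definition above can be written as a pair of min / max calls. This is a sketch of one possible formulation, not necessarily how the library implements it:

```python
def clip(number, lower, upper):
    """Clip number to the closed interval [lower, upper]."""
    return max(lower, min(number, upper))


print(clip(15.0, 0.0, 10.0))  # 10.0
print(clip(-1.0, 0.0, 10.0))  # 0.0
print(clip(5.0, 0.0, 10.0))   # 5.0
```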
- sign(number: float) float [source]¶
-
- Parameters
number – Input value.
- Returns
Sign part of the number.
Example
>>> sign(-12.34)
-1.0
- solve_quadratic_equation(a: float, b: float, c: float) Tuple[float, float] [source]¶
Both solutions of the quadratic equation
\[x_{1,2} = \frac{-b \pm \sqrt{b^2 - 4ac}}{2a}\]
- Returns
Solutions.
- Return type
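The formula translates directly into code. In this sketch the handling of a negative discriminant (raising ValueError) is an assumption, and a is assumed to be non-zero; how the library treats these cases is not documented here.

```python
import math


def solve_quadratic_equation(a, b, c):
    """Both real solutions of a*x^2 + b*x + c = 0 (a must be non-zero)."""
    discriminant = b * b - 4 * a * c
    if discriminant < 0:
        # Assumption: complex solutions are rejected in this sketch
        raise ValueError('No real solutions')

    root = math.sqrt(discriminant)
    return (-b + root) / (2 * a), (-b - root) / (2 * a)


print(solve_quadratic_equation(1.0, -3.0, 2.0))  # (2.0, 1.0)
```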
- linear_mapping(xRange: Tuple[float, float], yRange: Tuple[float, float]) numpy.ndarray [source]¶
Get linear coefficients for
\[y = a \cdot x + b.\]
- Parameters
xRange – Input range (xmin, xmax).
yRange – Output range (ymin, ymax).
- Returns
Linear coefficients [a, b].
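The coefficients follow from requiring that xmin maps to ymin and xmax maps to ymax. A sketch of that derivation, returning a plain list instead of a numpy.ndarray to stay dependency-free:

```python
def linear_mapping(x_range, y_range):
    """Coefficients [a, b] of y = a * x + b mapping x_range onto y_range."""
    x_min, x_max = x_range
    y_min, y_max = y_range
    a = (y_max - y_min) / (x_max - x_min)  # Slope
    b = y_min - a * x_min                  # Offset so that x_min -> y_min
    return [a, b]


a, b = linear_mapping((0.0, 10.0), (-1.0, 1.0))
print(a * 0.0 + b, a * 10.0 + b)
```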
- class ArchimedeanSpiral(a: float, b: float = 0.0)[source]¶
Bases:
NamedTuple
Archimedean spiral defined by
\[r(\phi) = a + b\phi\]
Create new instance of ArchimedeanSpiral(a, b)
- static arc_length_helper(anlge: float, b: float) float [source]¶
Helper function for arc length calculations.
- arc_length(endAngle: float, startAngle: float = 0) float [source]¶
Arc length of spiral from a startAngle to an endAngle.
- classmethod fit(diameter, outerDiameter, arcLength) tuple [source]¶
Fit
ArchimedeanSpiral
to a given diameter, outerDiameter and arcLength.
- Parameters
diameter – Inner diameter of spiral.
outerDiameter – Outer diameter of spiral. If equal to diameter -> Circle.
arcLength – Measured arc length.
- Returns
Fitted spiral and estimated maximum angle.
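For a curve in polar coordinates the arc length is the integral of sqrt(r(φ)² + r'(φ)²) over the angle, which for the spiral above gives sqrt((a + bφ)² + b²). The following sketch evaluates that integral numerically with the composite Simpson's rule. This is a swapped-in technique for illustration; the class presumably uses a closed form via arc_length_helper, which is not reproduced here. The parameter n is an assumption of this sketch.

```python
import math


def arc_length(a, b, end_angle, start_angle=0.0, n=1000):
    """Numerical arc length of r(phi) = a + b * phi (n must be even)."""
    def integrand(phi):
        # sqrt(r^2 + (dr/dphi)^2) with dr/dphi = b
        return math.hypot(a + b * phi, b)

    h = (end_angle - start_angle) / n
    total = integrand(start_angle) + integrand(end_angle)
    for i in range(1, n):
        weight = 4 if i % 2 else 2
        total += weight * integrand(start_angle + i * h)

    return total * h / 3


# With b = 0 the spiral degenerates to a circle of radius a
print(arc_length(a=2.0, b=0.0, end_angle=math.pi))
```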
being.motion_player module¶
Curve / motion player block. Outputs motion samples to the motors.
Todo
Changing playback speed on the fly. As separate input? Phasor?
Motion crossover
- constant_spline(position: float = 0.0, duration: float = 1.0) scipy.interpolate.interpolate.BPoly [source]¶
Create a constant position spline which extrapolates indefinitely.
- Parameters
position – Target position value.
duration – Duration of the constant motion.
- Returns
Constant spline.
- constant_curve(positions: float = 0.0, duration: float = 1.0) being.curve.Curve [source]¶
Create a constant curve with a single channel. Keeping a constant value indefinitely.
- Parameters
positions – Target position value.
duration – Duration of the constant motion.
- Returns
Constant curve.
- class MotionCommand(name: str, loop: bool = False)[source]¶
Bases:
NamedTuple
Message to trigger motion curve playback.
Create new instance of MotionCommand(name, loop)
- class MotionPlayer(ndim: int = 1, clock: Optional[being.clock.Clock] = None, content: Optional[being.content.Content] = None, **kwargs)[source]¶
Bases:
being.block.Block
Motion curve sampler block. Receives motion commands on its message input, looks up motion curve from
being.content.Content
and samples motion curve to position outputs. Supports looping playback.
Note
MotionPlayer.positionOutputs
attributes for the position outputs only. In order to distinguish them from other outputs in the future.
- Parameters
ndim (optional) – Number of dimensions / motors / initial number of position outputs. Default is one.
clock (optional) – Clock instance (DI).
content (optional) – Content instance (DI).
**kwargs – Arbitrary block keyword arguments.
- positionOutputs: List[ValueOutput]¶
Position value outputs.
- play_curve(curve: being.curve.Curve, loop: bool = False, offset: float = 0.0) float [source]¶
Play a curve directly.
- Parameters
curve – Curve to play.
loop – Loop playback.
offset – Start time offset.
- Returns
Scheduled start time.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- process_mc(mc: being.motion_player.MotionCommand) float [source]¶
Process new motion command and schedule next curve to play.
- Parameters
mc – Motion command.
- Returns
Scheduled start time.
being.networking module¶
Networking blocks which can send / receive data via UDP over the network.
Warning
Untested.
- class NetworkBlock(address: Tuple[str, int], sock: Optional[socket.socket] = None, **kwargs)[source]¶
Bases:
being.block.Block
Base class for network / socket blocks. Sockets get registered as resources with
register_resource()
. Default
being.serialization
is used for data serialization.
- Parameters
address – Network address.
sock (optional) – Socket instance (DI).
**kwargs – Arbitrary block keyword arguments.
- address: Address¶
Socket address.
- sock: Socket¶
Underlying socket instance.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class NetworkOut(address: Tuple[str, int], sock: Optional[socket.socket] = None, **kwargs)[source]¶
Bases:
being.networking.NetworkBlock
Datagram network out block. Send being messages over UDP.
- Parameters
address – Network address.
sock (optional) – Socket instance (DI).
**kwargs – Arbitrary block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- address: Address¶
Socket address.
- sock: Socket¶
Underlying socket instance.
- class NetworkIn(address: Tuple[str, int], sock: Optional[socket.socket] = None, **kwargs)[source]¶
Bases:
being.networking.NetworkBlock
Datagram network in block. Receive being messages over UDP.
- Parameters
address – Network address.
sock (optional) – Socket instance (DI).
**kwargs – Arbitrary block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- address: Address¶
Socket address.
- sock: Socket¶
Underlying socket instance.
being.pacemaker module¶
Pacemaker thread.
- class Once(initial: Any)[source]¶
Bases:
object
Value changed detector.
- Parameters
initial – Initial value.
- class Pacemaker(network: being.backends.CanBackend, maxWait: float = 0.012)[source]¶
Bases:
contextlib.AbstractContextManager
Pacemaker / watchdog / dead man’s switch daemon thread.
Can step in to trigger SYNC messages and PDO transmission if main thread is not on time. In order to prevent RPDO timeouts.
Note
Does not start by default (
Pacemaker.start()
). Can be used as dummy if unstarted.
- Parameters
network – CanBackend network instance to trigger PDO transmits / SYNC messages.
maxWait – Maximum wait duration before stepping in. Some portion larger than global
INTERVAL
.
being.params module¶
Parameter blocks can be used to control values inside the block network. Their values are mirrored in a dedicated config file. All Parameter blocks appear in the web UI and can be tweaked by the end user.
Example
>>> # !This will create / add to the config file in your current working directory!
>>> slider = Slider('some/value', default=0.0, minValue=0.0, maxValue=10.0)
... slider.change(15.0) # Will get clipped to `maxValue`
... print(slider.output.value)
10.0
- class ParamsConfigFile(filepath='being_params.yaml')[source]¶
Bases:
being.configs.ConfigFile
,being.utils.SingleInstanceCache
Slim
being.configs.ConfigFile
subclass so that we can use it with
being.utils.SingleInstanceCache
.
- Parameters
filepath – Associated config file.
- impl: _ConfigImpl¶
Private config implementation.
- default_factory: Callable¶
Default factory for intermediate dictionaries.
- class Parameter(fullname: str, configFile: Optional[being.configs.ConfigFile] = None)[source]¶
Bases:
being.block.Block
Parameter block base class.
- Parameters
fullname – Full name of config entry.
configFile – Configuration file instance (DI). Default is
being.params.ParamsConfigFile
instance.
- configFile: ConfigFile¶
Configuration file instance.
- validate(value: Any) Any [source]¶
Validate value. Pass-through / no validation by default.
- Parameters
value – Value to validate.
- Returns
Validated value.
- loaddefault(default)[source]¶
Load value from configuration file with default value (similar to
dict.setdefault()
).
- Parameters
default – Default value.
- change(value: Any)[source]¶
Change value and store it to the config file.
- Parameters
value – New value to set.
- to_dict()[source]¶
Convert block to dictionary representation which can be used for dumping as JSON.
- Returns
Block’s dictionary representation.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class Slider(fullname: str, default: Any = 0.0, minValue: float = 0.0, maxValue: float = 1.0, **kwargs)[source]¶
Bases:
being.params.Parameter
Scalar value slider.
- Parameters
fullname – Full name of config entry.
default (optional) – Default value.
minValue (optional) – Minimum value.
maxValue (optional) – Maximum value.
**kwargs – Arbitrary Parameter block keyword arguments.
- validate(value)[source]¶
Validate value. Pass-through / no validation by default.
- Parameters
value – Value to validate.
- Returns
Validated value.
- to_dict()[source]¶
Convert block to dictionary representation which can be used for dumping as JSON.
- Returns
Block’s dictionary representation.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- configFile: ConfigFile¶
Configuration file instance.
- class SingleSelection(fullname: str, possibilities: Iterable, default: Optional[Any] = None, **kwargs)[source]¶
Bases:
being.params.Parameter
Single selection out of multiple possibilities.
- Parameters
fullname – Full name of config entry.
possibilities – All possibilities to choose from.
default (optional) – Default value. First entry of possibilities by default.
**kwargs – Arbitrary Parameter block keyword arguments.
- validate(value)[source]¶
Validate value. Pass-through / no validation by default.
- Parameters
value – Value to validate.
- Returns
Validated value.
- to_dict()[source]¶
Convert block to dictionary representation which can be used for dumping as JSON.
- Returns
Block’s dictionary representation.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- configFile: ConfigFile¶
Configuration file instance.
- class MultiSelection(fullname: str, possibilities: Iterable, default: Optional[List[Any]] = None, **kwargs)[source]¶
Bases:
being.params.Parameter
Multiple selection out of multiple possibilities.
- Parameters
fullname – Full name of config entry.
possibilities – All possibilities to choose from.
default (optional) – Default value(s). List of elements from possibilities. Nothing selected by default.
**kwargs – Arbitrary Parameter block keyword arguments.
- validate(value)[source]¶
Validate value. Pass-through / no validation by default.
- Parameters
value – Value to validate.
- Returns
Validated value.
- to_dict()[source]¶
Convert block to dictionary representation which can be used for dumping as JSON.
- Returns
Block’s dictionary representation.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- configFile: ConfigFile¶
Configuration file instance.
- class MotionSelection(fullname: str, default: Optional[List[str]] = None, content: Optional[being.content.Content] = None, **kwargs)[source]¶
Bases:
being.params.MultiSelection
Multiple motion selection. Similar to
being.params.MultiSelection
but works with motions from
being.content.Content
and can be updated.
- Parameters
fullname – Full name of config entry.
default (optional) – Default motions. List of motion names. Nothing selected by default.
content (optional) – Content instance (DI).
**kwargs – Arbitrary Parameter block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- configFile: ConfigFile¶
Configuration file instance.
being.plotting module¶
Plotting util.
- DEFAULT_COLORS¶
Default matplotlib colors.
- sample_trajectory(spline: scipy.interpolate.interpolate.PPoly, nSamples: int = 100, rett: bool = False)[source]¶
Sample trajectory values from spline. Optionally also return sample times.
- Parameters
spline – Spline to sample.
nSamples – Number of samples
rett – If to return sample times as well.
- Returns
Trajectory.
- plot_spline(spline: scipy.interpolate.interpolate.PPoly, nSamples: int = 100, **kwargs)[source]¶
Plot trajectory of spline.
- Parameters
spline – Spline to plot.
nSamples – Number of samples
**kwargs – Keyword arguments forwarded to plot_trajectory().
- class Plotter(nInputs=1)[source]¶
Bases:
being.block.Block
Value plotter. Plot multiple signals after shutdown.
- Parameters
name (optional) – Block name for UI. Block type name by default.
- __or__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block ¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by
being.block.Block.__or__()
and
being.block.Block.__ror__()
for the shorthand
>>> a | b | c  # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with
being.connectables.OutputBase
and
being.connectables.InputBase
instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
- __ror__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block ¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by
being.block.Block.__or__()
and
being.block.Block.__ror__()
for the shorthand
>>> a | b | c  # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with
being.connectables.OutputBase
and
being.connectables.InputBase
instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
being.pubsub module¶
Publish / subscribe.
being.resources module¶
Dynamic resources handling with a global exit stack.
Example
>>> from being.resources import manage_resources, register_resource, add_callback
...
...
... class Foo:
... def __enter__(self):
... print('Foo.__enter__()')
... return self
...
... def __exit__(self, exc_type, exc_value, traceback):
... print('Foo.__exit__()')
...
...
... class Bar:
... def close(self):
... print('Bar.close()')
...
...
... with manage_resources():
... foo = Foo()
... register_resource(foo) # Calls __enter__ here and __exit__ at the end
... bar = Bar()
... add_callback(bar.close)
Foo.__enter__()
Bar.close()
Foo.__exit__()
- EXIT_STACK = <contextlib.ExitStack object>¶
Global exit stack for all resources in being.
- register_resource(resource: ContextManager, duplicates=False)[source]¶
Register context manager in global being exit stack.
- Parameters
resource – Resource to enter into global exit stack.
duplicates – Skip duplicate entries.
being.rpi_gpio module¶
Raspberry Pi General Purpose Input/Output.
The RPi.GPIO package is not available on non-Raspberry Pi platforms. A dummy
being.rpi_gpio.GPIO
is provided for these systems.
being.sensors module¶
Sensor blocks.
- class SensorEvent(timestamp: float, meta: dict)[source]¶
Bases:
NamedTuple
Sensor event message.
Create new instance of SensorEvent(timestamp, meta)
- class Sensor(name: Optional[str] = None)[source]¶
Bases:
being.block.Block
Sensor block base class.
- Parameters
name (optional) – Block name for UI. Block type name by default.
- __or__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block ¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by
being.block.Block.__or__()
and
being.block.Block.__ror__()
for the shorthand
>>> a | b | c  # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with
being.connectables.OutputBase
and
being.connectables.InputBase
instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
- __ror__(right: Union[being.block.Block, being.connectables.InputBase]) being.block.Block ¶
Binary or dyadic pipe operator for connecting blocks and/or connections with each other. Used by
being.block.Block.__or__()
and
being.block.Block.__ror__()
for the shorthand
>>> a | b | c  # Instead of a.output.connect(b.input); b.output.connect(c.input)
This function also works with
being.connectables.OutputBase
and
being.connectables.InputBase
instances.
- Parameters
left – Left operand.
right – Right operand.
- Returns
Owner of rightmost incoming connection.
- Return type
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class DummySensor(interval: float = 5.0, **kwargs)[source]¶
Bases:
being.sensors.Sensor
Dummy sensor block for testing and standalone usage. Sends a dummy SensorEvent every interval seconds.
- Parameters
interval – Message send interval in seconds.
**kwargs – Arbitrary block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class SensorGpio(channel: int, edge='RISING', pull_up_down='PUD_DOWN', bouncetime: float = 0.01, rpi: Optional[being.backends.Rpi] = None, **kwargs)[source]¶
Bases:
being.sensors.Sensor
Raspberry Pi GPIO sensor block.
Arguments according to RPi.GPIO.
- Parameters
channel – Raspberry PI GPIO number.
edge (optional) – Rising or falling edge.
pull_up_down (optional) – Pull up termination or not.
bouncetime (optional) – Edge detection bounce time in seconds.
rpi (optional) – Raspberry PI backend (DI).
**kwargs – Arbitrary block keyword arguments.
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
- class Mic(threshold=0.01, bouncetime=0.1, input_device_index: typing.Optional[int] = None, frames_per_buffer: int = 1024, dtype: typing.Union[str, type, numpy.dtype] = <class 'numpy.uint8'>, audio: typing.Optional[being.backends.AudioBackend] = None, clock: typing.Optional[being.clock.Clock] = None, **kwargs)[source]¶
Bases:
being.sensors.Sensor
Listens to audio and emits sound event messages.
Example
>>> from being.awakening import awake
... from being.block import Block
... from being.resources import manage_resources
... from being.sensors import Mic
...
...
... class MessagePrinter(Block):
...     def __init__(self):
...         super().__init__()
...         self.add_message_input()
...
...     def update(self):
...         for msg in self.input.receive():
...             print(msg)
...
...
... with manage_resources():
...     awake(Mic() | MessagePrinter())
...     # Clap your hands
- Parameters
threshold – Spectral flux difference threshold.
bouncetime – Blocked duration after emitted sound event.
input_device_index – Input device index for given host api. Unspecified (or None) uses default input device.
frames_per_buffer – Audio buffer size.
dtype – Data type for samples. Not all data types are supported for audio. uint8, int16, int32 and float32 should work.
audio – Audio backend instance (DI).
clock – Clock instance (DI).
- inputs: List[InputBase]¶
Input connections.
- outputs: List[OutputBase]¶
Output connections.
being.serialization module¶
Serialization of being objects.
Supports dynamic named tuples and enums but these types have to be registered
with being.serialization.register_named_tuple()
and
being.serialization.register_enum()
.
Python objects obj
get converted to dictionary representation dct
which
then can be JSON serialized.
Example
>>> class Foo:
... def __init__(self, a, b=0):
... self.a = a
... self.b = b
>>> import json
... obj = Foo(1, 2)
... json.dumps(obj)
TypeError: Object of type Foo is not JSON serializable
>>> dct = {'type': 'Foo', 'a': 1, 'b': 2}
... json.dumps(dct)
'{"type": "Foo", "a": 1, "b": 2}'
Notes
OrderedDict
to control key ordering for Python versions prior to 3.6.
- class FlyByDecoder(term: str = '\x04')[source]¶
Bases:
object
Continuously decode objects from partial messages.
Example
>>> snippets = ['"Hello, World!"1.23', '4[1, 2, 3, 4]{"a":', ' 1, "b": 2}']
... dec = FlyByDecoder()
... for snippet in snippets:
...     for obj in dec.decode_more(snippet):
...         print(obj)
Hello, World!
1.234
[1, 2, 3, 4]
{'a': 1, 'b': 2}
- Parameters
term – Termination character (EOT by default).
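The idea of decoding objects from partial messages can be sketched with a buffer and a termination character. The class below is a hypothetical stand-in, not the FlyByDecoder implementation: it assumes that every complete JSON message is followed by the termination character and that plain json.loads is sufficient for decoding.

```python
import json


class TerminatedDecoder:
    """Hypothetical sketch: decode term-delimited JSON from partial input."""

    def __init__(self, term='\x04'):
        self.term = term
        self.buffer = ''

    def decode_more(self, snippet):
        """Buffer snippet and return all objects completed by it."""
        self.buffer += snippet
        # Everything before the last terminator is complete, the rest stays
        # in the buffer until more data arrives.
        *complete, self.buffer = self.buffer.split(self.term)
        return [json.loads(message) for message in complete]


dec = TerminatedDecoder()
for snippet in ['"Hello"\x04[1, 2', ', 3]\x04{"a":', ' 1}\x04']:
    for obj in dec.decode_more(snippet):
        print(obj)
```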
- dumps(obj: Any, *args, **kwargs) str [source]¶
Serialize being object to JSON string.
- Parameters
obj – Object to serialize.
*args – Variable length argument list for
being.serialization.BeingEncoder
.
**kwargs – Arbitrary keyword arguments for
being.serialization.BeingEncoder
.
- Returns
JSON string.
Example
>>> from being.block import Block
... block = Block(name='My Block')
... dumps(block)
'{"type": "Block", "blockType": "Block", "name": "My Block", "id": 0, "inputNeighbors": [], "outputNeighbors": []}'
- loads(string: str) Any [source]¶
Deserialize being object from JSON string.
- Parameters
string – Input string.
- Returns
Decoded being object.
- register_enum(enumType: enum.EnumMeta)[source]¶
Register enum for serialization / deserialization in
being.serialization.ENUM_LOOKUP
.
- Parameters
enumType – Enum type to register for serialization / deserialization.
- Raises
RuntimeError – If enum type has already been registered.
Example
>>> class Foo(enum.Enum):
...     first = 0
...     second = 1
...     third = 2
...
... register_enum(Foo)
... x = Foo.first
... dumps(x)
'{"type": "__main__.Foo", "members": ["first", "second", "third"], "value": 0}'
- register_named_tuple(namedTupleType: type)[source]¶
Register named tuple type for serialization / deserialization in
being.serialization.NAMED_TUPLE_LOOKUP
.
- Parameters
namedTupleType – Named tuple type to register for serialization / deserialization.
- Raises
ValueError – If named tuple has already been registered.
Example
>>> from typing import NamedTuple
...
... class Foo(NamedTuple):
...     first: int
...     second: int
...     third = None
...
... register_named_tuple(Foo)
... dumps(Foo(1, 2))
'{"type": "Foo", "first": 1, "second": 2}'
being.spline module¶
Spline related helper functions. Building splines, smoothing splines, Bézier
control points, etc. Using scipy.interpolate.BPoly
and
scipy.interpolate.PPoly
.
Knots are stored in sorted breakpoints array Spline.x
. The coefficients
in Spline.c
. The shape of Spline.c
is given by (k, m, ...)
where k
is the spline order and m
the number of segments.
Caution
References
- Spline¶
Piecewise spline.
- Parameters
c (ndarray) – Polynomial coefficients, shape (k, m, …), order k and m intervals.
x (ndarray) – Polynomial breakpoints, shape (m+1,). Must be sorted in either increasing or decreasing order.
extrapolate (bool, optional) – If bool, determines whether to extrapolate to out-of-bounds points based on first and last intervals, or to return NaNs. If ‘periodic’, periodic extrapolation is used. Default is True.
axis (int, optional) – Interpolation axis. Default is zero.
alias of
scipy.interpolate.interpolate._PPolyBase
- class Degree(value)[source]¶
Bases:
enum.IntEnum
Spline / polynomial degree.
degree = order - 1
.
- CONSTANT = 0¶
- LINEAR = 1¶
- QUADRATIC = 2¶
- CUBIC = 3¶
- spline_dimensions(spline: Spline) int [source]¶
Number of dimensions of spline. Scalar values are treated as 1 dimensional.
- spline_shape(spline: Spline) tuple [source]¶
Spline output value shape. Output dimensionality to expect when evaluating spline.
Note
For scalar splines returns empty tuple.
- sample_spline(spline: Spline, t, loop: bool = False)[source]¶
Sample spline. Clips time values for non extrapolating splines. Also supports looping.
- Parameters
spline – Some spline to sample (must be callable).
t – Time value(s)
loop – Loop spline motion.
- Returns
Spline value(s).
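The time handling described above can be sketched independently of SciPy: looping wraps the time value into the curve's interval with a modulo, otherwise the time value is clipped to the interval. The helper below is a hypothetical scalar version that takes any callable plus explicit start and end values instead of a spline object.

```python
def sample(curve, start, end, t, loop=False):
    """Sample curve at time t, either wrapped (loop) or clipped to [start, end]."""
    if loop:
        t = start + (t - start) % (end - start)
    else:
        t = min(max(t, start), end)

    return curve(t)


def ramp(t):
    """Example curve on the interval [0, 1]."""
    return 2.0 * t


print(sample(ramp, 0.0, 1.0, 1.5))             # Clipped to t=1.0 -> 2.0
print(sample(ramp, 0.0, 1.0, 1.5, loop=True))  # Wrapped to t=0.5 -> 1.0
```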
- spline_coefficients(spline: Spline, segment: int) numpy.ndarray [source]¶
Get copy of spline coefficients for a given segment.
- Parameters
spline – Input spline.
segment – Segment number.
- Returns
Segment coefficients.
- Raises
ValueError – Out of bound segment number.
- split_spline(spline: Spline) List[Spline] [source]¶
Split multi dimensional spline along each dimension.
- Parameters
spline – Input spline to split.
- Returns
One dimensional splines.
- build_ppoly(accelerations: Sequence, knots: Sequence, x0: float = 0.0, v0: float = 0.0, extrapolate: bool = False, axis: int = 0) scipy.interpolate.interpolate.PPoly [source]¶
Build quadratic position spline from acceleration segments. Also include initial velocity and position.
- Parameters
accelerations – Acceleration values.
knots – Increasing time values.
x0 – Initial position.
v0 – Initial velocity.
extrapolate (optional) – Extrapolate spline. Default is False.
axis (optional) – Spline axis. Default is 0.
- Returns
Position spline.
Example
>>> x0 = 1.234
... spline = build_ppoly([1, 0, -1], [0, 1, 2, 3], x0=x0)
... print(spline(0.) == x0)
True
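How acceleration segments turn into quadratic position segments can be sketched in plain Python. For each segment the local polynomial is x(t) = 0.5·a·(t - t0)² + v·(t - t0) + x, and position and velocity are carried over to the next knot. The helper name and its return format are assumptions of this sketch; it does not build a scipy.interpolate.PPoly.

```python
def quadratic_segments(accelerations, knots, x0=0.0, v0=0.0):
    """Per-segment coefficients (0.5 * a, v, x) and the final (x, v)."""
    coefficients = []
    x, v = x0, v0
    for acc, t0, t1 in zip(accelerations, knots, knots[1:]):
        coefficients.append((0.5 * acc, v, x))
        # Integrate position and velocity to the end of the segment
        dt = t1 - t0
        x += v * dt + 0.5 * acc * dt ** 2
        v += acc * dt

    return coefficients, (x, v)


coeffs, (x_end, v_end) = quadratic_segments([1, 0, -1], [0, 1, 2, 3], x0=1.234)
print(coeffs[0])     # (0.5, 0.0, 1.234)
print(x_end, v_end)
```

The constant coefficient of the first segment is the initial position, which is why evaluating the spline at the first knot returns x0 in the example above.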
- power_basis(order: int) numpy.ndarray [source]¶
Create power basis vector. Ordered so that it fits the spline coefficients matrix.
\[\begin{split}\left[ \begin{array}{ccccc} k! & (k-1)! & \ldots & 1! & 0! \\ \end{array} \right]\end{split}\]
- Parameters
order – Order of the spline.
- Returns
Power basis coefficients.
- ppoly_coefficients_at(spline: scipy.interpolate.interpolate.PPoly, x: float) numpy.ndarray [source]¶
Get
scipy.interpolate.PPoly
coefficients for a given x value (which is between two existing knots / breakpoints). Makes it possible to add new knots to the spline without altering its curve shape.
- Parameters
spline – Input PPoly spline.
x – X value.
- Returns
Coefficients for intermediate point x.
Example
>>> spline = PPoly([[2], [0]], [0, 1])  # Linear ppoly 0.0 -> 1.0
... ppoly_coefficients_at(spline, 0.5)
array([2., 1.])  # Still slope of 2.0 but second coefficient went up by 1.0
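Re-expressing a segment polynomial around a new point is a Taylor shift of its coefficients. The sketch below does this for a single polynomial with repeated synthetic division. It is a swapped-in illustration, not the library's code: coefficients are ordered highest power first (as in PPoly.c) and h is the distance from the segment's left breakpoint to the new point.

```python
def shift_coefficients(coefficients, h):
    """Coefficients of p(t + h) given those of p(t), highest power first."""
    c = list(coefficients)
    n = len(c) - 1
    # Repeated synthetic division by (t - h)
    for i in range(n):
        for j in range(1, n + 1 - i):
            c[j] += h * c[j - 1]

    return c


print(shift_coefficients([2.0, 0.0], 0.5))  # [2.0, 1.0]
print(shift_coefficients([1, 0, 0], 1))     # [1, 2, 1], since (t + 1)^2 = t^2 + 2t + 1
```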
- ppoly_insert(newX: float, spline: scipy.interpolate.interpolate.PPoly, extrapolate: Optional[bool] = None) scipy.interpolate.interpolate.PPoly [source]¶
Insert a new knot / breakpoint somewhere in a
scipy.interpolate.PPoly
spline.
- Parameters
newX – New knot / breakpoint value to insert.
spline – Target spline.
extrapolate (optional) – Spline extrapolation. Same as input spline by default.
- Returns
New
scipy.interpolate.PPoly
spline with inserted knot.
- smoothing_factor(smoothing: float, length: int) float [source]¶
Smoothing parameter s for splrep / splprep. Chosen in such a way that 1) the smoothing ~ mean square error of fitting the process and 2) it is length independent.
- Parameters
smoothing – Smoothing factor.
length – Number of data points.
- Returns
splrep / splprep smoothing parameter s.
- smoothing_spline(x: Sequence, y: Sequence, degree: being.spline.Degree = Degree.CUBIC, smoothing: float = 0.001, periodic: bool = False, extrapolate: bool = False) scipy.interpolate.interpolate.PPoly [source]¶
Fit smoothing spline through uni- or multivariate data points.
- Parameters
x – The data points defining a curve y = f(x).
y – The data points defining a curve y = f(x).
degree – The degree of the spline fit. It is recommended to use cubic splines.
smoothing – Tradeoff between closeness and smoothness of fit.
extrapolate – If to extrapolate data.
periodic – Periodic data. Will overwrite extrapolate = ‘periodic’.
- Returns
Piecewise polynomial smoothing spline.
References
- fit_spline(trajectory: numpy.ndarray, smoothing: float = 1e-06) scipy.interpolate.interpolate.BPoly [source]¶
Fit a smoothing spline through a trajectory.
- Parameters
trajectory – Trajectory data in matrix form.
smoothing (optional) – Smoothing factor.
- Returns
Fitted BPoly spline.
- optimal_trajectory_spline(initial: tuple, target: tuple, maxSpeed: float = 1.0, maxAcc: float = 1.0, extrapolate: bool = False) scipy.interpolate.interpolate.PPoly [source]¶
Build a scipy.interpolate.PPoly spline following the optimal trajectory from initial to target state.
- Parameters
initial – Start state.
target – Final end state.
maxSpeed – Maximum speed.
maxAcc – Maximum acceleration (and deceleration).
extrapolate – Extrapolate splines over borders.
- Returns
PPoly optimal trajectory spline.
being.typing module¶
Some being typing.
Warning
Deprecated
Todo
There is a new Spline type in being.spline. Remove this module.
being.utils module¶
Miscellaneous helpers.
- filter_by_type(sequence: Iterable, type_) Generator[Any, None, None] [source]¶
Filter sequence by type.
- Parameters
sequence – Input sequence to filter.
type_ – Type to filter.
- Returns
Filtered types generator.
Example
>>> data = ['Everyone', 1, 'Likes', 2.0, 'Ice', 'Cream']
... for txt in filter_by_type(data, str):
...     print(txt)
Everyone
Likes
Ice
Cream
- rootname(path: str) str [source]¶
Get ‘root’ part of path (filename without ext).
- Parameters
path – Input path.
- Returns
Root name.
Example
>>> rootname('swedish-chef/chocolate-moose.txt')
'chocolate-moose'
- collect_files(directory, pattern='*') Generator[str, None, None] [source]¶
Recursively walk over all files in directory. With file extension filter.
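A recursive file walk with a name filter could look roughly like the sketch below. This is an assumption about the behaviour, not the actual being code: it treats pattern as a shell-style glob matched against the file name only, and the name collect_files_sketch is hypothetical.

```python
import fnmatch
import os
import tempfile


def collect_files_sketch(directory, pattern='*'):
    """Yield paths of all files below directory whose name matches pattern."""
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in sorted(filenames):
            if fnmatch.fnmatch(filename, pattern):
                yield os.path.join(dirpath, filename)


# Small demonstration inside a throwaway directory
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, 'sub'))
    for name in ['a.json', 'b.txt', os.path.join('sub', 'c.json')]:
        open(os.path.join(root, name), 'w').close()

    found = [
        os.path.relpath(path, root)
        for path in collect_files_sketch(root, '*.json')
    ]
```

Being a generator, the walk is lazy: files are only visited while the caller iterates.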
- listdir(directory, fullpath=True) List[str] [source]¶
List directory content. Not recursive. No hidden files. Lexicographically sorted.
- Parameters
directory – Directory to traverse.
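The three properties named above (not recursive, no hidden files, sorted) can be sketched as follows. Two assumptions are made here that the documentation does not confirm: hidden files are those with a leading dot, and fullpath=True prepends the directory to each entry. The name listdir_sketch is hypothetical.

```python
import os
import tempfile


def listdir_sketch(directory, fullpath=True):
    """List non-hidden directory entries, sorted lexicographically."""
    names = sorted(
        name for name in os.listdir(directory)
        if not name.startswith('.')  # Assumption: hidden means leading dot
    )
    if fullpath:
        # Assumption: fullpath joins each entry with the directory
        return [os.path.join(directory, name) for name in names]

    return names


with tempfile.TemporaryDirectory() as root:
    for name in ['b.txt', '.hidden', 'a.txt']:
        open(os.path.join(root, name), 'w').close()

    names = listdir_sketch(root, fullpath=False)

print(names)  # ['a.txt', 'b.txt']
```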
- update_dict_recursively(dct: dict, other: dict, default_factory: Optional[type] = None) dict [source]¶
Update dictionary recursively in-place.
- Parameters
dct – Dictionary to update.
other – Other dict to go through.
default_factory – Default factory for intermediate dicts.
- Returns
Mutated input dictionary (for recursive calls).
Example
>>> kermit = {'type': 'Frog', 'skin': {'type': 'Skin', 'color': 'green',}}
... update_dict_recursively(kermit, {'skin': {'color': 'blue'}})
... print(kermit['skin']['color'])
blue
- merge_dicts(first: dict, *others) dict [source]¶
Merge dicts together. Compatible with Python versions before 3.5. The type of the first dict is used for the returned one.
- Parameters
first – First dict to copy and update.
*others – All the other dicts.
- Returns
Updated dict.
Example
>>> merge_dicts({'name': 'Sid'}, {'item': 'cookie'}, {'mood': 'happy'})
{'name': 'Sid', 'item': 'cookie', 'mood': 'happy'}
- class SingleInstanceCache[source]¶
Bases:
object
Aka. almost a Singleton but not quite.
Use SingleInstanceCache.single_instance_setdefault() to construct and / or access the single instance of a class.
It is still possible to initialize multiple instances of the class. __init__() and __new__() stay untouched.
Uses weak references inside. Pay attention to reference counting. The single instance cache will not keep cached instances alive. They can get garbage collected (problematic if resources are released during destruction).
References
So Singletons are bad, then what?
Example
>>> class Foo(SingleInstanceCache):
...     pass
... print('Instance of Foo exists?', Foo.single_instance_initialized())
Instance of Foo exists? False
>>> Foo()
... print('Get single instance of Foo:', Foo.single_instance_get())
Get single instance of Foo: None
>>> foo = Foo.single_instance_setdefault()
... print('Same ref:', foo is Foo.single_instance_setdefault())
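The consequence of caching through weak references can be illustrated with a small stand-in. This is only a sketch of the general technique using weakref.WeakValueDictionary, not the being implementation. The class WeakSingleInstance and its methods get and setdefault are hypothetical.

```python
import gc
import weakref


class WeakSingleInstance:
    """Hypothetical stand-in for a weakly referenced single instance cache."""

    _instances = weakref.WeakValueDictionary()  # Maps class -> instance

    @classmethod
    def get(cls):
        """Return the cached instance or None."""
        return cls._instances.get(cls)

    @classmethod
    def setdefault(cls, *args, **kwargs):
        """Return the cached instance, constructing and caching it if needed."""
        self = cls._instances.get(cls)
        if self is None:
            self = cls(*args, **kwargs)
            cls._instances[cls] = self

        return self


class Foo(WeakSingleInstance):
    pass


foo = Foo.setdefault()
same = foo is Foo.setdefault()  # True while `foo` keeps the instance alive

del foo
gc.collect()  # Make sure the unreferenced instance is reclaimed
gone = Foo.get() is None  # True: the cache alone did not keep it alive
```

Callers that rely on the cached instance therefore need to hold a strong reference somewhere for as long as the instance is needed.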
- class IdAware(*args, **kwargs)[source]¶
Bases:
object
Class mixin for assigning id numbers to each instance. Each type has its own counter, starting from zero.
- ID_COUNTERS = {}¶
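Per-type counters like this could be kept in a dictionary keyed by class. The sketch below is one possible way to do it and not necessarily how IdAware works internally. The class name IdAwareSketch and the attribute name id are assumptions.

```python
import collections
import itertools


class IdAwareSketch:
    """Hypothetical mixin: each subclass counts its own instances from zero."""

    # One itertools.count() per concrete type, created on first use
    ID_COUNTERS = collections.defaultdict(itertools.count)

    def __init__(self):
        self.id = next(self.ID_COUNTERS[type(self)])


class Frog(IdAwareSketch):
    pass


class Pig(IdAwareSketch):
    pass


ids = [Frog().id, Frog().id, Pig().id]
print(ids)  # [0, 1, 0]
```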
- class NestedDict(data=None, default_factory=<class 'dict'>)[source]¶
Bases:
collections.abc.MutableMapping
Nested dictionary. Supports tuples as keys for accessing intermediate dictionaries within.
Example
>>> dct = NestedDict()
... dct[1, 2, 3] = 'Fozzie'
... print(dct)
NestedDict({1: {2: {3: 'Fozzie'}}})
- Parameters
data (optional) – Initial data object. If none given (default) use default_factory to create a new one.
default_factory (optional) – Default factory for intermediate dictionaries (dict by default).
- data: Dict¶
Dict-like data container.
- default_factory: Callable¶
Default factory for intermediate dictionaries.
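How tuple keys map onto intermediate dictionaries can be sketched with two plain functions operating on a regular dict. This is an illustration of the lookup idea only. The helpers nested_set and nested_get are hypothetical and not part of being.utils.

```python
def nested_set(data, keys, value, default_factory=dict):
    """Set value at the nested location keys, creating intermediate dicts."""
    *path, last = keys
    for key in path:
        data = data.setdefault(key, default_factory())

    data[last] = value


def nested_get(data, keys):
    """Follow keys one level at a time and return what is stored there."""
    for key in keys:
        data = data[key]

    return data


store = {}
nested_set(store, (1, 2, 3), 'Fozzie')
print(store)                      # {1: {2: {3: 'Fozzie'}}}
print(nested_get(store, (1, 2)))  # {3: 'Fozzie'}
```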