"""Block output and input connectables. In general it is always possible to
connect one output to multiple inputs but not the other way round.
A `connection` is a (:class:`OutputBase`, :class:`InputBase`) tuple.
There are two types of connections:
- Value: Propagate some value through the connections in very tick
(corresponds to continuous data stream).
- Message: Send discrete messages from an output to all connected inputs.
Note:
This code is directly taken from the Klang_ project.
"""
import collections
import itertools
from typing import Tuple, ForwardRef, Optional, Union, Set, Any, Iterable, Generator
from being.error import BeingError
__all__ = [
'ValueInput', 'ValueOutput', 'MessageInput', 'MessageOutput',
]
OutputBase = ForwardRef('OutputBase')
InputBase = ForwardRef('InputBase')
Block = ForwardRef('Block')
Connection = Tuple[OutputBase, InputBase]
"""Output -> input connection."""
Connectable = Union[OutputBase, InputBase]
"""General connectable."""
class InputAlreadyConnected(BeingError):
"""Can not connect to already connected input."""
class IncompatibleConnection(BeingError):
"""The two components can not be connected with each other."""
def is_valid_connection(output: OutputBase, input_: InputBase) -> bool:
"""Check if connection types are connectable."""
outputType = type(output)
inputType = type(input_)
validConnectionTypes = {
(OutputBase, InputBase),
(ValueOutput, ValueInput),
(MessageOutput, MessageInput),
}
return (outputType, inputType) in validConnectionTypes
def validate_connection(output: OutputBase, input_: InputBase):
"""Validate connection coupling tuple.
Args:
output: Output connectable.
input_: Input connectable.
Raises:
IncompatibleConnection: When it is not possible to connect output with
input.
"""
if not is_valid_connection(output, input_):
fmt = 'Can not connect %s with %s!'
msg = fmt % (type(output).__name__, type(input_).__name__)
raise IncompatibleConnection(msg)
def make_connection(output: OutputBase, input_: InputBase):
"""Make directional connection from output -> input_.
Args:
output: Output connectable.
input_: Input connectable.
"""
validate_connection(output, input_)
if input_.connected:
msg = '%s is already connected to another output!' % input_
raise InputAlreadyConnected(msg)
# Make the actual connection
output.outgoingConnections.add(input_)
input_.incomingConnection = output
def break_connection(output: OutputBase, input_: InputBase):
"""Break directional connection from output -> input_.
Args:
output: Output connectable.
input_: Input connectable.
"""
validate_connection(output, input_)
# Break the actual connection
output.outgoingConnections.remove(input_)
input_.incomingConnection = None
def is_connected(output: OutputBase, input_: InputBase) -> bool:
"""Check if output is connected to input_.
Args:
output: Output connectable.
input_: Input connectable.
Returns:
If output and input_ are connected.
"""
if not is_valid_connection(output, input_):
return False
return output is input_.incomingConnection and input_ in output.outgoingConnections
def are_connected(*connectables: Iterable[Connectable]) -> bool:
"""Check if each pair in a chain of connectables are connected to each
other.
Args:
*connectables: Multiple connectables.
Returns:
If provided connectables form a connected chain or not.
"""
# Iterate over pairs
outputs, inputs = itertools.tee(connectables)
next(inputs, None)
for output, input_ in zip(outputs, inputs):
if not is_connected(output, input_):
return False
return True
def str_function(connectable: Connectable) -> str:
"""__str__ function for :class:`OutputBase` and :class:`InputBase`.
Args:
connectable: Connectable to stringify.
Returns:
__str__ string.
"""
infos = []
if connectable.owner:
infos.append('owner: %s' % connectable.owner)
if connectable.connected:
infos.append('connected')
else:
infos.append('not connected')
return '%s(%s)' % (type(connectable).__name__, ', '.join(infos))
class OutputBase:
"""Base class for all outputs."""
def __init__(self, owner: Optional[Block] = None):
"""
Args:
owner: Parent block owning this output.
"""
self.owner: Optional[Block] = owner
"""Parent block owning this connectable."""
self.outgoingConnections: Set[InputBase] = set()
"""Connected inputs (outgoing connections)."""
@property
def connected(self) -> bool:
"""Is connected to at least one input?"""
return bool(self.outgoingConnections)
def connect(self, input_):
"""Connect with input."""
make_connection(self, input_)
def disconnect(self, input_):
"""Disconnect input."""
break_connection(self, input_)
__str__ = str_function
class InputBase:
"""Base class for all inputs.
Attributes:
incomingConnection (OutputBase): Connected OutputBase.
"""
def __init__(self, owner: Optional[Block] = None):
"""
Args:
owner: Parent block owning this output.
"""
self.owner: Optional[Block] = owner
"""Parent block owning this connectable."""
self.incomingConnection: Optional[OutputBase] = None
"""Connected output (incoming connection)."""
@property
def connected(self) -> bool:
"""Is connected to an output?"""
return self.incomingConnection is not None
def connect(self, output):
"""Connect with output."""
make_connection(output, self)
def disconnect(self, output):
"""Disconnect output."""
break_connection(output, self)
__str__ = str_function
class _ValueContainer:
"""Value attribute mixin class.
Can not decide between property/setter vs. conventional getters and setters.
With value fetching a property looks nice (data = input.value) with value
propagating a setter looks better (output.set_value(data)).
Todo:
Kill ``get_value()`` / ``set_value()``? Property getter / setter access
`could` be a little bit faster.
"""
def __init__(self, value: Any = 0.):
"""
Args:
value: Initial value.
"""
self._value: Any = value
"""Stored Python value."""
@property
def value(self):
"""Get value."""
return self._value
@value.setter
def value(self, value):
"""Set value."""
self._value = value
def get_value(self):
"""Get value."""
return self._value
def set_value(self, value):
"""Set value."""
self._value = value
[docs]class ValueOutput(OutputBase, _ValueContainer):
"""Value output. Will propagate its value to connected inputs."""
def __init__(self, owner: Optional[Block] = None, value=0.):
super().__init__(owner)
_ValueContainer.__init__(self, value)
class _MessageQueue:
"""Message queue mixin class."""
MAX_MESSAGES: int = 50
"""Maximum size of message queue."""
def __init__(self):
self.queue = collections.deque(maxlen=self.MAX_MESSAGES)
def push(self, message: Any):
"""Push message on the message queue."""
self.queue.append(message)
def receive(self) -> Generator[Any, None, None]:
"""Iterate over received messages."""
while self.queue:
yield self.queue.popleft()
def receive_latest(self) -> Optional[Any]:
"""Return latest received messages (if any). Discard the rest."""
if not self.queue:
return None
latestMsg = self.queue.pop()
self.queue.clear()
return latestMsg
[docs]class MessageOutput(OutputBase):
"""Message output. Sends messages to all connected message inputs."""
[docs] def send(self, message: Any):
"""Send message to all connected message inputs."""
for con in self.outgoingConnections:
con.push(message)