Source code for being.can.cia_402

"""CiA 402 object dictionary addresses, definitions, state machine and
CiA402Node class.

CiA402Node is a trimmed down version of canopen.BaseNode402. We favor SDO
communication during setup but synchronous acyclic PDO communication during
operation. Also added support for
:class:`being.can.cia_402.OperationMode.CYCLIC_SYNCHRONOUS_POSITION`.
"""
import collections
import contextlib
import enum
import time
import warnings
from typing import (
    Any,
    Dict,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Set,
    Tuple,
    Union,
)

from canopen import RemoteNode, ObjectDictionary, Network

from being.bitmagic import check_bit
from being.can.cia_301 import MANUFACTURER_DEVICE_NAME
from being.can.definitions import TransmissionType
from being.constants import FORWARD, BACKWARD
from being.logging import get_logger


# Mandatory (?) CiA 402 object dictionary entries
# (SJA): CiA 402 is still a Draft Specification Proposal (DSP).
CONTROLWORD: int = 0x6040
"""Controlword. :hex:"""

STATUSWORD: int = 0x6041
"""Statusword. :hex:"""

MODES_OF_OPERATION: int = 0x6060
"""Selecting the active drive profile. :hex:"""

MODES_OF_OPERATION_DISPLAY: int = 0x6061
"""Get supported modes of operations for drive. :hex:"""

POSITION_DEMAND_VALUE: int = 0x6062
"""Target position in user units. :hex:"""

POSITION_ACTUAL_VALUE: int = 0x6064
"""Actual position in internal units. :hex:"""

POSITION_WINDOW: int = 0x6067
"""Target corridor around set-point value for target reached. :hex:"""

POSITION_WINDOW_TIME: int = 0x6068
"""Time needed in target corridor for target reached. :hex:"""

VELOCITY_DEMAND_VALUE: int = 0x606B
"""Target velocity in user units. :hex:"""

VELOCITY_ACTUAL_VALUE: int = 0x606C
"""Actual velocity in internal units. :hex:"""

TARGET_POSITION: int = 0x607A
"""Target position in internal units. :hex:"""

POSITION_RANGE_LIMIT: int = 0x607B
"""Min / max position range limit. :hex:"""

SOFTWARE_POSITION_LIMIT: int = 0x607D
"""Min / max software position range limit. :hex:"""

MIN_POSITION_LIMIT: int = 1
"""Subindex for lower position range limit."""

MAX_POSITION_LIMIT: int = 2
"""Subindex for upper position range limit."""

MAX_PROFILE_VELOCITY: int = 0x607F
"""Maximum velocity. Units vendor dependent. :hex:"""

PROFILE_VELOCITY: int = 0x6081
"""Maximum velocity. Units vendor dependent. :hex:"""

PROFILE_ACCELERATION: int = 0x6083
"""Maximum acceleration. :hex:"""

PROFILE_DECELERATION: int = 0x6084
"""Maximum deceleration. :hex:"""

QUICK_STOP_DECELERATION: int = 0x6085
"""Quick stop deceleration. :hex:"""

HOMING_METHOD: int = 0x6098
"""Homing method number. :hex:"""

HOMING_SPEEDS: int = 0x6099
"""Speed values of homing. :hex:"""

SPEED_FOR_SWITCH_SEARCH: int = 1
"""Homing speed for switch searching."""

SPEED_FOR_ZERO_SEARCH: int = 2
"""Homing speed for zero search."""

HOMING_ACCELERATION: int = 0x609A
"""Acceleration during homing. :hex:"""

DIGITAL_INPUTS: int = 0x60FD
"""Read state of digital inputs (read-only). :hex:"""

TARGET_VELOCITY: int = 0x60FF
"""Target velocity. :hex:"""

SUPPORTED_DRIVE_MODES: int = 0x6502
"""Supported operating modes for drive. :hex:"""


CanOpenRegister = Union[int, str]


[docs]class State(enum.Enum): """CANopen CiA 402 states.""" START = enum.auto() NOT_READY_TO_SWITCH_ON = enum.auto() SWITCH_ON_DISABLED = enum.auto() READY_TO_SWITCH_ON = enum.auto() SWITCHED_ON = enum.auto() OPERATION_ENABLED = enum.auto() QUICK_STOP_ACTIVE = enum.auto() FAULT_REACTION_ACTIVE = enum.auto() FAULT = enum.auto() HALT = enum.auto()
StateSwitching = Iterator[State] Edge = Tuple[State, State]
[docs]class CW(enum.IntEnum): """Controlword bits.""" SWITCH_ON = (1 << 0) """:bin:""" ENABLE_VOLTAGE = (1 << 1) """:bin:""" QUICK_STOP = (1 << 2) """:bin:""" ENABLE_OPERATION = (1 << 3) """:bin:""" NEW_SET_POINT = (1 << 4) """:bin:""" START_HOMING_OPERATION = NEW_SET_POINT # Alias """:bin:""" ENABLE_IP_MODE = NEW_SET_POINT # Alias """:bin:""" CHANGE_SET_IMMEDIATELY = (1 << 5) """:bin:""" ABS_REL = (1 << 6) """:bin:""" FAULT_RESET = (1 << 7) """:bin:""" HALT = (1 << 8) """:bin:"""
[docs]class Command(enum.IntEnum): """CANopen CiA 402 controlword commands for state transitions.""" SHUT_DOWN = CW.QUICK_STOP | CW.ENABLE_VOLTAGE """:bin:""" SWITCH_ON = CW.QUICK_STOP | CW.ENABLE_VOLTAGE | CW.SWITCH_ON """:bin:""" DISABLE_VOLTAGE = 0 """:bin:""" QUICK_STOP = CW.ENABLE_VOLTAGE """:bin:""" DISABLE_OPERATION = CW.QUICK_STOP | CW.ENABLE_VOLTAGE | CW.SWITCH_ON """:bin:""" ENABLE_OPERATION = CW.ENABLE_OPERATION | CW.QUICK_STOP | CW.ENABLE_VOLTAGE | CW.SWITCH_ON """:bin:""" FAULT_RESET = CW.FAULT_RESET """:bin:"""
[docs]class SW(enum.IntEnum): """Statusword bits.""" READY_TO_SWITCH_ON = (1 << 0) """:bin:""" SWITCHED_ON = (1 << 1) """:bin:""" OPERATION_ENABLED = (1 << 2) """:bin:""" FAULT = (1 << 3) """:bin:""" VOLTAGE_ENABLED = (1 << 4) """:bin:""" QUICK_STOP = (1 << 5) """:bin:""" SWITCH_ON_DISABLED = (1 << 6) """:bin:""" WARNING = (1 << 7) """:bin:""" #alwayszero = (1 << 8) REMOTE = (1 << 9) """:bin:""" TARGET_REACHED = (1 << 10) """:bin:""" INTERNAL_LIMIT_ACTIVE = (1 << 11) """:bin:""" ACKNOWLEDGE = (1 << 12) """:bin:""" HOMING_ATTAINED = ACKNOWLEDGE # Alias """:bin:""" HOMING_ERROR = (1 << 13) """:bin:""" DEVIATION_ERROR = HOMING_ERROR # Alias """:bin:"""
#NOT_IN_USE_0 = (1 << 14) #NOT_IN_USE_1 = (1 << 15)
[docs]class OperationMode(enum.IntEnum): """Modes of Operation (0x6060 / 0x6061).""" NO_MODE = 0 PROFILE_POSITION = 1 VELOCITY = 2 PROFILE_VELOCITY = 3 PROFILE_TORQUE = 4 HOMING = 6 INTERPOLATED_POSITION = 7 CYCLIC_SYNCHRONOUS_POSITION = 8 CYCLIC_SYNCHRONOUS_VELOCITY = 9 CYCLIC_SYNCHRONOUS_TORQUE = 10 OPEN_LOOP_SCALAR_MODE = -1 OPEN_LOOP_VECTOR_MODE = -2
TRANSITION_COMMANDS: Dict[Edge, Command] = { # Shut down: 2, 6, 8 (State.SWITCH_ON_DISABLED, State.READY_TO_SWITCH_ON): Command.SHUT_DOWN, (State.SWITCHED_ON, State.READY_TO_SWITCH_ON): Command.SHUT_DOWN, (State.OPERATION_ENABLED, State.READY_TO_SWITCH_ON): Command.SHUT_DOWN, # Switch On: 3 (State.READY_TO_SWITCH_ON, State.SWITCHED_ON): Command.SWITCH_ON, # Disable Voltage: 7, 9, 10, 12 (State.READY_TO_SWITCH_ON, State.SWITCH_ON_DISABLED): Command.DISABLE_VOLTAGE, (State.OPERATION_ENABLED, State.SWITCH_ON_DISABLED): Command.DISABLE_VOLTAGE, (State.SWITCHED_ON, State.SWITCH_ON_DISABLED): Command.DISABLE_VOLTAGE, (State.QUICK_STOP_ACTIVE, State.SWITCH_ON_DISABLED): Command.DISABLE_VOLTAGE, # Quick Stop: 7, 10, 11 (State.READY_TO_SWITCH_ON, State.SWITCH_ON_DISABLED): Command.QUICK_STOP, (State.SWITCHED_ON, State.SWITCH_ON_DISABLED): Command.QUICK_STOP, (State.OPERATION_ENABLED, State.QUICK_STOP_ACTIVE): Command.QUICK_STOP, # Disable Operation: 5 (State.OPERATION_ENABLED, State.SWITCHED_ON): Command.DISABLE_OPERATION, # Enable Operation: 4, 16 (State.SWITCHED_ON, State.OPERATION_ENABLED): Command.ENABLE_OPERATION, (State.QUICK_STOP_ACTIVE, State.OPERATION_ENABLED): Command.ENABLE_OPERATION, # Fault Reset: 15 (State.FAULT, State.SWITCH_ON_DISABLED): Command.FAULT_RESET, # Automatic: 0, 1, 14 (State.START, State.NOT_READY_TO_SWITCH_ON): Command.DISABLE_VOLTAGE, # 0x0 (State.NOT_READY_TO_SWITCH_ON, State.SWITCH_ON_DISABLED): Command.DISABLE_VOLTAGE, # 0x0 (State.FAULT_REACTION_ACTIVE, State.FAULT): Command.DISABLE_VOLTAGE, # 0x0 } """Possible state transitions edges and the corresponding controlword command. :meta hide-value: """ POSSIBLE_TRANSITIONS: Dict[State, Set[State]] = collections.defaultdict(set) """Reachable states from a given start state.""" for _edge in TRANSITION_COMMANDS: _src, _dst = _edge POSSIBLE_TRANSITIONS[_src].add(_dst) VALID_OP_MODE_CHANGE_STATES: Set[State] = { State.SWITCH_ON_DISABLED, State.READY_TO_SWITCH_ON, State.SWITCHED_ON, } """Not every state support switching of operation mode.""" STATUSWORD_2_STATE: List[Tuple[int, int, State]] = [ (0b1001111, 0b0000000, State.NOT_READY_TO_SWITCH_ON), (0b1001111, 0b1000000, State.SWITCH_ON_DISABLED), (0b1101111, 0b0100001, State.READY_TO_SWITCH_ON), (0b1101111, 0b0100011, State.SWITCHED_ON), (0b1101111, 0b0100111, State.OPERATION_ENABLED), (0b1101111, 0b0000111, State.QUICK_STOP_ACTIVE), (0b1001111, 0b0001111, State.FAULT_REACTION_ACTIVE), (0b1001111, 0b0001000, State.FAULT), ] """Statusword bit masks for state loopkup. :meta hide-value: """
[docs]def which_state(statusword: int) -> State: """Extract state from statusword number. Args: statusword: Statusword number. Returns: Current state. Raises: ValueError: If no valid state was found. """ for mask, value, state in STATUSWORD_2_STATE: if (statusword & mask) == value: return state raise ValueError('Unknown state for statusword {statusword}!')
[docs]def supported_operation_modes(supportedDriveModes: int) -> Iterator[OperationMode]: """Which operation modes are supported? Extract information from value of SUPPORTED_DRIVE_MODES (0x6502). Args: supportedDriveModes: Received value from 0x6502. Yields: Supported drive operation modes for the node. """ # Look-up is identical between Faulhaber / Maxon stuff = [ (0, OperationMode.PROFILE_POSITION), (2, OperationMode.PROFILE_VELOCITY), (5, OperationMode.HOMING), (7, OperationMode.CYCLIC_SYNCHRONOUS_POSITION), (8, OperationMode.CYCLIC_SYNCHRONOUS_VELOCITY), (9, OperationMode.CYCLIC_SYNCHRONOUS_TORQUE), ] for bit, op in stuff: if check_bit(supportedDriveModes, bit): yield op
# There are 31 official and a couple unofficial CiA 402 homing methods. All have # assigned some number. It is hard to keep track of the different effect and # parameters. :class:`HomingParam` is an intermediate representation with some # more human understandable representation. This can that get mapped to the # integer homing method number. # The function :func:`determine_homing_method` can than be used to determine the # wanted homing method. POSITIVE: float = FORWARD """Positive homing direction.""" RISING: float = FORWARD """Rising home switch edge.""" NEGATIVE: float = BACKWARD """Negative direction.""" FALLING: float = BACKWARD """Falling home switch edge.""" UNAVAILABLE: float = 0.0 """Unavailable indicator.""" UNDEFINED: float = 0.0 """Undefined indicator."""
[docs]class HomingParam(NamedTuple): """Intermediate homing parameters representation to describe the different CiA 402 homing methods. """ endSwitch: float = UNAVAILABLE """Do we have an end switch? If so at which end?""" homeSwitch: float = UNAVAILABLE """Do we have a home switch? If so at which end?""" homeSwitchEdge: float = UNDEFINED """Home switch edge.""" indexPulse: bool = False """Do we have index pulses?""" direction: float = UNDEFINED """Which direction to home to.""" hardStop: bool = False """Perform hard stop homing. End / home switch and index pulse do not have an effect then. """
HOMING_METHODS: Dict[HomingParam, int] = { HomingParam(indexPulse=True, direction=POSITIVE, hardStop=True, ): -1, HomingParam(indexPulse=True, direction=NEGATIVE, hardStop=True, ): -2, HomingParam(direction=POSITIVE, hardStop=True, ): -3, HomingParam(direction=NEGATIVE, hardStop=True, ): -4, HomingParam(indexPulse=True, endSwitch=NEGATIVE, ): 1, HomingParam(indexPulse=True, endSwitch=POSITIVE, ): 2, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=FALLING, ): 3, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=RISING, ): 4, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, ): 5, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=RISING, ): 6, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, endSwitch=POSITIVE, ): 7, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=RISING, endSwitch=POSITIVE, ): 8, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=RISING, endSwitch=POSITIVE, ): 9, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=FALLING, endSwitch=POSITIVE, ): 10, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=FALLING, endSwitch=NEGATIVE, ): 11, HomingParam(indexPulse=True, homeSwitch=POSITIVE, homeSwitchEdge=RISING, endSwitch=NEGATIVE, ): 12, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=RISING, endSwitch=NEGATIVE, ): 13, HomingParam(indexPulse=True, homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, endSwitch=NEGATIVE, ): 14, HomingParam(endSwitch=NEGATIVE, ): 17, HomingParam(endSwitch=POSITIVE, ): 18, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=FALLING, ): 19, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=RISING, ): 20, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, ): 21, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=RISING, ): 22, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, endSwitch=POSITIVE, ): 23, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=RISING, endSwitch=POSITIVE, ): 24, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=RISING, endSwitch=POSITIVE, ): 25, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=FALLING, endSwitch=POSITIVE, ): 26, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=FALLING, endSwitch=NEGATIVE, ): 27, HomingParam(homeSwitch=POSITIVE, homeSwitchEdge=RISING, endSwitch=NEGATIVE, ): 28, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=RISING, endSwitch=NEGATIVE, ): 29, HomingParam(homeSwitch=NEGATIVE, homeSwitchEdge=FALLING, endSwitch=NEGATIVE, ): 30, HomingParam(indexPulse=True, direction=NEGATIVE,): 33, HomingParam(indexPulse=True, direction=POSITIVE,): 34, HomingParam(): 35, # Todo(atheler): Got replaced with 37 in newer versions } """CiA 402 homing method lookup. :meta hide-value: """ assert len(HOMING_METHODS) == 35, 'Something went wrong with HOMING_METHODS keys! Not enough homing methods anymore.'
[docs]def determine_homing_method( endSwitch: float = UNAVAILABLE, homeSwitch: float = UNAVAILABLE, homeSwitchEdge: float = UNDEFINED, indexPulse: bool = False, direction: float = UNDEFINED, hardStop: bool = False, ) -> int: """Determine homing method number. Args: endSwitch (optional): Do we have an end switch? If so at which end? Default is :const:`UNAVAILABLE`. homeSwitch (optional): Do we have a home switch? If so at which end? Default is :const:`UNAVAILABLE`. homeSwitchEdge (optional): Home switch edge. Default is :const:`UNDEFINED`. indexPulse (optional): Do we have index pulses? Default is False. direction (optional): Which direction to home to. Default is :const:`UNDEFINED`. hardStop (optional): Perform hard stop homing. End / home switch and index pulse do not have an effect then. Default is False. Returns: Homing method number. Examples: >>> determine_homing_method() # Home at current position without moving 35 >>> determine_homing_method(direction=1.0, hardStop=True) # Forward hard stop homing -3 >>> determine_homing_method(endSwitch=1.0) # Forward homing until end switch 18 """ param = HomingParam(endSwitch, homeSwitch, homeSwitchEdge, indexPulse, direction, hardStop) return HOMING_METHODS[param]
assert determine_homing_method(hardStop=True, direction=FORWARD) == -3 assert determine_homing_method(hardStop=True, direction=BACKWARD) == -4
[docs]def find_shortest_state_path(start: State, end: State) -> List[State]: """Find shortest path from `start` to `end` state. Start node is also included in returned path. Args: start: Start state. end: Target end state. Returns: Path from start to end. Empty list for impossible transitions. Examples: >>> find_shortest_state_path(State.SWITCH_ON_DISABLED, State.OPERATION_ENABLED) [<State.SWITCH_ON_DISABLED: 3>, <State.READY_TO_SWITCH_ON: 4>, <State.SWITCHED_ON: 5>, <State.OPERATION_ENABLED: 6>] >>> find_shortest_state_path(State.OPERATION_ENABLED, State.SWITCH_ON_DISABLED) [<State.OPERATION_ENABLED: 6>, <State.SWITCH_ON_DISABLED: 3>] >>> find_shortest_state_path(State.OPERATION_ENABLED, State.NOT_READY_TO_SWITCH_ON) [] # Not possible to get to NOT_READY_TO_SWITCH_ON! """ if start is end: return [] # Breadth-first search queue = collections.deque([[start]]) paths = [] while queue: path = queue.popleft() tail = path[-1] for suc in POSSIBLE_TRANSITIONS[tail]: if suc in path: continue # Cycle detected if suc is end: paths.append(path + [end]) else: queue.append(path + [suc]) return min(paths, key=len, default=[])
[docs]def target_reached(statusword: int) -> bool: """Check if target has been reached from statusword. Args: statusword: Statusword value. Returns: If target has been reached. """ return bool(statusword & SW.TARGET_REACHED)
[docs]def maybe_int(string: str) -> Union[int, str]: """Try to cast string to int. Args: string: Input string. Returns: Maybe an int. Pass on input string otherwise. Example: >>> maybe_int('123') 123 >>> maybe_int(' 0x7b') 123 """ string = string.strip() if string.isnumeric(): return int(string) if string.startswith('0x'): return int(string, base=16) if string.startswith('0b'): return int(string, base=2) return string
WHERE_TO_GO_NEXT: Dict[Edge, State] = {} """Lookup for the next intermediate state for a given state transition.""" for _src in State: for _dst in State: _shortest = find_shortest_state_path(_src, _dst) if _shortest: WHERE_TO_GO_NEXT[(_src, _dst)] = _shortest[1]
[docs]class CiA402Node(RemoteNode): """Remote CiA 402 node. Communicates with and controls remote drive. Default PDO configuration. State switching helpers. Controlword & statusword communication can happen via SDO or PDO (how argument, ``'sdo'`` and ``'pdo'``). Caution: Using SDO and PDO for state switching at the same time can lead to problems. E.g. only sending a command via SDO but not setting the same command in the PDO. The outdated PDO value will then interfere when send the next time. Hint: If the node is constantly in :attr:`State.NOT_READY_TO_SWITCH_ON`, this could indicate deactivated PDO communication. (Default statusword value in PDO is zero which maps to :attr:`State.NOT_READY_TO_SWITCH_ON`). Note: :mod:`canopen` also has a CiA 402 node implementation (:class:`canopen.profiles.p402.BaseNode402`). Implemented our own because we wanted more control over SDO / PDO communication and at the time of writing :attr:`OperationMode.CYCLIC_SYNCHRONOUS_POSITION` was not fully supported. """ def __init__(self, nodeId: int, objectDictionary: ObjectDictionary, network: Network): """ Args: nodeId: CAN node id to connect to. objectDictionary: Object dictionary for the remote drive. network: Connected network. Mandatory for configuring PDOs during initialization. """ super().__init__(nodeId, objectDictionary, load_od=False) self.logger = get_logger(str(self)) network.add_node(self, objectDictionary) # Configure PDOs self.pdo.read() # Load both node.tpdo and node.rpdo # Note: Default PDO mapping of some motors includes the Control- / # Statusword in multiple PDOs. This can lead to unexpected behavior with # our CANopen stack since for example: # # node.pdo['Controlword'] = Command.ENABLE_OPERATION # # will only set the value in the first PDO with the Controlword but not # for the others following. In these, the Controlword will stay zero and # subsequently shut down the motor. # # -> We clear all of them and have the Controlword only in the first RxPDO1. # EPOS4 has no PDO mapping for Error Register, # thus re-register later txpdo1 if available self.setup_txpdo(1, STATUSWORD) self.setup_txpdo(2, POSITION_ACTUAL_VALUE, VELOCITY_ACTUAL_VALUE) self.setup_txpdo(3, enabled=False) self.setup_txpdo(4, enabled=False) self.setup_rxpdo(1, CONTROLWORD) self.setup_rxpdo(2, TARGET_POSITION, TARGET_VELOCITY) self.setup_rxpdo(3, enabled=False) self.setup_rxpdo(4, enabled=False) network.register_rpdo(self.rpdo[1]) network.register_rpdo(self.rpdo[2])
[docs] def setup_txpdo(self, nr: int, *variables: CanOpenRegister, overwrite: bool = True, enabled: bool = True, trans_type: TransmissionType = TransmissionType.SYNCHRONOUS_CYCLIC, event_timer: Optional[int] = None, ): """Setup single transmission PDO of node (receiving PDO messages from remote node). Note: Sending / receiving direction always from the remote nodes perspective. Setting `event_timer` to 0 can lead to KeyErrors on some controllers. Args: nr: TxPDO number (1-4). *variables: CANopen variables to register to receive from remote node via TxPDO. enabled: Enable or disable TxPDO. overwrite: Overwrite TxPDO. trans_type: Event based or synchronized transmission event_timer: """ tx = self.tpdo[nr] if overwrite: tx.clear() for var in variables: tx.add_variable(var) tx.enabled = enabled tx.trans_type = trans_type if event_timer is not None: tx.event_timer = event_timer tx.save()
[docs] def setup_rxpdo(self, nr: int, *variables: CanOpenRegister, overwrite: bool = True, enabled: bool = True, trans_type: TransmissionType = TransmissionType.SYNCHRONOUS_CYCLIC, ): """Setup single receiving PDO of node (sending PDO messages to remote node). Note: Sending / receiving direction always from the remote nodes perspective. Args: nr: RxPDO number (1-4). *variables: CANopen variables to register to send to remote node via RxPDO. enabled: Enable or disable RxPDO. overwrite: Overwrite RxPDO. trans_type: Event based or synchronized transmission """ rx = self.rpdo[nr] if overwrite: rx.clear() for var in variables: rx.add_variable(var) rx.enabled = enabled rx.trans_type = trans_type rx.save()
[docs] def get_state(self, how: str = 'sdo') -> State: """Get current node state. Args: how (optional): Which communication channel to use. Either via ``'sdo'`` or ``'pdo'``. ``'sdo'`` by default. Returns: Current CiA 402 state. """ if how == 'pdo': return which_state(self.pdo[STATUSWORD].raw) # This takes approx. 0.027 ms elif how == 'sdo': return which_state(self.sdo[STATUSWORD].raw) # This takes approx. 2.713 ms else: raise ValueError(f'Unknown how {how!r}')
[docs] def set_state(self, target: State, how: str = 'sdo'): """Set node to a new target state. Target state has to be reachable from node's current state. RuntimeError otherwise. Args: target: Target state to switch to. how (optional): Communication channel. ``'sdo'`` (default) or ``'pdo'``. """ self.logger.debug('set_state(%s (how=%r))', target, how) if target in {State.NOT_READY_TO_SWITCH_ON, State.FAULT, State.FAULT_REACTION_ACTIVE}: raise ValueError(f'Can not change to state {target}') current = self.get_state(how) if current is target: return edge = (current, target) if edge not in TRANSITION_COMMANDS: raise RuntimeError(f'Invalid state transition from {current!r} to {target!r}!') cw = TRANSITION_COMMANDS[edge] if how == 'pdo': self.pdo[CONTROLWORD].raw = cw elif how == 'sdo': self.sdo[CONTROLWORD].raw = cw else: raise ValueError(f'Unknown how {how!r}')
[docs] def state_switching_job(self, target: State, how: str = 'sdo', timeout: float = 1.0, ) -> StateSwitching: """Create a state switching job generator. The generator will check the current state during each cycle and steer the state machine towards the desired target state (traversing necessary intermediate accordingly). Implemented as generator so that multiple nodes can be switched in parallel. Args: target: Target state to switch to. how (optional): Communication channel. ``'sdo'`` (default) or ``'pdo'``. timeout (optional): Optional timeout value in seconds. 1.0 second by default. Yields: Current states. """ self.logger.debug('state_switching_job(%s, how=%r, timeout=%s)', target, how, timeout) endTime = time.perf_counter() + timeout initial = current = self.get_state(how) #self.logger.debug('initial: %s', initial) lastPlanned = None while True: yield current if current is target: self.logger.debug('Reached target %s', target) return #self.logger.debug('Still in %s (not in %s)', current.name, target.name) if time.perf_counter() > endTime: raise TimeoutError(f'Could not transition from {initial.name} to {target.name} in {timeout:.3f} sec!') if current is not lastPlanned: lastPlanned = current intermediate = WHERE_TO_GO_NEXT[(current, target)] self.set_state(intermediate, how) current = self.get_state(how)
[docs] def change_state(self, target: State, how: str = 'sdo', timeout: float = 1.0, ) -> Union[State, StateSwitching]: """Change to a specific target state and traverse necessary intermediate states. Blocking. Args: target: Target state to switch to. how (optional): Communication channel. ``'sdo'`` (default) or ``'pdo'``. timeout (optional): Optional timeout value in seconds. 1.0 second by default. Returns: Final state. """ self.logger.debug('change_state(%s, how=%r, timeout=%s)', target, how, timeout) job = self.state_switching_job(target, how, timeout) state = None for state in job: time.sleep(0.050) return state
[docs] def get_operation_mode(self) -> OperationMode: """Get current operation mode.""" return OperationMode(self.sdo[MODES_OF_OPERATION_DISPLAY].raw)
[docs] def set_operation_mode(self, op: OperationMode): """Set operation mode. Args: op: New target mode of operation. """ self.logger.debug('Switching to %s', op) current = self.get_operation_mode() if current is op: self.logger.debug('Already %s', op) return state = self.get_state() if state not in VALID_OP_MODE_CHANGE_STATES: raise RuntimeError(f'Can not change to {op} when in {state}') sdm = self.sdo[SUPPORTED_DRIVE_MODES].raw if op not in supported_operation_modes(sdm): raise RuntimeError(f'This drive does not support {op!r}!') self.sdo[MODES_OF_OPERATION].raw = op
[docs] @contextlib.contextmanager def restore_states_and_operation_mode(self, how='sdo', timeout: float = 2.0): """Restore NMT state, CiA 402 state and operation mode. Implemented as context manager. Args: how (optional): Communication channel. ``'sdo'`` (default) or ``'pdo'``. timeout (optional): Timeout duration. Example: >>> with node.restore_states_and_operation_mode(): ... # Do something fancy with the states ... pass Warning: Deprecated. Led to more problems than it solved... """ oldOp = self.get_operation_mode() oldState = self.get_state(how) yield self self.set_operation_mode(oldOp) self.change_state(oldState, how=how, timeout=timeout)
[docs] def reset_fault(self): """Perform fault reset to SWITCH_ON_DISABLED.""" self.logger.info('Resetting fault') # TODO: Should we check if in State.FAULT and only then emitting a fault # reset command? self.sdo[CONTROLWORD].raw = 0 self.sdo[CONTROLWORD].raw = CW.FAULT_RESET
[docs] def switch_off(self, timeout: float = 1.0): """Switch off drive. Same state as on power-up. Args: timeout (optional): Timeout duration. """ message = ( 'This method is deprecated. This targets SWITCH_ON_DISABLED and' ' some motor controllers have issues getting there when in' ' READY_TO_SWITCH_ON.' ) warnings.warn(message, DeprecationWarning, stacklevel=2) self.change_state(State.SWITCH_ON_DISABLED, timeout=timeout)
[docs] def disable(self, timeout: float = 1.0): """Disable drive (no power). Args: timeout (optional): Timeout duration. """ self.change_state(State.READY_TO_SWITCH_ON, timeout=timeout)
[docs] def enable(self, timeout: float = 1.0): """Enable drive. Args: timeout (optional): Timeout duration. """ self.change_state(State.OPERATION_ENABLED, timeout=timeout)
[docs] def set_target_position(self, pos): """Set target position in device units.""" self.pdo[TARGET_POSITION].raw = pos
[docs] def get_actual_position(self): """Get actual position in device units.""" return self.pdo[POSITION_ACTUAL_VALUE].raw
[docs] def set_target_velocity(self, vel): """Set target velocity in device units.""" self.pdo[TARGET_VELOCITY].raw = vel
[docs] def get_actual_velocity(self): """Get actual velocity in device units.""" return self.pdo[VELOCITY_ACTUAL_VALUE].raw
[docs] def move_to(self, position: int, velocity: Optional[int] = None, acceleration: Optional[int] = None, immediately: bool = True, ): """Move to position. For :attr:`OperationMode.PROFILED_POSITION`. Args: position: Target position. velocity: Profile velocity (if any). acceleration: Profile acceleration / deceleration (if any). immediately: If True overwrite ongoing command. """ self.logger.debug('move_to(%s, velocity=%s, acceleration=%s)', position, velocity, acceleration) self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION self.sdo[TARGET_POSITION].raw = position if velocity is not None: self.sdo[PROFILE_VELOCITY].raw = velocity if acceleration is not None: self.sdo[PROFILE_ACCELERATION].raw = acceleration self.sdo[PROFILE_DECELERATION].raw = acceleration if immediately: self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION | CW.NEW_SET_POINT | CW.CHANGE_SET_IMMEDIATELY else: self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION | CW.NEW_SET_POINT
[docs] def move_with(self, velocity: int, acceleration: Optional[int] = None, immediately: bool = True, ): """Move with velocity. For :attr:`OperationMode.PROFILE_VELOCITY`. Args: velocity: Target velocity. acceleration: Profile acceleration / deceleration (if any). immediately: If True overwrite ongoing command. """ self.logger.debug('move_with(%s, acceleration=%s)', velocity, acceleration) self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION self.sdo[PROFILE_VELOCITY].raw = velocity if acceleration is not None: self.sdo[PROFILE_ACCELERATION].raw = acceleration self.sdo[PROFILE_DECELERATION].raw = acceleration if immediately: self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION | CW.NEW_SET_POINT | CW.CHANGE_SET_IMMEDIATELY else: self.sdo[CONTROLWORD].raw = Command.ENABLE_OPERATION | CW.NEW_SET_POINT
def _get_info(self) -> dict: """Get the current drive informations.""" return { 'nmt': self.nmt.state, 'state': self.get_state(), 'op': self.get_operation_mode(), }
[docs] def manufacturer_device_name(self): """Get manufacturer device name.""" return self.sdo[MANUFACTURER_DEVICE_NAME].raw
[docs] def apply_settings(self, settings: Dict[str, Any]): """Apply multiple settings to CANopen node. Path syntax for nested entries but it is also possible to use numbers, bin and hex notation for path entries. E.g. ``someName/0x00`` (see :func:`maybe_int`). Args: settings: Settings to apply. Addresses (path syntax) -> value entries. Example: >>> settings = { ... 'Software Position Limit/Minimum Position Limit': 0, ... 'Software Position Limit/Maximum Position Limit': 10000, ... } ... node.apply_settings(settings) """ for name, value in settings.items(): *path, last = map(maybe_int, name.split('/')) sdo = self.sdo for key in path: sdo = sdo[key] self.logger.debug('Applying %r = %s', name, value) sdo[last].raw = value
def __str__(self): return f'{type(self).__name__}(id: {self.id})'