Source code for being.can
"""All things CAN / CANopen.
Quick primer on CanOpen. CanOpen is a communication protocol that builds on top
of CAN. There are two main ways of communication: *SDO* and *PDO*. The
addresses for values are stored in the *Object Dictionary* with an *index* value
and optionally with a *sub-index*. The index is commonly noted in hex.
There are many different standardized communication parameters device profiles.
For motors of main interest are:
- CiA 301 - CANopen application layer and communication profile
- CiA 402 - CANopen device profile for drives and motion control
CiA 301 are some standard addresses. CiA 402 defines a device state machine,
different operation modes, homing and communication procedure for triggering
motion on the motors.
See Also:
- `CAN bus on Wikipedia <https://en.wikipedia.org/wiki/CAN_bus>`_
- `CANopen on Wikipedia <https://en.wikipedia.org/wiki/CANopen>`_
- `Python canopen doc <https://canopen.readthedocs.io/en/latest/index.html>`_
"""
import pkgutil
import contextlib
import io
import os
from typing import Dict, Optional, Iterator
from canopen import Network, ObjectDictionary
from canopen.sdo import SdoClient, SdoCommunicationError, SdoAbortedError
from canopen.objectdictionary.eds import import_eds
from being.can.definitions import FunctionCode
from being.can.cia_301 import DEVICE_TYPE
STORE_EDS: int = 0x1021
"""Some manufacturer use this for downloading the object dictionary directly
from the device. :hex:
"""
SUPPORTED_DEVICE_TYPES: Dict[bytes, str] = {
b'\x92\x01\x42\x00': 'eds_files/MCLM3002P-CO.eds',
b'\x92\x01\x02\x00': 'eds_files/maxon_EPOS4_50-5.eds',
}
"""Device type bytes to local EDS file mapping."""
[docs]@contextlib.contextmanager
def sdo_client(network: Network, nodeId: int, od: Optional[ObjectDictionary] = None) -> Iterator[SdoClient]:
"""Temporary SDO client connection. Can be used for SDO communication
connection without having an object dictionary / EDS at hand. Without object
dictionary only integer based addresses are supported.
Args:
network: Connected CANopen network.
nodeId: Node ID.
od (optional): Object dictionary.
Yields:
SdoClient instance.
Example:
>>> with sdo_client(network, nodeId=8) as client:
... deviceType = client.upload(0x1000, subindex=0)
"""
if od is None:
od = ObjectDictionary()
rxCob = FunctionCode.SDOrx + nodeId
txCob = FunctionCode.SDOtx + nodeId
client = SdoClient(rxCob,txCob, od)
client.network = network
network.subscribe(txCob, client.on_response)
try:
yield client
finally:
network.unsubscribe(txCob)
def _load_local_eds(deviceType: bytes) -> io.StringIO:
"""Given deviceType try to load local EDS file.
Args:
deviceType: Raw device type from node.
Returns:
EDS file content wrapped in StringIO buffer.
"""
fp = SUPPORTED_DEVICE_TYPES[deviceType]
data = pkgutil.get_data(__name__, fp)
return io.StringIO(data.decode())
[docs]def load_object_dictionary(network: Network, nodeId: int) -> ObjectDictionary:
"""Get object dictionary for node. Ping node, try to download EDS from it,
see if we have a fallback. RuntimeError otherwise.
Args:
network (Network / CanBackend): Connected CAN network.
nodeId: Node ID to load object dictionary for.
Returns:
Object dictionary for node.
"""
with sdo_client(network, nodeId) as client:
# Ping node
try:
deviceType = client.open(DEVICE_TYPE).read()
except SdoCommunicationError as err:
raise RuntimeError(f'CANopen node {nodeId} is not reachable!') from err
# Try to download remote object dictionary from node
try:
edsFp = client.open(STORE_EDS, mode='rt')
return import_eds(edsFp, nodeId)
except SdoAbortedError:
pass
# Try to load local object dictionary
if deviceType in SUPPORTED_DEVICE_TYPES:
filelike = _load_local_eds(deviceType)
return import_eds(filelike, nodeId)
raise RuntimeError(f'Unknown CANopen node {nodeId}. Could not load object dictionary!')