"""Homing procedures and definitions.
Todo:
Documentation. Waiting for feature branch merge.
"""
import abc
import enum
import random
import time
from typing import Generator, Callable, Optional
from canopen.variable import Variable
from being.bitmagic import check_bit_mask
from being.can.cia_402 import (
CONTROLWORD,
CW,
Command,
MODES_OF_OPERATION,
NEGATIVE,
OperationMode,
POSITIVE,
STATUSWORD,
SW,
State as CiA402State,
UNDEFINED,
determine_homing_method
)
from being.constants import INF
from being.logging import get_logger
from being.serialization import register_enum
from being.utils import toss_coin
# Names exported by a star-import of this module.
__all__ = [ 'HomingState', 'CiA402Homing', 'CrudeHoming', ]
# Module-level logger, named after this module.
LOGGER = get_logger(name=__name__, parent=None)
class HomingState(enum.Enum):

    """States a homing procedure can be in.

    The homing classes in this module store one of these members in their
    ``state`` attribute.
    """

    # Homing was attempted but did not succeed.
    FAILED = 0
    # Initial state; also the state after homing was stopped.
    UNHOMED = 1
    # A homing job is currently in progress.
    ONGOING = 2
    # Homing finished successfully.
    HOMED = 3
# Register the enum with being.serialization (presumably so HomingState values
# can be serialized / deserialized -- confirm against being.serialization).
register_enum(HomingState)
def default_homing_method(
        homingMethod: Optional[int] = None,
        homingDirection: int = UNDEFINED,
        endSwitches: bool = False,
        indexPulse: bool = False,
    ) -> int:
    """Resolve the homing method number from the default homing kwargs.

    Args:
        homingMethod: Explicit homing method. Returned unchanged when given.
        homingDirection: Homing direction. Positive values select the
            positive side, every other defined value the negative side.
        endSwitches: If True, home on the end switch of the selected side.
            Otherwise home against a hard stop in the selected direction.
        indexPulse: Forwarded to :func:`determine_homing_method`.

    Returns:
        Homing method number.
    """
    # An explicit method always wins.
    if homingMethod is not None:
        return homingMethod

    # No direction preference -> fixed fallback method 35
    # (presumably CiA 402 "home on current position" -- confirm).
    if homingDirection == UNDEFINED:
        return 35

    side = POSITIVE if homingDirection > 0 else NEGATIVE
    if endSwitches:
        return determine_homing_method(endSwitch=side, indexPulse=indexPulse)

    return determine_homing_method(direction=side, hardStop=True, indexPulse=indexPulse)
def start_homing(controlword: Variable) -> Generator:
    """Job which starts the homing procedure of a node.

    Writes two controlwords in sequence and yields after each write.

    Args:
        controlword: canopen control word variable.
    """
    # The start-homing bit (controlword bit 4) needs a 0 -> 1 transition:
    # first write the word without the bit, then with it.
    sequence = (
        Command.ENABLE_OPERATION,
        Command.ENABLE_OPERATION | CW.START_HOMING_OPERATION,
    )
    for word in sequence:
        controlword.raw = word
        yield
def stop_homing(controlword: Variable) -> Generator:
    """Job which stops the homing procedure of a node.

    Writes two controlwords in sequence and yields after each write.

    Args:
        controlword: canopen control word variable.
    """
    # The start-homing bit needs a 1 -> 0 transition: first write the word
    # with the bit set, then without it.
    sequence = (
        Command.ENABLE_OPERATION | CW.START_HOMING_OPERATION,
        Command.ENABLE_OPERATION,
    )
    for word in sequence:
        controlword.raw = word
        yield
def homing_started(statusword: Variable) -> bool:
    """Tell whether the homing procedure has started.

    Homing counts as started when neither the HOMING_ATTAINED nor the
    TARGET_REACHED bit mask is set in the statusword.

    Args:
        statusword: canopen status word variable.

    Returns:
        True if homing has started.
    """
    # Read the statusword once so both checks see the same value.
    word = statusword.raw
    if check_bit_mask(word, SW.HOMING_ATTAINED):
        return False

    return not check_bit_mask(word, SW.TARGET_REACHED)
def homing_ended(statusword: Variable) -> bool:
    """Tell whether the homing procedure has ended.

    Homing counts as ended when both the HOMING_ATTAINED and the
    TARGET_REACHED bit mask are set in the statusword.

    Args:
        statusword: canopen status word variable.

    Returns:
        Truthy if homing has ended.
    """
    # Read the statusword once so both checks see the same value.
    word = statusword.raw
    attained = check_bit_mask(word, SW.HOMING_ATTAINED)
    if not attained:
        return attained

    return check_bit_mask(word, SW.TARGET_REACHED)
def homing_reference_run(statusword: Variable) -> Generator:
    """Job which waits for a homing reference run to start and then to end.

    Yields once per poll. The generator is exhausted as soon as
    :func:`homing_ended` reports True after :func:`homing_started` did.

    Args:
        statusword: canopen status word variable.
    """
    # Phase 1: wait for the start condition, phase 2: wait for the end one.
    for reached in (homing_started, homing_ended):
        while not reached(statusword):
            yield
class HomingBase(abc.ABC):

    """Abstract homing base class.

    Holds the current homing state and a homing job (a generator) which is
    advanced one step per :meth:`update` call. Subclasses provide the job via
    :meth:`homing_job` and are responsible for setting the final state.

    Attributes:
        state: Current homing state.
        job: Running homing job generator or None if idle.
        logger: Logger of this homing instance.
    """

    def __init__(self):
        self.state = HomingState.UNHOMED
        self.job = None
        self.logger = get_logger('Homing')

    @property
    def ongoing(self) -> bool:
        """True if homing in progress."""
        return self.state is HomingState.ONGOING

    @property
    def homed(self) -> bool:
        """True if homing finished successfully."""
        return self.state is HomingState.HOMED

    def cancel_job(self):
        """Cancel current homing job."""
        # The generator is only dropped, not explicitly closed.
        self.job = None

    def stop(self):
        """Stop ongoing homing. Motor will be unhomed."""
        self.cancel_job()
        self.state = HomingState.UNHOMED

    @abc.abstractmethod
    def homing_job(self) -> Generator:
        """Primary homing job.

        Returns:
            Generator which performs the homing when iterated.
        """
        raise NotImplementedError

    def home(self):
        """Start homing. A job which is still running gets cancelled."""
        self.logger.debug('home()')
        self.logger.debug('Starting homing')
        # Stay in FAILED should creating the job below raise.
        self.state = HomingState.FAILED
        if self.job:
            self.cancel_job()

        self.job = self.homing_job()
        self.state = HomingState.ONGOING

    def update(self):
        """Tick homing one step further.

        A job which finishes is dropped. A job which raises TimeoutError is
        logged, dropped and the homing state set to FAILED.
        """
        if not self.job:
            return

        try:
            next(self.job)
        except StopIteration:
            self.cancel_job()
        except TimeoutError as err:
            self.logger.exception(err)
            self.cancel_job()
            # Without this the state would stay ONGOING forever although no
            # job is left to advance.
            self.state = HomingState.FAILED

    def __str__(self):
        return f'{type(self).__name__}({self.state})'
class DummyHoming(HomingBase):

    """Dummy homing for testing with virtual motors.

    The homing job waits for a random duration and then succeeds or fails at
    random.
    """

    def __init__(
            self,
            minDuration: float = 1.,
            maxDuration: float = 2.,
            successProbability: float = 0.9,
            time_func: Callable = time.perf_counter,
        ):
        """Args:
            minDuration: Minimum duration of dummy homing.
            maxDuration: Maximum duration of dummy homing.
            successProbability: Success probability of dummy homing.
            time_func: Timing function.
        """
        super().__init__()
        self.time_func = time_func
        self.successProbability = successProbability
        self.minDuration = minDuration
        self.maxDuration = maxDuration

    def homing_job(self) -> Generator:
        """Wait for a random duration, then pick HOMED or FAILED at random."""
        waitTime = random.uniform(self.minDuration, self.maxDuration)
        deadline = self.time_func() + waitTime
        self.state = HomingState.ONGOING
        while self.time_func() < deadline:
            yield

        success = toss_coin(self.successProbability)
        self.state = HomingState.HOMED if success else HomingState.FAILED

    def __str__(self):
        return f'{type(self).__name__}({self.state})'
class CiA402Homing(HomingBase):

    """CiA 402 by the book.

    Switches the node to homing operation mode, starts the homing operation
    via the controlword and polls the statusword until the reference run has
    ended or the timeout expired.
    """

    def __init__(self, node, timeout=10.0, **kwargs):
        """Args:
            node: Connected CiA 402 node. Needs to provide ``id``, ``pdo``,
                ``sdo`` and ``state_switching_job()``.
            timeout: Maximum homing duration in seconds.
            **kwargs: Homing parameters for :func:`default_homing_method`.
        """
        super().__init__()
        self.node = node
        self.timeout = timeout
        # NOTE(review): homingMethod is resolved here but not written to the
        # node anywhere in this class -- confirm this happens elsewhere.
        self.homingMethod = default_homing_method(**kwargs)
        self.logger = get_logger(f'CiA402Homing(nodeId: {node.id})')
        self.statusword = node.pdo[STATUSWORD]
        self.controlword = node.pdo[CONTROLWORD]
        # Point in time (time.perf_counter) when the homing has to be over.
        self.endTime = -1

    def start_timeout_clock(self):
        """Remember when homing needs to end for timeout."""
        startTime = time.perf_counter()
        self.endTime = startTime + self.timeout

    def timeout_expired(self) -> bool:
        """Check if timeout expired. Logs an error if it did."""
        expired = time.perf_counter() > self.endTime
        if expired:
            self.logger.error('Homing timeout expired (>%.1f sec)', self.timeout)

        return expired

    def change_state(self, target) -> Generator:
        """Change to node's state job.

        Args:
            target: Target CiA 402 state.

        Returns:
            State switching job of the node (via PDO).
        """
        return self.node.state_switching_job(target, how='pdo')

    def set_operation_mode(self, op: OperationMode):
        """Set operation mode of node. No questions asked..."""
        self.logger.debug('set_operation_mode(op=%s)', op)
        self.node.sdo[MODES_OF_OPERATION].raw = op

    def homing_job(self):
        """Standard CiA 402 homing procedure.

        Sets the final state to HOMED when the reference run ended in time
        and to FAILED when the timeout expired first.
        """
        self.logger.debug('homing_job()')
        self.start_timeout_clock()
        yield from self.change_state(CiA402State.SWITCHED_ON)
        self.set_operation_mode(OperationMode.HOMING)
        self.logger.info('Starting homing reference run')
        yield from start_homing(self.controlword)
        final = HomingState.UNHOMED
        for _ in homing_reference_run(self.statusword):
            if self.timeout_expired():
                final = HomingState.FAILED
                break

            yield
        else:
            # Only reached when the reference run ended without a timeout.
            # This is the success path, hence info and not error level.
            self.logger.info('Homing run finished')
            final = HomingState.HOMED

        yield from self.change_state(CiA402State.READY_TO_SWITCH_ON)
        # Publish the outcome last, after the node left operation.
        self.state = final

    def __str__(self):
        return f'{type(self).__name__}({self.node}, {self.state})'
class CrudeHoming(CiA402Homing):

    """Crude hard stop homing for Faulhaber linear motors.

    Drives the motor with constant velocity in both directions until the
    actual current exceeds a limit, records the travelled position range and
    derives the home offset from it.
    """

    def __init__(self, node, minWidth, currentLimit, timeout=10.0, **kwargs):
        """Args:
            node: Connected CiA 402 node.
            minWidth: Minimum width of the homing range in device units.
                Narrower measured ranges make the homing fail.
            currentLimit: Threshold for 'Current Actual Value' above which
                the motor is considered to be on the wall.
            timeout: Maximum homing duration in seconds.
            **kwargs: Homing parameters for :func:`default_homing_method`.
        """
        super().__init__(node, timeout=timeout, **kwargs)
        self.minWidth = minWidth
        self.currentLimit = currentLimit
        self.lower = INF
        self.upper = -INF
        self.logger.info('Overwriting TxPDO4 of %s for Current Actual Value', node)
        node.setup_txpdo(4, 'Current Actual Value')

    @property
    def width(self) -> float:
        """Current homing width in device units."""
        return self.upper - self.lower

    def reset_range(self):
        """Reset homing range."""
        self.lower = INF
        self.upper = -INF

    def expand_range(self, pos: float):
        """Expand homing range so that it includes the position `pos`."""
        self.lower = min(self.lower, pos)
        self.upper = max(self.upper, pos)

    def halt_drive(self) -> Generator:
        """Stop drive."""
        self.logger.debug('halt_drive()')
        self.controlword.raw = Command.ENABLE_OPERATION | CW.HALT
        yield

    def move_drive(self, velocity: int) -> Generator:
        """Move motor with constant velocity.

        Args:
            velocity: Target velocity.
        """
        self.logger.debug('move_drive(%d)', velocity)
        self.controlword.raw = Command.ENABLE_OPERATION
        yield
        self.node.set_target_velocity(velocity)
        yield
        self.controlword.raw = Command.ENABLE_OPERATION | CW.NEW_SET_POINT
        yield

    def on_the_wall(self) -> bool:
        """Check if motor is on the wall."""
        current = self.node.pdo['Current Actual Value'].raw
        return current > self.currentLimit  # Todo: Add percentage threshold?

    def homing_job(self, speed: int = 100):
        """Crude homing procedure.

        Sets the final state to HOMED when both walls were reached in time
        and the measured range is at least `minWidth` wide. Otherwise the
        final state is FAILED.

        Args:
            speed: Speed for homing in device units.
        """
        self.logger.debug('homing_job()')
        self.start_timeout_clock()
        self.reset_range()
        node = self.node
        sdo = self.node.sdo
        if self.homingMethod in {-1, -3}:
            # Forward direction
            velocities = [speed, -speed]
        else:
            # Backward direction
            velocities = [-speed, speed]

        yield from self.change_state(CiA402State.READY_TO_SWITCH_ON)
        sdo['Home Offset'].raw = 0
        self.set_operation_mode(OperationMode.PROFILE_VELOCITY)

        timedOut = False
        for vel in velocities:
            yield from self.halt_drive()
            yield from self.move_drive(vel)
            self.logger.debug('Driving towards the wall')
            while not self.on_the_wall():
                # A timeout is not a wall hit. Remember it so that the
                # partial range is never mistaken for a valid one.
                if self.timeout_expired():
                    timedOut = True
                    break

                self.expand_range(node.get_actual_position())
                yield

            if not timedOut:
                self.logger.debug('Hit the wall')

            yield from self.halt_drive()
            # Turn off voltage to reset current current value
            yield from self.change_state(CiA402State.READY_TO_SWITCH_ON)
            if timedOut:
                # Do not drive in the remaining direction after a timeout.
                break

        width = self.width
        if timedOut:
            # Error already logged by timeout_expired().
            final = HomingState.FAILED
        elif width < self.minWidth:
            self.logger.info('Homing failed. Width too narrow %f', width)
            final = HomingState.FAILED
        else:
            self.logger.info('Homing successful')
            final = HomingState.HOMED
            # Center in the middle
            margin = .5 * (width - self.minWidth)
            self.lower += margin
            self.upper -= margin
            sdo['Home Offset'].raw = self.lower

        self.state = final