Skip to content

DaMiaoController

damiao_motor.core.controller.DaMiaoController

DaMiaoController(channel: str = 'can0', bustype: str = 'socketcan', bitrate: Optional[int] = None, **kwargs: Any)

Simple multi-motor controller.

  • Owns a single CAN bus.
  • Manages multiple DaMiaoMotor instances on that bus.
  • Automatically polls feedback in background when motors are present.
  • Provides helper methods to:
    • enable/disable all motors
    • send commands to one or all motors
    • poll feedback non-blockingly

Parameters:

Name Type Description Default
channel str

For socketcan: the network interface name (e.g. "can0"). For gs_usb: either a numeric index string or a USB serial number string.

 - ``"0"``, ``"1"``, … — device index; ``"0"`` is the first
   detected device, ``"1"`` the second, and so on.
 - ``"DEADBEEF"`` (or any non-numeric string) — resolved by
   scanning connected devices and matching the serial number.

 To list connected gs_usb devices with their indices and serial
 numbers, run::

     uv run python -c "
     from gs_usb.gs_usb import GsUsb
     for i, d in enumerate(GsUsb.scan()):
         print(i, d.serial_number, d.gs_usb.port_numbers)
     "
'can0'
bustype str

python-can interface name ("socketcan" or "gs_usb").

'socketcan'
bitrate Optional[int]

CAN bitrate in bits/s. Required for gs_usb; ignored for socketcan (bitrate is set at the OS level via ip link).

None
**kwargs Any

Extra arguments forwarded to can.interface.Bus.

{}
Source code in damiao_motor/core/controller.py
def __init__(
    self,
    channel: str = "can0",
    bustype: str = "socketcan",
    bitrate: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """
    Args:
        channel: For ``socketcan``: the network interface name (e.g. ``"can0"``).
                 For ``gs_usb``: either a numeric index string or a USB serial
                 number string.

                 - ``"0"``, ``"1"``, … — device index; ``"0"`` is the first
                   detected device, ``"1"`` the second, and so on.
                 - ``"DEADBEEF"`` (or any non-numeric string) — resolved by
                   scanning connected devices and matching the serial number.

                 To list connected gs_usb devices with their indices and serial
                 numbers, run::

                     uv run python -c "
                     from gs_usb.gs_usb import GsUsb
                     for i, d in enumerate(GsUsb.scan()):
                         print(i, d.serial_number, d.gs_usb.port_numbers)
                     "

        bustype: python-can interface name (``"socketcan"`` or ``"gs_usb"``).
        bitrate: CAN bitrate in bits/s.  Required for ``gs_usb``; ignored for
                 ``socketcan`` (bitrate is set at the OS level via ``ip link``).
        **kwargs: Extra arguments forwarded to ``can.interface.Bus``.
    """
    if bustype == "gs_usb":
        channel = _resolve_gs_usb_channel(channel)
    bus_kwargs: Dict[str, Any] = {
        "channel": channel,
        "interface": bustype,
        **kwargs,
    }
    if bitrate is not None:
        bus_kwargs["bitrate"] = bitrate
    self.bus: can.Bus = can.interface.Bus(**bus_kwargs)
    # Keyed by command CAN ID (motor_id)
    self.motors: Dict[int, DaMiaoMotor] = {}
    # Keyed by logical motor ID (embedded in feedback frame)
    self._motors_by_feedback: Dict[int, DaMiaoMotor] = {}
    # Background polling thread
    self._polling_thread: Optional[threading.Thread] = None
    self._polling_active = False
    self._polling_lock = threading.Lock()

add_motor

add_motor(motor_id: int, feedback_id: int, motor_type: str, **kwargs: Any) -> DaMiaoMotor

Create and register a motor on this controller.

Parameters:

Name Type Description Default
motor_id int

Command CAN ID (ESC_ID) used to send commands to the motor.

required
feedback_id int

Feedback CAN ID (MST_ID) configured on the motor.

required
motor_type str

Motor type preset name (for example "4310", "4340").

required
**kwargs Any

Extra arguments forwarded to DaMiaoMotor constructor.

{}

Returns:

Type Description
DaMiaoMotor

The created DaMiaoMotor instance.

Raises:

Type Description
ValueError

If a motor with the same motor_id is already registered.

Source code in damiao_motor/core/controller.py
def add_motor(
    self, motor_id: int, feedback_id: int, motor_type: str, **kwargs: Any
) -> DaMiaoMotor:
    """
    Create and register a motor on this controller.

    Args:
        motor_id: Command CAN ID (ESC_ID) used to send commands to the motor.
        feedback_id: Feedback CAN ID (MST_ID) configured on the motor.
        motor_type: Motor type preset name (for example ``"4310"``, ``"4340"``).
        **kwargs: Extra arguments forwarded to ``DaMiaoMotor`` constructor.

    Returns:
        The created ``DaMiaoMotor`` instance.

    Raises:
        ValueError: If a motor with the same ``motor_id`` is already registered.
    """
    if motor_id in self.motors:
        raise ValueError(f"Motor with ID {motor_id} already exists")

    motor = DaMiaoMotor(
        motor_id=motor_id,
        feedback_id=feedback_id,
        bus=self.bus,
        motor_type=motor_type,
        **kwargs,
    )
    self.motors[motor_id] = motor
    # Bind by logical motor ID; feedback frames embed this ID in the first byte.
    self._motors_by_feedback[motor_id] = motor
    motor._controller = self

    # Start background polling if not already running
    self._start_polling()

    return motor

all_motors

all_motors() -> Iterable[DaMiaoMotor]

Iterate over all registered motors.

Returns:

Type Description
Iterable[DaMiaoMotor]

Iterable of DaMiaoMotor instances.

Source code in damiao_motor/core/controller.py
def all_motors(self) -> Iterable[DaMiaoMotor]:
    """
    Iterate over all registered motors.

    Returns:
        Iterable of ``DaMiaoMotor`` instances.
    """
    return self.motors.values()

disable_all

disable_all() -> None

Disable all registered motors.

Source code in damiao_motor/core/controller.py
def disable_all(self) -> None:
    """Disable all registered motors."""
    for m in self.all_motors():
        m.disable()

enable_all

enable_all() -> None

Enable all registered motors.

Source code in damiao_motor/core/controller.py
def enable_all(self) -> None:
    """Enable all registered motors."""
    for m in self.all_motors():
        m.enable()

flush_bus

flush_bus() -> int

Flush all pending messages from the CAN bus buffer.

Returns:

Type Description
int

Number of messages flushed.

Raises:

Type Description
CanOperationError

If CAN interface is down (Error Code 100) with helpful hint

OSError

If other network/system errors occur

Source code in damiao_motor/core/controller.py
def flush_bus(self) -> int:
    """
    Flush all pending messages from the CAN bus buffer.

    Returns:
        Number of messages flushed.

    Raises:
        can.CanOperationError: If CAN interface is down (Error Code 100) with helpful hint
        OSError: If other network/system errors occur
    """
    count = 0
    try:
        while True:
            msg = self.bus.recv(timeout=0)
            if msg is None:
                break
            count += 1
    except can.CanOperationError as e:
        error_str = str(e)
        errno = getattr(e, "errno", None)

        # Error Code 100: Network is down - CAN interface not up
        if (
            errno == 100
            or "Error Code 100" in error_str
            or "Network is down" in error_str
            or "[Errno 100]" in error_str
        ):
            channel = getattr(self.bus, "channel", "can0")
            raise can.CanOperationError(
                f"CAN interface '{channel}' is down (Error Code 100)"
            ) from e
        # Re-raise other CanOperationError
        raise
    except OSError as e:
        errno = getattr(e, "errno", None)
        if errno == 100 or "Network is down" in str(e) or "[Errno 100]" in str(e):
            channel = getattr(self.bus, "channel", "can0")
            raise OSError(
                f"CAN interface '{channel}' is down (Error Code 100)"
            ) from e
        raise
    return count

get_motor

get_motor(motor_id: int) -> DaMiaoMotor

Get a registered motor by command ID.

Parameters:

Name Type Description Default
motor_id int

Command CAN ID (ESC_ID) used when the motor was added.

required

Returns:

Type Description
DaMiaoMotor

The matching DaMiaoMotor instance.

Source code in damiao_motor/core/controller.py
def get_motor(self, motor_id: int) -> DaMiaoMotor:
    """
    Get a registered motor by command ID.

    Args:
        motor_id: Command CAN ID (ESC_ID) used when the motor was added.

    Returns:
        The matching ``DaMiaoMotor`` instance.
    """
    return self.motors[motor_id]

poll_feedback

poll_feedback() -> None

Drain pending CAN frames (non-blocking) and dispatch feedback to motors.

Behavior: - Uses self.bus.recv(timeout=0) in a loop to read all currently queued frames without blocking. - Stops the loop when recv returns None (queue is empty). - Ignores frames that are not 8 bytes. - Routes frames by logical motor ID encoded in the low 4 bits of D[0]. - For matched motors, forwards frames to DaMiaoMotor.process_feedback_frame(...).

Notes: - This method performs one drain pass. It can be called manually, and is also used repeatedly by the background polling thread. - If bus/controller state becomes invalid (for example bus closed), polling is marked inactive to let background loop exit cleanly.

Source code in damiao_motor/core/controller.py
def poll_feedback(self) -> None:
    """
    Drain pending CAN frames (non-blocking) and dispatch feedback to motors.

    Behavior:
    - Uses ``self.bus.recv(timeout=0)`` in a loop to read all currently queued
      frames without blocking.
    - Stops the loop when ``recv`` returns ``None`` (queue is empty).
    - Ignores frames that are not 8 bytes.
    - Routes frames by logical motor ID encoded in the low 4 bits of ``D[0]``.
    - For matched motors, forwards frames to
      ``DaMiaoMotor.process_feedback_frame(...)``.

    Notes:
    - This method performs one drain pass. It can be called manually, and is
      also used repeatedly by the background polling thread.
    - If bus/controller state becomes invalid (for example bus closed),
      polling is marked inactive to let background loop exit cleanly.
    """
    try:
        while True:
            msg = self.bus.recv(timeout=0)
            if msg is None:
                break

            # Skip echo frames (TX echos from gs_usb / candleLight adapters)
            if not getattr(msg, "is_rx", True):
                continue

            if len(msg.data) != 8:
                continue

            # Feedback messages include the logical motor ID in the low 4 bits
            # of the first data byte. Use that to route feedback to the right motor.
            logical_id = msg.data[0] & 0x0F
            motor = self._motors_by_feedback.get(logical_id)
            if motor is None:
                continue

            motor.process_feedback_frame(
                bytes(msg.data), arbitration_id=msg.arbitration_id
            )
    except (ValueError, OSError, AttributeError):
        # Bus is closed or invalid, stop polling
        with self._polling_lock:
            self._polling_active = False

send_cmd

send_cmd(motor_id: int, target_position: float = 0.0, target_velocity: float = 0.0, stiffness: float = 0.0, damping: float = 0.0, feedforward_torque: float = 0.0) -> None

Send MIT mode command to a specific motor (convenience method).

Source code in damiao_motor/core/controller.py
def send_cmd(
    self,
    motor_id: int,
    target_position: float = 0.0,
    target_velocity: float = 0.0,
    stiffness: float = 0.0,
    damping: float = 0.0,
    feedforward_torque: float = 0.0,
) -> None:
    """Send MIT mode command to a specific motor (convenience method)."""
    self.get_motor(motor_id).send_cmd_mit(
        target_position=target_position,
        target_velocity=target_velocity,
        stiffness=stiffness,
        damping=damping,
        feedforward_torque=feedforward_torque,
    )

send_cmd_all

send_cmd_all(target_position: float = 0.0, target_velocity: float = 0.0, stiffness: float = 0.0, damping: float = 0.0, feedforward_torque: float = 0.0) -> None

Send MIT mode command to all motors (convenience method).

Source code in damiao_motor/core/controller.py
def send_cmd_all(
    self,
    target_position: float = 0.0,
    target_velocity: float = 0.0,
    stiffness: float = 0.0,
    damping: float = 0.0,
    feedforward_torque: float = 0.0,
) -> None:
    """Send MIT mode command to all motors (convenience method)."""
    for m in self.all_motors():
        m.send_cmd_mit(
            target_position=target_position,
            target_velocity=target_velocity,
            stiffness=stiffness,
            damping=damping,
            feedforward_torque=feedforward_torque,
        )

shutdown

shutdown() -> None

Shutdown controller resources in a safe order.

Steps: 1. Stop background polling thread (via _stop_polling). 2. Disable all registered motors. 3. Shutdown the CAN bus handle.

This ensures polling is not racing against bus shutdown.

Source code in damiao_motor/core/controller.py
def shutdown(self) -> None:
    """
    Shutdown controller resources in a safe order.

    Steps:
    1. Stop background polling thread (via ``_stop_polling``).
    2. Disable all registered motors.
    3. Shutdown the CAN bus handle.

    This ensures polling is not racing against bus shutdown.
    """
    self._stop_polling()
    self.disable_all()
    self.bus.shutdown()