Source code for yao.mech_controller

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: Aidan Gray (Aidan.Gray@idg.jhu.edu), José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2022-05-26
# @Filename: mech_controller.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import enum
import re
import warnings

from typing import TYPE_CHECKING, Any

from sdsstools.logger import SDSSLogger, get_logger

from yao import config

from .exceptions import SpecMechError, YaoUserWarning


if TYPE_CHECKING:
    from .actor import YaoCommand


__all__ = ["MechController", "ReplyCode", "SpecMechReply", "STATS", "check_reply"]


#: Stat names accepted by the report command.
STATS: dict[str, str] = {
    "time": "rt",
    "version": "rV",
    "environment": "re",
    "vacuum": "rv",
    "motors": "rd",
    "motor-a": "ra",
    "motor-b": "rb",
    "motor-c": "rc",
    "orientation": "ro",
    "pneumatics": "rp",
    # "nitrogen": "rn",
    "specmech": "rs",
}


[docs] def check_reply(reply: SpecMechReply): """Checks a specMech reply.""" if reply.code == ReplyCode.ERR_IN_REPLY: for reply_data in reply.data: if "ERR" in reply_data[0]: code, msg = reply_data[1:] raise SpecMechError(f"Error {code} found in specMech reply: {msg!r}.") if reply.code == ReplyCode.CONTROLLER_REBOOTED: raise SpecMechError( "The specMech controller has rebooted. " "Acknowledge the reboot before continuing." ) if reply.code == ReplyCode.CONNECTION_FAILED: raise SpecMechError("The connection to the specMech failed. Try reconnecting.") if reply.code != ReplyCode.VALID and reply.code != ReplyCode.REBOOT_ACKNOWLEDGED: raise SpecMechError(f"Failed parsing specMech reply: {reply.code.name!r}.")
class ReplyCode(enum.Enum): """Reply error codes.""" VALID = enum.auto() UNPARSABLE_RESPONSE = enum.auto() MISMATCHED_COMMAND_ID = enum.auto() INVALID_COMMAND_CHECKSUM = enum.auto() INVALID_REPLY_CHECKSUM = enum.auto() ERR_IN_REPLY = enum.auto() CONTROLLER_REBOOTED = enum.auto() REBOOT_ACKNOWLEDGED = enum.auto() CONNECTION_FAILED = enum.auto()
[docs] class SpecMechReply: """A valid response to a command to the specMech.""" def __init__(self, raw: bytes): self.command_id = 0 self.raw = raw self.code = ReplyCode.VALID self.reply_sentence: str = "" self.data: list[list[str]] = [] self.parse() def __str__(self): return self.raw def __repr__(self): return f"<MechControlReply (raw={self.raw!r})>"
[docs] @staticmethod def calculate_checksum(message: bytes): """Computes the checksum field for the NMEA protocol. The checksum is simple, just an XOR of all the bytes between the ``$`` and the ``*`` (not including the delimiters themselves), and written in hexadecimal. """ match = re.match(rb"^\$?(?:(.*?)(?:\*(?:[0-9A-Za-z]+)?)?)$", message) if match is None: raise ValueError(f"Unparsable message {message!r}") message = match.group(1) checksum = 0 for b in message: checksum ^= b return f"{checksum:02X}"
[docs] @staticmethod def check_checksum(message: bytes): """Checks the checksum from a reply. Parameters ---------- message The message to check. Must include the checksum ``*XX``. """ match = re.match(rb"^(.+?)\*([0-9A-Fa-f]+)$", message, re.DOTALL) if match is None: raise ValueError(f"Cannot parse checksum in message {message!r}") data, checksum = match.groups() return checksum.decode() == SpecMechReply.calculate_checksum(data)
[docs] def parse(self): """Parses the raw reply.""" # Check if the the response indicates a reboot. ! only ever happens if # a reboot has occurred. if b"!" in self.raw: self.code = ReplyCode.CONTROLLER_REBOOTED return # Get only the part without telnet subnegotiations and without the # terminator. match1 = re.match(b"(?:\xff.+\xf0)?(.*?)\x00?\n?>", self.raw, re.DOTALL) if match1 is None or len(match1.groups()) != 1: self.code = ReplyCode.UNPARSABLE_RESPONSE return data = match1.group(1) # Check if the the response indicates a reboot. if data == b"": self.code = ReplyCode.REBOOT_ACKNOWLEDGED return match2 = re.match(rb"(\$S2CMD.+?)\r(?:\x00?\n(.+)\r)?", data, re.DOTALL) if match2 is None or len(match2.groups()) != 2: self.code = ReplyCode.UNPARSABLE_RESPONSE return cmd, replies = match2.groups() if not self.check_checksum(cmd): self.code = ReplyCode.INVALID_COMMAND_CHECKSUM return match_command_id = re.match(rb".+?;([0-9])+\*", cmd) if match_command_id is None: warnings.warn( f"Failed matching command ID in command echo {cmd!r}", YaoUserWarning, ) else: self.command_id = int(match_command_id.group(1)) # The reply is only an echo of the command, without additional data. if replies is None: return reply_list = re.split(b"\r\x00\n", replies) for reply in reply_list: if not self.check_checksum(reply): self.code = ReplyCode.INVALID_REPLY_CHECKSUM return match3 = re.match(rb"^\$S2([A-Za-z0-9]+),(.+?)\*[0-9A-Fa-f]+$", reply) if match3 is None or len(match2.groups()) != 2: self.code = ReplyCode.UNPARSABLE_RESPONSE return sentence, data_pack = match3.groups() if sentence == b"ERR": self.code = ReplyCode.ERR_IN_REPLY # The data includes the sentence and the timestamp. self.data.append([sentence.decode()] + data_pack.decode().split(","))
[docs] class MechController: """Controller for the spectrograph mechanics. The `.MechController` handles the connection to the spectrograph mechanics microcontroller. A description of the communication protocol is available `here <https://bit.ly/38Xn2VE>`__. Parameters ---------- address The IP address or host of the mech server. port The port of the host of the mech server. log A logger to which to write. log_path If ``log=None`` and ``log_path`` is provided, the log is saved to this location using a rotating file handler. """ def __init__( self, address: str, port: int = 23, log: SDSSLogger | None = None, log_path: str | None = None, ): self.reader: asyncio.StreamReader | None = None self.writer: asyncio.StreamWriter | None = None self.reboot: bool = False self.command_number: int = 0 self.log = log or get_logger("yao.boss-spech-mech-client") if log is None and log_path: self.log.start_file_logger(log_path) self.spechMechAddress = address self.specMechPort = port self.lock = asyncio.Lock()
[docs] async def start(self): """Opens a connection with the given IP and port.""" self.log.info( f"Opening connection to specMech on {self.spechMechAddress} " f"at port {self.specMechPort}" ) loop = asyncio.get_running_loop() loop.set_exception_handler(self.log.asyncio_exception_handler) connect = asyncio.open_connection(self.spechMechAddress, self.specMechPort) self.reader, self.writer = await asyncio.wait_for(connect, timeout=3)
[docs] def is_connected(self): """Checks if we are connected to the specMech.""" if not self.writer: return False return not self.writer.is_closing()
[docs] async def send_data(self, command: str, timeout: float | None = None): """Sends the given string to the specMech and then awaits a response. Currently when a command is sent the controller is locked and any new command is blocked until a reply for the currenly running command arrives or a timeout happens. Parameters ---------- command A string that is sent to specMech. timeout How long to wait for replies. Returns ------- replies A tuple in which the first element is bytes array with the raw reply, and successive items are tuples with the data associated with each reply. If a timeout occurred before """ # Only send '!\r' to acknowledge a reboot if command == "!": commandFinal = command + "\r" else: # Increment command number self.command_number += 1 # Add command identifier to the command commandFinal = command + ";" + str(self.command_number) + "\r" # Send the command self.log.debug(f"Sent to specMech: {commandFinal!r}") try: if self.writer is None: raise ConnectionResetError("SpecMech client not connected.") async with self.lock: self.writer.write(commandFinal.encode()) reply = await asyncio.wait_for(self.read_data(), timeout) except ConnectionResetError: reply = SpecMechReply(b"") reply.code = ReplyCode.CONNECTION_FAILED self.writer = None self.reader = None return reply
[docs] async def read_data(self): """Awaits responses from specMech until the EOM character '>' is seen. Returns ------- mech_reply A `.SpecMechReply` object with the reply received. """ if self.reader is None: raise ConnectionResetError("SpecMech client not connected.") dataRaw = await self.reader.read(1024) # Continue accepting responses until '>' is received while b">" not in dataRaw and b"!" not in dataRaw: dataRawTmp = await self.reader.read(1024) dataRaw = dataRaw + dataRawTmp self.log.debug(f"Received from specMech: {dataRaw!r}") reply = SpecMechReply(dataRaw) if reply.code == ReplyCode.CONTROLLER_REBOOTED: self.reboot = True else: self.reboot = False return reply
[docs] async def close(self): """Closes the connection with specMech.""" self.log.info("Closing the connection to the specMech.") if self.writer is not None: self.writer.close() await self.writer.wait_closed() self.writer = None self.reader = None
[docs] async def get_stat(self, stat: str) -> tuple[Any, ...]: """Returns the output of the report commands. Parameters ---------- stat The report stat to recover. One of `.STATS`. Returns ------- reply The parsed command reply values. """ stat = stat.lower() if stat in STATS: mech_command = STATS[stat] else: raise SpecMechError(f"Invalid specMech stat {stat!r}.") reply = await self.send_data(mech_command) check_reply(reply) values = reply.data[0] if values[0] == "MTR": if stat.startswith("motor-"): # Only one reply. mtr = values[2] mtrPosition = int(values[3]) mtrSpeed = int(values[5]) mtrCurrent = int(values[7]) mtrDirection = values[9] mtrLimit = True if values[11] == "Y" else False return (mtr, mtrPosition, mtrSpeed, mtrCurrent, mtrDirection, mtrLimit) elif stat == "motors": # This is the only case in which we have multiple replies. # Return only the positions as floats. positions = [] for d in reply.data: positions.append(int(d[3])) return tuple(positions) else: raise SpecMechError("Invalid stat for MTR reply.") elif values[0] == "ENV": env0T = float(values[2]) env0H = float(values[4]) env1T = float(values[6]) env1H = float(values[8]) env2T = float(values[10]) env2H = float(values[12]) specMechT = float(values[14]) return (env0T, env0H, env1T, env1H, env2T, env2H, specMechT) elif values[0] == "ORI": accx = float(values[2]) accy = float(values[3]) accz = float(values[4]) return (accx, accy, accz) elif values[0] == "PNU": # change the c/o/t and 0/1 responses of specMech # to something more readable if values[2] == "c": pnus = "closed" elif values[2] == "o": pnus = "open" else: pnus = "transiting" if values[4] == "c": pnul = "closed" elif values[4] == "o": pnul = "open" else: pnul = "transiting" if values[6] == "c": pnur = "closed" elif values[6] == "o": pnur = "open" else: pnur = "transiting" if values[8] == "0": pnup = "off" else: pnup = "on" return (pnus, pnul, pnur, pnup) elif values[0] == "TIM": tim = values[1] stim = values[2] btm = values[4] return (btm, tim, stim) elif values[0] == "VER": ver = values[2] return (ver,) elif values[0] == "VAC": red = float(values[2]) blue = float(values[4]) return (red, blue) elif values[0] == "LN2": valves = [] for valve_status in values[2]: if valve_status.upper() == "C": valves.append("closed") elif valve_status.upper() == "O": valves.append("open") elif valve_status.upper() == "T": valves.append("timeout") elif valve_status.upper() == "X": valves.append("disabled") else: valves.append("?") ( buffer_dewar_supply_status, buffer_dewar_vent_status, red_dewar_vent_status, blue_dewar_vent_status, ) = valves time_next_fill = int(values[3]) max_valve_open_time = int(values[5]) fill_interval = int(values[7]) ln2_pressure = int(values[9]) if values[11].upper() == "C": buffer_dewar_thermistor_status = "cold" elif values[11].upper() == "H": buffer_dewar_thermistor_status = "warm" else: buffer_dewar_thermistor_status = "?" if values[13].upper() == "C": red_dewar_thermistor_status = "cold" elif values[13].upper() == "H": red_dewar_thermistor_status = "warm" else: red_dewar_thermistor_status = "?" if values[15].upper() == "C": blue_dewar_thermistor_status = "cold" elif values[15].upper() == "H": blue_dewar_thermistor_status = "warm" else: blue_dewar_thermistor_status = "?" return ( buffer_dewar_supply_status, buffer_dewar_vent_status, red_dewar_vent_status, blue_dewar_vent_status, time_next_fill, max_valve_open_time, fill_interval, ln2_pressure, buffer_dewar_thermistor_status, red_dewar_thermistor_status, blue_dewar_thermistor_status, ) elif "specmech": fan = "on" if int(values[2]) else "off" volts = float(values[4]) return (fan, volts) else: raise SpecMechError(f"Invalid reply sentence {values[0]}.")
[docs] async def pneumatic_move( self, mechanism: str, open: bool = True, command: YaoCommand | None = None, ): """Opens/closes a pneumatic mechanism. Parameters ---------- mechanism Either ``shutter``, ``left``, or ``right``. open If `True`, opens the mechanism, otherwise closes it. command An actor command for outputs. """ if mechanism == "left": spec_command = "ol" if open else "cl" elif mechanism == "right": spec_command = "or" if open else "cr" elif mechanism == "shutter": spec_command = "os" if open else "cs" else: raise SpecMechError(f"Invalid mechanism {mechanism!r}.") reply = await self.send_data(spec_command) check_reply(reply) # Check that all the mechanisms have arrived to their desired position. # Try twice, then fail. for ii in [1, 2]: timeout = config["timeouts"]["pneumatics"] if ii == 2: # Longer timeout if the first one failed. timeout *= 3 await asyncio.sleep(timeout) reached = True status = await self.send_data("rp") try: check_reply(status) except SpecMechError as err: raise SpecMechError( f"Failed checking the status of the pneumatics after a move: {err}" ) if mechanism == "shutter": mech_position = status.data[0][2] elif mechanism == "left": mech_position = status.data[0][4] elif mechanism == "right": mech_position = status.data[0][6] else: continue destination = "open" if open else "closed" if mech_position != destination[0]: reached = False if reached is True: if command: mech_key = mechanism if mechanism in ["left", "right"]: mech_key = "hartmann_" + mech_key command.info(message={mech_key: destination}) return True if ii == 1: if command: command.warning( "Pneumatics did not reach the desired position. " "Waiting a bit longer ..." ) else: if command: await command.send_command("yao", "mech status pneumatics") raise SpecMechError("Pneumatics did not reach the desired position.") # We should never get here. raise SpecMechError
[docs] async def pneumatic_status(self, mechanism: str) -> str: """Returns the open/closed status of a mechanism.""" mechanism = mechanism.lower() if mechanism not in ["shutter", "left", "right"]: raise SpecMechError(f"Invalid mechanism {mechanism!r}.") status = await self.get_stat("pneumatics") if mechanism == "shutter": return status[0] elif mechanism == "left": return status[1] else: return status[2]