1import logging2import os3import socket4from calibration import RF_Calibration_Module5logger = logging.getLogger()6class ResponseType:7 NO_RESPONSE = "NO_RESPONSE"8 EXCEPTION = "EXCEPTION"9 UNSUPPORTED_COMMAND = "UNSUPPORTED_COMMAND"10class Comm:11 def __init__(self, unix_socket_path, *args, **kwargs):12 self.unix_socket_path = unix_socket_path13 self.connection = None14 self.welcome_socket = None15 self.rf_cal = RF_Calibration_Module()16 def serve(self):17 try:18 if os.path.exists(self.unix_socket_path):19 logger.warning(20 "Unix socket {} already exist".format(self.unix_socket_path)21 )22 os.unlink(self.unix_socket_path)23 if self.welcome_socket is not None:24 logger.warning("Welcome socket already istantiated")25 self.welcome_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)26 self.welcome_socket.bind(self.unix_socket_path)27 os.system("chown iocuser:ioc {}".format(self.unix_socket_path))28"Unix socket created at {}".format(self.unix_socket_path))29 self.welcome_socket.listen(1)30 while True:31"Unix welcome socket listening")32 connection, client_address = self.welcome_socket.accept()33"Client {} connected".format(client_address))34 connection.settimeout(30)35 self.handle_connection(connection)36 except Exception:37 logger.exception("Comm exception")38 finally:39 self.welcome_socket.close()40 os.remove(self.unix_socket_path)41"Comm server shutdown")42 self.welcome_socket = None43 def handle_connection(self, connection):44 try:45 while True:46 command = connection.recv(1024).decode("utf-8")47 response = ResponseType.NO_RESPONSE48 if command == "DATA?":49 response = else:51 response = ResponseType.UNSUPPORTED_COMMAND52 connection.sendall("{}\r\n".format(response).encode("utf-8"))53 logger.debug("Command {} Length {}".format(command, response))54 except Exception:55 logger.exception("Connection exception")56 finally:57 logger.warning("Connection closed")...

Full Screen

"""
Errors that may be caused by Hedgehog commands.

Every error corresponds to one acknowledge code from ack.proto; the `OK` code naturally has no corresponding error.
"""

from typing import Dict, Type

from .proto.ack_pb2 import UNKNOWN_COMMAND, INVALID_COMMAND, UNSUPPORTED_COMMAND, FAILED_COMMAND

__all__ = [
    'HedgehogCommandError', 'UnknownCommandError', 'InvalidCommandError', 'UnsupportedCommandError',
    'FailedCommandError', 'EmergencyShutdown',
    'error',
]


class HedgehogCommandError(Exception):
    """Superclass of all errors caused by Hedgehog commands."""

    code = None  # type: int
    """Class property containing the acknowledgement code"""

    def to_message(self):
        """
        Creates an error Acknowledgement message.

        The message's code and message are taken from this exception.
        If the exception was created without arguments, the message text is the empty string.

        :return: the message representing this exception
        """
        # Imported lazily inside the method, as in the original module.
        from .messages import ack

        text = self.args[0] if self.args else ''
        return ack.Acknowledgement(self.code, text)


class UnknownCommandError(HedgehogCommandError):
    """Error for the `UNKNOWN_COMMAND` acknowledgement code."""

    code = UNKNOWN_COMMAND


class InvalidCommandError(HedgehogCommandError):
    """Error for the `INVALID_COMMAND` acknowledgement code."""

    code = INVALID_COMMAND


class UnsupportedCommandError(HedgehogCommandError):
    """Error for the `UNSUPPORTED_COMMAND` acknowledgement code."""

    code = UNSUPPORTED_COMMAND


class FailedCommandError(HedgehogCommandError):
    """Error for the `FAILED_COMMAND` acknowledgement code."""

    code = FAILED_COMMAND


class EmergencyShutdown(FailedCommandError):
    """
    Specialization of `FailedCommandError`.

    `error()` returns it for a `FAILED_COMMAND` code whose first argument is
    the text "Emergency Shutdown activated".
    """


# Maps every non-OK acknowledgement code to the exception class that represents it.
_errors = {
    UNKNOWN_COMMAND: UnknownCommandError,
    INVALID_COMMAND: InvalidCommandError,
    UNSUPPORTED_COMMAND: UnsupportedCommandError,
    FAILED_COMMAND: FailedCommandError,
}  # type: Dict[int, Type[HedgehogCommandError]]


def error(code: int, *args, **kwargs) -> HedgehogCommandError:
    """
    Creates an error from the given code, and args and kwargs.

    The exception is created and returned, not raised; raising it is up to the caller.

    :param code: The acknowledgement code
    :param args: Exception args
    :param kwargs: Exception kwargs
    :return: the error for the given acknowledgement code
    :raises KeyError: if `code` has no entry in the error table (e.g. the `OK` code)
    """
    # TODO add proper error code
    # Until a dedicated code exists, an emergency shutdown is recognized by its message text.
    if code == FAILED_COMMAND and len(args) >= 1 and args[0] == "Emergency Shutdown activated":
        return EmergencyShutdown(*args, **kwargs)

    # Every other code is resolved through the lookup table, so the function
    # always yields an exception instance instead of falling through to None.
    return _errors[code](*args, **kwargs)

