Best Python code snippet using fMBT_python
remote_pyaal
Source:remote_pyaal  
...435            if (include_generation_discontinued_tag and436                not source_lsts_state in tags[generation_discontinued_tag]):437                tags[generation_discontinued_tag].append(source_lsts_state)438            continue439        aal.set_state_obj(source_state_obj)440        for action in get_actions():441            aal.set_state_obj(aal.state_obj_copy(source_state_obj))442            aal.model_execute(action)443            current_tags = aal.getprops()444            next_state_real = aal.state()445            if no_filtering_nor_abstracting:446                next_state_hidden = next_state_real447            else:448                if discard_variables or include_variables:449                    next_state_hidden = tagfilter(aal.state(discard_variables, include_variables), current_tags)450                else:451                    next_state_hidden = tagfilter(next_state_real, current_tags)452            next_state_obj = aal.state_obj()453            # new state?454            if not next_state_hidden in lsts_states:455                transitions.append([])...TControl.py
Source:TControl.py  
...41        self.current = current42        self.temperature = temperature43        self.error = error44        self.acceleration = acceleration45    def set_state_obj(self, other_motor_state):46        self.position = other_motor_state.position47        self.velocity = other_motor_state.velocity48        self.current = other_motor_state.current49        self.temperature = other_motor_state.temperature50        self.error = other_motor_state.error51        self.acceleration = other_motor_state.acceleration52# Data structure to store MIT_command that will be sent upon update53class MIT_command:54    """Data structure to store MIT_command that will be sent upon update"""55    def __init__(self, position, velocity, kp, kd, current):56        self.position = position57        self.velocity = velocity58        self.kp = kp59        self.kd = kd60        self.current = current61# motor state from the controller, uneditable named tuple62MIT_motor_state = namedtuple('motor_state', 'position velocity current temperature error')63# python-can listener object, with handler to be called upon reception of a message on the CAN bus64class motorListener(can.Listener):65    """Python-can listener object, with handler to be called upon reception of a message on the CAN bus"""66    def __init__(self, _canman, motor):67        self._canman = _canman68        self.bus = _canman.bus69        self.motor = motor70    def on_message_received(self, msg):71            data = bytes(msg.data)72            ID = data[0]73            if ID == self.motor.ID:74                self.motor._update_state_async(self._canman.parse_MIT_message(data, self.motor.type))75            76# A class to manage the low level CAN communication protocols77class CAN_Manager(object):78    """A class to manage the low level CAN communication protocols"""79    debug = False80    # Note, defining singletons in this way means that you cannot inherit81    # from this class, as apparently __init__ for the subclass will be called twice82    _instance = None83    def __new__(cls):84        if not cls._instance:85            cls._instance = super(CAN_Manager, cls).__new__(cls)86            print("Initializing CAN Manager")87            # verify the CAN bus is currently down88            os.system( 'sudo /sbin/ip link set can0 down' )89            # start the CAN bus back up90            os.system( 'sudo /sbin/ip link set can0 up type can bitrate 1000000' )91            # create a python-can bus object92            cls._instance.bus = can.interface.Bus(channel='can0', bustype='socketcan_native')93            # create a python-can notifier object, which motors can later subscribe to94            cls._instance.notifier = can.Notifier(bus=cls._instance.bus, listeners=[])95            print("Connected on: " + str(cls._instance.bus))96        return cls._instance97    def __init__(self):98        pass99        100    def __del__(self):101        # shut down the CAN bus when the object is deleted102        # I think this may not ever get called, so keep a pointer and explicitly delete if this is important.103        os.system( 'sudo /sbin/ip link set can0 down' ) 104    # subscribe a motor object to the CAN bus to be updated upon message reception105    def add_motor(self, motor):106        self.notifier.add_listener(motorListener(self, motor))107    # Locks value between min and max108    @staticmethod109    def limit_value(value, min, max):110        if value > max:111            return max112        elif value < min:113            return min114        else:115            return value116    # interpolates a floating point number to fill some amount of the max size of unsigned int, 117    # as specified with the num_bits118    @staticmethod119    def float_to_uint(x,x_min,x_max,num_bits):120        x = CAN_Manager.limit_value(x,x_min,x_max)121        span = x_max-x_min122        # (x - x_min)*(2^num_bits)/span123        return int((x- x_min)*( float((1<<num_bits)/span)) )124    # undoes the above method125    @staticmethod126    def uint_to_float(x,x_min,x_max,num_bits):127        span = x_max-x_min128        # (x*span/(2^num_bits -1)) + x_min129        return float(x*span/((1<<num_bits)-1) + x_min)130    # sends a message to the motor (when the motor is in MIT mode)131    def send_MIT_message(self, motor_id, data):132        DLC = len(data)133        assert (DLC <= 8), ('Data too long in message for motor ' + str(motor_id))134        135        if self.debug:136            print('ID: ' + str(hex(motor_id)) + '   Data: ' + '[{}]'.format(', '.join(hex(d) for d in data)) )137        138        message = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)139        try:140            self.bus.send(message)141            if self.debug:142                print("    Message sent on " + str(self.bus.channel_info) )143        except can.CanError:144            if self.debug:145                print("    Message NOT sent")146    # send the power on code147    def power_on(self, motor_id):148        self.send_MIT_message(motor_id, [ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,0XFC])149        150    # send the power off code151    def power_off(self, motor_id):152        self.send_MIT_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0XFD])153    # send the zeroing code. Like a scale, it takes about a second to zero the position154    def zero(self, motor_id):155        self.send_MIT_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE])156    # send an MIT control signal, consisting of desired position, velocity, and current, and gains for position and velocity control157    # basically an impedance controller158    def MIT_controller(self, motor_id, motor_type, position, velocity, Kp, Kd, I):159        position_uint16 = CAN_Manager.float_to_uint(position, MIT_Params[motor_type]['P_min'], 160                                                    MIT_Params[motor_type]['P_max'], 16)161        velocity_uint12 = CAN_Manager.float_to_uint(velocity, MIT_Params[motor_type]['V_min'], 162                                                    MIT_Params[motor_type]['V_max'], 12)163        Kp_uint12 = CAN_Manager.float_to_uint(Kp, MIT_Params[motor_type]['Kp_min'], 164                                                    MIT_Params[motor_type]['Kp_max'], 12)165        Kd_uint12 = CAN_Manager.float_to_uint(Kd, MIT_Params[motor_type]['Kd_min'], 166                                                    MIT_Params[motor_type]['Kd_max'], 12)167        I_uint12 = CAN_Manager.float_to_uint(I, MIT_Params[motor_type]['I_min'], 168                                                    MIT_Params[motor_type]['I_max'], 12)169        data = [170            position_uint16 >> 8,171            position_uint16 & 0x00FF,172            (velocity_uint12) >> 4,173            ((velocity_uint12&0x00F)<<4) | (Kp_uint12) >> 8,174            (Kp_uint12&0x0FF),175            (Kd_uint12) >> 4,176            ((Kd_uint12&0x00F)<<4) | (I_uint12) >> 8,177            (I_uint12&0x0FF)178        ]179        # print(data)180        self.send_MIT_message(motor_id, data)181        182    # convert data recieved from motor in byte format back into floating point numbers in real units183    def parse_MIT_message(self, data, motor_type):184        185        assert len(data) == 8 or len(data) == 6, 'Tried to parse a CAN message that was not Motor State in MIT Mode'186        temp = None187        error = None188        position_uint = data[1] <<8 | data[2]189        velocity_uint = ((data[3] << 8) | (data[4]>>4) <<4 ) >> 4190        current_uint = (data[4]&0x0F)<<8 | data[5]191        192        if len(data)  == 8:193            temp = int(data[6])194            error = int(data[7])195        position = CAN_Manager.uint_to_float(position_uint, MIT_Params[motor_type]['P_min'], 196                                            MIT_Params[motor_type]['P_max'], 16)197        velocity = CAN_Manager.uint_to_float(velocity_uint, MIT_Params[motor_type]['V_min'], 198                                            MIT_Params[motor_type]['V_max'], 12)199        current = CAN_Manager.uint_to_float(current_uint, MIT_Params[motor_type]['I_min'], 200                                            MIT_Params[motor_type]['I_max'], 12)201        if self.debug:202            print('  Position: ' + str(position))203            print('  Velocity: ' + str(velocity))204            print('  Current: ' + str(current))205            if (temp is not None) and (error is not None):206                print('  Temp: ' + str(temp))207                print('  Error: ' + str(error))208        return MIT_motor_state(position, velocity, current, temp, error)209# defualt variables to be logged210LOG_VARIABLES = [211    "output_angle", 212    "output_velocity", 213    "output_acceleration", 214    "current",215    "output_torque"216]217# possible states for the controller218class _TMotorManState(Enum):219    IDLE = 0220    IMPEDANCE = 1221    CURRENT = 2222    FULL_STATE = 3223# the user-facing class that manages the motor.224class TMotorManager():225    """The user-facing class that manages the motor."""226    def __init__(self, motor_type='AK80-9', motor_ID=1, CSV_file=None, log_vars = LOG_VARIABLES):227        self.type = motor_type228        self.ID = motor_ID229        self.csv_file_name = CSV_file230        print("Initializing device: " + self.device_info_string())231        self._motor_state = motor_state(0.0,0.0,0.0,0.0,0.0,0.0)232        self._motor_state_async = motor_state(0.0,0.0,0.0,0.0,0.0,0.0)233        self._command = MIT_command(0.0,0.0,0.0,0.0,0.0)234        self._control_state = _TMotorManState.IDLE235        self._times_past_limit = 0236        self._angle_threshold = MIT_Params[self.type]['P_max'] - 2.0 # radians, only really matters if the motor's going super fast237        self._old_pos = 0.0238        self._entered = False239        self._start_time = time.time()240        self._last_update_time = self._start_time241        self._last_command_time = None242        self._updated = False243        244        self.log_vars = log_vars245        self.LOG_FUNCTIONS = {246            "output_angle" : self.get_output_angle_radians, 247            "output_velocity" : self.get_output_velocity_radians_per_second, 248            "output_acceleration" : self.get_output_acceleration_radians_per_second_squared, 249            "current" : self.get_current_qaxis_amps,250            "output_torque": self.get_output_torque_newton_meters,251            "motor_angle" : self.get_motor_angle_radians, 252            "motor_velocity" : self.get_motor_velocity_radians_per_second, 253            "motor_acceleration" : self.get_motor_acceleration_radians_per_second_squared, 254            "motor_torque": self.get_motor_torque_newton_meters 255        }256        257        self._canman = CAN_Manager()258        self._canman.add_motor(self)259        260            261        262    def __enter__(self):263        print('Turning on control for device: ' + self.device_info_string())264        if self.csv_file_name is not None:265            with open(self.csv_file_name,'w') as fd:266                writer = csv.writer(fd)267                writer.writerow(["pi_time"]+self.log_vars)268            self.csv_file = open(self.csv_file_name,'a').__enter__()269            self.csv_writer = csv.writer(self.csv_file)270        self.power_on()271        self._send_command()272        self._entered = True273        return self274    def __exit__(self, etype, value, tb):275        print('Turning off control for device: ' + self.device_info_string())276        self.power_off()277        if self.csv_file_name is not None:278            self.csv_file.__exit__(etype, value, tb)279        if not (etype is None):280            traceback.print_exception(etype, value, tb)281    # this method is called by the handler every time a message is recieved on the bus282    # from this motor, to store the most recent state information for later283    def _update_state_async(self, MIT_state):284        """this method is called by the handler every time a message is recieved on the bus285        from this motor, to store the most recent state information for later"""286        now = time.time()287        dt = self._last_update_time - now288        self._last_update_time = now289        acceleration = (MIT_state.velocity - self._motor_state_async.velocity)/dt290        self._motor_state_async.set_state(MIT_state.position, MIT_state.velocity, MIT_state.current, MIT_state.temperature, MIT_state.error, acceleration)291        self._updated = True292    # this method is called by the user to synchronize the current state used by the controller293    # with the most recent message recieved294    def update(self):295        """this method is called by the user to synchronize the current state used by the controller296        with the most recent message recieved"""297        # check that the motor is safely turned on298        if not self._entered:299            raise RuntimeError("Tried to update motor state before safely powering on for device: " + self.device_info_string())300        # check that the motor data is recent301        # print(self._command_sent)302        now = time.time()303        if (now - self._last_command_time) < 0.25 and ( (now - self._last_update_time) > 0.1):304            # print("State update requested but no data recieved from motor. Delay longer after zeroing, decrease frequency, or check connection.")305            warnings.warn("State update requested but no data from motor. Delay longer after zeroing, decrease frequency, or check connection. " + self.device_info_string(), RuntimeWarning)306        else:307            self._command_sent = False308        # artificially extending the range of the position that we track309        P_max = MIT_Params[self.type]['P_max']310        old_pos = self._old_pos311        new_pos = self._motor_state_async.position312        thresh = self._angle_threshold313        if (thresh <= new_pos and new_pos <= P_max) and (-P_max <= old_pos and old_pos <= -thresh):314            self._times_past_limit -= 1315        elif (thresh <= old_pos and old_pos <= P_max) and (-P_max <= new_pos and new_pos <= -thresh) :316            self._times_past_limit += 1317            318        # update position319        self._old_pos = new_pos320        self._motor_state.set_state_obj(self._motor_state_async)321        self._motor_state.position += self._times_past_limit*2*P_max322        323        # send current motor command324        self._send_command()325        # writing to log file326        if self.csv_file_name is not None:327            self.csv_writer.writerow([self._last_update_time - self._start_time] + [self.LOG_FUNCTIONS[var]() for var in self.log_vars])328        self._updated = False329        330        331    # sends a command to the motor depending on whats controlm mode the motor is in332    def _send_command(self):333        """sends a command to the motor depending on whats controlm mode the motor is in"""334        if self._control_state == _TMotorManState.FULL_STATE:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
