How to use set_state_obj method in fMBT

Best Python code snippet using fMBT_python

remote_pyaal

Source:remote_pyaal Github

copy

Full Screen

...435 if (include_generation_discontinued_tag and436 not source_lsts_state in tags[generation_discontinued_tag]):437 tags[generation_discontinued_tag].append(source_lsts_state)438 continue439 aal.set_state_obj(source_state_obj)440 for action in get_actions():441 aal.set_state_obj(aal.state_obj_copy(source_state_obj))442 aal.model_execute(action)443 current_tags = aal.getprops()444 next_state_real = aal.state()445 if no_filtering_nor_abstracting:446 next_state_hidden = next_state_real447 else:448 if discard_variables or include_variables:449 next_state_hidden = tagfilter(aal.state(discard_variables, include_variables), current_tags)450 else:451 next_state_hidden = tagfilter(next_state_real, current_tags)452 next_state_obj = aal.state_obj()453 # new state?454 if not next_state_hidden in lsts_states:455 transitions.append([])...

Full Screen

Full Screen

TControl.py

Source:TControl.py Github

copy

Full Screen

...41 self.current = current42 self.temperature = temperature43 self.error = error44 self.acceleration = acceleration45 def set_state_obj(self, other_motor_state):46 self.position = other_motor_state.position47 self.velocity = other_motor_state.velocity48 self.current = other_motor_state.current49 self.temperature = other_motor_state.temperature50 self.error = other_motor_state.error51 self.acceleration = other_motor_state.acceleration52# Data structure to store MIT_command that will be sent upon update53class MIT_command:54 """Data structure to store MIT_command that will be sent upon update"""55 def __init__(self, position, velocity, kp, kd, current):56 self.position = position57 self.velocity = velocity58 self.kp = kp59 self.kd = kd60 self.current = current61# motor state from the controller, uneditable named tuple62MIT_motor_state = namedtuple('motor_state', 'position velocity current temperature error')63# python-can listener object, with handler to be called upon reception of a message on the CAN bus64class motorListener(can.Listener):65 """Python-can listener object, with handler to be called upon reception of a message on the CAN bus"""66 def __init__(self, _canman, motor):67 self._canman = _canman68 self.bus = _canman.bus69 self.motor = motor70 def on_message_received(self, msg):71 data = bytes(msg.data)72 ID = data[0]73 if ID == self.motor.ID:74 self.motor._update_state_async(self._canman.parse_MIT_message(data, self.motor.type))75 76# A class to manage the low level CAN communication protocols77class CAN_Manager(object):78 """A class to manage the low level CAN communication protocols"""79 debug = False80 # Note, defining singletons in this way means that you cannot inherit81 # from this class, as apparently __init__ for the subclass will be called twice82 _instance = None83 def __new__(cls):84 if not cls._instance:85 cls._instance = super(CAN_Manager, cls).__new__(cls)86 print("Initializing CAN Manager")87 # verify the CAN bus is currently down88 os.system( 'sudo /sbin/ip link set can0 down' )89 # start the CAN bus back up90 os.system( 'sudo /sbin/ip link set can0 up type can bitrate 1000000' )91 # create a python-can bus object92 cls._instance.bus = can.interface.Bus(channel='can0', bustype='socketcan_native')93 # create a python-can notifier object, which motors can later subscribe to94 cls._instance.notifier = can.Notifier(bus=cls._instance.bus, listeners=[])95 print("Connected on: " + str(cls._instance.bus))96 return cls._instance97 def __init__(self):98 pass99 100 def __del__(self):101 # shut down the CAN bus when the object is deleted102 # I think this may not ever get called, so keep a pointer and explicitly delete if this is important.103 os.system( 'sudo /sbin/ip link set can0 down' ) 104 # subscribe a motor object to the CAN bus to be updated upon message reception105 def add_motor(self, motor):106 self.notifier.add_listener(motorListener(self, motor))107 # Locks value between min and max108 @staticmethod109 def limit_value(value, min, max):110 if value > max:111 return max112 elif value < min:113 return min114 else:115 return value116 # interpolates a floating point number to fill some amount of the max size of unsigned int, 117 # as specified with the num_bits118 @staticmethod119 def float_to_uint(x,x_min,x_max,num_bits):120 x = CAN_Manager.limit_value(x,x_min,x_max)121 span = x_max-x_min122 # (x - x_min)*(2^num_bits)/span123 return int((x- x_min)*( float((1<<num_bits)/span)) )124 # undoes the above method125 @staticmethod126 def uint_to_float(x,x_min,x_max,num_bits):127 span = x_max-x_min128 # (x*span/(2^num_bits -1)) + x_min129 return float(x*span/((1<<num_bits)-1) + x_min)130 # sends a message to the motor (when the motor is in MIT mode)131 def send_MIT_message(self, motor_id, data):132 DLC = len(data)133 assert (DLC <= 8), ('Data too long in message for motor ' + str(motor_id))134 135 if self.debug:136 print('ID: ' + str(hex(motor_id)) + ' Data: ' + '[{}]'.format(', '.join(hex(d) for d in data)) )137 138 message = can.Message(arbitration_id=motor_id, data=data, is_extended_id=False)139 try:140 self.bus.send(message)141 if self.debug:142 print(" Message sent on " + str(self.bus.channel_info) )143 except can.CanError:144 if self.debug:145 print(" Message NOT sent")146 # send the power on code147 def power_on(self, motor_id):148 self.send_MIT_message(motor_id, [ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,0XFC])149 150 # send the power off code151 def power_off(self, motor_id):152 self.send_MIT_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0XFD])153 # send the zeroing code. Like a scale, it takes about a second to zero the position154 def zero(self, motor_id):155 self.send_MIT_message(motor_id, [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE])156 # send an MIT control signal, consisting of desired position, velocity, and current, and gains for position and velocity control157 # basically an impedance controller158 def MIT_controller(self, motor_id, motor_type, position, velocity, Kp, Kd, I):159 position_uint16 = CAN_Manager.float_to_uint(position, MIT_Params[motor_type]['P_min'], 160 MIT_Params[motor_type]['P_max'], 16)161 velocity_uint12 = CAN_Manager.float_to_uint(velocity, MIT_Params[motor_type]['V_min'], 162 MIT_Params[motor_type]['V_max'], 12)163 Kp_uint12 = CAN_Manager.float_to_uint(Kp, MIT_Params[motor_type]['Kp_min'], 164 MIT_Params[motor_type]['Kp_max'], 12)165 Kd_uint12 = CAN_Manager.float_to_uint(Kd, MIT_Params[motor_type]['Kd_min'], 166 MIT_Params[motor_type]['Kd_max'], 12)167 I_uint12 = CAN_Manager.float_to_uint(I, MIT_Params[motor_type]['I_min'], 168 MIT_Params[motor_type]['I_max'], 12)169 data = [170 position_uint16 >> 8,171 position_uint16 & 0x00FF,172 (velocity_uint12) >> 4,173 ((velocity_uint12&0x00F)<<4) | (Kp_uint12) >> 8,174 (Kp_uint12&0x0FF),175 (Kd_uint12) >> 4,176 ((Kd_uint12&0x00F)<<4) | (I_uint12) >> 8,177 (I_uint12&0x0FF)178 ]179 # print(data)180 self.send_MIT_message(motor_id, data)181 182 # convert data recieved from motor in byte format back into floating point numbers in real units183 def parse_MIT_message(self, data, motor_type):184 185 assert len(data) == 8 or len(data) == 6, 'Tried to parse a CAN message that was not Motor State in MIT Mode'186 temp = None187 error = None188 position_uint = data[1] <<8 | data[2]189 velocity_uint = ((data[3] << 8) | (data[4]>>4) <<4 ) >> 4190 current_uint = (data[4]&0x0F)<<8 | data[5]191 192 if len(data) == 8:193 temp = int(data[6])194 error = int(data[7])195 position = CAN_Manager.uint_to_float(position_uint, MIT_Params[motor_type]['P_min'], 196 MIT_Params[motor_type]['P_max'], 16)197 velocity = CAN_Manager.uint_to_float(velocity_uint, MIT_Params[motor_type]['V_min'], 198 MIT_Params[motor_type]['V_max'], 12)199 current = CAN_Manager.uint_to_float(current_uint, MIT_Params[motor_type]['I_min'], 200 MIT_Params[motor_type]['I_max'], 12)201 if self.debug:202 print(' Position: ' + str(position))203 print(' Velocity: ' + str(velocity))204 print(' Current: ' + str(current))205 if (temp is not None) and (error is not None):206 print(' Temp: ' + str(temp))207 print(' Error: ' + str(error))208 return MIT_motor_state(position, velocity, current, temp, error)209# defualt variables to be logged210LOG_VARIABLES = [211 "output_angle", 212 "output_velocity", 213 "output_acceleration", 214 "current",215 "output_torque"216]217# possible states for the controller218class _TMotorManState(Enum):219 IDLE = 0220 IMPEDANCE = 1221 CURRENT = 2222 FULL_STATE = 3223# the user-facing class that manages the motor.224class TMotorManager():225 """The user-facing class that manages the motor."""226 def __init__(self, motor_type='AK80-9', motor_ID=1, CSV_file=None, log_vars = LOG_VARIABLES):227 self.type = motor_type228 self.ID = motor_ID229 self.csv_file_name = CSV_file230 print("Initializing device: " + self.device_info_string())231 self._motor_state = motor_state(0.0,0.0,0.0,0.0,0.0,0.0)232 self._motor_state_async = motor_state(0.0,0.0,0.0,0.0,0.0,0.0)233 self._command = MIT_command(0.0,0.0,0.0,0.0,0.0)234 self._control_state = _TMotorManState.IDLE235 self._times_past_limit = 0236 self._angle_threshold = MIT_Params[self.type]['P_max'] - 2.0 # radians, only really matters if the motor's going super fast237 self._old_pos = 0.0238 self._entered = False239 self._start_time = time.time()240 self._last_update_time = self._start_time241 self._last_command_time = None242 self._updated = False243 244 self.log_vars = log_vars245 self.LOG_FUNCTIONS = {246 "output_angle" : self.get_output_angle_radians, 247 "output_velocity" : self.get_output_velocity_radians_per_second, 248 "output_acceleration" : self.get_output_acceleration_radians_per_second_squared, 249 "current" : self.get_current_qaxis_amps,250 "output_torque": self.get_output_torque_newton_meters,251 "motor_angle" : self.get_motor_angle_radians, 252 "motor_velocity" : self.get_motor_velocity_radians_per_second, 253 "motor_acceleration" : self.get_motor_acceleration_radians_per_second_squared, 254 "motor_torque": self.get_motor_torque_newton_meters 255 }256 257 self._canman = CAN_Manager()258 self._canman.add_motor(self)259 260 261 262 def __enter__(self):263 print('Turning on control for device: ' + self.device_info_string())264 if self.csv_file_name is not None:265 with open(self.csv_file_name,'w') as fd:266 writer = csv.writer(fd)267 writer.writerow(["pi_time"]+self.log_vars)268 self.csv_file = open(self.csv_file_name,'a').__enter__()269 self.csv_writer = csv.writer(self.csv_file)270 self.power_on()271 self._send_command()272 self._entered = True273 return self274 def __exit__(self, etype, value, tb):275 print('Turning off control for device: ' + self.device_info_string())276 self.power_off()277 if self.csv_file_name is not None:278 self.csv_file.__exit__(etype, value, tb)279 if not (etype is None):280 traceback.print_exception(etype, value, tb)281 # this method is called by the handler every time a message is recieved on the bus282 # from this motor, to store the most recent state information for later283 def _update_state_async(self, MIT_state):284 """this method is called by the handler every time a message is recieved on the bus285 from this motor, to store the most recent state information for later"""286 now = time.time()287 dt = self._last_update_time - now288 self._last_update_time = now289 acceleration = (MIT_state.velocity - self._motor_state_async.velocity)/dt290 self._motor_state_async.set_state(MIT_state.position, MIT_state.velocity, MIT_state.current, MIT_state.temperature, MIT_state.error, acceleration)291 self._updated = True292 # this method is called by the user to synchronize the current state used by the controller293 # with the most recent message recieved294 def update(self):295 """this method is called by the user to synchronize the current state used by the controller296 with the most recent message recieved"""297 # check that the motor is safely turned on298 if not self._entered:299 raise RuntimeError("Tried to update motor state before safely powering on for device: " + self.device_info_string())300 # check that the motor data is recent301 # print(self._command_sent)302 now = time.time()303 if (now - self._last_command_time) < 0.25 and ( (now - self._last_update_time) > 0.1):304 # print("State update requested but no data recieved from motor. Delay longer after zeroing, decrease frequency, or check connection.")305 warnings.warn("State update requested but no data from motor. Delay longer after zeroing, decrease frequency, or check connection. " + self.device_info_string(), RuntimeWarning)306 else:307 self._command_sent = False308 # artificially extending the range of the position that we track309 P_max = MIT_Params[self.type]['P_max']310 old_pos = self._old_pos311 new_pos = self._motor_state_async.position312 thresh = self._angle_threshold313 if (thresh <= new_pos and new_pos <= P_max) and (-P_max <= old_pos and old_pos <= -thresh):314 self._times_past_limit -= 1315 elif (thresh <= old_pos and old_pos <= P_max) and (-P_max <= new_pos and new_pos <= -thresh) :316 self._times_past_limit += 1317 318 # update position319 self._old_pos = new_pos320 self._motor_state.set_state_obj(self._motor_state_async)321 self._motor_state.position += self._times_past_limit*2*P_max322 323 # send current motor command324 self._send_command()325 # writing to log file326 if self.csv_file_name is not None:327 self.csv_writer.writerow([self._last_update_time - self._start_time] + [self.LOG_FUNCTIONS[var]() for var in self.log_vars])328 self._updated = False329 330 331 # sends a command to the motor depending on whats controlm mode the motor is in332 def _send_command(self):333 """sends a command to the motor depending on whats controlm mode the motor is in"""334 if self._control_state == _TMotorManState.FULL_STATE:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful