How to use soft_abort method in avocado

Best Python code snippet using avocado_python

switch_monos.py

Source:switch_monos.py Github

copy

Full Screen

...148 wx.CallAfter(self.output.AppendText, "Moving Mono 1 energy to 12 keV . . . ")149 self.mono1_energy.move_absolute(12.0)150 while self.mono1_energy.is_busy():151 if self.abort_event.is_set():152 self.mono1_energy.soft_abort()153 self.cleanup()154 return155 time.sleep(0.5)156 wx.CallAfter(self.output.AppendText, "Done!\n")157 self.mono1_normal_enabled.write(0)158 self.mono1_parallel_enabled.write(0)159 wx.CallAfter(self.output.AppendText, "Moving Mono 1 theta to 0 degrees . . . ")160 self.mono1_theta.move_absolute(0.0)161 while self.mono1_theta.is_busy():162 if self.abort_event.is_set():163 self.mono1_theta.soft_abort()164 self.cleanup()165 return166 time.sleep(0.5)167 wx.CallAfter(self.output.AppendText, "Done!\n")168 wx.CallAfter(self.output.AppendText, "Moving Mono 1 chi to 31000 urad. . . ")169 self.mono1_x1_chi.move_absolute(31000)170 while self.mono1_x1_chi.is_busy():171 if self.abort_event.is_set():172 self.mono1_x1_chi.soft_abort()173 self.cleanup()174 return175 time.sleep(0.5)176 wx.CallAfter(self.output.AppendText, "Done!\n")177 # Move mono 2 to beam position178 self.mono2_normal_enabled.write(0)179 self.mono2_parallel_enabled.write(0)180 wx.CallAfter(self.output.AppendText, "Moving Mono 2 energy 12 keV . . . ")181 self.mono2_energy.move_absolute(12.1)182 while self.mono2_energy.is_busy():183 if self.abort_event.is_set():184 self.mono2_energy.soft_abort()185 self.cleanup()186 return187 time.sleep(0.5)188 self.mono2_normal_enabled.write(1)189 self.mono2_parallel_enabled.write(1)190 self.mono2_energy.move_absolute(12.0)191 while self.mono2_energy.is_busy():192 if self.abort_event.is_set():193 self.mono2_energy.soft_abort()194 self.cleanup()195 return196 time.sleep(0.5)197 wx.CallAfter(self.output.AppendText, "Done!\n")198 wx.CallAfter(self.output.AppendText, "Moving Mono 2 chi to 0 . . . ")199 self.mono2_x1_chi.move_absolute(0)200 while self.mono2_x1_chi.is_busy():201 if self.abort_event.is_set():202 self.mono2_x1_chi.soft_abort()203 self.cleanup()204 return205 time.sleep(0.5)206 wx.CallAfter(self.output.AppendText, "Done!\n")207 wx.CallAfter(self.output.AppendText, "Mono 1 to 2 switch completed!\n\n\n")208 return209 def two_to_one(self):210 if self.abort_event.is_set():211 self.cleanup()212 return213 wx.CallAfter(self.output.AppendText, "Switching Mono 2 to Mono 1\n")214 # Move mono 2 to bypass position215 wx.CallAfter(self.output.AppendText, "Moving Mono 2 energy to 5 kev . . . ")216 self.mono2_energy.move_absolute(5)217 while self.mono2_energy.is_busy():218 if self.abort_event.is_set():219 self.mono2_energy.soft_abort()220 self.cleanup()221 return222 time.sleep(0.5)223 wx.CallAfter(self.output.AppendText, "Done!\n")224 self.mono2_normal_enabled.write(0)225 self.mono2_parallel_enabled.write(0)226 wx.CallAfter(self.output.AppendText, "Moving Mono 2 theta (mr) to 23 degrees . . . ")227 self.mono2_theta.move_absolute(23)228 while self.mono2_theta.is_busy():229 if self.abort_event.is_set():230 self.mono2_theta.soft_abort()231 self.cleanup()232 return233 time.sleep(0.5)234 wx.CallAfter(self.output.AppendText, "Done!\n")235 wx.CallAfter(self.output.AppendText, "Moving Mono 2 parallel (mtx) to 128000 um . . . ")236 self.mono2_x2_para.move_absolute(128000)237 while self.mono2_x2_para.is_busy():238 if self.abort_event.is_set():239 self.mono2_x2_para.soft_abort()240 self.cleanup()241 return242 time.sleep(0.5)243 wx.CallAfter(self.output.AppendText, "Done!\n")244 wx.CallAfter(self.output.AppendText, "Moving Mono 2 perpendicular (mty) to 20000 um . . . ")245 self.mono2_x2_perp.move_absolute(20000)246 while self.mono2_x2_perp.is_busy():247 if self.abort_event.is_set():248 self.mono2_x2_perp.soft_abort()249 self.cleanup()250 return251 time.sleep(0.5)252 wx.CallAfter(self.output.AppendText, "Done!\n")253 wx.CallAfter(self.output.AppendText, "Moving Mono 2 chi to 0 urad . . . ")254 self.mono2_x1_chi.move_absolute(0)255 while self.mono2_x1_chi.is_busy():256 if self.abort_event.is_set():257 self.mono2_x1_chi.soft_abort()258 self.cleanup()259 return260 time.sleep(0.5)261 wx.CallAfter(self.output.AppendText, "Done!\n")262 wx.CallAfter(self.output.AppendText, "Moving Mono 2 focus to 100 um . . . ")263 self.mono2_focus.move_absolute(100)264 while self.mono2_focus.is_busy():265 if self.abort_event.is_set():266 self.mono2_focus.soft_abort()267 self.cleanup()268 return269 time.sleep(0.5)270 wx.CallAfter(self.output.AppendText, "Done!\n")271 # Move mono 1 to beam position272 wx.CallAfter(self.output.AppendText, "Moving Mono 1 energy to 12 keV . . . ")273 self.mono1_energy.move_absolute(12.1)274 while self.mono1_energy.is_busy():275 if self.abort_event.is_set():276 self.mono1_energy.soft_abort()277 self.cleanup()278 return279 time.sleep(0.5)280 self.mono1_normal_enabled.write(1)281 self.mono1_parallel_enabled.write(1)282 self.mono1_energy.move_absolute(12.0)283 while self.mono1_energy.is_busy():284 if self.abort_event.is_set():285 self.mono1_energy.soft_abort()286 self.cleanup()287 return288 time.sleep(0.5)289 wx.CallAfter(self.output.AppendText, "Done!\n")290 wx.CallAfter(self.output.AppendText, "Moving Mono 1 chi to 0 urad . . . ")291 self.mono1_x1_chi.move_absolute(0)292 while self.mono1_x1_chi.is_busy():293 if self.abort_event.is_set():294 self.mono1_x1_chi.soft_abort()295 self.cleanup()296 return297 time.sleep(0.5)298 wx.CallAfter(self.output.AppendText, "Done!\n")299 wx.CallAfter(self.output.AppendText, "Mono 2 to 1 switch completed!\n\n\n")300 return301 def on_abort(self, evt):302 self.abort_event.set()303 def cleanup(self):304 self.abort_event.clear()305 wx.CallAfter(self.output.AppendText, "\n\nAborted!\n\n\n")306 def _on_rightclick(self, evt):307 """308 Shows a context menu. Current options allow enabling/disabling...

Full Screen

Full Screen

telemetry_control.py

Source:telemetry_control.py Github

copy

Full Screen

...89 return Error.INVALID_HEADER_ERROR90 def heartbeat(self):91 enqueue(self.flag, Log(header="heartbeat", message={"response": "OK", "mode": "Soft abort" if self.registry.get(("general", "soft_abort"))[1] else "Normal"}), LogPriority.INFO)92 print("mode", self.registry.get(("general", "soft_abort"))[1])93 def soft_abort(self):94 self.registry.put(("general", "soft_abort"), True)95 log = Log(header="response", message={"header": "Soft abort", "Status": "Success", "Description": "Rocket is undergoing soft abort"})96 enqueue(self.flag, log, LogPriority.CRIT)97 enqueue(self.flag, Log(header="mode", message={"mode": "Soft abort"}), LogPriority.CRIT)98 """def undo_soft_abort(self):99 self.registry.put(("general", "soft_abort"), False)100 log = Log(header="response", message={"header": "Undo soft abort", "Status": "Success", "Description": "Rocket is no longer aborting, and is at the mercy of the pi"})101 enqueue(self.flag, log, LogPriority.CRIT)102 enqueue(self.flag, Log(header="mode", message={"mode": "Normal"}), LogPriority.CRIT)"""103 def undo_soft_abort(self):104 # GO THROUGH EACH SENSOR AND CHECK STATUS105 sensor_list = self.registry.values["sensor_status"]106 critical_sensors = []107 for s_type in sensor_list:108 for s_loc in sensor_list[s_type]:109 if sensor_list[s_type][s_loc] == LogPriority.CRIT:110 critical_sensors.append([s_type, s_loc])111 112 # SEND FAILURE OR SUCCESS LOG DEPENDING ON CRITICAL VALUES FOUND/NOT FOUND113 if len(critical_sensors) > 0:114 log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Failure", "Description": "Soft abort cancellation unsuccessful - following sensors are still not safe: {}".format(critical_sensors)})115 enqueue(self.flag, log, LogPriority.CRIT)116 else:117 self.registry.put(("general", "soft_abort"), False)118 log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Success", "Description": "Soft abort successfully cancelled"})119 enqueue(self.flag, log, LogPriority.CRIT)120 enqueue(self.flag, Log(header="mode", message={"mode": "Normal"}), LogPriority.CRIT)121 def solenoid_actuate(self, valve_location: ValveLocation, actuation_type: ActuationType, priority: int) -> Error:122 err, current_priority, timestamp = self.registry.get(("valve_actuation", "actuation_priority", ValveType.SOLENOID, valve_location), allow_error=True)123 if err != Error.NONE:124 # Send message back to gs saying it was an invalid message125 log = Log(header="response", message={"header": "Valve actuation", "Status": "Failure", "Description": "Unable to find actuatable solenoid", "Provided valve location": valve_location})126 enqueue(self.flag, log, LogPriority.CRIT)127 return Error.REQUEST_ERROR128 if priority > current_priority:129 # Send message back to gs saying that the request was made w/ too little priority130 log = Log(header="response", message={"header": "Valve actuation", "Status": "Failure", "Description": "Too little priority to actuate solenoid", "Valve location": valve_location, "Actuation type": actuation_type, "Priority": priority})131 enqueue(self.flag, log, LogPriority.CRIT)132 return Error.PRIORITY_ERROR133 print("Actuating solenoid at {} with actuation type {}".format(valve_location, actuation_type))134 self.flag.put(("solenoid", "actuation_type", valve_location), actuation_type)135 self.flag.put(("solenoid", "actuation_priority", valve_location), priority)136 log = Log(header="response", message={"header": "Valve actuation", "Status": "Success", "Description": "Successfully actuated solenoid", "Valve location": valve_location, "Actuation type": actuation_type, "Priority": priority})137 enqueue(self.flag, log, LogPriority.CRIT)138 def sensor_request(self, sensor_type: SensorType, sensor_location: SensorLocation) -> Error:139 err, value, timestamp = self.registry.get(("sensor_measured", sensor_type, sensor_location), allow_error=True)140 if err != Error.NONE:141 # Send message back to gs saying it was an invalid message 142 log = Log(header="response", message={"header": "Sensor data", "Status": "Failure", "Description": "Unable to find sensor", "Type": sensor_type, "Location": sensor_location})143 enqueue(self.flag, log, LogPriority.CRIT)144 return Error.REQUEST_ERROR145 _, kalman_value, _ = self.registry.get(("sensor_normalized", sensor_type, sensor_location), allow_error=True)146 _, status, _ = self.registry.get(("sensor_status", sensor_type, sensor_location))147 log = Log(header="response", message={"header": "Sensor data", "Status": "Success", "Sensor type": sensor_type, "Sensor location": sensor_location, "Measured value": value, "Normalized value": kalman_value, "Sensor status": status, "Last updated": timestamp})148 enqueue(self.flag, log, LogPriority.INFO)149 def valve_request(self, valve_type: ValveType, valve_location: ValveLocation) -> Error: 150 err, value, timestamp = self.registry.get(("valve", valve_type, valve_location), allow_error=True)151 if err != Error.NONE:152 # Send message back to gs saying it was an invalid message 153 log = Log(header="response", message={"header": "Valve data request", "Status": "Failure", "Description": "Unable to find valve", "Valve type": valve_type, "Valve location": valve_location})154 enqueue(self.flag, log, LogPriority.CRIT)155 return Error.REQUEST_ERROR156 _, actuation_type, timestamp = self.registry.get(("valve_actuation", "actuation_type", valve_type, valve_location))157 _, actuation_priority, _ = self.registry.get(("valve_actuation", "actuation_priority", valve_type, valve_location))158 log = Log(header="response", message={"header": "Valve data", "Status": "Success", "Type": valve_type, "Location": valve_location, "State": value, "Actuation type": actuation_type, "Actuation priority": actuation_priority, "Last actuated": timestamp})159 enqueue(self.flag, log, LogPriority.INFO)160 def progress(self):161 self.flag.put(("general", "progress"), True)162 def test(self, msg: str):163 print("\ntest recieved:", msg)164"""165 def undo_soft_abort(self):166 # GO THROUGH EACH SENSOR AND CHECK STATUS167 sensor_list = self.registry.values["sensor_status"]168 critical_sensors = []169 for s_type in sensor_list:170 for s_loc in s_type:171 if sensor_list[s_type][s_loc] == LogPriority.CRIT:172 critical_sensors.append([s_type, s_loc])173 174 # SEND FAILURE OR SUCCESS LOG DEPENDING ON CRITICAL VALUES FOUND/NOT FOUND175 if len(critical_sensors) > 0:176 log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Failure", "Description": "Soft abort cancellation unsuccessful - following sensors are still not safe: {}".format(critical_sensors)})177 enqueue(self.flag, log, LogPriority.CRIT)178 else:179 self.registry.put(("general", "soft_abort"), None)...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

1import time2from threading import Thread3from queue import Queue4from enums import SensorStatus5from Task import Task6from Stage import Stage7from Telemetry import Telemetry8from Valve import ValveActuation, ValveMap9from Sensor import SensorMap10class Launch:11 def __init__(self, config):12 self.task_queue: Queue = Queue()13 self.arduino_type: str = config['arduino_type']14 self.telemetry: Telemetry = Telemetry.from_config(config['telemetry'], self.task_queue.put)15 self.sensors: SensorMap = SensorMap.from_config(config['sensors'], self.arduino_type)16 self.valves: ValveMap = ValveMap.from_config(config['valves'], self.arduino_type)17 self.delay: float = config['timer']['delay']18 self.start_time: float = None19 self.hard_abort: bool = False20 self.soft_abort: bool = False21 self.task_executor: Thread = Thread(target=self.execute_tasks, daemon=True)22 self.stage: Stage = Stage('stage_1')23 def execute_task(self, task: Task):24 """25 Main task handler26 """27 if task.action == 'reset_telemetry':28 self.telemetry.reset()29 elif task.action == 'set_valve_actuation':30 type = task.payload['type']31 location = task.payload['location']32 actuation = task.payload['actuation']33 self.valves.set_actuation(type, location, actuation)34 elif task.action == 'hard_abort':35 self.hard_abort = True36 elif task.action == 'soft_abort':37 self.soft_abort = True38 elif task.action == 'cancel_soft_abort':39 self.soft_abort = False40 def execute_tasks(self):41 while not self.hard_abort and not self.soft_abort:42 # 'blocking' is ok here because it's in its own thread43 task = self.task_queue.get(block=True)44 self.execute_task(task)45 def check_pressure_sensors(self):46 # Check pressure sensors47 for sensor in self.sensors.get_type('pressure').values():48 # If the sensor's status is unsafe...49 if sensor.status != SensorStatus.SAFE:50 # Warn about the sensor's status51 self.telemetry.warn('sensor', sensor.to_json())52 # If the pressure is critically high...53 if sensor.status == SensorStatus.CRITICALLY_HIGH:54 # Open the pressure relief valve55 self.task_queue.put(Task('set_valve_actuation', {'type': 'solenoid', 'location': 'pressure_relief', 'actuation': ValveActuation(1, 0)}))56 57 # If the sensor's status is fine...58 else:59 # If the pressure relief valve is open...60 if self.valves.get('solenoid', 'pressure_relief').actuation.type == 1:61 # Close the pressure relief valve62 self.task_queue.put(Task('set_valve_actuation', {'type': 'solenoid', 'location': 'pressure_relief', 'actuation': ValveActuation(0, 0)}))63 64 def run(self):65 self.start_time = time.time()66 self.task_executor.start()67 68 while True:69 self.sensors.log()70 self.check_pressure_sensors()71 72 self.telemetry.info('sensors', self.sensors.to_json())73 time.sleep(self.delay)74if __name__ == "__main__":75 import json76 config_file = open("config.json")77 config = json.load(config_file)78 config_file.close()79 launch = Launch(config)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful