Best Python code snippet using avocado_python
switch_monos.py
Source:switch_monos.py  
...148        wx.CallAfter(self.output.AppendText, "Moving Mono 1 energy to 12 keV . . . ")149        self.mono1_energy.move_absolute(12.0)150        while self.mono1_energy.is_busy():151            if self.abort_event.is_set():152                self.mono1_energy.soft_abort()153                self.cleanup()154                return155            time.sleep(0.5)156        wx.CallAfter(self.output.AppendText, "Done!\n")157        self.mono1_normal_enabled.write(0)158        self.mono1_parallel_enabled.write(0)159        wx.CallAfter(self.output.AppendText, "Moving Mono 1 theta to 0 degrees . . . ")160        self.mono1_theta.move_absolute(0.0)161        while self.mono1_theta.is_busy():162            if self.abort_event.is_set():163                self.mono1_theta.soft_abort()164                self.cleanup()165                return166            time.sleep(0.5)167        wx.CallAfter(self.output.AppendText, "Done!\n")168        wx.CallAfter(self.output.AppendText, "Moving Mono 1 chi to 31000 urad. . . ")169        self.mono1_x1_chi.move_absolute(31000)170        while self.mono1_x1_chi.is_busy():171            if self.abort_event.is_set():172                self.mono1_x1_chi.soft_abort()173                self.cleanup()174                return175            time.sleep(0.5)176        wx.CallAfter(self.output.AppendText, "Done!\n")177        # Move mono 2 to beam position178        self.mono2_normal_enabled.write(0)179        self.mono2_parallel_enabled.write(0)180        wx.CallAfter(self.output.AppendText, "Moving Mono 2 energy 12 keV . . . ")181        self.mono2_energy.move_absolute(12.1)182        while self.mono2_energy.is_busy():183            if self.abort_event.is_set():184                self.mono2_energy.soft_abort()185                self.cleanup()186                return187            time.sleep(0.5)188        self.mono2_normal_enabled.write(1)189        self.mono2_parallel_enabled.write(1)190        self.mono2_energy.move_absolute(12.0)191        while self.mono2_energy.is_busy():192            if self.abort_event.is_set():193                self.mono2_energy.soft_abort()194                self.cleanup()195                return196            time.sleep(0.5)197        wx.CallAfter(self.output.AppendText, "Done!\n")198        wx.CallAfter(self.output.AppendText, "Moving Mono 2 chi to 0 . . . ")199        self.mono2_x1_chi.move_absolute(0)200        while self.mono2_x1_chi.is_busy():201            if self.abort_event.is_set():202                self.mono2_x1_chi.soft_abort()203                self.cleanup()204                return205            time.sleep(0.5)206        wx.CallAfter(self.output.AppendText, "Done!\n")207        wx.CallAfter(self.output.AppendText, "Mono 1 to 2 switch completed!\n\n\n")208        return209    def two_to_one(self):210        if self.abort_event.is_set():211            self.cleanup()212            return213        wx.CallAfter(self.output.AppendText, "Switching Mono 2 to Mono 1\n")214        # Move mono 2 to bypass position215        wx.CallAfter(self.output.AppendText, "Moving Mono 2 energy to 5 kev . . . ")216        self.mono2_energy.move_absolute(5)217        while self.mono2_energy.is_busy():218            if self.abort_event.is_set():219                self.mono2_energy.soft_abort()220                self.cleanup()221                return222            time.sleep(0.5)223        wx.CallAfter(self.output.AppendText, "Done!\n")224        self.mono2_normal_enabled.write(0)225        self.mono2_parallel_enabled.write(0)226        wx.CallAfter(self.output.AppendText, "Moving Mono 2 theta (mr) to 23 degrees . . . ")227        self.mono2_theta.move_absolute(23)228        while self.mono2_theta.is_busy():229            if self.abort_event.is_set():230                self.mono2_theta.soft_abort()231                self.cleanup()232                return233            time.sleep(0.5)234        wx.CallAfter(self.output.AppendText, "Done!\n")235        wx.CallAfter(self.output.AppendText, "Moving Mono 2 parallel (mtx) to 128000 um . . . ")236        self.mono2_x2_para.move_absolute(128000)237        while self.mono2_x2_para.is_busy():238            if self.abort_event.is_set():239                self.mono2_x2_para.soft_abort()240                self.cleanup()241                return242            time.sleep(0.5)243        wx.CallAfter(self.output.AppendText, "Done!\n")244        wx.CallAfter(self.output.AppendText, "Moving Mono 2 perpendicular (mty) to 20000 um . . . ")245        self.mono2_x2_perp.move_absolute(20000)246        while self.mono2_x2_perp.is_busy():247            if self.abort_event.is_set():248                self.mono2_x2_perp.soft_abort()249                self.cleanup()250                return251            time.sleep(0.5)252        wx.CallAfter(self.output.AppendText, "Done!\n")253        wx.CallAfter(self.output.AppendText, "Moving Mono 2 chi to 0 urad . . . ")254        self.mono2_x1_chi.move_absolute(0)255        while self.mono2_x1_chi.is_busy():256            if self.abort_event.is_set():257                self.mono2_x1_chi.soft_abort()258                self.cleanup()259                return260            time.sleep(0.5)261        wx.CallAfter(self.output.AppendText, "Done!\n")262        wx.CallAfter(self.output.AppendText, "Moving Mono 2 focus to 100 um . . . ")263        self.mono2_focus.move_absolute(100)264        while self.mono2_focus.is_busy():265            if self.abort_event.is_set():266                self.mono2_focus.soft_abort()267                self.cleanup()268                return269            time.sleep(0.5)270        wx.CallAfter(self.output.AppendText, "Done!\n")271        # Move mono 1 to beam position272        wx.CallAfter(self.output.AppendText, "Moving Mono 1 energy to 12 keV . . . ")273        self.mono1_energy.move_absolute(12.1)274        while self.mono1_energy.is_busy():275            if self.abort_event.is_set():276                self.mono1_energy.soft_abort()277                self.cleanup()278                return279            time.sleep(0.5)280        self.mono1_normal_enabled.write(1)281        self.mono1_parallel_enabled.write(1)282        self.mono1_energy.move_absolute(12.0)283        while self.mono1_energy.is_busy():284            if self.abort_event.is_set():285                self.mono1_energy.soft_abort()286                self.cleanup()287                return288            time.sleep(0.5)289        wx.CallAfter(self.output.AppendText, "Done!\n")290        wx.CallAfter(self.output.AppendText, "Moving Mono 1 chi to 0 urad . . . ")291        self.mono1_x1_chi.move_absolute(0)292        while self.mono1_x1_chi.is_busy():293            if self.abort_event.is_set():294                self.mono1_x1_chi.soft_abort()295                self.cleanup()296                return297            time.sleep(0.5)298        wx.CallAfter(self.output.AppendText, "Done!\n")299        wx.CallAfter(self.output.AppendText, "Mono 2 to 1 switch completed!\n\n\n")300        return301    def on_abort(self, evt):302        self.abort_event.set()303    def cleanup(self):304        self.abort_event.clear()305        wx.CallAfter(self.output.AppendText, "\n\nAborted!\n\n\n")306    def _on_rightclick(self, evt):307        """308        Shows a context menu. Current options allow enabling/disabling...telemetry_control.py
Source:telemetry_control.py  
...89            return Error.INVALID_HEADER_ERROR90    def heartbeat(self):91        enqueue(self.flag, Log(header="heartbeat", message={"response": "OK", "mode": "Soft abort" if self.registry.get(("general", "soft_abort"))[1] else "Normal"}), LogPriority.INFO)92        print("mode", self.registry.get(("general", "soft_abort"))[1])93    def soft_abort(self):94        self.registry.put(("general", "soft_abort"), True)95        log = Log(header="response", message={"header": "Soft abort", "Status": "Success", "Description": "Rocket is undergoing soft abort"})96        enqueue(self.flag, log, LogPriority.CRIT)97        enqueue(self.flag, Log(header="mode", message={"mode": "Soft abort"}), LogPriority.CRIT)98    """def undo_soft_abort(self):99        self.registry.put(("general", "soft_abort"), False)100        log = Log(header="response", message={"header": "Undo soft abort", "Status": "Success", "Description": "Rocket is no longer aborting, and is at the mercy of the pi"})101        enqueue(self.flag, log, LogPriority.CRIT)102        enqueue(self.flag, Log(header="mode", message={"mode": "Normal"}), LogPriority.CRIT)"""103    def undo_soft_abort(self):104        # GO THROUGH EACH SENSOR AND CHECK STATUS105        sensor_list = self.registry.values["sensor_status"]106        critical_sensors = []107        for s_type in sensor_list:108            for s_loc in sensor_list[s_type]:109                if sensor_list[s_type][s_loc] == LogPriority.CRIT:110                    critical_sensors.append([s_type, s_loc])111        112        # SEND FAILURE OR SUCCESS LOG DEPENDING ON CRITICAL VALUES FOUND/NOT FOUND113        if len(critical_sensors) > 0:114            log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Failure", "Description": "Soft abort cancellation unsuccessful - following sensors are still not safe: {}".format(critical_sensors)})115            enqueue(self.flag, log, LogPriority.CRIT)116        else:117            self.registry.put(("general", "soft_abort"), False)118            log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Success", "Description": "Soft abort successfully cancelled"})119            enqueue(self.flag, log, LogPriority.CRIT)120            enqueue(self.flag, Log(header="mode", message={"mode": "Normal"}), LogPriority.CRIT)121    def solenoid_actuate(self, valve_location: ValveLocation, actuation_type: ActuationType, priority: int) -> Error:122        err, current_priority, timestamp = self.registry.get(("valve_actuation", "actuation_priority", ValveType.SOLENOID, valve_location), allow_error=True)123        if err != Error.NONE:124            # Send message back to gs saying it was an invalid message125            log = Log(header="response", message={"header": "Valve actuation", "Status": "Failure", "Description": "Unable to find actuatable solenoid", "Provided valve location": valve_location})126            enqueue(self.flag, log, LogPriority.CRIT)127            return Error.REQUEST_ERROR128        if priority > current_priority:129            # Send message back to gs saying that the request was made w/ too little priority130            log = Log(header="response", message={"header": "Valve actuation", "Status": "Failure", "Description": "Too little priority to actuate solenoid", "Valve location": valve_location, "Actuation type": actuation_type, "Priority": priority})131            enqueue(self.flag, log, LogPriority.CRIT)132            return Error.PRIORITY_ERROR133        print("Actuating solenoid at {} with actuation type {}".format(valve_location, actuation_type))134        self.flag.put(("solenoid", "actuation_type", valve_location), actuation_type)135        self.flag.put(("solenoid", "actuation_priority", valve_location), priority)136        log = Log(header="response", message={"header": "Valve actuation", "Status": "Success", "Description": "Successfully actuated solenoid", "Valve location": valve_location, "Actuation type": actuation_type, "Priority": priority})137        enqueue(self.flag, log, LogPriority.CRIT)138    def sensor_request(self, sensor_type: SensorType, sensor_location: SensorLocation) -> Error:139        err, value, timestamp = self.registry.get(("sensor_measured", sensor_type, sensor_location), allow_error=True)140        if err != Error.NONE:141            # Send message back to gs saying it was an invalid message            142            log = Log(header="response", message={"header": "Sensor data", "Status": "Failure", "Description": "Unable to find sensor", "Type": sensor_type, "Location": sensor_location})143            enqueue(self.flag, log, LogPriority.CRIT)144            return Error.REQUEST_ERROR145        _, kalman_value, _ = self.registry.get(("sensor_normalized", sensor_type, sensor_location), allow_error=True)146        _, status, _ = self.registry.get(("sensor_status", sensor_type, sensor_location))147        log = Log(header="response", message={"header": "Sensor data", "Status": "Success", "Sensor type": sensor_type, "Sensor location": sensor_location, "Measured value": value, "Normalized value": kalman_value, "Sensor status": status, "Last updated": timestamp})148        enqueue(self.flag, log, LogPriority.INFO)149    def valve_request(self, valve_type: ValveType, valve_location: ValveLocation) -> Error: 150        err, value, timestamp = self.registry.get(("valve", valve_type, valve_location), allow_error=True)151        if err != Error.NONE:152            # Send message back to gs saying it was an invalid message            153            log = Log(header="response", message={"header": "Valve data request", "Status": "Failure", "Description": "Unable to find valve", "Valve type": valve_type, "Valve location": valve_location})154            enqueue(self.flag, log, LogPriority.CRIT)155            return Error.REQUEST_ERROR156        _, actuation_type, timestamp = self.registry.get(("valve_actuation", "actuation_type", valve_type, valve_location))157        _, actuation_priority, _ = self.registry.get(("valve_actuation", "actuation_priority", valve_type, valve_location))158        log = Log(header="response", message={"header": "Valve data", "Status": "Success", "Type": valve_type, "Location": valve_location, "State": value, "Actuation type": actuation_type, "Actuation priority": actuation_priority, "Last actuated": timestamp})159        enqueue(self.flag, log, LogPriority.INFO)160    def progress(self):161        self.flag.put(("general", "progress"), True)162    def test(self, msg: str):163        print("\ntest recieved:", msg)164"""165    def undo_soft_abort(self):166        # GO THROUGH EACH SENSOR AND CHECK STATUS167        sensor_list = self.registry.values["sensor_status"]168        critical_sensors = []169        for s_type in sensor_list:170            for s_loc in s_type:171                if sensor_list[s_type][s_loc] == LogPriority.CRIT:172                    critical_sensors.append([s_type, s_loc])173        174        # SEND FAILURE OR SUCCESS LOG DEPENDING ON CRITICAL VALUES FOUND/NOT FOUND175        if len(critical_sensors) > 0:176            log = Log(header="response", message={"header": "Soft abort cancellation", "Status": "Failure", "Description": "Soft abort cancellation unsuccessful - following sensors are still not safe: {}".format(critical_sensors)})177            enqueue(self.flag, log, LogPriority.CRIT)178        else:179            self.registry.put(("general", "soft_abort"), None)...main.py
Source:main.py  
1import time2from threading import Thread3from queue import Queue4from enums import SensorStatus5from Task import Task6from Stage import Stage7from Telemetry import Telemetry8from Valve import ValveActuation, ValveMap9from Sensor import SensorMap10class Launch:11	def __init__(self, config):12		self.task_queue: Queue = Queue()13		self.arduino_type: str = config['arduino_type']14		self.telemetry: Telemetry = Telemetry.from_config(config['telemetry'], self.task_queue.put)15		self.sensors: SensorMap = SensorMap.from_config(config['sensors'], self.arduino_type)16		self.valves: ValveMap = ValveMap.from_config(config['valves'], self.arduino_type)17		self.delay: float = config['timer']['delay']18		self.start_time: float = None19		self.hard_abort: bool = False20		self.soft_abort: bool = False21		self.task_executor: Thread = Thread(target=self.execute_tasks, daemon=True)22		self.stage: Stage = Stage('stage_1')23	def execute_task(self, task: Task):24		"""25		Main task handler26		"""27		if task.action == 'reset_telemetry':28			self.telemetry.reset()29		elif task.action == 'set_valve_actuation':30			type = task.payload['type']31			location = task.payload['location']32			actuation = task.payload['actuation']33			self.valves.set_actuation(type, location, actuation)34		elif task.action == 'hard_abort':35			self.hard_abort = True36		elif task.action == 'soft_abort':37			self.soft_abort = True38		elif task.action == 'cancel_soft_abort':39			self.soft_abort = False40	def execute_tasks(self):41		while not self.hard_abort and not self.soft_abort:42			# 'blocking' is ok here because it's in its own thread43			task = self.task_queue.get(block=True)44			self.execute_task(task)45	def check_pressure_sensors(self):46		# Check pressure sensors47		for sensor in self.sensors.get_type('pressure').values():48			# If the sensor's status is unsafe...49			if sensor.status != SensorStatus.SAFE:50				# Warn about the sensor's status51				self.telemetry.warn('sensor', sensor.to_json())52				# If the pressure is critically high...53				if sensor.status == SensorStatus.CRITICALLY_HIGH:54					# Open the pressure relief valve55					self.task_queue.put(Task('set_valve_actuation', {'type': 'solenoid', 'location': 'pressure_relief', 'actuation': ValveActuation(1, 0)}))56			57			# If the sensor's status is fine...58			else:59				# If the pressure relief valve is open...60				if self.valves.get('solenoid', 'pressure_relief').actuation.type == 1:61					# Close the pressure relief valve62					self.task_queue.put(Task('set_valve_actuation', {'type': 'solenoid', 'location': 'pressure_relief', 'actuation': ValveActuation(0, 0)}))63		64	def run(self):65		self.start_time = time.time()66		self.task_executor.start()67		68		while True:69			self.sensors.log()70			self.check_pressure_sensors()71			72			self.telemetry.info('sensors', self.sensors.to_json())73			time.sleep(self.delay)74if __name__ == "__main__":75	import json76	config_file = open("config.json")77	config = json.load(config_file)78	config_file.close()79	launch = Launch(config)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
