Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...169        ROUTER.add(PATH_GET_RAW_METRICS, self.get_raw_metrics)170        self.alarm_scheduler = AlarmScheduler()171    def on_before_start(self):172        # re-schedule alarms for persistence use-case173        def restart_alarms(*args):174            poll_condition(lambda: SERVICE_PLUGINS.is_running("cloudwatch"))175            self.alarm_scheduler.restart_existing_alarms()176        start_worker_thread(restart_alarms)177    def on_before_stop(self):178        self.alarm_scheduler.shutdown_scheduler()179    def delete_alarms(self, context: RequestContext, alarm_names: AlarmNames) -> None:180        moto.call_moto(context)181        for alarm_name in alarm_names:182            arn = aws_stack.cloudwatch_alarm_arn(alarm_name)183            self.alarm_scheduler.delete_scheduler_for_alarm(arn)184    def get_raw_metrics(self, request: Request):185        region = aws_stack.extract_region_from_auth_header(request.headers)186        backend = cloudwatch_backends.get(region)187        if backend:...security.py
Source:security.py  
# security.py -- entry point of the security system.  Loads the configuration
# from the database, starts the network, sensor and alarm threads, and
# dispatches instructions arriving on global_data.main_queue.
import json
import queue
import signal
import sys
import threading
import time

import global_data
import security_threads
import sensors
import alarms
import database
import networking

main_thread = security_threads.security_thread_class("Main Security Thread")
network_thread = security_threads.security_thread_class("Network Thread")
system_config = 0  # placeholder; replaced by init_system() with the DB config
sensor_thread_array = []
alarm_thread_array = []
system_running = 1  # cleared by exit_handler() so shutdown is requested once


def security_main(main_thread):
    """Run the main control loop until *main_thread* is no longer active.

    Initialises the system, starts the network, sensor and alarm threads,
    then on every pass (a) restarts the network listener when the local IP
    or the configured web port has changed and (b) processes at most one
    instruction taken from ``global_data.main_queue``.  Calls
    ``security_shutdown()`` once the loop ends.
    """
    print("Starting Main")
    init_system()
    network_thread.start(networking.network_listener)
    sensors.start_sensor_threads(sensor_thread_array)
    alarms.start_alarm_threads(alarm_thread_array)

    old_ip = networking.get_my_ip()
    old_port = system_config.web_port
    print("main while")
    while main_thread.is_active():
        my_ip = networking.get_my_ip()
        my_port = system_config.web_port
        if my_ip != old_ip or my_port != old_port:
            print("Config changed")
            network_thread.stop_thread()
            network_thread.start(networking.network_listener)
            # Remember the new endpoint; otherwise every later pass would
            # still see a "change" and restart the listener again.
            old_ip, old_port = my_ip, my_port

        try:
            new_main_instruction = global_data.main_queue.get(
                timeout=global_data.main_queue_timeout
            )
        except queue.Empty:
            # Main queue was empty and timed out.
            # NOTE(review): assumes main_queue is a queue.Queue -- confirm
            # in global_data.
            continue
        # log whatever we got from queue here...
        process_main_instruction(new_main_instruction)

    security_shutdown()


def process_main_instruction(instruction):
    """Dispatch one queued instruction according to its group and task.

    ``instruction`` must expose ``group``, ``task`` and (for
    ``local_tasks``) ``data`` attributes.  Unknown groups/tasks are ignored.
    """
    if instruction.group == "system_tasks":
        if instruction.task == "shutdown_delay":
            # Hold the main loop here until the main thread is flagged as
            # stopped; sleep briefly instead of spinning at full CPU.
            while main_thread.is_active():
                time.sleep(0.01)
    elif instruction.group == "sensor_tasks":
        if instruction.task == "sensor_triggered":
            print("sensor triggered")
    elif instruction.group == "alarm_tasks":
        if instruction.task == "alarm_on":
            print("alarm on")
        # NOTE(review): the original compared against "alarm_of", which
        # looks like a typo of "alarm_off".  Both spellings are accepted
        # until the producer of this task name has been checked.
        elif instruction.task in ("alarm_of", "alarm_off"):
            print("alarm off")
    elif instruction.group == "local_tasks":
        if instruction.task == "get":
            instruction.data = process_get(instruction)
            global_data.network_queue.put(instruction, block=False)
        elif instruction.task == "post":
            process_post(instruction)


def process_get(instruction):
    """Build the reply for a ``get`` request and return it.

    Reads ``instruction.data["command"]``, replaces ``instruction.data``
    with a fresh dict and fills it for ``get_sensors`` / ``get_alarms``.
    Any other command yields an empty dict.
    """
    request = instruction.data
    reply = {}
    instruction.data = reply
    command = request["command"]
    if command == "get_sensors":
        sensor_list = database.get_db_sensors()
        reply["command"] = "get_sensors"
        reply["sensors"] = sensor_list
    elif command == "get_alarms":
        alarm_list = database.get_db_alarms()
        reply["command"] = "get_alarms"
        reply["alarms"] = alarm_list
    return reply


def _build_sensor_threads():
    """Create one sensor thread object per sensor stored in the database."""
    return [sensors.sensor_thread_class(s) for s in database.get_db_sensors()]


def _build_alarm_threads():
    """Create one alarm thread object per alarm stored in the database."""
    return [alarms.alarm_thread_class(a) for a in database.get_db_alarms()]


def process_post(instruction):
    """Apply an add/remove sensor or alarm request.

    After the change is handed to ``sensors`` / ``alarms``, the affected
    group of threads is stopped, rebuilt from the database and started
    again.  Commands other than the four handled here are ignored.
    """
    global sensor_thread_array
    global alarm_thread_array
    command = instruction.data["command"]
    restart_sensors = False
    restart_alarms = False

    if command == "add_sensor":
        sensors.add_sensor(instruction.data)
        restart_sensors = True
    elif command == "remove_sensor":
        sensors.remove_sensor(instruction.data)
        restart_sensors = True
    elif command == "add_alarm":
        alarms.add_alarm(instruction.data)
        restart_alarms = True
    elif command == "remove_alarm":
        alarms.remove_alarm(instruction.data)
        restart_alarms = True

    if restart_sensors:
        sensors.stop_sensor_threads(sensor_thread_array)
        sensor_thread_array = _build_sensor_threads()
        sensors.start_sensor_threads(sensor_thread_array)

    if restart_alarms:
        alarms.stop_alarm_threads(alarm_thread_array)
        alarm_thread_array = _build_alarm_threads()
        alarms.start_alarm_threads(alarm_thread_array)


def init_system():
    """Open (or create) the database and load config plus thread objects.

    Sets the module-level ``system_config`` and appends one thread object
    per stored sensor/alarm to ``sensor_thread_array`` /
    ``alarm_thread_array``.  The threads are not started here.
    """
    global system_config
    if database.exists():
        database.open_security_db()
    else:
        database.init_security_db()

    system_config = database.get_db_config()
    sensor_thread_array.extend(_build_sensor_threads())
    alarm_thread_array.extend(_build_alarm_threads())


def exit_handler(signum, frame):
    """Signal handler: request a shutdown the first time it is invoked.

    Queues a ``system_tasks`` / ``shutdown_delay`` instruction on the main
    queue and then stops the main thread.  Later invocations do nothing.
    """
    global system_running
    if system_running:
        system_running = 0
        print("caught exit... shutting down")
        instruction = global_data.instruction_class()
        instruction.group = "system_tasks"
        instruction.task = "shutdown_delay"
        global_data.main_queue.put(instruction, block=False)
        main_thread.stop_thread()


def stop_threads():
    """Stop the network thread."""
    network_thread.stop_thread()


def security_shutdown():
    """Stop worker threads and report completion."""
    stop_threads()
    print("done")


print("Setting Up Interrupt Handler")
# SIGBREAK only exists on Windows and SIGKILL can never be caught, so a
# failure for some of these names is expected; it is reported and skipped.
for _signal_name in ("SIGINT", "SIGBREAK", "SIGKILL", "SIGQUIT"):
    try:
        signal.signal(getattr(signal, _signal_name), exit_handler)
    except (AttributeError, OSError, ValueError):
        print("Couldn't lock " + _signal_name)

main_thread.start(security_main)
while main_thread.is_active():
    ...  # NOTE(review): the loop body is cut off ("...") in this excerpt

# --- page text that followed the snippet (not part of security.py) ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
