Best Python code snippet using autotest_python
test_rabbitmq.py
Source:test_rabbitmq.py  
...
mq_virtual_host = "dev"


class TestRabbitmqWriter(unittest.TestCase):
    """Exercise RabbitmqWriter and RabbitmqMonitor with the module-level mq_* settings.

    Every test builds its writer/monitor from ``mq_host``, ``mq_username``,
    ``mq_password`` and ``mq_virtual_host``, publishes the documents in
    ``TEST_DOCS`` and then inspects the ``"messages_ready"`` entry returned by
    ``get_queue``.

    The ``print`` calls use the single-argument parenthesised form, which
    prints the same text on Python 2 and Python 3.
    """

    # Documents published by every write/consume test, in this order.
    TEST_DOCS = ("doc A", "doc B", "doc C")

    def _create_writer(self):
        """Return a RabbitmqWriter built from the module-level mq_* settings."""
        return RabbitmqWriter(host=mq_host, username=mq_username,
                              password=mq_password, virtual_host=mq_virtual_host)

    def _create_monitor(self):
        """Return a RabbitmqMonitor built from the module-level mq_* settings."""
        return RabbitmqMonitor(host=mq_host, username=mq_username,
                               password=mq_password, virtual_host=mq_virtual_host)

    def _put_test_docs(self, writer):
        """Put every document in TEST_DOCS on ``writer``, in order."""
        for doc in self.TEST_DOCS:
            writer.put(doc)

    def _wait_for_queue_stats(self):
        """Pause five seconds before queue statistics are read back."""
        print("Waiting 5 seconds for rabbit mq queue stats to catch up.")
        sleep(5)

    def test_config(self):
        """The writer's config carries the values given to the constructor."""
        w = self._create_writer()
        self.assertEqual(w.config.host, mq_host)
        self.assertEqual(w.config.username, mq_username)
        # The previous check compared config.username with mq_password, so the
        # password was never verified and the test only passed when both
        # credentials happened to be equal.
        # NOTE(review): assumes config exposes `password` like the other
        # constructor keyword arguments - confirm.
        self.assertEqual(w.config.password, mq_password)
        self.assertEqual(w.config.virtual_host, mq_virtual_host)

    def test_view_channels(self):
        """DUMP_QUEUES can be called on a freshly created writer."""
        w = self._create_writer()
        w.DUMP_QUEUES()

    def test_write_to_queue(self):
        """Three documents written to TEST_QUEUE are reported as ready."""
        w = self._create_writer()
        w.config.queue = "TEST_QUEUE"
        w.purge_queue()
        w.start()
        self._put_test_docs(w)
        w.stop()
        w.wait()
        self._wait_for_queue_stats()
        qsize = w.get_queue()["messages_ready"]
        w.DUMP_QUEUES()
        self.assertEqual(qsize, 3)
        # Clean up
        w.delete_queue()

    def test_write_to_exchange(self):
        """Three documents written via TEST_EXCHANGE show up in TEST_EXCHANGE_shared."""
        w = self._create_writer()
        w.config.queue = "TEST_QUEUE"
        w.config.exchange = "TEST_EXCHANGE"
        w.purge_queue("TEST_EXCHANGE_shared")
        w.start()
        self._put_test_docs(w)
        w.stop()
        w.wait()
        self._wait_for_queue_stats()
        qsize = w.get_queue("TEST_EXCHANGE_shared")["messages_ready"]
        w.DUMP_QUEUES()
        self.assertEqual(qsize, 3)
        # Clean up
        w.delete_exchange()
        w.delete_queue("TEST_EXCHANGE_shared")

    def test_monitor_consume_queue(self):
        """A monitor started after the writer finished reads all three documents."""
        output = []
        r = self._create_monitor()
        r.add_callback(lambda doc: output.append(doc))
        w = self._create_writer()
        w.config.queue = r.config.queue = "TEST_QUEUE"
        w.purge_queue()
        w.start()
        self._put_test_docs(w)
        w.stop()
        w.wait()
        r.start()
        print("Giving monitor a second to read all from queue.")
        sleep(1)
        r.stop()
        r.wait()
        self._wait_for_queue_stats()
        qsize = w.get_queue()["messages_ready"]
        print("Asserting queue size now 0.")
        self.assertEqual(0, qsize)
        print("Asserting all items read.")
        print(output)
        self.assertEqual(3, len(output))
        # Cleaning up
        w.delete_exchange()
        w.delete_queue()

    def test_monitor_exclusive_queue(self):
        """A non-consuming monitor attached to the exchange receives all three documents."""
        output = []
        r = self._create_monitor()
        r.add_callback(lambda doc: output.append(doc))
        w = self._create_writer()
        w.config.exchange = r.config.exchange = "TEST_EXCHANGE"
        # This will attach to the exchange and get an exclusive non-persistent queue
        r.config.consuming = False
        w.purge_queue("TEST_EXCHANGE_shared")
        w.start()
        r.start()
        self._put_test_docs(w)
        w.stop()
        w.wait()
        print("Giving monitor a second to read all from queue.")
        sleep(1)
        r.stop()
        r.wait()
        self._wait_for_queue_stats()
        qsize = w.get_queue("TEST_EXCHANGE_shared")["messages_ready"]
        print("Asserting all items still exist in consume queue.")
        # NOTE(review): the shared-queue size check below was already disabled;
        # the reason is not visible here, so it stays disabled.
        #self.assertEqual(3, qsize)
        print("Asserting all items read.")
        print(output)
        self.assertEqual(3, len(output))
        # Cleaning up
        w.delete_exchange()
        w.delete_queue("TEST_EXCHANGE_shared")

    def test_multi_monitor(self):
        """
        Test two monitors consuming from the same queue. Both should get some
        items, however which gets which items could be random.
        """
        output1 = []
        output2 = []
        r1 = self._create_monitor()
        r2 = self._create_monitor()
        r1.add_callback(lambda doc: output1.append(doc))
        r2.add_callback(lambda doc: output2.append(doc))
        w = self._create_writer()
        w.config.queue = r1.config.queue = r2.config.queue = "TEST_QUEUE"
        r1.config.consuming = r2.config.consuming = False  # But ignored since we are not using exchange..
        w.purge_queue()
        w.start()
        # Need these both to be active before we put items, or the first one will consume all before the other is created.
        r1.start()
        r2.start()
        self._put_test_docs(w)
        w.stop()
        w.wait()
        print("Giving monitors a second to read all from queue.")
        sleep(1)
        r1.stop()
        r2.stop()
        r1.wait()
        r2.wait()
        print("Asserting all items read.")
        output = output1 + output2
        print(output)
        self.assertEqual(3, len(output))
        print("Asserting distribution")
        # %-formatting keeps the printed text identical on Python 2 and 3.
        print("1: %s" % (output1,))
        print("2: %s" % (output2,))
        # assertNotEqual replaces the deprecated assertNotEquals alias.
        self.assertNotEqual(0, len(output1))
        self.assertNotEqual(0, len(output2))
        self._wait_for_queue_stats()
        qsize = w.get_queue()["messages_ready"]
        print("Asserting queue size now 0.")
        self.assertEqual(0, qsize)
        # Cleaning up
        w.delete_queue()

    def test_multi_monitor_with_exchange(self):
        """
        Test two monitors consuming from the same consumable queue and two
        listening on exclusive queues. Both consuming should get some items,
        however which gets which items could be random. Both monitors on
        exclusive queues should get all items.
        """
        output1 = []
        output2 = []
        excout1 = []
        excout2 = []
        r1 = self._create_monitor()
        r2 = self._create_monitor()
        e1 = self._create_monitor()
        e2 = self._create_monitor()
        r1.add_callback(lambda doc: output1.append(doc))
        r2.add_callback(lambda doc: output2.append(doc))
        e1.add_callback(lambda doc: excout1.append(doc))
        e2.add_callback(lambda doc: excout2.append(doc))
        w = self._create_writer()
        w.config.exchange = "TEST_EXCHANGE"
        r1.config.exchange = r2.config.exchange = "TEST_EXCHANGE"
        e1.config.exchange = e2.config.exchange = "TEST_EXCHANGE"
        r1.config.consuming = r2.config.consuming = True
        e1.config.consuming = e2.config.consuming = False
        w.purge_queue("TEST_EXCHANGE_shared")
        w.start()
        # Need these both to be active before we put items, or the first one will consume all before the other is created.
        r1.start()
        ...
test_alerts.py
Source:test_alerts.py  
...16        app_config.update({17            KEY_REQUEST_FREQUENCY_PER_S: self.request_frequency_per_s,18            KEY_LOG_RETENTION_TIME_S: self.log_retention_time_s19        })20    def _create_monitor(self, count, time_delta_s):21        """22        Helper adding logs to the log_queue for a set date (+ offset :time_delta_s: if required), and creating a Monitor23        object that will run for an interval [start_interval_time, end_interval_time] containing the logs such that24        start_interval_time < log_time < end_interval_time25        :param count: number of logs to be generated for the time interval26        :param time_delta_s: offset time in seconds at which logs are generated, useful when multiple executions27        :return: a Monitor ready to run and process the logs in the log_queue for the specified time interval28        """29        date = datetime(year=2018, month=12, day=12, hour=0, minute=0, second=0) + timedelta(seconds=time_delta_s)30        self._generate_logs(count=count, date=date)31        task_start_time = date + timedelta(seconds=1)32        start_interval_time = date - timedelta(seconds=1)33        end_interval_time = date + timedelta(seconds=1)34        expiry_time = date - timedelta(seconds=self.log_retention_time_s)35        return Monitor(self.log_queue, self.console_model, task_start_time, start_interval_time,36                       end_interval_time, expiry_time)37    def _generate_logs(self, count, date, remote_host='127.0.0.1', auth_user='paul', request_verb='GET',38                       resource='/book/1', protocol='HTTP/1.0', status=200, bytes=20):39        """40        Helper generating a number of logs at a given date and adding them to the log log_queue41        :param count: number of logs to generate42        :param date: time at which logs will be generated43        """44        for _ in range(count):45            self.log_queue.append(Log(remote_host, auth_user, date, request_verb, resource, protocol, status, 
bytes))46    def test_sending_less_logs_than_threshold_should_not_trigger_an_alert(self):47        # Given48        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s - 1  # normal49        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)50        # When51        monitor.run()52        # Then53        self.assertEqual(len(self.console_model.get_alert_history_messages()), 0)54        self.assertEqual(len(self.console_model.get_current_alerts()), 0)55        self.assertEqual(len(self.console_model.get_previous_alerts()), 0)56    def test_sending_more_logs_than_threshold_should_trigger_an_alert(self):57        # Given58        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 1  # spike59        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)60        # When61        monitor.run()62        # Then63        self.assertEqual(len(self.console_model.get_alert_history_messages()), 1)64        self.assertEqual(len(self.console_model.get_current_alerts()), 1)65        self.assertEqual(len(self.console_model.get_previous_alerts()), 0)66    def test_sending_more_logs_than_threshold_should_trigger_a_high_traffic_alert(self):67        # Given68        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 169        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)70        # When71        monitor.run()72        # Then73        alert = self.console_model.get_current_alerts()[0]74        self.assertEqual(alert.type, alerts.TYPE_HIGH_TRAFFIC)75        self.assertFalse(alert.recovered)76    def test_sending_more_logs_than_threshold_but_recovering_after_should_clear_alert(self):77        # Given78        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 179        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)80        monitor.run()81        log_to_generate_count = 
self.request_frequency_per_s * self.log_retention_time_s - 182        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s)83        # When84        monitor.run()85        # Then86        self.assertEqual(len(self.console_model.get_current_alerts()), 0)87    def test_sending_more_logs_than_threshold_but_recovering_after_should_create_recovered_alert(self):88        # Given89        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 190        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)91        monitor.run()92        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s - 193        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s)94        # When95        monitor.run()96        # Then97        self.assertEqual(len(self.console_model.get_previous_alerts()), 1)98        alert = self.console_model.get_previous_alerts()[0]99        self.assertEqual(alert.type, alerts.TYPE_HIGH_TRAFFIC)100        self.assertTrue(alert.recovered)101    def test_sending_more_logs_than_threshold_but_recovering_after_should_create_2_alerts_in_history(self):102        # Given103        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 1104        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)105        monitor.run()106        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s - 1107        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s)108        # When109        monitor.run()110        # Then111        self.assertEqual(len(self.console_model.get_alert_history_messages()), 2)112    def test_going_multiple_time_above_threshold_within_same_retention_timeframe_does_not_trigger_multiple_alarms(self):113        # Given114        log_to_generate_count = self.request_frequency_per_s * 
self.log_retention_time_s + 1115        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)116        monitor.run()117        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 1118        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s - 1)119        # When120        monitor.run()121        # Then122        self.assertEqual(len(self.console_model.get_alert_history_messages()), 1)123        self.assertEqual(len(self.console_model.get_current_alerts()), 1)124        self.assertEqual(len(self.console_model.get_previous_alerts()), 0)125    def test_going_multiple_time_above_threshold_within_different_retention_timeframe_triggers_multiple_alarms(self):126        # Given127        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 1  # spike128        monitor = self._create_monitor(log_to_generate_count, time_delta_s=0)129        monitor.run()130        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s - 1  # recovery131        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s + 10)132        monitor.run()133        log_to_generate_count = self.request_frequency_per_s * self.log_retention_time_s + 1  # spike134        monitor = self._create_monitor(log_to_generate_count, time_delta_s=self.log_retention_time_s + 10)135        # When136        monitor.run()137        # Then138        self.assertEqual(len(self.console_model.get_alert_history_messages()), 3)139        self.assertEqual(len(self.console_model.get_current_alerts()), 1)...screen.py
Source:screen.py  
...9        self.width    = 30.010        self.allowGUI = True11        self.screen_number = screen_number12        self.window   = self._create_window()13        self.monitor  = self._create_monitor()14        15    def _create_window(self): 16        return visual.Window(size = self.size, 17                             screen = self.screen_number,18                             monitor = self._create_monitor(),19                             fullscr = self.fullscr,20                             units = self.units,21                             color = self.color, 22                             allowGUI = self.allowGUI) 23    def _create_monitor(self):24        # set up monitor25        monitor = monitors.Monitor(26       "stimulus",27        distance = self.distance,28        width = self.width,29        )30        monitor.setSizePix(self.size) # screen size (not window!) look in display prefs 31        monitor.saveMon()32        return monitor33    34    def fixation_cross(self):35        #fixation cross36        fixation = visual.ShapeStim(self.window, 37            vertices=((0, -0.05), (0, 0.05), (0,0), (-0.03,0), (0.03, 0)),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
