Best Python code snippet using autotest_python
rpc_interface_unittest.py
Source: rpc_interface_unittest.py
...171            is_active=True, requested_by=models.User.current_user())172        self.task3 = models.SpecialTask.objects.create(173            host=host, task=models.SpecialTask.Task.VERIFY,174            requested_by=models.User.current_user())  # not yet run175    def test_get_special_tasks(self):176        self._setup_special_tasks()177        tasks = rpc_interface.get_special_tasks(host__hostname='host1',178                                                queue_entry__isnull=True)179        self.assertEquals(len(tasks), 2)180        self.assertEquals(tasks[0]['task'], models.SpecialTask.Task.VERIFY)181        self.assertEquals(tasks[0]['is_active'], False)182        self.assertEquals(tasks[0]['is_complete'], True)183    def test_get_latest_special_task(self):184        # a particular usage of get_special_tasks()185        self._setup_special_tasks()186        self.task2.time_started = datetime.datetime(2009, 1, 2)187        self.task2.save()188        tasks = rpc_interface.get_special_tasks(189            host__hostname='host1', task=models.SpecialTask.Task.VERIFY,190            time_started__isnull=False, sort_by=['-time_started'],191            query_limit=1)192        self.assertEquals(len(tasks), 1)193        self.assertEquals(tasks[0]['id'], 2)194    def _common_entry_check(self, entry_dict):195        self.assertEquals(entry_dict['host']['hostname'], 'host1')196        self.assertEquals(entry_dict['job']['id'], 2)197    def test_get_host_queue_entries_and_special_tasks(self):198        self._setup_special_tasks()199        entries_and_tasks = (200            rpc_interface.get_host_queue_entries_and_special_tasks('host1'))201        paths = [entry['execution_path'] for entry in entries_and_tasks]202        self.assertEquals(paths, ['hosts/host1/3-verify',203                                  '2-autotest_system/host1',204                                  'hosts/host1/2-verify',205                                  '1-autotest_system/host1',206              
                    'hosts/host1/1-verify'])207        verify2 = entries_and_tasks[2]208        self._common_entry_check(verify2)209        self.assertEquals(verify2['type'], 'Verify')210        self.assertEquals(verify2['status'], 'Running')211        self.assertEquals(verify2['execution_path'], 'hosts/host1/2-verify')212        entry2 = entries_and_tasks[1]213        self._common_entry_check(entry2)214        self.assertEquals(entry2['type'], 'Job')215        self.assertEquals(entry2['status'], 'Queued')216        self.assertEquals(entry2['started_on'], '2009-01-03 00:00:00')217    def test_view_invalid_host(self):218        # RPCs used by View Host page should work for invalid hosts219        self._create_job_helper(hosts=[1], profiles=['N/A'])220        self.hosts[0].delete()221        self.assertEquals(1, rpc_interface.get_num_hosts(hostname='host1',222                                                         valid_only=False))223        data = rpc_interface.get_hosts(hostname='host1', valid_only=False)224        self.assertEquals(1, len(data))225        self.assertEquals(1, rpc_interface.get_num_host_queue_entries(226            host__hostname='host1'))227        data = rpc_interface.get_host_queue_entries(host__hostname='host1')228        self.assertEquals(1, len(data))229        count = rpc_interface.get_num_host_queue_entries_and_special_tasks(230            hostname='host1')231        self.assertEquals(1, count)232        data = rpc_interface.get_host_queue_entries_and_special_tasks(233            hostname='host1')234        self.assertEquals(1, len(data))235    def test_reverify_hosts(self):236        hostname_list = rpc_interface.reverify_hosts(id__in=[1, 2])237        self.assertEquals(hostname_list, ['host1', 'host2'])238        tasks = rpc_interface.get_special_tasks()239        self.assertEquals(len(tasks), 2)240        self.assertEquals(set(task['host']['id'] for task in tasks),241                          set([1, 2]))242        task = tasks[0]243   
     self.assertEquals(task['task'], models.SpecialTask.Task.VERIFY)244        self.assertEquals(task['requested_by'], 'autotest_system')245    def test_parameterized_job(self):246        settings.settings.override_value(247            'AUTOTEST_WEB', 'parameterized_jobs', 'True')248        string_type = model_attributes.ParameterTypes.STRING249        test = models.Test.objects.create(250            name='test', test_type=model_attributes.TestTypes.SERVER)251        test_parameter = test.testparameter_set.create(name='key')252        profiler = models.Profiler.objects.create(name='profiler')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
