How to use the _is_awaitable_timeout method in lisa

Best Python code snippet using lisa_python

lisa_runner.py

Source:lisa_runner.py Github

copy

Full Screen

...243 environment.status == EnvironmentStatus.Deployed244 ), f"actual: {environment.status}"245 self._reset_awaitable_timer("deploy")246 except ResourceAwaitableException as identifier:247 if self._is_awaitable_timeout("deploy"):248 self._log.info(249 f"[{environment.name}] timeout on waiting for more resource: "250 f"{identifier}, skip assigning case."251 )252 raise SkippedException(identifier)253 else:254 # rerun prepare to calculate resource again.255 environment.status = EnvironmentStatus.New256 except Exception as identifier:257 self._attach_failed_environment_to_result(258 environment=environment,259 result=test_results[0],260 exception=identifier,261 )262 self._delete_environment_task(environment=environment, test_results=[])263 def _initialize_environment_task(264 self, environment: Environment, test_results: List[TestResult]265 ) -> None:266 self._log.debug(f"start initializing task on '{environment.name}'")267 assert test_results268 try:269 environment.initialize()270 assert (271 environment.status == EnvironmentStatus.Connected272 ), f"actual: {environment.status}"273 except Exception as identifier:274 self._attach_failed_environment_to_result(275 environment=environment,276 result=test_results[0],277 exception=identifier,278 )279 self._delete_environment_task(environment=environment, test_results=[])280 def _run_test_task(281 self,282 environment: Environment,283 test_results: List[TestResult],284 case_variables: Dict[str, VariableEntry],285 ) -> None:286 self._log.debug(287 f"start running cases on '{environment.name}', "288 f"case count: {len(test_results)}, "289 f"status {environment.status.name}"290 )291 assert test_results292 assert len(test_results) == 1, (293 f"single test result to run, " f"but {len(test_results)} found."294 )295 test_result = test_results[0]296 suite_metadata = test_result.runtime_data.metadata.suite297 test_suite: TestSuite = suite_metadata.test_class(298 suite_metadata,299 )300 test_suite.start(301 
environment=environment,302 case_results=test_results,303 case_variables=case_variables,304 )305 # release environment reference to optimize memory.306 test_result.environment = None307 # Some test cases may break the ssh connections. To reduce side effects308 # on next test cases, close the connection after each test run. It will309 # be connected on the next command automatically.310 environment.nodes.close()311 # Try to connect node(s), if cannot access node(s) of this environment,312 # set the current environment as Bad. So that this environment won't be reused.313 if not is_unittest() and not environment.nodes.test_connections():314 environment.status = EnvironmentStatus.Bad315 self._log.debug(316 f"set environment '{environment.name}' as bad, "317 f"because after test case '{test_result.name}', "318 f"node(s) cannot be accessible."319 )320 environment.nodes.close()321 # keep failed environment, not to delete322 if (323 test_result.status == TestStatus.FAILED324 and self.platform.runbook.keep_environment325 == constants.ENVIRONMENT_KEEP_FAILED326 ):327 self._log.debug(328 f"keep environment '{environment.name}', "329 f"because keep_environment is 'failed', "330 f"and test case '{test_result.name}' failed on it."331 )332 environment.status = EnvironmentStatus.Deleted333 # if an environment is in bad status, it will be deleted, not run more334 # test cases. 
But if the setting is to keep failed environment, it may335 # be kept in above logic.336 if environment.status == EnvironmentStatus.Bad or environment.is_dirty:337 self._log.debug(338 f"delete environment '{environment.name}', "339 f"because it's in Bad status or marked as dirty."340 )341 self._delete_environment_task(342 environment=environment, test_results=test_results343 )344 def _delete_environment_task(345 self, environment: Environment, test_results: List[TestResult]346 ) -> None:347 """348 May be called async349 """350 # the predefined environment shouldn't be deleted, because it351 # serves all test cases.352 if environment.status == EnvironmentStatus.Deleted or (353 environment.status == EnvironmentStatus.Prepared354 and not environment.is_in_use355 ):356 # The prepared only environment doesn't need to be deleted.357 # It may cause platform fail to delete non-existing environment.358 environment.status = EnvironmentStatus.Deleted359 else:360 try:361 self.platform.delete_environment(environment)362 except Exception as identifier:363 self._log.debug(364 f"error on deleting environment '{environment.name}': {identifier}"365 )366 def _prepare_environment(self, environment: Environment) -> bool:367 success = True368 try:369 try:370 self.platform.prepare_environment(environment)371 self._reset_awaitable_timer("prepare")372 except ResourceAwaitableException as identifier:373 # if timed out, raise the exception and skip the test case. If374 # not, do nothing to keep env as new to try next time.375 if self._is_awaitable_timeout("prepare"):376 raise SkippedException(identifier)377 except Exception as identifier:378 success = False379 matched_result = self._match_failed_environment_with_result(380 environment=environment,381 candidate_results=self.test_results,382 exception=identifier,383 )384 self._attach_failed_environment_to_result(385 environment=environment,386 result=matched_result,387 exception=identifier,388 )389 return success...

Full Screen

Full Screen

runner.py

Source:runner.py Github

copy

Full Screen

...126 self._log_handler = create_file_handler(127 Path(self._log_file_name), self._log128 )129 self._working_folder = self._working_folder / runner_path_name130 def _is_awaitable_timeout(self, name: str) -> bool:131 _wait_resource_timer = self._wait_resource_timers.get(name, create_timer())132 self._wait_resource_timers[name] = _wait_resource_timer133 if _wait_resource_timer.elapsed(False) < self._wait_resource_timeout * 60:134 # wait a while to prevent called too fast135 if not self._wait_resource_logged:136 self._log.info(f"{name} waiting for more resource...")137 self._wait_resource_logged = True138 time.sleep(5)139 return False140 else:141 self._log.info(f"{name} timeout on waiting for more resource...")142 return True143 def _reset_awaitable_timer(self, name: str) -> None:144 _wait_resource_timer = self._wait_resource_timers.get(name, create_timer())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful