How to use is_parent_session method in Slash

Best Python code snippet using slash

slash_plugin.py

Source:slash_plugin.py Github

copy

Full Screen

1from __future__ import print_function2import functools3import hashlib4import itertools5import json6import os7import pkg_resources8import socket9import sys10import time11import webbrowser12import logbook13import requests14import vintage15try:16 import git17except Exception as e: # pylint: disable=broad-except18 pass19import slash20from sentinels import NOTHING21from slash import config as slash_config22from slash.plugins import PluginInterface, registers_on23from slash.utils.conf_utils import Cmdline, Doc24from urlobject import URLObject as URL25from requests import HTTPError26from .._compat import shellquote27from ..client import Backslash as BackslashClient28from ..exceptions import ParamsTooLarge29from ..utils import ensure_dir30from .keepalive_thread import KeepaliveThread31from .utils import normalize_file_path, distill_slash_traceback, distill_object_attributes, add_environment_variable_metadata32from ..lazy_query import LazyQuery33from ..session import APPEND_UPCOMING_TESTS_STR34from ..__version__ import __version__ as BACKSLASH_CLIENT_VERSION35_DEFAULT_CONFIG_FILENAME = os.path.expanduser('~/.backslash/config.json')36_logger = logbook.Logger(__name__)37_PWD = os.path.abspath('.')38_HAS_TEST_AVOIDED = (int(slash.__version__.split('.')[0]) >= 1)39_HAS_SESSION_INTERRUPT = hasattr(slash.hooks, 'session_interrupt')40_HAS_TEST_DISTRIBUTED = hasattr(slash.hooks, 'test_distributed')41_HAS_APP_QUIT = hasattr(slash.hooks, 'app_quit')42def handle_exceptions(func):43 @functools.wraps(func)44 def new_func(self, *args, **kwargs):45 try:46 with slash.exception_handling.handling_exceptions():47 return func(self, *args, **kwargs)48 except Exception: # pylint: disable=broad-except49 exc_info = sys.exc_info()50 if not self._handle_exception(exc_info) or self._propagate_exceptions: # pylint: disable=protected-access51 raise52 return new_func53class BackslashPlugin(PluginInterface):54 client = current_test = session = None55 def __init__(self, url=None, keepalive_interval=None, runtoken=None,56 propagate_exceptions=False, config_filename=_DEFAULT_CONFIG_FILENAME):57 super().__init__()58 self._url = url59 self._repo_cache = {}60 self._config_filename = config_filename61 self._file_hash_cache = {}62 self._keepalive_interval = keepalive_interval63 self._keepalive_thread = None64 self._error_containers = {}65 self._runtoken = runtoken66 self._propagate_exceptions = propagate_exceptions67 self._started = False68 self._adding_error = False69 @property70 def rest_url(self):71 if self.client is None:72 return None73 return self.client.url.add_path('rest')74 @property75 def webapp_url(self):76 if self.client is None:77 return None78 return self.client.get_ui_url()79 @property80 def session_webapp_url(self):81 session = slash.context.session82 if session is None or self.client is None:83 return None84 return self.client.get_ui_url(f'sessions/{session.id}')85 def _handle_exception(self, exc_info):86 pass87 def _handle_keepalive_exception(self, exc_info):88 pass89 def _get_backslash_url(self):90 return self._url91 def get_name(self):92 return 'backslash'93 def get_default_config(self):94 return {95 "session_ttl_days": 0 // Doc(96 'Optional number of days after which this session will be discarded '97 'from Backslash') // Cmdline(arg='--session-ttl-days', metavar='DAYS'),98 "session_labels": [] // Doc('Specify labels to be added to the session when reported') \99 // Cmdline(append="--session-label", metavar="LABEL"),100 "blacklisted_warnings_category": [] // Doc(101 'Specify warnings categories which should not be reported to backslash'),102 "report_test_docstrings": False // Doc(103 'Add test docstring to backslash test metadata') // Cmdline(on="--report_test_docstrings"),104 }105 @handle_exceptions106 def activate(self):107 if self._runtoken is None:108 self._runtoken = self._ensure_run_token()109 self.client = BackslashClient(110 URL(self._get_backslash_url()),111 self._runtoken, headers=self._get_default_headers())112 def _get_default_headers(self):113 """Override this method to control the headers sent to the Backslash server114 on each reequest115 """116 return None117 def deactivate(self):118 if self._keepalive_thread is not None:119 self._keepalive_thread.stop()120 super().deactivate()121 def _notify_session_start(self):122 metadata = self._get_initial_session_metadata()123 is_parent_session = False124 parent_logical_id = child_id = None125 is_slash_support_parallel = getattr(slash_config.root, 'parallel', False)126 if is_slash_support_parallel and slash_config.root.parallel.num_workers:127 child_id = slash_config.root.parallel.worker_id128 if child_id is not None:129 parent_logical_id = getattr(slash.context.session, 'parent_session_id', None)130 else:131 is_parent_session = True132 self.session = self.client.report_session_start(133 logical_id=slash.context.session.id,134 parent_logical_id=parent_logical_id,135 is_parent_session=is_parent_session,136 child_id=child_id,137 total_num_tests=slash.context.session.get_total_num_tests(),138 hostname=socket.getfqdn(),139 keepalive_interval=self._keepalive_interval,140 infrastructure='slash',141 metadata=metadata,142 **self._get_extra_session_start_kwargs()143 )144 self._started = True145 for warning in slash.context.session.warnings:146 self.warning_added(warning)147 for label in self.current_config.session_labels:148 self.client.api.call.add_label(session_id=self.session.id, label=label)149 if self._keepalive_interval is not None:150 self._keepalive_thread = KeepaliveThread(151 self.client,152 self.session,153 self._keepalive_interval,154 error_callback=self._handle_keepalive_exception155 )156 self._keepalive_thread.start()157 @handle_exceptions158 def session_start(self):159 if self.session is None:160 self._notify_session_start()161 def _get_initial_session_metadata(self):162 returned = {163 'slash::version': slash.__version__,164 'slash::commandline': ' '.join(shellquote(arg) for arg in sys.argv),165 'backslash_client_version': BACKSLASH_CLIENT_VERSION,166 'python_version': '.'.join(map(str, sys.version_info[:3])),167 'process_id': os.getpid(),168 }169 add_environment_variable_metadata(metadata=returned)170 return returned171 def _get_extra_session_start_kwargs(self):172 returned = {}173 ttl_seconds = self.current_config.session_ttl_days * 24 * 60 * 60174 if ttl_seconds:175 returned['ttl_seconds'] = ttl_seconds176 return returned177 @slash.plugins.register_if(_HAS_TEST_AVOIDED)178 @handle_exceptions179 def test_avoided(self, reason):180 self.test_start()181 self.test_skip(reason=reason)182 self.test_end()183 @handle_exceptions184 def test_interrupt(self):185 if self.current_test is not None:186 self.current_test.report_interrupted()187 @slash.plugins.register_if(_HAS_SESSION_INTERRUPT)188 @handle_exceptions189 def session_interrupt(self):190 if self.session is not None:191 self.session.report_interrupted()192 @slash.plugins.registers_on(None)193 @handle_exceptions194 def report_planned_tests(self, tests):195 if not APPEND_UPCOMING_TESTS_STR in self.client.api.info().endpoints:196 return197 tests_metadata = []198 for test in tests:199 test_info = self._get_test_info(test)200 current = {'test_logical_id':test.__slash__.id,201 'file_name':test_info['file_name'],202 'name':test_info['name'],203 'class_name':test_info['class_name']204 }205 if 'variation' in test_info:206 current['variation'] = test_info['variation']207 tests_metadata.append(current)208 tests_count = 0209 batch_size = 100210 try:211 while tests_count < len(tests_metadata):212 self.session.report_upcoming_tests(tests_metadata[tests_count:tests_count+batch_size])213 tests_count += batch_size214 except requests.exceptions.HTTPError:215 _logger.error('Ignoring exception while reporting planned tests', exc_info=True)216 @handle_exceptions217 def test_start(self):218 kwargs = self._get_test_info(slash.context.test)219 self._update_scm_info(kwargs)220 tags = slash.context.test.__slash__.tags221 tag_dict = {tag_name: tags[tag_name] for tag_name in tags}222 if tag_dict:223 kwargs['metadata'] = {224 'slash::tags': {225 'values': {tag_name: tag_value for tag_name, tag_value in tag_dict.items() if tag_value is not NOTHING},226 'names': list(tag_dict),227 },228 }229 log_path = slash.context.result.get_log_path()230 if log_path:231 kwargs.setdefault('metadata', {})['local_log_path'] = os.path.abspath(log_path)232 if self.current_config.report_test_docstrings and slash.test.get_test_function().__doc__:233 kwargs.setdefault('metadata', {})['docstring'] = slash.test.get_test_function().__doc__234 self.current_test = self.session.report_test_start(235 test_logical_id=slash.context.test.__slash__.id,236 test_index=slash.context.test.__slash__.test_index1,237 **kwargs238 )239 self._error_containers[slash.context.test.__slash__.id] = self.current_test240 @slash.plugins.register_if(_HAS_TEST_DISTRIBUTED)241 @handle_exceptions #pylint: disable=unused-argument242 def test_distributed(self, test_logical_id, worker_session_id): #pylint: disable=unused-argument243 if 'report_test_distributed' in self.client.api.info().endpoints:244 self.current_test = self.session.report_test_distributed(test_logical_id)245 @handle_exceptions246 def test_skip(self, reason=None):247 self.current_test.mark_skipped(reason=reason)248 @slash.plugins.registers_on(None)249 def is_session_exist(self, session_id):250 try:251 self.client.api.get(f'/rest/sessions/{session_id}')252 return True253 except HTTPError as e:254 if e.response.status_code == 404:255 return False256 raise257 @handle_exceptions258 @slash.plugins.registers_on(None)259 def get_tests_to_resume(self, session_id, filters_dict):260 """Queries backslash specific session's tests261 :param session_id: the wanted session262 :param filters_dict: a dictionary containing filters for backslash tests query263 :rtype: list of test objects264 """265 max_retries = 3266 for i in range(max_retries):267 try:268 default_params_dict = {x: 'true' for x in ['show_planned', 'show_skipped', 'show_unsuccessful', 'show_abandoned']}269 default_params_dict.update({'session_id': session_id, 'show_successful': 'false'})270 default_params_dict.update(filters_dict)271 return reversed(LazyQuery(self.client, '/rest/tests', query_params=default_params_dict).all())272 except HTTPError:273 if i == max_retries-1:274 raise275 def _get_test_info(self, test):276 if test.__slash__.is_interactive() and \277 pkg_resources.parse_version(slash.__version__) < pkg_resources.parse_version('1.6.0'):278 returned = {279 'file_name': '<interactive>',280 'class_name': '<interactive>',281 'name': '<interactive>',282 'is_interactive': True283 }284 else:285 test_display_name = test.__slash__.address286 if set(test_display_name) & set('/.'):287 test_display_name = test.__slash__.function_name288 returned = {289 'file_name': normalize_file_path(test.__slash__.file_path),290 'class_name': test.__slash__.class_name,291 'name': test_display_name,292 'is_interactive': test.__slash__.is_interactive(),293 }294 variation = getattr(test.__slash__, 'variation', None)295 if variation:296 if hasattr(test.__slash__.variation, 'labels'):297 items = test.__slash__.variation.labels.items()298 returned['parameters'] = variation.values.copy()299 elif hasattr(test.__slash__.variation, 'id'):300 items = test.__slash__.variation.id.items()301 returned['parameters'] = variation.values.copy()302 else:303 items = test.__slash__.variation.items()304 returned['variation'] = dict((name, value) for name, value in items)305 return returned306 def _update_scm_info(self, test_info):307 try:308 test_info['file_hash'] = self._calculate_file_hash(test_info['file_name'])309 dirname = os.path.dirname(test_info['file_name'])310 repo = self._repo_cache.get(dirname, NOTHING)311 if repo is NOTHING:312 repo = self._repo_cache[dirname] = self._get_git_repo(dirname)313 if repo is None:314 return315 test_info['scm'] = 'git'316 try:317 hexsha = repo.head.commit.hexsha318 except Exception: # pylint: disable=broad-except319 _logger.debug('Unable to get commit hash', exc_info=True)320 hexsha = None321 test_info['scm_revision'] = hexsha322 test_info['scm_dirty'] = bool(repo.untracked_files or repo.index.diff(None) or repo.index.diff(repo.head.commit))323 if self.client.api.info().endpoints.report_test_start.version >= 3:324 if not repo.head.is_detached:325 test_info['scm_local_branch'] = repo.active_branch.name326 tracking_branch = repo.active_branch.tracking_branch()327 if tracking_branch is not None:328 test_info['scm_remote_branch'] = tracking_branch.name329 except Exception: # pylint: disable=broad-except330 _logger.warning('Error when obtaining SCM information', exc_info=True)331 def _calculate_file_hash(self, filename):332 returned = self._file_hash_cache.get(filename)333 if returned is None:334 try:335 with open(filename, 'rb') as f:336 data = f.read()337 h = hashlib.sha1()338 h.update('blob '.encode('utf-8'))339 h.update(f'{len(data)}\0'.encode('utf-8'))340 h.update(data)341 except IOError as e:342 _logger.debug(f'Ignoring IOError {e!r} when calculating file hash for {filename}')343 returned = None344 else:345 returned = h.hexdigest()346 self._file_hash_cache[filename] = returned347 return returned348 def _get_git_repo(self, dirname):349 if not os.path.isabs(dirname):350 dirname = os.path.abspath(os.path.join(_PWD, dirname))351 while dirname != os.path.normpath(os.path.abspath(os.path.sep)):352 if os.path.isdir(os.path.join(dirname, '.git')):353 return git.Repo(dirname)354 dirname = os.path.normpath(os.path.abspath(os.path.join(dirname, '..')))355 return None356 @handle_exceptions357 def test_end(self):358 if self.current_test is None:359 return360 details = {}361 if hasattr(slash.context.result, 'details'):362 additional = slash.context.result.details.all()363 else:364 additional = slash.context.result.get_additional_details()365 details.update(additional)366 self.current_test.set_metadata_dict(details)367 self.current_test.report_end()368 self.current_test = None369 @handle_exceptions370 def session_end(self):371 self._session_report_end('session_end')372 @slash.plugins.register_if(_HAS_APP_QUIT)373 @handle_exceptions # pylint: disable=unused-argument374 def app_quit(self):375 self._session_report_end('app_quit')376 def _session_report_end(self, hook_name):377 if not self._started:378 return379 try:380 if self._keepalive_thread is not None:381 self._keepalive_thread.stop()382 kwargs = {}383 session_results = getattr(slash.session, 'results', None)384 has_fatal_errors = hasattr(session_results, 'has_fatal_errors') and session_results.has_fatal_errors()385 if self.client.api.info().endpoints.report_session_end.version >= 2:386 kwargs['has_fatal_errors'] = has_fatal_errors387 self.session.report_end(**kwargs)388 self._started = False389 except Exception: # pylint: disable=broad-except390 _logger.error(f'Exception ignored in {hook_name}', exc_info=True)391 @handle_exceptions392 def error_added(self, result, error):393 if self._adding_error:394 return395 self._adding_error = True396 try:397 with slash.exception_handling.handling_exceptions():398 self._add_exception(result=result, exception=error, is_fatal=error.is_fatal())399 finally:400 self._adding_error = False401 @slash.plugins.register_if(hasattr(slash.hooks, 'interruption_added'))402 @handle_exceptions403 def interruption_added(self, result, exception):404 self._add_exception(result=result, exception=exception, is_interruption=True)405 def _add_exception(self, result, exception, is_interruption=False, is_fatal=False):406 has_interruptions = self.client.api.info().endpoints.add_error.version >= 4407 if is_interruption and not has_interruptions:408 _logger.debug('Server does not support recording is_interruption exceptions. Skipping reporting')409 return410 if result is slash.session.results.global_result:411 error_container = self.session412 else:413 error_container = self._error_containers.get(result.test_metadata.id, self.current_test) or self.session414 if error_container is None:415 _logger.debug('Could not determine error container to report on for {}', result)416 return417 with vintage.get_no_deprecations_context():418 exception_attrs = getattr(exception, 'exception_attributes', NOTHING)419 if exception_attrs is NOTHING and hasattr(exception, 'exc_info'):420 exception_attrs = distill_object_attributes(exception.exc_info[1])421 kwargs = {'exception_type': exception.exception_type.__name__ if exception.exception_type is not None else None,422 'traceback': distill_slash_traceback(exception), 'exception_attrs': exception_attrs}423 if exception.message:424 message = exception.message425 elif hasattr(exception, 'exception_str'):426 message = exception.exception_str427 else:428 message = str(exception.exception)429 kwargs['message'] = message430 if has_interruptions:431 kwargs['is_interruption'] = is_interruption432 has_fatal = self.client.api.info().endpoints.add_error.version >= 5433 if has_fatal:434 kwargs['is_fatal'] = is_fatal435 for compact_variables in [False, True]:436 if compact_variables:437 for frame in kwargs['traceback']:438 frame['globals'] = None439 frame['locals'] = None440 try:441 error_container.add_error(**kwargs)442 except ParamsTooLarge:443 if compact_variables:444 raise445 # continue to try compacting446 else:447 break448 @handle_exceptions449 def warning_added(self, warning):450 if any(issubclass(warning.category, b_cls) for b_cls in self.current_config.blacklisted_warnings_category):451 return452 kwargs = {'message': warning.message, 'filename': warning.filename, 'lineno': warning.lineno}453 warning_obj = self.current_test if self.current_test is not None else self.session454 if warning_obj is not None:455 warning_obj.add_warning(**kwargs)456 @handle_exceptions457 def exception_caught_before_debugger(self, **_):458 if self.session is not None and slash.config.root.debug.enabled:459 self.session.report_in_pdb()460 @handle_exceptions461 def exception_caught_after_debugger(self, **_):462 if self.session is not None and slash.config.root.debug.enabled:463 self.session.report_not_in_pdb()464 #### Token Setup #########465 def _ensure_run_token(self):466 if self._runtoken is None:467 tokens = self._get_existing_tokens()468 self._runtoken = tokens.get(self._get_backslash_url())469 if self._runtoken is None:470 self._runtoken = self._fetch_token()471 self._save_token(self._runtoken)472 return self._runtoken473 def _get_existing_tokens(self):474 return self._get_config().get('run_tokens', {})475 def _get_config(self):476 if not os.path.isfile(self._config_filename):477 return {}478 with open(self._config_filename) as f:479 return json.load(f)480 def _save_token(self, token):481 tmp_filename = self._config_filename + '.tmp'482 cfg = self._get_config()483 cfg.setdefault('run_tokens', {})[self._get_backslash_url()] = token484 ensure_dir(os.path.dirname(tmp_filename))485 with open(tmp_filename, 'w') as f:486 json.dump(cfg, f, indent=2)487 os.rename(tmp_filename, self._config_filename)488 @registers_on(None)489 def fetch_token(self, username, password):490 url = URL(self._get_backslash_url())491 with requests.Session() as s:492 resp = s.get(self._get_token_request_url())493 resp.raise_for_status()494 response_url = resp.json()['url']495 request_id = response_url.split('/')[-1]496 s.post(url.add_path('login'),497 data=json.dumps({'username': username, 'password': password}),498 headers={'Content-type': 'application/json'})\499 .raise_for_status()500 s.post(URL(self._get_backslash_url()).add_path(f'/runtoken/request/{request_id}/complete'))\501 .raise_for_status()502 resp = s.get(response_url)503 resp.raise_for_status()504 returned = self._runtoken = resp.json()['token']505 return returned506 def _fetch_token(self):507 """Template method for the fallback behavior of fetching a runtoken from the server.508 By default this uses _fetch_token_via_browser to initiate the browser-based509 authentication.510 You can override this method to provide a custom logic for fetching your tokens511 """512 return self._fetch_token_via_browser()513 def _fetch_token_via_browser(self):514 opened_browser = False515 url = self._get_token_request_url()516 for retry in itertools.count():517 resp = requests.get(url)518 resp.raise_for_status()519 data = resp.json()520 if retry == 0:521 url = data['url']522 token = data.get('token')523 if token:524 return token525 if not opened_browser:526 if not self._browse_url(data['complete']):527 print('Could not open browser to fetch user token. Please login at', data['complete'])528 print('Waiting for Backlash token...')529 opened_browser = True530 time.sleep(1)531 def _get_token_request_url(self):532 return URL(self._get_backslash_url()).add_path('/runtoken/request/new')533 def _browse_url(self, url):534 if 'linux' in sys.platform and os.environ.get('DISPLAY') is None:535 return False # can't start browser...

Full Screen

Full Screen

test_parallel.py

Source:test_parallel.py Github

copy

Full Screen

...347 test.append_line("from slash import config")348 test.append_line("slash.context.result.data.setdefault('worker_id', config.root.parallel.worker_id)")349 @slash.hooks.tests_loaded.register # pylint: disable=no-member, unused-argument350 def tests_loaded(tests): # pylint: disable=unused-variable, unused-argument351 if slash.utils.parallel_utils.is_parent_session():352 from slash import ctx353 workers = ctx.session.parallel_manager.workers354 for index in range(len(tests)):355 worker_id = '1' if index%2 == 0 else '2'356 workers[worker_id].force_test(index)357 summary = parallel_suite.run(num_workers=2)358 assert summary.session.results.is_success()359 for index, test in enumerate(parallel_suite):360 [result] = summary.get_all_results_for_test(test)361 assert result.data['worker_id'] if index%2 == 0 else '2'362def test_force_on_one_worker(parallel_suite):363 @slash.hooks.tests_loaded.register # pylint: disable=no-member, unused-argument364 def tests_loaded(tests): # pylint: disable=unused-variable, unused-argument365 if slash.utils.parallel_utils.is_parent_session():366 from slash import ctx367 worker = list(ctx.session.parallel_manager.workers.values())[0]368 for index in range(len(tests)):369 worker.force_test(index)370 summary = parallel_suite.run(num_workers=2)371 assert summary.session.results.is_success()372@pytest.mark.parametrize('num_workers', [1, 2])373def test_exclude_on_one_worker(parallel_suite, config_override, num_workers):374 config_override("parallel.no_request_timeout", 2)375 @slash.hooks.tests_loaded.register # pylint: disable=no-member, unused-argument376 def tests_loaded(tests): # pylint: disable=unused-variable, unused-argument377 if slash.utils.parallel_utils.is_parent_session():378 from slash import ctx379 worker = list(ctx.session.parallel_manager.workers.values())[0]380 for index in range(len(tests)):381 worker.exclude_test(index)382 if num_workers == 1:383 summary = parallel_suite.run(num_workers=num_workers, verify=False)384 assert summary.session.results.get_num_started() == 0385 else:386 summary = parallel_suite.run(num_workers=num_workers)387 assert summary.session.results.is_success()388@pytest.mark.parametrize('use_test_index', [True, False])389def test_exclude_test_on_all_workers_causes_timeout(parallel_suite, config_override, use_test_index):390 config_override("parallel.no_request_timeout", 2)391 @slash.hooks.tests_loaded.register # pylint: disable=no-member, unused-argument392 def tests_loaded(tests): # pylint: disable=unused-variable, unused-argument393 if slash.utils.parallel_utils.is_parent_session():394 from slash import ctx395 for worker in ctx.session.parallel_manager.workers.values():396 if use_test_index:397 worker.exclude_test(0)398 else:399 worker.exclude_test(tests[0])400 summary = parallel_suite.run(num_workers=2, verify=False)401 assert summary.session.results.get_num_started() == len(parallel_suite) - 1402 assert not summary.get_all_results_for_test(parallel_suite[0])403def test_exclude_and_force_on_same_worker_raises_runtime_err(parallel_suite):404 @slash.hooks.tests_loaded.register # pylint: disable=no-member, unused-argument405 def tests_loaded(tests): # pylint: disable=unused-variable, unused-argument406 if slash.utils.parallel_utils.is_parent_session():407 from slash import ctx408 worker = list(ctx.session.parallel_manager.workers.values())[0]409 worker.exclude_test(0)410 worker.force_test(0)411 summary = parallel_suite.run(num_workers=2, verify=False)...

Full Screen

Full Screen

sessions.py

Source:sessions.py Github

copy

Full Screen

1from flask import g, request2from flask_simple_api import error_abort3import flux4import requests5from sqlalchemy.orm.exc import NoResultFound6from sqlalchemy.exc import IntegrityError7from ...auth import get_or_create_user8from ...search import get_orm_query_from_search_string9from ...models import Session, Test, db, SessionMetadata, User10from ...utils import get_current_time, statuses11from ...utils.api_utils import requires_role12from ...utils.subjects import get_or_create_subject_instance13from ...utils.users import has_role14from ...utils import profiling15from .blueprint import API16NoneType = type(None)17_DEFAULT_DELETE_GRACE_PERIOD_SECONDS = 60 * 60 * 24 * 3018@API(version=3)19def report_session_start(logical_id: str=None,20 parent_logical_id: (NoneType, str)=None,21 is_parent_session: bool=False,22 child_id: (NoneType, str)=None,23 hostname: str=None,24 total_num_tests: int=None,25 metadata: dict=None,26 user_email: str=None,27 keepalive_interval: (NoneType, int)=None,28 subjects: (list, NoneType)=None,29 infrastructure: (str, NoneType)=None,30 ttl_seconds: (int, NoneType)=None,31 ):32 if hostname is None:33 hostname = request.remote_addr34 # fix user identification35 if user_email is not None and user_email != g.token_user.email:36 if not has_role(g.token_user.id, 'proxy'):37 error_abort('User {} is not authorized to run tests on others behalf. Tried running as {}'.format(g.token_user.email, user_email),38 code=requests.codes.forbidden)39 real_user_id = g.token_user.id40 real_user = get_or_create_user({'email': user_email})41 user_id = real_user.id42 else:43 user_id = g.token_user.id44 real_user = None45 real_user_id = None46 if keepalive_interval is None and ttl_seconds is not None:47 error_abort("Cannot specify session TTL when keepalive isn't used")48 returned = Session(49 hostname=hostname,50 parent_logical_id=parent_logical_id,51 is_parent_session=is_parent_session,52 child_id=child_id,53 total_num_tests=total_num_tests,54 infrastructure=infrastructure,55 user_id=user_id,56 real_user_id=real_user_id,57 status=statuses.RUNNING,58 logical_id=logical_id,59 keepalive_interval=keepalive_interval,60 ttl_seconds=ttl_seconds,61 )62 if real_user is not None:63 real_user.last_activity = flux.current_timeline.time()64 returned.mark_started()65 returned.update_keepalive()66 if subjects:67 for subject_data in subjects:68 subject_name = subject_data.get('name', None)69 if subject_name is None:70 error_abort('Missing subject name')71 subject = get_or_create_subject_instance(72 name=subject_name,73 product=subject_data.get('product', None),74 version=subject_data.get('version', None),75 revision=subject_data.get('revision', None))76 returned.subject_instances.append(subject)77 db.session.add(subject)78 assert list(returned.subject_instances)79 returned.notify_subject_activity()80 if metadata is not None:81 for key, value in metadata.items():82 returned.metadata_items.append(SessionMetadata(83 session=returned, key=key, metadata_item=value))84 db.session.add(returned)85 try:86 db.session.commit()87 except IntegrityError:88 db.session.rollback()89 if parent_logical_id:90 Session.query.filter_by(logical_id=parent_logical_id).first_or_404()91 error_abort('Tried to report a session which conflicts with an existing session', code=requests.codes.conflict)92 profiling.notify_session_start()93 return returned94@API(version=2)95def report_session_end(id: int, duration: (int, NoneType)=None, has_fatal_errors: bool=False):96 try:97 session = Session.query.filter(Session.id == id).one()98 except NoResultFound:99 error_abort('Session not found', code=requests.codes.not_found)100 session.notify_subject_activity()101 if session.status not in (statuses.RUNNING, statuses.INTERRUPTED):102 error_abort('Session is not running', code=requests.codes.conflict)103 if duration is None:104 session.mark_ended()105 else:106 session.mark_ended_at(session.start_time + duration)107 # TODO: handle interrupted sessions108 if session.num_error_tests or session.num_errors:109 session.status = statuses.ERROR110 elif session.num_failed_tests or session.num_failures:111 session.status = statuses.FAILURE112 elif session.status != statuses.INTERRUPTED:113 session.status = statuses.SUCCESS114 session.has_fatal_errors = has_fatal_errors115 session.in_pdb = False116 if session.ttl_seconds is not None:117 session.delete_at = flux.current_timeline.time() + session.ttl_seconds118 db.session.add(session)119 db.session.commit()120@API121def report_in_pdb(session_id: int):122 s = Session.query.get_or_404(session_id)123 s.in_pdb = True124 db.session.add(s)125 db.session.commit()126@API127def report_not_in_pdb(session_id: int):128 s = Session.query.get_or_404(session_id)129 s.in_pdb = False130 db.session.add(s)131 db.session.commit()132@API133def send_keepalive(session_id: int):134 s = Session.query.get_or_404(session_id)135 if s.end_time is not None:136 return137 if s.keepalive_interval is not None:138 timestamp = get_current_time() + s.keepalive_interval139 s.update_keepalive()140 for test in Test.query.filter(Test.session_id==session_id,141 Test.end_time == None,142 Test.start_time != None):143 test.extend_timespan_to(timestamp)144 s.notify_subject_activity()145 db.session.commit()146@API147def report_session_interrupted(id: int):148 s = Session.query.get_or_404(id)149 s.status = statuses.INTERRUPTED150 if s.parent:151 s.parent.status = statuses.INTERRUPTED152 db.session.commit()153@API(require_real_login=True)154@requires_role("admin")155def discard_session(156 session_id: int, grace_period_seconds: int = _DEFAULT_DELETE_GRACE_PERIOD_SECONDS157):158 session = Session.query.get_or_404(session_id)159 delete_at = flux.current_timeline.time() + grace_period_seconds # pylint: disable=undefined-variable160 Session.query.filter(161 (Session.parent_logical_id == session.logical_id) | (Session.id == session.id)162 ).update({Session.delete_at: delete_at}, synchronize_session=False)163 db.session.commit()164@API(require_real_login=True)165@requires_role('admin')166def discard_sessions_search(search_string: str, grace_period_seconds: int=_DEFAULT_DELETE_GRACE_PERIOD_SECONDS):167 if not search_string:168 error_abort('Invadlid search string')169 delete_at = flux.current_timeline.time() + grace_period_seconds170 search_query = get_orm_query_from_search_string('session', search_string).filter(Session.delete_at == None)171 Session.query.filter(Session.id.in_(db.session.query(search_query.subquery().c.id))).update({172 'delete_at': delete_at173 }, synchronize_session=False)174 db.session.commit()175@API(require_real_login=True)176@requires_role("admin")177def preserve_session(session_id: int):178 session = Session.query.get_or_404(session_id)179 Session.query.filter(180 (Session.parent_logical_id == session.logical_id) | (Session.id == session.id)181 ).update({Session.delete_at: None}, synchronize_session=False)182 db.session.commit()183@API184def report_reporting_stopped(session_id: int):185 session = Session.query.get_or_404(session_id)186 session.reporting_stopped = True...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

1# pylint: disable=no-member2import logbook3from sentinels import NOTHING4from urlobject import URLObject as URL5from .api import API6from .lazy_query import LazyQuery7from typing import Optional, Union8from urlobject.urlobject import URLObject9_logger = logbook.Logger(__name__)10class Backslash():11 def __init__(self, url: Union[str, URLObject], runtoken: str, headers: None=None) -> None:12 super().__init__()13 if not url.startswith('http'):14 url = f'http://{url}'15 self._url = URL(url)16 self.api = API(self, url, runtoken, headers=headers)17 @property18 def url(self) -> URLObject:19 return self._url20 def get_ui_url(self, fragment: Optional[str]=None) -> str:21 returned = str(self.url)22 if not returned.endswith('/'):23 returned += '/'24 if not fragment:25 fragment = '/'26 elif not fragment.startswith('/'):27 fragment = '/' + fragment28 returned += f'#{fragment}'29 return returned30 def toggle_user_role(self, user_id, role):31 return self.api.call_function('toggle_user_role', {'user_id': user_id, 'role': role})32 def get_user_run_tokens(self, user_id):33 return self.api.call_function('get_user_run_tokens', {'user_id': user_id})34 def delete_comment(self, comment_id) -> None:35 self.api.call_function('delete_comment', {'comment_id': comment_id})36 def report_session_start(self, logical_id=NOTHING,37 parent_logical_id=NOTHING,38 is_parent_session=False,39 child_id=NOTHING,40 hostname=NOTHING,41 total_num_tests=NOTHING,42 user_email=NOTHING,43 metadata=NOTHING,44 keepalive_interval=NOTHING,45 subjects=NOTHING,46 infrastructure=NOTHING,47 ttl_seconds=NOTHING,48 ):49 """Reports a new session starting50 :rtype: A session object representing the reported session51 """52 params = {53 'hostname': hostname,54 'logical_id': logical_id,55 'total_num_tests': total_num_tests,56 'user_email': user_email,57 'metadata': metadata,58 'keepalive_interval': keepalive_interval,59 'subjects': subjects,60 'infrastructure': infrastructure,61 'ttl_seconds': ttl_seconds,62 }63 if parent_logical_id is not None or is_parent_session:64 supports_parallel = (self.api.info().endpoints.report_session_start.version >= 2)65 if supports_parallel:66 params['parent_logical_id'] = parent_logical_id67 params['is_parent_session'] = is_parent_session68 params['child_id'] = child_id69 if child_id is not None:70 del params['total_num_tests']71 returned = self.api.call_function('report_session_start', params)72 return returned73 def query_sessions(self) -> LazyQuery:74 """Queries sessions stored on the server75 :rtype: A lazy query object76 """77 return LazyQuery(self, '/rest/sessions')78 def query_tests(self) -> LazyQuery:79 """Queries tests stored on the server (directly, not via a session)80 :rtype: A lazy query object81 """82 return LazyQuery(self, '/rest/tests')83 def query(self, path, **kwargs) -> LazyQuery:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful