How to use registers_on method in Slash

Best Python code snippet using slash

slash_plugin.py

Source:slash_plugin.py Github

copy

Full Screen

...188 @handle_exceptions189 def session_interrupt(self):190 if self.session is not None:191 self.session.report_interrupted()192 @slash.plugins.registers_on(None)193 @handle_exceptions194 def report_planned_tests(self, tests):195 if not APPEND_UPCOMING_TESTS_STR in self.client.api.info().endpoints:196 return197 tests_metadata = []198 for test in tests:199 test_info = self._get_test_info(test)200 current = {'test_logical_id':test.__slash__.id,201 'file_name':test_info['file_name'],202 'name':test_info['name'],203 'class_name':test_info['class_name']204 }205 if 'variation' in test_info:206 current['variation'] = test_info['variation']207 tests_metadata.append(current)208 tests_count = 0209 batch_size = 100210 try:211 while tests_count < len(tests_metadata):212 self.session.report_upcoming_tests(tests_metadata[tests_count:tests_count+batch_size])213 tests_count += batch_size214 except requests.exceptions.HTTPError:215 _logger.error('Ignoring exception while reporting planned tests', exc_info=True)216 @handle_exceptions217 def test_start(self):218 kwargs = self._get_test_info(slash.context.test)219 self._update_scm_info(kwargs)220 tags = slash.context.test.__slash__.tags221 tag_dict = {tag_name: tags[tag_name] for tag_name in tags}222 if tag_dict:223 kwargs['metadata'] = {224 'slash::tags': {225 'values': {tag_name: tag_value for tag_name, tag_value in tag_dict.items() if tag_value is not NOTHING},226 'names': list(tag_dict),227 },228 }229 log_path = slash.context.result.get_log_path()230 if log_path:231 kwargs.setdefault('metadata', {})['local_log_path'] = os.path.abspath(log_path)232 if self.current_config.report_test_docstrings and slash.test.get_test_function().__doc__:233 kwargs.setdefault('metadata', {})['docstring'] = slash.test.get_test_function().__doc__234 self.current_test = self.session.report_test_start(235 test_logical_id=slash.context.test.__slash__.id,236 test_index=slash.context.test.__slash__.test_index1,237 **kwargs238 )239 self._error_containers[slash.context.test.__slash__.id] = self.current_test240 @slash.plugins.register_if(_HAS_TEST_DISTRIBUTED)241 @handle_exceptions #pylint: disable=unused-argument242 def test_distributed(self, test_logical_id, worker_session_id): #pylint: disable=unused-argument243 if 'report_test_distributed' in self.client.api.info().endpoints:244 self.current_test = self.session.report_test_distributed(test_logical_id)245 @handle_exceptions246 def test_skip(self, reason=None):247 self.current_test.mark_skipped(reason=reason)248 @slash.plugins.registers_on(None)249 def is_session_exist(self, session_id):250 try:251 self.client.api.get(f'/rest/sessions/{session_id}')252 return True253 except HTTPError as e:254 if e.response.status_code == 404:255 return False256 raise257 @handle_exceptions258 @slash.plugins.registers_on(None)259 def get_tests_to_resume(self, session_id, filters_dict):260 """Queries backslash specific session's tests261 :param session_id: the wanted session262 :param filters_dict: a dictionary containing filters for backslash tests query263 :rtype: list of test objects264 """265 max_retries = 3266 for i in range(max_retries):267 try:268 default_params_dict = {x: 'true' for x in ['show_planned', 'show_skipped', 'show_unsuccessful', 'show_abandoned']}269 default_params_dict.update({'session_id': session_id, 'show_successful': 'false'})270 default_params_dict.update(filters_dict)271 return reversed(LazyQuery(self.client, '/rest/tests', query_params=default_params_dict).all())272 except HTTPError:273 if i == max_retries-1:274 raise275 def _get_test_info(self, test):276 if test.__slash__.is_interactive() and \277 pkg_resources.parse_version(slash.__version__) < pkg_resources.parse_version('1.6.0'):278 returned = {279 'file_name': '<interactive>',280 'class_name': '<interactive>',281 'name': '<interactive>',282 'is_interactive': True283 }284 else:285 test_display_name = test.__slash__.address286 if set(test_display_name) & set('/.'):287 test_display_name = test.__slash__.function_name288 returned = {289 'file_name': normalize_file_path(test.__slash__.file_path),290 'class_name': test.__slash__.class_name,291 'name': test_display_name,292 'is_interactive': test.__slash__.is_interactive(),293 }294 variation = getattr(test.__slash__, 'variation', None)295 if variation:296 if hasattr(test.__slash__.variation, 'labels'):297 items = test.__slash__.variation.labels.items()298 returned['parameters'] = variation.values.copy()299 elif hasattr(test.__slash__.variation, 'id'):300 items = test.__slash__.variation.id.items()301 returned['parameters'] = variation.values.copy()302 else:303 items = test.__slash__.variation.items()304 returned['variation'] = dict((name, value) for name, value in items)305 return returned306 def _update_scm_info(self, test_info):307 try:308 test_info['file_hash'] = self._calculate_file_hash(test_info['file_name'])309 dirname = os.path.dirname(test_info['file_name'])310 repo = self._repo_cache.get(dirname, NOTHING)311 if repo is NOTHING:312 repo = self._repo_cache[dirname] = self._get_git_repo(dirname)313 if repo is None:314 return315 test_info['scm'] = 'git'316 try:317 hexsha = repo.head.commit.hexsha318 except Exception: # pylint: disable=broad-except319 _logger.debug('Unable to get commit hash', exc_info=True)320 hexsha = None321 test_info['scm_revision'] = hexsha322 test_info['scm_dirty'] = bool(repo.untracked_files or repo.index.diff(None) or repo.index.diff(repo.head.commit))323 if self.client.api.info().endpoints.report_test_start.version >= 3:324 if not repo.head.is_detached:325 test_info['scm_local_branch'] = repo.active_branch.name326 tracking_branch = repo.active_branch.tracking_branch()327 if tracking_branch is not None:328 test_info['scm_remote_branch'] = tracking_branch.name329 except Exception: # pylint: disable=broad-except330 _logger.warning('Error when obtaining SCM information', exc_info=True)331 def _calculate_file_hash(self, filename):332 returned = self._file_hash_cache.get(filename)333 if returned is None:334 try:335 with open(filename, 'rb') as f:336 data = f.read()337 h = hashlib.sha1()338 h.update('blob '.encode('utf-8'))339 h.update(f'{len(data)}\0'.encode('utf-8'))340 h.update(data)341 except IOError as e:342 _logger.debug(f'Ignoring IOError {e!r} when calculating file hash for {filename}')343 returned = None344 else:345 returned = h.hexdigest()346 self._file_hash_cache[filename] = returned347 return returned348 def _get_git_repo(self, dirname):349 if not os.path.isabs(dirname):350 dirname = os.path.abspath(os.path.join(_PWD, dirname))351 while dirname != os.path.normpath(os.path.abspath(os.path.sep)):352 if os.path.isdir(os.path.join(dirname, '.git')):353 return git.Repo(dirname)354 dirname = os.path.normpath(os.path.abspath(os.path.join(dirname, '..')))355 return None356 @handle_exceptions357 def test_end(self):358 if self.current_test is None:359 return360 details = {}361 if hasattr(slash.context.result, 'details'):362 additional = slash.context.result.details.all()363 else:364 additional = slash.context.result.get_additional_details()365 details.update(additional)366 self.current_test.set_metadata_dict(details)367 self.current_test.report_end()368 self.current_test = None369 @handle_exceptions370 def session_end(self):371 self._session_report_end('session_end')372 @slash.plugins.register_if(_HAS_APP_QUIT)373 @handle_exceptions # pylint: disable=unused-argument374 def app_quit(self):375 self._session_report_end('app_quit')376 def _session_report_end(self, hook_name):377 if not self._started:378 return379 try:380 if self._keepalive_thread is not None:381 self._keepalive_thread.stop()382 kwargs = {}383 session_results = getattr(slash.session, 'results', None)384 has_fatal_errors = hasattr(session_results, 'has_fatal_errors') and session_results.has_fatal_errors()385 if self.client.api.info().endpoints.report_session_end.version >= 2:386 kwargs['has_fatal_errors'] = has_fatal_errors387 self.session.report_end(**kwargs)388 self._started = False389 except Exception: # pylint: disable=broad-except390 _logger.error(f'Exception ignored in {hook_name}', exc_info=True)391 @handle_exceptions392 def error_added(self, result, error):393 if self._adding_error:394 return395 self._adding_error = True396 try:397 with slash.exception_handling.handling_exceptions():398 self._add_exception(result=result, exception=error, is_fatal=error.is_fatal())399 finally:400 self._adding_error = False401 @slash.plugins.register_if(hasattr(slash.hooks, 'interruption_added'))402 @handle_exceptions403 def interruption_added(self, result, exception):404 self._add_exception(result=result, exception=exception, is_interruption=True)405 def _add_exception(self, result, exception, is_interruption=False, is_fatal=False):406 has_interruptions = self.client.api.info().endpoints.add_error.version >= 4407 if is_interruption and not has_interruptions:408 _logger.debug('Server does not support recording is_interruption exceptions. Skipping reporting')409 return410 if result is slash.session.results.global_result:411 error_container = self.session412 else:413 error_container = self._error_containers.get(result.test_metadata.id, self.current_test) or self.session414 if error_container is None:415 _logger.debug('Could not determine error container to report on for {}', result)416 return417 with vintage.get_no_deprecations_context():418 exception_attrs = getattr(exception, 'exception_attributes', NOTHING)419 if exception_attrs is NOTHING and hasattr(exception, 'exc_info'):420 exception_attrs = distill_object_attributes(exception.exc_info[1])421 kwargs = {'exception_type': exception.exception_type.__name__ if exception.exception_type is not None else None,422 'traceback': distill_slash_traceback(exception), 'exception_attrs': exception_attrs}423 if exception.message:424 message = exception.message425 elif hasattr(exception, 'exception_str'):426 message = exception.exception_str427 else:428 message = str(exception.exception)429 kwargs['message'] = message430 if has_interruptions:431 kwargs['is_interruption'] = is_interruption432 has_fatal = self.client.api.info().endpoints.add_error.version >= 5433 if has_fatal:434 kwargs['is_fatal'] = is_fatal435 for compact_variables in [False, True]:436 if compact_variables:437 for frame in kwargs['traceback']:438 frame['globals'] = None439 frame['locals'] = None440 try:441 error_container.add_error(**kwargs)442 except ParamsTooLarge:443 if compact_variables:444 raise445 # continue to try compacting446 else:447 break448 @handle_exceptions449 def warning_added(self, warning):450 if any(issubclass(warning.category, b_cls) for b_cls in self.current_config.blacklisted_warnings_category):451 return452 kwargs = {'message': warning.message, 'filename': warning.filename, 'lineno': warning.lineno}453 warning_obj = self.current_test if self.current_test is not None else self.session454 if warning_obj is not None:455 warning_obj.add_warning(**kwargs)456 @handle_exceptions457 def exception_caught_before_debugger(self, **_):458 if self.session is not None and slash.config.root.debug.enabled:459 self.session.report_in_pdb()460 @handle_exceptions461 def exception_caught_after_debugger(self, **_):462 if self.session is not None and slash.config.root.debug.enabled:463 self.session.report_not_in_pdb()464 #### Token Setup #########465 def _ensure_run_token(self):466 if self._runtoken is None:467 tokens = self._get_existing_tokens()468 self._runtoken = tokens.get(self._get_backslash_url())469 if self._runtoken is None:470 self._runtoken = self._fetch_token()471 self._save_token(self._runtoken)472 return self._runtoken473 def _get_existing_tokens(self):474 return self._get_config().get('run_tokens', {})475 def _get_config(self):476 if not os.path.isfile(self._config_filename):477 return {}478 with open(self._config_filename) as f:479 return json.load(f)480 def _save_token(self, token):481 tmp_filename = self._config_filename + '.tmp'482 cfg = self._get_config()483 cfg.setdefault('run_tokens', {})[self._get_backslash_url()] = token484 ensure_dir(os.path.dirname(tmp_filename))485 with open(tmp_filename, 'w') as f:486 json.dump(cfg, f, indent=2)487 os.rename(tmp_filename, self._config_filename)488 @registers_on(None)489 def fetch_token(self, username, password):490 url = URL(self._get_backslash_url())491 with requests.Session() as s:492 resp = s.get(self._get_token_request_url())493 resp.raise_for_status()494 response_url = resp.json()['url']495 request_id = response_url.split('/')[-1]496 s.post(url.add_path('login'),497 data=json.dumps({'username': username, 'password': password}),498 headers={'Content-type': 'application/json'})\499 .raise_for_status()500 s.post(URL(self._get_backslash_url()).add_path(f'/runtoken/request/{request_id}/complete'))\501 .raise_for_status()502 resp = s.get(response_url)...

Full Screen

Full Screen

test_plugins.py

Source:test_plugins.py Github

copy

Full Screen

...10 @slash.plugins.active # pylint: disable=unused-variable11 class SamplePlugin(PluginInterface):12 def get_name(self):13 return 'sample'14 @plugins.registers_on(None)15 def some_method_here(self):16 checkpoint()17 gossip.trigger('slash.some_method_here')18 assert not checkpoint.called19@pytest.mark.parametrize('class_level_needs', [True, False])20@pytest.mark.parametrize('class_level_provides', [True, False])21def test_registers_on_kwargs(class_level_needs, class_level_provides):22 needs_decorator = plugins.needs('other_requirement')23 provides_decorator = plugins.provides('another_provided_requirement')24 @slash.plugins.active # pylint: disable=unused-variable25 @maybe_decorate(needs_decorator, class_level_needs)26 @maybe_decorate(provides_decorator, class_level_provides)27 class SamplePlugin(PluginInterface):28 def get_name(self):29 return 'sample'30 @plugins.registers_on('some.hook', provides=['provided_requirement'], needs=['some_requirement'], tags=['tag'])31 @maybe_decorate(needs_decorator, not class_level_needs)32 @maybe_decorate(provides_decorator, not class_level_provides)33 def plugin_method(self):34 pass35 @gossip.register('some.hook', provides=['some_requirement', 'other_requirement'])36 def _unused():37 pass38 gossip.trigger('some.hook')39 hook = gossip.get_hook('some.hook')40 [registration] = [reg for reg in hook.get_registrations() if reg.func.__name__ == 'plugin_method']41 assert registration.tags == {'tag'}42 assert registration.needs == frozenset(['some_requirement', 'other_requirement'])43 assert registration.provides == frozenset(['provided_requirement', 'another_provided_requirement'])44def test_registers_on_with_private_methods(restore_plugins_on_cleanup, checkpoint):45 @slash.plugins.active # pylint: disable=unused-variable46 class SamplePlugin(PluginInterface):47 def get_name(self):48 return 'sample'49 @plugins.registers_on('some_hook')50 def _handler(self):51 checkpoint()52 assert not checkpoint.called53 gossip.trigger('some_hook')54 assert checkpoint.called55def test_class_variables_allowed(restore_plugins_on_cleanup):56 @slash.plugins.active # pylint: disable=unused-variable57 class SamplePlugin(PluginInterface):58 ATTRIBUTE = 'some_value'59 def get_name(self):60 return 'sample'61def test_active_decorator(restore_plugins_on_cleanup):62 plugins.manager.uninstall_all()63 @slash.plugins.active64 class SamplePlugin(PluginInterface):65 def get_name(self):66 return 'sample'67 assert isinstance(SamplePlugin, type)68 assert issubclass(SamplePlugin, PluginInterface)69 [active] = plugins.manager.get_active_plugins().values()70 assert isinstance(active, SamplePlugin)71@pytest.mark.parametrize('is_internal', [True, False])72def test_custom_hook_registration(request, is_internal):73 hook_name = 'some_hook'74 with pytest.raises(LookupError):75 gossip.get_hook(hook_name)76 class MyPlugin(PluginInterface):77 def get_name(self):78 return "plugin"79 @plugins.registers_on(hook_name)80 def unknown(self):81 pass82 p = MyPlugin()83 plugins.manager.install(p, activate=True, is_internal=is_internal)84 @request.addfinalizer85 def cleanup(): # pylint: disable=unused-variable86 plugins.manager.uninstall(p)87 registrations = gossip.get_hook(hook_name).get_registrations()88 assert len(registrations) == 189 [r] = registrations90 assert r.func.__func__ is MyPlugin.unknown91 # make sure we deactivate properly as well92 plugins.manager.deactivate(p)93 assert not gossip.get_hook(hook_name).get_registrations()94def test_multiple_registers_on(request):95 hook_names = ['some_hook_{}'.format(i) for i in range(2)]96 class MyPlugin(PluginInterface):97 def get_name(self):98 return "plugin"99 @plugins.registers_on(hook_names[0])100 @plugins.registers_on(hook_names[1])101 def unknown(self):102 pass103 expected_func = MyPlugin.unknown104 p = MyPlugin()105 plugins.manager.install(p, activate=True)106 @request.addfinalizer107 def cleanup(): # pylint: disable=unused-variable108 plugins.manager.uninstall(p)109 for hook_name in hook_names:110 registrations = gossip.get_hook(hook_name).get_registrations()111 assert len(registrations) == 1112 assert registrations[0].func.__func__ is expected_func113 plugins.manager.deactivate(p)114 for hook_name in hook_names:115 assert not gossip.get_hook(hook_name).get_registrations()116def test_register_invalid_hook():117 initially_installed = list(plugins.manager.get_installed_plugins())118 class MyPlugin(PluginInterface):119 def get_name(self):120 return "plugin"121 def unknown(self):122 pass123 with pytest.raises(IncompatiblePlugin):124 plugins.manager.install(MyPlugin(), activate=True)125 assert set(plugins.manager.get_installed_plugins()) == set(initially_installed)126def test_register_custom_hooks_strict_group():127 initially_installed = list(plugins.manager.get_installed_plugins())128 hook_name = "some_group.some_hook"129 gossip.get_or_create_group("some_group").set_strict()130 class MyPlugin(PluginInterface):131 def get_name(self):132 return "plugin"133 @plugins.registers_on(hook_name)134 def unknown(self):135 pass136 with pytest.raises(IncompatiblePlugin):137 plugins.manager.install(MyPlugin(), activate=True)138 assert list(plugins.manager.get_installed_plugins()) == initially_installed139def test_builtin_plugins_hooks_start_condition():140 "make sure that all hooks are either empty, or contain callbacks marked with `slash.<identifier>`"141 for hook in gossip.get_group('slash').get_hooks():142 for registration in hook.get_registrations():143 assert registration.token.startswith('slash.'), 'Callback {} is not a builtin!'.format(hook.full_name)144def test_builtin_plugins_are_installed():145 installed = plugins.manager.get_installed_plugins()146 assert installed147 for filename in os.listdir(os.path.join(os.path.dirname(plugins.__file__), "builtin")):...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

1from ..utils.marks import mark2from ..utils import parallel_utils3from .interface import PluginInterface # pylint: disable=unused-import4def registers_on(hook_name, **kwargs):5 """Marks the decorated plugin method to register on a custom hook, rather than6 the method name in the 'slash' group, which is the default behavior for plugins7 Specifying ``registers_on(None)`` means that this is not a hook entry point at all.8 .. note:: All keyword arguments are forwarded to gossip's ``register`` API9 """10 return mark("register_on", RegistrationInfo(hook_name, expect_exists=False, register_kwargs=kwargs), append=True)11def parallel_mode(mode):12 """Marks compatibility of a specific plugin to parallel execution.13 :param mode: Can be either ``disabled``, ``enabled``, ``parent-only`` or ``child-only``14 """15 possible_values = parallel_utils.ParallelPluginModes.MODES16 assert mode in possible_values, "parallel mode value must be one of {}".format(possible_values)17 return mark("parallel_mode", mode)18def register_if(condition):19 """Marks the decorated plugins method to only be registered if *condition* is ``True``20 """21 return mark("register_if", condition)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful