Best Python code snippet using grail_python
environment.py
Source:environment.py  
# pylint: disable=missing-docstring,unused-argument
import copy
import logging
import os
import pickle
import uuid
import re
import redis
from features.steps import oauth, utils
from testsuite import fhir
from testsuite.config_reader import get_vendor_config, get_env_config
from testsuite.oauth import authorize, factory

CACHE_TTL = 60 * 60  # 1 hour

# Feature tags that name a FHIR resource type. A feature may carry at most
# one of these (enforced in before_feature).
FHIR_RESOURCE_TAGS = {
    'patient-demographics',
    'smoking-status',
    'problems',
    'medication-orders',
    'medication-requests',
    'medication-statements',
    'medication-dispensations',
    'medication-administrations',
    'allergies-and-intolerances',
    'lab-results',
    'vital-signs',
    'procedures',
    'immunizations',
    'patient-documents',
    'coverage',
    'explanation-of-benefit'
}


def before_all(context):
    """ Runs once before all tests.

    Set up some global state necessary to the tests and test runner.

    * Get the vendor config and attach it to the context.
    * Authorize against the vendor FHIR server and store the authorization.
    * Get the test plan so that we can show a progress meter.
    * Load the conformance statement so we know which resources are supported.

    Raises:
        Exception: if authorization fails (the original AssertionError,
            AuthorizationException or ValueError is converted into a plain
            Exception carrying a formatted message).
    """
    # Get the vendor config and attach it to the context.
    vendor = getattr(context.config, 'vendor', os.getenv('VENDOR'))
    override = getattr(context.config, 'override', os.getenv('CONFIG_OVERRIDE', ''))
    vendor_config = get_vendor_config(vendor, override)

    # Get configuration from the test server's environment.
    env_config = get_env_config()

    # Attempt to retrieve the security URL for this version.
    vendor_config['versioned_auth']['aud'] = vendor_config['versioned_api']['url']

    context.vendor_config = copy.deepcopy(vendor_config)
    context.env_config = copy.deepcopy(env_config)

    # Filter out any tagged vendor config steps.
    # NOTE: this happens after the deepcopy above, so only the local
    # vendor_config (used for authorization below) is filtered;
    # context.vendor_config keeps the full step list.
    steps = vendor_config['versioned_auth'].get('steps', [])
    steps = [step for step in steps if 'when' not in step]
    vendor_config['versioned_auth']['steps'] = steps

    # Set the ElasticSearch logging endpoint
    context.config.es_url = os.getenv('ES_URL')

    # Authorize against the vendor FHIR server.
    try:
        context.oauth = factory(vendor_config)
        context.oauth.authorize()
        if getattr(context.oauth, 'patient', None) is not None:
            context.vendor_config['versioned_api']['patient'] = context.oauth.patient
    except AssertionError as error:
        message = utils.bad_response_assert(error.args[0], '')
        logging.error(message)
        raise Exception(message)
    except authorize.AuthorizationException as err:
        message = oauth.ERROR_SELENIUM_SCREENSHOT.format(
            err.args[0],
            err.args[1],
            err.args[2],
            context.vendor_config['host'],
        )
        raise Exception(message)
    except ValueError as error:
        # NOTE(review): relies on the raised ValueError carrying a
        # `.response` attribute - confirm against the oauth implementation.
        message = utils.bad_response_assert(error.response, '')
        logging.error(message)
        raise Exception(message)

    # Get the test plan so that we can show a progress meter.
    context.config.plan = []
    # There is no other way to get a feature list from the context.
    # Since this is for display purposes only, this should be safe.
    features = context._runner.features  # pylint: disable=protected-access
    for feature in features:
        context.config.plan.append({
            'name': feature.name,
            'location': str(feature.location),
            'scenarios': [
                {'name': scenario.name, 'location': str(scenario.location)}
                for scenario in feature.scenarios
            ]})

    # Download the conformance statement
    try:
        context.conformance = fhir.get_conformance_statement(vendor_config['versioned_auth']['aud'])
    except ValueError as error:
        context.conformance = None
        logging.error(utils.bad_response_assert(error.response, ''))

    # Define a global cache
    context.cache = Cache(redis.StrictRedis())


def before_feature(context, feature):
    """ Configure Feature scope.

    Some features need feature-level resources so that we don't need to
    make a bunch of API requests and slow things down.

    Raises:
        Exception: if the feature carries more than one FHIR resource tag.
    """
    # We handle the with_use_case tag with this custom functionality.
    # Extract which use case this feature is part of and determine
    # if we need to run it. Store use_case in the feature object for use
    # by child scenarios.
    feature.use_case = None
    for tag in feature.tags:
        use_case_matches = re.match("use.with_use_case=(.*)", tag)
        version_matches = re.match("use.with_version=(.*)", tag)

        # Only the first use case known to the vendor config is kept.
        if use_case_matches and feature.use_case is None:
            use_case = use_case_matches.groups()[0]
            if use_case in context.vendor_config["use_cases"]:
                feature.use_case = use_case

        # A version tag is only checked once a use case has been found,
        # i.e. it must follow the use-case tag in feature.tags.
        if version_matches and feature.use_case:
            version = version_matches.groups()[0]
            if version != context.vendor_config["use_cases"][feature.use_case]:
                feature.skip("Feature version (%s) not supported in this use case (%s)."
                             % (version, feature.use_case))

    if feature.use_case is None:
        feature.skip("Feature (%s) not in any use case." % feature.name)
        return

    # Keep the try body down to the lookup so an unrelated KeyError raised
    # elsewhere is not silently swallowed.
    try:
        ignored_steps = context.vendor_config["ignored_steps"][feature.location.filename]
    except KeyError:
        ignored_steps = ()
    if any(step == "all" for step in ignored_steps):
        feature.skip("Feature (%s) requested skip by vendor." % feature.name)
        # The whole feature is skipped, so do not spend API requests on
        # feature-level set-up below.
        return

    tags = list(FHIR_RESOURCE_TAGS.intersection(feature.tags))
    if len(tags) > 1:
        raise Exception('Too many CCDS tags', tags)
    if len(tags) == 1:
        ccds_type = tags[0].capitalize().replace('-', ' ')
        steps = [
            'Given I am logged in',
            'And this server supports {0}'.format(ccds_type),
            'When I request {0}'.format(ccds_type),
        ]
        try:
            context.execute_steps('\n'.join(steps))
        except AssertionError as error:
            feature.skip(error.args[0])


def before_scenario(context, scenario):
    """ Skip scenarios whose version tag differs from the use case version.
    """
    # The skipping logic here only applies when a use case has been defined on this feature.
    if not scenario.feature.use_case:
        return
    use_case_version = context.vendor_config["use_cases"][scenario.feature.use_case]
    # Skip the scenario if its version doesn't match the use_case version.
    for tag in scenario.effective_tags:
        matches = re.match("use.with_version=(.*)", tag)
        if matches and matches.groups()[0] != use_case_version:
            scenario.skip("Scenario's version (%s) not in Use Case (%s)."
                          % (matches.groups()[0], scenario.feature.use_case))


def before_step(context, step):
    """ Flag on the context whether the vendor config ignores this step.

    Sets context.vendor_skip to True when the step's name is listed under
    the step's feature file in vendor_config["ignored_steps"].
    """
    context.vendor_skip = False
    try:
        ignored_steps = context.vendor_config["ignored_steps"][step.location.filename]
    except KeyError:
        return
    context.vendor_skip = any(step.name == ignored_step for ignored_step in ignored_steps)


class Cache(object):
    """ A minimal caching layer.

    Values are pickled and stored in redis under a per-instance key prefix,
    expiring after CACHE_TTL seconds.
    """

    def __init__(self, redis_client):
        self.redis_client = redis_client
        # A unique prefix ensures that each test run does not share a cache.
        self.prefix = 'cache-{0}-'.format(uuid.uuid4())

    def __getitem__(self, key):
        """ Return the cached value, raising KeyError if absent or expired.
        """
        raw = self.redis_client.get(self.prefix + key)
        if raw is None:
            # redis reports a missing key as None; surface it as the
            # KeyError a mapping lookup should raise rather than letting
            # pickle.loads(None) fail with a TypeError.
            raise KeyError(key)
        # SECURITY: pickle.loads executes arbitrary code from its input.
        # This is only safe while the redis instance is trusted and written
        # exclusively through __setitem__.
        return pickle.loads(raw)

    def __setitem__(self, key, value):
        self.redis_client.setex(self.prefix + key,
                                CACHE_TTL,
                                pickle.dumps(value))

    def __contains__(self, key):
        return bool(self.redis_client.exists(self.prefix + key))

    def clear(self):
        """ Delete every key stored under this cache's prefix.
        """
        pattern = self.prefix + '*'
        keys = self.redis_client.keys(pattern)
        if keys:
            deleted = self.redis_client.delete(*keys)
            # ... the remainder of this method is elided in the source snippet.

# --- next snippet: test_steps.py ---
Source:test_steps.py  
...35    @step36    def fail_step(self):37        ok_(False, 'Explicit error')38    @step39    def ignored_step(self):40        pass41    @step(step_group=True)42    def pass_fail_ignore_group(self):43        self.step1()44        self.fail_step()45        self.ignored_step()46    @step(step_group=True)47    def pass_pending_ignore_group(self):48        self.step1()49        self.pending_step()50        self.ignored_step()51    @step52    def incorrect_step_call(self):53        self.step1()54    @step(treat_nested_steps_as_methods=True)55    def explicit_step_call_skip(self):56        self.step1()57    @step(step_group=True, treat_nested_steps_as_methods=True)58    def incorrect_step_group(self):59        self.step1()60        self.step2()61    def setUp(self):62        grail.state.is_test_wrapped = True63    def tearDown(self):64        grail.state.reset()...modifications.py
Source:modifications.py  
"""
Modification mixins extending the traditional chaos game.
"""
import abc
import random
import functools


def _require_fewer_ignored(ignored_count, vertex_count, message):
    """
    Raise ``AssertionError`` unless fewer vertexes are ignored than exist.
    """
    # Raised explicitly instead of through an ``assert`` statement so that
    # the check still runs under ``python -O``; otherwise an over-full ignore
    # list would surface as an ``IndexError`` from ``random.choice([])``.
    if not ignored_count < vertex_count:
        raise AssertionError(message)


def _choose_remaining_vertex_index(vertex_count, ignored_vertex_indexes):
    """
    Choose at random among the vertex indexes which are not ignored.
    """
    remaining_vertex_indexes = [
        vertex_index
        for vertex_index in range(vertex_count)
        if vertex_index not in ignored_vertex_indexes
    ]
    return random.choice(remaining_vertex_indexes)


class Modification(abc.ABC):
    """
    The base class for modification mixins.

    Subclasses should override the initialization method
    to implement customization of the modification.
    """

    # The game which this modification modifies.
    # The game instance sets this attribute for each modification.
    game = None


class VertexesModification(Modification):
    """
    The base class for mixins which modify the vertexes.
    """

    @abc.abstractmethod
    def get_vertexes(self):
        pass


class NextVertexModification(Modification):
    """
    The base class for mixins which modify the next vertex.
    """

    @abc.abstractmethod
    def get_next_vertex_index(self):
        pass


class IgnorePreviousVertexesModification(NextVertexModification):
    """
    A modification which ignores vertexes selected some number of steps ago.
    """

    # The steps ago of the ignored vertexes, represented as negative integers.
    # For example, if the list has negative one, the last vertex is ignored.
    # This class-level list is only a placeholder: the initialization method
    # rebinds the attribute on every instance.
    ignored_steps: list[int] = []

    def __init__(self, ignored_steps: list[int]):
        """
        Customize the modification.
        """
        self.ignored_steps = ignored_steps

    def get_next_vertex_index(self) -> int:
        """
        Choose from the vertexes at random, but ignore some previous vertexes.

        Steps which reach further back than the selection history
        (or which are not negative) are skipped.
        Raises ``AssertionError`` if the number of ignored steps
        is not less than the number of vertexes.
        """
        vertex_count = len(self.game.get_vertexes())
        _require_fewer_ignored(
            len(self.ignored_steps),
            vertex_count,
            "The number of potential ignored vertexes"
            " must be less than the total number of vertexes.",
        )
        selected_vertex_indexes = self.game.selected_vertex_indexes
        ignored_vertex_indexes = {
            selected_vertex_indexes[ignored_step]
            for ignored_step in self.ignored_steps
            if -len(selected_vertex_indexes) <= ignored_step < 0
        }
        return _choose_remaining_vertex_index(vertex_count, ignored_vertex_indexes)


class IgnoreShiftedVertexesModification(NextVertexModification):
    """
    A modification which ignores vertexes shifted from the current vertex.
    """

    # The shifts of the ignored vertexes, represented
    # as positive (counterclockwise) or negative (clockwise) integers.
    # This class-level list is only a placeholder: the initialization method
    # rebinds the attribute on every instance.
    ignored_shifts: list[int] = []

    def __init__(self, ignored_shifts: list[int]):
        """
        Customize the modification.
        """
        self.ignored_shifts = ignored_shifts

    def get_next_vertex_index(self) -> int:
        """
        Choose from the vertexes at random, but ignore some shifted vertexes.

        When no vertex has been selected yet there is no current vertex
        to shift from, so every vertex is eligible.
        Raises ``AssertionError`` if the number of ignored shifts
        is not less than the number of vertexes.
        """
        vertex_count = len(self.game.get_vertexes())
        _require_fewer_ignored(
            len(self.ignored_shifts),
            vertex_count,
            "The number of ignored vertexes"
            " must be less than the total number of vertexes.",
        )
        selected_vertex_indexes = self.game.selected_vertex_indexes
        if len(selected_vertex_indexes) == 0:
            return _choose_remaining_vertex_index(vertex_count, ())
        current_vertex_index = selected_vertex_indexes[-1]
        # The modulo wraps shifts around the polygon in either direction.
        ignored_vertex_indexes = {
            (current_vertex_index + ignored_shift) % vertex_count
            for ignored_shift in self.ignored_shifts
        }
        return _choose_remaining_vertex_index(vertex_count, ignored_vertex_indexes)

# ... (the source snippet is truncated here)
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
