Best Python code snippet using stestr_python
test_graph.py
Source:test_graph.py  
...20class GraphResultTests(unittest.TestCase):21    _values = (None, 1, 1.2, True, False, [1, 2, 3], {'x': 1, 'y': 2})22    def test_result_value(self):23        for v in self._values:24            result = self._make_result(v)25            self.assertEqual(result.value, v)26    def test_result_attr(self):27        # value is not a dict28        result = self._make_result(123)29        with self.assertRaises(ValueError):30            result.something31        expected = {'a': 1, 'b': 2}32        result = self._make_result(expected)33        self.assertEqual(result.a, 1)34        self.assertEqual(result.b, 2)35        with self.assertRaises(AttributeError):36            result.not_present37    def test_result_item(self):38        # value is not a dict, list39        result = self._make_result(123)40        with self.assertRaises(ValueError):41            result['something']42        with self.assertRaises(ValueError):43            result[0]44        # dict key access45        expected = {'a': 1, 'b': 2}46        result = self._make_result(expected)47        self.assertEqual(result['a'], 1)48        self.assertEqual(result['b'], 2)49        with self.assertRaises(KeyError):50            result['not_present']51        with self.assertRaises(ValueError):52            result[0]53        # list index access54        expected = [0, 1]55        result = self._make_result(expected)56        self.assertEqual(result[0], 0)57        self.assertEqual(result[1], 1)58        with self.assertRaises(IndexError):59            result[2]60        with self.assertRaises(ValueError):61            result['something']62    def test_as_vertex(self):63        prop_name = 'name'64        prop_val = 'val'65        vertex_dict = {'id': object(),66                       'label': object(),67                       'type': 'vertex',68                       'properties': {prop_name: [{'value': prop_val, 'whatever': object()}]}69                      }70        required_attrs = [k for k in vertex_dict if k != 'properties']71        result = self._make_result(vertex_dict)72        vertex = result.as_vertex()73        for attr in required_attrs:74            self.assertEqual(getattr(vertex, attr), vertex_dict[attr])75        self.assertEqual(len(vertex.properties), 1)76        self.assertEqual(vertex.properties[prop_name][0].value, prop_val)77        # no props78        modified_vertex_dict = vertex_dict.copy()79        del modified_vertex_dict['properties']80        vertex = self._make_result(modified_vertex_dict).as_vertex()81        self.assertEqual(vertex.properties, {})82        # wrong 'type'83        modified_vertex_dict = vertex_dict.copy()84        modified_vertex_dict['type'] = 'notavertex'85        result = self._make_result(modified_vertex_dict)86        self.assertRaises(TypeError, result.as_vertex)87        # missing required properties88        for attr in required_attrs:89            modified_vertex_dict = vertex_dict.copy()90            del modified_vertex_dict[attr]91            result = self._make_result(modified_vertex_dict)92            self.assertRaises(TypeError, result.as_vertex)93    def test_as_edge(self):94        prop_name = 'name'95        prop_val = 'val'96        edge_dict = {'id': object(),97                     'label': object(),98                     'type': 'edge',99                     'inV': object(),100                     'inVLabel': object(),101                     'outV': object(),102                     'outVLabel': object(),103                     'properties': {prop_name: prop_val}104                    }105        required_attrs = [k for k in edge_dict if k != 'properties']106        result = self._make_result(edge_dict)107        edge = result.as_edge()108        for attr in required_attrs:109            self.assertEqual(getattr(edge, attr), edge_dict[attr])110        self.assertEqual(len(edge.properties), 1)111        self.assertEqual(edge.properties[prop_name], prop_val)112        # no props113        modified_edge_dict = edge_dict.copy()114        del modified_edge_dict['properties']115        edge = self._make_result(modified_edge_dict).as_edge()116        self.assertEqual(edge.properties, {})117        # wrong 'type'118        modified_edge_dict = edge_dict.copy()119        modified_edge_dict['type'] = 'notanedge'120        result = self._make_result(modified_edge_dict)121        self.assertRaises(TypeError, result.as_edge)122        # missing required properties123        for attr in required_attrs:124            modified_edge_dict = edge_dict.copy()125            del modified_edge_dict[attr]126            result = self._make_result(modified_edge_dict)127            self.assertRaises(TypeError, result.as_edge)128    def test_as_path(self):129        vertex_dict = {'id': object(),130                       'label': object(),131                       'type': 'vertex',132                       'properties': {'name': [{'value': 'val', 'whatever': object()}]}133                       }134        edge_dict = {'id': object(),135                     'label': object(),136                     'type': 'edge',137                     'inV': object(),138                     'inVLabel': object(),139                     'outV': object(),140                     'outVLabel': object(),141                     'properties': {'name': 'val'}142                     }143        path_dict = {'labels': [['a', 'b'], ['c']],144                     'objects': [vertex_dict, edge_dict]145                    }146        result = self._make_result(path_dict)147        path = result.as_path()148        self.assertEqual(path.labels, path_dict['labels'])149        # make sure inner objects are bound correctly150        self.assertIsInstance(path.objects[0], Vertex)151        self.assertIsInstance(path.objects[1], Edge)152        # missing required properties153        for attr in path_dict:154            modified_path_dict = path_dict.copy()155            del modified_path_dict[attr]156            result = self._make_result(modified_path_dict)157            self.assertRaises(TypeError, result.as_path)158    def test_str(self):159        for v in self._values:160            self.assertEqual(str(self._make_result(v)), str(v))161    def test_repr(self):162        for v in self._values:163            result = self._make_result(v)164            self.assertEqual(eval(repr(result)), result)165    def _make_result(self, value):166        # direct pass-through now167        return Result(value)168class GraphTypeTests(unittest.TestCase):169    # see also: GraphResultTests.test_as_*170    def test_vertex_str_repr(self ):171        prop_name = 'name'172        prop_val = 'val'173        kwargs = {'id': 'id_val', 'label': 'label_val', 'type': 'vertex', 'properties': {prop_name: [{'value': prop_val}]}}174        vertex = Vertex(**kwargs)175        transformed = kwargs.copy()176        transformed['properties'] = {prop_name: [VertexProperty(prop_val)]}177        self.assertEqual(eval(str(vertex)), transformed)178        self.assertEqual(eval(repr(vertex)), vertex)179    def test_edge_str_repr(self ):...os.bzl
Source:os.bzl  
...31        )32        return struct(stdout = result.stdout, error = error)33    # Success.34    return struct(stdout = result.stdout, error = None)35def _make_result(36        error = None,37        ubuntu_release = None,38        macos_release = None):39    """Return a fully-populated struct result for determine_os, below."""40    if ubuntu_release != None:41        distribution = "ubuntu"42    elif macos_release != None:43        distribution = "macos"44    else:45        distribution = None46    return struct(47        error = error,48        distribution = distribution,49        is_macos = (macos_release != None),50        is_ubuntu = (ubuntu_release != None),51        ubuntu_release = ubuntu_release,52        macos_release = macos_release,53    )54def _determine_linux(repository_ctx):55    """Handle determine_os on Linux."""56    # Shared error message text across different failure cases.57    error_prologue = "could not determine Linux distribution: "58    # Run sed to determine Linux NAME and VERSION_ID.59    sed = exec_using_which(repository_ctx, [60        "sed",61        "-n",62        "/^\(NAME\|VERSION_ID\)=/{s/[^=]*=//;s/\"//g;p}",63        "/etc/os-release",64    ])65    if sed.error != None:66        return _make_result(error = error_prologue + sed.error)67    # Compute an identifying string, in the form of "$NAME $VERSION_ID".68    lines = [line.strip() for line in sed.stdout.strip().split("\n")]69    distro = " ".join([x for x in lines if len(x) > 0])70    # Match supported Ubuntu release(s). These should match those listed in71    # both doc/developers.rst the root CMakeLists.txt.72    for ubuntu_release in ["16.04", "18.04"]:73        if distro == "Ubuntu " + ubuntu_release:74            return _make_result(ubuntu_release = ubuntu_release)75    # Nothing matched.76    return _make_result(77        error = error_prologue + "unsupported distribution '%s'" % distro,78    )79def _determine_macos(repository_ctx):80    """Handle determine_os on macOS."""81    # Shared error message text across different failure cases.82    error_prologue = "could not determine macOS version: "83    # Run sw_vers to determine macOS version.84    sw_vers = exec_using_which(repository_ctx, [85        "sw_vers",86        "-productVersion",87    ])88    if sw_vers.error != None:89        return _make_result(error = error_prologue + sw_vers.error)90    major_minor_versions = sw_vers.stdout.strip().split(".")[:2]91    macos_release = ".".join(major_minor_versions)92    # Match supported macOS release(s).93    if macos_release in ["10.13", "10.14"]:94        return _make_result(macos_release = macos_release)95    # Nothing matched.96    return _make_result(97        error = error_prologue + "unsupported macOS '%s'" % macos_release,98    )99def determine_os(repository_ctx):100    """101    A repository_rule helper function that determines which of the supported OS102    versions we are targeting.103    Argument:104        repository_ctx: The context passed to the repository_rule calling this.105    Result:106        a struct, with attributes:107        - error: str iff any error occurred, else None108        - distribution: str either "ubuntu" or "macos" if no error109        - is_macos: True iff on a supported macOS release, else False110        - macos_release: str like "10.14" iff on a supported macOS, else None111        - is_ubuntu: True iff on a supported Ubuntu version, else False112        - ubuntu_release: str like "16.04" iff on a supported ubuntu, else None113    """114    os_name = repository_ctx.os.name115    if os_name == "mac os x":116        return _determine_macos(repository_ctx)117    elif os_name == "linux":118        return _determine_linux(repository_ctx)119    else:120        return _make_result(error = "unknown or unsupported OS '%s'" % os_name)121def os_specific_alias(repository_ctx, mapping):122    """123    A repository_rule helper function that creates a BUILD file with alias()124    declarations based on which supported OS version we are targeting.125    Argument:126        repository_ctx: The context passed to the repository_rule calling this.127        mapping: dict(str, list(str)) where the keys match the OS, and the list128            of values are of the form name=actual as in alias(name, actual).129    The keys of mapping are searched in the following preferential order:130    - Exact release, via e.g., "Ubuntu 16.04" or "macOS 10.14"131    - Any release, via "Ubuntu default" or "macOS default"132    - Anything else, via "default"133    """134    os_result = determine_os(repository_ctx)...test_hyperband_cost_promotion.py
Source:test_hyperband_cost_promotion.py  
...14from syne_tune.optimizer.schedulers.hyperband import HyperbandScheduler15from syne_tune.search_space import randint, uniform16from syne_tune.backend.trial_status import Trial17from syne_tune.optimizer.scheduler import SchedulerDecision18def _make_result(epoch, metric, cost):19    return dict(epoch=epoch, metric=metric, cost=cost)20def _new_trial(trial_id: int, config: dict):21    return Trial(22        trial_id=trial_id,23        config=config,24        creation_time=datetime.now())25def test_cost_offset():26    config_space = {27        'int': randint(1, 2),28        'float': uniform(5.5, 6.5),29        'epochs': 27}30    scheduler = HyperbandScheduler(31        config_space,32        searcher='random',33        metric='metric',34        mode='max',35        resource_attr='epoch',36        type='cost_promotion',37        max_resource_attr='epochs',38        rung_system_kwargs={'cost_attr': 'cost'})39    # Start 4 trials40    trials = dict()41    for trial_id in range(4):42        trials[trial_id] = _new_trial(43            trial_id, scheduler.suggest(trial_id=trial_id).config)44    # Make sure that 0, 1 are promoted eventually45    decision = scheduler.on_trial_result(46        trials[0], _make_result(1, 0.9, 1.0))47    assert decision == SchedulerDecision.PAUSE48    assert scheduler._cost_offset[str(0)] == 1.049    sugg = scheduler.suggest(trial_id=len(trials))50    assert sugg.spawn_new_trial_id51    trials[4] = _new_trial(4, sugg.config)52    decision = scheduler.on_trial_result(53        trials[1], _make_result(1, 0.8, 3.0))54    assert decision == SchedulerDecision.PAUSE55    assert scheduler._cost_offset[str(1)] == 3.056    # 1 < 4/3 -> Promote 057    sugg = scheduler.suggest(trial_id=len(trials))58    assert not sugg.spawn_new_trial_id59    assert sugg.checkpoint_trial_id == 060    assert sugg.config is not None61    trials[0].config = sugg.config62    decision = scheduler.on_trial_result(63        trials[2], _make_result(1, 0.7, 10.0))64    assert decision == SchedulerDecision.PAUSE65    assert scheduler._cost_offset[str(2)] == 10.066    decision = scheduler.on_trial_result(67        trials[0], _make_result(3, 0.95, 3.0))68    assert decision == SchedulerDecision.PAUSE69    assert scheduler._cost_offset[str(0)] == 4.070    # 4 < 14/3 -> Promote 171    sugg = scheduler.suggest(trial_id=len(trials))72    assert not sugg.spawn_new_trial_id73    assert sugg.checkpoint_trial_id == 174    assert sugg.config is not None75    trials[1].config = sugg.config76    decision = scheduler.on_trial_result(77        trials[1], _make_result(3, 0.85, 4.0))78    assert decision == SchedulerDecision.PAUSE79    assert scheduler._cost_offset[str(1)] == 7.080    new_trial_id = len(trials)81    # Nothing can be promoted here (4 > 11/3)82    sugg = scheduler.suggest(trial_id=new_trial_id)83    assert sugg.spawn_new_trial_id84    trials[new_trial_id] = _new_trial(new_trial_id, sugg.config)85# Same scenario as above, but resumed trials start from86# scratch, which should lead to cost offsets being reset87def test_reset_cost_offset():88    config_space = {89        'int': randint(1, 2),90        'float': uniform(5.5, 6.5),91        'epochs': 27}92    scheduler = HyperbandScheduler(93        config_space,94        searcher='random',95        metric='metric',96        mode='max',97        resource_attr='epoch',98        type='cost_promotion',99        max_resource_attr='epochs',100        rung_system_kwargs={'cost_attr': 'cost'})101    # Start 4 trials102    trials = dict()103    for trial_id in range(4):104        trials[trial_id] = _new_trial(105            trial_id, scheduler.suggest(trial_id=trial_id).config)106    # Make sure that 0, 1 are promoted eventually107    decision = scheduler.on_trial_result(108        trials[0], _make_result(1, 0.9, 1.0))109    assert decision == SchedulerDecision.PAUSE110    assert scheduler._cost_offset[str(0)] == 1.0111    sugg = scheduler.suggest(trial_id=len(trials))112    assert sugg.spawn_new_trial_id113    trials[4] = _new_trial(4, sugg.config)114    decision = scheduler.on_trial_result(115        trials[1], _make_result(1, 0.8, 3.0))116    assert decision == SchedulerDecision.PAUSE117    assert scheduler._cost_offset[str(1)] == 3.0118    # 1 < 4/3 -> Promote 0119    sugg = scheduler.suggest(trial_id=len(trials))120    assert not sugg.spawn_new_trial_id121    assert sugg.checkpoint_trial_id == 0122    assert sugg.config is not None123    trials[0].config = sugg.config124    decision = scheduler.on_trial_result(125        trials[2], _make_result(1, 0.7, 10.0))126    assert decision == SchedulerDecision.PAUSE127    assert scheduler._cost_offset[str(2)] == 10.0128    # trial_id 0 reports for epoch=1, which signals restart129    # This should trigger reset of cost offset130    decision = scheduler.on_trial_result(131        trials[0], _make_result(1, 0.9, 1.5))132    assert decision == SchedulerDecision.CONTINUE133    assert scheduler._cost_offset[str(0)] == 0.0134    decision = scheduler.on_trial_result(135        trials[0], _make_result(2, 0.91, 2.5))136    assert decision == SchedulerDecision.CONTINUE137    assert scheduler._cost_offset[str(0)] == 0.0138    decision = scheduler.on_trial_result(139        trials[0], _make_result(3, 0.95, 3.0))140    assert decision == SchedulerDecision.PAUSE141    assert scheduler._cost_offset[str(0)] == 3.0142    # 4 < 14/3 -> Promote 1143    sugg = scheduler.suggest(trial_id=len(trials))144    assert not sugg.spawn_new_trial_id145    assert sugg.checkpoint_trial_id == 1146    assert sugg.config is not None147    trials[1].config = sugg.config148    # trial_id 1 reports for epoch=1, which signals restart149    # This should trigger reset of cost offset150    decision = scheduler.on_trial_result(151        trials[1], _make_result(1, 0.8, 2.5))152    assert decision == SchedulerDecision.CONTINUE153    assert scheduler._cost_offset[str(1)] == 0.0154    decision = scheduler.on_trial_result(155        trials[1], _make_result(2, 0.81, 3.5))156    assert decision == SchedulerDecision.CONTINUE157    assert scheduler._cost_offset[str(1)] == 0.0158    decision = scheduler.on_trial_result(159        trials[1], _make_result(3, 0.85, 4.0))160    assert decision == SchedulerDecision.PAUSE...dateutils.py
Source:dateutils.py  
...28        )29    raise TypeError(30        'Incorrect type of object {} Expected "datetime.date" or "datetime.datetime"'.format(instance),31    )32def _make_result(data, *handlers):33    for handler in handlers:34        data = handler(data)35    return data36def to_string(d, fmt=None):37    default_fmt = (38        DEFAULT_DATETIME_FORMAT39        if isinstance(d, _dt.datetime)40        else41        DEFAULT_DATE_FORMAT42    )43    return d.strftime(fmt or default_fmt)44def date_args_to_string(fmt):45    def wrapper(f):46        @wraps(f)47        def wrapped(*args, **kwargs):48            new_args = []49            for a in args:50                if isinstance(a, _dt.date):51                    new_args.append(to_string(a, fmt))52                else:53                    new_args.append(a)54            for k, v in kwargs.items():55                if isinstance(v, _dt.date):56                    kwargs[k] = to_string(v, fmt)57            return f(*new_args, **kwargs)58        return wrapped59    return wrapper60def minus_delta(datetime, **kwargs):61    return _make_copy(datetime - _dt.timedelta(**kwargs))62def plus_delta(datetime, **kwargs):63    return _make_copy(datetime + _dt.timedelta(**kwargs))64def minus_seconds(datetime, seconds, *handlers):65    return _make_result(minus_delta(datetime, seconds=seconds), *handlers)66def plus_seconds(datetime, seconds, *handlers):67    return _make_result(plus_delta(datetime, seconds=seconds), *handlers)68def minus_minutes(datetime, minutes, *handlers):69    return _make_result(minus_delta(datetime, minutes=minutes), *handlers)70def plus_minutes(datetime, minutes, *handlers):71    return _make_result(plus_delta(datetime, minutes=minutes), *handlers)72def minus_hours(datetime, hours, *handlers):73    return _make_result(minus_delta(datetime, hours=hours), *handlers)74def plus_hours(datetime, hours, *handlers):75    return _make_result(plus_delta(datetime, hours=hours), *handlers)76def minus_days(datetime, days, *handlers):77    return _make_result(minus_delta(datetime, days=days), *handlers)78def plus_days(datetime, days, *handlers):79    return _make_result(plus_delta(datetime, days=days), *handlers)80def minus_weeks(datetime, weeks, *handlers):81    return _make_result(minus_delta(datetime, weeks=weeks), *handlers)82def plus_weeks(datetime, weeks, *handlers):83    return _make_result(plus_delta(datetime, weeks=weeks), *handlers)84def minus_months(datetime, months, *handlers):85    return _make_result(_make_copy(datetime - relativedelta(months=months)), *handlers)86def plus_months(datetime, months, *handlers):87    return _make_result(_make_copy(datetime - relativedelta(months=-months)), *handlers)88def minus_years(datetime, years, *handlers):89    return _make_result(_make_copy(datetime - relativedelta(years=years)), *handlers)90def plus_years(datetime, years, *handlers):91    return _make_result(_make_copy(datetime - relativedelta(years=-years)), *handlers)92def to_start_week(datetime, *handlers):93    days = _dt.datetime.now().weekday()94    return _make_result(minus_delta(datetime, days=days), *handlers)95def to_start_month(datetime, *handlers):96    return _make_result(_make_copy(datetime.replace(day=1)), *handlers)97def to_start_year(datetime, *handlers):98    return _make_result(_make_copy(datetime.replace(day=1, month=1)), *handlers)99def to_end_month(datetime, *handlers):100    _, end_day = calendar.monthrange(datetime.year, datetime.month)101    return _make_result(_make_copy(datetime.replace(day=end_day)), *handlers)102def to_date(date, *handlers):103    return _make_result(_dt.date(date.year, date.month, date.day), *handlers)104def now(*handlers):105   return _make_result(_dt.datetime.now(), *handlers)106def date(year, month, day, *handlers):107    return _make_result(_dt.date(year, month, day), *handlers)108def today(*handlers):109    return _make_result(_dt.date.today(), *handlers)110class delta:111    minus_seconds = staticmethod(lambda s: lambda d: minus_seconds(d, s))112    plus_seconds = staticmethod(lambda s: lambda d: plus_seconds(d, s))113    minus_minutes = staticmethod(lambda m: lambda d: minus_minutes(d, m))114    plus_minutes = staticmethod(lambda m: lambda d: plus_minutes(d, m))115    minus_hours = staticmethod(lambda h: lambda d: minus_hours(d, h))116    plus_hours = staticmethod(lambda h: lambda d: plus_hours(d, h))117    minus_days = staticmethod(lambda y: lambda d: minus_days(d, y))118    plus_days = staticmethod(lambda y: lambda d: plus_days(d, y))119    minus_weeks = staticmethod(lambda w: lambda d: minus_weeks(d, w))120    plus_weeks = staticmethod(lambda w: lambda d: plus_weeks(d, w))121    minus_months = staticmethod(lambda m: lambda d: minus_months(d, m))122    plus_months = staticmethod(lambda m: lambda d: plus_months(d, m))123    minus_years = staticmethod(lambda y: lambda d: minus_years(d, y))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
