How to use _make_result method in stestr

Best Python code snippet using stestr_python

test_graph.py

Source:test_graph.py Github

copy

Full Screen

...20class GraphResultTests(unittest.TestCase):21 _values = (None, 1, 1.2, True, False, [1, 2, 3], {'x': 1, 'y': 2})22 def test_result_value(self):23 for v in self._values:24 result = self._make_result(v)25 self.assertEqual(result.value, v)26 def test_result_attr(self):27 # value is not a dict28 result = self._make_result(123)29 with self.assertRaises(ValueError):30 result.something31 expected = {'a': 1, 'b': 2}32 result = self._make_result(expected)33 self.assertEqual(result.a, 1)34 self.assertEqual(result.b, 2)35 with self.assertRaises(AttributeError):36 result.not_present37 def test_result_item(self):38 # value is not a dict, list39 result = self._make_result(123)40 with self.assertRaises(ValueError):41 result['something']42 with self.assertRaises(ValueError):43 result[0]44 # dict key access45 expected = {'a': 1, 'b': 2}46 result = self._make_result(expected)47 self.assertEqual(result['a'], 1)48 self.assertEqual(result['b'], 2)49 with self.assertRaises(KeyError):50 result['not_present']51 with self.assertRaises(ValueError):52 result[0]53 # list index access54 expected = [0, 1]55 result = self._make_result(expected)56 self.assertEqual(result[0], 0)57 self.assertEqual(result[1], 1)58 with self.assertRaises(IndexError):59 result[2]60 with self.assertRaises(ValueError):61 result['something']62 def test_as_vertex(self):63 prop_name = 'name'64 prop_val = 'val'65 vertex_dict = {'id': object(),66 'label': object(),67 'type': 'vertex',68 'properties': {prop_name: [{'value': prop_val, 'whatever': object()}]}69 }70 required_attrs = [k for k in vertex_dict if k != 'properties']71 result = self._make_result(vertex_dict)72 vertex = result.as_vertex()73 for attr in required_attrs:74 self.assertEqual(getattr(vertex, attr), vertex_dict[attr])75 self.assertEqual(len(vertex.properties), 1)76 self.assertEqual(vertex.properties[prop_name][0].value, prop_val)77 # no props78 modified_vertex_dict = vertex_dict.copy()79 del modified_vertex_dict['properties']80 vertex = self._make_result(modified_vertex_dict).as_vertex()81 self.assertEqual(vertex.properties, {})82 # wrong 'type'83 modified_vertex_dict = vertex_dict.copy()84 modified_vertex_dict['type'] = 'notavertex'85 result = self._make_result(modified_vertex_dict)86 self.assertRaises(TypeError, result.as_vertex)87 # missing required properties88 for attr in required_attrs:89 modified_vertex_dict = vertex_dict.copy()90 del modified_vertex_dict[attr]91 result = self._make_result(modified_vertex_dict)92 self.assertRaises(TypeError, result.as_vertex)93 def test_as_edge(self):94 prop_name = 'name'95 prop_val = 'val'96 edge_dict = {'id': object(),97 'label': object(),98 'type': 'edge',99 'inV': object(),100 'inVLabel': object(),101 'outV': object(),102 'outVLabel': object(),103 'properties': {prop_name: prop_val}104 }105 required_attrs = [k for k in edge_dict if k != 'properties']106 result = self._make_result(edge_dict)107 edge = result.as_edge()108 for attr in required_attrs:109 self.assertEqual(getattr(edge, attr), edge_dict[attr])110 self.assertEqual(len(edge.properties), 1)111 self.assertEqual(edge.properties[prop_name], prop_val)112 # no props113 modified_edge_dict = edge_dict.copy()114 del modified_edge_dict['properties']115 edge = self._make_result(modified_edge_dict).as_edge()116 self.assertEqual(edge.properties, {})117 # wrong 'type'118 modified_edge_dict = edge_dict.copy()119 modified_edge_dict['type'] = 'notanedge'120 result = self._make_result(modified_edge_dict)121 self.assertRaises(TypeError, result.as_edge)122 # missing required properties123 for attr in required_attrs:124 modified_edge_dict = edge_dict.copy()125 del modified_edge_dict[attr]126 result = self._make_result(modified_edge_dict)127 self.assertRaises(TypeError, result.as_edge)128 def test_as_path(self):129 vertex_dict = {'id': object(),130 'label': object(),131 'type': 'vertex',132 'properties': {'name': [{'value': 'val', 'whatever': object()}]}133 }134 edge_dict = {'id': object(),135 'label': object(),136 'type': 'edge',137 'inV': object(),138 'inVLabel': object(),139 'outV': object(),140 'outVLabel': object(),141 'properties': {'name': 'val'}142 }143 path_dict = {'labels': [['a', 'b'], ['c']],144 'objects': [vertex_dict, edge_dict]145 }146 result = self._make_result(path_dict)147 path = result.as_path()148 self.assertEqual(path.labels, path_dict['labels'])149 # make sure inner objects are bound correctly150 self.assertIsInstance(path.objects[0], Vertex)151 self.assertIsInstance(path.objects[1], Edge)152 # missing required properties153 for attr in path_dict:154 modified_path_dict = path_dict.copy()155 del modified_path_dict[attr]156 result = self._make_result(modified_path_dict)157 self.assertRaises(TypeError, result.as_path)158 def test_str(self):159 for v in self._values:160 self.assertEqual(str(self._make_result(v)), str(v))161 def test_repr(self):162 for v in self._values:163 result = self._make_result(v)164 self.assertEqual(eval(repr(result)), result)165 def _make_result(self, value):166 # direct pass-through now167 return Result(value)168class GraphTypeTests(unittest.TestCase):169 # see also: GraphResultTests.test_as_*170 def test_vertex_str_repr(self ):171 prop_name = 'name'172 prop_val = 'val'173 kwargs = {'id': 'id_val', 'label': 'label_val', 'type': 'vertex', 'properties': {prop_name: [{'value': prop_val}]}}174 vertex = Vertex(**kwargs)175 transformed = kwargs.copy()176 transformed['properties'] = {prop_name: [VertexProperty(prop_val)]}177 self.assertEqual(eval(str(vertex)), transformed)178 self.assertEqual(eval(repr(vertex)), vertex)179 def test_edge_str_repr(self ):...

Full Screen

Full Screen

os.bzl

Source:os.bzl Github

copy

Full Screen

...31 )32 return struct(stdout = result.stdout, error = error)33 # Success.34 return struct(stdout = result.stdout, error = None)35def _make_result(36 error = None,37 ubuntu_release = None,38 macos_release = None):39 """Return a fully-populated struct result for determine_os, below."""40 if ubuntu_release != None:41 distribution = "ubuntu"42 elif macos_release != None:43 distribution = "macos"44 else:45 distribution = None46 return struct(47 error = error,48 distribution = distribution,49 is_macos = (macos_release != None),50 is_ubuntu = (ubuntu_release != None),51 ubuntu_release = ubuntu_release,52 macos_release = macos_release,53 )54def _determine_linux(repository_ctx):55 """Handle determine_os on Linux."""56 # Shared error message text across different failure cases.57 error_prologue = "could not determine Linux distribution: "58 # Run sed to determine Linux NAME and VERSION_ID.59 sed = exec_using_which(repository_ctx, [60 "sed",61 "-n",62 "/^\(NAME\|VERSION_ID\)=/{s/[^=]*=//;s/\"//g;p}",63 "/etc/os-release",64 ])65 if sed.error != None:66 return _make_result(error = error_prologue + sed.error)67 # Compute an identifying string, in the form of "$NAME $VERSION_ID".68 lines = [line.strip() for line in sed.stdout.strip().split("\n")]69 distro = " ".join([x for x in lines if len(x) > 0])70 # Match supported Ubuntu release(s). These should match those listed in71 # both doc/developers.rst the root CMakeLists.txt.72 for ubuntu_release in ["16.04", "18.04"]:73 if distro == "Ubuntu " + ubuntu_release:74 return _make_result(ubuntu_release = ubuntu_release)75 # Nothing matched.76 return _make_result(77 error = error_prologue + "unsupported distribution '%s'" % distro,78 )79def _determine_macos(repository_ctx):80 """Handle determine_os on macOS."""81 # Shared error message text across different failure cases.82 error_prologue = "could not determine macOS version: "83 # Run sw_vers to determine macOS version.84 sw_vers = exec_using_which(repository_ctx, [85 "sw_vers",86 "-productVersion",87 ])88 if sw_vers.error != None:89 return _make_result(error = error_prologue + sw_vers.error)90 major_minor_versions = sw_vers.stdout.strip().split(".")[:2]91 macos_release = ".".join(major_minor_versions)92 # Match supported macOS release(s).93 if macos_release in ["10.13", "10.14"]:94 return _make_result(macos_release = macos_release)95 # Nothing matched.96 return _make_result(97 error = error_prologue + "unsupported macOS '%s'" % macos_release,98 )99def determine_os(repository_ctx):100 """101 A repository_rule helper function that determines which of the supported OS102 versions we are targeting.103 Argument:104 repository_ctx: The context passed to the repository_rule calling this.105 Result:106 a struct, with attributes:107 - error: str iff any error occurred, else None108 - distribution: str either "ubuntu" or "macos" if no error109 - is_macos: True iff on a supported macOS release, else False110 - macos_release: str like "10.14" iff on a supported macOS, else None111 - is_ubuntu: True iff on a supported Ubuntu version, else False112 - ubuntu_release: str like "16.04" iff on a supported ubuntu, else None113 """114 os_name = repository_ctx.os.name115 if os_name == "mac os x":116 return _determine_macos(repository_ctx)117 elif os_name == "linux":118 return _determine_linux(repository_ctx)119 else:120 return _make_result(error = "unknown or unsupported OS '%s'" % os_name)121def os_specific_alias(repository_ctx, mapping):122 """123 A repository_rule helper function that creates a BUILD file with alias()124 declarations based on which supported OS version we are targeting.125 Argument:126 repository_ctx: The context passed to the repository_rule calling this.127 mapping: dict(str, list(str)) where the keys match the OS, and the list128 of values are of the form name=actual as in alias(name, actual).129 The keys of mapping are searched in the following preferential order:130 - Exact release, via e.g., "Ubuntu 16.04" or "macOS 10.14"131 - Any release, via "Ubuntu default" or "macOS default"132 - Anything else, via "default"133 """134 os_result = determine_os(repository_ctx)...

Full Screen

Full Screen

test_hyperband_cost_promotion.py

Source:test_hyperband_cost_promotion.py Github

copy

Full Screen

...14from syne_tune.optimizer.schedulers.hyperband import HyperbandScheduler15from syne_tune.search_space import randint, uniform16from syne_tune.backend.trial_status import Trial17from syne_tune.optimizer.scheduler import SchedulerDecision18def _make_result(epoch, metric, cost):19 return dict(epoch=epoch, metric=metric, cost=cost)20def _new_trial(trial_id: int, config: dict):21 return Trial(22 trial_id=trial_id,23 config=config,24 creation_time=datetime.now())25def test_cost_offset():26 config_space = {27 'int': randint(1, 2),28 'float': uniform(5.5, 6.5),29 'epochs': 27}30 scheduler = HyperbandScheduler(31 config_space,32 searcher='random',33 metric='metric',34 mode='max',35 resource_attr='epoch',36 type='cost_promotion',37 max_resource_attr='epochs',38 rung_system_kwargs={'cost_attr': 'cost'})39 # Start 4 trials40 trials = dict()41 for trial_id in range(4):42 trials[trial_id] = _new_trial(43 trial_id, scheduler.suggest(trial_id=trial_id).config)44 # Make sure that 0, 1 are promoted eventually45 decision = scheduler.on_trial_result(46 trials[0], _make_result(1, 0.9, 1.0))47 assert decision == SchedulerDecision.PAUSE48 assert scheduler._cost_offset[str(0)] == 1.049 sugg = scheduler.suggest(trial_id=len(trials))50 assert sugg.spawn_new_trial_id51 trials[4] = _new_trial(4, sugg.config)52 decision = scheduler.on_trial_result(53 trials[1], _make_result(1, 0.8, 3.0))54 assert decision == SchedulerDecision.PAUSE55 assert scheduler._cost_offset[str(1)] == 3.056 # 1 < 4/3 -> Promote 057 sugg = scheduler.suggest(trial_id=len(trials))58 assert not sugg.spawn_new_trial_id59 assert sugg.checkpoint_trial_id == 060 assert sugg.config is not None61 trials[0].config = sugg.config62 decision = scheduler.on_trial_result(63 trials[2], _make_result(1, 0.7, 10.0))64 assert decision == SchedulerDecision.PAUSE65 assert scheduler._cost_offset[str(2)] == 10.066 decision = scheduler.on_trial_result(67 trials[0], _make_result(3, 0.95, 3.0))68 assert decision == SchedulerDecision.PAUSE69 assert scheduler._cost_offset[str(0)] == 4.070 # 4 < 14/3 -> Promote 171 sugg = scheduler.suggest(trial_id=len(trials))72 assert not sugg.spawn_new_trial_id73 assert sugg.checkpoint_trial_id == 174 assert sugg.config is not None75 trials[1].config = sugg.config76 decision = scheduler.on_trial_result(77 trials[1], _make_result(3, 0.85, 4.0))78 assert decision == SchedulerDecision.PAUSE79 assert scheduler._cost_offset[str(1)] == 7.080 new_trial_id = len(trials)81 # Nothing can be promoted here (4 > 11/3)82 sugg = scheduler.suggest(trial_id=new_trial_id)83 assert sugg.spawn_new_trial_id84 trials[new_trial_id] = _new_trial(new_trial_id, sugg.config)85# Same scenario as above, but resumed trials start from86# scratch, which should lead to cost offsets being reset87def test_reset_cost_offset():88 config_space = {89 'int': randint(1, 2),90 'float': uniform(5.5, 6.5),91 'epochs': 27}92 scheduler = HyperbandScheduler(93 config_space,94 searcher='random',95 metric='metric',96 mode='max',97 resource_attr='epoch',98 type='cost_promotion',99 max_resource_attr='epochs',100 rung_system_kwargs={'cost_attr': 'cost'})101 # Start 4 trials102 trials = dict()103 for trial_id in range(4):104 trials[trial_id] = _new_trial(105 trial_id, scheduler.suggest(trial_id=trial_id).config)106 # Make sure that 0, 1 are promoted eventually107 decision = scheduler.on_trial_result(108 trials[0], _make_result(1, 0.9, 1.0))109 assert decision == SchedulerDecision.PAUSE110 assert scheduler._cost_offset[str(0)] == 1.0111 sugg = scheduler.suggest(trial_id=len(trials))112 assert sugg.spawn_new_trial_id113 trials[4] = _new_trial(4, sugg.config)114 decision = scheduler.on_trial_result(115 trials[1], _make_result(1, 0.8, 3.0))116 assert decision == SchedulerDecision.PAUSE117 assert scheduler._cost_offset[str(1)] == 3.0118 # 1 < 4/3 -> Promote 0119 sugg = scheduler.suggest(trial_id=len(trials))120 assert not sugg.spawn_new_trial_id121 assert sugg.checkpoint_trial_id == 0122 assert sugg.config is not None123 trials[0].config = sugg.config124 decision = scheduler.on_trial_result(125 trials[2], _make_result(1, 0.7, 10.0))126 assert decision == SchedulerDecision.PAUSE127 assert scheduler._cost_offset[str(2)] == 10.0128 # trial_id 0 reports for epoch=1, which signals restart129 # This should trigger reset of cost offset130 decision = scheduler.on_trial_result(131 trials[0], _make_result(1, 0.9, 1.5))132 assert decision == SchedulerDecision.CONTINUE133 assert scheduler._cost_offset[str(0)] == 0.0134 decision = scheduler.on_trial_result(135 trials[0], _make_result(2, 0.91, 2.5))136 assert decision == SchedulerDecision.CONTINUE137 assert scheduler._cost_offset[str(0)] == 0.0138 decision = scheduler.on_trial_result(139 trials[0], _make_result(3, 0.95, 3.0))140 assert decision == SchedulerDecision.PAUSE141 assert scheduler._cost_offset[str(0)] == 3.0142 # 4 < 14/3 -> Promote 1143 sugg = scheduler.suggest(trial_id=len(trials))144 assert not sugg.spawn_new_trial_id145 assert sugg.checkpoint_trial_id == 1146 assert sugg.config is not None147 trials[1].config = sugg.config148 # trial_id 1 reports for epoch=1, which signals restart149 # This should trigger reset of cost offset150 decision = scheduler.on_trial_result(151 trials[1], _make_result(1, 0.8, 2.5))152 assert decision == SchedulerDecision.CONTINUE153 assert scheduler._cost_offset[str(1)] == 0.0154 decision = scheduler.on_trial_result(155 trials[1], _make_result(2, 0.81, 3.5))156 assert decision == SchedulerDecision.CONTINUE157 assert scheduler._cost_offset[str(1)] == 0.0158 decision = scheduler.on_trial_result(159 trials[1], _make_result(3, 0.85, 4.0))160 assert decision == SchedulerDecision.PAUSE...

Full Screen

Full Screen

dateutils.py

Source:dateutils.py Github

copy

Full Screen

...28 )29 raise TypeError(30 'Incorrect type of object {} Expected "datetime.date" or "datetime.datetime"'.format(instance),31 )32def _make_result(data, *handlers):33 for handler in handlers:34 data = handler(data)35 return data36def to_string(d, fmt=None):37 default_fmt = (38 DEFAULT_DATETIME_FORMAT39 if isinstance(d, _dt.datetime)40 else41 DEFAULT_DATE_FORMAT42 )43 return d.strftime(fmt or default_fmt)44def date_args_to_string(fmt):45 def wrapper(f):46 @wraps(f)47 def wrapped(*args, **kwargs):48 new_args = []49 for a in args:50 if isinstance(a, _dt.date):51 new_args.append(to_string(a, fmt))52 else:53 new_args.append(a)54 for k, v in kwargs.items():55 if isinstance(v, _dt.date):56 kwargs[k] = to_string(v, fmt)57 return f(*new_args, **kwargs)58 return wrapped59 return wrapper60def minus_delta(datetime, **kwargs):61 return _make_copy(datetime - _dt.timedelta(**kwargs))62def plus_delta(datetime, **kwargs):63 return _make_copy(datetime + _dt.timedelta(**kwargs))64def minus_seconds(datetime, seconds, *handlers):65 return _make_result(minus_delta(datetime, seconds=seconds), *handlers)66def plus_seconds(datetime, seconds, *handlers):67 return _make_result(plus_delta(datetime, seconds=seconds), *handlers)68def minus_minutes(datetime, minutes, *handlers):69 return _make_result(minus_delta(datetime, minutes=minutes), *handlers)70def plus_minutes(datetime, minutes, *handlers):71 return _make_result(plus_delta(datetime, minutes=minutes), *handlers)72def minus_hours(datetime, hours, *handlers):73 return _make_result(minus_delta(datetime, hours=hours), *handlers)74def plus_hours(datetime, hours, *handlers):75 return _make_result(plus_delta(datetime, hours=hours), *handlers)76def minus_days(datetime, days, *handlers):77 return _make_result(minus_delta(datetime, days=days), *handlers)78def plus_days(datetime, days, *handlers):79 return _make_result(plus_delta(datetime, days=days), *handlers)80def minus_weeks(datetime, weeks, *handlers):81 return _make_result(minus_delta(datetime, weeks=weeks), *handlers)82def plus_weeks(datetime, weeks, *handlers):83 return _make_result(plus_delta(datetime, weeks=weeks), *handlers)84def minus_months(datetime, months, *handlers):85 return _make_result(_make_copy(datetime - relativedelta(months=months)), *handlers)86def plus_months(datetime, months, *handlers):87 return _make_result(_make_copy(datetime - relativedelta(months=-months)), *handlers)88def minus_years(datetime, years, *handlers):89 return _make_result(_make_copy(datetime - relativedelta(years=years)), *handlers)90def plus_years(datetime, years, *handlers):91 return _make_result(_make_copy(datetime - relativedelta(years=-years)), *handlers)92def to_start_week(datetime, *handlers):93 days = _dt.datetime.now().weekday()94 return _make_result(minus_delta(datetime, days=days), *handlers)95def to_start_month(datetime, *handlers):96 return _make_result(_make_copy(datetime.replace(day=1)), *handlers)97def to_start_year(datetime, *handlers):98 return _make_result(_make_copy(datetime.replace(day=1, month=1)), *handlers)99def to_end_month(datetime, *handlers):100 _, end_day = calendar.monthrange(datetime.year, datetime.month)101 return _make_result(_make_copy(datetime.replace(day=end_day)), *handlers)102def to_date(date, *handlers):103 return _make_result(_dt.date(date.year, date.month, date.day), *handlers)104def now(*handlers):105 return _make_result(_dt.datetime.now(), *handlers)106def date(year, month, day, *handlers):107 return _make_result(_dt.date(year, month, day), *handlers)108def today(*handlers):109 return _make_result(_dt.date.today(), *handlers)110class delta:111 minus_seconds = staticmethod(lambda s: lambda d: minus_seconds(d, s))112 plus_seconds = staticmethod(lambda s: lambda d: plus_seconds(d, s))113 minus_minutes = staticmethod(lambda m: lambda d: minus_minutes(d, m))114 plus_minutes = staticmethod(lambda m: lambda d: plus_minutes(d, m))115 minus_hours = staticmethod(lambda h: lambda d: minus_hours(d, h))116 plus_hours = staticmethod(lambda h: lambda d: plus_hours(d, h))117 minus_days = staticmethod(lambda y: lambda d: minus_days(d, y))118 plus_days = staticmethod(lambda y: lambda d: plus_days(d, y))119 minus_weeks = staticmethod(lambda w: lambda d: minus_weeks(d, w))120 plus_weeks = staticmethod(lambda w: lambda d: plus_weeks(d, w))121 minus_months = staticmethod(lambda m: lambda d: minus_months(d, m))122 plus_months = staticmethod(lambda m: lambda d: plus_months(d, m))123 minus_years = staticmethod(lambda y: lambda d: minus_years(d, y))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful