How to use get_store method in localstack

Best Python code snippet using localstack_python

task_steps.py

Source:task_steps.py Github

copy

Full Screen

...28 )29 return json.loads(value)30@then('a task with the {field} "{value}" will exist')31def task_with_field_exists(context, field, value):32 store = get_store()33 matches = store.client.filter_tasks({field: value})34 assert len(matches) > 0, f"No task found with {field} == {value}"35@then("a single {status} task with the following details will exist")36def task_with_details(context, status):37 store = get_store()38 tasks = store.client.filter_tasks({"status": status})39 assert len(tasks) == 1, "Asserted single task to be found, found %s" % (len(tasks))40 task = tasks[0]41 for key, value in context.table.rows:42 assert task.get(key) == get_json_value(43 value, time_zone=context.time_zone44 ), "Task field {}'s value is {}, not {}".format(key, task.get(key), value)45@given("the user's task store is configured with the following options")46def store_configuration(context):47 store = get_store()48 for key, value in context.table.rows:49 setattr(store, key, get_json_value(value, time_zone=context.time_zone))50 store.save()51@then('a single {status} task will have its "{field}" field set')52def task_non_null_field(context, status, field):53 store = get_store()54 tasks = store.client.filter_tasks({"status": status})55 assert len(tasks) == 1, "Asserted single task to be found, found %s" % (len(tasks))56 task = tasks[0]57 assert task.get(field)58@then('a single {status} task will not have its "{field}" field set')59def task_null_field(context, status, field):60 store = get_store()61 tasks = store.client.filter_tasks({"status": status})62 assert len(tasks) == 1, "Asserted single task to be found, found %s" % (len(tasks))63 task = tasks[0]64 assert not task.get(field)65@given('a task with the {field} "{value}" exists')66def task_existing_with_value(context, field, value):67 store = get_store()68 basic_task = {69 "description": "Gather at Terminus for Hari Seldon's Address",70 "project": "terminus_empire",71 "tags": ["next_steps", "mule"],72 }73 if field == "tags":74 value = value.split(",")75 basic_task[field] = value76 task = store.client.task_add(**basic_task)77 context.created_task_id = task["uuid"]78@then('a task named "{value}" is visible in the task list')79def task_with_description_visible(context, value):80 context.execute_steps(81 """82 then the element at CSS selector "{selector}" has text "{value}"83 """.format(84 selector="div.task-list-item p.description", value=value85 )86 )87@then('a task named "{value}" is the opened task')88def task_with_description_visible_main(context, value):89 context.execute_steps(90 """91 then the element at CSS selector "{selector}" has text "{value}"92 """.format(93 selector="h1.title", value=value94 )95 )96@when('the user creates a new task with the description "{value}"')97def task_with_new_description(context, value):98 context.execute_steps(99 """100 When the user clicks the link "New"101 And the user waits for 1 seconds102 And the user enters the text "{val}" into the field named "description"103 And the user clicks the button labeled "Save"104 And the user waits for 1 seconds105 """.format(106 val=value107 )108 )109@given('the user is viewing an existing task with the {field} "{value}"')110def logged_in_and_viewing_task(context, field, value):111 context.execute_steps(112 """113 Given the user is logged-in114 And a task with the {field} "{value}" exists115 And the user goes to the task's URL116 """.format(117 field=field,118 value=value,119 )120 )121@step("the user goes to the task's URL")122def user_goes_to_tasks_url(context):123 context.execute_steps(124 """125 then the user accesses the url "%s"126 """127 % ("/tasks/%s" % context.created_task_id)128 )129@step("the user clicks the refresh button")130def user_clicks_refresh(context):131 context.browser.find_by_id("refresh-link").first.find_by_tag("a").click()132 time.sleep(1)133@given("a task with the following details exists")134def existing_task_with_details(context):135 task = {"description": "Untitled"}136 for key, value in context.table.rows:137 task[key] = get_json_value(value, time_zone=context.time_zone)138 task["uuid"] = str(uuid.uuid4())139 if "annotations" in task:140 final_annotations = []141 for annotation in task["annotations"]:142 if isinstance(annotation, basestring):143 final_annotations.append(144 {"description": annotation, "entry": datetime.datetime.utcnow()}145 )146 else:147 final_annotations.append(annotation)148 task["annotations"] = final_annotations149 store = get_store()150 task = store.client.import_task(task)151 context.created_task_id = task["uuid"]152 context.execute_steps(153 """154 then the user clicks the refresh button155 """156 )157@then("{count} {status} tasks exist in the user's task list")158def task_count_matches(context, count, status):159 count = int(count)160 store = get_store()161 tasks = store.client.filter_tasks({"status": status})162 assert len(tasks) == count163@then("the following values are visible in the task's details")164def following_values_visible_details(context):165 visible_data = {}166 for row in context.browser.find_by_xpath("//table[@class='details']//tr"):167 key = row.find_by_tag("th")[0].text.lower()168 value = row.find_by_tag("td")[0].text169 visible_data[key] = value170 for key, value in context.table.rows:171 actual_value = visible_data.get(key.lower(), None)172 assert actual_value == value, f"{actual_value} != {value}"173@given('a task "{name}" with the following details')174def task_with_following_details(context, name):175 task = {176 row[0]: get_json_value(row[1], time_zone=context.time_zone)177 for row in context.table.rows178 }179 task["uuid"] = str(uuid.uuid4())180 if "annotations" in task:181 final_annotations = []182 for annotation in task["annotations"]:183 if isinstance(annotation, basestring):184 final_annotations.append(185 {"description": annotation, "entry": datetime.datetime.utcnow()}186 )187 else:188 final_annotations.append(annotation)189 task["annotations"] = final_annotations190 store = get_store()191 task = store.client.import_task(task)192 if not hasattr(context, "named_tasks"):193 context.named_tasks = {}194 context.named_tasks[name] = task195@when('the tasks "{left}" and "{right}" are merged')196def named_tasks_merged(context, left, right):197 alpha = context.named_tasks[left]198 beta = context.named_tasks[right]199 alpha, beta = merge_task_data(alpha, beta)200 context.named_tasks[left] = alpha201 context.named_tasks[right] = beta202@then('the task "{left}" will be marked as a duplicate of "{right}"')203def task_annotated_as_duplicate(context, left, right):204 store = get_store()205 beta = store.client.filter_tasks({"uuid": context.named_tasks[left]["uuid"]})[0]206 alpha = store.client.filter_tasks({"uuid": context.named_tasks[right]["uuid"]})[0]207 assert alpha["intheammergedfrom"] == str(beta["uuid"]), "No backreference set."208 assert beta["intheamduplicateof"] == str(alpha["uuid"]), "Not marked as duplicate."209 context.named_tasks[left] = beta210 context.named_tasks[right] = alpha211@when("I search for duplicate tasks")212def when_search_for_duplicate_tasks(context):213 store = get_store()214 context.duplicates_found = find_all_duplicate_tasks(store)215@when("I merge duplicate tasks")216def when_merge_duplicate_tasks(context):217 store = get_store()218 merge_all_duplicate_tasks(store)219@when('I check for duplicate tasks of "{name}"')220def when_merge_singular_task(context, name):221 store = get_store()222 task_record = context.named_tasks[name]223 merge_duplicate_tasks(store, task_record)224@then('task "{name}"\'s "{fieldname}" field is set to "{value}"')225def task_whatever_is_marked_as_whatnot(context, name, fieldname, value):226 store = get_store()227 old_task_data = context.named_tasks[name]228 task = store.client.filter_tasks({"uuid": old_task_data["uuid"]})[0]229 assert (230 task[fieldname] == value231 ), "Task {} '{}' does not match expectation '{}'".format(232 fieldname,233 task[fieldname],234 value,235 )236@then('task "{left}" and task "{right}" are found as duplicates')237def tasks_are_found_as_duplicates(context, left, right):238 alpha = context.named_tasks[left]239 beta = context.named_tasks[right]240 to_find = {alpha["uuid"], beta["uuid"]}241 assert to_find in context.duplicates_found, (242 "Tasks were not found in duplicates: %s" % context.duplicates_found243 )244@when('I search for duplicates of task "{name}"')245def task_search_for_duplicates_of(context, name):246 store = get_store()247 task = context.named_tasks[name]248 context.duplicate_found = find_duplicate_tasks(store, task)249@then("the task I searched for duplicates of is found to be a " 'duplicate of "{name}"')250def task_found_to_be_duplicate_of(context, name):251 task = context.named_tasks[name]252 assert context.duplicate_found == {task["uuid"]}, (253 "Appropriate duplicate was not found: %s" % context.duplicate_found...

Full Screen

Full Screen

init_test.py

Source:init_test.py Github

copy

Full Screen

...36 name='redis',37 **redis_store.kwargs,38 )39 assert isinstance(redis, ps.store.redis.RedisStore)40 assert local == ps.store.get_store('local')41 assert redis == ps.store.get_store('redis')42 ps.store._stores = {}43 # Init by enum44 local = ps.store.init_store(45 STORES.LOCAL,46 name='local',47 **local_store.kwargs,48 )49 assert isinstance(local, ps.store.local.LocalStore)50 redis = ps.store.init_store(51 STORES.REDIS,52 name='redis',53 **redis_store.kwargs,54 )55 assert isinstance(redis, ps.store.redis.RedisStore)56 assert local == ps.store.get_store('local')57 assert redis == ps.store.get_store('redis')58 # Init by class type59 local = ps.store.init_store(60 ps.store.local.LocalStore,61 name='local',62 **local_store.kwargs,63 )64 assert isinstance(local, ps.store.local.LocalStore)65 ps.store._stores = {}66 # Specify name to have multiple stores of same type67 local1 = ps.store.init_store(STORES.LOCAL, 'l1', **local_store.kwargs)68 ps.store.init_store(STORES.LOCAL, 'l2', **local_store.kwargs)69 assert ps.store.get_store('l1') is not ps.store.get_store('l2')70 # Should overwrite old store71 ps.store.init_store(STORES.LOCAL, 'l1', **local_store.kwargs)72 assert local1 is not ps.store.get_store('l1')73 # Return None if store with name does not exist74 assert ps.store.get_store('unknown') is None75def test_get_enum_by_type() -> None:76 """Test getting enum with type."""77 t = STORES.get_str_by_type(ps.store.local.LocalStore)78 assert isinstance(t, str)79 assert STORES[t].value == ps.store.local.LocalStore80 class FakeStore(ps.store.base.Store[Any]):81 """FakeStore type."""82 pass83 with pytest.raises(KeyError):84 STORES.get_str_by_type(FakeStore) # type: ignore85def test_init_store_raises() -> None:86 """Test init_store raises."""87 with pytest.raises(UnknownStoreError):88 # Raise error because name cannot be found in STORES89 ps.store.init_store('unknown', name='')90 with pytest.raises(UnknownStoreError):91 # Raises error because type is not a subclass of Store92 class TestStore:93 pass94 ps.store.init_store(TestStore, name='') # type: ignore95def test_register_unregister_store() -> None:96 """Test registering and unregistering stores directly."""97 store = LocalStore(name='test')98 ps.store.register_store(store)99 assert ps.store.get_store('test') == store100 with pytest.raises(StoreExistsError):101 ps.store.register_store(store)102 ps.store.register_store(store, exist_ok=True)103 ps.store.unregister_store(store.name)104 assert ps.store.get_store('test') is None105 # does not raise error106 ps.store.unregister_store('not a valid store name')107def test_lookup_by_proxy(local_store, redis_store) -> None:108 """Make sure get_store works with a proxy."""109 # Init by enum110 local = ps.store.init_store(111 STORES.LOCAL,112 name='local',113 **local_store.kwargs,114 )115 redis = ps.store.init_store(116 STORES.REDIS,117 name='redis',118 **redis_store.kwargs,119 )120 # Make a proxy with both121 local_proxy: Proxy[list[int]] = local.proxy([1, 2, 3])122 redis_proxy: Proxy[list[int]] = redis.proxy([1, 2, 3])123 # Make sure both look up correctly124 sr = ps.store.get_store(redis_proxy)125 assert sr is not None126 assert sr.name == redis.name127 sl = ps.store.get_store(local_proxy)128 assert sl is not None129 assert sl.name == local.name130 # Make a proxy without an associated store131 f = SimpleFactory([1, 2, 3])132 p = Proxy(f)133 with pytest.raises(ProxyStoreFactoryError):...

Full Screen

Full Screen

test_repository.py

Source:test_repository.py Github

copy

Full Screen

...8 def tearDown(self):9 flexmock_teardown()10 def test_get_returns_value_from_cache(self):11 repo = self._get_repository()12 repo.get_store().should_receive('get').once().with_args('foo').and_return('bar')13 self.assertEqual('bar', repo.get('foo'))14 def test_default_value_is_returned(self):15 repo = self._get_repository()16 repo.get_store().should_receive('get').and_return(None)17 self.assertEqual('bar', repo.get('foo', 'bar'))18 self.assertEqual('baz', repo.get('foo', lambda: 'baz'))19 def test_set_default_cache_time(self):20 repo = self._get_repository()21 repo.set_default_cache_time(10)22 self.assertEqual(10, repo.get_default_cache_time())23 def test_has_method(self):24 repo = self._get_repository()25 repo.get_store().should_receive('get').with_args('foo').and_return(None)26 repo.get_store().should_receive('get').with_args('bar').and_return('baz')27 self.assertFalse(repo.has('foo'))28 self.assertTrue(repo.has('bar'))29 def test_pull(self):30 repo = self._get_repository()31 repo.get_store().should_receive('get').with_args('foo').and_return('bar')32 repo.get_store().should_receive('forget').with_args('foo')33 self.assertEqual('bar', repo.get('foo'))34 def test_put(self):35 repo = self._get_repository()36 repo.get_store().should_receive('put').with_args('foo', 'bar', 10)37 repo.put('foo', 'bar', 10)38 def test_put_supports_datetime_as_minutes(self):39 repo = self._get_repository()40 repo.get_store().should_receive('put').with_args('foo', 'bar', 60)41 repo.put('foo', 'bar', datetime.datetime.now() + datetime.timedelta(hours=1))42 def test_put_with_minutes_to_zero_doesnt_store(self):43 repo = self._get_repository()44 repo.get_store().should_receive('put').never()45 repo.put('foo', 'bar', datetime.datetime.now() - datetime.timedelta(hours=1))46 def test_add(self):47 repo = self._get_repository()48 repo.get_store().should_receive('get').once().with_args('foo').and_return(None)49 repo.get_store().should_receive('get').once().with_args('bar').and_return('baz')50 repo.get_store().should_receive('put').once().with_args('foo', 'bar', 10)51 repo.get_store().should_receive('put').never().with_args('bar', 'baz', 10)52 self.assertTrue(repo.add('foo', 'bar', 10))53 self.assertFalse(repo.add('bar', 'baz', 10))54 def test_forever(self):55 repo = self._get_repository()56 repo.get_store().should_receive('forever').once().with_args('foo', 'bar')57 repo.forever('foo', 'bar')58 def test_remember_calls_put_and_returns_default(self):59 repo = self._get_repository()60 repo.get_store().should_receive('get').and_return(None)61 repo.get_store().should_receive('put').once().with_args('foo', 'bar', 10)62 result = repo.remember('foo', 10, lambda: 'bar')63 self.assertEqual('bar', result)64 def test_remember_forever_calls_forever_and_returns_default(self):65 repo = self._get_repository()66 repo.get_store().should_receive('get').and_return(None)67 repo.get_store().should_receive('forever').once().with_args('foo', 'bar')68 result = repo.remember_forever('foo', lambda: 'bar')69 self.assertEqual('bar', result)70 def test_repository_can_serve_as_a_decorator(self):71 repo = self._get_repository()72 repo.get_store().should_receive('get').and_return(None, 6, 6).one_by_one()73 repo.get_store().should_receive('put').once()74 calls = []75 @repo76 def test(i, m=3):77 calls.append(i)78 return i*379 test(2)80 test(2)81 test(2)82 self.assertEqual(1, len(calls))83 def test_repository_can_serve_as_a_decorator_with_key_and_minutes(self):84 repo = flexmock(self._get_repository())85 repo.should_receive('_get_key').with_args('my_key', (2,), {'m': 4}).and_return('foo')86 repo.get_store().should_receive('get').and_return(None, 6, 6).one_by_one()87 repo.get_store().should_receive('put').once()\88 .with_args('foo', 6, 35)89 calls = []90 @repo(key='my_key', minutes=35)91 def test(i, m=3):92 calls.append(i)93 return i*394 test(2, m=4)95 test(2, m=4)96 test(2, m=4)97 self.assertEqual(1, len(calls))98 def _get_repository(self):99 repo = Repository(flexmock(Store()))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful