How to use test_entry method in autotest

Best Python code snippet using autotest_python

test_store.py

Source:test_store.py Github

copy

Full Screen

1import asyncio2import os3from pytest import mark, raises4import pytest_asyncio5from aries_askar import (6 AskarError,7 KeyAlg,8 Key,9 Store,10)11TEST_STORE_URI = os.getenv("TEST_STORE_URI", "sqlite://:memory:")12TEST_ENTRY = {13 "category": "test category",14 "name": "test name",15 "value": b"test_value",16 "tags": {"~plaintag": "a", "enctag": {"b", "c"}},17}18def raw_key() -> str:19 return Store.generate_raw_key(b"00000000000000000000000000000My1")20@pytest_asyncio.fixture21@mark.asyncio22async def store() -> Store:23 key = raw_key()24 store = await Store.provision(TEST_STORE_URI, "raw", key, recreate=True)25 yield store26 await store.close(remove=True)27@mark.asyncio28async def test_insert_update(store: Store):29 async with store as session:30 # Insert a new entry31 await session.insert(32 TEST_ENTRY["category"],33 TEST_ENTRY["name"],34 TEST_ENTRY["value"],35 TEST_ENTRY["tags"],36 )37 # Count rows by category and (optional) tag filter38 assert (39 await session.count(40 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}41 )42 ) == 143 # Fetch an entry by category and name44 found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])45 assert dict(found) == TEST_ENTRY46 # Fetch entries by category and tag filter47 found = await session.fetch_all(48 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}49 )50 assert len(found) == 1 and dict(found[0]) == TEST_ENTRY51 # Update an entry (outside of a transaction)52 upd_entry = TEST_ENTRY.copy()53 upd_entry["value"] = b"new_value"54 upd_entry["tags"] = {"upd": "tagval"}55 await session.replace(56 TEST_ENTRY["category"],57 TEST_ENTRY["name"],58 upd_entry["value"],59 upd_entry["tags"],60 )61 found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])62 assert dict(found) == upd_entry63 # Remove entry64 await session.remove(TEST_ENTRY["category"], TEST_ENTRY["name"])65 found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])66 assert found is None67@mark.asyncio68async def test_remove_all(store: Store):69 async with store as session:70 # Insert a new entry71 await session.insert(72 TEST_ENTRY["category"],73 TEST_ENTRY["name"],74 TEST_ENTRY["value"],75 TEST_ENTRY["tags"],76 )77 # Remove using remove_all78 await session.remove_all(79 TEST_ENTRY["category"],80 # note: this query syntax is optional81 {"~plaintag": "a", "$and": [{"enctag": "b"}, {"enctag": "c"}]},82 ),83 # Check removed84 found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])85 assert found is None86@mark.asyncio87async def test_scan(store: Store):88 async with store as session:89 await session.insert(90 TEST_ENTRY["category"],91 TEST_ENTRY["name"],92 TEST_ENTRY["value"],93 TEST_ENTRY["tags"],94 )95 # Scan entries by category and (optional) tag filter)96 rows = await store.scan(97 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}98 ).fetch_all()99 assert len(rows) == 1 and dict(rows[0]) == TEST_ENTRY100@mark.asyncio101async def test_txn_basic(store: Store):102 async with store.transaction() as txn:103 # Insert a new entry104 await txn.insert(105 TEST_ENTRY["category"],106 TEST_ENTRY["name"],107 TEST_ENTRY["value"],108 TEST_ENTRY["tags"],109 )110 # Count rows by category and (optional) tag filter111 assert (112 await txn.count(TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"})113 ) == 1114 # Fetch an entry by category and name115 found = await txn.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])116 assert dict(found) == TEST_ENTRY117 # Fetch entries by category and tag filter118 found = await txn.fetch_all(119 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}120 )121 assert len(found) == 1 and dict(found[0]) == TEST_ENTRY122 await txn.commit()123 # Check the transaction was committed124 async with store.session() as session:125 found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])126 assert dict(found) == TEST_ENTRY127@mark.asyncio128async def test_txn_contention(store: Store):129 async with store.transaction() as txn:130 await txn.insert(131 TEST_ENTRY["category"],132 TEST_ENTRY["name"],133 "0",134 )135 await txn.commit()136 INC_COUNT = 1000137 TASKS = 10138 async def inc():139 for _ in range(INC_COUNT):140 async with store.transaction() as txn:141 row = await txn.fetch(142 TEST_ENTRY["category"], TEST_ENTRY["name"], for_update=True143 )144 if not row:145 raise Exception("Row not found")146 new_value = str(int(row.value) + 1)147 await txn.replace(TEST_ENTRY["category"], TEST_ENTRY["name"], new_value)148 await txn.commit()149 tasks = [asyncio.create_task(inc()) for _ in range(TASKS)]150 await asyncio.gather(*tasks)151 # Check all the updates completed152 async with store.session() as session:153 result = await session.fetch(154 TEST_ENTRY["category"],155 TEST_ENTRY["name"],156 )157 assert int(result.value) == INC_COUNT * TASKS158@mark.asyncio159async def test_key_store(store: Store):160 # test key operations in a new session161 async with store as session:162 # Create a new keypair163 keypair = Key.generate(KeyAlg.ED25519)164 # Store keypair165 key_name = "testkey"166 await session.insert_key(167 key_name, keypair, metadata="metadata", tags={"a": "b"}168 )169 # Fetch keypair170 fetch_key = await session.fetch_key(key_name)171 assert fetch_key and fetch_key.name == key_name and fetch_key.tags == {"a": "b"}172 # Update keypair173 await session.update_key(key_name, metadata="updated metadata", tags={"a": "c"})174 # Fetch keypair175 fetch_key = await session.fetch_key(key_name)176 assert fetch_key and fetch_key.name == key_name and fetch_key.tags == {"a": "c"}177 # Check key equality178 thumbprint = keypair.get_jwk_thumbprint()179 assert fetch_key.key.get_jwk_thumbprint() == thumbprint180 # Fetch with filters181 keys = await session.fetch_all_keys(182 alg=KeyAlg.ED25519, thumbprint=thumbprint, tag_filter={"a": "c"}, limit=1183 )184 assert len(keys) == 1 and keys[0].name == key_name185 # Remove186 await session.remove_key(key_name)187 assert await session.fetch_key(key_name) is None188@mark.asyncio189async def test_profile(store: Store):190 # New session in the default profile191 async with store as session:192 # Insert a new entry193 await session.insert(194 TEST_ENTRY["category"],195 TEST_ENTRY["name"],196 TEST_ENTRY["value"],197 TEST_ENTRY["tags"],198 )199 profile = await store.create_profile()200 async with store.session(profile) as session:201 # Should not find previously stored record202 assert (203 await session.count(204 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}205 )206 ) == 0207 # Insert a new entry208 await session.insert(209 TEST_ENTRY["category"],210 TEST_ENTRY["name"],211 TEST_ENTRY["value"],212 TEST_ENTRY["tags"],213 )214 assert (215 await session.count(216 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}217 )218 ) == 1219 if ":memory:" not in TEST_STORE_URI:220 # Test accessing profile after re-opening221 key = raw_key()222 store_2 = await Store.open(TEST_STORE_URI, "raw", key)223 async with store_2.session(profile) as session:224 # Should not find previously stored record225 assert (226 await session.count(227 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}228 )229 ) == 1230 await store_2.close()231 with raises(AskarError, match="Duplicate"):232 _ = await store.create_profile(profile)233 # check profile is still usable234 async with store.session(profile) as session:235 assert (236 await session.count(237 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}238 )239 ) == 1240 await store.remove_profile(profile)241 # profile key is cached242 async with store.session(profile) as session:243 assert (244 await session.count(245 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}246 )247 ) == 0248 with raises(AskarError, match="not found"):249 async with store.session("unknown profile") as session:250 await session.count(251 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}252 )253 await store.create_profile(profile)254 async with store.session(profile) as session:255 assert (256 await session.count(257 TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}258 )...

Full Screen

Full Screen

test_entry.py

Source:test_entry.py Github

copy

Full Screen

1from nexus_constructor.common_attrs import NX_USER, CommonAttrs2from nexus_constructor.model.entry import (3 EXP_ID_PLACEHOLDER_VALUE,4 NEXUS_EXP_ID_NAME,5 NEXUS_TITLE_NAME,6 TITLE_PLACEHOLDER_VALUE,7 USERS_PLACEHOLDER,8 Entry,9)10def find_child_dataset_by_name(dictionary, name):11 for child in dictionary["children"]:12 if child.get("module", "") == "dataset" and child["config"]["name"] == name:13 return child14 return None15def extract_based_on_nx_class(dictionary, nx_class):16 result = []17 for child in dictionary["children"]:18 if "attributes" in child:19 for attribute in child["attributes"]:20 name = attribute.get("name", "")21 if name == CommonAttrs.NX_CLASS:22 if attribute.get("values", "") == nx_class:23 result.append(child)24 return result25def assert_matching_datasets_exist(dictionary, expected_values):26 for name, value in expected_values.items():27 ds = find_child_dataset_by_name(dictionary, name)28 if not ds["config"]["values"] == value:29 assert (30 False31 ), f"Could not find expected_value ({value}) for dataset name ({name})"32 assert True33def test_proposal_id_is_initially_empty():34 test_entry = Entry()35 assert test_entry.proposal_id == ("", False)36def test_proposal_id_is_set_to_use_placeholder():37 test_entry = Entry()38 test_entry.proposal_id = ("will be ignored", True)39 assert test_entry.proposal_id == (EXP_ID_PLACEHOLDER_VALUE, True)40def test_proposal_id_is_set_to_custom_value():41 test_entry = Entry()42 test_entry.proposal_id = ("MY_PROP_ID", False)43 assert test_entry.proposal_id == ("MY_PROP_ID", False)44def test_blank_proposal_id_is_not_in_dictionary():45 test_entry = Entry()46 dictionary = test_entry.as_dict([])47 assert find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME) is None48def test_blank_proposal_id_is_not_in_dictionary_after_clearing():49 test_entry = Entry()50 test_entry.proposal_id = ("MY_PROP_ID", False)51 test_entry.proposal_id = ("", False)52 dictionary = test_entry.as_dict([])53 assert find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME) is None54def test_defined_proposal_id_is_in_dictionary():55 test_entry = Entry()56 test_entry.proposal_id = ("MY_PROP_ID", False)57 dictionary = test_entry.as_dict([])58 result = find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME)59 assert result is not None60 assert result["config"]["values"] == "MY_PROP_ID"61def test_title_is_initially_empty():62 test_entry = Entry()63 assert test_entry.title == ("", False)64def test_title_is_set_to_use_placeholder():65 test_entry = Entry()66 test_entry.title = ("will be ignored", True)67 assert test_entry.title == (TITLE_PLACEHOLDER_VALUE, True)68def test_title_is_set_to_custom_value():69 test_entry = Entry()70 test_entry.title = ("MY_TITLE", False)71 assert test_entry.title == ("MY_TITLE", False)72def test_blank_title_is_not_in_dictionary():73 test_entry = Entry()74 dictionary = test_entry.as_dict([])75 assert find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME) is None76def test_blank_title_is_not_in_dictionary_after_clearing():77 test_entry = Entry()78 test_entry.title = ("MY_TITLE", False)79 test_entry.title = ("", False)80 dictionary = test_entry.as_dict([])81 assert find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME) is None82def test_defined_title_is_in_dictionary():83 test_entry = Entry()84 test_entry.title = ("MY_TITLE", False)85 dictionary = test_entry.as_dict([])86 result = find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME)87 assert result is not None88 assert result["config"]["values"] == "MY_TITLE"89def test_users_is_initially_empty():90 test_entry = Entry()91 assert len(test_entry.users) == 092def test_users_can_be_edited_using_simple_dict_representation():93 user_john = {94 "name": "John Smith",95 "email": "js@ess.eu",96 "facility_user_id": "js90",97 "affiliation": "ESS",98 }99 test_entry = Entry()100 test_entry.users = [user_john]101 assert test_entry.users == [user_john]102def test_users_are_in_dictionary():103 user_john = {104 "name": "John Smith",105 "email": "js@ess.eu",106 "facility_user_id": "js90",107 "affiliation": "ESS",108 }109 user_betty = {110 "name": "Betty Boo",111 "email": "bb@doing.the.do",112 "facility_user_id": "bb70",113 "affiliation": "She Rockers",114 }115 test_entry = Entry()116 test_entry.users = [user_john, user_betty]117 dictionary = test_entry.as_dict([])118 result = extract_based_on_nx_class(dictionary, NX_USER)119 assert_matching_datasets_exist(result[0], user_john)120 assert_matching_datasets_exist(result[1], user_betty)121def test_if_placeholder_used_then_users_replaced_by_placeholder():122 user_john = {123 "name": "John Smith",124 "email": "js@ess.eu",125 "facility_user_id": "js90",126 "affiliation": "ESS",127 }128 test_entry = Entry()129 test_entry.users = [user_john]130 test_entry.users_placeholder = True131 dictionary = test_entry.as_dict([])132 assert len(extract_based_on_nx_class(dictionary, NX_USER)) == 0...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful