Best Python code snippet using autotest_python
test_store.py
Source:test_store.py  
# Tests for the aries-askar ``Store`` API: entry insert/update/remove, scans,
# transactions, key storage and profiles.
#
# The backend is chosen with the TEST_STORE_URI environment variable and falls
# back to an in-memory SQLite database.
import asyncio
import os
from typing import AsyncIterator

from pytest import mark, raises
import pytest_asyncio

from aries_askar import (
    AskarError,
    KeyAlg,
    Key,
    Store,
)

TEST_STORE_URI = os.getenv("TEST_STORE_URI", "sqlite://:memory:")

# Reference record used by every test; "enctag" carries two values.
TEST_ENTRY = {
    "category": "test category",
    "name": "test name",
    "value": b"test_value",
    "tags": {"~plaintag": "a", "enctag": {"b", "c"}},
}


def raw_key() -> str:
    """Return the raw store key generated from a fixed seed.

    The same seed is used on every call; the profile test relies on this to
    re-open a store that was provisioned earlier.
    """
    return Store.generate_raw_key(b"00000000000000000000000000000My1")


# NOTE: no ``@mark.asyncio`` here. Marks applied to fixtures have no effect and
# pytest deprecates the combination; ``pytest_asyncio.fixture`` is sufficient.
@pytest_asyncio.fixture
async def store() -> AsyncIterator[Store]:
    """Provision a store for one test and remove it once the test is done."""
    key = raw_key()
    provisioned = await Store.provision(TEST_STORE_URI, "raw", key, recreate=True)
    yield provisioned
    await provisioned.close(remove=True)


@mark.asyncio
async def test_insert_update(store: Store):
    """Insert, count, fetch, replace and remove an entry within one session."""
    async with store as session:
        # Insert a new entry
        await session.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )

        # Count rows by category and (optional) tag filter
        assert (
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )
        ) == 1

        # Fetch an entry by category and name
        found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert dict(found) == TEST_ENTRY

        # Fetch entries by category and tag filter
        found = await session.fetch_all(
            TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
        )
        assert len(found) == 1 and dict(found[0]) == TEST_ENTRY

        # Update an entry (outside of a transaction)
        upd_entry = TEST_ENTRY.copy()
        upd_entry["value"] = b"new_value"
        upd_entry["tags"] = {"upd": "tagval"}
        await session.replace(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            upd_entry["value"],
            upd_entry["tags"],
        )
        found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert dict(found) == upd_entry

        # Remove entry
        await session.remove(TEST_ENTRY["category"], TEST_ENTRY["name"])
        found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert found is None


@mark.asyncio
async def test_remove_all(store: Store):
    """Remove an entry through ``remove_all`` with a tag query."""
    async with store as session:
        # Insert a new entry
        await session.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )

        # Remove using remove_all
        await session.remove_all(
            TEST_ENTRY["category"],
            # note: this query syntax is optional
            {"~plaintag": "a", "$and": [{"enctag": "b"}, {"enctag": "c"}]},
        )

        # Check removed
        found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert found is None


@mark.asyncio
async def test_scan(store: Store):
    """Scan the store (outside of a session) for an inserted entry."""
    async with store as session:
        await session.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )

    # Scan entries by category and (optional) tag filter
    rows = await store.scan(
        TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
    ).fetch_all()
    assert len(rows) == 1 and dict(rows[0]) == TEST_ENTRY


@mark.asyncio
async def test_txn_basic(store: Store):
    """Insert inside a transaction, commit, then read back from a session."""
    async with store.transaction() as txn:
        # Insert a new entry
        await txn.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )

        # Count rows by category and (optional) tag filter
        assert (
            await txn.count(TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"})
        ) == 1

        # Fetch an entry by category and name
        found = await txn.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert dict(found) == TEST_ENTRY

        # Fetch entries by category and tag filter
        found = await txn.fetch_all(
            TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
        )
        assert len(found) == 1 and dict(found[0]) == TEST_ENTRY

        await txn.commit()

    # Check the transaction was committed
    async with store.session() as session:
        found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
        assert dict(found) == TEST_ENTRY


@mark.asyncio
async def test_txn_contention(store: Store):
    """Increment one counter row from several concurrent tasks.

    Each increment fetches the row with ``for_update=True`` inside its own
    transaction; the final value must equal the total number of increments.
    """
    async with store.transaction() as txn:
        await txn.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            "0",
        )
        await txn.commit()

    INC_COUNT = 1000
    TASKS = 10

    async def inc():
        for _ in range(INC_COUNT):
            async with store.transaction() as txn:
                row = await txn.fetch(
                    TEST_ENTRY["category"], TEST_ENTRY["name"], for_update=True
                )
                if not row:
                    raise Exception("Row not found")
                new_value = str(int(row.value) + 1)
                await txn.replace(TEST_ENTRY["category"], TEST_ENTRY["name"], new_value)
                await txn.commit()

    tasks = [asyncio.create_task(inc()) for _ in range(TASKS)]
    await asyncio.gather(*tasks)

    # Check all the updates completed
    async with store.session() as session:
        result = await session.fetch(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
        )
        assert int(result.value) == INC_COUNT * TASKS


@mark.asyncio
async def test_key_store(store: Store):
    """Store, fetch, update, filter and remove a generated Ed25519 key."""
    # test key operations in a new session
    async with store as session:
        # Create a new keypair
        keypair = Key.generate(KeyAlg.ED25519)

        # Store keypair
        key_name = "testkey"
        await session.insert_key(
            key_name, keypair, metadata="metadata", tags={"a": "b"}
        )

        # Fetch keypair
        fetch_key = await session.fetch_key(key_name)
        assert fetch_key and fetch_key.name == key_name and fetch_key.tags == {"a": "b"}

        # Update keypair
        await session.update_key(key_name, metadata="updated metadata", tags={"a": "c"})

        # Fetch keypair
        fetch_key = await session.fetch_key(key_name)
        assert fetch_key and fetch_key.name == key_name and fetch_key.tags == {"a": "c"}

        # Check key equality
        thumbprint = keypair.get_jwk_thumbprint()
        assert fetch_key.key.get_jwk_thumbprint() == thumbprint

        # Fetch with filters
        keys = await session.fetch_all_keys(
            alg=KeyAlg.ED25519, thumbprint=thumbprint, tag_filter={"a": "c"}, limit=1
        )
        assert len(keys) == 1 and keys[0].name == key_name

        # Remove
        await session.remove_key(key_name)
        assert await session.fetch_key(key_name) is None


@mark.asyncio
async def test_profile(store: Store):
    """Check that entries are isolated per profile and survive profile churn."""
    # New session in the default profile
    async with store as session:
        # Insert a new entry
        await session.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )

    profile = await store.create_profile()

    async with store.session(profile) as session:
        # Should not find previously stored record
        assert (
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )
        ) == 0

        # Insert a new entry
        await session.insert(
            TEST_ENTRY["category"],
            TEST_ENTRY["name"],
            TEST_ENTRY["value"],
            TEST_ENTRY["tags"],
        )
        assert (
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )
        ) == 1

    if ":memory:" not in TEST_STORE_URI:
        # Test accessing profile after re-opening
        key = raw_key()
        store_2 = await Store.open(TEST_STORE_URI, "raw", key)
        try:
            async with store_2.session(profile) as session:
                # Should find the record stored in this profile
                assert (
                    await session.count(
                        TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
                    )
                ) == 1
        finally:
            # Close the second handle even when the assertion above fails.
            await store_2.close()

    with raises(AskarError, match="Duplicate"):
        _ = await store.create_profile(profile)

    # check profile is still usable
    async with store.session(profile) as session:
        assert (
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )
        ) == 1

    await store.remove_profile(profile)

    # profile key is cached
    async with store.session(profile) as session:
        assert (
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )
        ) == 0

    with raises(AskarError, match="not found"):
        async with store.session("unknown profile") as session:
            await session.count(
                TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
            )

    await store.create_profile(profile)

    async with store.session(profile) as session:
        # NOTE(review): the listing is cut off in the middle of this assertion,
        # so the expected count is unknown. The query is kept so the re-created
        # profile is still exercised; restore the comparison from the upstream
        # test before relying on this check.
        await session.count(
            TEST_ENTRY["category"], {"~plaintag": "a", "enctag": "b"}
        )

# (listing truncated here; the next file on the page is test_entry.py)
Source:test_entry.py  
# Tests for ``nexus_constructor.model.entry.Entry``: proposal id, title and
# users, and how each of them appears in the dictionary built by ``as_dict``.
from nexus_constructor.common_attrs import NX_USER, CommonAttrs
from nexus_constructor.model.entry import (
    EXP_ID_PLACEHOLDER_VALUE,
    NEXUS_EXP_ID_NAME,
    NEXUS_TITLE_NAME,
    TITLE_PLACEHOLDER_VALUE,
    USERS_PLACEHOLDER,
    Entry,
)


def find_child_dataset_by_name(dictionary, name):
    """Return the first dataset child of *dictionary* called *name*, or None.

    A child counts as a dataset when its "module" value is "dataset"; the name
    is read from ``child["config"]["name"]``.
    """
    for child in dictionary["children"]:
        if child.get("module", "") == "dataset" and child["config"]["name"] == name:
            return child
    return None


def extract_based_on_nx_class(dictionary, nx_class):
    """Return the children of *dictionary* whose NX_class attribute is *nx_class*.

    Children without an "attributes" key are ignored. A child is appended once
    per matching attribute, in the order the children appear.
    """
    result = []
    for child in dictionary["children"]:
        if "attributes" not in child:
            continue
        for attribute in child["attributes"]:
            if attribute.get("name", "") != CommonAttrs.NX_CLASS:
                continue
            if attribute.get("values", "") == nx_class:
                result.append(child)
    return result


def assert_matching_datasets_exist(dictionary, expected_values):
    """Assert that *dictionary* holds a dataset for every expected name/value.

    Raises:
        AssertionError: if a dataset is missing or holds a different value.
    """
    for name, value in expected_values.items():
        ds = find_child_dataset_by_name(dictionary, name)
        # A missing dataset must fail with a readable message rather than a
        # TypeError from subscripting None.
        assert ds is not None, f"Could not find a dataset named ({name})"
        assert (
            ds["config"]["values"] == value
        ), f"Could not find expected_value ({value}) for dataset name ({name})"


def _user_john():
    """Return a fresh dict describing the user shared by the users tests."""
    return {
        "name": "John Smith",
        "email": "js@ess.eu",
        "facility_user_id": "js90",
        "affiliation": "ESS",
    }


def test_proposal_id_is_initially_empty():
    test_entry = Entry()
    assert test_entry.proposal_id == ("", False)


def test_proposal_id_is_set_to_use_placeholder():
    test_entry = Entry()
    test_entry.proposal_id = ("will be ignored", True)
    assert test_entry.proposal_id == (EXP_ID_PLACEHOLDER_VALUE, True)


def test_proposal_id_is_set_to_custom_value():
    test_entry = Entry()
    test_entry.proposal_id = ("MY_PROP_ID", False)
    assert test_entry.proposal_id == ("MY_PROP_ID", False)


def test_blank_proposal_id_is_not_in_dictionary():
    test_entry = Entry()
    dictionary = test_entry.as_dict([])
    assert find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME) is None


def test_blank_proposal_id_is_not_in_dictionary_after_clearing():
    test_entry = Entry()
    test_entry.proposal_id = ("MY_PROP_ID", False)
    test_entry.proposal_id = ("", False)
    dictionary = test_entry.as_dict([])
    assert find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME) is None


def test_defined_proposal_id_is_in_dictionary():
    test_entry = Entry()
    test_entry.proposal_id = ("MY_PROP_ID", False)
    dictionary = test_entry.as_dict([])
    result = find_child_dataset_by_name(dictionary, NEXUS_EXP_ID_NAME)
    assert result is not None
    assert result["config"]["values"] == "MY_PROP_ID"


def test_title_is_initially_empty():
    test_entry = Entry()
    assert test_entry.title == ("", False)


def test_title_is_set_to_use_placeholder():
    test_entry = Entry()
    test_entry.title = ("will be ignored", True)
    assert test_entry.title == (TITLE_PLACEHOLDER_VALUE, True)


def test_title_is_set_to_custom_value():
    test_entry = Entry()
    test_entry.title = ("MY_TITLE", False)
    assert test_entry.title == ("MY_TITLE", False)


def test_blank_title_is_not_in_dictionary():
    test_entry = Entry()
    dictionary = test_entry.as_dict([])
    assert find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME) is None


def test_blank_title_is_not_in_dictionary_after_clearing():
    test_entry = Entry()
    test_entry.title = ("MY_TITLE", False)
    test_entry.title = ("", False)
    dictionary = test_entry.as_dict([])
    assert find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME) is None


def test_defined_title_is_in_dictionary():
    test_entry = Entry()
    test_entry.title = ("MY_TITLE", False)
    dictionary = test_entry.as_dict([])
    result = find_child_dataset_by_name(dictionary, NEXUS_TITLE_NAME)
    assert result is not None
    assert result["config"]["values"] == "MY_TITLE"


def test_users_is_initially_empty():
    test_entry = Entry()
    assert len(test_entry.users) == 0


def test_users_can_be_edited_using_simple_dict_representation():
    user_john = _user_john()
    test_entry = Entry()
    test_entry.users = [user_john]
    assert test_entry.users == [user_john]


def test_users_are_in_dictionary():
    user_john = _user_john()
    user_betty = {
        "name": "Betty Boo",
        "email": "bb@doing.the.do",
        "facility_user_id": "bb70",
        "affiliation": "She Rockers",
    }
    test_entry = Entry()
    test_entry.users = [user_john, user_betty]
    dictionary = test_entry.as_dict([])
    result = extract_based_on_nx_class(dictionary, NX_USER)
    assert_matching_datasets_exist(result[0], user_john)
    assert_matching_datasets_exist(result[1], user_betty)


def test_if_placeholder_used_then_users_replaced_by_placeholder():
    user_john = _user_john()
    test_entry = Entry()
    test_entry.users = [user_john]
    test_entry.users_placeholder = True
    dictionary = test_entry.as_dict([])
    assert len(extract_based_on_nx_class(dictionary, NX_USER)) == 0

# (listing truncated here)
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks, e.g.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
