Best Python code snippet using pyatom_python
test_ShareDB.py
Source:test_ShareDB.py  
...348    with pytest.raises(TypeError) as error:349        myDB.remove(None)350    clean_myDB_resources(myDB, key_val_dict, non_key_set)351@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])352def test_random_multiremove(total):353    '''354    Test random multiremove and related Exception.355    '''356    myDB, key_val_dict, non_key_set = get_myDB_resources(total)357    # Successful random multiremove358    while len(myDB):359        factor   = random.randint(0, len(myDB))360        prev_len = len(myDB)361        myDB.multiremove(key_iter=(key_val_dict.popitem()[0] for _ in range(factor)))362        assert len(myDB) == prev_len - factor363    myDB.multiremove(non_key_set)364    # multiremove that raises Exception365    with pytest.raises(Exception) as error:366        myDB.multiremove([None]*factor)367    clean_myDB_resources(myDB, key_val_dict, non_key_set)368@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])369def test_random_pop(total):370    '''371    Test random pop and related Exceptions.372    '''373    myDB, key_val_dict, non_key_set = get_myDB_resources(total)374    # Successful random pop375    for key in key_val_dict:376        val = myDB.pop(key)377        assert key not in myDB378        assert key_val_dict[key] == val379    # pops that raise KeyError380    for key in key_val_dict:381        with pytest.raises(KeyError) as error:382            myDB.pop(key)383    for non_key in non_key_set:384        with pytest.raises(KeyError) as error:385            myDB.pop(non_key)386    clean_myDB_resources(myDB, key_val_dict, non_key_set)387@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])388def test_random_multipop(total):389    '''390    Test random multipop.391    '''392    myDB, key_val_dict, non_key_set = get_myDB_resources(total)393    # Successful random multipop394    while len(key_val_dict):395        factor   = random.randint(0, len(myDB))396        prev_len = len(myDB)397        keys_to_pop = list(key_val_dict.keys())[:factor]398        popped_vals = list(myDB.multipop(key_iter=keys_to_pop))399        assert len(myDB) == prev_len - factor400        assert [key_val_dict[key] for key in keys_to_pop] == popped_vals401        for key in keys_to_pop:402            key_val_dict.pop(key)403    # multipop that raises Exception404    with pytest.raises(Exception) as error:405        next(myDB.multipop(key_iter=non_key_set))406    clean_myDB_resources(myDB, key_val_dict, non_key_set)407@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])408def test_random_items(total):409    '''410    Test random items.411    '''412    myDB, key_val_dict, non_key_set = get_myDB_resources(total)413    myDB_items   = set(myDB.items())414    mydict_items = set(key_val_dict.items())415    assert myDB_items == mydict_items416    clean_myDB_resources(myDB, key_val_dict, non_key_set)417@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])418def test_random_keys(total):419    '''420    Test random keys.421    '''422    myDB, key_val_dict, non_key_set = get_myDB_resources(total)423    myDB_keys   = set(myDB.keys())424    mydict_keys = set(key_val_dict.keys())425    assert myDB_keys == mydict_keys426    clean_myDB_resources(myDB, key_val_dict, non_key_set)427@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])428def test_random_iters(total):429    '''430    Test random iters.431    '''432    myDB, key_val_dict, non_key_set = get_myDB_resources(total)433    selected_keys = list(key_val_dict.keys())[:random.randint(0, total//10)]434    for key in selected_keys:435        keys = iter(myDB)436        assert key in keys437    clean_myDB_resources(myDB, key_val_dict, non_key_set)438@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])439def test_random_values(total):440    '''441    Test random values.442    '''443    myDB, key_val_dict, non_key_set = get_myDB_resources(total)444    myDB_vals   = set(myDB.values())445    mydict_vals = set(key_val_dict.values())446    assert myDB_vals == mydict_vals447    clean_myDB_resources(myDB, key_val_dict, non_key_set)448@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])449def test_random_popitem(total):450    '''451    Test random popitem.452    '''453    myDB, key_val_dict, non_key_set = get_myDB_resources(total)454    while key_val_dict:455        popped_key, popped_val = myDB.popitem()456        assert key_val_dict[popped_key] == popped_val457        key_val_dict.pop(popped_key)458    clean_myDB_resources(myDB, key_val_dict, non_key_set)459@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])460def test_random_multipopitem(total):461    '''462    Test random multipopitem.463    '''464    myDB, key_val_dict, non_key_set = get_myDB_resources(total)465    while len(myDB):466        num_items = random.randint(0, len(myDB)*2)467        prev_len  = len(myDB)468        popped_items = myDB.multipopitem(num_items=num_items)469        for popped_key,popped_val in popped_items:470            assert key_val_dict[popped_key] == popped_val471        assert len(myDB) == max(0, prev_len - num_items)472    # multiremove that raises Exception473    with pytest.raises(Exception) as error:474        myDB.multiremove([None]*factor)475    clean_myDB_resources(myDB, key_val_dict, non_key_set)476@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])477def test_random_clear(total):478    '''479    Test random clear.480    '''481    myDB, key_val_dict, non_key_set = get_myDB_resources(total)482    myDB.clear()483    assert len(myDB) == 0484    clean_myDB_resources(myDB, key_val_dict, non_key_set)485if __name__ == '__main__':...remove_pb2_grpc.py
Source:remove_pb2_grpc.py  
1# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!2"""Client and server classes corresponding to protobuf-defined services."""3import grpc4from vald.v1.payload import payload_pb2 as vald_dot_v1_dot_payload_dot_payload__pb25class RemoveStub(object):6    """Remove service provides ways to remove indexed vectors.7    """8    def __init__(self, channel):9        """Constructor.10        Args:11            channel: A grpc.Channel.12        """13        self.Remove = channel.unary_unary(14                '/vald.v1.Remove/Remove',15                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,16                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.FromString,17                )18        self.StreamRemove = channel.stream_stream(19                '/vald.v1.Remove/StreamRemove',20                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,21                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.FromString,22                )23        self.MultiRemove = channel.unary_unary(24                '/vald.v1.Remove/MultiRemove',25                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.SerializeToString,26                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.FromString,27                )28class RemoveServicer(object):29    """Remove service provides ways to remove indexed vectors.30    """31    def Remove(self, request, context):32        """A method to remove an indexed vector.33        """34        context.set_code(grpc.StatusCode.UNIMPLEMENTED)35        context.set_details('Method not implemented!')36        raise NotImplementedError('Method not implemented!')37    def StreamRemove(self, request_iterator, context):38        """A method to remove multiple indexed vectors by bidirectional streaming.39        """40        context.set_code(grpc.StatusCode.UNIMPLEMENTED)41        context.set_details('Method not implemented!')42        raise NotImplementedError('Method not implemented!')43    def MultiRemove(self, request, context):44        """A method to remove multiple indexed vectors in a single request.45        """46        context.set_code(grpc.StatusCode.UNIMPLEMENTED)47        context.set_details('Method not implemented!')48        raise NotImplementedError('Method not implemented!')49def add_RemoveServicer_to_server(servicer, server):50    rpc_method_handlers = {51            'Remove': grpc.unary_unary_rpc_method_handler(52                    servicer.Remove,53                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.FromString,54                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.SerializeToString,55            ),56            'StreamRemove': grpc.stream_stream_rpc_method_handler(57                    servicer.StreamRemove,58                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.FromString,59                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.SerializeToString,60            ),61            'MultiRemove': grpc.unary_unary_rpc_method_handler(62                    servicer.MultiRemove,63                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.FromString,64                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.SerializeToString,65            ),66    }67    generic_handler = grpc.method_handlers_generic_handler(68            'vald.v1.Remove', rpc_method_handlers)69    server.add_generic_rpc_handlers((generic_handler,))70 # This class is part of an EXPERIMENTAL API.71class Remove(object):72    """Remove service provides ways to remove indexed vectors.73    """74    @staticmethod75    def Remove(request,76            target,77            options=(),78            channel_credentials=None,79            call_credentials=None,80            insecure=False,81            compression=None,82            wait_for_ready=None,83            timeout=None,84            metadata=None):85        return grpc.experimental.unary_unary(request, target, '/vald.v1.Remove/Remove',86            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,87            vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.FromString,88            options, channel_credentials,89            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)90    @staticmethod91    def StreamRemove(request_iterator,92            target,93            options=(),94            channel_credentials=None,95            call_credentials=None,96            insecure=False,97            compression=None,98            wait_for_ready=None,99            timeout=None,100            metadata=None):101        return grpc.experimental.stream_stream(request_iterator, target, '/vald.v1.Remove/StreamRemove',102            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,103            vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.FromString,104            options, channel_credentials,105            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)106    @staticmethod107    def MultiRemove(request,108            target,109            options=(),110            channel_credentials=None,111            call_credentials=None,112            insecure=False,113            compression=None,114            wait_for_ready=None,115            timeout=None,116            metadata=None):117        return grpc.experimental.unary_unary(request, target, '/vald.v1.Remove/MultiRemove',118            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.SerializeToString,119            vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.FromString,120            options, channel_credentials,...driver.py
Source:driver.py  
...17        for k, v in zip(keys, vals):18            self._put(conn, k, v)19    def _remove(self, conn, key):20        raise NotImplementedError()21    def _multiremove(self, conn, keys):22        for key in keys:23            self._remove(conn, key)24    def _exists(self, conn, key):25        raise NotImplementedError()26    def _multiexists(self, conn, keys):27        return [28            self._exists(conn, key)29            for key in keys30        ]31    def _keys(self, conn):32        raise NotImplementedError()33    def get(self, key):34        """35        Get value from the store.36        :param key: Associated key37        :type key: str38        :returns: Value39        :raises KeyError: if key does not exist40        """41        return self._get(self.conn, key)42    def multiget(self, keys):43        """44        Get values from the store.45        :param keys: Associated keys46        :type keys: list47        :returns: Values48        :raises KeyError: if one key does not exist49        """50        return self._multiget(self.conn, keys)51    def put(self, key, val):52        """53        Set value in the store.54        :param key: Associated key55        :type key: str56        :param val: Value to set57        :type val: any58        """59        self._put(self.conn, key, val)60    def multiput(self, keys, vals):61        """62        Set values in the store.63        :param keys: Associated keys64        :type keys: list65        :param vals: Values to set66        :type val: any67        """68        self._multiput(self.conn, keys, vals)69    def remove(self, key):70        """71        Remove value from the store.72        :param key: Associated key73        :type key: str74        :raises KeyError: if key does not exist75        """76        self._remove(self.conn, key)77    def multiremove(self, keys):78        """79        Remove values from the store.80        :param keys: Associated keys81        :type keys: list82        :raises KeyError: if one key does not exist83        """84        self._multiremove(self.conn, keys)85    def exists(self, key):86        """87        Check if key exists in store.88        :param key: Key to check for89        :type key: str90        :rtype: bool91        """92        return self._exists(self.conn, key)93    def multiexists(self, keys):94        """95        Check if keys exists in store.96        :param keys: Keys to check for97        :type keys: list98        :rtype: list...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
