How to use multiremove method in pyatom

Best Python code snippet using pyatom_python

test_ShareDB.py

Source:test_ShareDB.py Github

copy

Full Screen

# ... excerpt of test_ShareDB.py. It begins inside a test whose `def` line is
# not shown on the page; that fragment is kept verbatim below as comments:
#     with pytest.raises(TypeError) as error:
#         myDB.remove(None)
#     clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_multiremove(total):
    '''
    Test random multiremove and related Exception.

    Repeatedly removes a random-sized batch of stored keys and checks
    that the store shrinks by exactly that many entries.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    # Successful random multiremove
    while len(myDB):
        factor = random.randint(0, len(myDB))
        prev_len = len(myDB)
        # popitem() also drops the key from the reference dict, keeping it
        # in sync with what is left in myDB.
        myDB.multiremove(key_iter=(key_val_dict.popitem()[0] for _ in range(factor)))
        assert len(myDB) == prev_len - factor
        # Removing keys that were never stored must not raise.
        # NOTE(review): nesting of this call was reconstructed from a
        # flattened excerpt -- confirm it belongs inside the loop.
        myDB.multiremove(non_key_set)
    # multiremove that raises Exception
    # (the loop can only exit after a non-zero `factor`, so the list is non-empty)
    with pytest.raises(Exception):
        myDB.multiremove([None]*factor)
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_pop(total):
    '''
    Test random pop and related Exceptions.

    Every stored key pops its reference value once; popping it again,
    or popping a key that was never stored, raises KeyError.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    # Successful random pop
    for key in key_val_dict:
        val = myDB.pop(key)
        assert key not in myDB
        assert key_val_dict[key] == val
    # pops that raise KeyError
    for key in key_val_dict:
        with pytest.raises(KeyError):
            myDB.pop(key)
    for non_key in non_key_set:
        with pytest.raises(KeyError):
            myDB.pop(non_key)
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_multipop(total):
    '''
    Test random multipop.

    Pops random-sized batches of keys and checks both the returned
    values (in key order) and the resulting store size.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    # Successful random multipop
    while len(key_val_dict):
        factor = random.randint(0, len(myDB))
        prev_len = len(myDB)
        keys_to_pop = list(key_val_dict.keys())[:factor]
        popped_vals = list(myDB.multipop(key_iter=keys_to_pop))
        assert len(myDB) == prev_len - factor
        assert [key_val_dict[key] for key in keys_to_pop] == popped_vals
        for key in keys_to_pop:
            key_val_dict.pop(key)
    # multipop that raises Exception
    # next() forces the first pop, in case multipop yields lazily.
    with pytest.raises(Exception):
        next(myDB.multipop(key_iter=non_key_set))
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_items(total):
    '''
    Test random items: myDB.items() matches the reference dict's items.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    myDB_items = set(myDB.items())
    mydict_items = set(key_val_dict.items())
    assert myDB_items == mydict_items
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_keys(total):
    '''
    Test random keys: myDB.keys() matches the reference dict's keys.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    myDB_keys = set(myDB.keys())
    mydict_keys = set(key_val_dict.keys())
    assert myDB_keys == mydict_keys
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_iters(total):
    '''
    Test random iters: a sample of stored keys is found by iter(myDB).
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    selected_keys = list(key_val_dict.keys())[:random.randint(0, total//10)]
    for key in selected_keys:
        # A fresh iterator per key: `in` consumes the iterator as it searches.
        keys = iter(myDB)
        assert key in keys
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_values(total):
    '''
    Test random values: myDB.values() matches the reference dict's values.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    myDB_vals = set(myDB.values())
    mydict_vals = set(key_val_dict.values())
    assert myDB_vals == mydict_vals
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_popitem(total):
    '''
    Test random popitem: each popped pair matches the reference dict.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    while key_val_dict:
        popped_key, popped_val = myDB.popitem()
        assert key_val_dict[popped_key] == popped_val
        key_val_dict.pop(popped_key)
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_multipopitem(total):
    '''
    Test random multipopitem.

    Pops random-sized batches (possibly larger than the store) and checks
    the popped pairs and the resulting store size.
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    while len(myDB):
        num_items = random.randint(0, len(myDB)*2)
        prev_len = len(myDB)
        popped_items = myDB.multipopitem(num_items=num_items)
        for popped_key, popped_val in popped_items:
            assert key_val_dict[popped_key] == popped_val
        # Asking for more items than are stored simply empties the store.
        assert len(myDB) == max(0, prev_len - num_items)
    # multiremove that raises Exception
    # The excerpt used `[None]*factor` here, but `factor` is not defined in
    # this test: the NameError was what satisfied pytest.raises(Exception),
    # so multiremove was never actually called. Use a literal key list.
    with pytest.raises(Exception):
        myDB.multiremove([None])
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


@pytest.mark.parametrize('total', [random.randint(10**3, 10**4) for _ in range(5)])
def test_random_clear(total):
    '''
    Test random clear: the store is empty after clear().
    '''
    myDB, key_val_dict, non_key_set = get_myDB_resources(total)
    myDB.clear()
    assert len(myDB) == 0
    clean_myDB_resources(myDB, key_val_dict, non_key_set)


if __name__ == '__main__':
    ...  # excerpt ends here; the body of this guard is not shown on the page

Full Screen

Full Screen

remove_pb2_grpc.py

Source:remove_pb2_grpc.py Github

copy

Full Screen

# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from vald.v1.payload import payload_pb2 as vald_dot_v1_dot_payload_dot_payload__pb2


class RemoveStub(object):
    """Remove service provides ways to remove indexed vectors.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # One attribute per RPC of the vald.v1.Remove service, each created
        # from the channel with the matching request/response (de)serializers.
        self.Remove = channel.unary_unary(
                '/vald.v1.Remove/Remove',
                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,
                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.FromString,
                )
        self.StreamRemove = channel.stream_stream(
                '/vald.v1.Remove/StreamRemove',
                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,
                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.FromString,
                )
        self.MultiRemove = channel.unary_unary(
                '/vald.v1.Remove/MultiRemove',
                request_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.SerializeToString,
                response_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.FromString,
                )


class RemoveServicer(object):
    """Remove service provides ways to remove indexed vectors.
    """

    # Each method below is a placeholder: it marks the call UNIMPLEMENTED on
    # the context and raises NotImplementedError.

    def Remove(self, request, context):
        """A method to remove an indexed vector.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def StreamRemove(self, request_iterator, context):
        """A method to remove multiple indexed vectors by bidirectional streaming.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def MultiRemove(self, request, context):
        """A method to remove multiple indexed vectors in a single request.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')


def add_RemoveServicer_to_server(servicer, server):
    """Register the servicer's Remove, StreamRemove and MultiRemove handlers
    with ``server`` under the service name 'vald.v1.Remove'.
    """
    rpc_method_handlers = {
            'Remove': grpc.unary_unary_rpc_method_handler(
                    servicer.Remove,
                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.FromString,
                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.SerializeToString,
            ),
            'StreamRemove': grpc.stream_stream_rpc_method_handler(
                    servicer.StreamRemove,
                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.FromString,
                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.SerializeToString,
            ),
            'MultiRemove': grpc.unary_unary_rpc_method_handler(
                    servicer.MultiRemove,
                    request_deserializer=vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.FromString,
                    response_serializer=vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'vald.v1.Remove', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))


 # This class is part of an EXPERIMENTAL API.
class Remove(object):
    """Remove service provides ways to remove indexed vectors.
    """

    @staticmethod
    def Remove(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Call '/vald.v1.Remove/Remove' on ``target`` via grpc.experimental.unary_unary."""
        return grpc.experimental.unary_unary(request, target, '/vald.v1.Remove/Remove',
            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,
            vald_dot_v1_dot_payload_dot_payload__pb2.Object.Location.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def StreamRemove(request_iterator,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Call '/vald.v1.Remove/StreamRemove' on ``target`` via grpc.experimental.stream_stream."""
        return grpc.experimental.stream_stream(request_iterator, target, '/vald.v1.Remove/StreamRemove',
            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.Request.SerializeToString,
            vald_dot_v1_dot_payload_dot_payload__pb2.Object.StreamLocation.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

    @staticmethod
    def MultiRemove(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Call '/vald.v1.Remove/MultiRemove' on ``target`` via grpc.experimental.unary_unary."""
        # NOTE: the excerpt is cut off part-way through this call on the
        # source page; the remaining arguments are not shown.
        return grpc.experimental.unary_unary(request, target, '/vald.v1.Remove/MultiRemove',
            vald_dot_v1_dot_payload_dot_payload__pb2.Remove.MultiRequest.SerializeToString,
            vald_dot_v1_dot_payload_dot_payload__pb2.Object.Locations.FromString,
            options, channel_credentials,...

Full Screen

Full Screen

driver.py

Source:driver.py Github

copy

Full Screen

...
        for k, v in zip(keys, vals):
            self._put(conn, k, v)

    # --- Connection-level hooks -------------------------------------------
    # The single-key hooks below raise NotImplementedError here; the multi-key
    # hooks are built on top of them, one key at a time.

    def _remove(self, conn, key):
        # Not implemented at this level; always raises.
        raise NotImplementedError()

    def _multiremove(self, conn, keys):
        # Remove each key in turn through the single-key hook.
        for key in keys:
            self._remove(conn, key)

    def _exists(self, conn, key):
        # Not implemented at this level; always raises.
        raise NotImplementedError()

    def _multiexists(self, conn, keys):
        # One _exists() result per key, in the order the keys were given.
        return [
            self._exists(conn, key)
            for key in keys
        ]

    def _keys(self, conn):
        # Not implemented at this level; always raises.
        raise NotImplementedError()

    # --- Public API -------------------------------------------------------
    # Each public method forwards to the matching hook, passing self.conn.

    def get(self, key):
        """
        Get value from the store.
        :param key: Associated key
        :type key: str
        :returns: Value
        :raises KeyError: if key does not exist
        """
        return self._get(self.conn, key)

    def multiget(self, keys):
        """
        Get values from the store.
        :param keys: Associated keys
        :type keys: list
        :returns: Values
        :raises KeyError: if one key does not exist
        """
        return self._multiget(self.conn, keys)

    def put(self, key, val):
        """
        Set value in the store.
        :param key: Associated key
        :type key: str
        :param val: Value to set
        :type val: any
        """
        self._put(self.conn, key, val)

    def multiput(self, keys, vals):
        """
        Set values in the store.
        :param keys: Associated keys
        :type keys: list
        :param vals: Values to set
        :type vals: list
        """
        self._multiput(self.conn, keys, vals)

    def remove(self, key):
        """
        Remove value from the store.
        :param key: Associated key
        :type key: str
        :raises KeyError: if key does not exist
        """
        self._remove(self.conn, key)

    def multiremove(self, keys):
        """
        Remove values from the store.
        :param keys: Associated keys
        :type keys: list
        :raises KeyError: if one key does not exist
        """
        self._multiremove(self.conn, keys)

    def exists(self, key):
        """
        Check if key exists in store.
        :param key: Key to check for
        :type key: str
        :rtype: bool
        """
        return self._exists(self.conn, key)

    def multiexists(self, keys):
        """
        Check if keys exist in store.
        :param keys: Keys to check for
        :type keys: list
        :rtype: list...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pyatom automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful