How to use the assert_updates method in localstack

Best Python code snippet using localstack_python

test_integration.py

Source:test_integration.py Github

copy

Full Screen

...412 item_id = "testId%d" % i413 matching = [i for i in inserts if i["new_image"]["id"] == item_id][0]414 self.assertEqual({"id": item_id, "data": "foobar123"}, matching["new_image"])415 # assert that all updates were received416 def assert_updates(expected_updates, modifies):417 def found(update):418 for modif in modifies:419 if modif["old_image"]["id"] == update["id"]:420 self.assertEqual(421 modif["old_image"],422 {"id": update["id"], "data": update["old"]},423 )424 self.assertEqual(425 modif["new_image"],426 {"id": update["id"], "data": update["new"]},427 )428 return True429 for update in expected_updates:430 self.assertTrue(found(update))431 updates1 = [432 {"id": "testId6", "old": "foobar123", "new": "foobar123_updated1"},433 {"id": "testId7", "old": "foobar123", "new": "foobar123_updated1"},434 ]435 updates2 = [436 {437 "id": "testId6",438 "old": "foobar123_updated1",439 "new": "foobar123_updated2",440 },441 {442 "id": "testId7",443 "old": "foobar123_updated1",444 "new": "foobar123_updated2",445 },446 {"id": "testId8", "old": "foobar123", "new": "foobar123_updated2"},447 ]448 assert_updates(updates1, modifies[:2])449 assert_updates(updates2, modifies[2:])450 # assert that all removes were received451 for i, event in enumerate(removes):452 self.assertNotIn("new_image", event)453 item_id = "testId%d" % i454 matching = [i for i in removes if i["old_image"]["id"] == item_id][0]455 self.assertEqual({"id": item_id, "data": "foobar123"}, matching["old_image"])456 # this can take a long time in CI, make sure we give it enough time/retries457 retry(check_events, retries=30, sleep=4)458 # clean up459 testutil.delete_lambda_function(lambda_ddb_name)460 def test_scheduled_lambda(self):461 def check_invocation(*args):462 log_events = get_lambda_logs(self.scheduled_lambda_name)463 self.assertGreater(len(log_events), 0)...

Full Screen

Full Screen

client_server_test_base.py

Source:client_server_test_base.py Github

copy

Full Screen

...62 def assert_set_response(response, path_op):63 assert (response.path == path_op[0])64 assert (response.op == path_op[1])65 @staticmethod66 def assert_updates(updates, path_vals):67 assert (len(updates) == len(path_vals))68 for i, u in enumerate(updates):69 GrpcBase.assert_update(u, path_vals[i])70 @staticmethod71 def assert_in_updates(updates, path_vals):72 log.debug("==> updates=%s path_vals=%s", updates, path_vals)73 assert (len(updates) >= len(path_vals))74 for pv in path_vals:75 found = False76 for u in updates:77 if u.path == pv[0] and u.val == gnmi_pb2.TypedValue(78 string_val=pv[1]):79 found = True80 break81 assert found82 log.debug("<==")83 def verify_get_response_updates(self, prefix, paths, path_value,84 datatype, encoding, assert_fun=None):85 if assert_fun is None:86 assert_fun = GrpcBase.assert_updates87 log.debug("prefix=%s paths=%s pv_list=%s datatype=%s encoding=%s",88 prefix, paths, path_value, datatype, encoding)89 notification = self.client.get(prefix, paths, datatype, encoding)90 log.debug("notification=%s", notification)91 for n in notification:92 log.debug("n=%s", n)93 assert (n.prefix == prefix)94 assert_fun(n.update, path_value)95 def verify_sub_sub_response_updates(self, prefix, paths, path_value,96 assert_fun=None,97 subscription_mode=gnmi_pb2.SubscriptionList.ONCE,98 poll_interval=0,99 poll_count=0, read_count=-1):100 if assert_fun is None:101 assert_fun = GrpcBase.assert_updates102 log.debug("paths=%s path_value=%s", paths, path_value)103 response_count = 0104 thread_exception = False105 pv_idx = 0106 for pv in path_value:107 if not isinstance(pv, list):108 pv_idx = -1109 break110 log.debug("pv_idx=%s", pv_idx)111 def read_subscribe_responses(responses, read_count=-1):112 nonlocal response_count, pv_idx, thread_exception113 try:114 for response in responses:115 response_count += 1116 log.debug("response=%s response_count=%i", response,117 response_count)118 assert (response.update.prefix == prefix)119 pv_to_check = path_value120 if 
pv_idx != -1:121 assert pv_idx < len(path_value)122 pv_to_check = path_value[pv_idx]123 pv_idx += 1124 if len(pv_to_check) > 0: # skip empty arrays125 assert_fun(response.update.update, pv_to_check)126 log.info("response_count=%i", response_count)127 if read_count > 0:128 read_count -= 1129 if read_count == 0:130 log.info("read count reached")131 break132 assert read_count == -1 or read_count == 0133 except Exception as e:134 log.exception(e)135 thread_exception = True136 read_fun = read_subscribe_responses137 subscription_list = \138 ConfDgNMIClient.make_subscription_list(prefix,139 paths,140 subscription_mode)141 responses = self.client.subscribe(subscription_list,142 read_fun=read_fun,143 poll_interval=poll_interval,144 poll_count=poll_count,145 read_count=read_count)146 assert thread_exception is False147 log.debug("responses=%s", responses)148 if poll_count:149 assert (poll_count + 1 == response_count)150 def _test_get_subscribe(self, is_subscribe=False,151 datatype=gnmi_pb2.GetRequest.DataType.CONFIG,152 subscription_mode=gnmi_pb2.SubscriptionList.ONCE,153 poll_interval=0,154 poll_count=0, read_count=-1):155 kwargs = {"assert_fun": GrpcBase.assert_updates}156 if_state_str = prefix_state_str = ""157 if datatype == gnmi_pb2.GetRequest.DataType.STATE:158 prefix_state_str = "-state"159 if_state_str = "state_"160 prefix = make_gnmi_path("/interfaces{}".format(prefix_state_str))161 kwargs["prefix"] = prefix162 if_id = 8163 leaf_paths = [164 GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[0], if_state_str,165 if_id),166 GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], if_state_str,167 if_id)]168 list_paths = [169 GrpcBase.mk_gnmi_if_path(self.list_paths_str[0], if_state_str,170 if_id),171 GrpcBase.mk_gnmi_if_path(self.list_paths_str[1], if_state_str,172 if_id)]173 ifname = "{}if_{}".format(if_state_str, if_id)174 if is_subscribe:175 verify_response_updates = self.verify_sub_sub_response_updates176 kwargs["subscription_mode"] = subscription_mode177 
kwargs["poll_interval"] = poll_interval178 kwargs["poll_count"] = poll_count179 kwargs["read_count"] = read_count180 else:181 encoding = gnmi_pb2.Encoding.BYTES182 verify_response_updates = self.verify_get_response_updates183 kwargs["datatype"] = datatype184 kwargs["encoding"] = encoding185 kwargs["paths"] = [leaf_paths[0]]186 kwargs["path_value"] = [(leaf_paths[0], ifname)]187 verify_response_updates(**kwargs)188 kwargs["paths"] = [leaf_paths[1]]189 kwargs["path_value"] = [(leaf_paths[1], "gigabitEthernet")]190 verify_response_updates(**kwargs)191 kwargs["paths"] = leaf_paths192 kwargs["path_value"] = [(leaf_paths[0], ifname),193 (leaf_paths[1], "gigabitEthernet")]194 verify_response_updates(**kwargs)195 kwargs["paths"] = [list_paths[0]]196 kwargs["path_value"] = [(leaf_paths[0], ifname), (leaf_paths[1],197 "gigabitEthernet")]198 verify_response_updates(**kwargs)199 pv = []200 for i in range(1, GnmiDemoServerAdapter.num_of_ifs):201 pv.append((GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[0],202 if_state_str, i),203 "{}if_{}".format(if_state_str, i)))204 pv.append((GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1],205 if_state_str, i),206 "gigabitEthernet"))207 kwargs["paths"] = [list_paths[1]]208 kwargs["path_value"] = pv209 kwargs["assert_fun"] = GrpcBase.assert_in_updates210 verify_response_updates(**kwargs)211 pass212 @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])213 def test_get(self, request, data_type):214 log.info("testing get")215 self._test_get_subscribe(datatype=get_data_type(data_type))216 @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])217 def test_subscribe_once(self, request, data_type):218 log.info("testing subscribe_once")219 self._test_get_subscribe(is_subscribe=True,220 datatype=get_data_type(data_type))221 @pytest.mark.long222 @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])223 @pytest.mark.parametrize("poll_args",224 [(0.2, 2), (0.5, 2), (1, 2), (0.2, 10)])225 def test_subscribe_poll(self, request, data_type, 
poll_args):226 log.info("testing subscribe_poll")227 self._test_get_subscribe(is_subscribe=True,228 datatype=get_data_type(data_type),229 subscription_mode=gnmi_pb2.SubscriptionList.POLL,230 poll_interval=poll_args[0],231 poll_count=poll_args[1])232 def _send_change_list_to_confd_thread(self, prefix_str, changes_list):233 log.info("==>")234 log.debug("prefix_str=%s change_list=%s", prefix_str, changes_list)235 oper_cmd = ""236 config_cmd = ""237 sleep(1)238 def send_to_confd(config_cmd, oper_cmd):239 def confd_cmd_subprocess(confd_cmd):240 log.info("confd_cmd=%s", confd_cmd)241 subprocess.run(confd_cmd, shell=True, check=True)242 log.info("config_cmd=%s oper_cmd=%s", config_cmd, oper_cmd)243 if config_cmd != "":244 confd_cmd = "confd_cmd -c \"{}\"".format(245 config_cmd)246 confd_cmd_subprocess(confd_cmd)247 if oper_cmd != "":248 confd_cmd = "confd_cmd -o -fr -c \"set {}\"".format(249 oper_cmd)250 confd_cmd_subprocess(confd_cmd)251 for c in changes_list:252 log.debug("processing c=%s", c)253 if isinstance(c, str) and c == "send":254 send_to_confd(config_cmd, oper_cmd)255 oper_cmd = config_cmd = ""256 sleep(1)257 else:258 path_prefix = make_gnmi_path(prefix_str)259 path = make_gnmi_path(c[0])260 cmd = "{} {}".format(261 make_formatted_path(path, gnmi_prefix=path_prefix),262 c[1])263 if "state" in prefix_str:264 oper_cmd += "{} ".format(cmd)265 else:266 config_cmd += "mset {};".format(cmd)267 log.info("<==")268 @staticmethod269 def _changes_list_to_pv(changes_list):270 '''271 Return path_value_list created from changes_list.272 :param changes_list:273 :return:274 '''275 path_value = []276 pv_idx = 0277 for c in changes_list:278 if isinstance(c, str):279 if c == "send":280 pv_idx += 1281 else:282 if len(path_value) < pv_idx + 1:283 path_value.append([])284 path_value[pv_idx].append((make_gnmi_path(c[0]), c[1]))285 log.debug("path_value=%s", path_value)286 return path_value287 @staticmethod288 def _changes_list_to_xml(changes_list, prefix_str):289 demo = 
ET.Element("demo")290 sub = ET.SubElement(demo, "subscription")291 stream = ET.SubElement(sub, "STREAM")292 changes = ET.SubElement(stream, "changes")293 for c in changes_list:294 el = ET.SubElement(changes, "element")295 if isinstance(c, str):296 el.text = c297 else:298 ET.SubElement(el, "path").text = "{}/{}".format(prefix_str,299 c[0])300 ET.SubElement(el, "val").text = c[1]301 xml_str = ET.tostring(demo, encoding='unicode')302 log.debug("xml_str=%s", xml_str)303 return xml_str304 @pytest.mark.long305 @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])306 def test_subscribe_stream(self, request, data_type):307 log.info("testing subscribe_stream")308 if_state_str = prefix_state_str = ""309 if data_type == "STATE":310 prefix_state_str = "-state"311 if_state_str = "state_"312 changes_list = [313 ("interface[name={}if_5]/type".format(if_state_str),314 "fastEther"),315 ("interface[name={}if_6]/type".format(if_state_str),316 "fastEther"),317 "send",318 ("interface[name={}if_5]/type".format(if_state_str),319 "gigabitEthernet"),320 ("interface[name={}if_6]/type".format(if_state_str),321 "gigabitEthernet"),322 "send",323 ]324 if data_type == "STATE" and self.adapter_type == AdapterType.API:325 # TODO state `confd_cmd` is not transactional, so we need to check326 # every item - add 'send' after each item (or fix checking method?)327 new_change_list = []328 for c in [x for x in changes_list if x != "send"]:329 new_change_list.append(c)330 new_change_list.append("send")331 changes_list = new_change_list332 log.info("change_list=%s", changes_list)333 path_value = [[]] # empty element means no check334 path_value.extend(self._changes_list_to_pv(changes_list))335 prefix_str = "/interfaces{}".format(prefix_state_str)336 prefix = make_gnmi_path(prefix_str)337 paths = [GrpcBase.mk_gnmi_if_path(self.list_paths_str[1], if_state_str,338 "N/A")]339 kwargs = {"assert_fun": GrpcBase.assert_in_updates}340 kwargs["prefix"] = prefix341 kwargs["paths"] = paths342 
kwargs["path_value"] = path_value343 kwargs["subscription_mode"] = gnmi_pb2.SubscriptionList.STREAM344 kwargs["read_count"] = len(path_value)345 kwargs["assert_fun"] = GrpcBase.assert_in_updates346 if self.adapter_type == AdapterType.DEMO:347 GnmiDemoServerAdapter.load_config_string(348 self._changes_list_to_xml(changes_list, prefix_str))349 if self.adapter_type == AdapterType.API:350 sleep(1)351 thr = threading.Thread(352 target=self._send_change_list_to_confd_thread,353 args=(prefix_str, changes_list,))354 thr.start()355 self.verify_sub_sub_response_updates(**kwargs)356 if self.adapter_type == AdapterType.API:357 thr.join()358 # TODO reset ConfD DB to original values359 def test_set(self, request):360 log.info("testing set")361 if_id = 8362 prefix = make_gnmi_path("/interfaces")363 paths = [GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], "", if_id)]364 vals = [gnmi_pb2.TypedValue(string_val="fastEther")]365 response = self.client.set(prefix, list(zip(paths, vals)))366 assert (response.prefix == prefix)367 GrpcBase.assert_set_response(response.response[0],368 (paths[0], gnmi_pb2.UpdateResult.UPDATE))369 # fetch with get and see value has changed370 datatype = gnmi_pb2.GetRequest.DataType.CONFIG371 encoding = gnmi_pb2.Encoding.BYTES372 notification = self.client.get(prefix, paths, datatype, encoding)373 for n in notification:374 log.debug("n=%s", n)375 assert (n.prefix == prefix)376 GrpcBase.assert_updates(n.update, [(paths[0], "fastEther")])377 # put value back378 vals = [gnmi_pb2.TypedValue(string_val="gigabitEthernet")]379 response = self.client.set(prefix, list(zip(paths, vals)))380 GrpcBase.assert_set_response(response.response[0],...

Full Screen

Full Screen

test_GreedyEpochIterator.py

Source:test_GreedyEpochIterator.py Github

copy

Full Screen

1import numpy as np2from ordered_set import OrderedSet3from text_selection.greedy.greedy_epoch_iterator import EpochProxyIterator4from text_selection.greedy.greedy_iterator import GreedyIterator5from text_selection.selection import FirstKeySelector6def test_update_is_correct():7 it = GreedyIterator(8 data=np.array([[0, 1], [1, 1], [1, 0]]),9 data_indices=OrderedSet([0, 1, 2]),10 key_selector=FirstKeySelector(),11 preselection=np.array([0, 0]),12 )13 it = EpochProxyIterator(14 iterator=it,15 epochs=2,16 )17 assert_updates = (1, 0, 1)18 for _, assert_update in zip(it, assert_updates):19 assert it.tqdm_update == assert_update20def test_returns_greedy_indices():21 it = GreedyIterator(22 data=np.array([[0, 1], [1, 1], [1, 0]]),23 data_indices=OrderedSet([0, 1, 2]),24 key_selector=FirstKeySelector(),25 preselection=np.array([0, 0]),26 )27 it = EpochProxyIterator(28 iterator=it,29 epochs=2,30 )31 result = list(it)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful