Best Python code snippet using localstack_python
test_integration.py
Source:test_integration.py  
...412                item_id = "testId%d" % i413                matching = [i for i in inserts if i["new_image"]["id"] == item_id][0]414                self.assertEqual({"id": item_id, "data": "foobar123"}, matching["new_image"])415            # assert that all updates were received416            def assert_updates(expected_updates, modifies):417                def found(update):418                    for modif in modifies:419                        if modif["old_image"]["id"] == update["id"]:420                            self.assertEqual(421                                modif["old_image"],422                                {"id": update["id"], "data": update["old"]},423                            )424                            self.assertEqual(425                                modif["new_image"],426                                {"id": update["id"], "data": update["new"]},427                            )428                            return True429                for update in expected_updates:430                    self.assertTrue(found(update))431            updates1 = [432                {"id": "testId6", "old": "foobar123", "new": "foobar123_updated1"},433                {"id": "testId7", "old": "foobar123", "new": "foobar123_updated1"},434            ]435            updates2 = [436                {437                    "id": "testId6",438                    "old": "foobar123_updated1",439                    "new": "foobar123_updated2",440                },441                {442                    "id": "testId7",443                    "old": "foobar123_updated1",444                    "new": "foobar123_updated2",445                },446                {"id": "testId8", "old": "foobar123", "new": "foobar123_updated2"},447            ]448            assert_updates(updates1, modifies[:2])449            assert_updates(updates2, modifies[2:])450            # assert that all removes were received451            for i, event in enumerate(removes):452         
       self.assertNotIn("new_image", event)453                item_id = "testId%d" % i454                matching = [i for i in removes if i["old_image"]["id"] == item_id][0]455                self.assertEqual({"id": item_id, "data": "foobar123"}, matching["old_image"])456        # this can take a long time in CI, make sure we give it enough time/retries457        retry(check_events, retries=30, sleep=4)458        # clean up459        testutil.delete_lambda_function(lambda_ddb_name)460    def test_scheduled_lambda(self):461        def check_invocation(*args):462            log_events = get_lambda_logs(self.scheduled_lambda_name)463            self.assertGreater(len(log_events), 0)...client_server_test_base.py
Source:client_server_test_base.py  
...62    def assert_set_response(response, path_op):63        assert (response.path == path_op[0])64        assert (response.op == path_op[1])65    @staticmethod66    def assert_updates(updates, path_vals):67        assert (len(updates) == len(path_vals))68        for i, u in enumerate(updates):69            GrpcBase.assert_update(u, path_vals[i])70    @staticmethod71    def assert_in_updates(updates, path_vals):72        log.debug("==> updates=%s path_vals=%s", updates, path_vals)73        assert (len(updates) >= len(path_vals))74        for pv in path_vals:75            found = False76            for u in updates:77                if u.path == pv[0] and u.val == gnmi_pb2.TypedValue(78                        string_val=pv[1]):79                    found = True80                    break81            assert found82        log.debug("<==")83    def verify_get_response_updates(self, prefix, paths, path_value,84                                    datatype, encoding, assert_fun=None):85        if assert_fun is None:86            assert_fun = GrpcBase.assert_updates87        log.debug("prefix=%s paths=%s pv_list=%s datatype=%s encoding=%s",88                  prefix, paths, path_value, datatype, encoding)89        notification = self.client.get(prefix, paths, datatype, encoding)90        log.debug("notification=%s", notification)91        for n in notification:92            log.debug("n=%s", n)93            assert (n.prefix == prefix)94            assert_fun(n.update, path_value)95    def verify_sub_sub_response_updates(self, prefix, paths, path_value,96                                        assert_fun=None,97                                        subscription_mode=gnmi_pb2.SubscriptionList.ONCE,98                                        poll_interval=0,99                                        poll_count=0, read_count=-1):100        if assert_fun is None:101            assert_fun = GrpcBase.assert_updates102        log.debug("paths=%s path_value=%s", paths, 
path_value)103        response_count = 0104        thread_exception = False105        pv_idx = 0106        for pv in path_value:107            if not isinstance(pv, list):108                pv_idx = -1109                break110        log.debug("pv_idx=%s", pv_idx)111        def read_subscribe_responses(responses, read_count=-1):112            nonlocal response_count, pv_idx, thread_exception113            try:114                for response in responses:115                    response_count += 1116                    log.debug("response=%s response_count=%i", response,117                              response_count)118                    assert (response.update.prefix == prefix)119                    pv_to_check = path_value120                    if pv_idx != -1:121                        assert pv_idx < len(path_value)122                        pv_to_check = path_value[pv_idx]123                        pv_idx += 1124                    if len(pv_to_check) > 0:  # skip empty arrays125                        assert_fun(response.update.update, pv_to_check)126                    log.info("response_count=%i", response_count)127                    if read_count > 0:128                        read_count -= 1129                        if read_count == 0:130                            log.info("read count reached")131                            break132                assert read_count == -1 or read_count == 0133            except Exception as e:134                log.exception(e)135                thread_exception = True136        read_fun = read_subscribe_responses137        subscription_list = \138            ConfDgNMIClient.make_subscription_list(prefix,139                                                   paths,140                                                   subscription_mode)141        responses = self.client.subscribe(subscription_list,142                                          read_fun=read_fun,143                                          
poll_interval=poll_interval,144                                          poll_count=poll_count,145                                          read_count=read_count)146        assert thread_exception is False147        log.debug("responses=%s", responses)148        if poll_count:149            assert (poll_count + 1 == response_count)150    def _test_get_subscribe(self, is_subscribe=False,151                            datatype=gnmi_pb2.GetRequest.DataType.CONFIG,152                            subscription_mode=gnmi_pb2.SubscriptionList.ONCE,153                            poll_interval=0,154                            poll_count=0, read_count=-1):155        kwargs = {"assert_fun": GrpcBase.assert_updates}156        if_state_str = prefix_state_str = ""157        if datatype == gnmi_pb2.GetRequest.DataType.STATE:158            prefix_state_str = "-state"159            if_state_str = "state_"160        prefix = make_gnmi_path("/interfaces{}".format(prefix_state_str))161        kwargs["prefix"] = prefix162        if_id = 8163        leaf_paths = [164            GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[0], if_state_str,165                                     if_id),166            GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], if_state_str,167                                     if_id)]168        list_paths = [169            GrpcBase.mk_gnmi_if_path(self.list_paths_str[0], if_state_str,170                                     if_id),171            GrpcBase.mk_gnmi_if_path(self.list_paths_str[1], if_state_str,172                                     if_id)]173        ifname = "{}if_{}".format(if_state_str, if_id)174        if is_subscribe:175            verify_response_updates = self.verify_sub_sub_response_updates176            kwargs["subscription_mode"] = subscription_mode177            kwargs["poll_interval"] = poll_interval178            kwargs["poll_count"] = poll_count179            kwargs["read_count"] = read_count180        else:181            encoding = 
gnmi_pb2.Encoding.BYTES182            verify_response_updates = self.verify_get_response_updates183            kwargs["datatype"] = datatype184            kwargs["encoding"] = encoding185        kwargs["paths"] = [leaf_paths[0]]186        kwargs["path_value"] = [(leaf_paths[0], ifname)]187        verify_response_updates(**kwargs)188        kwargs["paths"] = [leaf_paths[1]]189        kwargs["path_value"] = [(leaf_paths[1], "gigabitEthernet")]190        verify_response_updates(**kwargs)191        kwargs["paths"] = leaf_paths192        kwargs["path_value"] = [(leaf_paths[0], ifname),193                                (leaf_paths[1], "gigabitEthernet")]194        verify_response_updates(**kwargs)195        kwargs["paths"] = [list_paths[0]]196        kwargs["path_value"] = [(leaf_paths[0], ifname), (leaf_paths[1],197                                                          "gigabitEthernet")]198        verify_response_updates(**kwargs)199        pv = []200        for i in range(1, GnmiDemoServerAdapter.num_of_ifs):201            pv.append((GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[0],202                                                if_state_str, i),203                       "{}if_{}".format(if_state_str, i)))204            pv.append((GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1],205                                                if_state_str, i),206                       "gigabitEthernet"))207        kwargs["paths"] = [list_paths[1]]208        kwargs["path_value"] = pv209        kwargs["assert_fun"] = GrpcBase.assert_in_updates210        verify_response_updates(**kwargs)211        pass212    @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])213    def test_get(self, request, data_type):214        log.info("testing get")215        self._test_get_subscribe(datatype=get_data_type(data_type))216    @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])217    def test_subscribe_once(self, request, data_type):218        log.info("testing subscribe_once")219 
       self._test_get_subscribe(is_subscribe=True,220                                 datatype=get_data_type(data_type))221    @pytest.mark.long222    @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])223    @pytest.mark.parametrize("poll_args",224                             [(0.2, 2), (0.5, 2), (1, 2), (0.2, 10)])225    def test_subscribe_poll(self, request, data_type, poll_args):226        log.info("testing subscribe_poll")227        self._test_get_subscribe(is_subscribe=True,228                                 datatype=get_data_type(data_type),229                                 subscription_mode=gnmi_pb2.SubscriptionList.POLL,230                                 poll_interval=poll_args[0],231                                 poll_count=poll_args[1])232    def _send_change_list_to_confd_thread(self, prefix_str, changes_list):233        log.info("==>")234        log.debug("prefix_str=%s change_list=%s", prefix_str, changes_list)235        oper_cmd = ""236        config_cmd = ""237        sleep(1)238        def send_to_confd(config_cmd, oper_cmd):239            def confd_cmd_subprocess(confd_cmd):240                log.info("confd_cmd=%s", confd_cmd)241                subprocess.run(confd_cmd, shell=True, check=True)242            log.info("config_cmd=%s oper_cmd=%s", config_cmd, oper_cmd)243            if config_cmd != "":244                confd_cmd = "confd_cmd -c \"{}\"".format(245                    config_cmd)246                confd_cmd_subprocess(confd_cmd)247            if oper_cmd != "":248                confd_cmd = "confd_cmd -o -fr -c \"set {}\"".format(249                    oper_cmd)250                confd_cmd_subprocess(confd_cmd)251        for c in changes_list:252            log.debug("processing c=%s", c)253            if isinstance(c, str) and c == "send":254                send_to_confd(config_cmd, oper_cmd)255                oper_cmd = config_cmd = ""256                sleep(1)257            else:258                path_prefix = 
make_gnmi_path(prefix_str)259                path = make_gnmi_path(c[0])260                cmd = "{} {}".format(261                    make_formatted_path(path, gnmi_prefix=path_prefix),262                    c[1])263                if "state" in prefix_str:264                    oper_cmd += "{} ".format(cmd)265                else:266                    config_cmd += "mset {};".format(cmd)267        log.info("<==")268    @staticmethod269    def _changes_list_to_pv(changes_list):270        '''271        Return path_value_list created from changes_list.272        :param changes_list:273        :return:274        '''275        path_value = []276        pv_idx = 0277        for c in changes_list:278            if isinstance(c, str):279                if c == "send":280                    pv_idx += 1281            else:282                if len(path_value) < pv_idx + 1:283                    path_value.append([])284                path_value[pv_idx].append((make_gnmi_path(c[0]), c[1]))285        log.debug("path_value=%s", path_value)286        return path_value287    @staticmethod288    def _changes_list_to_xml(changes_list, prefix_str):289        demo = ET.Element("demo")290        sub = ET.SubElement(demo, "subscription")291        stream = ET.SubElement(sub, "STREAM")292        changes = ET.SubElement(stream, "changes")293        for c in changes_list:294            el = ET.SubElement(changes, "element")295            if isinstance(c, str):296                el.text = c297            else:298                ET.SubElement(el, "path").text = "{}/{}".format(prefix_str,299                                                                c[0])300                ET.SubElement(el, "val").text = c[1]301        xml_str = ET.tostring(demo, encoding='unicode')302        log.debug("xml_str=%s", xml_str)303        return xml_str304    @pytest.mark.long305    @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"])306    def test_subscribe_stream(self, request, data_type):307     
   log.info("testing subscribe_stream")308        if_state_str = prefix_state_str = ""309        if data_type == "STATE":310            prefix_state_str = "-state"311            if_state_str = "state_"312        changes_list = [313            ("interface[name={}if_5]/type".format(if_state_str),314             "fastEther"),315            ("interface[name={}if_6]/type".format(if_state_str),316             "fastEther"),317            "send",318            ("interface[name={}if_5]/type".format(if_state_str),319             "gigabitEthernet"),320            ("interface[name={}if_6]/type".format(if_state_str),321             "gigabitEthernet"),322            "send",323        ]324        if data_type == "STATE" and self.adapter_type == AdapterType.API:325            # TODO state `confd_cmd` is not transactional, so we need to check326            # every item - add 'send' after each item (or fix checking method?)327            new_change_list = []328            for c in [x for x in changes_list if x != "send"]:329                new_change_list.append(c)330                new_change_list.append("send")331            changes_list = new_change_list332        log.info("change_list=%s", changes_list)333        path_value = [[]]  # empty element means no check334        path_value.extend(self._changes_list_to_pv(changes_list))335        prefix_str = "/interfaces{}".format(prefix_state_str)336        prefix = make_gnmi_path(prefix_str)337        paths = [GrpcBase.mk_gnmi_if_path(self.list_paths_str[1], if_state_str,338                                          "N/A")]339        kwargs = {"assert_fun": GrpcBase.assert_in_updates}340        kwargs["prefix"] = prefix341        kwargs["paths"] = paths342        kwargs["path_value"] = path_value343        kwargs["subscription_mode"] = gnmi_pb2.SubscriptionList.STREAM344        kwargs["read_count"] = len(path_value)345        kwargs["assert_fun"] = GrpcBase.assert_in_updates346        if self.adapter_type == AdapterType.DEMO:347       
     GnmiDemoServerAdapter.load_config_string(348                self._changes_list_to_xml(changes_list, prefix_str))349        if self.adapter_type == AdapterType.API:350            sleep(1)351            thr = threading.Thread(352                target=self._send_change_list_to_confd_thread,353                args=(prefix_str, changes_list,))354            thr.start()355        self.verify_sub_sub_response_updates(**kwargs)356        if self.adapter_type == AdapterType.API:357            thr.join()358            # TODO reset ConfD DB to original values359    def test_set(self, request):360        log.info("testing set")361        if_id = 8362        prefix = make_gnmi_path("/interfaces")363        paths = [GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], "", if_id)]364        vals = [gnmi_pb2.TypedValue(string_val="fastEther")]365        response = self.client.set(prefix, list(zip(paths, vals)))366        assert (response.prefix == prefix)367        GrpcBase.assert_set_response(response.response[0],368                                     (paths[0], gnmi_pb2.UpdateResult.UPDATE))369        # fetch with get and see value has changed370        datatype = gnmi_pb2.GetRequest.DataType.CONFIG371        encoding = gnmi_pb2.Encoding.BYTES372        notification = self.client.get(prefix, paths, datatype, encoding)373        for n in notification:374            log.debug("n=%s", n)375            assert (n.prefix == prefix)376            GrpcBase.assert_updates(n.update, [(paths[0], "fastEther")])377        # put value back378        vals = [gnmi_pb2.TypedValue(string_val="gigabitEthernet")]379        response = self.client.set(prefix, list(zip(paths, vals)))380        GrpcBase.assert_set_response(response.response[0],...test_GreedyEpochIterator.py
Source:test_GreedyEpochIterator.py  
import numpy as np
from ordered_set import OrderedSet
from text_selection.greedy.greedy_epoch_iterator import EpochProxyIterator
from text_selection.greedy.greedy_iterator import GreedyIterator
from text_selection.selection import FirstKeySelector


def test_update_is_correct():
  """Check `tqdm_update` of the epoch proxy after each of the first three items."""
  greedy_iterator = GreedyIterator(
    data=np.array([[0, 1], [1, 1], [1, 0]]),
    data_indices=OrderedSet([0, 1, 2]),
    key_selector=FirstKeySelector(),
    preselection=np.array([0, 0]),
  )
  it = EpochProxyIterator(
    iterator=greedy_iterator,
    epochs=2,
  )
  assert_updates = (1, 0, 1)
  # zip() stops after the three expected values; only `tqdm_update` is checked,
  # the yielded items themselves are ignored here
  for _, assert_update in zip(it, assert_updates):
    assert it.tqdm_update == assert_update


def test_returns_greedy_indices():
  """Collect everything the epoch proxy yields for the same three-row setup."""
  greedy_iterator = GreedyIterator(
    data=np.array([[0, 1], [1, 1], [1, 0]]),
    data_indices=OrderedSet([0, 1, 2]),
    key_selector=FirstKeySelector(),
    preselection=np.array([0, 0]),
  )
  it = EpochProxyIterator(
    iterator=greedy_iterator,
    epochs=2,
  )
  result = list(it)
  # ... (the excerpt is cut off here; the assertions on `result` are not
  # visible in the source)


# Page text that followed the snippet on the same line:
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
