Best Python code snippet using localstack_python
test_request_reply_v12.py
Source:test_request_reply_v12.py  
...48                self.unclear -= 149                self.start_next_test(dp)50        else:51            self.print_results()52    def run_verify(self, ev):53        msg = ev.msg54        dp = msg.datapath55        verify_func = self.verify_default56        v = "verify" + self.current[4:]57        if v in dir(self):58            verify_func = getattr(self, v)59        result = verify_func(dp, msg)60        if result is True:61            self.unclear -= 162        self.results[self.current] = result63        self.start_next_test(dp)64    def verify_default(self, dp, msg):65        type_ = self._verify66        if msg.msg_type == dp.ofproto.OFPT_STATS_REPLY:67            return self.verify_stats(dp, msg.body, type_)68        elif msg.msg_type == type_:69            return True70        else:71            return 'Reply msg_type %s expected %s' \72                   % (msg.msg_type, type_)73    def verify_stats(self, dp, stats, type_):74        stats_types = dp.ofproto_parser.OFPStatsReply._STATS_TYPES75        expect = stats_types.get(type_).__name__76        if isinstance(stats, list):77            for s in stats:78                if expect == s.__class__.__name__:79                    return True80        else:81            if expect == stats.__class__.__name__:82                return True83        return 'Reply msg has not \'%s\' class.\n%s' % (expect, stats)84    def mod_flow(self, dp, cookie=0, cookie_mask=0, table_id=0,85                 command=None, idle_timeout=0, hard_timeout=0,86                 priority=0xff, buffer_id=0xffffffff, match=None,87                 actions=None, inst_type=None, out_port=None,88                 out_group=None, flags=0, inst=None):89        if command is None:90            command = dp.ofproto.OFPFC_ADD91        if inst is None:92            if inst_type is None:93                inst_type = dp.ofproto.OFPIT_APPLY_ACTIONS94            inst = []95            if actions is not None:96                inst = [dp.ofproto_parser.OFPInstructionActions(97                        inst_type, actions)]98        if match is None:99            match = dp.ofproto_parser.OFPMatch()100        if out_port is None:101            out_port = dp.ofproto.OFPP_ANY102        if out_group is None:103            out_group = dp.ofproto.OFPG_ANY104        m = dp.ofproto_parser.OFPFlowMod(dp, cookie, cookie_mask,105                                         table_id, command,106                                         idle_timeout, hard_timeout,107                                         priority, buffer_id,108                                         out_port, out_group,109                                         flags, match, inst)110        dp.send_msg(m)111    def get_port(self, dp):112        for port_no, port in dp.ports.items():113            if port_no != dp.ofproto.OFPP_LOCAL:114                return port115        return None116    # Test for Reply message type117    def test_desc_stats_request(self, dp):118        self._verify = dp.ofproto.OFPST_DESC119        m = dp.ofproto_parser.OFPDescStatsRequest(dp)120        dp.send_msg(m)121    def test_flow_stats_request(self, dp):122        self._verify = dp.ofproto.OFPST_FLOW123        self.mod_flow(dp)124        self.send_flow_stats(dp)125    def test_aggregate_stats_request(self, dp):126        self._verify = dp.ofproto.OFPST_AGGREGATE127        match = dp.ofproto_parser.OFPMatch()128        m = dp.ofproto_parser.OFPAggregateStatsRequest(129            dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,130            dp.ofproto.OFPG_ANY, 0, 0, match)131        dp.send_msg(m)132    def test_table_stats_request(self, dp):133        self._verify = dp.ofproto.OFPST_TABLE134        m = dp.ofproto_parser.OFPTableStatsRequest(dp)135        dp.send_msg(m)136    def test_port_stats_request(self, dp):137        self._verify = dp.ofproto.OFPST_PORT138        m = dp.ofproto_parser.OFPPortStatsRequest(dp, dp.ofproto.OFPP_ANY)139        dp.send_msg(m)140    def test_echo_request(self, dp):141        self._verify = dp.ofproto.OFPT_ECHO_REPLY142        m = dp.ofproto_parser.OFPEchoRequest(dp)143        dp.send_msg(m)144    def test_features_request(self, dp):145        self._verify = dp.ofproto.OFPT_FEATURES_REPLY146        m = dp.ofproto_parser.OFPFeaturesRequest(dp)147        dp.send_msg(m)148    def test_get_config_request(self, dp):149        self._verify = dp.ofproto.OFPT_GET_CONFIG_REPLY150        m = dp.ofproto_parser.OFPGetConfigRequest(dp)151        dp.send_msg(m)152    def test_barrier_request(self, dp):153        self._verify = dp.ofproto.OFPT_BARRIER_REPLY154        dp.send_barrier()155    def test_error_reply(self, dp):156        ports = [0]157        for p in dp.ports:158            if p != dp.ofproto.OFPP_LOCAL:159                ports.append(p)160        port_no = max(ports) + 1161        self._verify = dp.ofproto.OFPT_ERROR162        m = dp.ofproto_parser.OFPPortMod(163            dp, port_no, 'ff:ff:ff:ff:ff:ff', 0, 0, 0)164        dp.send_msg(m)165    # Test for reply value166    def test_flow_stats_none(self, dp):167        self.send_flow_stats(dp)168    def verify_flow_stats_none(self, dp, msg):169        stats = msg.body170        if len(stats):171            return 'Reply msg has body. %s' % (stats, )172        return True173    def test_flow_stats_reply_value(self, dp):174        self._verify = []175        c = 0176        while c < self.n_tables:177            # value = (talbe_id, cookie, idle_timeout, hard_timeout, priority)178            v = (c, c + 1, c + 2, c + 3, c + 4)179            self._verify.append(v)180            self.mod_flow(dp, table_id=v[0], cookie=v[1],181                          idle_timeout=v[2], hard_timeout=v[3], priority=v[4])182            c += 1183        dp.send_barrier()184        self.send_flow_stats(dp)185    def verify_flow_stats_reply_value(self, dp, msg):186        c = 0187        for f in msg.body:188            f_value = (f.table_id, f.cookie, f.idle_timeout,189                       f.hard_timeout, f.priority, )190            if f_value != self._verify[c]:191                return 'param is mismatched. verify=%s, reply=%s' \192                       % (self._verify[c], f_value,)193            c += 1194        return len(msg.body) == self.n_tables195    def test_echo_request_has_data(self, dp):196        data = 'test'197        self._verify = data198        m = dp.ofproto_parser.OFPEchoRequest(dp)199        m.data = data200        dp.send_msg(m)201    def verify_echo_request_has_data(self, dp, msg):202        data = msg.data203        return self._verify == data204    def test_aggregate_stats_flow_count(self, dp):205        c = 0206        while c < self.n_tables:207            self.mod_flow(dp, table_id=c)208            c += 1209        dp.send_barrier()210        match = dp.ofproto_parser.OFPMatch()211        m = dp.ofproto_parser.OFPAggregateStatsRequest(212            dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,213            dp.ofproto.OFPG_ANY, 0, 0, match)214        dp.send_msg(m)215    def verify_aggregate_stats_flow_count(self, dp, msg):216        stats = msg.body217        return stats.flow_count == self.n_tables218    def test_aggregate_stats_flow_count_out_port(self, dp):219        actions = [dp.ofproto_parser.OFPActionOutput(1, 1500)]220        self.mod_flow(dp, table_id=1, actions=actions)221        actions = [dp.ofproto_parser.OFPActionOutput(2, 1500)]222        self.mod_flow(dp, table_id=2, actions=actions)223        dp.send_barrier()224        out_port = 2225        match = dp.ofproto_parser.OFPMatch()226        m = dp.ofproto_parser.OFPAggregateStatsRequest(227            dp, dp.ofproto.OFPTT_ALL, out_port,228            dp.ofproto.OFPG_ANY, 0, 0, match)229        dp.send_msg(m)230    def verify_aggregate_stats_flow_count_out_port(self, dp, msg):231        stats = msg.body232        return stats.flow_count == 1233    def test_aggregate_stats_packet_count(self, dp):234        in_port = 1235        data = 'test'236        self._verify = {'packet_count': 1,237                        'byte_count': len(data)}238        # add flow239        match = dp.ofproto_parser.OFPMatch()240        match.set_in_port(in_port)241        self.mod_flow(dp, table_id=0, match=match)242        # packet out243        output = dp.ofproto.OFPP_TABLE244        actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]245        m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,246                                           actions, data)247        dp.send_msg(m)248        dp.send_barrier()249        match = dp.ofproto_parser.OFPMatch()250        m = dp.ofproto_parser.OFPAggregateStatsRequest(251            dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,252            dp.ofproto.OFPG_ANY, 0, 0, match)253        dp.send_msg(m)254    def verify_aggregate_stats_packet_count(self, dp, msg):255        for name, val in self._verify.items():256            r_val = getattr(msg.body, name)257            if val != r_val:258                return '%s is mismatched. verify=%s, reply=%s' \259                    % (name, val, r_val)260        return True261    def test_set_config_nomal(self, dp):262        flags = dp.ofproto.OFPC_FRAG_NORMAL263        self._verify = flags264        m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)265        dp.send_msg(m)266        dp.send_barrier()267        m = dp.ofproto_parser.OFPGetConfigRequest(dp)268        dp.send_msg(m)269    def verify_set_config_nomal(self, dp, msg):270        return self._verify == msg.flags271    def test_set_config_drop(self, dp):272        flags = dp.ofproto.OFPC_FRAG_DROP273        self._verify = flags274        m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)275        dp.send_msg(m)276        dp.send_barrier()277        m = dp.ofproto_parser.OFPGetConfigRequest(dp)278        dp.send_msg(m)279    def verify_set_config_drop(self, dp, msg):280        return self._verify == msg.flags281    def test_set_config_mask(self, dp):282        flags = dp.ofproto.OFPC_FRAG_MASK283        self._verify = flags284        m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)285        dp.send_msg(m)286        dp.send_barrier()287        m = dp.ofproto_parser.OFPGetConfigRequest(dp)288        dp.send_msg(m)289    def verify_set_config_mask(self, dp, msg):290        return self._verify == msg.flags291    def test_set_config_ttl_to_controller(self, dp):292        flags = dp.ofproto.OFPC_INVALID_TTL_TO_CONTROLLER293        self._verify = flags294        m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)295        dp.send_msg(m)296        dp.send_barrier()297        m = dp.ofproto_parser.OFPGetConfigRequest(dp)298        dp.send_msg(m)299    def verify_set_config_ttl_to_controller(self, dp, msg):300        return self._verify == msg.flags301    def test_set_config_miss_send_len(self, dp):302        flags = dp.ofproto.OFPC_FRAG_NORMAL303        ms_len = 256304        self._verify = ms_len305        m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)306        dp.send_msg(m)307        dp.send_barrier()308        m = dp.ofproto_parser.OFPGetConfigRequest(dp)309        dp.send_msg(m)310    def verify_set_config_miss_send_len(self, dp, msg):311        return self._verify == msg.miss_send_len312    def test_set_config_miss_send_len_max(self, dp):313        flags = dp.ofproto.OFPC_FRAG_NORMAL314        ms_len = dp.ofproto.OFPCML_MAX315        self._verify = ms_len316        m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)317        dp.send_msg(m)318        dp.send_barrier()319        m = dp.ofproto_parser.OFPGetConfigRequest(dp)320        dp.send_msg(m)321    def verify_set_config_miss_send_len_max(self, dp, msg):322        return self._verify == msg.miss_send_len323    def test_set_config_no_buffer(self, dp):324        flags = dp.ofproto.OFPC_FRAG_NORMAL325        ms_len = dp.ofproto.OFPCML_NO_BUFFER326        self._verify = ms_len327        m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)328        dp.send_msg(m)329        dp.send_barrier()330        m = dp.ofproto_parser.OFPGetConfigRequest(dp)331        dp.send_msg(m)332    def verify_set_config_no_buffer(self, dp, msg):333        return self._verify == msg.miss_send_len334    def _verify_flow_inst_type(self, dp, msg):335        inst_type = self._verify336        stats = msg.body337        for s in stats:338            for i in s.instructions:339                if i.type == inst_type:340                    return True341        return 'not found inst_type[%s]' % (inst_type, )342    def test_flow_add_apply_actions(self, dp):343        inst_type = dp.ofproto.OFPIT_APPLY_ACTIONS344        self._verify = inst_type345        actions = [dp.ofproto_parser.OFPActionOutput(1, 1500)]346        self.mod_flow(dp, actions=actions, inst_type=inst_type)347        self.send_flow_stats(dp)348    def verify_flow_add_apply_actions(self, dp, msg):349        return self._verify_flow_inst_type(dp, msg)350    def test_flow_add_goto_table(self, dp):351        self._verify = dp.ofproto.OFPIT_GOTO_TABLE352        inst = [dp.ofproto_parser.OFPInstructionGotoTable(1), ]353        self.mod_flow(dp, inst=inst)354        self.send_flow_stats(dp)355    def verify_flow_add_goto_table(self, dp, msg):356        return self._verify_flow_inst_type(dp, msg)357    def _verify_flow_value(self, dp, msg):358        stats = msg.body359        verify = self._verify360        if len(verify) != len(stats):361            return 'flow_count is mismatched. verify=%s stats=%s' \362                   % (len(verify), len(stats))363        for s in stats:364            v_port = -1365            v = verify.get(s.table_id, None)366            if v:367                v_port = v[3].port368            s_port = s.instructions[0].actions[0].port369            if v_port != s_port:370                return 'port is mismatched. table_id=%s verify=%s, stats=%s' \371                       % (s.table_id, v_port, s_port)372        return True373    def _add_flow_for_flow_mod_tests(self, dp):374        a1 = dp.ofproto_parser.OFPActionOutput(1, 1500)375        a2 = dp.ofproto_parser.OFPActionOutput(2, 1500)376        # table_id, cookie, priority, dl_dst, action)377        tables = {0: [0xffff, 10, b'\xee' * 6, a1],378                  1: [0xff00, 10, b'\xee' * 6, a2],379                  2: [0xf000, 100, b'\xee' * 6, a1],380                  3: [0x0000, 10, b'\xff' * 6, a1]}381        self._verify = tables382        for table_id, val in tables.items():383            match = dp.ofproto_parser.OFPMatch()384            match.set_dl_dst(val[2])385            self.mod_flow(dp, match=match, actions=[val[3]],386                          table_id=table_id, cookie=val[0], priority=val[1])387        dp.send_barrier()388    def test_flow_mod_table_id(self, dp):389        self._add_flow_for_flow_mod_tests(dp)390        # modify flow of table_id=3391        action = dp.ofproto_parser.OFPActionOutput(3, 1500)392        self._verify[3][3] = action393        table_id = 3394        self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,395                      actions=[action], table_id=table_id)396        dp.send_barrier()397        self.send_flow_stats(dp)398    def verify_flow_mod_table_id(self, dp, msg):399        return self._verify_flow_value(dp, msg)400    def test_flow_mod_cookie(self, dp):401        self._add_flow_for_flow_mod_tests(dp)402        # modify flow of table_id=1403        action = dp.ofproto_parser.OFPActionOutput(3, 1500)404        self._verify[1][3] = action405        cookie = 0xff00406        cookie_mask = 0xffff407        table_id = 1408        self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,409                      actions=[action], table_id=table_id,410                      cookie=cookie, cookie_mask=cookie_mask)411        dp.send_barrier()412        self.send_flow_stats(dp)413    def verify_flow_mod_cookie(self, dp, msg):414        return self._verify_flow_value(dp, msg)415    def test_flow_mod_cookie_mask(self, dp):416        self._add_flow_for_flow_mod_tests(dp)417        # modify flow of table_id=0,1418        action = dp.ofproto_parser.OFPActionOutput(3, 1500)419        self._verify[0][3] = action420        self._verify[1][3] = action421        cookie = 0xffff422        cookie_mask = 0xff00423        for table_id in range(2):424            self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,425                          actions=[action], table_id=table_id,426                          cookie=cookie, cookie_mask=cookie_mask)427        dp.send_barrier()428        self.send_flow_stats(dp)429    def verify_flow_mod_cookie_mask(self, dp, msg):430        return self._verify_flow_value(dp, msg)431    def test_flow_mod_match(self, dp):432        self._add_flow_for_flow_mod_tests(dp)433        # modify flow of table_id=3434        action = dp.ofproto_parser.OFPActionOutput(3, 1500)435        self._verify[3][3] = action436        match = dp.ofproto_parser.OFPMatch()437        match.set_dl_dst(b'\xff' * 6)438        table_id = 3439        self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,440                      actions=[action], table_id=table_id, match=match)441        dp.send_barrier()442        self.send_flow_stats(dp)443    def verify_flow_mod_match(self, dp, msg):444        return self._verify_flow_value(dp, msg)445    def test_flow_mod_strict(self, dp):446        self._add_flow_for_flow_mod_tests(dp)447        # modify flow of table_id=2448        action = dp.ofproto_parser.OFPActionOutput(3, 1500)449        self._verify[2][3] = action450        match = dp.ofproto_parser.OFPMatch()451        match.set_dl_dst(b'\xee' * 6)452        priority = 100453        table_id = 2454        self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY_STRICT,455                      actions=[action], table_id=table_id,456                      match=match, priority=priority)457        dp.send_barrier()458        self.send_flow_stats(dp)459    def verify_flow_mod_strict(self, dp, msg):460        return self._verify_flow_value(dp, msg)461    def test_flow_del_table_id(self, dp):462        self._add_flow_for_flow_mod_tests(dp)463        # delete flow of table_id=3464        del self._verify[3]465        table_id = 3466        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,467                      table_id=table_id)468        dp.send_barrier()469        self.send_flow_stats(dp)470    def verify_flow_del_table_id(self, dp, msg):471        return self._verify_flow_value(dp, msg)472    def test_flow_del_table_id_all(self, dp):473        self._add_flow_for_flow_mod_tests(dp)474        # delete all flows475        self._verify = {}476        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,477                      table_id=dp.ofproto.OFPTT_ALL)478        dp.send_barrier()479        self.send_flow_stats(dp)480    def verify_flow_del_table_id_all(self, dp, msg):481        return self._verify_flow_value(dp, msg)482    def test_flow_del_cookie(self, dp):483        self._add_flow_for_flow_mod_tests(dp)484        # delete flow of table_id=1485        del self._verify[1]486        cookie = 0xff00487        cookie_mask = 0xffff488        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,489                      table_id=dp.ofproto.OFPTT_ALL,490                      cookie=cookie, cookie_mask=cookie_mask)491        dp.send_barrier()492        self.send_flow_stats(dp)493    def verify_flow_del_cookie(self, dp, msg):494        return self._verify_flow_value(dp, msg)495    def test_flow_del_cookie_mask(self, dp):496        self._add_flow_for_flow_mod_tests(dp)497        # delete flow of table_id=0,1498        del self._verify[0]499        del self._verify[1]500        cookie = 0xffff501        cookie_mask = 0xff00502        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,503                      table_id=dp.ofproto.OFPTT_ALL,504                      cookie=cookie, cookie_mask=cookie_mask)505        dp.send_barrier()506        self.send_flow_stats(dp)507    def verify_flow_del_cookie_mask(self, dp, msg):508        return self._verify_flow_value(dp, msg)509    def test_flow_del_match(self, dp):510        self._add_flow_for_flow_mod_tests(dp)511        # delete flow of table_id=3512        del self._verify[3]513        match = dp.ofproto_parser.OFPMatch()514        match.set_dl_dst(b'\xff' * 6)515        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,516                      table_id=dp.ofproto.OFPTT_ALL, match=match)517        dp.send_barrier()518        self.send_flow_stats(dp)519    def verify_flow_del_match(self, dp, msg):520        return self._verify_flow_value(dp, msg)521    def test_flow_del_out_port(self, dp):522        self._add_flow_for_flow_mod_tests(dp)523        # delete flow of table_id=1524        del self._verify[1]525        out_port = 2526        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,527                      table_id=dp.ofproto.OFPTT_ALL, out_port=out_port)528        dp.send_barrier()529        self.send_flow_stats(dp)530    def verify_flow_del_out_port(self, dp, msg):531        return self._verify_flow_value(dp, msg)532    def test_flow_del_strict(self, dp):533        self._add_flow_for_flow_mod_tests(dp)534        # delete flow of table_id=2535        del self._verify[2]536        match = dp.ofproto_parser.OFPMatch()537        match.set_dl_dst(b'\xee' * 6)538        priority = 100539        self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE_STRICT,540                      table_id=dp.ofproto.OFPTT_ALL,541                      match=match, priority=priority)542        dp.send_barrier()543        self.send_flow_stats(dp)544    def verify_flow_del_strict(self, dp, msg):545        return self._verify_flow_value(dp, msg)546    def _send_port_mod(self, dp, config, mask):547        p = self.get_port(dp)548        if not p:549            err = 'need attached port to switch.'550            self.results[self.current] = err551            self.start_next_test(dp)552            return553        self._verify = [p.port_no, config & mask]554        m = dp.ofproto_parser.OFPPortMod(dp, p.port_no, p.hw_addr,555                                         config, mask, 0)556        dp.send_msg(m)557        dp.send_barrier()558        # TODO: waiting to port UP|DOWN.559        time.sleep(1)560        m = dp.ofproto_parser.OFPFeaturesRequest(dp)561        dp.send_msg(m)562    def _verify_port_mod_config(self, dp, msg):563        port_no = self._verify[0]564        config = self._verify[1]565        port = msg.ports[port_no]566        if config != port.config:567            return "config is mismatched. verify=%s, stats=%s" \568                % (bin(config), bin(port.config))569        return True570    def test_port_mod_config_01_all(self, dp):571        config = 0b1100101572        mask = 0b1111111573        self._send_port_mod(dp, config, mask)574    def verify_port_mod_config_01_all(self, dp, msg):575        return self._verify_port_mod_config(dp, msg)576    def test_port_mod_config_02_none(self, dp):577        config = 0578        mask = 0b1111111579        self._send_port_mod(dp, config, mask)580    def verify_port_mod_config_02_none(self, dp, msg):581        return self._verify_port_mod_config(dp, msg)582    def test_port_mod_config_03_mask(self, dp):583        config = 0b1100101584        mask = 0b1111000585        self._send_port_mod(dp, config, mask)586    def verify_port_mod_config_03_mask(self, dp, msg):587        res = self._verify_port_mod_config(dp, msg)588        # reset port config589        port_no = self._verify[0]590        p = msg.ports[port_no]591        m = dp.ofproto_parser.OFPPortMod(dp, p.port_no, p.hw_addr,592                                         0, 0b1111111, 0)593        dp.send_msg(m)594        dp.send_barrier()595        return res596    def test_port_stats_port_no(self, dp):597        p = self.get_port(dp)598        if not p:599            err = 'need attached port to switch.'600            self.results[self.current] = err601            self.start_next_test(dp)602            return603        self._verify = p.port_no604        m = dp.ofproto_parser.OFPPortStatsRequest(dp, p.port_no)605        dp.send_msg(m)606    def verify_port_stats_port_no(self, dp, msg):607        ports = msg.body608        if len(ports) > 1:609            return 'reply some ports.\n%s' % (ports)610        if ports[0].port_no != self._verify:611            return 'port_no is mismatched. request=%s reply=%s' \612                % (self._verify, ports[0].port_no)613        return True614    def _add_flow_flow_removed(self, dp, reason, table_id=0,615                               cookie=0xff, priority=100, in_port=1,616                               idle_timeout=0, hard_timeout=0):617        self._verify = {}618        self._verify['params'] = {'reason': reason,619                                  'table_id': table_id,620                                  'cookie': cookie,621                                  'priority': priority}622        self._verify['in_port'] = in_port623        self._verify['timeout'] = idle_timeout624        if hard_timeout:625            if (idle_timeout == 0 or idle_timeout > hard_timeout):626                self._verify['timeout'] = hard_timeout627        match = dp.ofproto_parser.OFPMatch()628        match.set_in_port(in_port)629        self.mod_flow(dp, match=match, cookie=cookie,630                      priority=priority, table_id=table_id,631                      idle_timeout=idle_timeout, hard_timeout=hard_timeout,632                      flags=dp.ofproto.OFPFF_SEND_FLOW_REM)633    def _verify_flow_removed(self, dp, msg):634        params = self._verify['params']635        in_port = self._verify['in_port']636        timeout = self._verify['timeout']637        if timeout:638            duration_nsec = (msg.duration_sec * 10 ** 9) + msg.duration_nsec639            timeout_nsec = timeout * 10 ** 9640            # grace of -0.5 and +1.5 second to timeout.641            l = (timeout - 0.5) * 10 ** 9642            h = (timeout + 1.5) * 10 ** 9643            if not l < duration_nsec < h:644                return 'bad duration time. set=%s(nsec), duration=%s(nsec)' \645                    % (timeout_nsec, duration_nsec)646        for name, val in params.items():647            r_val = getattr(msg, name)648            if val != r_val:649                return '%s is mismatched. verify=%s, reply=%s' \650                    % (name, val, r_val)651        for f in msg.match.fields:652            if f.header == ofproto_v1_2.OXM_OF_IN_PORT:653                if f.value != in_port:654                    return 'in_port is mismatched. verify=%s, reply=%s' \655                        % (in_port, f.value)656        return True657    def test_flow_removed_idle_timeout(self, dp):658        reason = dp.ofproto.OFPRR_IDLE_TIMEOUT659        idle_timeout = 2660        self._add_flow_flow_removed(dp, reason,661                                    idle_timeout=idle_timeout)662    def verify_flow_removed_idle_timeout(self, dp, msg):663        return self._verify_flow_removed(dp, msg)664    def test_flow_removed_idle_timeout_hit(self, dp):665        reason = dp.ofproto.OFPRR_IDLE_TIMEOUT666        idle_timeout = 5667        in_port = 1668        sleep = 2669        # add target flow670        self._add_flow_flow_removed(dp, reason, in_port=in_port,671                                    idle_timeout=idle_timeout)672        self._verify['timeout'] = idle_timeout + sleep673        # sleep674        time.sleep(sleep)675        # packet out676        output = dp.ofproto.OFPP_TABLE677        actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]678        m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,679                                           actions, None)680        dp.send_msg(m)681    def verify_flow_removed_idle_timeout_hit(self, dp, msg):682        return self._verify_flow_removed(dp, msg)683    def test_flow_removed_hard_timeout(self, dp):684        reason = dp.ofproto.OFPRR_HARD_TIMEOUT685        hard_timeout = 2686        self._add_flow_flow_removed(dp, reason,687                                    hard_timeout=hard_timeout)688    def verify_flow_removed_hard_timeout(self, dp, msg):689        return self._verify_flow_removed(dp, msg)690    def test_flow_removed_hard_timeout_hit(self, dp):691        reason = dp.ofproto.OFPRR_HARD_TIMEOUT692        hard_timeout = 5693        in_port = 1694        sleep = 2695        self._add_flow_flow_removed(dp, reason, in_port=in_port,696                                    hard_timeout=hard_timeout)697        dp.send_barrier()698        # sleep699        time.sleep(sleep)700        # packet out701        output = dp.ofproto.OFPP_TABLE702        actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]703        m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,704                                           actions, None)705        dp.send_msg(m)706    def verify_flow_removed_hard_timeout_hit(self, dp, msg):707        return self._verify_flow_removed(dp, msg)708    def test_flow_removed_delete(self, dp):709        reason = dp.ofproto.OFPRR_DELETE710        self._add_flow_flow_removed(dp, reason)711        dp.send_barrier()712        self.delete_all_flows(dp)713    def verify_flow_removed_delete(self, dp, msg):714        return self._verify_flow_removed(dp, msg)715    def test_flow_removed_table_id(self, dp):716        reason = dp.ofproto.OFPRR_DELETE717        table_id = 1718        self._add_flow_flow_removed(dp, reason, table_id=table_id)719        dp.send_barrier()720        self.delete_all_flows(dp)721    def verify_flow_removed_table_id(self, dp, msg):722        return self._verify_flow_removed(dp, msg)723    def _send_packet_out(self, dp, buffer_id=0xffffffff,724                         in_port=None, output=None, data=''):725        if in_port is None:726            in_port = dp.ofproto.OFPP_LOCAL727        if output is None:728            output = dp.ofproto.OFPP_CONTROLLER729        self._verify['in_port'] = in_port730        self._verify['data'] = data731        actions = [dp.ofproto_parser.OFPActionOutput(output, len(data))]732        m = dp.ofproto_parser.OFPPacketOut(dp, buffer_id, in_port,733                                           actions, data)734        dp.send_msg(m)735    def _verify_packet_in(self, dp, msg):736        for name, val in self._verify.items():737            if name == 'in_port':738                for f in msg.match.fields:739                    if f.header == ofproto_v1_2.OXM_OF_IN_PORT:740                        r_val = f.value741            else:742                r_val = getattr(msg, name)743            if val != r_val:744                return '%s is mismatched. verify=%s, reply=%s' \745                    % (name, val, r_val)746        return True747    def test_packet_in_action(self, dp):748        self._verify = {}749        self._verify['reason'] = dp.ofproto.OFPR_ACTION750        self._send_packet_out(dp)751    def verify_packet_in_action(self, dp, msg):752        return self._verify_packet_in(dp, msg)753    def test_packet_in_data(self, dp):754        self._verify = {}755        self._verify['reason'] = dp.ofproto.OFPR_ACTION756        data = 'test'757        self._send_packet_out(dp, data=data)758    def verify_packet_in_data(self, dp, msg):759        return self._verify_packet_in(dp, msg)760    def test_packet_in_table_id(self, dp):761        in_port = 1762        table_id = 2763        output = dp.ofproto.OFPP_TABLE764        self._verify = {}765        self._verify['reason'] = dp.ofproto.OFPR_ACTION766        self._verify['table_id'] = table_id767        # add flow (goto_table)768        match = dp.ofproto_parser.OFPMatch()769        match.set_in_port(in_port)770        inst = [dp.ofproto_parser.OFPInstructionGotoTable(table_id)]771        self.mod_flow(dp, inst=inst, match=match)772        # add flow (output)773        match = dp.ofproto_parser.OFPMatch()774        match.set_in_port(in_port)775        out = dp.ofproto.OFPP_CONTROLLER776        actions = [dp.ofproto_parser.OFPActionOutput(out, 0)]777        self.mod_flow(dp, actions=actions, match=match, table_id=table_id)778        dp.send_barrier()779        # packet out780        self._send_packet_out(dp, in_port=in_port, output=output)781    def verify_packet_in_table_id(self, dp, msg):782        return self._verify_packet_in(dp, msg)783    # handler784    @set_ev_cls(ofp_event.EventOFPEchoReply, MAIN_DISPATCHER)785    def echo_replay_handler(self, ev):786        if self.current.find('echo_request') > 0:787            self.run_verify(ev)788    @set_ev_cls(ofp_event.EventOFPStatsReply, MAIN_DISPATCHER)789    def stats_reply_handler(self, ev):790        if self.current is None:791            msg = ev.msg792            dp = msg.datapath793            if self._verify == dp.ofproto.OFPST_TABLE:794                self.table_stats = msg.body795            self.start_next_test(dp)796        else:797            self.run_verify(ev)798    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, MAIN_DISPATCHER)799    def features_replay_handler(self, ev):800        if self.current is None:801            pass802        else:803            self.run_verify(ev)804    @set_ev_cls(ofp_event.EventOFPGetConfigReply, MAIN_DISPATCHER)805    def get_config_replay_handler(self, ev):806        self.run_verify(ev)807    @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)808    def barrier_replay_handler(self, ev):809        if self.current == 'test_barrier_request':810            self.run_verify(ev)811    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)812    def port_status_handler(self, ev):813        pass814    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)815    def packet_in_handler(self, ev):816        if self.current.find('packet_in'):817            self.run_verify(ev)818    @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)819    def flow_removed_handler(self, ev):820        if self.current.find('flow_removed') > 0:821            self.run_verify(ev)822    @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER)823    def error_handler(self, ev):824        if self.current.find('error') > 0:825            self.run_verify(ev)826    def is_supported(self, t):827        unsupported = [828        ]829        for u in unsupported:830            if t.find(u) != -1:831                return False...test_lab04.py
Source:test_lab04.py  
...29        print(msg,file=sys.stderr)30# Caricale nel namespace31globals().update(f_objects)32class TestLab04Segmenti(unittest.TestCase):33    def _verify(self,seq,expected):34        result   = segmenticrescenti(seq)35        messaggio = "\nTEST FAIL> La funzione segmenticrescenti({})\n dovrebbe produrre: '{}',\n invece produce:    '{}'".format(seq,expected,result)36        self.assertEqual(expected,result,msg=messaggio)37    def _verify_raise(self,seq,error,reason):38        msg="\nTEST FAIL> segmenticrescenti({}) dovrebbe sollevare un {} perché {}."39        msg = msg.format(seq,error,reason)40        with self.assertRaises(error,msg=msg):41            segmenticrescenti(seq)42    def test_segmenti_inconfrontabili(self):43        self._verify_raise([13,"stringa1","stringa2"],44                           TypeError,45                           "ha elementi non confrontabili")46    def test_segmenti_zero(self):47        self._verify([],[])48    def test_segmenti_one(self):49        self._verify([1],[[1]])50    def test_segmenti_zerot(self):51        self._verify((),[])52    def test_segmenti_one(self):53        self._verify((1,),[[1]])54    def test_segmenti_crescente(self):55        self._verify([1,2,3,4,5],[[1,2,3,4,5]])56    def test_segmenti_decrescente(self):57        self._verify([5,4,3,2,1],[[5],[4],[3],[2],[1]])58        59    def test_segmenti_due(self):60        self._verify([1,2,3,-1,2,3],[[1,2,3],[-1,2,3]])61    def test_segmenti_singleton_iniziale(self):62        self._verify([4,-3,-2,2,7], [[4],[-3,-2,2,7]] )63    def test_segmenti_singleton_finale(self):64        self._verify([-3,-2,2,7,4], [[-3,-2,2,7],[4]] )65    def test_segmenti_uguali(self):66        self._verify([1,1,1,1,1], [[1,1,1,1,1]] )67    def test_segmenti_uguali(self):68        self._verify([2,2,1,1,1], [[2,2],[1,1,1]] )69        70    def test_segmenti_esempio(self):71        self._verify([1,-1,2,4,3,7,8,8,5] ,72                     [ [1], [-1,2,4], [3,7,8,8], [5] ] )73    def test_segmenti_stringa(self):74        self._verify(['casa','studio','garage','mansarda','villaggio'] ,75                     [ ['casa','studio'], ['garage','mansarda','villaggio'] ] )76    def test_segmenti_stringa2(self):77        self._verify('analfabetismo' ,78                     [ ['a','n'], ['a','l'],['f'],['a','b','e','t'],['i','s'],['m','o'] ] )79        80class TestLab04SommeParziali(unittest.TestCase):81     82    def _verify(self,seq,expected):83        result   = sommeparziali(seq)84        messaggio = "\nTEST FAIL> La funzione sommeparziali({})\n dovrebbe produrre: '{}',\n invece produce:    '{}'".format(seq,expected,result)85        self.assertEqual(expected,result,msg=messaggio)86    def _verify_raise(self,seq,error,reason):87        msg="\nTEST FAIL> sommeparziali({}) dovrebbe sollevare un {} perché {}."88        msg = msg.format(seq,error,reason)89        with self.assertRaises(error,msg=msg):90            sommeparziali(seq)91            92    def test_parziali_zero(self):93        self._verify([],[])94    def test_parziali_one(self):95        self._verify([13],[13])96    def test_parziali_zerot(self):97        self._verify((),[])98    def test_parziali_one(self):99        self._verify((12,),[12])100    def test_parziali_string(self):101        self._verify(('aa','bb','cc'),['aa','aabb','aabbcc'])102        103    def test_somme_insommabili1(self):104        self._verify_raise([13,"stringa1","stringa2"],105                           TypeError,106                           "ha elementi non sommabili")107    def test_somme_insommabili2(self):108        self._verify_raise(('ciao',1.2),109                           TypeError,110                           "ha elementi non sommabili")111    def test_small1(self):112        self._verify( [-2,2]  , [-2,0])113    114    def test_small2(self):115        self._verify( [1,2]  , [1,3])116    def test_ones(self):117        self._verify( [1,1,1,1,1,1,1,1]  , [1,2,3,4,5,6,7,8])118    def test_sequence(self):119        self._verify( [1,2,3,4,5,6,7,8]  , [1,3,6,10,15,21,28,36])120    def test_mixed(self):121        self._verify( [1,2.3], [1,3.3])122            123        124        125if __name__ == '__main__':126    unittest.main()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
