Best Python code snippet using autotest_python
p4_operators.py
Source:p4_operators.py  
...116        out += '\n'117        return out118    def get_commands(self):119        commands = list()120        commands.append(self.init_table.get_default_command())121        commands.append(self.update_table.get_default_command())122        commands.append(self.pass_table.get_default_command())123        commands.append(self.drop_table.get_default_command())124        return commands125    def get_control_flow(self, indent_level):126        indent = '\t' * indent_level127        out = ''128        out += '%sapply(%s);\n' % (indent, self.init_table.get_name())129        out += '%sif (%s %s %i) {\n' % (indent, self.value_field_name, self.comp_func, self.threshold)130        out += '%s\tapply(%s);\n' % (indent, self.pass_table.get_name())131        out += '%s\tapply(%s);\n' % (indent, self.update_table.get_name())132        out += '%s}\n' % (indent,)133        out += '%selse {\n' % (indent,)134        out += '%s\tapply(%s);\n' % (indent, self.drop_table.get_name())135        out += '%s}\n' % (indent,)136        return out137    def get_init_keys(self):138        return self.keys139class P4Reduce(P4Operator):140    def __init__(self, qid, operator_id, meta_init_name, drop_action, keys, values, threshold, read_register,141                 p4_raw_fields):142        super(P4Reduce, self).__init__('Reduce', qid, operator_id, keys, p4_raw_fields)143        if threshold == '-1':144            self.threshold = int(THRESHOLD)145        else:146            self.threshold = int(threshold)147        self.read_register = read_register148        if self.read_register:149            self.out_headers += ['index']150        else:151            self.out_headers += ['count']152        # create METADATA to store index and value153        fields = [('value', REGISTER_WIDTH), ('index', REGISTER_NUM_INDEX_BITS)]154        self.metadata = MetaData(self.operator_name, fields)155        # create REGISTER to keep track of counts156        self.register = Register(self.operator_name, 
REGISTER_WIDTH, REGISTER_INSTANCE_COUNT)157        self.values = values158        # Add map init159        hash_init_fields = list()160        for fld in self.keys:161            if fld == 'qid':162                hash_init_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",163                                                size=QID_SIZE))164            elif fld == 'count':165                hash_init_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",166                                                size=COUNT_SIZE))167            elif fld == 'index':168                hash_init_fields.append(P4Field(layer=None, target_name="index", sonata_name="index",169                                                size=INDEX_SIZE))170            else:171                hash_init_fields.append(self.p4_raw_fields.get_target_field(fld))172        # create HASH for access to register173        hash_fields = list()174        for field in hash_init_fields:175            if '/' in field.sonata_name:176                self.logger.error('found a / in the key')177                raise NotImplementedError178            else:179                hash_fields.append('%s.%s' % (meta_init_name, field.target_name.replace(".", "_")))180        self.hash = HashFields(self.operator_name, hash_fields, 'crc16', REGISTER_NUM_INDEX_BITS)181        # name of metadata field where the index of the count within the register is stored182        self.index_field_name = '%s.index' % self.metadata.get_name()183        # name of metadata field where the count is kept temporarily184        self.value_field_name = '%s.value' % self.metadata.get_name()185        # create ACTION and TABLE to compute hash and get value186        primitives = list()187        primitives.append(ModifyFieldWithHashBasedOffset(self.index_field_name, 0, self.hash.get_name(),188                                                         REGISTER_INSTANCE_COUNT))189        
primitives.append(RegisterRead(self.value_field_name, self.register.get_name(), self.index_field_name))190        if self.values[0] == 'count':191            if self.threshold <= 1:192                self.threshold = '1'193            primitives.append(ModifyField(self.value_field_name, '%s + %i' % (self.value_field_name, 1)))194        else:195            target_fld = self.p4_raw_fields.get_target_field(self.values[0])196            if self.threshold <= 1:197                self.threshold = '%s.%s' % (meta_init_name, target_fld.target_name.replace(".", "_"))198            primitives.append(ModifyField(self.value_field_name, '%s + %s' % (199            self.value_field_name, '%s.%s' % (meta_init_name, target_fld.target_name.replace(".", "_")))))200        primitives.append(RegisterWrite(self.register.get_name(), self.index_field_name, self.value_field_name))201        self.init_action = Action('do_init_%s' % self.operator_name, primitives)202        table_name = 'init_%s' % self.operator_name203        self.init_table = Table(table_name, self.init_action.get_name(), [], None, 1)204        # create three TABLEs that implement reduce operation205        # if count <= THRESHOLD, update count and drop,206        table_name = 'drop_%s' % self.operator_name207        self.drop_table = Table(table_name, drop_action, [], None, 1)208        # if count == THRESHOLD, pass through with current count209        field_to_modified = None210        if not self.read_register:211            field_to_modified = ModifyField('%s.count' % meta_init_name, self.value_field_name)212        else:213            field_to_modified = ModifyField('%s.index' % meta_init_name, self.index_field_name)214        self.set_count_action = Action('set_count_%s' % self.operator_name,215                                       field_to_modified)216        table_name = 'first_pass_%s' % self.operator_name217        self.first_pass_table = Table(table_name, self.set_count_action.get_name(), [], None, 1)218      
  if not self.read_register:219            # if count > THRESHOLD, let it pass through with count set to 1220            self.reset_count_action = Action('reset_count_%s' % self.operator_name,221                                             ModifyField('%s.count' % meta_init_name, 1))222            table_name = 'pass_%s' % self.operator_name223            self.pass_table = Table(table_name, self.reset_count_action.get_name(), [], None, 1)224    def __repr__(self):225        return '.Reduce(keys=' + ','.join([x for x in self.keys]) + ', threshold=' + str(self.threshold) + ')'226    def get_code(self):227        out = ''228        out += '// %s %i of query %i\n' % (self.name, self.operator_id, self.query_id)229        out += self.metadata.get_code()230        out += self.hash.get_code()231        out += self.register.get_code()232        out += self.init_action.get_code()233        out += self.set_count_action.get_code()234        if not self.read_register:235            out += self.reset_count_action.get_code()236            out += self.pass_table.get_code()237        out += self.init_table.get_code()238        out += self.first_pass_table.get_code()239        out += self.drop_table.get_code()240        out += '\n'241        return out242    def get_commands(self):243        commands = list()244        commands.append(self.init_table.get_default_command())245        commands.append(self.first_pass_table.get_default_command())246        if not self.read_register: commands.append(self.pass_table.get_default_command())247        commands.append(self.drop_table.get_default_command())248        return commands249    def get_control_flow(self, indent_level):250        indent = '\t' * indent_level251        out = ''252        out += '%sapply(%s);\n' % (indent, self.init_table.get_name())253        out += '%sif (%s == %s) {\n' % (indent, self.value_field_name, self.threshold)254        out += '%s\tapply(%s);\n' % (indent, self.first_pass_table.get_name())255        out += 
'%s}\n' % (indent,)256        if not self.read_register:257            out += '%selse if (%s > %s) {\n' % (indent, self.value_field_name, self.threshold)258            out += '%s\tapply(%s);\n' % (indent, self.pass_table.get_name())259            out += '%s}\n' % (indent,)260        out += '%selse {\n' % (indent,)261        out += '%s\tapply(%s);\n' % (indent, self.drop_table.get_name())262        out += '%s}\n' % (indent,)263        return out264    def get_init_keys(self):265        return self.keys + self.values266class P4MapInit(P4Operator):267    def __init__(self, qid, operator_id, keys, p4_raw_fields):268        super(P4MapInit, self).__init__('MapInit', qid, operator_id, keys, p4_raw_fields)269        # Add map init270        map_init_fields = list()271        for fld in self.keys:272            if fld == 'qid':273                map_init_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",274                                               size=QID_SIZE))275            elif fld == 'count':276                map_init_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",277                                               size=COUNT_SIZE))278            elif fld == 'index':279                map_init_fields.append(P4Field(layer=None, target_name="index", sonata_name="index",280                                               size=INDEX_SIZE))281            else:282                map_init_fields.append(self.p4_raw_fields.get_target_field(fld))283        # create METADATA object to store data for all keys284        meta_fields = list()285        for fld in map_init_fields:286            meta_fields.append((fld.target_name.replace(".", "_"), fld.size))287        self.metadata = MetaData(self.operator_name, meta_fields)288        # create ACTION to initialize the metadata289        primitives = list()290        for fld in map_init_fields:291            sonata_name = fld.sonata_name292            target_name = fld.target_name293 
           meta_field_name = '%s.%s' % (self.metadata.get_name(), target_name.replace(".", "_"))294            if sonata_name == 'qid':295                # Assign query id to this field296                primitives.append(ModifyField(meta_field_name, qid))297            elif sonata_name == 'count':298                primitives.append(ModifyField(meta_field_name, 0))299            elif sonata_name == 'index':300                primitives.append(ModifyField(meta_field_name, 0))301            else:302                # Read data from raw header fields and assign them to these meta fields303                primitives.append(ModifyField(meta_field_name, target_name))304        self.action = Action('do_%s' % self.operator_name, primitives)305        # create dummy TABLE to execute the action306        self.table = Table(self.operator_name, self.action.get_name(), [], None, 1)307    def __repr__(self):308        return '.MapInit(keys=' + str(self.keys) + ')'309    def get_meta_name(self):310        return self.metadata.get_name()311    def get_code(self):312        out = ''313        out += '// MapInit of query %i\n' % self.query_id314        out += self.metadata.get_code()315        out += self.action.get_code()316        out += self.table.get_code()317        out += '\n'318        return out319    def get_commands(self):320        commands = list()321        commands.append(self.table.get_default_command())322        return commands323    def get_control_flow(self, indent_level):324        indent = '\t' * indent_level325        out = ''326        out += '%sapply(%s);\n' % (indent, self.table.get_name())327        return out328    def get_init_keys(self):329        return self.keys330class P4Map(P4Operator):331    def __init__(self, qid, operator_id, meta_init_name, keys, map_keys, map_values, func, p4_raw_fields):332        super(P4Map, self).__init__('Map', qid, operator_id, keys, p4_raw_fields)333        self.meta_init_name = meta_init_name334        self.map_keys = 
map_keys335        self.func = func336        self.map_values = map_values337        # Add map init338        map_fields = list()339        for fld in self.map_keys:340            if fld == 'qid':341                map_fields.append(P4Field(layer=None, target_name="qid", sonata_name="qid",342                                          size=QID_SIZE))343            elif fld == 'count':344                map_fields.append(P4Field(layer=None, target_name="count", sonata_name="count",345                                          size=COUNT_SIZE))346            else:347                map_fields.append(self.p4_raw_fields.get_target_field(fld))348        map_fields_values = list()349        for fld in self.map_values:350            if fld == 'qid':351                map_fields_values.append(P4Field(layer=None, target_name="qid", sonata_name="qid",352                                          size=QID_SIZE))353            elif fld == 'count':354                map_fields_values.append(P4Field(layer=None, target_name="count", sonata_name="count",355                                          size=COUNT_SIZE))356            else:357                map_fields_values.append(self.p4_raw_fields.get_target_field(fld))358        # create ACTION using the function359        primitives = list()360        if len(func) > 0:361            self.func = func362            if func[0] == 'mask' or not func[0]:363                for field in map_fields:364                    mask_size = (func[1] / 4)365                    mask = '0x' + ('f' * mask_size) + ('0' * (HEADER_MASK_SIZE[field.target_name] - mask_size))366                    field_name = '%s.%s' % (self.meta_init_name, field.target_name.replace(".", "_"))367                    primitives.append(BitAnd(field_name, field_name, mask))368            if func[0] == 'set' or not func[0]:369                for field in map_fields_values:370                    field_name = '%s.%s' % (self.meta_init_name, field.target_name.replace(".", "_"))371    
                primitives.append(ModifyField(field_name, func[1]))372        self.action = Action('do_%s' % self.operator_name, primitives)373        # create dummy TABLE to execute the action374        self.table = Table(self.operator_name, self.action.get_name(), [], None, 1)375    def __repr__(self):376        return '.Map(keys=' + str(self.keys) + ', map_keys=' + str(self.map_keys) + ', map_values=' + str(self.map_values) + ', func=' + str(self.func) + ')'377    def get_code(self):378        out = ''379        out += '// Map %i of query %i\n' % (self.operator_id, self.query_id)380        out += self.action.get_code()381        out += self.table.get_code()382        out += '\n'383        return out384    def get_commands(self):385        commands = list()386        commands.append(self.table.get_default_command())387        return commands388    def get_control_flow(self, indent_level):389        indent = '\t' * indent_level390        out = ''391        out += '%sapply(%s);\n' % (indent, self.table.get_name())392        return out393    def get_init_keys(self):394        return list(self.keys) + list(self.map_keys) + list(self.map_values)395    def get_out_headers(self):396        return list(self.keys) + list(self.map_keys) + list(self.map_values)397class P4Filter(P4Operator):398    def __init__(self, qid, operator_id, keys, filter_keys, func, source, match_action, miss_action, p4_raw_fields):399        super(P4Filter, self).__init__('Filter', qid, operator_id, keys, p4_raw_fields)400        self.filter_keys = filter_keys401        self.filter_mask = None402        self.filter_values = None403        self.func = None404        # self.out_headers = []405        self.match_action = match_action406        self.miss_action = miss_action407        self.source = source408        if not len(func) > 0 or func[0] == 'geq':409            self.logger.error('Got the following func with the Filter Operator: %s' % (str(func),))410            # raise NotImplementedError411   
     else:412            self.func = func[0]413            if func[0] == 'mask':414                self.filter_mask = func[1]415                self.filter_values = func[2:]416            elif func[0] == 'eq':417                self.filter_values = [func[1:]]418        reads_fields = list()419        for filter_key in self.filter_keys:420            if self.func == 'mask':421                reads_fields.append((filter_key, 'lpm'))422            else:423                reads_fields.append((filter_key, 'exact'))424        self.table = Table(self.operator_name, miss_action, (match_action,), reads_fields, TABLE_SIZE)425    def __repr__(self):426        return '.Filter(filter_keys=' + str(self.filter_keys) + ', func=' + str(self.func) + ', src = ' + str(427            self.source) + ')'428    def get_code(self):429        out = ''430        out += '// Filter %i of query %i\n' % (self.operator_id, self.query_id)431        out += self.table.get_code()432        out += '\n'433        return out434    def get_commands(self):435        commands = list()436        commands.append(self.table.get_default_command())437        if self.filter_values:438            for filter_value in self.filter_values:439                commands.append(self.table.get_add_rule_command(self.match_action, filter_value, None))440        return commands441    def get_control_flow(self, indent_level):442        indent = '\t' * indent_level443        out = ''444        out += '%sapply(%s);\n' % (indent, self.table.get_name())445        return out446    def get_match_action(self):447        return self.match_action448    def get_filter_mask(self):449        return self.filter_mask450    def get_init_keys(self):...custom_api.py
Source:custom_api.py  
import platform
import subprocess

# Characters PowerShell accepts as single-quote delimiters: the ASCII quote and
# its typographic variants. Each must be doubled inside a single-quoted string.
_POWERSHELL_SINGLE_QUOTES = "'\u2018\u2019\u201a\u201b"


class TextToSpeeckAPIClient:
    """Speak text by running an external command.

    ``command`` is a callable mapping the text to an argv list; when omitted,
    the platform default from :func:`get_default_command` is used.
    """

    def __init__(self, command=None):
        self.command = command or get_default_command()

    def say(self, text):
        """Run the configured command for ``text`` and wait for it to finish."""
        # An argv list (no shell) is passed, so the text is never shell-parsed here.
        subprocess.call(self.command(text))


# Correctly spelled alias; the original (misspelled) name is kept for existing callers.
TextToSpeechAPIClient = TextToSpeeckAPIClient


def _powershell_single_quoted(text):
    """Return ``text`` as a PowerShell single-quoted literal with embedded quotes doubled."""
    escaped = "".join(
        ch * 2 if ch in _POWERSHELL_SINGLE_QUOTES else ch for ch in str(text)
    )
    return f"'{escaped}'"


def _say_command(text):
    return ["say", text]


def _espeak_command(text):
    return ["espeak", text]


def _powershell_command(text):
    # The text is embedded in a PowerShell command string, so it must be quoted
    # as a literal; otherwise characters such as ';' or '$(...)' would execute.
    # NOTE(review): this only echoes the text, exactly as the original did.
    return ["powershell", "-c", f"echo {_powershell_single_quoted(text)}"]


_COMMANDS_BY_SYSTEM = {
    "Darwin": _say_command,
    "Linux": _espeak_command,
    "Windows": _powershell_command,
}


def get_default_command(system=None):
    """Return a callable that builds the speech argv list for ``system``.

    ``system`` defaults to ``platform.system()``.

    Raises:
        NotImplementedError: if there is no command for ``system``.
    """
    if system is None:
        system = platform.system()
    try:
        return _COMMANDS_BY_SYSTEM[system]
    except KeyError:
        raise NotImplementedError(f"Unsupported system: {system}") from None


# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
