Best Python code snippet using assertpy_python
tflite_converter.py
Source:tflite_converter.py  
1# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")2#3# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.4#5# Unless required by applicable law or agreed to in writing,6# software distributed under the License is distributed on an7# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.8# pylint: disable=import-error9import flatbuffers10from tqdm import tqdm11from ...converter_ir.ir_graph import IRGraph12from ...converter_ir.ir_op import (13    ConstantOpr,14    LinspaceOpr,15    MultipleDeviceTensorHolderOpr,16    SharedDeviceTensorOpr,17)18from .tflite import (19    Buffer,20    Model,21    Operator,22    OperatorCode,23    QuantizationParameters,24    SubGraph,25    Tensor,26)27from .tflite.CustomOptionsFormat import CustomOptionsFormat28from .tflite_op import (29    MGE2TFLITE,30    get_shape_param,31    mge2tflite_dtype_mapping,32    set_quantization,33)34class TFLiteConverter:35    def __init__(self, net, graph_name="graph", quantizer=None):36        assert isinstance(net, IRGraph), "net must be instance of IRGraph"37        self.net = net38        self.graph_name = graph_name39        self._var2tensor = dict()  # varnode to tensor index40        self._opr_type_list = []41        self._buffer_list = []42        self._tensor_list = []43        self._operator_list = []44        self.quantizer = quantizer45        # buffer size will automatically increase if needed46        self._builder = flatbuffers.Builder(1024)47        set_quantization(require_quantize=quantizer.require_quantize)48    def convert(self, disable_nhwc=False):49        # Note the 0th entry of this array must be an empty buffer (sentinel)50        Buffer.BufferStart(self._builder)51        buffer = Buffer.BufferEnd(self._builder)52        self._buffer_list.append(buffer)53        def need_convert(mge_opr):54            if isinstance(55                mge_opr,56                (57                    ConstantOpr,58                    LinspaceOpr,59                    MultipleDeviceTensorHolderOpr,60                    SharedDeviceTensorOpr,61                ),62            ):63                return False64            is_const = [data.np_data is not None for data in mge_opr.inp_tensors]65            return not all(is_const) and len(mge_opr.inp_tensors) > 066        for mge_opr in tqdm(self.net.all_oprs):67            last_opr = mge_opr68            if not need_convert(mge_opr):69                continue70            tfl_opr_type, tfl_options_type, tfl_options = MGE2TFLITE[type(mge_opr)](71                mge_opr, self._builder72            )73            if tfl_opr_type not in self._opr_type_list:74                self._opr_type_list.append(tfl_opr_type)75            # buffer and tensor76            for tensor in mge_opr.inp_tensors + mge_opr.out_tensors:77                if tensor in self._var2tensor:78                    continue79                result_shape, byte_list = get_shape_param(80                    tensor, mge_opr, self.quantizer, disable_nhwc=disable_nhwc81                )82                scale = None83                zero_point = 084                if self.quantizer.require_quantize:85                    has_qparams = False86                    if hasattr(tensor, "scale") and tensor.scale is not None:87                        scale = tensor.scale88                        has_qparams = True89                    if hasattr(tensor, "zero_point") and tensor.zero_point is not None:90                        zero_point = int(tensor.zero_point)91                    dtype = tensor.q_dtype if has_qparams else tensor.dtype92                    from megengine.core.tensor.dtype import (  # pylint: disable=import-outside-toplevel,no-name-in-module93                        QuantDtypeMeta,94                    )95                    if isinstance(dtype, QuantDtypeMeta):96                        dtype = dtype.np_dtype_str97                else:98                    dtype = tensor.dtype99                buffer = self.gen_buffer(byte_list)100                self._buffer_list.append(buffer)101                tfl_tensor = self.gen_tensor(102                    tensor.name,103                    result_shape,104                    mge2tflite_dtype_mapping[dtype],105                    len(self._buffer_list) - 1,106                    scale=scale,107                    zero_point=int(zero_point),108                )109                self._tensor_list.append(tfl_tensor)110                self._var2tensor[tensor] = len(self._tensor_list) - 1111            tfl_opr = self.gen_operator(112                mge_opr, tfl_opr_type, tfl_options_type, tfl_options113            )114            self._operator_list.append(tfl_opr)115        print("last op: {}".format(last_opr))116        out_tensor = last_opr.out_tensors[0]117        print("dtype: {}".format(out_tensor.dtype))118        if hasattr(out_tensor.dtype, "metadata"):119            scale = out_tensor.dtype.metadata["mgb_dtype"]["scale"]120            zero_point = out_tensor.dtype.metadata["mgb_dtype"].get("zero_point") or 0121            print("scale: {}, zero point: {}".format(scale, zero_point))122        return self.get_model()123    def gen_buffer(self, byte_list):124        if not byte_list:125            Buffer.BufferStart(self._builder)126            buffer = Buffer.BufferEnd(self._builder)127        else:128            Buffer.BufferStartDataVector(self._builder, len(byte_list))129            for i in reversed(byte_list):130                self._builder.PrependByte(i)131            datas = self._builder.EndVector(len(byte_list))132            Buffer.BufferStart(self._builder)133            Buffer.BufferAddData(self._builder, datas)134            buffer = Buffer.BufferEnd(self._builder)135        return buffer136    def gen_tensor(137        self, tensor_name, result_shape, dtype, buffer_idx, scale=None, zero_point=0138    ):139        name = self._builder.CreateString(tensor_name)140        Tensor.TensorStartShapeVector(self._builder, len(result_shape))141        for i in reversed(result_shape):142            self._builder.PrependInt32(i)143        shape = self._builder.EndVector(len(result_shape))144        if scale:145            QuantizationParameters.QuantizationParametersStartScaleVector(146                self._builder, 1147            )148            self._builder.PrependFloat32(scale)149            scales = self._builder.EndVector(1)150            QuantizationParameters.QuantizationParametersStartZeroPointVector(151                self._builder, 1152            )153            self._builder.PrependInt64(zero_point)154            zero_points = self._builder.EndVector(1)155            QuantizationParameters.QuantizationParametersStart(self._builder)156            QuantizationParameters.QuantizationParametersAddScale(self._builder, scales)157            QuantizationParameters.QuantizationParametersAddZeroPoint(158                self._builder, zero_points159            )160            qp = QuantizationParameters.QuantizationParametersEnd(self._builder)161        Tensor.TensorStart(self._builder)162        Tensor.TensorAddName(self._builder, name)163        Tensor.TensorAddShape(self._builder, shape)164        Tensor.TensorAddType(self._builder, dtype)165        Tensor.TensorAddBuffer(self._builder, buffer_idx)166        if scale:167            Tensor.TensorAddQuantization(self._builder, qp)168        tensor = Tensor.TensorEnd(self._builder)169        return tensor170    def gen_operator(self, opr, opr_type, options_type, options):171        # opcode_index172        opcode_index = self._opr_type_list.index(opr_type)173        # inputs174        Operator.OperatorStartInputsVector(self._builder, len(opr.inp_tensors))175        for var in reversed(opr.inp_tensors):176            self._builder.PrependInt32(self._var2tensor[var])177        inputs = self._builder.EndVector(len(opr.inp_tensors))178        # outputs179        Operator.OperatorStartOutputsVector(self._builder, len(opr.out_tensors))180        for var in reversed(opr.out_tensors):181            self._builder.PrependInt32(self._var2tensor[var])182        outputs = self._builder.EndVector(len(opr.out_tensors))183        custom_options = None184        builtin_options = None185        if options:186            if isinstance(options, bytes):  # custom_options187                Operator.OperatorStartCustomOptionsVector(self._builder, len(options))188                for i in reversed(options):189                    self._builder.PrependByte(i)190                custom_options = self._builder.EndVector(len(options))191            else:  # builtin_options192                builtin_options = options193        Operator.OperatorStart(self._builder)194        Operator.OperatorAddOpcodeIndex(self._builder, opcode_index)195        Operator.OperatorAddInputs(self._builder, inputs)196        Operator.OperatorAddOutputs(self._builder, outputs)197        if custom_options:198            Operator.OperatorAddCustomOptions(self._builder, custom_options)199            Operator.OperatorAddCustomOptionsFormat(200                self._builder, CustomOptionsFormat.FLEXBUFFERS201            )202        elif builtin_options:203            Operator.OperatorAddBuiltinOptionsType(self._builder, options_type)204            Operator.OperatorAddBuiltinOptions(self._builder, builtin_options)205        operator = Operator.OperatorEnd(self._builder)206        return operator207    def get_version(self):208        return 3209    def get_description(self):210        description = self._builder.CreateString("Converted by MegEngine")211        return description212    def get_operator_codes(self):213        operator_codes_list = []214        for opr_type in self._opr_type_list:215            is_custom = not isinstance(opr_type, int)216            if is_custom:217                custom_code = self._builder.CreateString(opr_type.code)218                opr_type = opr_type.type219            OperatorCode.OperatorCodeStart(self._builder)220            OperatorCode.OperatorCodeAddBuiltinCode(self._builder, opr_type)221            if is_custom:222                OperatorCode.OperatorCodeAddCustomCode(self._builder, custom_code)223            operator_code = OperatorCode.OperatorCodeEnd(self._builder)224            operator_codes_list.append(operator_code)225        Model.ModelStartOperatorCodesVector(self._builder, len(operator_codes_list))226        for i in reversed(operator_codes_list):227            self._builder.PrependUOffsetTRelative(i)228        operator_codes = self._builder.EndVector(len(operator_codes_list))229        return operator_codes230    def get_subgraphs(self):231        # only support one subgraph now232        subgraphs_list = []233        # tensors234        SubGraph.SubGraphStartTensorsVector(self._builder, len(self._tensor_list))235        for tensor in reversed(self._tensor_list):236            self._builder.PrependUOffsetTRelative(tensor)237        tensors = self._builder.EndVector(len(self._tensor_list))238        # inputs239        SubGraph.SubGraphStartInputsVector(self._builder, len(self.net.graph_inputs))240        for var in reversed(self.net.graph_inputs):241            self._builder.PrependInt32(self._var2tensor[var])242        graph_inputs = self._builder.EndVector(len(self.net.graph_inputs))243        # outputs244        SubGraph.SubGraphStartOutputsVector(self._builder, len(self.net.graph_outputs))245        for var in reversed(self.net.graph_outputs):246            self._builder.PrependInt32(self._var2tensor[var])247        graph_outputs = self._builder.EndVector(len(self.net.graph_outputs))248        # operators249        SubGraph.SubGraphStartOperatorsVector(self._builder, len(self._operator_list))250        for operator in reversed(self._operator_list):251            self._builder.PrependUOffsetTRelative(operator)252        operators = self._builder.EndVector(len(self._operator_list))253        # name254        sub_graph_name = self._builder.CreateString("graph0")255        SubGraph.SubGraphStart(self._builder)256        SubGraph.SubGraphAddTensors(self._builder, tensors)257        SubGraph.SubGraphAddInputs(self._builder, graph_inputs)258        SubGraph.SubGraphAddOutputs(self._builder, graph_outputs)259        SubGraph.SubGraphAddOperators(self._builder, operators)260        SubGraph.SubGraphAddName(self._builder, sub_graph_name)261        subgraph = SubGraph.SubGraphEnd(self._builder)262        subgraphs_list.append(subgraph)263        Model.ModelStartSubgraphsVector(self._builder, len(subgraphs_list))264        for i in reversed(subgraphs_list):265            self._builder.PrependUOffsetTRelative(i)266        subgraphs = self._builder.EndVector(len(subgraphs_list))267        return subgraphs268    def get_buffers(self):269        Model.ModelStartBuffersVector(self._builder, len(self._buffer_list))270        for i in reversed(self._buffer_list):271            self._builder.PrependUOffsetTRelative(i)272        buffers = self._builder.EndVector(len(self._buffer_list))273        return buffers274    def get_model(self):275        version = self.get_version()276        operator_codes = self.get_operator_codes()277        subgraphs = self.get_subgraphs()278        description = self.get_description()279        buffers = self.get_buffers()280        Model.ModelStart(self._builder)281        Model.ModelAddVersion(self._builder, version)282        Model.ModelAddOperatorCodes(self._builder, operator_codes)283        Model.ModelAddSubgraphs(self._builder, subgraphs)284        Model.ModelAddDescription(self._builder, description)285        Model.ModelAddBuffers(self._builder, buffers)286        model = Model.ModelEnd(self._builder)287        self._builder.Finish(model, "TFL3".encode())...flag.py
Source:flag.py  
1# Copyright (c) 2014, Fundacion Dr. Manuel Sadosky2# All rights reserved.3# Redistribution and use in source and binary forms, with or without4# modification, are permitted provided that the following conditions are met:5# 1. Redistributions of source code must retain the above copyright notice, this6# list of conditions and the following disclaimer.7# 2. Redistributions in binary form must reproduce the above copyright notice,8# this list of conditions and the following disclaimer in the documentation9# and/or other materials provided with the distribution.10# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"11# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE12# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE13# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE14# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL15# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR16# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER17# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,18# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE19# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.20from __future__ import absolute_import21from barf.arch.x86 import X86RegisterOperand22# "Flag Control (EFLAG) Instructions"23# ============================================================================ #24def _translate_clc(self, tb, instruction):25    # Flags Affected26    # The CF flag is set to 0. The OF, ZF, SF, AF, and PF flags are27    # unaffected.28    self._flag_translator.clear_flag(tb, self._flags.cf)29def _translate_cld(self, tb, instruction):30    # Flags Affected31    # The DF flag is set to 0. The CF, OF, ZF, SF, AF, and PF flags32    # are unaffected.33    self._flag_translator.clear_flag(tb, self._flags.df)34def _translate_lahf(self, tb, instruction):35    # Operation36    # IF 64-Bit Mode37    #   THEN38    #       IF CPUID.80000001H:ECX.LAHF-SAHF[bit 0] = 1;39    #           THEN AH <- RFLAGS(SF:ZF:0:AF:0:PF:1:CF);40    #       ELSE #UD;41    #       FI;42    #   ELSE43    #       AH <- EFLAGS(SF:ZF:0:AF:0:PF:1:CF);44    # FI;45    # Flags Affected46    # None. The state of the flags in the EFLAGS register is not affected.47    oprnd0_x86 = __get_lahf_implicit_operand()48    dst = tb.temporal(8)49    tmp1 = tb.temporal(dst.size)50    shl_one = tb.immediate(1, 8)51    tb.add(self._builder.gen_str(tb.immediate(0, 8), dst))52    # Store SF.53    tb.add(self._builder.gen_str(self._flags.sf, tmp1))54    tb.add(self._builder.gen_or(tmp1, dst, dst))55    # Shift left.56    tb.add(self._builder.gen_bsh(dst, shl_one, dst))57    # Store ZF.58    tb.add(self._builder.gen_str(self._flags.zf, tmp1))59    tb.add(self._builder.gen_or(tmp1, dst, dst))60    # Shift left.61    tb.add(self._builder.gen_bsh(dst, shl_one, dst))62    # Store 0.63    tb.add(self._builder.gen_str(tb.immediate(0, 8), tmp1))64    tb.add(self._builder.gen_or(tmp1, dst, dst))65    # Shift left.66    tb.add(self._builder.gen_bsh(dst, shl_one, dst))67    # Store AF.68    tb.add(self._builder.gen_str(self._flags.af, tmp1))69    tb.add(self._builder.gen_or(tmp1, dst, dst))70    # Shift left.71    tb.add(self._builder.gen_bsh(dst, shl_one, dst))72    # Store 0.73    tb.add(self._builder.gen_str(tb.immediate(0, 8), tmp1))74    tb.add(self._builder.gen_or(tmp1, dst, dst))75    # Shift left.76    tb.add(self._builder.gen_bsh(dst, shl_one, dst))77    # Store PF.78    tb.add(self._builder.gen_str(self._flags.pf, tmp1))79    tb.add(self._builder.gen_or(tmp1, dst, dst))80    # Shift left.81    tb.add(self._builder.gen_bsh(dst, shl_one, dst))82    # Store 1.83    tb.add(self._builder.gen_str(tb.immediate(1, 8), tmp1))84    tb.add(self._builder.gen_or(tmp1, dst, dst))85    # Shift left.86    tb.add(self._builder.gen_bsh(dst, shl_one, dst))87    # Store CF.88    tb.add(self._builder.gen_str(self._flags.cf, tmp1))89    tb.add(self._builder.gen_or(tmp1, dst, dst))90    self._reg_acc_translator.write(tb, oprnd0_x86, dst)91def _translate_sahf(self, tb, instruction):92    # Flags Affected93    # The SF, ZF, AF, PF, and CF flags are loaded with values from94    # the AH register. Bits 1, 3, and 5 of the EFLAGS register are95    # unaffected, with the values remaining 1, 0, and 0,96    # respectively.97    oprnd0 = self._reg_acc_translator.read(tb, __get_sahf_implicit_operand())98    tmp0 = tb.temporal(oprnd0.size)99    tmp1 = tb.temporal(oprnd0.size)100    tmp2 = tb.temporal(oprnd0.size)101    tmp3 = tb.temporal(oprnd0.size)102    shl_two = tb.immediate(-2, 8)103    shl_one = tb.immediate(-1, 8)104    tb.add(self._builder.gen_str(oprnd0, self._flags.cf))105    tb.add(self._builder.gen_bsh(oprnd0, shl_two, tmp0))106    tb.add(self._builder.gen_str(tmp0, self._flags.pf))107    tb.add(self._builder.gen_bsh(tmp0, shl_two, tmp1))108    tb.add(self._builder.gen_str(tmp1, self._flags.af))109    tb.add(self._builder.gen_bsh(tmp1, shl_two, tmp2))110    tb.add(self._builder.gen_str(tmp2, self._flags.zf))111    tb.add(self._builder.gen_bsh(tmp2, shl_one, tmp3))112    tb.add(self._builder.gen_str(tmp3, self._flags.sf))113def _translate_popf(self, tb, instruction):114    # Flags Affected115    # All flags may be affected; see the Operation section for116    # details.117    tmp1 = tb.temporal(self._sp.size)118    shl_one = tb.immediate(-1, self._sp.size)119    shl_two = tb.immediate(-2, self._sp.size)120    shl_three = tb.immediate(-3, self._sp.size)121    tmp0 = tb.temporal(self._sp.size)122    tb.add(self._builder.gen_ldm(self._sp, tmp1))123    tb.add(self._builder.gen_add(self._sp, self._ws, tmp0))124    tb.add(self._builder.gen_str(tmp0, self._sp))125    tb.add(self._builder.gen_str(tmp1, self._flags.cf))126    tb.add(self._builder.gen_bsh(tmp1, shl_two, tmp1))127    tb.add(self._builder.gen_str(tmp1, self._flags.pf))128    tb.add(self._builder.gen_bsh(tmp1, shl_two, tmp1))129    tb.add(self._builder.gen_str(tmp1, self._flags.af))130    tb.add(self._builder.gen_bsh(tmp1, shl_two, tmp1))131    tb.add(self._builder.gen_str(tmp1, self._flags.zf))132    tb.add(self._builder.gen_bsh(tmp1, shl_one, tmp1))133    tb.add(self._builder.gen_str(tmp1, self._flags.sf))134    tb.add(self._builder.gen_bsh(tmp1, shl_three, tmp1))135    tb.add(self._builder.gen_str(tmp1, self._flags.df))136    tb.add(self._builder.gen_bsh(tmp1, shl_one, tmp1))137    tb.add(self._builder.gen_str(tmp1, self._flags.of))138def _translate_popfd(self, tb, instruction):139    # Flags Affected140    # None.141    _translate_popf(self, tb, instruction)142def _translate_popfq(self, tb, instruction):143    # Flags Affected144    # None.145    _translate_popf(self, tb, instruction)146def _translate_pushf(self, tb, instruction):147    # Flags Affected148    # None.149    tmp0 = tb.temporal(self._sp.size)150    tmp1 = tb.temporal(self._sp.size)151    shr_one = tb.immediate(1, self._sp.size)152    shr_two = tb.immediate(2, self._sp.size)153    shr_three = tb.immediate(3, self._sp.size)154    tb.add(self._builder.gen_str(self._flags.of, tmp1))155    tb.add(self._builder.gen_bsh(tmp1, shr_one, tmp1))156    tb.add(self._builder.gen_str(self._flags.df, tmp1))157    tb.add(self._builder.gen_bsh(tmp1, shr_three, tmp1))158    tb.add(self._builder.gen_str(self._flags.sf, tmp1))159    tb.add(self._builder.gen_bsh(tmp1, shr_one, tmp1))160    tb.add(self._builder.gen_str(self._flags.zf, tmp1))161    tb.add(self._builder.gen_bsh(tmp1, shr_two, tmp1))162    tb.add(self._builder.gen_str(self._flags.af, tmp1))163    tb.add(self._builder.gen_bsh(tmp1, shr_two, tmp1))164    tb.add(self._builder.gen_str(self._flags.pf, tmp1))165    tb.add(self._builder.gen_bsh(tmp1, shr_two, tmp1))166    tb.add(self._builder.gen_str(self._flags.cf, tmp1))167    tb.add(self._builder.gen_sub(self._sp, self._ws, tmp0))168    tb.add(self._builder.gen_str(tmp0, self._sp))169    tb.add(self._builder.gen_stm(tmp1, self._sp))170def _translate_pushfd(self, tb, instruction):171    # Flags Affected172    # None.173    _translate_pushf(self, tb, instruction)174def _translate_pushfq(self, tb, instruction):175    # Flags Affected176    # None.177    _translate_pushf(self, tb, instruction)178def _translate_stc(self, tb, instruction):179    # Flags Affected180    # The CF flag is set. The OF, ZF, SF, AF, and PF flags are181    # unaffected.182    self._flag_translator.set_flag(tb, self._flags.cf)183def _translate_std(self, tb, instruction):184    # Flags Affected185    # The DF flag is set. The CF, OF, ZF, SF, AF, and PF flags are186    # unaffected.187    self._flag_translator.set_flag(tb, self._flags.df)188# Auxiliary functions189# ============================================================================ #190def __get_lahf_implicit_operand():191    return X86RegisterOperand('ah', 8)192def __get_sahf_implicit_operand():193    return X86RegisterOperand('ah', 8)194dispatcher = {195    'clc': _translate_clc,196    'cld': _translate_cld,197    'lahf': _translate_lahf,198    'popf': _translate_popf,199    'popfd': _translate_popfd,200    'popfq': _translate_popfq,201    'pushf': _translate_pushf,202    'pushfd': _translate_pushfd,203    'pushfq': _translate_pushfq,204    'sahf': _translate_sahf,205    'stc': _translate_stc,206    'std': _translate_std,...builder.py
Source:builder.py  
1#!/usr/bin/env python32#3# Copyright 2018 | Dario Ostuni <dario.ostuni@gmail.com>4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9#     http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16import json17from .wytypes import Variable, Array, Constant, IoType, DataType18from .util import UnreachableError19from copy import deepcopy20class ProgramBuilder:21    def __init__(self):22        self._program = {}23        self._program["symbol"] = {}24        self._program["operation"] = []25        self._program["storage"] = {}26        self._program["input"] = {}27        self._program["output"] = {}28        self._label_id = 129        self._token_id = 030        self._stack = [[]]31    def newContext(self):32        ctx = Context()33        ctx.__dict__["_builder"] = self34        ctx.__dict__["_var"] = {}35        return ctx36    def finalize(self):37        assert len(self._stack) == 138        program = deepcopy(self._program)39        program["operation"].extend(deepcopy(self._stack[0]))40        return json.dumps(program)41    def _next_token_id(self):42        tid = self._token_id43        self._token_id += 144        return tid45    def _next_label_id(self):46        lid = self._label_id47        self._label_id += 148        return lid49    def _new_constant(self, ty):50        tid = self._next_token_id()51        self._program["symbol"][str(tid)] = {"Constant": ty.value}52        return tid53    def _new_variable(self, ty):54        tid = self._next_token_id()55        self._program["symbol"][str(tid)] = {"Variable": ty.value}56        return tid57    def _new_array(self, ty):58        tid = self._next_token_id()59        self._program["symbol"][str(tid)] = {"Array": ty.value}60        return tid61    def _add_command(self, cmd):62        self._stack[-1].append(cmd)63    def _push_stack(self):64        self._stack.append([])65    def _pop_stack(self):66        return self._stack.pop()67class Context:68    def If(self, cond, body):69        self._builder._push_stack()70        condition = cond()71        if type(condition) != Constant or condition._ty != DataType.bool:72            raise TypeError73        cond = self._builder._pop_stack()74        self._builder._push_stack()75        body()76        body = self._builder._pop_stack()77        self._builder._add_command({78            "If": [cond, condition._tid, self._builder._next_label_id(),79                   body, self._builder._next_label_id()]80        })81    def IfElse(self, cond, body1, body2):82        self._builder._push_stack()83        condition = cond()84        if type(condition) != Constant or condition._ty != DataType.bool:85            raise TypeError86        cond = self._builder._pop_stack()87        self._builder._push_stack()88        body1()89        body1 = self._builder._pop_stack()90        self._builder._push_stack()91        body2()92        body2 = self._builder._pop_stack()93        self._builder._add_command({94            "IfElse": [cond, condition._tid, self._builder._next_label_id(),95                       body1, self._builder._next_label_id(),96                       body2, self._builder._next_label_id()]97        })98    def While(self, cond, body):99        self._builder._push_stack()100        condition = cond()101        if type(condition) != Constant or condition._ty != DataType.bool:102            raise TypeError103        cond = self._builder._pop_stack()104        self._builder._push_stack()105        body()106        body = self._builder._pop_stack()107        self._builder._add_command({108            "While": [self._builder._next_label_id(), cond, condition._tid,109                      self._builder._next_label_id(), body,110                      self._builder._next_label_id()]111        })112    def workerId(self):113        wid = Constant._new_constant(self, DataType.uint32)114        self._builder._add_command({115            "WorkerId": wid._tid116        })117        return wid118    def numWorkers(self):119        wnum = Constant._new_constant(self, DataType.uint32)120        self._builder._add_command({121            "NumWorkers": wnum._tid122        })123        return wnum124    def getProgramBuilder(self):125        return self._builder126    def declVariable(self, name, ty, io_type):127        if name in self._var:128            raise NameError129        tid = self._new_variable(ty, name)130        if io_type == IoType.private:131            pass132        elif io_type in (IoType.input, IoType.output):133            self._builder._program[io_type.value][name] = tid134        else:135            raise UnreachableError136    def declArray(self, name, ty, io_type, size, max_size=0):137        size = self._sanitize(size)138        if name in self._var:139            raise NameError140        if type(size) != Constant or size._ty != DataType.uint32:141            raise TypeError142        tid = self._new_array(ty, name)143        self._builder._add_command({144            "ArrayNew": [tid, size._tid, ty.value,145                         max_size, io_type != IoType.private]146        })147        if io_type in (IoType.input, IoType.output):148            array_type = "SharedArray"149            self._builder._program[io_type.value][name] = tid150        else:151            array_type = "PrivateArray"152        self._builder._program["storage"][str(tid)] = {153            array_type: [ty.value, max_size]154        }155    def _get_array_object(self, name):156        array = Array()157        array._ctx = self158        array._ty = self._var[name][1]159        array._name = name160        array._tid = self._var[name][0]161        return array162    def _new_constant(self, ty):163        return self._builder._new_constant(ty)164    def _new_variable(self, ty, name):165        tid = self._builder._new_variable(ty)166        self._var[name] = (tid, ty, Variable)167        self._builder._program["storage"][str(tid)] = {168            "Variable": ty.value169        }170        return tid171    def _new_array(self, ty, name):172        tid = self._builder._new_array(ty)173        self._var[name] = (tid, ty, Array)174        return tid175    def _get(self, name):176        if name not in self._var:177            return None178        tid, ty, pyty = self._var[name]179        if pyty == Variable:180            const = Constant._new_constant(self, ty)181            self._builder._add_command({"Load": [const._tid, tid]})182            return const183        elif pyty == Array:184            return self._get_array_object(name)185        raise UnreachableError186    def _sanitize(self, value):187        if type(value) == Constant:188            pass189        elif type(value) == int:190            if value >= 0:191                value = Constant.uint32(value, self)192            else:193                value = Constant.int32(value, self)194        elif type(value) == float:195            value = Constant.float32(value, self)196        elif type(value) == bool:197            value = Constant.bool(value, self)198        else:199            raise TypeError200        return value201    def _set(self, name, value):202        value = self._sanitize(value)203        if name in self._var:204            assert value._ty == self._var[name][1]205            self._builder._add_command({206                "Store": [self._var[name][0], value._tid]207            })208        else:209            var = self._new_variable(value._ty, name)210            self._builder._add_command({"Store": [var, value._tid]})211    def __setattr__(self, name, value):212        self._set(name, value)213    def __getattr__(self, name):214        value = self._get(name)215        if value is None:216            raise AttributeError217        return value218    def __setitem__(self, key, value):219        self._set(key, value)220    def __getitem__(self, key):221        value = self._get(key)222        if value is None:223            raise KeyError...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
