Best Python code snippet using localstack_python
absorb.py
Source:absorb.py  
...45                n.op_type == "Add"46                and not model.is_fork_node(n)47                and not model.is_join_node(n)48            ):49                consumer = model.find_consumer(n.output[0])50                if consumer is not None and consumer.op_type == "MultiThreshold":51                    add_weight_name = n.input[1]52                    threshold_name = consumer.input[1]53                    A = model.get_initializer(add_weight_name)54                    T = model.get_initializer(threshold_name)55                    assert A is not None, "Initializer for add weights is not set."56                    assert T is not None, "Initializer for thresholds is not set."57                    start_name = n.input[0]58                    # we can only absorb 0d or 1d adds59                    is_scalar = A.ndim == 0 or all(x == 1 for x in A.shape)60                    actual_ndims = len(tuple(filter(lambda x: x > 1, A.shape)))61                    is_1d = actual_ndims == 162                    if is_scalar or is_1d:63                        Tnew = T - A.reshape(-1, 1)64                        # Tnew = T - A.reshape(-1, T.shape[1])65                        # compute new thresholds and set initializer66                        model.set_initializer(threshold_name, Tnew)67                        # wire add input directly to MultiThreshold68                        consumer.input[0] = start_name69                        # remove the add node70                        graph.node.remove(n)71                        graph_modified = True72        return (model, graph_modified)73class AbsorbMulIntoMultiThreshold(Transformation):74    """Absorb preceding Mul ops into MultiThreshold by updating the threshold75    values. Only *positive* scalar/1D mul vectors can be absorbed."""76    def apply(self, model):77        graph = model.graph78        node_ind = 079        graph_modified = False80        for n in graph.node:81            node_ind += 182            if (83                n.op_type == "Mul"84                and not model.is_fork_node(n)85                and not model.is_join_node(n)86            ):87                mul_weight_name = n.input[1]88                A = model.get_initializer(mul_weight_name)89                assert A is not None, "Initializer for mul weights is not set."90                is_signed = (A < 0).any()91                is_scalar = A.ndim == 0 or all(x == 1 for x in A.shape)92                actual_ndims = len(tuple(filter(lambda x: x > 1, A.shape)))93                is_1d = actual_ndims == 194                consumer = model.find_consumer(n.output[0])95                if consumer is not None and consumer.op_type == "MultiThreshold":96                    if not is_signed and (is_1d or is_scalar):97                        threshold_name = consumer.input[1]98                        T = model.get_initializer(threshold_name)99                        assert T is not None, "Initializer for thresholds is not set."100                        start_name = n.input[0]101                        # compute new thresholds and set initializer102                        Tnew = T / A.reshape(-1, 1)103                        Tnew[Tnew == np.inf] = np.finfo(np.float32).max104                        Tnew[Tnew == -np.inf] = np.finfo(np.float32).min105                        # TODO: need to handle negative A values correctly; produce106                        # mul sign mask and merge into preceding matmul?107                        model.set_initializer(threshold_name, Tnew)108                        # wire add input directly to MultiThreshold109                        consumer.input[0] = start_name110                        # remove the mul node111                        graph.node.remove(n)112                        graph_modified = True113        return (model, graph_modified)114class FactorOutMulSignMagnitude(Transformation):115    """Split multiply-by-constant nodes into two multiply-by-constant nodes,116    where the first node is a bipolar vector (of signs) and the second is a117    vector of magnitudes."""118    def apply(self, model):119        graph = model.graph120        node_ind = 0121        graph_modified = False122        for n in graph.node:123            node_ind += 1124            if n.op_type == "Mul":125                mul_weight_name = n.input[1]126                A = model.get_initializer(mul_weight_name)127                assert A is not None, "Initializer for mul weights is not set."128                is_scalar = np.prod(A.shape) == 1129                actual_ndims = len(tuple(filter(lambda x: x > 1, A.shape)))130                is_1d = actual_ndims == 1131                is_not_bipolar = (132                    model.get_tensor_datatype(mul_weight_name) != DataType.BIPOLAR133                )134                is_signed = (A < 0).any()135                if is_signed and (is_scalar or is_1d) and is_not_bipolar:136                    start_name = n.input[0]137                    in_shape = model.get_tensor_shape(start_name)138                    middle_name = model.make_new_valueinfo_name()139                    model.set_tensor_shape(middle_name, in_shape)140                    sign_mul_param_name = model.make_new_valueinfo_name()141                    # create new mul node with sign(A) as the operand142                    sgn = np.sign(A)143                    model.set_initializer(sign_mul_param_name, sgn)144                    model.set_tensor_datatype(sign_mul_param_name, DataType.BIPOLAR)145                    # replace original mul weight by magnitudes146                    model.set_initializer(mul_weight_name, np.abs(A))147                    new_mul = oh.make_node(148                        "Mul", [start_name, sign_mul_param_name], [middle_name]149                    )150                    n.input[0] = middle_name151                    graph.node.insert(node_ind - 1, new_mul)152                    graph_modified = True153        return (model, graph_modified)154class Absorb1BitMulIntoMatMul(Transformation):155    """Absorb bipolar or binary multiplications into the preciding matrix156    multiply."""157    def apply(self, model):158        graph = model.graph159        node_ind = 0160        graph_modified = False161        for n in graph.node:162            node_ind += 1163            if n.op_type == "MatMul":164                matmul_weight_name = n.input[1]165                W = model.get_initializer(matmul_weight_name)166                Wdt = model.get_tensor_datatype(matmul_weight_name)167                assert W is not None, "Initializer for matmul weights is not set."168                consumer = model.find_consumer(n.output[0])169                if consumer is not None and consumer.op_type == "Mul":170                    mul_weight_name = consumer.input[1]171                    A = model.get_initializer(mul_weight_name)172                    assert A is not None, "Initializer for mul weights is not set."173                    is_1bit = model.get_tensor_datatype(mul_weight_name).bitwidth() == 1174                    if is_1bit:175                        Wnew = A * W176                        assert (177                            Wnew.shape == W.shape178                        ), """Shape of new weights is not179                        the same as the shape of the weight matrix before."""180                        check_fxn = np.vectorize(lambda x: Wdt.allowed(x))181                        # only absorb if permitted by W datatype182                        if check_fxn(Wnew).all():183                            model.set_initializer(matmul_weight_name, Wnew)184                            n.output[0] = consumer.output[0]185                            graph.node.remove(consumer)186                            graph_modified = True187        return (model, graph_modified)188class Absorb1BitMulIntoConv(Transformation):189    """Absorb bipolar or binary multiplications into the preciding convolution."""190    def apply(self, model):191        graph = model.graph192        node_ind = 0193        graph_modified = False194        for n in graph.node:195            node_ind += 1196            if n.op_type == "Conv":197                conv_weight_name = n.input[1]198                W = model.get_initializer(conv_weight_name)199                Wdt = model.get_tensor_datatype(conv_weight_name)200                assert W is not None, "Initializer for conv weights is not set."201                consumer = model.find_consumer(n.output[0])202                if consumer is not None and consumer.op_type == "Mul":203                    mul_weight_name = consumer.input[1]204                    A = model.get_initializer(mul_weight_name)205                    assert A is not None, "Initializer for mul weights is not set."206                    is_1bit = model.get_tensor_datatype(mul_weight_name).bitwidth() == 1207                    is_scalar = np.prod(A.shape) == 1208                    actual_ndims = len(tuple(filter(lambda x: x > 1, A.shape)))209                    is_1d = actual_ndims == 1210                    if is_1bit and (is_1d or is_scalar):211                        # move the mul to the OFM position, since the mul is212                        # applied on the outputs channelwise or as scalar213                        Wnew = A.reshape(-1, 1, 1, 1) * W214                        assert (215                            Wnew.shape == W.shape216                        ), """Shape of new weights is not217                        the same as the shape of the conv weights before."""218                        check_fxn = np.vectorize(lambda x: Wdt.allowed(x))219                        # only absorb if permitted by W datatype220                        if check_fxn(Wnew).all():221                            model.set_initializer(conv_weight_name, Wnew)222                            n.output[0] = consumer.output[0]223                            graph.node.remove(consumer)224                            graph_modified = True225        return (model, graph_modified)226class AbsorbTransposeIntoMultiThreshold(Transformation):227    """Change (NHWCTranpose -> MultiThreshold -> NCHWTranspose) to (MultiThreshold)228    with NHWC mode."""229    def apply(self, model):230        graph = model.graph231        node_ind = 0232        graph_modified = False233        for n in graph.node:234            node_ind += 1235            if n.op_type == "Transpose":236                perms = list(get_by_name(n.attribute, "perm").ints)237                if perms == [0, 3, 1, 2]:238                    mt_cand = model.find_consumer(n.output[0])239                    if mt_cand.op_type == "MultiThreshold":240                        final_t_cand = model.find_consumer(mt_cand.output[0])241                        if final_t_cand.op_type == "Transpose":242                            perms = list(243                                get_by_name(final_t_cand.attribute, "perm").ints244                            )245                            if perms == [0, 2, 3, 1]:246                                mt = getCustomOp(mt_cand)247                                mt.set_nodeattr("data_layout", "NHWC")248                                # get rid of tranpose nodes, wire MT directly249                                mt_cand.input[0] = n.input[0]250                                mt_cand.output[0] = final_t_cand.output[0]251                                graph.node.remove(n)252                                graph.node.remove(final_t_cand)253                                graph_modified = True254                        elif final_t_cand.op_type == "Reshape":...move_reshape.py
Source:move_reshape.py  
...30                if len(oshape) == 2 and ishape[0] == oshape[0]:31                    producer = model.find_producer(n.input[0])32                    if _is_fpgadataflow_node(producer) is True:33                        # standalone flatten, remove34                        consumer = model.find_consumer(n.output[0])35                        if _is_fpgadataflow_node(consumer) is True:36                            graph_modified = True37                            consumer.input[0] = n.input[0]38                            graph.node.remove(n)39                    elif producer.op_type == "Transpose":40                        # transpose + flatten, absorb into following node41                        transp_node = producer42                        # check if transpose converts NHWC to NCHW43                        perms = list(get_by_name(transp_node.attribute, "perm").ints)44                        if perms == [0, 3, 1, 2]:45                            producer = model.find_producer(transp_node.input[0])46                            if _is_fpgadataflow_node(producer) is True:47                                consumer = model.find_consumer(n.output[0])48                                if consumer.op_type == "StreamingFCLayer_Batch":49                                    fc_inst = getCustomOp(consumer)50                                    mw = fc_inst.get_nodeattr("MW")51                                    mh = fc_inst.get_nodeattr("MH")52                                    (b, h, w, c) = model.get_tensor_shape(53                                        transp_node.input[0]54                                    )55                                    # absorb transpose into weight matrix,56                                    # allowing FC layer to operate on the NHWC input57                                    W = model.get_initializer(consumer.input[1])58                                    assert (59                                        W is not None60                                    ), "Initializer for matmul weights is not set."61                                    W_new = W.reshape(c, h, w, mh)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
