Best Python code snippet using avocado_python
fault_injection_test.py
Source:fault_injection_test.py  
1# Copyright 2021 The gRPC Authors2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#     http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import logging15import time16from typing import Tuple17from absl import flags18from absl.testing import absltest19import grpc20from framework import xds_url_map_testcase21from framework.rpc import grpc_testing22from framework.test_app import client_app23# Type aliases24HostRule = xds_url_map_testcase.HostRule25PathMatcher = xds_url_map_testcase.PathMatcher26GcpResourceManager = xds_url_map_testcase.GcpResourceManager27DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig28RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall29RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall30XdsTestClient = client_app.XdsTestClient31ExpectedResult = xds_url_map_testcase.ExpectedResult32logger = logging.getLogger(__name__)33flags.adopt_module_key_flags(xds_url_map_testcase)34# The first batch of RPCs don't count towards the result of test case. They are35# meant to prove the communication between driver and client is fine.36_NUM_RPCS = 1037_LENGTH_OF_RPC_SENDING_SEC = 1638# We are using sleep to synchronize test driver and the client... Even though39# the client is sending at QPS rate, we can't assert that exactly QPS *40# SLEEP_DURATION number of RPC is finished. The final completed RPC might be41# slightly more or less.42_NON_RANDOM_ERROR_TOLERANCE = 0.0143# For random generator involved test cases, we want to be more loose about the44# final result. Otherwise, we will need more test duration (sleep duration) and45# more accurate communication mechanism. The accurate of random number46# generation is not the intention of this test.47_ERROR_TOLERANCE = 0.248_DELAY_CASE_APPLICATION_TIMEOUT_SEC = 149_BACKLOG_WAIT_TIME_SEC = 2050def _build_fault_injection_route_rule(abort_percentage: int = 0,51                                      delay_percentage: int = 0):52    return {53        'priority': 0,54        'matchRules': [{55            'fullPathMatch': '/grpc.testing.TestService/UnaryCall'56        }],57        'service': GcpResourceManager().default_backend_service(),58        'routeAction': {59            'faultInjectionPolicy': {60                'abort': {61                    'httpStatus': 401,62                    'percentage': abort_percentage,63                },64                'delay': {65                    'fixedDelay': {66                        'seconds': '20'67                    },68                    'percentage': delay_percentage,69                }70            }71        },72    }73def _wait_until_backlog_cleared(test_client: XdsTestClient,74                                timeout: int = _BACKLOG_WAIT_TIME_SEC):75    """ Wait until the completed RPC is close to started RPC.76    For delay injected test cases, there might be a backlog of RPCs due to slow77    initialization of the client. E.g., if initialization took 20s and qps is78    25, then there will be a backlog of 500 RPCs. In normal test cases, this is79    fine, because RPCs will fail immediately. But for delay injected test cases,80    the RPC might linger much longer and affect the stability of test results.81    """82    logger.info('Waiting for RPC backlog to clear for %d seconds', timeout)83    deadline = time.time() + timeout84    while time.time() < deadline:85        stats = test_client.get_load_balancer_accumulated_stats()86        ok = True87        for rpc_type in [RpcTypeUnaryCall, RpcTypeEmptyCall]:88            started = stats.num_rpcs_started_by_method.get(rpc_type, 0)89            completed = stats.num_rpcs_succeeded_by_method.get(90                rpc_type, 0) + stats.num_rpcs_failed_by_method.get(rpc_type, 0)91            # We consider the backlog is healthy, if the diff between started92            # RPCs and completed RPCs is less than 1.5 QPS.93            if abs(started - completed) > xds_url_map_testcase.QPS.value * 1.1:94                logger.info(95                    'RPC backlog exist: rpc_type=%s started=%s completed=%s',96                    rpc_type, started, completed)97                time.sleep(_DELAY_CASE_APPLICATION_TIMEOUT_SEC)98                ok = False99            else:100                logger.info(101                    'RPC backlog clear: rpc_type=%s started=%s completed=%s',102                    rpc_type, started, completed)103        if ok:104            # Both backlog of both types of RPCs is clear, success, return.105            return106    raise RuntimeError('failed to clear RPC backlog in %s seconds', timeout)107class TestZeroPercentFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):108    @staticmethod109    def url_map_change(110            host_rule: HostRule,111            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:112        path_matcher["routeRules"] = [113            _build_fault_injection_route_rule(abort_percentage=0,114                                              delay_percentage=0)115        ]116        return host_rule, path_matcher117    def xds_config_validate(self, xds_config: DumpedXdsConfig):118        self.assertNumEndpoints(xds_config, 1)119        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][120            'typedPerFilterConfig']['envoy.filters.http.fault']121        self.assertEqual('20s', filter_config['delay']['fixedDelay'])122        self.assertEqual(123            0, filter_config['delay']['percentage'].get('numerator', 0))124        self.assertEqual('MILLION',125                         filter_config['delay']['percentage']['denominator'])126        self.assertEqual(401, filter_config['abort']['httpStatus'])127        self.assertEqual(128            0, filter_config['abort']['percentage'].get('numerator', 0))129        self.assertEqual('MILLION',130                         filter_config['abort']['percentage']['denominator'])131    def rpc_distribution_validate(self, test_client: XdsTestClient):132        rpc_distribution = self.configure_and_send(test_client,133                                                   rpc_types=[RpcTypeUnaryCall],134                                                   num_rpcs=_NUM_RPCS)135        self.assertRpcStatusCode(test_client,136                                 expected=(ExpectedResult(137                                     rpc_type=RpcTypeUnaryCall,138                                     status_code=grpc.StatusCode.OK,139                                     ratio=1),),140                                 length=_LENGTH_OF_RPC_SENDING_SEC,141                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)142class TestNonMatchingFaultInjection(xds_url_map_testcase.XdsUrlMapTestCase):143    """EMPTY_CALL is not fault injected, so it should succeed."""144    @staticmethod145    def client_init_config(rpc: str, metadata: str):146        # Python interop client will stuck if the traffic is slow (in this case,147        # 20s injected). The purpose of this test is examining the un-injected148        # traffic is not impacted, so it's fine to just send un-injected149        # traffic.150        return 'EmptyCall', metadata151    @staticmethod152    def url_map_change(153            host_rule: HostRule,154            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:155        path_matcher["routeRules"] = [156            _build_fault_injection_route_rule(abort_percentage=100,157                                              delay_percentage=100)158        ]159        return host_rule, path_matcher160    def xds_config_validate(self, xds_config: DumpedXdsConfig):161        self.assertNumEndpoints(xds_config, 1)162        # The first route rule for UNARY_CALL is fault injected163        self.assertEqual(164            "/grpc.testing.TestService/UnaryCall",165            xds_config.rds['virtualHosts'][0]['routes'][0]['match']['path'])166        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][167            'typedPerFilterConfig']['envoy.filters.http.fault']168        self.assertEqual('20s', filter_config['delay']['fixedDelay'])169        self.assertEqual(1000000,170                         filter_config['delay']['percentage']['numerator'])171        self.assertEqual('MILLION',172                         filter_config['delay']['percentage']['denominator'])173        self.assertEqual(401, filter_config['abort']['httpStatus'])174        self.assertEqual(1000000,175                         filter_config['abort']['percentage']['numerator'])176        self.assertEqual('MILLION',177                         filter_config['abort']['percentage']['denominator'])178        # The second route rule for all other RPCs is untouched179        self.assertNotIn(180            'envoy.filters.http.fault',181            xds_config.rds['virtualHosts'][0]['routes'][1].get(182                'typedPerFilterConfig', {}))183    def rpc_distribution_validate(self, test_client: XdsTestClient):184        self.assertRpcStatusCode(test_client,185                                 expected=(ExpectedResult(186                                     rpc_type=RpcTypeEmptyCall,187                                     status_code=grpc.StatusCode.OK,188                                     ratio=1),),189                                 length=_LENGTH_OF_RPC_SENDING_SEC,190                                 tolerance=_NON_RANDOM_ERROR_TOLERANCE)191@absltest.skip('20% RPC might pass immediately, reason unknown')192class TestAlwaysDelay(xds_url_map_testcase.XdsUrlMapTestCase):193    @staticmethod194    def url_map_change(195            host_rule: HostRule,196            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:197        path_matcher["routeRules"] = [198            _build_fault_injection_route_rule(abort_percentage=0,199                                              delay_percentage=100)200        ]201        return host_rule, path_matcher202    def xds_config_validate(self, xds_config: DumpedXdsConfig):203        self.assertNumEndpoints(xds_config, 1)204        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][205            'typedPerFilterConfig']['envoy.filters.http.fault']206        self.assertEqual('20s', filter_config['delay']['fixedDelay'])207        self.assertEqual(1000000,208                         filter_config['delay']['percentage']['numerator'])209        self.assertEqual('MILLION',210                         filter_config['delay']['percentage']['denominator'])211    def rpc_distribution_validate(self, test_client: XdsTestClient):212        rpc_distribution = self.configure_and_send(213            test_client,214            rpc_types=[RpcTypeUnaryCall],215            num_rpcs=_NUM_RPCS,216            app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)217        _wait_until_backlog_cleared(test_client)218        self.assertRpcStatusCode(219            test_client,220            expected=(ExpectedResult(221                rpc_type=RpcTypeUnaryCall,222                status_code=grpc.StatusCode.DEADLINE_EXCEEDED,223                ratio=1),),224            length=_LENGTH_OF_RPC_SENDING_SEC,225            tolerance=_NON_RANDOM_ERROR_TOLERANCE)226class TestAlwaysAbort(xds_url_map_testcase.XdsUrlMapTestCase):227    @staticmethod228    def url_map_change(229            host_rule: HostRule,230            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:231        path_matcher["routeRules"] = [232            _build_fault_injection_route_rule(abort_percentage=100,233                                              delay_percentage=0)234        ]235        return host_rule, path_matcher236    def xds_config_validate(self, xds_config: DumpedXdsConfig):237        self.assertNumEndpoints(xds_config, 1)238        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][239            'typedPerFilterConfig']['envoy.filters.http.fault']240        self.assertEqual(401, filter_config['abort']['httpStatus'])241        self.assertEqual(1000000,242                         filter_config['abort']['percentage']['numerator'])243        self.assertEqual('MILLION',244                         filter_config['abort']['percentage']['denominator'])245    def rpc_distribution_validate(self, test_client: XdsTestClient):246        rpc_distribution = self.configure_and_send(test_client,247                                                   rpc_types=[RpcTypeUnaryCall],248                                                   num_rpcs=_NUM_RPCS)249        self.assertRpcStatusCode(250            test_client,251            expected=(ExpectedResult(252                rpc_type=RpcTypeUnaryCall,253                status_code=grpc.StatusCode.UNAUTHENTICATED,254                ratio=1),),255            length=_LENGTH_OF_RPC_SENDING_SEC,256            tolerance=_NON_RANDOM_ERROR_TOLERANCE)257class TestDelayHalf(xds_url_map_testcase.XdsUrlMapTestCase):258    @staticmethod259    def url_map_change(260            host_rule: HostRule,261            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:262        path_matcher["routeRules"] = [263            _build_fault_injection_route_rule(abort_percentage=0,264                                              delay_percentage=50)265        ]266        return host_rule, path_matcher267    def xds_config_validate(self, xds_config: DumpedXdsConfig):268        self.assertNumEndpoints(xds_config, 1)269        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][270            'typedPerFilterConfig']['envoy.filters.http.fault']271        self.assertEqual('20s', filter_config['delay']['fixedDelay'])272        self.assertEqual(500000,273                         filter_config['delay']['percentage']['numerator'])274        self.assertEqual('MILLION',275                         filter_config['delay']['percentage']['denominator'])276    def rpc_distribution_validate(self, test_client: XdsTestClient):277        rpc_distribution = self.configure_and_send(278            test_client,279            rpc_types=[RpcTypeUnaryCall],280            num_rpcs=_NUM_RPCS,281            app_timeout=_DELAY_CASE_APPLICATION_TIMEOUT_SEC)282        _wait_until_backlog_cleared(test_client)283        self.assertRpcStatusCode(284            test_client,285            expected=(ExpectedResult(286                rpc_type=RpcTypeUnaryCall,287                status_code=grpc.StatusCode.DEADLINE_EXCEEDED,288                ratio=0.5),),289            length=_LENGTH_OF_RPC_SENDING_SEC,290            tolerance=_ERROR_TOLERANCE)291class TestAbortHalf(xds_url_map_testcase.XdsUrlMapTestCase):292    @staticmethod293    def url_map_change(294            host_rule: HostRule,295            path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:296        path_matcher["routeRules"] = [297            _build_fault_injection_route_rule(abort_percentage=50,298                                              delay_percentage=0)299        ]300        return host_rule, path_matcher301    def xds_config_validate(self, xds_config: DumpedXdsConfig):302        self.assertNumEndpoints(xds_config, 1)303        filter_config = xds_config.rds['virtualHosts'][0]['routes'][0][304            'typedPerFilterConfig']['envoy.filters.http.fault']305        self.assertEqual(401, filter_config['abort']['httpStatus'])306        self.assertEqual(500000,307                         filter_config['abort']['percentage']['numerator'])308        self.assertEqual('MILLION',309                         filter_config['abort']['percentage']['denominator'])310    def rpc_distribution_validate(self, test_client: XdsTestClient):311        rpc_distribution = self.configure_and_send(test_client,312                                                   rpc_types=[RpcTypeUnaryCall],313                                                   num_rpcs=_NUM_RPCS)314        self.assertRpcStatusCode(315            test_client,316            expected=(ExpectedResult(317                rpc_type=RpcTypeUnaryCall,318                status_code=grpc.StatusCode.UNAUTHENTICATED,319                ratio=0.5),),320            length=_LENGTH_OF_RPC_SENDING_SEC,321            tolerance=_ERROR_TOLERANCE)322if __name__ == '__main__':...modsegnet.py
Source:modsegnet.py  
1import torch2import torch.nn as nn3import torch.nn.functional as F4class ModSegNet(nn.Module):5    """ModSegNet: Inspired from SegNet with small improvements6    Args:7        num_classes (int): number of classes to segment8        n_init_features (int): number of input features in the fist convolution9        drop_rate (float): dropout rate of the last two encoders10        filter_config (list of 5 ints): number of output features at each level11    """12    def __init__(self, num_classes, n_init_features=3, drop_rate=0.5,13                 filter_config=(32, 64, 128, 256)):14        super(ModSegNet, self).__init__()15        self.encoder1 = _Encoder(n_init_features, filter_config[0])16        self.encoder2 = _Encoder(filter_config[0], filter_config[1])17        self.encoder3 = _Encoder(filter_config[1], filter_config[2], drop_rate)18        self.encoder4 = _Encoder(filter_config[2], filter_config[3], drop_rate)19        self.decoder1 = _Decoder(filter_config[3], filter_config[2])20        self.decoder2 = _Decoder(filter_config[2], filter_config[1])21        self.decoder3 = _Decoder(filter_config[1], filter_config[0])22        self.decoder4 = _Decoder(filter_config[0], filter_config[0])23        # final classifier (equivalent to a fully connected layer)24        self.classifier = nn.Conv2d(filter_config[0], num_classes, 1)25        # init weights26        for m in self.modules():27            if isinstance(m, nn.Conv2d):28                nn.init.xavier_normal(m.weight)29            elif isinstance(m, nn.BatchNorm2d):30                nn.init.constant(m.weight, 1)31                nn.init.constant(m.bias, 0)32    def forward(self, x):33        feat_encoder_1 = self.encoder1(x)34        size_2 = feat_encoder_1.size()35        feat_encoder_2, ind_2 = F.max_pool2d(feat_encoder_1, 2, 2,36                                             return_indices=True)37        feat_encoder_2 = self.encoder2(feat_encoder_2)38        size_3 = feat_encoder_2.size()39        feat_encoder_3, ind_3 = F.max_pool2d(feat_encoder_2, 2, 2,40                                             return_indices=True)41        feat_encoder_3 = self.encoder3(feat_encoder_3)42        size_4 = feat_encoder_3.size()43        feat_encoder_4, ind_4 = F.max_pool2d(feat_encoder_3, 2, 2,44                                             return_indices=True)45        feat_encoder_4 = self.encoder4(feat_encoder_4)46        size_5 = feat_encoder_4.size()47        feat_encoder_5, ind_5 = F.max_pool2d(feat_encoder_4, 2, 2,48                                             return_indices=True)49        feat_decoder = self.decoder1(feat_encoder_5, feat_encoder_4, ind_5, size_5)50        feat_decoder = self.decoder2(feat_decoder, feat_encoder_3, ind_4, size_4)51        feat_decoder = self.decoder3(feat_decoder, feat_encoder_2, ind_3, size_3)52        feat_decoder = self.decoder4(feat_decoder, feat_encoder_1, ind_2, size_2)53        return self.classifier(feat_decoder)54class _Encoder(nn.Module):55    """Encoder layer encodes the features along the contracting path (left side).56    Args:57        n_in_feat (int): number of input features58        n_out_feat (int): number of output features59        drop_rate (float): dropout rate at the end of the block60    """61    def __init__(self, n_in_feat, n_out_feat, drop_rate=0):62        super(_Encoder, self).__init__()63        layers = [nn.Conv2d(n_in_feat, n_out_feat, 3, 1, 1),64                  nn.ReLU(inplace=True),65                  nn.Conv2d(n_out_feat, n_out_feat, 3, 1, 1),66                  nn.ReLU(inplace=True)]67        if drop_rate > 0:68            layers += [nn.Dropout(drop_rate)]69        self.features = nn.Sequential(*layers)70    def forward(self, x):71        return self.features(x)72class _Decoder(nn.Module):73    """Decoder layer decodes the features by performing deconvolutions and74    concatenating the resulting features with cropped features from the75    corresponding encoder (skip-connections). Encoder features are cropped76    because convolution operations does not allow to recover the same77    resolution in the expansive path.78    Args:79        n_in_feat (int): number of input features80        n_out_feat (int): number of output features81    """82    def __init__(self, n_in_feat, n_out_feat):83        super(_Decoder, self).__init__()84        self.encoder = _Encoder(n_in_feat * 2, n_out_feat)85    def forward(self, x, feat_encoder, indices, size):86        unpooled = F.max_unpool2d(x, indices, 2, 2, 0, size)87        feat = torch.cat([unpooled, feat_encoder], 1)88        feat = self.encoder(feat)...unet.py
Source:unet.py  
1import torch2import torch.nn as nn3import torch.nn.functional as F4class UNet(nn.Module):5    """U-Net: Convolutional Networks for Biomedical Image Segmentation6       based on https://arxiv.org/abs/1505.045977    Args:8        num_classes (int): number of classes to segment9        n_init_features (int): number of input features in the fist convolution10        drop_rate (float): dropout rate of the last two encoders11        filter_config (list of 5 ints): number of output features at each level12    """13    def __init__(self, num_classes, n_init_features=1, drop_rate=0.5,14                 filter_config=(64, 128, 256, 512, 1024)):15        super(UNet, self).__init__()16        self.encoder1 = _Encoder(n_init_features, filter_config[0])17        self.encoder2 = _Encoder(filter_config[0], filter_config[1])18        self.encoder3 = _Encoder(filter_config[1], filter_config[2])19        self.encoder4 = _Encoder(filter_config[2], filter_config[3], drop_rate)20        self.encoder5 = _Encoder(filter_config[3], filter_config[4], drop_rate)21        self.decoder1 = _Decoder(filter_config[4], filter_config[3])22        self.decoder2 = _Decoder(filter_config[3], filter_config[2])23        self.decoder3 = _Decoder(filter_config[2], filter_config[1])24        self.decoder4 = _Decoder(filter_config[1], filter_config[0])25        # final classifier (equivalent to a fully connected layer)26        self.classifier = nn.Conv2d(filter_config[0], num_classes, 1)27        # init weights28        for m in self.modules():29            if isinstance(m, nn.Conv2d):30                nn.init.xavier_normal(m.weight)31    def forward(self, x):32        feat_encoder_1 = self.encoder1(x)33        feat_encoder_2 = self.encoder2(F.max_pool2d(feat_encoder_1, 2))34        feat_encoder_3 = self.encoder3(F.max_pool2d(feat_encoder_2, 2))35        feat_encoder_4 = self.encoder4(F.max_pool2d(feat_encoder_3, 2))36        feat_encoder_5 = self.encoder5(F.max_pool2d(feat_encoder_4, 2))37        feat_decoder = self.decoder1(feat_encoder_5, feat_encoder_4)38        feat_decoder = self.decoder2(feat_decoder, feat_encoder_3)39        feat_decoder = self.decoder3(feat_decoder, feat_encoder_2)40        feat_decoder = self.decoder4(feat_decoder, feat_encoder_1)41        return self.classifier(feat_decoder)42class _Encoder(nn.Module):43    """Encoder layer encodes the features along the contracting path (left side),44    drop_rate parameter is used with respect to the paper and the official45    caffe model.46    Args:47        n_in_feat (int): number of input features48        n_out_feat (int): number of output features49        drop_rate (float): dropout rate at the end of the block50    """51    def __init__(self, n_in_feat, n_out_feat, drop_rate=0):52        super(_Encoder, self).__init__()53        layers = [nn.Conv2d(n_in_feat, n_out_feat, 3),54                  nn.ReLU(inplace=True),55                  nn.Conv2d(n_out_feat, n_out_feat, 3),56                  nn.ReLU(inplace=True)]57        if drop_rate > 0:58            layers += [nn.Dropout(drop_rate)]59        self.features = nn.Sequential(*layers)60    def forward(self, x):61        return self.features(x)62class _Decoder(nn.Module):63    """Decoder layer decodes the features by performing deconvolutions and64    concatenating the resulting features with cropped features from the65    corresponding encoder (skip-connections). Encoder features are cropped66    because convolution operations does not allow to recover the same67    resolution in the expansive path (cf input image size > output68    segmentation map size).69    Args:70        n_in_feat (int): number of input features71        n_out_feat (int): number of output features72    """73    def __init__(self, n_in_feat, n_out_feat):74        super(_Decoder, self).__init__()75        self.encoder = _Encoder(n_in_feat, n_out_feat)76        self.decoder = nn.ConvTranspose2d(n_in_feat, n_out_feat, 2, 2)77    def forward(self, x, feat_encoder):78        feat_decoder = F.relu(self.decoder(x), True)79        # eval offset to allow cropping of the encoder's features80        crop_size = feat_decoder.size(-1)81        offset = (feat_encoder.size(-1) - crop_size) // 282        crop = feat_encoder[:, :, offset:offset + crop_size,83                            offset:offset + crop_size]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
