Best Python code snippet using yandex-tank
missile.py
Source:missile.py  
# NOTE: this excerpt starts inside a generator class whose header is not
# visible. Only the tail of that class survives; it is kept here verbatim
# (as a comment) rather than reconstructed by guesswork:
#
#         '''
#         self.missiles = cycle([(missile_sample.to_s(), None)])
#
#     def __iter__(self):
#         for m in self.missiles:
#             info.status.inc_loop_count()
#             yield m


class UriStyleGenerator(object):
    '''
    Generates GET ammo based on given URI list.
    '''

    def __init__(self, uris, headers, http_ver='1.1'):
        '''
        uris - a list of URIs as strings.
        headers - passed to HttpAmmo unchanged for every URI.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.uri_count = len(uris)
        self.missiles = cycle([
            (HttpAmmo(uri, headers, http_ver=http_ver).to_s(), None)
            for uri in uris
        ])

    def __iter__(self):
        '''Yield (missile, None) tuples endlessly, cycling over the URIs.'''
        for m in self.missiles:
            yield m
            # Floor division keeps loop_count an integer on Python 3 too
            # (plain "/" would turn it into a float there).
            info.status.loop_count = info.status.ammo_count // self.uri_count


class AmmoFileReader(object):
    '''Read missiles from ammo file'''

    def __init__(self, filename, **kwargs):
        self.filename = filename
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s'", filename)

    def __iter__(self):
        '''
        Yield (missile, marker) tuples read from the chunked ammo file.

        Each chunk starts with a header line "<size> [marker]" followed by
        <size> characters of payload. On end of file, or on a zero-sized
        chunk, reading restarts from the beginning of the file.

        Raises AmmoFileError on a malformed header or a truncated chunk.
        '''

        def read_chunk_header(ammo_file):
            '''Return the next non-blank line without EOL, or '' at EOF.'''
            chunk_header = ''
            # Compare by value: identity checks against a string literal
            # are implementation-dependent.
            while chunk_header == '':
                line = ammo_file.readline()
                if line == '':
                    return line
                chunk_header = line.strip('\r\n')
            return chunk_header

        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            # An empty file gives an empty header, so the loop never starts.
            chunk_header = read_chunk_header(ammo_file)
            while chunk_header:
                try:
                    fields = chunk_header.split()
                    chunk_size = int(fields[0])
                    if chunk_size == 0:
                        # Only report the zero-sized chunk on the first pass.
                        if info.status.loop_count == 0:
                            self.log.info(
                                'Zero-sized chunk in ammo file at %s. Starting over.',
                                ammo_file.tell())
                        ammo_file.seek(0)
                        info.status.inc_loop_count()
                        chunk_header = read_chunk_header(ammo_file)
                        continue
                    marker = fields[1] if len(fields) > 1 else None
                    missile = ammo_file.read(chunk_size)
                    if len(missile) < chunk_size:
                        raise AmmoFileError(
                            "Unexpected end of file: read %s bytes instead of %s"
                            % (len(missile), chunk_size))
                    yield (missile, marker)
                except (IndexError, ValueError) as e:
                    raise AmmoFileError(
                        "Error while reading ammo file. Position: %s, header: '%s', original exception: %s"
                        % (ammo_file.tell(), chunk_header, e))
                chunk_header = read_chunk_header(ammo_file)
                if chunk_header == '':
                    # End of file: rewind and start the next loop.
                    ammo_file.seek(0)
                    info.status.inc_loop_count()
                    chunk_header = read_chunk_header(ammo_file)
                info.status.af_position = ammo_file.tell()


class SlowLogReader(object):
    '''Read missiles from SQL slow log. Not usable with Phantom'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''
        Yield (request, None) tuples endlessly.

        Lines starting with '#' act as separators: the non-comment lines
        accumulated since the previous separator form one request.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            request = ""
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    if line.startswith('#'):
                        if request != "":
                            yield (request, None)
                            request = ""
                    else:
                        request += line
                # End of file: rewind and count one more loop.
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class LineReader(object):
    '''One line -- one missile'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''Yield (line_without_EOL, None) tuples endlessly, looping the file.'''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    yield (line.rstrip('\r\n'), None)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class CaseLineReader(object):
    '''One line -- one missile with case, tab separated'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''
        Yield (missile, case) tuples endlessly, looping the file.

        A line "case<TAB>missile" yields (missile, case); a line without a
        tab yields (line, None).
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    # split with maxsplit=1 always returns 1 or 2 parts.
                    parts = line.rstrip('\r\n').split('\t', 1)
                    if len(parts) == 2:
                        yield (parts[1], parts[0])
                    else:
                        yield (parts[0], None)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class AccessLogReader(object):
    '''Missiles from access log'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - access log to read.
        headers - optional iterable of header strings (stored as a set).
        http_ver - accepted for interface parity; the version used for each
            missile is taken from the log line itself.
        '''
        self.filename = filename
        self.warned = False
        # None instead of a mutable [] default; an empty set when omitted.
        self.headers = set(headers or ())
        self.log = logging.getLogger(__name__)

    def warn(self, message):
        '''Log message at debug level; emit a one-time warning on first use.'''
        if not self.warned:
            self.warned = True
            self.log.warning(
                "There are some skipped lines. See full log for details.")
        self.log.debug(message)

    def __iter__(self):
        '''
        Yield (missile, None) tuples for every GET request in the log,
        looping over the file endlessly. Lines that cannot be parsed or
        use another method are skipped with a warning.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    try:
                        # The request is the first double-quoted field:
                        # "METHOD URI PROTO/VERSION"
                        request = line.split('"')[1]
                        method, uri, proto = request.split()
                        http_ver = proto.split('/')[1]
                        if method == "GET":
                            yield (
                                HttpAmmo(
                                    uri,
                                    headers=self.headers,
                                    http_ver=http_ver, ).to_s(), None)
                        else:
                            self.warn(
                                "Skipped line: %s (unsupported method)" % line)
                    except (ValueError, IndexError) as e:
                        self.warn("Skipped line: %s (%s)" % (line, e))
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


def _parse_header(header):
    '''
    Split a "Name: value" string into a one-item dict {name: value}.

    Only the first colon separates name from value; both parts are
    stripped. Raises ValueError if the string contains no colon.
    '''
    name, value = (part.strip() for part in header.split(':', 1))
    return {name: value}


class UriReader(object):
    '''Read GET missiles from a URI-style ammo file.'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - ammo file to read.
        headers - optional iterable of "Name: value" strings.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.filename = filename
        self.headers = {}
        # None instead of a mutable [] default.
        for header in headers or ():
            self.headers.update(_parse_header(header))
        self.http_ver = http_ver
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s' using URI format.", filename)

    def __iter__(self):
        '''
        Yield (missile, marker) tuples endlessly, looping the file.

        Lines starting with '[' are "[Name: value]" header definitions and
        update the headers used for the following URIs. Other non-empty
        lines are "URI [marker]".

        Raises AmmoFileError if a full pass leaves the ammo count at zero.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    if line.startswith('['):
                        self.headers.update(
                            _parse_header(line.strip('\r\n[]\t ')))
                    elif len(line.rstrip('\r\n')):
                        fields = line.split()
                        uri = fields[0]
                        marker = fields[1] if len(fields) > 1 else None
                        yield (
                            HttpAmmo(
                                uri,
                                headers=[
                                    ': '.join(header)
                                    for header in self.headers.items()
                                ],
                                http_ver=self.http_ver, ).to_s(), marker)
                if info.status.ammo_count == 0:
                    self.log.error("No ammo in uri-style file")
                    raise AmmoFileError("No ammo! Cover me!")
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class UriPostReader(object):
    '''Read POST missiles from ammo file'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - ammo file to read.
        headers - optional iterable of "Name: value" strings.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.filename = filename
        self.headers = {}
        # headers defaults to None, which must not be iterated directly.
        for header in headers or ():
            self.headers.update(_parse_header(header))
        self.http_ver = http_ver
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s' using URI+POST format", filename)

    def __iter__(self):
        '''
        Yield (missile, marker) tuples read from the chunked POST ammo file.

        Each chunk starts with a header line "<size> <uri> [marker]"
        followed by <size> characters of request body. "[Name: value]"
        lines between chunks update the headers used for later missiles.
        On end of file, or on a zero-sized chunk, reading restarts from
        the beginning of the file.

        Raises AmmoFileError on a malformed header or a truncated chunk.
        '''

        def read_chunk_header(ammo_file):
            '''Return the next chunk header without EOL, or '' at EOF.'''
            chunk_header = ''
            # Compare by value: identity checks against a string literal
            # are implementation-dependent.
            while chunk_header == '':
                line = ammo_file.readline()
                if line.startswith('['):
                    self.headers.update(_parse_header(line.strip('\r\n[]\t ')))
                elif line == '':
                    return line
                else:
                    chunk_header = line.strip('\r\n')
            return chunk_header

        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            # An empty file gives an empty header, so the loop never starts.
            chunk_header = read_chunk_header(ammo_file)
            while chunk_header:
                try:
                    fields = chunk_header.split()
                    chunk_size = int(fields[0])
                    if chunk_size == 0:
                        self.log.debug(
                            'Zero-sized chunk in ammo file at %s. Starting over.',
                            ammo_file.tell())
                        ammo_file.seek(0)
                        info.status.inc_loop_count()
                        chunk_header = read_chunk_header(ammo_file)
                        continue
                    uri = fields[1]
                    marker = fields[2] if len(fields) > 2 else None
                    missile = ammo_file.read(chunk_size)
                    if len(missile) < chunk_size:
                        raise AmmoFileError(
                            "Unexpected end of file: read %s bytes instead of %s"
                            % (len(missile), chunk_size))
                    yield (
                        HttpAmmo(
                            uri=uri,
                            headers=[
                                ': '.join(header)
                                for header in self.headers.items()
                            ],
                            method='POST',
                            body=missile,
                            http_ver=self.http_ver, ).to_s(), marker)
                except (IndexError, ValueError) as e:
                    raise AmmoFileError(
                        "Error while reading ammo file. Position: %s, header: '%s', original exception: %s"
                        % (ammo_file.tell(), chunk_header, e))
                chunk_header = read_chunk_header(ammo_file)
                if chunk_header == '':
                    self.log.debug(
                        'Reached the end of ammo file. Starting over.')
                    ammo_file.seek(0)
                    info.status.inc_loop_count()
                    chunk_header = read_chunk_header(ammo_file)
                # NOTE(review): the source excerpt is truncated at this
                # point ("..."); any statements that followed in the
                # original file are not visible and are not reproduced.

# ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
