Best Python code snippet using pyresttest_python
mpeg.py
Source:mpeg.py  
...223            pos = buffer.find('\x00\x00\x01\xb5')224            if pos == -1 or len(buffer) - pos < 5:225                buffer = buffer[-10:]226                continue227            ext = (indexbytes(buffer, pos + 4) >> 4)228            if ext == 8:229                pass230            elif ext == 1:231                if (indexbytes(buffer, pos + 5) >> 3) & 1:232                    self._set('progressive', True)233                else:234                    self._set('interlaced', True)235                return True236            else:237                log.debug(u'ext: %r' % ext)238            buffer = buffer[pos + 4:]239        return False240    # #------------------------------------------------------------------------241    # # bitrate()242    # #243    # # From the MPEG-2.2 spec:244    # #245    # #   bit_rate -- This is a 30-bit integer.  The lower 18 bits of the246    # #   integer are in bit_rate_value and the upper 12 bits are in247    # #   bit_rate_extension.  The 30-bit integer specifies the bitrate of the248    # #   bitstream measured in units of 400 bits/second, rounded upwards.249    # #   The value zero is forbidden.250    # #251    # # So ignoring all the variable bitrate stuff for now, this 30 bit integer252    # # multiplied times 400 bits/sec should give the rate in bits/sec.253    # #254    # # TODO: Variable bitrates?  I need one that implements this.255    # #256    # # Continued from the MPEG-2.2 spec:257    # #258    # #    If the bitstream is a constant bitrate stream, the bitrate specified259    # #   is the actual rate of operation of the VBV specified in annex C.  If260    # #   the bitstream is a variable bitrate stream, the STD specifications in261    # #   ISO/IEC 13818-1 supersede the VBV, and the bitrate specified here is262    # #   used to dimension the transport stream STD (2.4.2 in ITU-T Rec. xxx |263    # #   ISO/IEC 13818-1), or the program stream STD (2.4.5 in ITU-T Rec. xxx |264    # #   ISO/IEC 13818-1).265    # #266    # #   If the bitstream is not a constant rate bitstream the vbv_delay267    # #   field shall have the value FFFF in hexadecimal.268    # #269    # #   Given the value encoded in the bitrate field, the bitstream shall be270    # #   generated so that the video encoding and the worst case multiplex271    # #   jitter do not cause STD buffer overflow or underflow.272    # #273    # #274    # # ------------------------------------------------------------------------275    # # Some parts in the code are based on mpgtx (mpgtx.sf.net)276    def bitrate(self, file):277        """278        read the bitrate (most of the time broken)279        """280        file.seek(self.sequence_header_offset + 8, 0)281        t, b = struct.unpack('>HB', file.read(3))282        vrate = t << 2 | b >> 6283        return vrate * 400284    @staticmethod285    def ReadSCRMpeg2(buffer):286        """287        read SCR (timestamp) for MPEG2 at the buffer beginning (6 Bytes)288        """289        if len(buffer) < 6:290            return None291        highbit = (byte2int(buffer) & 0x20) >> 5292        low4Bytes = ((int(byte2int(buffer)) & 0x18) >> 3) << 30293        low4Bytes |= (byte2int(buffer) & 0x03) << 28294        low4Bytes |= indexbytes(buffer, 1) << 20295        low4Bytes |= (indexbytes(buffer, 2) & 0xF8) << 12296        low4Bytes |= (indexbytes(buffer, 2) & 0x03) << 13297        low4Bytes |= indexbytes(buffer, 3) << 5298        low4Bytes |= (indexbytes(buffer, 4)) >> 3299        sys_clock_ref = (indexbytes(buffer, 4) & 0x3) << 7300        sys_clock_ref |= (indexbytes(buffer, 5) >> 1)301        return (int(highbit * (1 << 16) * (1 << 16)) + low4Bytes) / 90000302    @staticmethod303    def ReadSCRMpeg1(buffer):304        """305        read SCR (timestamp) for MPEG1 at the buffer beginning (5 Bytes)306        """307        if len(buffer) < 5:308            return None309        highbit = (byte2int(buffer) >> 3) & 0x01310        low4Bytes = ((int(byte2int(buffer)) >> 1) & 0x03) << 30311        low4Bytes |= indexbytes(buffer, 1) << 22312        low4Bytes |= (indexbytes(buffer, 2) >> 1) << 15313        low4Bytes |= indexbytes(buffer, 3) << 7314        low4Bytes |= indexbytes(buffer, 4) >> 1315        return (int(highbit) * (1 << 16) * (1 << 16) + low4Bytes) / 90000316    @staticmethod317    def ReadPTS(buffer):318        """319        read PTS (PES timestamp) at the buffer beginning (5 Bytes)320        """321        high = ((byte2int(buffer) & 0xF) >> 1)322        med = (indexbytes(buffer, 1) << 7) + (indexbytes(buffer, 2) >> 1)323        low = (indexbytes(buffer, 3) << 7) + (indexbytes(buffer, 4) >> 1)324        return ((int(high) << 30) + (med << 15) + low) / 90000325    def ReadHeader(self, buffer, offset):326        """327        Handle MPEG header in buffer on position offset328        Return None on error, new offset or 0 if the new offset can't be scanned329        """330        if buffer[offset:offset + 3] != '\x00\x00\x01':331            return None332        _id = indexbytes(buffer, offset + 3)333        if _id == PADDING_PKT:334            return offset + (indexbytes(buffer, offset + 4) << 8) + \335                   indexbytes(buffer, offset + 5) + 6336        if _id == PACK_PKT:337            if indexbytes(buffer, offset + 4) & 0xF0 == 0x20:338                self.type = 'MPEG-1 Video'339                self.get_time = self.ReadSCRMpeg1340                self.mpeg_version = 1341                return offset + 12342            elif (indexbytes(buffer, offset + 4) & 0xC0) == 0x40:343                self.type = 'MPEG-2 Video'344                self.get_time = self.ReadSCRMpeg2345                return offset + (indexbytes(buffer, offset + 13) & 0x07) + 14346            else:347                # I have no idea what just happened, but for some DVB348                # recordings done with mencoder this points to a349                # PACK_PKT describing something odd. Returning 0 here350                # (let's hope there are no extensions in the header)351                # fixes it.352                return 0353        if 0xC0 <= _id <= 0xDF:354            # code for audio stream355            for a in self.audio:356                if a.id == _id:357                    break358            else:359                self.audio.append(core.AudioStream())360                self.audio[-1]._set('id', _id)361            return 0362        if 0xE0 <= _id <= 0xEF:363            # code for video stream364            for v in self.video:365                if v.id == _id:366                    break367            else:368                self.video.append(core.VideoStream())369                self.video[-1]._set('id', _id)370            return 0371        if _id == SEQ_HEAD:372            # sequence header, remember that position for later use373            self.sequence_header_offset = offset374            return 0375        if _id in [PRIVATE_STREAM1, PRIVATE_STREAM2]:376            # private stream. we don't know, but maybe we can guess later377            add = indexbytes(buffer, offset + 8)378            # if (indexbytes(buffer, offset+6) & 4) or 1:379            # id = indexbytes(buffer, offset+10+add)380            if buffer[offset + 11 + add:offset + 15 + add].find('\x0b\x77') != -1:381                # AC3 stream382                for a in self.audio:383                    if a.id == _id:384                        break385                else:386                    self.audio.append(core.AudioStream())387                    self.audio[-1]._set('id', _id)388                    self.audio[-1].codec = 0x2000  # AC3389            return 0390        if _id == SYS_PKT:391            return 0392        if _id == EXT_START:393            return 0394        return 0395    # Normal MPEG (VCD, SVCD) ========================================396    def isMPEG(self, file, force=False):397        """398        This MPEG starts with a sequence of 0x00 followed by a PACK Header399        http://dvd.sourceforge.net/dvdinfo/packhdr.html400        """401        file.seek(0, 0)402        buffer = file.read(10000)403        offset = 0404        # seek until the 0 byte stop405        while offset < len(buffer) - 100 and buffer[offset] == '\0':406            offset += 1407        offset -= 2408        # test for mpeg header 0x00 0x00 0x01409        header = '\x00\x00\x01%s' % chr(PACK_PKT)410        if offset < 0 or not buffer[offset:offset + 4] == header:411            if not force:412                return 0413            # brute force and try to find the pack header in the first414            # 10000 bytes somehow415            offset = buffer.find(header)416            if offset < 0:417                return 0418        # scan the 100000 bytes of data419        buffer += file.read(100000)420        # scan first header, to get basic info about421        # how to read a timestamp422        self.ReadHeader(buffer, offset)423        # store first timestamp424        self.start = self.get_time(buffer[offset + 4:])425        while len(buffer) > offset + 1000 and \426                buffer[offset:offset + 3] == '\x00\x00\x01':427            # read the mpeg header428            new_offset = self.ReadHeader(buffer, offset)429            # header scanning detected error, this is no mpeg430            if new_offset is None:431                return 0432            if new_offset:433                # we have a new offset434                offset = new_offset435                # skip padding 0 before a new header436                while len(buffer) > offset + 10 and \437                        not indexbytes(buffer, offset + 2):438                    offset += 1439            else:440                # seek to new header by brute force441                offset += buffer[offset + 4:].find('\x00\x00\x01') + 4442        # fill in values for support functions:443        self.__seek_size__ = 1000000444        self.__sample_size__ = 10000445        self.__search__ = self._find_timer_446        self.filename = file.name447        # get length of the file448        self.length = self.get_length()449        return 1450    @staticmethod451    def _find_timer_(buffer):452        """453        Return position of timer in buffer or None if not found.454        This function is valid for 'normal' mpeg files455        """456        pos = buffer.find('\x00\x00\x01%s' % chr(PACK_PKT))457        if pos == -1:458            return None459        return pos + 4460    # PES ============================================================461    def ReadPESHeader(self, offset, buffer, id=0):462        """463        Parse a PES header.464        Since it starts with 0x00 0x00 0x01 like 'normal' mpegs, this465        function will return (0, None) when it is no PES header or466        (packet length, timestamp position (maybe None))467        http://dvd.sourceforge.net/dvdinfo/pes-hdr.html468        """469        if not buffer[0:3] == '\x00\x00\x01':470            return 0, None471        packet_length = (indexbytes(buffer, 4) << 8) + indexbytes(buffer, 5) + 6472        align = indexbytes(buffer, 6) & 4473        header_length = indexbytes(buffer, 8)474        # PES ID (starting with 001)475        if indexbytes(buffer, 3) & 0xE0 == 0xC0:476            id = id or indexbytes(buffer, 3) & 0x1F477            for a in self.audio:478                if a.id == id:479                    break480            else:481                self.audio.append(core.AudioStream())482                self.audio[-1]._set('id', id)483        elif indexbytes(buffer, 3) & 0xF0 == 0xE0:484            id = id or indexbytes(buffer, 3) & 0xF485            for v in self.video:486                if v.id == id:487                    break488            else:489                self.video.append(core.VideoStream())490                self.video[-1]._set('id', id)491            # new mpeg starting492            if buffer[header_length + 9:header_length + 13] == \493                    '\x00\x00\x01\xB3' and not self.sequence_header_offset:494                # yes, remember offset for later use495                self.sequence_header_offset = offset + header_length + 9496        elif indexbytes(buffer, 3) == 189 or indexbytes(buffer, 3) == 191:497            # private stream. we don't know, but maybe we can guess later498            id = id or indexbytes(buffer, 3) & 0xF499            if align and \500                    buffer[header_length + 9:header_length + 11] == '\x0b\x77':501                # AC3 stream502                for a in self.audio:503                    if a.id == id:504                        break505                else:506                    self.audio.append(core.AudioStream())507                    self.audio[-1]._set('id', id)508                    self.audio[-1].codec = 0x2000  # AC3509        else:510            # unknown content511            pass512        ptsdts = indexbytes(buffer, 7) >> 6513        if ptsdts and ptsdts == indexbytes(buffer, 9) >> 4:514            if indexbytes(buffer, 9) >> 4 != ptsdts:515                log.warning(u'WARNING: bad PTS/DTS, please contact us')516                return packet_length, None517            # timestamp = self.ReadPTS(buffer[9:14])518            high = ((indexbytes(buffer, 9) & 0xF) >> 1)519            med = (indexbytes(buffer, 10) << 7) + (indexbytes(buffer, 11) >> 1)520            low = (indexbytes(buffer, 12) << 7) + (indexbytes(buffer, 13) >> 1)521            return packet_length, 9522        return packet_length, None523    def isPES(self, file):524        log.info(u'trying mpeg-pes scan')525        file.seek(0, 0)526        buffer = file.read(3)527        # header (also valid for all mpegs)528        if not buffer == '\x00\x00\x01':529            return 0530        self.sequence_header_offset = 0531        buffer += file.read(10000)532        offset = 0533        while offset + 1000 < len(buffer):534            pos, timestamp = self.ReadPESHeader(offset, buffer[offset:])535            if not pos:536                return 0537            if timestamp is not None and not hasattr(self, 'start'):538                self.get_time = self.ReadPTS539                bpos = buffer[offset + timestamp:offset + timestamp + 5]540                self.start = self.get_time(bpos)541            if self.sequence_header_offset and hasattr(self, 'start'):542                # we have all informations we need543                break544            offset += pos545            if offset + 1000 < len(buffer) < 1000000 or 1:546                # looks like a pes, read more547                buffer += file.read(10000)548        if not self.video and not self.audio:549            # no video and no audio?550            return 0551        self.type = 'MPEG-PES'552        # fill in values for support functions:553        self.__seek_size__ = 10000000  # 10 MB554        self.__sample_size__ = 500000  # 500 k scanning555        self.__search__ = self._find_timer_PES_556        self.filename = file.name557        # get length of the file558        self.length = self.get_length()559        return 1560    def _find_timer_PES_(self, buffer):561        """562        Return position of timer in buffer or -1 if not found.563        This function is valid for PES files564        """565        pos = buffer.find('\x00\x00\x01')566        offset = 0567        if pos == -1 or offset + 1000 >= len(buffer):568            return None569        retpos = -1570        ackcount = 0571        while offset + 1000 < len(buffer):572            pos, timestamp = self.ReadPESHeader(offset, buffer[offset:])573            if timestamp is not None and retpos == -1:574                retpos = offset + timestamp575            if pos == 0:576                # Oops, that was a mpeg header, no PES header577                offset += buffer[offset:].find('\x00\x00\x01')578                retpos = -1579                ackcount = 0580            else:581                offset += pos582                if retpos != -1:583                    ackcount += 1584            if ackcount > 10:585                # looks ok to me586                return retpos587        return None588    # Elementary Stream ===============================================589    def isES(self, file):590        file.seek(0, 0)591        try:592            header = struct.unpack('>LL', file.read(8))593        except (struct.error, IOError):594            return False595        if header[0] != 0x1B3:596            return False597        # Is an mpeg video elementary stream598        self.mime = 'video/mpeg'599        video = core.VideoStream()600        video.width = header[1] >> 20601        video.height = (header[1] >> 8) & 0xfff602        if header[1] & 0xf < len(FRAME_RATE):603            video.fps = FRAME_RATE[header[1] & 0xf]604        if (header[1] >> 4) & 0xf < len(ASPECT_RATIO):605            # FIXME: Empirically the aspect looks like PAR rather than DAR606            video.aspect = ASPECT_RATIO[(header[1] >> 4) & 0xf]607        self.video.append(video)608        return True609    # Transport Stream ===============================================610    def isTS(self, file):611        file.seek(0, 0)612        buffer = file.read(TS_PACKET_LENGTH * 2)613        c = 0614        while c + TS_PACKET_LENGTH < len(buffer):615            if indexbytes(buffer, c) == indexbytes(buffer, c + TS_PACKET_LENGTH) == TS_SYNC:616                break617            c += 1618        else:619            return 0620        buffer += file.read(10000)621        self.type = 'MPEG-TS'622        while c + TS_PACKET_LENGTH < len(buffer):623            start = indexbytes(buffer, c + 1) & 0x40624            # maybe load more into the buffer625            if c + 2 * TS_PACKET_LENGTH > len(buffer) and c < 500000:626                buffer += file.read(10000)627            # wait until the ts payload contains a payload header628            if not start:629                c += TS_PACKET_LENGTH630                continue631            tsid = ((indexbytes(buffer, c + 1) & 0x3F) << 8) + indexbytes(buffer, c + 2)632            adapt = (indexbytes(buffer, c + 3) & 0x30) >> 4633            offset = 4634            if adapt & 0x02:635                # meta info present, skip it for now636                adapt_len = indexbytes(buffer, c + offset)637                offset += adapt_len + 1638            if not indexbytes(buffer, c + 1) & 0x40:639                # no new pes or psi in stream payload starting640                pass641            elif adapt & 0x01:642                # PES643                timestamp = self.ReadPESHeader(c + offset, buffer[c + offset:],644                                               tsid)[1]645                if timestamp is not None:646                    if not hasattr(self, 'start'):647                        self.get_time = self.ReadPTS648                        timestamp = c + offset + timestamp649                        self.start = self.get_time(buffer[timestamp:timestamp + 5])650                    elif not hasattr(self, 'audio_ok'):651                        timestamp = c + offset + timestamp652                        start = self.get_time(buffer[timestamp:timestamp + 5])653                        if start is not None and self.start is not None and \654                                abs(start - self.start) < 10:655                            # looks ok656                            self.audio_ok = True657                        else:658                            # timestamp broken659                            del self.start660                            log.warning(u'Timestamp error, correcting')661            if hasattr(self, 'start') and self.start and \662                    self.sequence_header_offset and self.video and self.audio:663                break664            c += TS_PACKET_LENGTH665        if not self.sequence_header_offset:666            return 0667        # fill in values for support functions:668        self.__seek_size__ = 10000000  # 10 MB669        self.__sample_size__ = 100000  # 100 k scanning670        self.__search__ = self._find_timer_TS_671        self.filename = file.name672        # get length of the file673        self.length = self.get_length()674        return 1675    def _find_timer_TS_(self, buffer):676        c = 0677        while c + TS_PACKET_LENGTH < len(buffer):678            if indexbytes(buffer, c) == indexbytes(buffer, c + TS_PACKET_LENGTH) == TS_SYNC:679                break680            c += 1681        else:682            return None683        while c + TS_PACKET_LENGTH < len(buffer):684            start = indexbytes(buffer, c + 1) & 0x40685            if not start:686                c += TS_PACKET_LENGTH687                continue688            tsid = ((indexbytes(buffer, c + 1) & 0x3F) << 8) + indexbytes(buffer, c + 2)689            adapt = (indexbytes(buffer, c + 3) & 0x30) >> 4690            offset = 4691            if adapt & 0x02:692                # meta info present, skip it for now693                offset += indexbytes(buffer, c + offset) + 1694            if adapt & 0x01:695                timestamp = self.ReadPESHeader(c + offset, buffer[c + offset:], tsid)[1]696                if timestamp is None:697                    # this should not happen698                    log.error(u'bad TS')699                    return None700                return c + offset + timestamp701            c += TS_PACKET_LENGTH702        return None703    # Support functions ==============================================704    def get_endpos(self):705        """706        get the last timestamp of the mpeg, return -1 if this is not possible707        """...plantuml.py
Source:plantuml.py  
...47    res = ''48    for i in range(0, len(data), 3):49        if i + 2 == len(data):50            res += _encode3bytes(51                indexbytes(data, i),52                indexbytes(data, i + 1),53                054            )55        elif i + 1 == len(data):56            res += _encode3bytes(57                indexbytes(data, i),58                0, 059            )60        else:61            res += _encode3bytes(62                indexbytes(data, i),63                indexbytes(data, i + 1),64                indexbytes(data, i + 2)65            )66    return res67def _encode3bytes(b1, b2, b3):68    c1 = b1 >> 269    c2 = ((b1 & 0x3) << 4) | (b2 >> 4)70    c3 = ((b2 & 0xF) << 2) | (b3 >> 6)71    c4 = b3 & 0x3F72    res = ''73    res += _encode6bit(c1 & 0x3F)74    res += _encode6bit(c2 & 0x3F)75    res += _encode6bit(c3 & 0x3F)76    res += _encode6bit(c4 & 0x3F)77    return res78def _encode6bit(b):...der.py
Source:der.py  
...16        Returns:17            bytes: The DER encoded signature18        """19        r_bytes = int_to_bytes(r)20        if indexbytes(r_bytes, 0) & 0x80:21            r_bytes = b"\x00" + r_bytes22        s_bytes = int_to_bytes(s)23        if indexbytes(s_bytes, 0) & 0x80:24            s_bytes = b"\x00" + s_bytes25        r_s = INTEGER + pack('B', len(r_bytes)) + r_bytes + INTEGER + pack('B', len(s_bytes)) + s_bytes26        return SEQUENCE + pack('B', len(r_s)) + r_s27    @staticmethod28    def decode_signature(sig):29        """Decode an EC signature from serialized DER format as described in30           https://tools.ietf.org/html/rfc2459 (section 7.2.2) and as detailed by31           bip-006632           Returns (r,s)33        """34        if len(sig) < 8:35            raise InvalidDerSignature("bytestring too small")36        if indexbytes(sig, 0) != ord(SEQUENCE):37            raise InvalidDerSignature("missing SEQUENCE marker")38        if indexbytes(sig, 1) != len(sig) - 2:39            raise InvalidDerSignature("invalid length")40        length_r = indexbytes(sig, 3)41        if 5 + length_r >= len(sig):42            raise InvalidDerSignature("invalid length")43        length_s = indexbytes(sig, 5 + length_r)44        if length_r + length_s + 6 != len(sig):45            raise InvalidDerSignature("invalid length")46        if indexbytes(sig, 2) != ord(INTEGER):47            raise InvalidDerSignature("invalid r marker")48        if length_r == 0:49            raise InvalidDerSignature("invalid r value")50        if indexbytes(sig, 4) & 0x80:51            raise InvalidDerSignature("invalid r value")52        if (length_r > 1 and (indexbytes(sig, 4) == 0x00) and not (indexbytes(sig, 5) & 0x80)):53            raise InvalidDerSignature("invalid r value")54        if indexbytes(sig, length_r + 4) != ord(INTEGER):55            raise InvalidDerSignature("invalid s marker")56        if length_s == 0:57            raise InvalidDerSignature("invalid s value")58        if indexbytes(sig, length_r + 6) & 0x80:59            raise InvalidDerSignature("invalid s value")60        if (length_s > 1 and (indexbytes(sig, length_r + 6) == 0x00) and not (indexbytes(sig, length_r + 7) & 0x80)):61            raise InvalidDerSignature("invalid s value")62        r_data, s_data = sig[4:4 + length_r], sig[6 + length_r:]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
