How to use indexbytes method in pyresttest

Best Python code snippet using pyresttest_python

mpeg.py

Source:mpeg.py Github

copy

Full Screen

...223 pos = buffer.find('\x00\x00\x01\xb5')224 if pos == -1 or len(buffer) - pos < 5:225 buffer = buffer[-10:]226 continue227 ext = (indexbytes(buffer, pos + 4) >> 4)228 if ext == 8:229 pass230 elif ext == 1:231 if (indexbytes(buffer, pos + 5) >> 3) & 1:232 self._set('progressive', True)233 else:234 self._set('interlaced', True)235 return True236 else:237 log.debug(u'ext: %r' % ext)238 buffer = buffer[pos + 4:]239 return False240 # #------------------------------------------------------------------------241 # # bitrate()242 # #243 # # From the MPEG-2.2 spec:244 # #245 # # bit_rate -- This is a 30-bit integer. The lower 18 bits of the246 # # integer are in bit_rate_value and the upper 12 bits are in247 # # bit_rate_extension. The 30-bit integer specifies the bitrate of the248 # # bitstream measured in units of 400 bits/second, rounded upwards.249 # # The value zero is forbidden.250 # #251 # # So ignoring all the variable bitrate stuff for now, this 30 bit integer252 # # multiplied times 400 bits/sec should give the rate in bits/sec.253 # #254 # # TODO: Variable bitrates? I need one that implements this.255 # #256 # # Continued from the MPEG-2.2 spec:257 # #258 # # If the bitstream is a constant bitrate stream, the bitrate specified259 # # is the actual rate of operation of the VBV specified in annex C. If260 # # the bitstream is a variable bitrate stream, the STD specifications in261 # # ISO/IEC 13818-1 supersede the VBV, and the bitrate specified here is262 # # used to dimension the transport stream STD (2.4.2 in ITU-T Rec. xxx |263 # # ISO/IEC 13818-1), or the program stream STD (2.4.5 in ITU-T Rec. xxx |264 # # ISO/IEC 13818-1).265 # #266 # # If the bitstream is not a constant rate bitstream the vbv_delay267 # # field shall have the value FFFF in hexadecimal.268 # #269 # # Given the value encoded in the bitrate field, the bitstream shall be270 # # generated so that the video encoding and the worst case multiplex271 # # jitter do not cause STD buffer overflow or underflow.272 # #273 # #274 # # ------------------------------------------------------------------------275 # # Some parts in the code are based on mpgtx (mpgtx.sf.net)276 def bitrate(self, file):277 """278 read the bitrate (most of the time broken)279 """280 file.seek(self.sequence_header_offset + 8, 0)281 t, b = struct.unpack('>HB', file.read(3))282 vrate = t << 2 | b >> 6283 return vrate * 400284 @staticmethod285 def ReadSCRMpeg2(buffer):286 """287 read SCR (timestamp) for MPEG2 at the buffer beginning (6 Bytes)288 """289 if len(buffer) < 6:290 return None291 highbit = (byte2int(buffer) & 0x20) >> 5292 low4Bytes = ((int(byte2int(buffer)) & 0x18) >> 3) << 30293 low4Bytes |= (byte2int(buffer) & 0x03) << 28294 low4Bytes |= indexbytes(buffer, 1) << 20295 low4Bytes |= (indexbytes(buffer, 2) & 0xF8) << 12296 low4Bytes |= (indexbytes(buffer, 2) & 0x03) << 13297 low4Bytes |= indexbytes(buffer, 3) << 5298 low4Bytes |= (indexbytes(buffer, 4)) >> 3299 sys_clock_ref = (indexbytes(buffer, 4) & 0x3) << 7300 sys_clock_ref |= (indexbytes(buffer, 5) >> 1)301 return (int(highbit * (1 << 16) * (1 << 16)) + low4Bytes) / 90000302 @staticmethod303 def ReadSCRMpeg1(buffer):304 """305 read SCR (timestamp) for MPEG1 at the buffer beginning (5 Bytes)306 """307 if len(buffer) < 5:308 return None309 highbit = (byte2int(buffer) >> 3) & 0x01310 low4Bytes = ((int(byte2int(buffer)) >> 1) & 0x03) << 30311 low4Bytes |= indexbytes(buffer, 1) << 22312 low4Bytes |= (indexbytes(buffer, 2) >> 1) << 15313 low4Bytes |= indexbytes(buffer, 3) << 7314 low4Bytes |= indexbytes(buffer, 4) >> 1315 return (int(highbit) * (1 << 16) * (1 << 16) + low4Bytes) / 90000316 @staticmethod317 def ReadPTS(buffer):318 """319 read PTS (PES timestamp) at the buffer beginning (5 Bytes)320 """321 high = ((byte2int(buffer) & 0xF) >> 1)322 med = (indexbytes(buffer, 1) << 7) + (indexbytes(buffer, 2) >> 1)323 low = (indexbytes(buffer, 3) << 7) + (indexbytes(buffer, 4) >> 1)324 return ((int(high) << 30) + (med << 15) + low) / 90000325 def ReadHeader(self, buffer, offset):326 """327 Handle MPEG header in buffer on position offset328 Return None on error, new offset or 0 if the new offset can't be scanned329 """330 if buffer[offset:offset + 3] != '\x00\x00\x01':331 return None332 _id = indexbytes(buffer, offset + 3)333 if _id == PADDING_PKT:334 return offset + (indexbytes(buffer, offset + 4) << 8) + \335 indexbytes(buffer, offset + 5) + 6336 if _id == PACK_PKT:337 if indexbytes(buffer, offset + 4) & 0xF0 == 0x20:338 self.type = 'MPEG-1 Video'339 self.get_time = self.ReadSCRMpeg1340 self.mpeg_version = 1341 return offset + 12342 elif (indexbytes(buffer, offset + 4) & 0xC0) == 0x40:343 self.type = 'MPEG-2 Video'344 self.get_time = self.ReadSCRMpeg2345 return offset + (indexbytes(buffer, offset + 13) & 0x07) + 14346 else:347 # I have no idea what just happened, but for some DVB348 # recordings done with mencoder this points to a349 # PACK_PKT describing something odd. Returning 0 here350 # (let's hope there are no extensions in the header)351 # fixes it.352 return 0353 if 0xC0 <= _id <= 0xDF:354 # code for audio stream355 for a in self.audio:356 if a.id == _id:357 break358 else:359 self.audio.append(core.AudioStream())360 self.audio[-1]._set('id', _id)361 return 0362 if 0xE0 <= _id <= 0xEF:363 # code for video stream364 for v in self.video:365 if v.id == _id:366 break367 else:368 self.video.append(core.VideoStream())369 self.video[-1]._set('id', _id)370 return 0371 if _id == SEQ_HEAD:372 # sequence header, remember that position for later use373 self.sequence_header_offset = offset374 return 0375 if _id in [PRIVATE_STREAM1, PRIVATE_STREAM2]:376 # private stream. we don't know, but maybe we can guess later377 add = indexbytes(buffer, offset + 8)378 # if (indexbytes(buffer, offset+6) & 4) or 1:379 # id = indexbytes(buffer, offset+10+add)380 if buffer[offset + 11 + add:offset + 15 + add].find('\x0b\x77') != -1:381 # AC3 stream382 for a in self.audio:383 if a.id == _id:384 break385 else:386 self.audio.append(core.AudioStream())387 self.audio[-1]._set('id', _id)388 self.audio[-1].codec = 0x2000 # AC3389 return 0390 if _id == SYS_PKT:391 return 0392 if _id == EXT_START:393 return 0394 return 0395 # Normal MPEG (VCD, SVCD) ========================================396 def isMPEG(self, file, force=False):397 """398 This MPEG starts with a sequence of 0x00 followed by a PACK Header399 http://dvd.sourceforge.net/dvdinfo/packhdr.html400 """401 file.seek(0, 0)402 buffer = file.read(10000)403 offset = 0404 # seek until the 0 byte stop405 while offset < len(buffer) - 100 and buffer[offset] == '\0':406 offset += 1407 offset -= 2408 # test for mpeg header 0x00 0x00 0x01409 header = '\x00\x00\x01%s' % chr(PACK_PKT)410 if offset < 0 or not buffer[offset:offset + 4] == header:411 if not force:412 return 0413 # brute force and try to find the pack header in the first414 # 10000 bytes somehow415 offset = buffer.find(header)416 if offset < 0:417 return 0418 # scan the 100000 bytes of data419 buffer += file.read(100000)420 # scan first header, to get basic info about421 # how to read a timestamp422 self.ReadHeader(buffer, offset)423 # store first timestamp424 self.start = self.get_time(buffer[offset + 4:])425 while len(buffer) > offset + 1000 and \426 buffer[offset:offset + 3] == '\x00\x00\x01':427 # read the mpeg header428 new_offset = self.ReadHeader(buffer, offset)429 # header scanning detected error, this is no mpeg430 if new_offset is None:431 return 0432 if new_offset:433 # we have a new offset434 offset = new_offset435 # skip padding 0 before a new header436 while len(buffer) > offset + 10 and \437 not indexbytes(buffer, offset + 2):438 offset += 1439 else:440 # seek to new header by brute force441 offset += buffer[offset + 4:].find('\x00\x00\x01') + 4442 # fill in values for support functions:443 self.__seek_size__ = 1000000444 self.__sample_size__ = 10000445 self.__search__ = self._find_timer_446 self.filename = file.name447 # get length of the file448 self.length = self.get_length()449 return 1450 @staticmethod451 def _find_timer_(buffer):452 """453 Return position of timer in buffer or None if not found.454 This function is valid for 'normal' mpeg files455 """456 pos = buffer.find('\x00\x00\x01%s' % chr(PACK_PKT))457 if pos == -1:458 return None459 return pos + 4460 # PES ============================================================461 def ReadPESHeader(self, offset, buffer, id=0):462 """463 Parse a PES header.464 Since it starts with 0x00 0x00 0x01 like 'normal' mpegs, this465 function will return (0, None) when it is no PES header or466 (packet length, timestamp position (maybe None))467 http://dvd.sourceforge.net/dvdinfo/pes-hdr.html468 """469 if not buffer[0:3] == '\x00\x00\x01':470 return 0, None471 packet_length = (indexbytes(buffer, 4) << 8) + indexbytes(buffer, 5) + 6472 align = indexbytes(buffer, 6) & 4473 header_length = indexbytes(buffer, 8)474 # PES ID (starting with 001)475 if indexbytes(buffer, 3) & 0xE0 == 0xC0:476 id = id or indexbytes(buffer, 3) & 0x1F477 for a in self.audio:478 if a.id == id:479 break480 else:481 self.audio.append(core.AudioStream())482 self.audio[-1]._set('id', id)483 elif indexbytes(buffer, 3) & 0xF0 == 0xE0:484 id = id or indexbytes(buffer, 3) & 0xF485 for v in self.video:486 if v.id == id:487 break488 else:489 self.video.append(core.VideoStream())490 self.video[-1]._set('id', id)491 # new mpeg starting492 if buffer[header_length + 9:header_length + 13] == \493 '\x00\x00\x01\xB3' and not self.sequence_header_offset:494 # yes, remember offset for later use495 self.sequence_header_offset = offset + header_length + 9496 elif indexbytes(buffer, 3) == 189 or indexbytes(buffer, 3) == 191:497 # private stream. we don't know, but maybe we can guess later498 id = id or indexbytes(buffer, 3) & 0xF499 if align and \500 buffer[header_length + 9:header_length + 11] == '\x0b\x77':501 # AC3 stream502 for a in self.audio:503 if a.id == id:504 break505 else:506 self.audio.append(core.AudioStream())507 self.audio[-1]._set('id', id)508 self.audio[-1].codec = 0x2000 # AC3509 else:510 # unknown content511 pass512 ptsdts = indexbytes(buffer, 7) >> 6513 if ptsdts and ptsdts == indexbytes(buffer, 9) >> 4:514 if indexbytes(buffer, 9) >> 4 != ptsdts:515 log.warning(u'WARNING: bad PTS/DTS, please contact us')516 return packet_length, None517 # timestamp = self.ReadPTS(buffer[9:14])518 high = ((indexbytes(buffer, 9) & 0xF) >> 1)519 med = (indexbytes(buffer, 10) << 7) + (indexbytes(buffer, 11) >> 1)520 low = (indexbytes(buffer, 12) << 7) + (indexbytes(buffer, 13) >> 1)521 return packet_length, 9522 return packet_length, None523 def isPES(self, file):524 log.info(u'trying mpeg-pes scan')525 file.seek(0, 0)526 buffer = file.read(3)527 # header (also valid for all mpegs)528 if not buffer == '\x00\x00\x01':529 return 0530 self.sequence_header_offset = 0531 buffer += file.read(10000)532 offset = 0533 while offset + 1000 < len(buffer):534 pos, timestamp = self.ReadPESHeader(offset, buffer[offset:])535 if not pos:536 return 0537 if timestamp is not None and not hasattr(self, 'start'):538 self.get_time = self.ReadPTS539 bpos = buffer[offset + timestamp:offset + timestamp + 5]540 self.start = self.get_time(bpos)541 if self.sequence_header_offset and hasattr(self, 'start'):542 # we have all informations we need543 break544 offset += pos545 if offset + 1000 < len(buffer) < 1000000 or 1:546 # looks like a pes, read more547 buffer += file.read(10000)548 if not self.video and not self.audio:549 # no video and no audio?550 return 0551 self.type = 'MPEG-PES'552 # fill in values for support functions:553 self.__seek_size__ = 10000000 # 10 MB554 self.__sample_size__ = 500000 # 500 k scanning555 self.__search__ = self._find_timer_PES_556 self.filename = file.name557 # get length of the file558 self.length = self.get_length()559 return 1560 def _find_timer_PES_(self, buffer):561 """562 Return position of timer in buffer or -1 if not found.563 This function is valid for PES files564 """565 pos = buffer.find('\x00\x00\x01')566 offset = 0567 if pos == -1 or offset + 1000 >= len(buffer):568 return None569 retpos = -1570 ackcount = 0571 while offset + 1000 < len(buffer):572 pos, timestamp = self.ReadPESHeader(offset, buffer[offset:])573 if timestamp is not None and retpos == -1:574 retpos = offset + timestamp575 if pos == 0:576 # Oops, that was a mpeg header, no PES header577 offset += buffer[offset:].find('\x00\x00\x01')578 retpos = -1579 ackcount = 0580 else:581 offset += pos582 if retpos != -1:583 ackcount += 1584 if ackcount > 10:585 # looks ok to me586 return retpos587 return None588 # Elementary Stream ===============================================589 def isES(self, file):590 file.seek(0, 0)591 try:592 header = struct.unpack('>LL', file.read(8))593 except (struct.error, IOError):594 return False595 if header[0] != 0x1B3:596 return False597 # Is an mpeg video elementary stream598 self.mime = 'video/mpeg'599 video = core.VideoStream()600 video.width = header[1] >> 20601 video.height = (header[1] >> 8) & 0xfff602 if header[1] & 0xf < len(FRAME_RATE):603 video.fps = FRAME_RATE[header[1] & 0xf]604 if (header[1] >> 4) & 0xf < len(ASPECT_RATIO):605 # FIXME: Empirically the aspect looks like PAR rather than DAR606 video.aspect = ASPECT_RATIO[(header[1] >> 4) & 0xf]607 self.video.append(video)608 return True609 # Transport Stream ===============================================610 def isTS(self, file):611 file.seek(0, 0)612 buffer = file.read(TS_PACKET_LENGTH * 2)613 c = 0614 while c + TS_PACKET_LENGTH < len(buffer):615 if indexbytes(buffer, c) == indexbytes(buffer, c + TS_PACKET_LENGTH) == TS_SYNC:616 break617 c += 1618 else:619 return 0620 buffer += file.read(10000)621 self.type = 'MPEG-TS'622 while c + TS_PACKET_LENGTH < len(buffer):623 start = indexbytes(buffer, c + 1) & 0x40624 # maybe load more into the buffer625 if c + 2 * TS_PACKET_LENGTH > len(buffer) and c < 500000:626 buffer += file.read(10000)627 # wait until the ts payload contains a payload header628 if not start:629 c += TS_PACKET_LENGTH630 continue631 tsid = ((indexbytes(buffer, c + 1) & 0x3F) << 8) + indexbytes(buffer, c + 2)632 adapt = (indexbytes(buffer, c + 3) & 0x30) >> 4633 offset = 4634 if adapt & 0x02:635 # meta info present, skip it for now636 adapt_len = indexbytes(buffer, c + offset)637 offset += adapt_len + 1638 if not indexbytes(buffer, c + 1) & 0x40:639 # no new pes or psi in stream payload starting640 pass641 elif adapt & 0x01:642 # PES643 timestamp = self.ReadPESHeader(c + offset, buffer[c + offset:],644 tsid)[1]645 if timestamp is not None:646 if not hasattr(self, 'start'):647 self.get_time = self.ReadPTS648 timestamp = c + offset + timestamp649 self.start = self.get_time(buffer[timestamp:timestamp + 5])650 elif not hasattr(self, 'audio_ok'):651 timestamp = c + offset + timestamp652 start = self.get_time(buffer[timestamp:timestamp + 5])653 if start is not None and self.start is not None and \654 abs(start - self.start) < 10:655 # looks ok656 self.audio_ok = True657 else:658 # timestamp broken659 del self.start660 log.warning(u'Timestamp error, correcting')661 if hasattr(self, 'start') and self.start and \662 self.sequence_header_offset and self.video and self.audio:663 break664 c += TS_PACKET_LENGTH665 if not self.sequence_header_offset:666 return 0667 # fill in values for support functions:668 self.__seek_size__ = 10000000 # 10 MB669 self.__sample_size__ = 100000 # 100 k scanning670 self.__search__ = self._find_timer_TS_671 self.filename = file.name672 # get length of the file673 self.length = self.get_length()674 return 1675 def _find_timer_TS_(self, buffer):676 c = 0677 while c + TS_PACKET_LENGTH < len(buffer):678 if indexbytes(buffer, c) == indexbytes(buffer, c + TS_PACKET_LENGTH) == TS_SYNC:679 break680 c += 1681 else:682 return None683 while c + TS_PACKET_LENGTH < len(buffer):684 start = indexbytes(buffer, c + 1) & 0x40685 if not start:686 c += TS_PACKET_LENGTH687 continue688 tsid = ((indexbytes(buffer, c + 1) & 0x3F) << 8) + indexbytes(buffer, c + 2)689 adapt = (indexbytes(buffer, c + 3) & 0x30) >> 4690 offset = 4691 if adapt & 0x02:692 # meta info present, skip it for now693 offset += indexbytes(buffer, c + offset) + 1694 if adapt & 0x01:695 timestamp = self.ReadPESHeader(c + offset, buffer[c + offset:], tsid)[1]696 if timestamp is None:697 # this should not happen698 log.error(u'bad TS')699 return None700 return c + offset + timestamp701 c += TS_PACKET_LENGTH702 return None703 # Support functions ==============================================704 def get_endpos(self):705 """706 get the last timestamp of the mpeg, return -1 if this is not possible707 """...

Full Screen

Full Screen

plantuml.py

Source:plantuml.py Github

copy

Full Screen

...47 res = ''48 for i in range(0, len(data), 3):49 if i + 2 == len(data):50 res += _encode3bytes(51 indexbytes(data, i),52 indexbytes(data, i + 1),53 054 )55 elif i + 1 == len(data):56 res += _encode3bytes(57 indexbytes(data, i),58 0, 059 )60 else:61 res += _encode3bytes(62 indexbytes(data, i),63 indexbytes(data, i + 1),64 indexbytes(data, i + 2)65 )66 return res67def _encode3bytes(b1, b2, b3):68 c1 = b1 >> 269 c2 = ((b1 & 0x3) << 4) | (b2 >> 4)70 c3 = ((b2 & 0xF) << 2) | (b3 >> 6)71 c4 = b3 & 0x3F72 res = ''73 res += _encode6bit(c1 & 0x3F)74 res += _encode6bit(c2 & 0x3F)75 res += _encode6bit(c3 & 0x3F)76 res += _encode6bit(c4 & 0x3F)77 return res78def _encode6bit(b):...

Full Screen

Full Screen

der.py

Source:der.py Github

copy

Full Screen

...16 Returns:17 bytes: The DER encoded signature18 """19 r_bytes = int_to_bytes(r)20 if indexbytes(r_bytes, 0) & 0x80:21 r_bytes = b"\x00" + r_bytes22 s_bytes = int_to_bytes(s)23 if indexbytes(s_bytes, 0) & 0x80:24 s_bytes = b"\x00" + s_bytes25 r_s = INTEGER + pack('B', len(r_bytes)) + r_bytes + INTEGER + pack('B', len(s_bytes)) + s_bytes26 return SEQUENCE + pack('B', len(r_s)) + r_s27 @staticmethod28 def decode_signature(sig):29 """Decode an EC signature from serialized DER format as described in30 https://tools.ietf.org/html/rfc2459 (section 7.2.2) and as detailed by31 bip-006632 Returns (r,s)33 """34 if len(sig) < 8:35 raise InvalidDerSignature("bytestring too small")36 if indexbytes(sig, 0) != ord(SEQUENCE):37 raise InvalidDerSignature("missing SEQUENCE marker")38 if indexbytes(sig, 1) != len(sig) - 2:39 raise InvalidDerSignature("invalid length")40 length_r = indexbytes(sig, 3)41 if 5 + length_r >= len(sig):42 raise InvalidDerSignature("invalid length")43 length_s = indexbytes(sig, 5 + length_r)44 if length_r + length_s + 6 != len(sig):45 raise InvalidDerSignature("invalid length")46 if indexbytes(sig, 2) != ord(INTEGER):47 raise InvalidDerSignature("invalid r marker")48 if length_r == 0:49 raise InvalidDerSignature("invalid r value")50 if indexbytes(sig, 4) & 0x80:51 raise InvalidDerSignature("invalid r value")52 if (length_r > 1 and (indexbytes(sig, 4) == 0x00) and not (indexbytes(sig, 5) & 0x80)):53 raise InvalidDerSignature("invalid r value")54 if indexbytes(sig, length_r + 4) != ord(INTEGER):55 raise InvalidDerSignature("invalid s marker")56 if length_s == 0:57 raise InvalidDerSignature("invalid s value")58 if indexbytes(sig, length_r + 6) & 0x80:59 raise InvalidDerSignature("invalid s value")60 if (length_s > 1 and (indexbytes(sig, length_r + 6) == 0x00) and not (indexbytes(sig, length_r + 7) & 0x80)):61 raise InvalidDerSignature("invalid s value")62 r_data, s_data = sig[4:4 + length_r], sig[6 + length_r:]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pyresttest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful