Best Python code snippet using yandex-tank
vnc.py
Source:vnc.py  
1import struct2from array import array3import PyD3DES4from AppKit import *5from twisted.internet.protocol import Protocol, ClientFactory6from twisted.internet import reactor7class NotEnoughDataException(Exception):8    pass9def tocard32(i):10    try:11        return struct.pack('!i', i)12    except struct.error, e:13        raise NotEnoughDataException(e)14def card32(i):15    if len(i) != 4:16        raise NotEnoughDataException(i)17    return array('i', i.tostring())[0]18def tocard16(i):19    try:20        return struct.pack('!h', i)21    except struct.error, e:22        raise NotEnoughDataException(e)23def card16(i):24    if len(i) != 2:25        raise NotEnoughDataException(i)26    return array('h', i.tostring())[0]27def tocard8(i):28    return chr(i)29    30def card8(i):31    return i32class PixelFormat(object):33    def __init__(self, **args):34        for k, v in args.items():35            setattr(self, k, v)36        37    def parse(klass, data):38        parts = list(data[:4])39        parts.append(card16(data[4:6]))40        parts.append(card16(data[6:8]))41        parts.append(card16(data[8:10]))42        parts += map(None, data[10:13])43        args = {}44        for attr, value in zip(('bpp', 'depth', 'bigendian', 'truecolor',45                                'red_max', 'green_max', 'blue_max',46                                'red_shift', 'green_shift',47                                'blue_shift'), parts):48            args[attr] = value49        args['size'] = args['bpp'] / 850        return klass(**args)51    parse = classmethod(parse)52            53    def __str__(self):54        parts = [x for x in dir(self) if not x.startswith('_')]55        parts.sort()56        s = '\n'.join([('%12s: %s' % (x, getattr(self, x))) for x in parts])57        return s58    def pack(self):59        parts = [tocard8(self.bpp),60                 tocard8(self.depth),61                 tocard8(self.bigendian),62                 tocard8(self.truecolor),63                 tocard16(self.red_max),64                 tocard16(self.green_max),65                 tocard16(self.blue_max),66                 tocard8(self.red_shift),67                 tocard8(self.green_shift),68                 tocard8(self.blue_shift),69                 '\0' * 3]70        return ''.join(parts)71    def default(klass):72        return klass(bpp=32, depth=32, bigendian=0, truecolor=1, red_max=255, green_max=255, blue_max=255, red_shift=0, green_shift=8, blue_shift=16, size=4)73    default = classmethod(default)74class VNCClient(Protocol):75    RAW_ENCODING, COPYRECT_ENCODING, RRE_ENCODING, CORRE_ENCODING, HEXTILE_ENCODING = 0, 1, 2, 4, 576    def __init__(self, passwd=''):77        self.read_buffer = array('B')78        self.state = 'ProtocolVersion'79        self.states = { 'ProtocolVersion': self.do_ProtocolVersion,80                        'Authentication': self.do_Auth,81                        'Initialization': self.do_Init,82                        'Connected': self.do_ConnectedHandler83                        }84        self.auth_types = [self.do_ConnectionFailed,85                            self.do_NoAuth,86                            self.do_VNCAuth,87                            ]88        self.vnc_auth_state = 089        self.vnc_auth_states = [self.do_VNCAuth1,90                                self.do_VNCAuth2,91                                ]92        self.connected_msgs = {0: self.do_FramebufferUpdate,93                               2: self.do_Bell,94                               3: self.do_ServerCutText,95                               }96    def dataReceived(self, data):97        self.read_buffer.extend(array('B', data))98        self.states[self.state]()99    def setFrameBufferRect(self, x, y, w, h, data):100        destRect = ((x, y), (w, h))101        srcRect = ((0, 0), (w, h))102    103        self.frameBuffer.lockFocus()104        data.drawInRect_fromRect_operation_fraction_(destRect, srcRect, NSCompositeCopy, 1.0)105        self.frameBuffer.unlockFocus()106        107    def setFrameBufferRectToRawData(self, x, y, w, h, rawData):108        imgRep = NSBitmapImageRep.alloc().initWithBitmapDataPlanes_pixelsWide_pixelsHigh_bitsPerSample_samplesPerPixel_hasAlpha_isPlanar_colorSpaceName_bytesPerRow_bitsPerPixel_((rawData, None, None, None, None), w, h, 8, 4, True, False, NSCalibratedRGBColorSpace, 0, 4 * 8)109        imgRep.setAlpha_(False)110        img = NSImage.alloc().initWithSize_((w, h))111        img.setFlipped_(True)112        img.lockFocus()113        imgRep.drawInRect_(((0, 0), (w, h)))114        img.unlockFocus()115        self.setFrameBufferRect(x, y, w, h, img)116        117    def dumpFramebufferToFile(self):118        self.frameBuffer.TIFFRepresentationUsingCompression_factor_(NSTIFFCompressionLZW, 1.0).writeToFile_atomically_('VNC - %s.tiff' % (self.name, ), False)119    def do_ProtocolVersion(self):120        # Remove the servers version and return our own121        print('Protocol version recieved "%s"' % (''.join(self.read_buffer[:11].tostring()), ))122        self.read_buffer = self.read_buffer[12:]123        self.transport.write('RFB 003.003\n')124        self.state = 'Authentication'125    def do_ConnectionFailed(self):126        try:127            reason_length = card32(self.read_buffer[4:8])128            reason = self.read_buffer[8:8 + reason_length]129            self.read_buffer = self.read_buffer[8 + reason_length:]130            self.transport.loseConnection()131            print('Connection failed: %s' % (reason,))132            return 1133        except Exception, e:134            print repr(e)135    def do_Auth(self):136        t = self.read_buffer[:4]137        t = card32(t)138        print('Authentication type is %d (%r)' % (t, self.auth_types[t]))139        if self.auth_types[t]():140            pass141                142    def do_NoAuth(self):143        pass144    def do_VNCAuth(self):145        self.vnc_auth_states[self.vnc_auth_state]()146    def do_VNCAuth1(self):147        try:148            if len(self.read_buffer) < 20:149                raise IndexError150            challenge = self.read_buffer[4:20]151            self.read_buffer = self.read_buffer[20:]152            self.read_buffer.extend(array('B', '\x00\x00\x00\x02'))153            PyD3DES.setkey(self.factory.password)154            self.transport.write(PyD3DES.encrypt(challenge[:8]) + PyD3DES.encrypt(challenge[8:]))155            self.vnc_auth_state += 1156        except Exception, e:157            print repr(e), e158            return159    def do_VNCAuth2(self):160        try:161            if len(self.read_buffer) < 8:162                raise IndexError163            response = card32(self.read_buffer[4:8])164            if response == 0:165                print('Authentication succeeded')166                self.state = 'Initialization'167                self.read_buffer = self.read_buffer[8:]168                # Send the ClientInit message that consists169                # of a card32 that is 1 if the session is shared170                try:171                    self.transport.write(chr(self.shared))172                except:173                    self.transport.write(chr(0))174            elif response == 1:175                print('Invalid password')176                self.transport.loseConnection()177            elif response == 2:178                print('Too many connections!')179                self.transport.loseConnection()180        except Exception, e:181            print(e)182            183            184    def do_Init(self):185        """186        Handle a ServerInitialisation message.187        """188        if len(self.read_buffer) >= 24:189            print('ServerInitialisation buffer length: %d' % (len(self.read_buffer), ))190            self.fbsize = (card16(self.read_buffer[:2]), card16(self.read_buffer[2:4]))191            self.pixel_format = PixelFormat.parse(self.read_buffer[4:20])192            name_length = card32(self.read_buffer[20:24])193        else:194            return195        196        if len(self.read_buffer) < 24 + name_length:197            return198        self.name = ''.join(self.read_buffer[24:24 + name_length].tostring())199        print('Connected to %s' % (self.name, ))200        print('Servers pixel format: \n%s' % (self.pixel_format, ))201        202        self.frameBuffer = NSImage.alloc().initWithSize_(self.fbsize)203        self.frameBuffer.setFlipped_(True)204        self.alreadyProcessedRectangles = (0, 4)205        self.read_buffer = self.read_buffer[24 + name_length:]206        self.state = 'Connected'207        self.send_SetPixelFormat(PixelFormat.default())208        self.send_SetEncoding((self.CORRE_ENCODING, ))209        self.send_FramebufferUpdateRequest((0, 0, self.fbsize[0], self.fbsize[1]), 0)210        211    def do_ConnectedHandler(self):212        """ """213        p = NSAutoreleasePool.alloc().init()214        self.connected_msgs[self.read_buffer[0]]()215        del p216    def do_FramebufferUpdate(self):217        buf = self.read_buffer218        try:219            messageType = buf[0]220            numberOfRectangles = card16(buf[2:4])221            222            firstRectangle, currentRectangleStart = self.alreadyProcessedRectangles223            for currentRectangle in range(firstRectangle, numberOfRectangles):224                currentRectangleLength = 0225                x, y, width, height = card16(buf[currentRectangleStart    :currentRectangleStart + 2]), \226                                      card16(buf[currentRectangleStart + 2:currentRectangleStart + 4]), \227                                      card16(buf[currentRectangleStart + 4:currentRectangleStart + 6]), \228                                      card16(buf[currentRectangleStart + 6:currentRectangleStart + 8])229                encodingType = card32(buf[currentRectangleStart + 8:currentRectangleStart + 12])230                currentRectangleLength += 12231                232                currentEncodingStart = currentRectangleStart + 12233                currentEncodingLength = 0234                if encodingType == self.RAW_ENCODING:235                    currentEncodingLength = VNCClient._Raw(self, currentEncodingStart, x, y, width, height, NotEnoughDataException)236                elif encodingType == self.CORRE_ENCODING:237                    currentEncodingLength = VNCClient._CoRRE(self, currentEncodingStart, x, y, width, height, NotEnoughDataException)238                else:239                    print '\t\tUnknown encoding!', encodingType240                currentRectangleLength += currentEncodingLength241                currentRectangleStart += currentRectangleLength242                243                self.alreadyProcessedRectangles = (currentRectangle + 1, currentRectangleStart)244            245            self.read_buffer = self.read_buffer[currentRectangleStart:]246            self.alreadyProcessedRectangles = (0, 4)247#            self.dumpFramebufferToFile()248            self.send_FramebufferUpdateRequest((0, 0, self.fbsize[0], self.fbsize[1]), 0)249#            self.transport.loseConnection()250        except NotEnoughDataException, e:251            pass252            253    def _Raw(self, currentEncodingStart, x, y, width, height, NotEnoughDataException):254        rawDataLength = width * height * self.pixel_format.size255        rawData = self.read_buffer[currentEncodingStart:currentEncodingStart + rawDataLength]256        if len(rawData) < rawDataLength:257            raise NotEnoughDataException258        259        self.setFrameBufferRectToRawData(x, y, width, height, rawData)260        return rawDataLength261            262    def _CoRRE(self, currentEncodingStart, x, y, width, height, NotEnoughDataException):263        buf = self.read_buffer264        if len(buf) < currentEncodingStart + 8:265            raise NotEnoughDataException266    267        numberOfSubRectangles = card32(buf[currentEncodingStart:currentEncodingStart + 4])268        r, g, b, a = buf[currentEncodingStart + 4:currentEncodingStart + 8]269        270        self.frameBuffer.lockFocus()271        NSColor.colorWithCalibratedRed_green_blue_alpha_(r / 255.0, g / 255.0, b / 255.0, 1.0).set()272        NSRectFill(((x, y), (width, height)))273    274        currentSubRectangleStart = currentEncodingStart + 8275        276        if len(buf) < currentSubRectangleStart + (8 * numberOfSubRectangles):277            raise NotEnoughDataException278    279        subRectangleRects = []280        subRectangleColors = []281        if numberOfSubRectangles:282            for currentSubRectangle in range(numberOfSubRectangles):283                r, g, b, a, srX, srY, srWidth, srHeight = buf[currentSubRectangleStart:currentSubRectangleStart + 8]284    285                subRectangleRects.append(((x + srX, y + srY), (srWidth, srHeight)))286                subRectangleColors.append((r / 255.0, g / 255.0, b / 255.0, 1.0))287                currentSubRectangleStart += 8288    289            subRectangleColors = map(NSColor.colorWithCalibratedRed_green_blue_alpha_, *zip(*subRectangleColors))        290            NSRectFillListWithColors(subRectangleRects, subRectangleColors)291    292        self.frameBuffer.unlockFocus()293            294        currentEncodingLength = 8 * (numberOfSubRectangles + 1)295        return currentEncodingLength296    297    try:298        from _CoRRE import _CoRRE299    except ImportError:300        pass301    def do_Bell(self):302        self.read_buffer = self.read_buffer[1:]303    def do_ServerCutText(self):304        length = card32(self.read_buffer[4:8])305        text = self.read_buffer[8:8 + length]306        print text307        self.read_buffer = self.read_buffer[8 + length:]308    309    def send_SetPixelFormat(self, pixelFormat):310        self.pixel_format = pixelFormat311        msg = tocard8(0)312        msg += '\0' * 3313        msg += pixelFormat.pack()314        self.transport.write(msg)315    def send_SetEncoding(self, encodings):316        msg = tocard8(2)317        msg = msg + '\0'318        msg = msg + tocard16(len(encodings))319        encs = map(tocard32, encodings)320        msg = msg + ''.join(encs)321        self.transport.write(msg)322    def send_FramebufferUpdateRequest(self, pos, incremental):323        msg = tocard8(3)324        msg = msg + tocard8(incremental)325        msg = msg + ''.join(map(tocard16, pos))326        self.transport.write(msg)327    def send_KeyEvent(self, down, key):328        msg = tocard8(4)329        msg = msg + tocard8(down) + '\0'330        msg = msg + tocard32(key)331        self.transport.write(msg)332    def send_PointerEvent(self, pos, buttons):333        msg = tocard8(5)334        msg = msg + tocard8(buttons)335        msg = msg + ''.join(map(tocard16, pos))336        self.transport.write(msg)337    def send_ClientCutText(self, text):338        msg = tocard8(6)339        msg = msg + ' ' + card32(len(text))340        msg = msg + text341        self.transport.write(msg)342class VNCClientFactory(ClientFactory):343    protocol = VNCClient344    def __init__(self, password):345        self.password = password346    def startedConnecting(self, connector):347        print 'startedConnecting'348    def clientConnectionFailed(self, connector, reason):349        print 'Connection failed. Reason:', reason350    def clientConnectionLost(self, connector, reason):351        print 'Connection lost. Reason:', reason352        if reactor.running:353            reactor.stop()354import hotshot, hotshot.stats355def main():356    import sys357    if len(sys.argv) < 4:358        print 'Usage: %s hostname port password' % (sys.argv[0], )359        return360    NSApplicationLoad()361    reactor.connectTCP(sys.argv[1], int(sys.argv[2]), VNCClientFactory(sys.argv[3]))362    reactor.run()363import profile, pstats364if __name__ == '__main__':365    main()...frame.py
Source:frame.py  
1import struct2from enum import IntEnum3from mod_websocket_server.util import skip_first4class OpCode(IntEnum):5    CONTINUATION = 0x06    TEXT = 0x17    BINARY = 0x28    CLOSE = 0x89    PING = 0x910    PONG = 0xA11class Mask(object):12    def __init__(self, masking_key):13        if len(masking_key) != 4:14            raise ValueError(15                "Masking-key must be exactly four bytes, {length} given.".format(16                    length=len(masking_key)17                )18            )19        self.masking_key = bytearray(masking_key)20    def mask(self, data):21        data = bytearray(data)22        for i in xrange(len(data)):23            data[i] ^= self.masking_key[i % 4]24        return data25class Frame(object):26    def __init__(self, fin, op_code, mask, payload):27        self.fin = fin28        self.op_code = op_code29        self.mask = mask30        self.payload = payload31    @property32    def masked(self):33        return self.mask is not None34    @property35    def payload_length(self):36        return len(self.payload)37    def serialize(self):38        frame = bytearray()39        fin_bit = self.fin << 740        frame += struct.pack("!B", fin_bit | self.op_code)41        masked_bit = self.masked << 742        if self.payload_length <= 125:43            frame += struct.pack("!B", masked_bit | self.payload_length)44        elif self.payload_length < 2 ** 16:45            frame += struct.pack("!B", masked_bit | 126)46            frame += struct.pack("!H", self.payload_length)47        else:48            frame += struct.pack("!B", masked_bit | 127)49            frame += struct.pack("!Q", self.payload_length)50        if self.masked:51            frame += self.mask.masking_key52            frame += self.mask.mask(self.payload)53        else:54            frame += self.payload55        return frame56    @classmethod57    @skip_first58    def multi_parser(cls):59        parser = cls.parser()60        remaining = None61        frames = []62        while True:63            if remaining is None:64                remaining = yield frames65                frames = []66            result = parser.send(remaining)67            if result:68                frame, remaining = result69                frames.append(frame)70                parser = cls.parser()71            else:72                remaining = None73    @staticmethod74    @skip_first75    def parser():76        read_buffer = bytearray()77        # ------------------------------------------78        # first byte79        # ------------------------------------------80        while len(read_buffer) < 1:81            read_buffer += yield82        fin_mask = 0b1000000083        fin = bool(read_buffer[0] & fin_mask)84        rsv_mask = 0b0111000085        if read_buffer[0] & rsv_mask:86            raise AssertionError("Unsupported extension.")87        op_code_mask = 0b0000111188        op_code = OpCode(read_buffer[0] & op_code_mask)89        read_buffer = read_buffer[1:]90        # ------------------------------------------91        # second byte92        # ------------------------------------------93        while len(read_buffer) < 1:94            read_buffer += yield95        masked_mask = 0b1000000096        masked = bool(read_buffer[0] & masked_mask)97        payload_length_mask = 0b0111111198        payload_length = read_buffer[0] & payload_length_mask99        read_buffer = read_buffer[1:]100        # ------------------------------------------101        # extended length field102        # ------------------------------------------103        if payload_length == 126:104            while len(read_buffer) < 2:105                read_buffer += yield106            (payload_length,) = struct.unpack("!H", read_buffer[:2])107            read_buffer = read_buffer[2:]108        elif payload_length == 127:109            while len(read_buffer) < 4:110                read_buffer += yield111            (payload_length,) = struct.unpack("!Q", read_buffer[:4])112            read_buffer = read_buffer[4:]113        # ------------------------------------------114        # mask key115        # ------------------------------------------116        mask = None117        if masked:118            while len(read_buffer) < 4:119                read_buffer += yield120            mask = Mask(read_buffer[:4])121            read_buffer = read_buffer[4:]122        # ------------------------------------------123        # payload124        # ------------------------------------------125        while len(read_buffer) < payload_length:126            read_buffer += yield127        payload = read_buffer[:payload_length]128        read_buffer = read_buffer[payload_length:]129        # ------------------------------------------130        # assemble131        # ------------------------------------------132        if masked:133            payload = mask.mask(payload)134        yield Frame(fin, op_code, mask, payload), read_buffer135    def __repr__(self):...nbt.py
Source:nbt.py  
1import struct2def parse_nbt(read_buffer):3    tag_id = read_buffer.read_byte()4    name = read_name(read_buffer)5    return {name:parse(tag_id, read_buffer)}6def read_name(read_buffer):7    length = read_buffer.read_ushort()8    string = read_buffer.read(length).decode('utf8')9    return string10def parse(tag_id, read_buffer):11    if tag_id == 0:12        return None13    elif tag_id == 1:14        return parse_byte(read_buffer)15    elif tag_id == 2:16        return parse_short(read_buffer)17    elif tag_id == 3:18        return parse_int(read_buffer)19    elif tag_id == 4:20        return parse_long(read_buffer)21    elif tag_id == 5:22        return parse_float(read_buffer)23    elif tag_id == 6:24        return parse_double(read_buffer)25    elif tag_id == 7:26        return parse_byte_array(read_buffer)27    elif tag_id == 8:28        return parse_string(read_buffer)29    elif tag_id == 9:30        return parse_list(read_buffer)31    elif tag_id == 10:32        return parse_compound(read_buffer)33    elif tag_id == 11:34        return parse_int_array(read_buffer)35    elif tag_id == 12:36        return parse_long_array(read_buffer)37    else:38        raise Exception('Invalid tag id')39def parse_byte(read_buffer):40    payload = read_buffer.read_sbyte()41    return payload42def parse_short(read_buffer):43    payload = read_buffer.read_short()44    return payload45def parse_int(read_buffer):46    payload = read_buffer.read_int()47    return payload48def parse_long(read_buffer):49    payload = read_buffer.read_long()50    return payload51def parse_float(read_buffer):52    payload = read_buffer.read_float()53    return payload54def parse_double(read_buffer):55    payload = read_buffer.read_double()56    return payload57def parse_byte_array(read_buffer):58    length = read_buffer.read_int()59    payload = [read_buffer.read_sbyte() for _ in range(length)]60    return payload61def parse_string(read_buffer):62    length = read_buffer.read_ushort()63    payload = read_buffer.read(length).decode('utf8')64    return payload65def parse_list(read_buffer):66    tag_id = read_buffer.read_byte()67    length = read_buffer.read_int()68    if length > 0 and tag_id == 0:69        raise Exception("Cannot have a non-zero length list of End Tags")70    if tag_id == 0:71        return []72    else:73        return [parse(tag_id, read_buffer) for _ in range(length)]74def parse_compound(read_buffer):75    tag_id = read_buffer.read_byte()76    output = []77    while tag_id != 0:78        name = read_name(read_buffer)79        payload = parse(tag_id, read_buffer)80        output.append({name:payload})81        tag_id = read_buffer.read_byte()82    return output83def parse_int_array(read_buffer):84    length = read_buffer.read_int()85    return [read_buffer.read_int() for _ in range(length)]86def parse_long_array(read_buffer):87    length = read_buffer.read_int()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
