Best Python code snippet using molotov_python
dope.py
Source:dope.py  
1'''2DOPE Class3'''4__version__ = "2.0.3"5__author__ = "Anubhav Mattoo"6from Crypto.PublicKey import RSA7from Crypto.Cipher import PKCS1_v1_5, AES8from Crypto.Hash import HMAC, SHA256, SHA384, SHA5129from Crypto.Random import get_random_bytes10from Crypto.Signature import pss, pkcs1_1511import bchlib12import base6413from typing import Union14from hashlib import blake2b, blake2s15from dill import loads, dumps16from gzip import compress, decompress17# Lookup Tables18AES_MODE_LOOKUP = {19    "GCM": AES.MODE_GCM,20    "SIV": AES.MODE_SIV,21    "CBC": AES.MODE_CBC,22    "OFB": AES.MODE_OFB23}24RATCHET_MODE_LOOKUP = {25    "BLAKE0x0": 0x0,26    "BLAKEx0x": 0x1,27}28HMAC_LOOKUP = {29    "SHA256": SHA256,30    "SHA384": SHA384,31    "SHA512": SHA51232}33HMAC_SIZE_LOOKUP = {34    1024: 128,35    2048: 256,36    3072: 384,37    4096: 512,38    8192: 102439}40KEY_MODE_LOOKUP = {41    "XOR-BL": 0x0,42    "AND-BL": 0x1,43}44DOPE_HIGHER_LOOKUP = {45    # Higher Byte46    (1024, "GCM", "BLAKE0x0"): b'\x00',47    (1024, "GCM", "BLAKEx0x"): b'\x01',48    (1024, "SIV", "BLAKE0x0"): b'\x04',49    (1024, "SIV", "BLAKEx0x"): b'\x05',50    (1024, "CBC", "BLAKE0x0"): b'\x08',51    (1024, "CBC", "BLAKEx0x"): b'\x09',52    (1024, "OFB", "BLAKE0x0"): b'\x0C',53    (1024, "OFB", "BLAKEx0x"): b'\x0D',54    (2048, "GCM", "BLAKE0x0"): b'\x10',55    (2048, "GCM", "BLAKEx0x"): b'\x11',56    (2048, "SIV", "BLAKE0x0"): b'\x14',57    (2048, "SIV", "BLAKEx0x"): b'\x15',58    (2048, "CBC", "BLAKE0x0"): b'\x18',59    (2048, "CBC", "BLAKEx0x"): b'\x19',60    (2048, "OFB", "BLAKE0x0"): b'\x1C',61    (2048, "OFB", "BLAKEx0x"): b'\x1D',62    (4096, "GCM", "BLAKE0x0"): b'\x20',63    (4096, "GCM", "BLAKEx0x"): b'\x21',64    (4096, "SIV", "BLAKE0x0"): b'\x24',65    (4096, "SIV", "BLAKEx0x"): b'\x25',66    (4096, "CBC", "BLAKE0x0"): b'\x28',67    (4096, "CBC", "BLAKEx0x"): b'\x29',68    (4096, "OFB", "BLAKE0x0"): b'\x2C',69    (4096, "OFB", "BLAKEx0x"): b'\x2D',70}71INV_DOPE_HIGHER_LOOKUP = {72    # Higher Byte73    0x00: (1024, "GCM", "BLAKE0x0"),74    0x01: (1024, "GCM", "BLAKEx0x"),75    0x04: (1024, "SIV", "BLAKE0x0"),76    0x05: (1024, "SIV", "BLAKEx0x"),77    0x08: (1024, "CBC", "BLAKE0x0"),78    0x09: (1024, "CBC", "BLAKEx0x"),79    0x0C: (1024, "OFB", "BLAKE0x0"),80    0x0D: (1024, "OFB", "BLAKEx0x"),81    0x10: (2048, "GCM", "BLAKE0x0"),82    0x11: (2048, "GCM", "BLAKEx0x"),83    0x14: (2048, "SIV", "BLAKE0x0"),84    0x15: (2048, "SIV", "BLAKEx0x"),85    0x18: (2048, "CBC", "BLAKE0x0"),86    0x19: (2048, "CBC", "BLAKEx0x"),87    0x1C: (2048, "OFB", "BLAKE0x0"),88    0x1D: (2048, "OFB", "BLAKEx0x"),89    0x20: (4096, "GCM", "BLAKE0x0"),90    0x21: (4096, "GCM", "BLAKEx0x"),91    0x24: (4096, "SIV", "BLAKE0x0"),92    0x25: (4096, "SIV", "BLAKEx0x"),93    0x28: (4096, "CBC", "BLAKE0x0"),94    0x29: (4096, "CBC", "BLAKEx0x"),95    0x2C: (4096, "OFB", "BLAKE0x0"),96    0x2D: (4096, "OFB", "BLAKEx0x"),97}98DOPE_LOWER_LOOKUP = {99    # Lower Byte100    ("SHA256", "XOR-BL"): b'\x00',101    ("SHA256", "AND-BL"): b'\x01',102    ("SHA384", "XOR-BL"): b'\x10',103    ("SHA384", "AND-BL"): b'\x11',104    ("SHA512", "XOR-BL"): b'\x20',105    ("SHA512", "AND-BL"): b'\x21',106}107INV_DOPE_LOWER_LOOKUP = {108    # Lower Byte109    0x00: ("SHA256", "XOR-BL"),110    0x01: ("SHA256", "AND-BL"),111    0x10: ("SHA384", "XOR-BL"),112    0x11: ("SHA384", "AND-BL"),113    0x20: ("SHA512", "XOR-BL"),114    0x21: ("SHA512", "AND-BL"),115}116def byte_xor(left: bytes, right: bytes) -> bytes:117    '''118    XOR Byte String, 2 input119    '''120    return bytes([a ^ b for a, b in zip(left, right)])121def byte_and(left: bytes, right: bytes) -> bytes:122    '''123    AND Byte String, 2 inputs124    '''125    return bytes([a & b for a, b in zip(left, right)])126class DOPE2(object):127    """128    Double Ratchet Over Parity Exchange(DOPE)129    System Class for DOPE130    Parameters:-131        key: bytes132        bch_poly: int133        ecc_size: int134        aes_mode: str135        ratchet_mode: str136    """137    def __init__(self, key: bytes, bch_poly: int,138                 ecc_size: int, aes_mode: str,139                 nonce: bytes, block_size: int = 512):140        self.__key = key141        self.__bch = bchlib.BCH(bch_poly, ecc_size)142        self.__bch_poly = bch_poly143        self.___fixture = False144        if len(nonce) == 0:145            self.__nonce = get_random_bytes(32)146        elif len(nonce) < 32:147            self.__nonce = blake2s(nonce, digest_size=32).digest()148        elif len(nonce) == 32:149            self.__nonce = nonce150        else:151            raise TypeError(152                f"DOPE does not support nonce of size {len(nonce)}")153        self.__ratchet_count = 0154        if block_size >= 128:155            self.block_size = block_size156        else:157            raise TypeError(158                f"DOPE does not support block size{block_size},"159                + " block must be greater than 128")160        if aes_mode in AES_MODE_LOOKUP:161            self.__aes_mode = aes_mode162            if aes_mode == "SIV":163                self.__aes_size = 512164            else:165                self.__aes_size = 256166        else:167            raise TypeError(f"DOPE does not support {aes_mode} mode")168    def __str__(self):169        DOPE = f'DOPE2_'170        BCH = f'BCH_{self.__bch.t}_{self.__bch.ecc_bytes}_'171        AES = f'AES_{self.__aes_size}_{self.__aes_mode}_'172        BLK = f'BLK_{self.block_size}'173        return DOPE + BCH + AES + BLK174    def __repr__(self):175        return self.__str__()176    def serialize(self):177        khac = blake2b(self.__key, digest_size=32).digest()178        nhac = blake2b(self.__nonce, digest_size=32).digest()179        kvac = blake2b(khac + nhac).digest()180        data = self.block_size.to_bytes(16, 'big')\181            + self.__bch_poly.to_bytes(16, 'big')\182            + self.__bch.t.to_bytes(16, 'big')\183            + self.__nonce184        if self.__aes_mode in ['SIV', 'GCM']:185            nonce = get_random_bytes(16)186            encoder = AES.new(khac, AES_MODE_LOOKUP[self.__aes_mode],187                              nonce=nonce)188            encoder.update(nonce)189            data, tag = encoder.encrypt_and_digest(data)190            data = nonce + data + tag191        else:192            encoder = AES.new(khac, AES_MODE_LOOKUP[self.__aes_mode])193            data = encoder.iv + encoder.encrypt(data)194        data = self.__aes_mode.encode('utf8') + data + kvac195        data = base64.urlsafe_b64encode(data)196        if len(data) >= 80:197            data = b"".join(data[i:i+80] + b"\n"198                            for i in range(0, len(data), 80))199        data = b'-----BEGIN DOPE 2 KEY-----\n'\200            + data + b'-----END DOPE 2 KEY-----'201        return data202    @classmethod203    def marshall(cls, key: Union[str, bytes], password: bytes):204        khac = blake2b(password, digest_size=32).digest()205        data = key.splitlines()[1:-1]206        data = b"".join(data)207        data = base64.urlsafe_b64decode(data)208        aes_mode, niv, data, kvac = data[:3].decode('utf8'),\209            data[3:19], data[19:-64], data[-64:]210        if aes_mode in ['SIV', 'GCM']:211            decoder = AES.new(khac, AES_MODE_LOOKUP[aes_mode], nonce=niv)212            decoder.update(niv)213            data = decoder.decrypt_and_verify(data[:-16], data[-16:])214        else:215            decoder = AES.new(khac, AES_MODE_LOOKUP[aes_mode], iv=niv)216            data = decoder.decrypt(data)217        block_size, bch_poly, ecc_size, nonce =\218            int.from_bytes(data[:16], 'big'),\219            int.from_bytes(data[16:32], 'big'),\220            int.from_bytes(data[32:48], 'big'),\221            data[48:]222        nhac = blake2b(nonce, digest_size=32).digest()223        vkac = blake2b(khac + nhac).digest()224        if vkac != kvac:225            raise ValueError('Key Verification Error')226        return cls(password, bch_poly, ecc_size, aes_mode, nonce, block_size)227    def fixate(self):228        '''229        Fixate at a key and Start Ratchets230        '''231        # Key = BLAKE(BLAKE(Weak Home) XOR BLAKE(Strong Home))232        self.__fixture = True233        hash_pass = blake2b(self.__key).digest()234        hash_nonce = blake2b(self.__nonce).digest()235        if self.__aes_mode == "SIV":236            self.__hkdf = blake2b(byte_xor(hash_pass, hash_nonce))237        else:238            self.__hkdf = blake2b(byte_xor(hash_pass, hash_nonce),239                                  digest_size=32)240    @property241    def nonce(self):242        return self.__nonce243    def ratchet(self, ecc: Union[bytes, bytearray]):244        '''245        Ratchet to Next Key246        '''247        self.__ratchet_count += 1248        if self.__ratchet_count > 2 ** 128:249            raise ValueError('Keys Exhausted')250        key = self.__hkdf.digest()251        self.__hkdf.update(key + bytes(ecc))252    def key(self) -> bytes:253        '''254        Ratchet to Next Home Key255        '''256        if self.__ratchet_count > 2 ** 128:257            raise ValueError('Keys Exhausted')258        key = self.__hkdf.digest()259        return key260    def pack_data(self, data: bytes) -> list:261        '''262        Pack Data to DOPE Standard263        '''264        data_block = []265        for x in range(0, len(data), self.block_size - 4):266            pad_len = 0267            if len(data[x:x+self.block_size - 4]) < self.block_size - 4:268                pad_len = self.block_size\269                    - 4 - len(data[x:x+self.block_size - 4])270            data_x = data[x:x+self.block_size - 4] + get_random_bytes(pad_len)271            pad_len = pad_len.to_bytes(4, 'big')272            data_block.append(pad_len + data_x)273        return data_block274    def encode(self, data: bytes) -> bytes:275        '''276        Encode Data in DOPE Data format277        and Serialise as a byte string278        '''279        if not self.__fixture:280            self.fixate()281        data_block = self.pack_data(data)282        code_string = []283        counter = -1284        for x in data_block:  # x: Data Batch285            counter += 1286            key = self.key()287            ecc = self.__bch.encode(x[4:])288            if self.__aes_mode in ['SIV', 'GCM']:289                nonce = get_random_bytes(16)290                encoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],291                                  nonce=nonce)292                encoder.update(b'DOPE')293                header = b'DOPE' + nonce294                data, tag = encoder.encrypt_and_digest(x[4:])295                packet = {296                    'block': counter,297                    'header': header,298                    'pad_len': x[:4],299                    'data': data,300                    'tag': tag,301                    'ecc': bytes(self.__bch.encode(data))302                }303                code_string.append(dumps(packet))304            else:305                encoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode])306                header = b'DOPE' + encoder.iv307                data = encoder.encrypt(x[4:])308                packet = {309                    'block': counter,310                    'header': header,311                    'pad_len': x[:4],312                    'data': data,313                    'ecc': bytes(self.__bch.encode(data))314                }315                code_string.append(dumps(packet))316            self.ratchet(packet['ecc'])317        packets = dumps(code_string)318        self.__fixture = False319        return packets320    def decode(self, data: bytes, start: int = 0, end: int = 0) -> bytes:321        '''322        Decode Data in DOPE Data format323        by marshalling the byte string324        '''325        if not hasattr(self, '__fixture'):326            self.fixate()327        code_string = loads(data)328        data = b''329        if end < start:330            raise ValueError('Inavlid Parameters for \'end\'')331        if end == start == 0:332            end = len(code_string)333        x = 0334        while x < start:335            packet = loads(code_string[x])336            self.ratchet(packet['ecc'])337            x += 1338        for x in range(start, end):339            key = self.key()340            packet = loads(code_string[x])341            if self.__aes_mode in ['SIV', 'GCM']:342                header = packet['header']343                decoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],344                                  nonce=header[4:])345                decoder.update(header[:4])346                p_data = decoder.decrypt_and_verify(packet['data'],347                                                    packet['tag'])348                _, p_data, ecc = self.__bch.decode(p_data, packet['ecc'])349                pad = int.from_bytes(packet['pad_len'], 'big')350                data += p_data[:-pad] if pad != 0 else p_data351            else:352                decoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],353                                  iv=header[4:])354                p_data = decoder.decrypt(packet['data'])355                _, p_data, ecc = self.__bch.decode(p_data, packet['ecc'])356                pad = int.from_bytes(packet['pad_len'], 'big')357                data += p_data[:-pad] if pad != 0 else p_data358            self.ratchet(packet['ecc'])359        self.__fixture = False...api.py
Source:api.py  
...88                _FIXTURES[name] = [func]89        else:90            _FIXTURES[name] = func91        @functools.wraps(func)92        def ___fixture(*args, **kw):93            return func(*args, **kw)94        return ___fixture95    return __fixture96def setup():97    """Called once per worker startup.98    Arguments received by the decorated function:99    - **worker_id** the worker number100    - **args** arguments used to start Molotov.101    The decorated function can send back a dict.102    This dict will be passed to the :class:`aiohttp.ClientSession` class103    as keywords when it's created.104    This is useful when you need to set up session-wide options105    like Authorization headers, or do whatever you need on startup.106    *The decorated function should be a coroutine.*...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
