How to use ___fixture method in Molotov

Best Python code snippet using molotov_python

dope.py

Source:dope.py Github

copy

Full Screen

'''
DOPE Class
'''
__version__ = "2.0.3"
__author__ = "Anubhav Mattoo"

from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5, AES
from Crypto.Hash import HMAC, SHA256, SHA384, SHA512
from Crypto.Random import get_random_bytes
from Crypto.Signature import pss, pkcs1_15
import bchlib
import base64
from typing import Union
from hashlib import blake2b, blake2s
from dill import loads, dumps
from gzip import compress, decompress

# Lookup Tables
AES_MODE_LOOKUP = {
    "GCM": AES.MODE_GCM,
    "SIV": AES.MODE_SIV,
    "CBC": AES.MODE_CBC,
    "OFB": AES.MODE_OFB
}

RATCHET_MODE_LOOKUP = {
    "BLAKE0x0": 0x0,
    "BLAKEx0x": 0x1,
}

HMAC_LOOKUP = {
    "SHA256": SHA256,
    "SHA384": SHA384,
    "SHA512": SHA512
}

HMAC_SIZE_LOOKUP = {
    1024: 128,
    2048: 256,
    3072: 384,
    4096: 512,
    8192: 1024
}

KEY_MODE_LOOKUP = {
    "XOR-BL": 0x0,
    "AND-BL": 0x1,
}

DOPE_HIGHER_LOOKUP = {
    # Higher Byte
    (1024, "GCM", "BLAKE0x0"): b'\x00',
    (1024, "GCM", "BLAKEx0x"): b'\x01',
    (1024, "SIV", "BLAKE0x0"): b'\x04',
    (1024, "SIV", "BLAKEx0x"): b'\x05',
    (1024, "CBC", "BLAKE0x0"): b'\x08',
    (1024, "CBC", "BLAKEx0x"): b'\x09',
    (1024, "OFB", "BLAKE0x0"): b'\x0C',
    (1024, "OFB", "BLAKEx0x"): b'\x0D',
    (2048, "GCM", "BLAKE0x0"): b'\x10',
    (2048, "GCM", "BLAKEx0x"): b'\x11',
    (2048, "SIV", "BLAKE0x0"): b'\x14',
    (2048, "SIV", "BLAKEx0x"): b'\x15',
    (2048, "CBC", "BLAKE0x0"): b'\x18',
    (2048, "CBC", "BLAKEx0x"): b'\x19',
    (2048, "OFB", "BLAKE0x0"): b'\x1C',
    (2048, "OFB", "BLAKEx0x"): b'\x1D',
    (4096, "GCM", "BLAKE0x0"): b'\x20',
    (4096, "GCM", "BLAKEx0x"): b'\x21',
    (4096, "SIV", "BLAKE0x0"): b'\x24',
    (4096, "SIV", "BLAKEx0x"): b'\x25',
    (4096, "CBC", "BLAKE0x0"): b'\x28',
    (4096, "CBC", "BLAKEx0x"): b'\x29',
    (4096, "OFB", "BLAKE0x0"): b'\x2C',
    (4096, "OFB", "BLAKEx0x"): b'\x2D',
}

# Inverse of DOPE_HIGHER_LOOKUP, keyed by the integer value of the byte.
# Derived from the forward table so the two can never drift apart.
INV_DOPE_HIGHER_LOOKUP = {
    code[0]: params for params, code in DOPE_HIGHER_LOOKUP.items()
}

DOPE_LOWER_LOOKUP = {
    # Lower Byte
    ("SHA256", "XOR-BL"): b'\x00',
    ("SHA256", "AND-BL"): b'\x01',
    ("SHA384", "XOR-BL"): b'\x10',
    ("SHA384", "AND-BL"): b'\x11',
    ("SHA512", "XOR-BL"): b'\x20',
    ("SHA512", "AND-BL"): b'\x21',
}

# Inverse of DOPE_LOWER_LOOKUP, keyed by the integer value of the byte.
INV_DOPE_LOWER_LOOKUP = {
    code[0]: params for params, code in DOPE_LOWER_LOOKUP.items()
}


def byte_xor(left: bytes, right: bytes) -> bytes:
    '''
    XOR Byte String, 2 inputs

    The result is as long as the shorter input (zip truncates).
    '''
    return bytes(a ^ b for a, b in zip(left, right))


def byte_and(left: bytes, right: bytes) -> bytes:
    '''
    AND Byte String, 2 inputs

    The result is as long as the shorter input (zip truncates).
    '''
    return bytes(a & b for a, b in zip(left, right))


class DOPE2(object):
    """
    Double Ratchet Over Parity Exchange(DOPE)
    System Class for DOPE

    Parameters:-
    key: bytes
        secret the ratchet keys are derived from
    bch_poly: int
        first argument handed to bchlib.BCH
    ecc_size: int
        second argument handed to bchlib.BCH
    aes_mode: str
        one of the keys of AES_MODE_LOOKUP
    nonce: bytes
        empty -> 32 random bytes are drawn,
        shorter than 32 bytes -> stretched to 32 bytes with BLAKE2s,
        exactly 32 bytes -> used as is
    block_size: int
        size of one packed block (4 byte pad length + payload), >= 128

    Raises TypeError for an unsupported nonce size, block size or AES mode.
    """

    def __init__(self, key: bytes, bch_poly: int,
                 ecc_size: int, aes_mode: str,
                 nonce: bytes, block_size: int = 512):
        self.__key = key
        self.__bch = bchlib.BCH(bch_poly, ecc_size)
        self.__bch_poly = bch_poly
        # Must be spelled with exactly two underscores: name mangling turns
        # `___fixture` into a different attribute than the `__fixture`
        # that encode()/decode()/fixate() use.
        self.__fixture = False
        if len(nonce) == 0:
            self.__nonce = get_random_bytes(32)
        elif len(nonce) < 32:
            self.__nonce = blake2s(nonce, digest_size=32).digest()
        elif len(nonce) == 32:
            self.__nonce = nonce
        else:
            raise TypeError(
                f"DOPE does not support nonce of size {len(nonce)}")
        self.__ratchet_count = 0
        if block_size >= 128:
            self.block_size = block_size
        else:
            raise TypeError(
                f"DOPE does not support block size{block_size},"
                + " block must be greater than 128")
        if aes_mode in AES_MODE_LOOKUP:
            self.__aes_mode = aes_mode
            # SIV is reported as 512 bit, every other mode as 256 bit
            if aes_mode == "SIV":
                self.__aes_size = 512
            else:
                self.__aes_size = 256
        else:
            raise TypeError(f"DOPE does not support {aes_mode} mode")

    def __str__(self):
        DOPE = 'DOPE2_'
        BCH = f'BCH_{self.__bch.t}_{self.__bch.ecc_bytes}_'
        AES = f'AES_{self.__aes_size}_{self.__aes_mode}_'
        BLK = f'BLK_{self.block_size}'
        return DOPE + BCH + AES + BLK

    def __repr__(self):
        return self.__str__()

    def serialize(self):
        '''
        Serialise the parameters as an armoured, encrypted byte string

        Layout before base64: 3 byte AES mode name, 16 byte nonce/IV,
        encrypted (block size, BCH polynomial, BCH t, nonce), tag for
        SIV/GCM, and a 64 byte BLAKE2b check value over the key and
        nonce hashes.
        '''
        khac = blake2b(self.__key, digest_size=32).digest()
        nhac = blake2b(self.__nonce, digest_size=32).digest()
        kvac = blake2b(khac + nhac).digest()
        data = self.block_size.to_bytes(16, 'big')\
            + self.__bch_poly.to_bytes(16, 'big')\
            + self.__bch.t.to_bytes(16, 'big')\
            + self.__nonce
        if self.__aes_mode in ['SIV', 'GCM']:
            nonce = get_random_bytes(16)
            encoder = AES.new(khac, AES_MODE_LOOKUP[self.__aes_mode],
                              nonce=nonce)
            # the nonce is also authenticated as associated data
            encoder.update(nonce)
            data, tag = encoder.encrypt_and_digest(data)
            data = nonce + data + tag
        else:
            encoder = AES.new(khac, AES_MODE_LOOKUP[self.__aes_mode])
            data = encoder.iv + encoder.encrypt(data)
        data = self.__aes_mode.encode('utf8') + data + kvac
        data = base64.urlsafe_b64encode(data)
        # wrap the armour body at 80 columns
        if len(data) >= 80:
            data = b"".join(data[i:i+80] + b"\n"
                            for i in range(0, len(data), 80))
        data = b'-----BEGIN DOPE 2 KEY-----\n'\
            + data + b'-----END DOPE 2 KEY-----'
        return data

    @classmethod
    def marshall(cls, key: Union[str, bytes], password: bytes):
        '''
        Rebuild a DOPE2 object from the output of serialize()

        key: armoured key as produced by serialize() (str or bytes)
        password: the secret the serialized object was created with

        Raises ValueError('Key Verification Error') when the stored check
        value does not match the password and the recovered nonce.
        '''
        # serialize() emits bytes; accept the str form the annotation
        # promises by converting it before the bytes-only join below.
        if isinstance(key, str):
            key = key.encode('utf8')
        khac = blake2b(password, digest_size=32).digest()
        # drop the BEGIN/END armour lines
        data = key.splitlines()[1:-1]
        data = b"".join(data)
        data = base64.urlsafe_b64decode(data)
        aes_mode, niv, data, kvac = data[:3].decode('utf8'),\
            data[3:19], data[19:-64], data[-64:]
        if aes_mode in ['SIV', 'GCM']:
            decoder = AES.new(khac, AES_MODE_LOOKUP[aes_mode], nonce=niv)
            decoder.update(niv)
            # the last 16 bytes of the body are the authentication tag
            data = decoder.decrypt_and_verify(data[:-16], data[-16:])
        else:
            decoder = AES.new(khac, AES_MODE_LOOKUP[aes_mode], iv=niv)
            data = decoder.decrypt(data)
        block_size, bch_poly, ecc_size, nonce =\
            int.from_bytes(data[:16], 'big'),\
            int.from_bytes(data[16:32], 'big'),\
            int.from_bytes(data[32:48], 'big'),\
            data[48:]
        nhac = blake2b(nonce, digest_size=32).digest()
        vkac = blake2b(khac + nhac).digest()
        if vkac != kvac:
            raise ValueError('Key Verification Error')
        return cls(password, bch_poly, ecc_size, aes_mode, nonce, block_size)

    def fixate(self):
        '''
        Fixate at a key and Start Ratchets
        '''
        # Key = BLAKE(BLAKE(Weak Home) XOR BLAKE(Strong Home))
        self.__fixture = True
        hash_pass = blake2b(self.__key).digest()
        hash_nonce = blake2b(self.__nonce).digest()
        if self.__aes_mode == "SIV":
            # default BLAKE2b digest (64 bytes) for SIV
            self.__hkdf = blake2b(byte_xor(hash_pass, hash_nonce))
        else:
            self.__hkdf = blake2b(byte_xor(hash_pass, hash_nonce),
                                  digest_size=32)

    @property
    def nonce(self):
        return self.__nonce

    def ratchet(self, ecc: Union[bytes, bytearray]):
        '''
        Ratchet to Next Key

        Feeds the current key and the given ECC bytes into the running
        BLAKE2b state. Raises ValueError('Keys Exhausted') once more than
        2 ** 128 ratchet steps were taken.
        '''
        self.__ratchet_count += 1
        if self.__ratchet_count > 2 ** 128:
            raise ValueError('Keys Exhausted')
        key = self.__hkdf.digest()
        self.__hkdf.update(key + bytes(ecc))

    def key(self) -> bytes:
        '''
        Return the Current Home Key

        Does not advance the ratchet; ratchet() does that.
        Raises ValueError('Keys Exhausted') past 2 ** 128 ratchet steps.
        '''
        if self.__ratchet_count > 2 ** 128:
            raise ValueError('Keys Exhausted')
        key = self.__hkdf.digest()
        return key

    def pack_data(self, data: bytes) -> list:
        '''
        Pack Data to DOPE Standard

        Splits data into payloads of block_size - 4 bytes, fills a short
        last payload with random bytes and prefixes every payload with
        the pad length as a 4 byte big-endian integer.
        '''
        payload_size = self.block_size - 4
        data_block = []
        for x in range(0, len(data), payload_size):
            chunk = data[x:x + payload_size]
            pad_len = payload_size - len(chunk)
            data_block.append(pad_len.to_bytes(4, 'big')
                              + chunk + get_random_bytes(pad_len))
        return data_block

    def encode(self, data: bytes) -> bytes:
        '''
        Encode Data in DOPE Data format
        and Serialise as a byte string

        Every block is encrypted under the current ratchet key, stored as
        a dill-serialised packet, and the ratchet is advanced with the
        packet's ECC. Returns the dill-serialised list of packets.
        '''
        if not self.__fixture:
            self.fixate()
        aead = self.__aes_mode in ['SIV', 'GCM']
        code_string = []
        for counter, x in enumerate(self.pack_data(data)):  # x: Data Batch
            key = self.key()
            packet = {
                'block': counter,
                'pad_len': x[:4],
            }
            if aead:
                nonce = get_random_bytes(16)
                encoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],
                                  nonce=nonce)
                encoder.update(b'DOPE')
                packet['header'] = b'DOPE' + nonce
                cipher_text, tag = encoder.encrypt_and_digest(x[4:])
                packet['data'] = cipher_text
                packet['tag'] = tag
            else:
                # NOTE(review): CBC presumably needs block_size - 4 to be
                # a multiple of 16, nothing here pads for it - confirm.
                encoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode])
                packet['header'] = b'DOPE' + encoder.iv
                cipher_text = encoder.encrypt(x[4:])
                packet['data'] = cipher_text
            # the ECC is computed over the cipher text
            packet['ecc'] = bytes(self.__bch.encode(cipher_text))
            code_string.append(dumps(packet))
            self.ratchet(packet['ecc'])
        packets = dumps(code_string)
        self.__fixture = False
        return packets

    def decode(self, data: bytes, start: int = 0, end: int = 0) -> bytes:
        '''
        Decode Data in DOPE Data format
        by marshalling the byte string

        start/end select the packet range to decode; both 0 means all
        packets. Packets before start are only used to advance the
        ratchet. Raises ValueError when end < start.
        '''
        # hasattr(self, '__fixture') can never be true because the
        # attribute is stored under its mangled name; test the flag itself.
        if not self.__fixture:
            self.fixate()
        # SECURITY: dill.loads executes arbitrary code embedded in the
        # input - only feed it data from a trusted peer.
        code_string = loads(data)
        data = b''
        if end < start:
            raise ValueError('Inavlid Parameters for \'end\'')
        if end == start == 0:
            end = len(code_string)
        # replay the ratchet over the packets that are skipped
        for x in range(start):
            packet = loads(code_string[x])
            self.ratchet(packet['ecc'])
        for x in range(start, end):
            key = self.key()
            packet = loads(code_string[x])
            # needed by both branches (was only bound in the SIV/GCM one)
            header = packet['header']
            if self.__aes_mode in ['SIV', 'GCM']:
                decoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],
                                  nonce=header[4:])
                decoder.update(header[:4])
                p_data = decoder.decrypt_and_verify(packet['data'],
                                                    packet['tag'])
            else:
                decoder = AES.new(key, AES_MODE_LOOKUP[self.__aes_mode],
                                  iv=header[4:])
                p_data = decoder.decrypt(packet['data'])
            # NOTE(review): encode() computes the ECC over the cipher text
            # but it is applied to the plain text here - confirm intent.
            _, p_data, _ = self.__bch.decode(p_data, packet['ecc'])
            pad = int.from_bytes(packet['pad_len'], 'big')
            data += p_data[:-pad] if pad != 0 else p_data
            self.ratchet(packet['ecc'])
        self.__fixture = False
        # ... (the remainder of decode() is truncated in this excerpt)

Full Screen

Full Screen

api.py

Source:api.py Github

copy

Full Screen

...88 _FIXTURES[name] = [func]89 else:90 _FIXTURES[name] = func91 @functools.wraps(func)92 def ___fixture(*args, **kw):93 return func(*args, **kw)94 return ___fixture95 return __fixture96def setup():97 """Called once per worker startup.98 Arguments received by the decorated function:99 - **worker_id** the worker number100 - **args** arguments used to start Molotov.101 The decorated function can send back a dict.102 This dict will be passed to the :class:`aiohttp.ClientSession` class103 as keywords when it's created.104 This is useful when you need to set up session-wide options105 like Authorization headers, or do whatever you need on startup.106 *The decorated function should be a coroutine.*...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful