How to use _func_name method in localstack

Best Python code snippet using localstack_python

my_class.py

Source:my_class.py Github

copy

Full Screen

""" KDF """
# https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet
# https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#cryptography.hazmat.primitives.kdf.scrypt.Scrypt
""" AES256 """
# https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption.html?highlight=aes#cryptography.hazmat.primitives.ciphers.Cipher
# https://gist.github.com/brysontyrrell/7cebfb05105c25d00e84ed35bd821dfe
""" SHA256 """
##### https://cryptography.io/en/latest/hazmat/primitives/cryptographic-hashes
# Cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.exceptions import InvalidKey, InvalidSignature
# General
import base64
import os.path
import chardet
import binascii
# Logs
import traceback
import inspect
# My
import utility


class PyCaClass:
    """Helpers built on the ``cryptography`` package.

    Groups key derivation (Scrypt by default, PBKDF2 when ``kdf_algo`` is set
    to ``'PBKDF2'``), Fernet encryption keyed by a derived key, AES-CBC
    encryption, SHA-256 hashing and file checksums.

    Operations report their outcome as a tuple whose first item is ``'OK'``
    or ``'NOK'``.  On ``'NOK'`` the second item is the string form of
    whatever ``utility.my_log`` returned for the failure.

    NOTE(review): several methods embed passwords, keys or secrets in the
    ``_inputs`` text handed to ``utility.my_log`` - confirm that is acceptable.
    """

    def __init__(self, p_salt=None):
        """Store defaults and, when ``p_salt`` is filled, the derived salt bytes."""
        # DEFAULTS
        self.encoding = 'UTF-8'  # 'latin-1'
        self.backend = default_backend()
        self.kdf_algo = 'Scrypt'
        # SALT - double input to make the encryption more robust
        if utility.is_filled(p_salt):
            self.salt_string = f"{p_salt}"
            _salt_once = self.byte_representation('to_bin', self.salt_string)
            self.salt_byte = _salt_once + _salt_once
        else:
            self.salt_string = None
            self.salt_byte = None
        # INPUTS
        self.inputs = f"{self.encoding}|{self.salt_string}|{self.kdf_algo}"

    def __nok(self, p_kind, p_inputs, p_msg, *p_flags):
        """Log a failure through ``utility.my_log`` and return the ``('NOK', text)`` tuple."""
        # Report the caller's function name and current line, exactly as if
        # the log call had been written inline at the call site.
        _caller = inspect.currentframe().f_back
        _logged = utility.my_log(p_kind, __name__, _caller.f_code.co_name,
                                 _caller.f_lineno, p_inputs, p_msg, *p_flags)
        return ('NOK', f"{_logged}")

    # ---------------------------------------------
    # UTILITY
    # ---------------------------------------------

    def is_byte(self, p_input):
        """Return True when ``p_input`` is a ``bytes`` or ``bytearray`` object.

        The previous implementation called ``chardet.detect`` and treated any
        exception as "not bytes"; ``detect`` only accepts bytes/bytearray, so
        a direct type check gives the same answer without hiding other errors.
        """
        return isinstance(p_input, (bytes, bytearray))

    def generate_random_binary_data(self, _size=32):
        """Return ``_size`` random bytes from ``os.urandom``."""
        return os.urandom(_size)

    def byte_representation(self, p_what, p_input):
        """Convert between text and bytes using ``self.encoding``.

        ``p_what == 'to_bin'``: encode ``p_input``; input that is already
        bytes/bytearray is returned as ``bytes`` (it used to yield ``None``).
        ``p_what == 'from_bin'``: decode ``p_input`` to ``str``.
        Any other ``p_what`` returns ``None``.
        """
        if p_what == 'to_bin':
            if self.is_byte(p_input):
                return bytes(p_input)
            return p_input.encode(self.encoding)
        if p_what == 'from_bin':
            return str(p_input, self.encoding)
        return None

    def hex_representation(self, p_what, p_input):
        """Convert to/from hexadecimal; the result is always bytes.

        ``'to_hex'`` hexlifies ``p_input`` (text is encoded first), so the
        result is twice as long as the data.  ``'from_hex'`` unhexlifies a
        bytes or str hex representation.  Any other ``p_what`` returns ``None``.
        """
        if p_what == 'to_hex':
            return binascii.hexlify(self.byte_representation('to_bin', p_input))
        if p_what == 'from_hex':
            return binascii.unhexlify(p_input)
        return None

    # ---------------------------------------------
    # KDF ( Key Derivation Function )
    # ---------------------------------------------

    def __kdf_create(self):
        """Build the KDF object selected by ``self.kdf_algo`` with ``self.salt_byte``.

        Returns ``('OK', kdf)`` or a ``'NOK'`` tuple.  An unrecognised
        ``kdf_algo`` now yields ``'NOK'`` instead of ``None``, which callers
        would have failed to index.
        """
        _inputs = f"{self.inputs}"
        try:
            if self.kdf_algo == 'PBKDF2':
                # PBKDF2 (Password Based Key Derivation Function 2)
                _kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                                  length=32,
                                  salt=self.salt_byte,
                                  iterations=100000,
                                  backend=self.backend)
            elif self.kdf_algo == 'Scrypt':
                # SCRYPT is a KDF designed for password storage by Colin Percival
                # to be resistant against hardware-assisted attackers by having
                # a tunable memory cost
                _kdf = Scrypt(salt=self.salt_byte,
                              length=32,
                              n=2**14,
                              r=8,
                              p=1,
                              backend=self.backend)
            else:
                return self.__nok('Error', _inputs,
                                  f"Unknown KDF algorithm: {self.kdf_algo}", False)
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2), False)
        return ('OK', _kdf)

    def kdf_verify_password(self, p_password2verify_1, p_password2verify_2):
        """Verify a password against its derived key.

        ``p_password2verify_1`` is the password; ``p_password2verify_2`` is
        the url-safe base64 encoded derived key to check it against.
        Returns ``('OK', True)`` on a match, otherwise a ``'NOK'`` tuple.
        Errors from the base64 decoding itself are not caught here.
        """
        _inputs = f"{self.inputs}|{p_password2verify_1}|{p_password2verify_2}"

        # Transform
        _password_byte = self.byte_representation('to_bin', p_password2verify_1)
        _expected_key = base64.urlsafe_b64decode(p_password2verify_2)

        # Create KDF
        _kdf = self.__kdf_create()
        if _kdf[0] != 'OK':
            return self.__nok('Error', _inputs, _kdf[1])

        try:
            _kdf[1].verify(_password_byte, _expected_key)
        except InvalidKey as _exc:
            return self.__nok('Exception', _inputs, _exc)
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', True)

    def kdf_derive_and_b64encode_key(self, p_password2derive):
        """Derive a key from the password and salt (like hashing a password).

        Returns ``('OK', key)`` where ``key`` is the url-safe base64 text of
        the derived bytes, otherwise a ``'NOK'`` tuple.
        """
        _inputs = f"{self.inputs}"

        # Transform
        _password_byte = self.byte_representation('to_bin', p_password2derive)

        # Create KDF
        _kdf = self.__kdf_create()
        if _kdf[0] != 'OK':
            return self.__nok('Error', _inputs, _kdf[1], False)

        # Derive (Hashing)
        try:
            _derived = _kdf[1].derive(_password_byte)
            _encoded = base64.urlsafe_b64encode(_derived)
            return ('OK', self.byte_representation('from_bin', _encoded))
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2), False)

    def __kdf_create_fernet_obj(self, p_password2derive):
        """Return ``('OK', Fernet)`` keyed by the key derived from the password."""
        _inputs = f"{self.inputs}"

        # Derive Key
        _f_key = self.kdf_derive_and_b64encode_key(p_password2derive)
        if _f_key[0] != 'OK':
            return self.__nok('Error', _inputs, _f_key[1])

        # Instance Fernet Obj
        try:
            return ('OK', Fernet(_f_key[1]))
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2), False)

    def kdf_encrypt(self, p_password4crypt, p_secret2encrypt):
        """Encrypt ``p_secret2encrypt`` (str or bytes) with a password-derived Fernet key.

        Returns ``('OK', token_text)`` or a ``'NOK'`` tuple.
        """
        _inputs = f"{self.inputs}|{p_secret2encrypt}"

        # Create Obj
        _fernet_obj = self.__kdf_create_fernet_obj(p_password4crypt)
        if _fernet_obj[0] != 'OK':
            return self.__nok('Error', _inputs, _fernet_obj[1])

        # Crypt with Fernet Obj; byte_representation passes bytes through, so
        # one code path serves both text and binary secrets.
        try:
            _secret_byte = self.byte_representation('to_bin', p_secret2encrypt)
            _token = _fernet_obj[1].encrypt(_secret_byte)
            return ('OK', self.byte_representation('from_bin', _token))
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))

    def kdf_decrypt(self, p_password4decrypt, p_secret2decrypt):
        """Decrypt a Fernet token (str or bytes) with a password-derived key.

        Returns ``('OK', plain_text)`` or a ``'NOK'`` tuple; an invalid token
        is reported as a wrong password.
        """
        _inputs = f"{self.inputs}|{p_secret2decrypt}"

        # Create Obj
        _fernet_obj = self.__kdf_create_fernet_obj(p_password4decrypt)
        if _fernet_obj[0] != 'OK':
            return self.__nok('Error', _inputs, _fernet_obj[1])

        # Decrypt with Fernet Obj
        try:
            _token_byte = self.byte_representation('to_bin', p_secret2decrypt)
            _plain_byte = _fernet_obj[1].decrypt(_token_byte)
            return ('OK', self.byte_representation('from_bin', _plain_byte))
        except InvalidToken:
            return self.__nok('Exception', _inputs, 'Wrong password for Kdf Decrypt')
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))

    # ---------------------------------------
    # AES256 ( CBC with IV )
    # ---------------------------------------

    def __aes_padding(self, p_input):
        """Append N copies of the value N so the length is a multiple of the AES block size.

        Works on bytes (pads with byte N) and on str (pads with ``chr(N)``).
        Callers should pad the *encoded* bytes: padding text first leaves
        multi-byte characters misaligned with the cipher block.
        """
        _block_size = algorithms.AES.block_size // 8
        _pad_len = _block_size - len(p_input) % _block_size
        if self.is_byte(p_input):
            return bytes(p_input) + bytes([_pad_len]) * _pad_len
        return p_input + _pad_len * chr(_pad_len)

    def __aes_unpadding(self, p_input):
        """Strip the padding added by ``__aes_padding`` (works on str and bytes)."""
        # The last element encodes how many trailing elements to drop.
        return p_input[:-ord(p_input[len(p_input) - 1:])]

    def __aes_create_cipher_obj(self, p_key, p_iv):
        """Return ``('OK', Cipher)`` for AES in CBC mode with ``p_key`` and ``p_iv``."""
        _inputs = f"{self.inputs}|{p_key}|{p_iv}"
        try:
            _cipher = Cipher(algorithms.AES(p_key),
                             modes.CBC(p_iv),
                             backend=self.backend)
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _cipher)

    def aes_encrypt(self, p_seed, p_secret2encrypt):
        """AES-CBC encrypt ``p_secret2encrypt`` with the hex-encoded key ``p_seed``.

        A fresh 16-byte IV is generated for every call.
        Returns ``('OK', encrypted_bytes, iv_hex_text)`` or a ``'NOK'`` tuple.
        An invalid hex ``p_seed`` raises from ``binascii.unhexlify``.
        """
        _inputs = f"{self.inputs}|{p_seed}|{p_secret2encrypt}"

        ## Prepare Secret, Seed - encode first, then pad the bytes
        _secret_byte = self.__aes_padding(
            self.byte_representation('to_bin', p_secret2encrypt))
        _seed_byte = self.hex_representation('from_hex', p_seed)

        ## Generate IV
        _iv_byte = self.generate_random_binary_data(16)
        _iv_hex_string = self.byte_representation(
            'from_bin', self.hex_representation('to_hex', _iv_byte))

        ## Get Cipher - stop here on failure instead of using a missing cipher
        _cipher_response = self.__aes_create_cipher_obj(_seed_byte, _iv_byte)
        if _cipher_response[0] != 'OK':
            return self.__nok('Error', _inputs, _cipher_response[1])

        ## Encrypt
        try:
            _encryptor = _cipher_response[1].encryptor()
            _encrypted_byte = _encryptor.update(_secret_byte) + _encryptor.finalize()
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _encrypted_byte, _iv_hex_string)

    def aes_decrypt(self, p_seed, p_iv, p_secret2decrypt):
        """AES-CBC decrypt ``p_secret2decrypt`` with hex-encoded ``p_seed`` and ``p_iv``.

        Returns ``('OK', plain_text)`` or a ``'NOK'`` tuple.  Invalid hex in
        ``p_seed``/``p_iv`` raises from ``binascii.unhexlify``.
        """
        _inputs = f"{self.inputs}|{p_seed}|{p_iv}|{p_secret2decrypt}"

        ## Prepare Seed and IV
        _seed_byte = self.hex_representation('from_hex', p_seed)
        _iv_byte = self.hex_representation('from_hex', p_iv)

        ## Get Cipher - stop here on failure instead of using a missing cipher
        _cipher_response = self.__aes_create_cipher_obj(_seed_byte, _iv_byte)
        if _cipher_response[0] != 'OK':
            return self.__nok('Error', _inputs, _cipher_response[1])

        ## Decrypt - strip the padding from the bytes, then decode
        try:
            _decryptor = _cipher_response[1].decryptor()
            _decrypted_byte = _decryptor.update(p_secret2decrypt) + _decryptor.finalize()
            _unpadded_byte = self.__aes_unpadding(_decrypted_byte)
            _decrypted_text = self.byte_representation('from_bin', _unpadded_byte)
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _decrypted_text)

    # ---------------------------------------
    # SHA256
    # ---------------------------------------

    def make_sha256(self, p_secret2hash):
        """Return ``('OK', hex_digest_text)`` with the SHA-256 of ``p_secret2hash``."""
        _inputs = f"{self.inputs}"

        ## Prepare Secret
        _secret_byte = self.byte_representation('to_bin', p_secret2hash)

        ## HASH
        try:
            _digest = hashes.Hash(hashes.SHA256(), backend=self.backend)
            _digest.update(_secret_byte)
            _hashed_hex_byte = self.hex_representation('to_hex', _digest.finalize())
            _hashed_hex_string = self.byte_representation('from_bin', _hashed_hex_byte)
        except Exception:
            return self.__nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _hashed_hex_string)

    # ---------------------------------------
    # CHECKSUM file ( SHA256 or BLAKE2b )
    # ---------------------------------------

    def checksum_file(self, p_algo2checksum, p_file_path, p_file_name):
        """Compute the url-safe base64 checksum of a file.

        ``p_algo2checksum`` is ``'sha256'`` or ``'blake2'`` (case-insensitive);
        the file is ``p_file_path`` directly concatenated with ``p_file_name``.
        An unknown algorithm returns a ``'NOK'`` tuple immediately.
        """
        _inputs = f"{self.inputs}|{p_algo2checksum}|{p_file_path}|{p_file_name}"
        _response_tuple = None
        _file = f"{p_file_path}{p_file_name}"

        # Choose Algorithm
        _algo_name = p_algo2checksum.lower()
        if _algo_name == 'sha256':
            _algo_checksum_file = hashes.SHA256()
        elif _algo_name == 'blake2':
            _algo_checksum_file = hashes.BLAKE2b(64)
        else:
            return self.__nok('Error', _inputs, 'Unknown algorithm')

        if os.path.exists(_file):
            with open(_file, "rb") as f:
                try:
                    _file_hash = hashes.Hash(_algo_checksum_file,
                                             backend=self.backend)
                    while chunk := f.read(8192):  # Read the binary file to the end (8192)
                        _file_hash.update(chunk)
                except Exception:
                    _response_tuple = self.__nok('Exception', _inputs,
                                                 traceback.format_exc(2))
                else:
                    # Only finalize after a clean read, so a failure above is
                    # no longer overwritten by an 'OK' result.
                    _checksum_b64encode = base64.urlsafe_b64encode(_file_hash.finalize())
                    _output_value_string = self.byte_representation(
                        'from_bin', _checksum_b64encode)
                    _response_tuple = ('OK', _output_value_string)
        else:
            _response_tuple = self.__nok('Error', _inputs, 'File does not exist')
        # NOTE(review): the source page cuts the snippet off here ("...");
        # the statement that hands _response_tuple back to the caller is not
        # visible and has not been reconstructed.

Full Screen

Full Screen

l2network_multi_blade.py

Source:l2network_multi_blade.py Github

copy

Full Screen

...49 self._inventory[key] = utils.import_object(50 conf.PLUGINS[const.INVENTORY][key])51 LOG.debug("Loaded device inventory %s\n" % \52 conf.PLUGINS[const.INVENTORY][key])53 def _func_name(self, offset=0):54 """Get the name of the calling function"""55 return inspect.stack()[1 + offset][3]56 def _invoke_plugin_per_device(self, plugin_key, function_name, args):57 """Invoke only device plugin for all the devices in the system"""58 if not plugin_key in self._plugins.keys():59 LOG.info("No %s Plugin loaded" % plugin_key)60 LOG.info("%s: %s with args %s ignored" \61 % (plugin_key, function_name, args))62 return63 device_params = self._invoke_inventory(plugin_key, function_name,64 args)65 device_ips = device_params[const.DEVICE_IP]66 if not device_ips:67 # Return in a list68 return [self._invoke_plugin(plugin_key, function_name, args,69 device_params)]70 else:71 # Return a list of return values from each device72 output = []73 for device_ip in device_ips:74 new_device_params = deepcopy(device_params)75 new_device_params[const.DEVICE_IP] = device_ip76 output.append(self._invoke_plugin(plugin_key, function_name,77 args, new_device_params))78 return output79 def _invoke_inventory(self, plugin_key, function_name, args):80 """Invoke only the inventory implementation"""81 if not plugin_key in self._inventory.keys():82 LOG.warn("No %s inventory loaded" % plugin_key)83 LOG.warn("%s: %s with args %s ignored" \84 % (plugin_key, function_name, args))85 return {const.DEVICE_IP: []}86 else:87 return getattr(self._inventory[plugin_key], function_name)(args)88 def _invoke_plugin(self, plugin_key, function_name, args, kwargs):89 """Invoke only the device plugin"""90 # If there are more args than needed, add them to kwargs91 func = getattr(self._plugins[plugin_key], function_name)92 if args.__len__() + 1 > inspect.getargspec(func).args.__len__():93 kwargs.update(args.pop())94 return func(*args, **kwargs)95 def get_all_networks(self, args):96 """Not implemented for this model"""97 pass98 
def create_network(self, args):99 """Support for the Quantum core API call"""100 output = []101 ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,102 self._func_name(), args)103 nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,104 self._func_name(), args)105 output.extend(ucs_output or [])106 output.extend(nexus_output or [])107 return output108 def delete_network(self, args):109 """Support for the Quantum core API call"""110 output = []111 ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,112 self._func_name(), args)113 nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,114 self._func_name(), args)115 output.extend(ucs_output or [])116 output.extend(nexus_output or [])117 return output118 def get_network_details(self, args):119 """Not implemented for this model"""120 pass121 def update_network(self, args):122 """Support for the Quantum core API call"""123 output = []124 ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,125 self._func_name(), args)126 nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,127 self._func_name(), args)128 output.extend(ucs_output or [])129 output.extend(nexus_output or [])130 return output131 def get_all_ports(self, args):132 """Not implemented for this model"""133 pass134 def create_port(self, args):135 """Support for the Quantum core API call"""136 return self._invoke_plugin_per_device(const.UCS_PLUGIN,137 self._func_name(), args)138 def delete_port(self, args):139 """Support for the Quantum core API call"""140 return self._invoke_plugin_per_device(const.UCS_PLUGIN,141 self._func_name(), args)142 def update_port(self, args):143 """Not implemented for this model"""144 pass145 def get_port_details(self, args):146 """Not implemented for this model"""147 pass148 def plug_interface(self, args):149 """Support for the Quantum core API call"""150 return self._invoke_plugin_per_device(const.UCS_PLUGIN,151 self._func_name(), args)152 def unplug_interface(self, args):153 
"""Support for the Quantum core API call"""154 return self._invoke_plugin_per_device(const.UCS_PLUGIN,155 self._func_name(), args)156 def schedule_host(self, args):157 """Provides the hostname on which a dynamic vnic is reserved"""158 LOG.debug("schedule_host() called\n")159 return self._invoke_inventory(const.UCS_PLUGIN, self._func_name(),160 args)161 def associate_port(self, args):162 """163 Get the portprofile name and the device name for the dynamic vnic164 """165 LOG.debug("associate_port() called\n")166 return self._invoke_inventory(const.UCS_PLUGIN, self._func_name(),167 args)168 def detach_port(self, args):169 """170 Remove the association of the VIF with the dynamic vnic171 """172 LOG.debug("detach_port() called\n")173 return self._invoke_plugin_per_device(const.UCS_PLUGIN,174 self._func_name(), args)175 def create_multiport(self, args):176 """Support for extension API call"""177 self._invoke_plugin_per_device(const.UCS_PLUGIN, self._func_name(),...

Full Screen

Full Screen

typed.py

Source:typed.py Github

copy

Full Screen

from typing import get_args
from types import GenericAlias
from inspect import getfullargspec
from typed.decorator_types import *


# NOTE(review): the source page lost all indentation; the nesting below
# (notably inside _check_generic) is reconstructed from the statements'
# syntax - confirm against the original file.
class TypedDecorator(object):
    """Callable wrapper that checks a function's arguments and result against its annotations.

    ``Any``, ``AnyType``, ``Iterator``, ``AllTypes`` and ``AllNames`` are
    presumably provided by the ``typed.decorator_types`` star import - verify.
    """

    def __init__(self, func):
        """Capture ``func`` together with its argument names, defaults and annotations."""
        self._func = func
        self._func_name = self._func.__name__
        self._argspec = getfullargspec(self._func)
        self._annotations = self._argspec.annotations
        self._func_args = self._argspec.args
        self._defaults = self._argspec.defaults
        # Property access: also removes 'return' from self._annotations.
        self._returns = self._get_returns_types

    @property
    def _get_returns_types(self):
        """Pop and return the 'return' annotation, or None when there is none."""
        return self._annotations.pop('return', None)

    @staticmethod
    def _get_type(type_) -> Any:
        """Map ``type_`` through ``AllTypes``; unknown keys are returned unchanged."""
        try:
            return AllTypes[type_]
        except KeyError:
            return type_

    @staticmethod
    def _get_type_name(type_) -> str:
        """Return a display name for ``type_``, translated through ``AllNames`` when listed."""
        try:
            type_name = str(type_.__name__)
        except AttributeError:
            type_name = str(type_)
        try:
            return AllNames[type_name]
        except KeyError:
            return type_name

    def _get_generic_type(self, type_):
        """Resolve the part of ``str(type_)`` before the first '[' via ``_get_type``."""
        return self._get_type(str(type_).split('[')[0])

    def _check_generic(self, kwarg, args, _generic_args):
        """Check the value ``args`` of parameter ``kwarg`` against ``_generic_args``.

        Raises ``Warning`` when a container holds element types the generic
        alias does not list, and ``AssertionError`` when a value is not an
        instance of the expected type.  A ``TypeError`` raised by
        ``isinstance`` itself is printed rather than propagated.
        """
        if type(_generic_args) is GenericAlias:
            _generic_types = get_args(_generic_args)
            if len(args) > len(_generic_types):
                types_ = [
                    self._get_generic_type(generic_type)
                    if type(generic_type) is GenericAlias else generic_type
                    for generic_type in _generic_types
                ]
                unexpected_names = {
                    self._get_type_name(type_arg)
                    for type_arg in (type(arg) for arg in args)
                    if type_arg not in types_
                }
                if unexpected_names:
                    raise Warning(f'{self._func_name} {_generic_args} in {kwarg} has no {" and ".join(unexpected_names)} type')
            for arg, _generic_type in zip(args, _generic_types):
                if type(_generic_type) is GenericAlias:
                    self._check_generic(kwarg, arg, _generic_type)
                    continue
                if _generic_type == AnyType:
                    continue
                if _generic_type == Iterator:
                    continue
                try:
                    # Explicit raise instead of `assert`, so the check is not
                    # stripped when Python runs with -O.
                    if not isinstance(arg, _generic_type):
                        raise AssertionError(
                            f'{self._func_name} {_generic_args} in {kwarg} takes {self._get_type_name(_generic_type)}, '
                            f'not {self._get_type_name(type(arg))}'
                        )
                except TypeError as e:
                    print(self._func_name, e)
            _generic_args = self._get_generic_type(_generic_args)
        if _generic_args == Iterator:
            return
        try:
            if not isinstance(args, _generic_args):
                raise AssertionError(
                    f'{self._func_name} {kwarg} - parameter must be an {self._get_type_name(_generic_args)}, '
                    f'not a {self._get_type_name(type(args))}.'
                )
        except TypeError as e:
            print(self._func_name, e, args, _generic_args)

    def check_anotations(self, *args, **kwargs):
        """Validate call arguments: positional arguments are rejected, keyword ones type-checked.

        Raises ``Warning`` for positional arguments (other than ``self``),
        for too many keyword arguments, for mismatched keyword order without
        defaults, and for keyword arguments lacking an annotation.
        """
        for arg, func_arg in zip(args, self._func_args):
            if func_arg == 'self':
                # NOTE(review): removing from the list that zip() is iterating
                # makes the loop skip the element that follows 'self'; kept
                # as-is because callers may depend on it - confirm.
                self._func_args.remove('self')
                self._annotations.pop('self', None)
                continue
            if func_arg in self._annotations:
                raise Warning(f'{self._func_name}: {func_arg}({arg}) - this argument is not conveyed explicitly.')
            raise Warning(
                f'{self._func_name} {func_arg} - the argument has no annotation type. '
                f'In parameter, data type is a {self._get_type_name(type(arg))}.'
            )
        if len(kwargs) > len(self._func_args):
            raise Warning(f'{self._func_name} The number of parameters passed is more than the function accepts')
        for kwarg, func_arg in zip(kwargs, self._func_args):
            if kwarg != func_arg:
                if not self._defaults:
                    raise Warning(f'{self._func_name} ERROR')
                continue
            arg = kwargs[kwarg]
            if kwarg not in self._annotations:
                raise Warning(
                    f'{self._func_name} {kwarg} - argument has no annotation type. '
                    f'In parameter, data type is a {self._get_type_name(type(arg))}.'
                )
            if (_annotation_arg := self._annotations[kwarg]) == Any:
                continue
            self._check_generic(kwarg, arg, _annotation_arg)

    def check_returns(self, results):
        """Check the function result against the captured return annotation."""
        self._check_generic('results', results, self._returns)

    def main(self, *args, **kwargs):
        """Validate the arguments, call the wrapped function, validate and return its result."""
        self.check_anotations(*args, **kwargs)
        results = self._func(*args, **kwargs)
        if self._returns:
            self.check_returns(results)
        return results

    def __call__(self, *args, **kwargs):
        return self.main(*args, **kwargs)

    @staticmethod
    def typed(func):
        def wrapper(*args, **kwargs):
            # A fresh TypedDecorator is built for every call.
            return TypedDecorator(func)(*args, **kwargs)
        # ... (the source page truncates the snippet here; the rest of this
        # function is not visible and has not been reconstructed)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful