Best Python code snippet using localstack_python
my_class.py
Source:my_class.py  
"""Thin wrappers around the ``cryptography`` package: KDF, Fernet, AES-256-CBC, SHA-256.

References:
    KDF
        https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet
        https://cryptography.io/en/latest/hazmat/primitives/key-derivation-functions/#cryptography.hazmat.primitives.kdf.scrypt.Scrypt
    AES256
        https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption.html?highlight=aes#cryptography.hazmat.primitives.ciphers.Cipher
        https://gist.github.com/brysontyrrell/7cebfb05105c25d00e84ed35bd821dfe
    SHA256
        https://cryptography.io/en/latest/hazmat/primitives/cryptographic-hashes
"""
# Cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.exceptions import InvalidKey, InvalidSignature
# General
import base64
import os.path
import chardet  # no longer needed by is_byte(); import kept so nothing else in the file breaks
import binascii
# Logs
import traceback
import inspect
# My
import utility


class PyCaClass:
    """Helper bundling key derivation, Fernet/AES encryption and hashing.

    Every public operation returns a tuple whose first item is ``'OK'`` or
    ``'NOK'``.  On ``'OK'`` the remaining items are the result; on ``'NOK'``
    the second item is the string form of whatever ``utility.my_log``
    returned for the failure.
    """

    # Placeholder written to the log context instead of passwords, keys and
    # plaintext, so that a failure never copies secret material into logs.
    _REDACTED = '<redacted>'

    def __init__(self, p_salt = None):
        """Store defaults and derive the salt bytes.

        Args:
            p_salt: Salt for the KDF.  It is used when ``utility.is_filled``
                accepts it; otherwise both salt attributes stay ``None`` and
                KDF creation will report ``'NOK'``.
        """
        # DEFAULTS
        self.encoding = 'UTF-8' # 'latin-1'
        self.backend = default_backend()
        self.kdf_algo = 'Scrypt'
        # SALT - the encoded salt is used twice in a row.  This must stay as
        # is: changing it would change every key derived so far.
        if utility.is_filled(p_salt):
            self.salt_string = f"{p_salt}"
            self.salt_byte = self.byte_representation('to_bin', self.salt_string) * 2
        else:
            self.salt_string = None
            self.salt_byte = None
        # INPUTS - context string attached to every log entry
        self.inputs = f"{self.encoding}|{self.salt_string}|{self.kdf_algo}"

    # ---------------------------------------------
    # UTILITY
    # ---------------------------------------------

    def _nok(self, p_level, p_inputs, p_msg, *p_log_args):
        """Log a failure on behalf of the calling method and build its ``'NOK'`` tuple.

        The function name and line number reported to ``utility.my_log`` are
        read from the caller's frame, so log entries still point at the
        method where the failure happened.

        Args:
            p_level: First argument for ``utility.my_log`` (``'Error'`` or
                ``'Exception'`` in this class).
            p_inputs: Context string for the log entry.
            p_msg: Message, traceback text or exception object to log.
            *p_log_args: Extra positional arguments forwarded unchanged to
                ``utility.my_log`` (some call sites pass ``False``; the
                meaning is defined by ``utility.my_log``).

        Returns:
            ``('NOK', <str of the value returned by utility.my_log>)``.
        """
        _caller = inspect.currentframe().f_back
        _logged = utility.my_log(p_level, __name__, _caller.f_code.co_name,
                                 _caller.f_lineno, p_inputs, p_msg, *p_log_args)
        return ('NOK', f"{_logged}")

    ## Verify if p_input is already binary
    def is_byte(self, p_input):
        """Return True when ``p_input`` is ``bytes`` or ``bytearray``."""
        # A plain type check replaces the former chardet.detect() probe, which
        # scanned the whole payload and hid every error behind a bare except.
        return isinstance(p_input, (bytes, bytearray))

    ## Generate Random Binary Data
    def generate_random_binary_data(self, _size = 32):
        """Return ``_size`` random bytes from ``os.urandom``."""
        return os.urandom(_size)

    ## BINARY representation to_bin/from_bin
    def byte_representation(self, p_what, p_input):
        """Convert between text and bytes using ``self.encoding``.

        Args:
            p_what: ``'to_bin'`` (text -> bytes) or ``'from_bin'``
                (bytes -> text).
            p_input: Value to convert.

        Returns:
            The converted value, or ``None`` for an unknown ``p_what``.
            With ``'to_bin'``, input that is already binary is returned as
            ``bytes`` instead of ``None`` (the previous behaviour made every
            caller that forwarded bytes fail later on).
        """
        if p_what == 'to_bin':
            if self.is_byte(p_input):
                return bytes(p_input)
            return p_input.encode(self.encoding)
        if p_what == 'from_bin':
            return str(p_input, self.encoding)
        return None

    ## HEX representation to_hex/from_hex
    def hex_representation(self, p_what, p_input):
        """Hex-encode or hex-decode ``p_input``.

        Args:
            p_what: ``'to_hex'`` or ``'from_hex'``.
            p_input: Text or bytes.  For ``'to_hex'`` text is encoded with
                ``self.encoding`` first.

        Returns:
            ``bytes`` in both directions (``hexlify``/``unhexlify`` always
            return binary data), or ``None`` for an unknown ``p_what``.
        """
        if p_what == 'to_hex':
            return binascii.hexlify(self.byte_representation('to_bin', p_input))
        if p_what == 'from_hex':
            return binascii.unhexlify(p_input)
        return None

    # ---------------------------------------------
    # KDF ( Key Derivation Function )
    # ---------------------------------------------

    ## Create KDF from the configured algorithm and SALT
    def __kdf_create(self):
        """Build a single-use KDF object for ``self.kdf_algo``.

        Returns:
            ``('OK', kdf)`` or a ``'NOK'`` tuple.  An unknown algorithm name
            is reported as ``'NOK'`` (it used to return ``None``, which made
            callers crash when indexing the result).
        """
        _inputs = f"{self.inputs}"
        if self.kdf_algo not in ('PBKDF2', 'Scrypt'):
            return self._nok('Error', _inputs, f"Unknown KDF algorithm: {self.kdf_algo}", False)
        try:
            if self.kdf_algo == 'PBKDF2':
                # PBKDF2 (Password Based Key Derivation Function 2)
                _kdf = PBKDF2HMAC(algorithm = hashes.SHA256(),
                                  length = 32,
                                  salt = self.salt_byte,
                                  iterations = 100000,
                                  backend = self.backend)
            else:
                # Scrypt: memory-hard KDF designed for password storage
                _kdf = Scrypt(salt = self.salt_byte,
                              length = 32,
                              n = 2**14,
                              r = 8,
                              p = 1,
                              backend = self.backend)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2), False)
        return ('OK', _kdf)

    ## VERIFY the password with its derived key (your hash)
    def kdf_verify_password(self, p_password2verify_1, p_password2verify_2):
        """Check a password against a previously derived key.

        Args:
            p_password2verify_1: Candidate password (text or bytes).
            p_password2verify_2: URL-safe base64 form of the derived key, as
                produced by ``kdf_derive_and_b64encode_key``.

        Returns:
            ``('OK', True)`` when the password matches, otherwise a ``'NOK'``
            tuple.

        Raises:
            Whatever ``base64.urlsafe_b64decode`` raises for malformed input;
            decoding happens before any error handling, as before.
        """
        # The candidate password is kept out of the log context.
        _inputs = f"{self.inputs}|{self._REDACTED}|{p_password2verify_2}"
        _password_byte = self.byte_representation('to_bin', p_password2verify_1)
        _expected_key = base64.urlsafe_b64decode(p_password2verify_2)

        _kdf = self.__kdf_create()
        if _kdf[0] != 'OK':
            return self._nok('Error', _inputs, _kdf[1])
        try:
            _kdf[1].verify(_password_byte, _expected_key)
        except InvalidKey as _err:
            return self._nok('Exception', _inputs, _err)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', True)

    ## Derived Key from the given PASSWORD and SALT - like HASHING password
    def kdf_derive_and_b64encode_key(self, p_password2derive):
        """Derive a key from ``p_password2derive`` and return it base64-encoded.

        Returns:
            ``('OK', <url-safe base64 text>)`` or a ``'NOK'`` tuple.
        """
        _inputs = f"{self.inputs}"
        _password_byte = self.byte_representation('to_bin', p_password2derive)

        _kdf = self.__kdf_create()
        if _kdf[0] != 'OK':
            return self._nok('Error', _inputs, _kdf[1], False)
        try:
            _derived_key = _kdf[1].derive(_password_byte)
            _key_b64 = base64.urlsafe_b64encode(_derived_key)
            _key_string = self.byte_representation('from_bin', _key_b64)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2), False)
        return ('OK', _key_string)

    ## Create Fernet Obj from derived key
    def __kdf_create_fernet_obj(self, p_password2derive):
        """Return ``('OK', Fernet)`` keyed with the key derived from the password."""
        _inputs = f"{self.inputs}"
        _f_key = self.kdf_derive_and_b64encode_key(p_password2derive)
        if _f_key[0] != 'OK':
            return self._nok('Error', _inputs, _f_key[1])
        try:
            _fernet_obj = Fernet(_f_key[1])
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2), False)
        return ('OK', _fernet_obj)

    ## ENCRYPT symmetric of p_secret2encrypt (with fernet obj)
    def kdf_encrypt(self, p_password4crypt, p_secret2encrypt):
        """Encrypt ``p_secret2encrypt`` with a Fernet key derived from the password.

        Args:
            p_password4crypt: Password the key is derived from.
            p_secret2encrypt: Text or bytes to encrypt.

        Returns:
            ``('OK', <Fernet token as text>)`` or a ``'NOK'`` tuple.
        """
        # The plaintext is kept out of the log context.
        _inputs = f"{self.inputs}|{self._REDACTED}"
        _fernet_obj = self.__kdf_create_fernet_obj(p_password4crypt)
        if _fernet_obj[0] != 'OK':
            return self._nok('Error', _inputs, _fernet_obj[1])
        try:
            # byte_representation accepts text and bytes alike, so one code
            # path replaces the two duplicated branches.
            _secret_byte = self.byte_representation('to_bin', p_secret2encrypt)
            _token = _fernet_obj[1].encrypt(_secret_byte)
            _token_string = self.byte_representation('from_bin', _token)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _token_string)

    ## DECRYPT symmetric of p_secret2decrypt (with fernet obj)
    def kdf_decrypt(self, p_password4decrypt, p_secret2decrypt):
        """Decrypt a Fernet token with a key derived from the password.

        Args:
            p_password4decrypt: Password the key is derived from.
            p_secret2decrypt: Fernet token (text or bytes).

        Returns:
            ``('OK', <decrypted text>)`` or a ``'NOK'`` tuple; an
            ``InvalidToken`` is reported as a wrong password.
        """
        _inputs = f"{self.inputs}|{p_secret2decrypt}"
        _fernet_obj = self.__kdf_create_fernet_obj(p_password4decrypt)
        if _fernet_obj[0] != 'OK':
            return self._nok('Error', _inputs, _fernet_obj[1])
        try:
            _token = self.byte_representation('to_bin', p_secret2decrypt)
            _decrypted = _fernet_obj[1].decrypt(_token)
            _decrypted_string = self.byte_representation('from_bin', _decrypted)
        except InvalidToken:
            # Previously the bytes branch logged this without the inputs
            # argument, shifting the message into the wrong position.
            return self._nok('Exception', _inputs, 'Wrong password for Kdf Decrypt')
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _decrypted_string)

    # ---------------------------------------
    # AES256 ( CBC with IV )
    # ---------------------------------------

    def __aes_padding(self, p_input):
        """Return ``p_input`` as bytes, padded PKCS#7-style to the AES block size.

        Padding is computed on the encoded bytes.  Counting characters (as
        before) produced a length that is not a multiple of the block size
        for any non-ASCII text, which made encryption fail.  For ASCII input
        the resulting bytes are identical to the old output once encoded.
        """
        _block_size = algorithms.AES.block_size // 8
        _data = self.byte_representation('to_bin', p_input)
        _pad_len = _block_size - len(_data) % _block_size
        return _data + bytes([_pad_len]) * _pad_len

    def __aes_unpadding(self, p_input):
        """Drop the trailing padding; the last element encodes the pad length."""
        return p_input[:-ord(p_input[-1:])]

    ## Create Cipher Obj from p_key and with p_iv
    def __aes_create_cipher_obj(self, p_key, p_iv):
        """Return ``('OK', Cipher)`` for AES in CBC mode, or a ``'NOK'`` tuple."""
        # The key is kept out of the log context.
        _inputs = f"{self.inputs}|{self._REDACTED}|{p_iv}"
        try:
            _cipher = Cipher(algorithms.AES(p_key),
                             modes.CBC(p_iv),
                             backend = self.backend)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _cipher)

    ## AES256 ENCRYPT symmetric of p_secret2encrypt with p_seed
    def aes_encrypt(self, p_seed, p_secret2encrypt):
        """Encrypt with AES-CBC using a fresh random 16-byte IV.

        Args:
            p_seed: Hex-encoded key.
            p_secret2encrypt: Text or bytes to encrypt.

        Returns:
            ``('OK', <ciphertext bytes>, <IV as hex text>)`` or a two-item
            ``'NOK'`` tuple.

        Raises:
            ``binascii.Error`` when ``p_seed`` is not valid hex (decoding
            happens before any error handling, as before).
        """
        # Key and plaintext are kept out of the log context.
        _inputs = f"{self.inputs}|{self._REDACTED}|{self._REDACTED}"

        ## Prepare Secret, Seed
        _secret_byte = self.__aes_padding(p_secret2encrypt)
        _seed_byte = self.hex_representation('from_hex', p_seed)

        ## Generate IV
        _iv_byte = self.generate_random_binary_data(16)
        _iv_hex_string = self.byte_representation(
            'from_bin', self.hex_representation('to_hex', _iv_byte))

        ## Get Cipher - stop here on failure instead of going on with no cipher
        _cipher_response = self.__aes_create_cipher_obj(_seed_byte, _iv_byte)
        if _cipher_response[0] != 'OK':
            return self._nok('Error', _inputs, _cipher_response[1])

        ## Encrypt
        try:
            _encryptor = _cipher_response[1].encryptor()
            _encrypted_byte = _encryptor.update(_secret_byte) + _encryptor.finalize()
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _encrypted_byte, _iv_hex_string)

    ## AES256 DECRYPT symmetric of p_secret2decrypt with p_seed and p_iv
    def aes_decrypt(self, p_seed, p_iv, p_secret2decrypt):
        """Decrypt AES-CBC ciphertext produced by ``aes_encrypt``.

        Args:
            p_seed: Hex-encoded key.
            p_iv: Hex-encoded IV as returned by ``aes_encrypt``.
            p_secret2decrypt: Ciphertext bytes.

        Returns:
            ``('OK', <decrypted text without padding>)`` or a ``'NOK'`` tuple.

        Raises:
            ``binascii.Error`` when ``p_seed`` or ``p_iv`` is not valid hex.
        """
        # The key is kept out of the log context.
        _inputs = f"{self.inputs}|{self._REDACTED}|{p_iv}|{p_secret2decrypt}"

        ## Prepare Seed and IV
        _seed_byte = self.hex_representation('from_hex', p_seed)
        _iv_byte = self.hex_representation('from_hex', p_iv)

        ## Get Cipher - stop here on failure instead of going on with no cipher
        _cipher_response = self.__aes_create_cipher_obj(_seed_byte, _iv_byte)
        if _cipher_response[0] != 'OK':
            return self._nok('Error', _inputs, _cipher_response[1])

        ## Decrypt
        try:
            _decryptor = _cipher_response[1].decryptor()
            _decrypted_byte = _decryptor.update(p_secret2decrypt) + _decryptor.finalize()
            _decrypted_text = self.byte_representation('from_bin', _decrypted_byte)
            _decrypted_text_unpadded = self.__aes_unpadding(_decrypted_text)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _decrypted_text_unpadded)

    # ---------------------------------------
    # SHA256
    # ---------------------------------------

    def make_sha256(self, p_secret2hash):
        """Return ``('OK', <SHA-256 hex digest as text>)`` for text or bytes input."""
        _inputs = f"{self.inputs}"
        _secret_byte = self.byte_representation('to_bin', p_secret2hash)
        try:
            _digest = hashes.Hash(hashes.SHA256(), backend = self.backend)
            _digest.update(_secret_byte)
            _hashed_hex_byte = self.hex_representation('to_hex', _digest.finalize())
            _hashed_hex_string = self.byte_representation('from_bin', _hashed_hex_byte)
        except Exception:
            return self._nok('Exception', _inputs, traceback.format_exc(2))
        return ('OK', _hashed_hex_string)

    # ---------------------------------------
    # CHECKSUM file ( SHA256 or BLAKE2b )
    # ---------------------------------------

    def checksum_file(self, p_algo2checksum, p_file_path, p_file_name):
        """Hash a file and return the digest as URL-safe base64 text.

        Args:
            p_algo2checksum: ``'sha256'`` or ``'blake2'`` (case-insensitive).
            p_file_path: Directory part; it is concatenated with
                ``p_file_name`` as-is, so it must end with a separator.
            p_file_name: File name.

        Returns:
            ``('OK', <base64 digest>)`` or a ``'NOK'`` tuple for an unknown
            algorithm, a missing file, or a failure while hashing.
        """
        _inputs = f"{self.inputs}|{p_algo2checksum}|{p_file_path}|{p_file_name}"
        _file = f"{p_file_path}{p_file_name}"

        # Choose Algorithm
        _algo_name = p_algo2checksum.lower()
        if _algo_name == 'sha256':
            _algo_checksum_file = hashes.SHA256()
        elif _algo_name == 'blake2':
            _algo_checksum_file = hashes.BLAKE2b(64)
        else:
            return self._nok('Error', _inputs, 'Unknown algorithm')

        if not os.path.exists(_file):
            return self._nok('Error', _inputs, 'File does not exist')

        with open(_file, "rb") as f:
            try:
                _file_hash = hashes.Hash(_algo_checksum_file, backend = self.backend)
                while chunk := f.read(8192): # Read the binary file to the end
                    _file_hash.update(chunk)
                _checksum = _file_hash.finalize()
            except Exception:
                # Return at once: the old code logged an empty message and
                # then still reported 'OK' with a checksum of partial data.
                return self._nok('Exception', _inputs, traceback.format_exc(2))

        _checksum_b64encode = base64.urlsafe_b64encode(_checksum)
        # NOTE(review): the snippet is cut off at this point in the page this
        # file was taken from; the return mirrors every other method here.
        return ('OK', self.byte_representation('from_bin', _checksum_b64encode))
Source:l2network_multi_blade.py  
...49                self._inventory[key] = utils.import_object(50                    conf.PLUGINS[const.INVENTORY][key])51                LOG.debug("Loaded device inventory %s\n" % \52                        conf.PLUGINS[const.INVENTORY][key])53    def _func_name(self, offset=0):54        """Get the name of the calling function"""55        return inspect.stack()[1 + offset][3]56    def _invoke_plugin_per_device(self, plugin_key, function_name, args):57        """Invoke only device plugin for all the devices in the system"""58        if not plugin_key in self._plugins.keys():59            LOG.info("No %s Plugin loaded" % plugin_key)60            LOG.info("%s: %s with args %s ignored" \61                     % (plugin_key, function_name, args))62            return63        device_params = self._invoke_inventory(plugin_key, function_name,64                                               args)65        device_ips = device_params[const.DEVICE_IP]66        if not device_ips:67            # Return in a list68            return [self._invoke_plugin(plugin_key, function_name, args,69                                device_params)]70        else:71            # Return a list of return values from each device72            output = []73            for device_ip in device_ips:74                new_device_params = deepcopy(device_params)75                new_device_params[const.DEVICE_IP] = device_ip76                output.append(self._invoke_plugin(plugin_key, function_name,77                                args, new_device_params))78            return output79    def _invoke_inventory(self, plugin_key, function_name, args):80        """Invoke only the inventory implementation"""81        if not plugin_key in self._inventory.keys():82            LOG.warn("No %s inventory loaded" % plugin_key)83            LOG.warn("%s: %s with args %s ignored" \84                     % (plugin_key, function_name, args))85            return {const.DEVICE_IP: []}86        else:87            
return getattr(self._inventory[plugin_key], function_name)(args)88    def _invoke_plugin(self, plugin_key, function_name, args, kwargs):89        """Invoke only the device plugin"""90        # If there are more args than needed, add them to kwargs91        func = getattr(self._plugins[plugin_key], function_name)92        if args.__len__() + 1 > inspect.getargspec(func).args.__len__():93            kwargs.update(args.pop())94        return func(*args, **kwargs)95    def get_all_networks(self, args):96        """Not implemented for this model"""97        pass98    def create_network(self, args):99        """Support for the Quantum core API call"""100        output = []101        ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,102                                       self._func_name(), args)103        nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,104                                       self._func_name(), args)105        output.extend(ucs_output or [])106        output.extend(nexus_output or [])107        return output108    def delete_network(self, args):109        """Support for the Quantum core API call"""110        output = []111        ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,112                                       self._func_name(), args)113        nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,114                                       self._func_name(), args)115        output.extend(ucs_output or [])116        output.extend(nexus_output or [])117        return output118    def get_network_details(self, args):119        """Not implemented for this model"""120        pass121    def update_network(self, args):122        """Support for the Quantum core API call"""123        output = []124        ucs_output = self._invoke_plugin_per_device(const.UCS_PLUGIN,125                                       self._func_name(), args)126        nexus_output = 
self._invoke_plugin_per_device(const.NEXUS_PLUGIN,127                                       self._func_name(), args)128        output.extend(ucs_output or [])129        output.extend(nexus_output or [])130        return output131    def get_all_ports(self, args):132        """Not implemented for this model"""133        pass134    def create_port(self, args):135        """Support for the Quantum core API call"""136        return self._invoke_plugin_per_device(const.UCS_PLUGIN,137                                       self._func_name(), args)138    def delete_port(self, args):139        """Support for the Quantum core API call"""140        return self._invoke_plugin_per_device(const.UCS_PLUGIN,141                                        self._func_name(), args)142    def update_port(self, args):143        """Not implemented for this model"""144        pass145    def get_port_details(self, args):146        """Not implemented for this model"""147        pass148    def plug_interface(self, args):149        """Support for the Quantum core API call"""150        return self._invoke_plugin_per_device(const.UCS_PLUGIN,151                                        self._func_name(), args)152    def unplug_interface(self, args):153        """Support for the Quantum core API call"""154        return self._invoke_plugin_per_device(const.UCS_PLUGIN,155                                        self._func_name(), args)156    def schedule_host(self, args):157        """Provides the hostname on which a dynamic vnic is reserved"""158        LOG.debug("schedule_host() called\n")159        return self._invoke_inventory(const.UCS_PLUGIN, self._func_name(),160                                      args)161    def associate_port(self, args):162        """163        Get the portprofile name and the device name for the dynamic vnic164        """165        LOG.debug("associate_port() called\n")166        return self._invoke_inventory(const.UCS_PLUGIN, self._func_name(),167                            
          args)168    def detach_port(self, args):169        """170        Remove the association of the VIF with the dynamic vnic171        """172        LOG.debug("detach_port() called\n")173        return self._invoke_plugin_per_device(const.UCS_PLUGIN,174                                              self._func_name(), args)175    def create_multiport(self, args):176        """Support for extension  API call"""177        self._invoke_plugin_per_device(const.UCS_PLUGIN, self._func_name(),...typed.py
Source:typed.py  
1from typing import get_args2from types import GenericAlias3from inspect import getfullargspec4from typed.decorator_types import *5class TypedDecorator(object):6    def __init__(self, func):7        self._func = func8        self._func_name = self._func.__name__9        self._argspec = getfullargspec(self._func)10        self._annotations = self._argspec.annotations11        self._func_args = self._argspec.args12        self._defaults = self._argspec.defaults13        self._returns = self._get_returns_types14    @property15    def _get_returns_types(self):16        try:17            returns = self._annotations['return']18            del self._annotations['return']19        except KeyError:20            returns = None21        return returns22    @staticmethod23    def _get_type(type_) -> Any:24        try:25            return AllTypes[type_]26        except KeyError:27            return type_28    @staticmethod29    def _get_type_name(type_) -> str:30        try:31            type_name = str(type_.__name__)32        except AttributeError:33            type_name = str(type_)34        try:35            return AllNames[type_name]36        except KeyError:37            return type_name38    def _get_generic_type(self, type_):39        return self._get_type(str(type_).split('[')[0])40    def _check_generic(self, kwarg, args, _generic_args):41        if type(_generic_args) == GenericAlias:42            _generic_types = get_args(_generic_args)43            if len(args) > len(_generic_types):44                types_ = []45                for generic_type in _generic_types:46                    if type(generic_type) == GenericAlias:47                        types_.append(self._get_generic_type(generic_type))48                    else:49                        types_.append(generic_type)50                is_all_correct = {51                    self._get_type_name(type_arg)52                    for type_arg in (type(arg) for arg in args)53                    if type_arg not 
in types_54                }55                if len(is_all_correct):56                    raise Warning(f'{self._func_name} {_generic_args} in {kwarg} has no {" and ".join(is_all_correct)} type')57            for arg, _generic_type in zip(args, _generic_types):58                if type(_generic_type) == GenericAlias:59                    self._check_generic(kwarg, arg, _generic_type)60                else:61                    if _generic_type == AnyType: continue62                    if _generic_type == Iterator: continue63                    try:64                        assert isinstance(arg, _generic_type), \65                            f'{self._func_name} {_generic_args} in {kwarg} takes {self._get_type_name(_generic_type)}, ' \66                            f'not {self._get_type_name(type(arg))}'67                    except TypeError as e:68                        print(self._func_name, e)69            _generic_args = self._get_generic_type(_generic_args)70        if _generic_args == Iterator: return71        try:72            assert isinstance(args, _generic_args), \73                f'{self._func_name}  {kwarg} - parameter must be an {self._get_type_name(_generic_args)}, ' \74                f'not a {self._get_type_name(type(args))}.'75        except TypeError as e:76            print(self._func_name, e, args, _generic_args)77    def check_anotations(self, *args, **kwargs):78        for arg, func_arg in zip(args, self._func_args):79            if func_arg == 'self':80                self._func_args.remove('self')81                try:82                    del self._annotations['self']83                except KeyError:84                    ...85                continue86            status = True if func_arg in self._annotations else False87            if status:88                raise Warning(f'{self._func_name}: {func_arg}({arg}) - this argument is not conveyed explicitly.')89            else:90                raise Warning(91                    
f'{self._func_name} {func_arg} - the argument has no annotation type. '92                    f'In parameter, data type is a {self._get_type_name(type(arg))}.'93                )94        if len(kwargs) > len(self._func_args):95            raise Warning(f'{self._func_name} The number of parameters passed is more than the function accepts')96        for kwarg, func_arg in zip(kwargs, self._func_args):97            if kwarg != func_arg:98                if not self._defaults:99                    raise Warning(f'{self._func_name} ERROR')100                else:101                    continue102            status = True if kwarg in self._annotations else False103            arg = kwargs[kwarg]104            if status:105                if (_annotation_arg := self._annotations[kwarg]) == Any: continue106                self._check_generic(kwarg, arg, _annotation_arg)107            else:108                raise Warning(109                    f'{self._func_name} {kwarg} - argument has no annotation type. '110                    f'In parameter, data type is a {self._get_type_name(type(arg))}.'111                )112    def check_returns(self, results):113        self._check_generic('results', results, self._returns)114    def main(self, *args, **kwargs):115        self.check_anotations(*args, **kwargs)116        results = self._func(*args, **kwargs)117        if self._returns:118            self.check_returns(results)119        return results120    def __call__(self, *args, **kwargs):121        return self.main(*args, **kwargs)122    @staticmethod123    def typed(func):124        def wrapper(*args, **kwargs):125            return TypedDecorator(func)(*args, **kwargs)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
