Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...131    @handler("CreateGrant", expand=False)132    def create_grant(133        self, context: RequestContext, create_grant_request: CreateGrantRequest134    ) -> CreateGrantResponse:135        self._validate_grant(create_grant_request)136        region_details = KMSBackend.get()137        grant = dict(create_grant_request)138        grant[GRANT_ID] = long_uid()139        grant[GRANT_TOKENS] = [long_uid()]140        if NAME not in grant:141            grant[NAME] = ""142        grant[CREATION_DATE] = time.time()143        region_details.grants[grant[GRANT_ID]] = grant144        return CreateGrantResponse(GrantId=grant[GRANT_ID], GrantToken=grant[GRANT_TOKENS][0])145    @handler("ListGrants", expand=False)146    def list_grants(147        self, context: RequestContext, list_grants_request: ListGrantsRequest148    ) -> ListGrantsResponse:149        key_id = list_grants_request.get(KEY_ID)150        if not key_id:151            raise ValidationError(f"Required input parameter '{KEY_ID}' not specified")152        region_details = KMSBackend.get()153        self._verify_key_exists(key_id)154        limit = list_grants_request.get("Limit", 50)155        if "Marker" in list_grants_request:156            filtered = region_details.markers.get(list_grants_request["Marker"], [])157        else:158            filtered = [159                grant160                for grant in region_details.grants.values()161                if grant[KEY_ID] == key_id162                and filter_grant_id(grant, list_grants_request)163                and filter_grantee_principal(grant, list_grants_request)164            ]165        # filter out attributes166        filtered = [remove_attributes(dict(grant), ["GrantTokens"]) for grant in filtered]167        if len(filtered) <= limit:168            return ListGrantsResponse(Grants=filtered, Truncated=False)169        in_limit = filtered[:limit]170        out_limit = filtered[limit:]171        marker_id = long_uid()172        
region_details.markers[marker_id] = out_limit173        return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)174    def revoke_grant(175        self, context: RequestContext, key_id: KeyIdType, grant_id: GrantIdType176    ) -> None:177        grants = KMSBackend.get().grants178        if grants[grant_id][KEY_ID] != key_id:179            raise ValidationError(f"Invalid {KEY_ID}={key_id} specified for grant {grant_id}")180        grants.pop(grant_id)181    def retire_grant(182        self,183        context: RequestContext,184        grant_token: GrantTokenType = None,185        key_id: KeyIdType = None,186        grant_id: GrantIdType = None,187    ) -> None:188        region_details = KMSBackend.get()189        grants = region_details.grants190        if grant_id and grants[grant_id][KEY_ID] == key_id:191            grants.pop(grant_id)192        elif grant_token:193            region_details.grants = {194                grant_id: grant195                for grant_id, grant in grants.items()196                if grant_token not in grant[GRANT_TOKENS]197            }198        else:199            raise InvalidGrantTokenException("Grant token OR (grant ID, key ID) must be specified")200    def list_retirable_grants(201        self,202        context: RequestContext,203        retiring_principal: PrincipalIdType,204        limit: LimitType = None,205        marker: MarkerType = None,206    ) -> ListGrantsResponse:207        region_details = KMSBackend.get()208        grants = region_details.grants209        if not retiring_principal:210            raise ValidationError(f"Required input parameter '{RETIRING_PRINCIPAL}' not specified")211        limit = limit or 50212        if marker:213            markers = region_details.markers214            filtered = markers.get(marker, [])215        else:216            filtered = [217                grant218                for grant in grants.values()219                if RETIRING_PRINCIPAL in grant and 
grant[RETIRING_PRINCIPAL] == retiring_principal220            ]221        if len(filtered) <= limit:222            return ListGrantsResponse(Grants=filtered, Truncated=False)223        markers = region_details.markers224        in_limit = filtered[:limit]225        out_limit = filtered[limit:]226        marker_id = long_uid()227        markers[marker_id] = out_limit228        return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)229    def get_public_key(230        self, context: RequestContext, key_id: KeyIdType, grant_tokens: GrantTokenList = None231    ) -> GetPublicKeyResponse:232        region_details = KMSBackend.get()233        result = region_details.key_pairs.get(key_id)234        if not result:235            raise NotFoundException()236        attrs = [237            "KeyId",238            "PublicKey",239            "KeySpec",240            "KeyUsage",241            "EncryptionAlgorithms",242            "SigningAlgorithms",243        ]244        result = select_attributes(result, attrs)245        return GetPublicKeyResponse(**result)246    @handler("GenerateDataKeyPair", expand=False)247    def generate_data_key_pair(248        self,249        context: RequestContext,250        generate_data_key_pair_request: GenerateDataKeyPairRequest,251    ) -> GenerateDataKeyPairResponse:252        result = _generate_data_key_pair(generate_data_key_pair_request)253        attrs = [254            "PrivateKeyCiphertextBlob",255            "PrivateKeyPlaintext",256            "PublicKey",257            "KeyId",258            "KeyPairSpec",259        ]260        result = select_attributes(result, attrs)261        return GenerateDataKeyPairResponse(**result)262    @handler("GenerateDataKeyPairWithoutPlaintext", expand=False)263    def generate_data_key_pair_without_plaintext(264        self,265        context: RequestContext,266        generate_data_key_pair_without_plaintext_request: GenerateDataKeyPairWithoutPlaintextRequest,267    ) -> 
GenerateDataKeyPairWithoutPlaintextResponse:268        result = _generate_data_key_pair(generate_data_key_pair_without_plaintext_request)269        result = select_attributes(270            result, ["PrivateKeyCiphertextBlob", "PublicKey", "KeyId", "KeyPairSpec"]271        )272        return GenerateDataKeyPairResponse(**result)273    def sign(274        self,275        context: RequestContext,276        key_id: KeyIdType,277        message: PlaintextType,278        signing_algorithm: SigningAlgorithmSpec,279        message_type: MessageType = None,280        grant_tokens: GrantTokenList = None,281    ) -> SignResponse:282        region_details = KMSBackend.get()283        key_pair = region_details.key_pairs.get(key_id)284        if not key_pair:285            raise NotFoundException(f"Key ID {key_id} not found for signing")286        kwargs = {}287        if signing_algorithm.startswith("RSA"):288            if "PKCS" in signing_algorithm:289                kwargs["padding"] = padding.PKCS1v15()290            elif "PSS" in signing_algorithm:291                kwargs["padding"] = padding.PSS(292                    mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH293                )294            else:295                LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)296        if "SHA_256" in signing_algorithm:297            kwargs["algorithm"] = hashes.SHA256()298        elif "SHA_384" in signing_algorithm:299            kwargs["algorithm"] = hashes.SHA384()300        elif "SHA_512" in signing_algorithm:301            kwargs["algorithm"] = hashes.SHA512()302        else:303            LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", signing_algorithm)304        if signing_algorithm.startswith("ECDSA"):305            kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))306        # generate signature307        signature = key_pair["_key_"].sign(data=message, **kwargs)308     
   result = {309            "KeyId": key_id,310            "Signature": signature,311            "SigningAlgorithm": signing_algorithm,312        }313        return SignResponse(**result)314    def encrypt(315        self,316        context: RequestContext,317        key_id: KeyIdType,318        plaintext: PlaintextType,319        encryption_context: EncryptionContextType = None,320        grant_tokens: GrantTokenList = None,321        encryption_algorithm: EncryptionAlgorithmSpec = None,322    ) -> EncryptResponse:323        # check if we have imported custom key material for this key324        matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]325        if not matching:326            return call_moto(context)327        key_obj = kms_backends[context.region].keys.get(key_id)328        ciphertext_blob = encrypt(key_obj.key_material, plaintext)329        return EncryptResponse(330            CiphertextBlob=ciphertext_blob, KeyId=key_id, EncryptionAlgorithm=encryption_algorithm331        )332    def decrypt(333        self,334        context: RequestContext,335        ciphertext_blob: CiphertextType,336        encryption_context: EncryptionContextType = None,337        grant_tokens: GrantTokenList = None,338        key_id: KeyIdType = None,339        encryption_algorithm: EncryptionAlgorithmSpec = None,340    ) -> DecryptResponse:341        # check if we have imported custom key material for this key342        matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]343        if not matching:344            return call_moto(context)345        key_obj = kms_backends[context.region].keys.get(key_id)346        plaintext = decrypt(key_obj.key_material, ciphertext_blob)347        return DecryptResponse(348            KeyId=key_id, Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm349        )350    def get_parameters_for_import(351        self,352        context: RequestContext,353        key_id: 
KeyIdType,354        wrapping_algorithm: AlgorithmSpec,355        wrapping_key_spec: WrappingKeySpec,356    ) -> GetParametersForImportResponse:357        key = _generate_data_key_pair(358            {"KeySpec": wrapping_key_spec}, create_cipher=False, add_to_keys=False359        )360        import_token = short_uid()361        import_state = KeyImportState(362            key_id=key_id,363            import_token=import_token,364            private_key=key["PrivateKeyPlaintext"],365            public_key=key["PublicKey"],366            wrapping_algo=wrapping_algorithm,367            key_obj=key["_key_"],368        )369        KMSBackend.get().imports[import_token] = import_state370        expiry_date = datetime.datetime.now() + datetime.timedelta(days=100)371        return GetParametersForImportResponse(372            KeyId=key_id,373            ImportToken=to_bytes(import_state.import_token),374            PublicKey=import_state.public_key,375            ParametersValidTo=expiry_date,376        )377    def import_key_material(378        self,379        context: RequestContext,380        key_id: KeyIdType,381        import_token: CiphertextType,382        encrypted_key_material: CiphertextType,383        valid_to: DateType = None,384        expiration_model: ExpirationModelType = None,385    ) -> ImportKeyMaterialResponse:386        import_token = to_str(import_token)387        import_state = KMSBackend.get().imports.get(import_token)388        if not import_state:389            raise NotFoundException(f"Unable to find key import token '{import_token}'")390        key_obj = kms_backends[context.region].keys.get(key_id)391        if not key_obj:392            raise NotFoundException(f"Unable to find key '{key_id}'")393        key_material = import_state.key_obj.decrypt(encrypted_key_material, padding.PKCS1v15())394        key_obj.key_material = key_material395        return ImportKeyMaterialResponse()396    def _verify_key_exists(self, key_id):397        try:398     
       kms_backends[aws_stack.get_region()].describe_key(key_id)399        except Exception:400            raise ValidationError(f"Invalid key ID '{key_id}'")401    def _validate_grant(self, data: Dict):402        if KEY_ID not in data or GRANTEE_PRINCIPAL not in data or OPERATIONS not in data:403            raise ValidationError("Grant ID, key ID and grantee principal must be specified")404        for operation in data[OPERATIONS]:405            if operation not in VALID_OPERATIONS:406                raise ValidationError(407                    f"Value {[OPERATIONS]} at 'operations' failed to satisfy constraint: Member must satisfy"408                    f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"409                )410        self._verify_key_exists(data[KEY_ID])411# ---------------412# UTIL FUNCTIONS413# ---------------414def _generate_data_key_pair(data, create_cipher=True, add_to_keys=True):415    region_details = KMSBackend.get()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
