How to use _validate_grant method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...131 @handler("CreateGrant", expand=False)132 def create_grant(133 self, context: RequestContext, create_grant_request: CreateGrantRequest134 ) -> CreateGrantResponse:135 self._validate_grant(create_grant_request)136 region_details = KMSBackend.get()137 grant = dict(create_grant_request)138 grant[GRANT_ID] = long_uid()139 grant[GRANT_TOKENS] = [long_uid()]140 if NAME not in grant:141 grant[NAME] = ""142 grant[CREATION_DATE] = time.time()143 region_details.grants[grant[GRANT_ID]] = grant144 return CreateGrantResponse(GrantId=grant[GRANT_ID], GrantToken=grant[GRANT_TOKENS][0])145 @handler("ListGrants", expand=False)146 def list_grants(147 self, context: RequestContext, list_grants_request: ListGrantsRequest148 ) -> ListGrantsResponse:149 key_id = list_grants_request.get(KEY_ID)150 if not key_id:151 raise ValidationError(f"Required input parameter '{KEY_ID}' not specified")152 region_details = KMSBackend.get()153 self._verify_key_exists(key_id)154 limit = list_grants_request.get("Limit", 50)155 if "Marker" in list_grants_request:156 filtered = region_details.markers.get(list_grants_request["Marker"], [])157 else:158 filtered = [159 grant160 for grant in region_details.grants.values()161 if grant[KEY_ID] == key_id162 and filter_grant_id(grant, list_grants_request)163 and filter_grantee_principal(grant, list_grants_request)164 ]165 # filter out attributes166 filtered = [remove_attributes(dict(grant), ["GrantTokens"]) for grant in filtered]167 if len(filtered) <= limit:168 return ListGrantsResponse(Grants=filtered, Truncated=False)169 in_limit = filtered[:limit]170 out_limit = filtered[limit:]171 marker_id = long_uid()172 region_details.markers[marker_id] = out_limit173 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)174 def revoke_grant(175 self, context: RequestContext, key_id: KeyIdType, grant_id: GrantIdType176 ) -> None:177 grants = KMSBackend.get().grants178 if grants[grant_id][KEY_ID] != key_id:179 raise ValidationError(f"Invalid {KEY_ID}={key_id} specified for grant {grant_id}")180 grants.pop(grant_id)181 def retire_grant(182 self,183 context: RequestContext,184 grant_token: GrantTokenType = None,185 key_id: KeyIdType = None,186 grant_id: GrantIdType = None,187 ) -> None:188 region_details = KMSBackend.get()189 grants = region_details.grants190 if grant_id and grants[grant_id][KEY_ID] == key_id:191 grants.pop(grant_id)192 elif grant_token:193 region_details.grants = {194 grant_id: grant195 for grant_id, grant in grants.items()196 if grant_token not in grant[GRANT_TOKENS]197 }198 else:199 raise InvalidGrantTokenException("Grant token OR (grant ID, key ID) must be specified")200 def list_retirable_grants(201 self,202 context: RequestContext,203 retiring_principal: PrincipalIdType,204 limit: LimitType = None,205 marker: MarkerType = None,206 ) -> ListGrantsResponse:207 region_details = KMSBackend.get()208 grants = region_details.grants209 if not retiring_principal:210 raise ValidationError(f"Required input parameter '{RETIRING_PRINCIPAL}' not specified")211 limit = limit or 50212 if marker:213 markers = region_details.markers214 filtered = markers.get(marker, [])215 else:216 filtered = [217 grant218 for grant in grants.values()219 if RETIRING_PRINCIPAL in grant and grant[RETIRING_PRINCIPAL] == retiring_principal220 ]221 if len(filtered) <= limit:222 return ListGrantsResponse(Grants=filtered, Truncated=False)223 markers = region_details.markers224 in_limit = filtered[:limit]225 out_limit = filtered[limit:]226 marker_id = long_uid()227 markers[marker_id] = out_limit228 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)229 def get_public_key(230 self, context: RequestContext, key_id: KeyIdType, grant_tokens: GrantTokenList = None231 ) -> GetPublicKeyResponse:232 region_details = KMSBackend.get()233 result = region_details.key_pairs.get(key_id)234 if not result:235 raise NotFoundException()236 attrs = [237 "KeyId",238 "PublicKey",239 "KeySpec",240 "KeyUsage",241 "EncryptionAlgorithms",242 "SigningAlgorithms",243 ]244 result = select_attributes(result, attrs)245 return GetPublicKeyResponse(**result)246 @handler("GenerateDataKeyPair", expand=False)247 def generate_data_key_pair(248 self,249 context: RequestContext,250 generate_data_key_pair_request: GenerateDataKeyPairRequest,251 ) -> GenerateDataKeyPairResponse:252 result = _generate_data_key_pair(generate_data_key_pair_request)253 attrs = [254 "PrivateKeyCiphertextBlob",255 "PrivateKeyPlaintext",256 "PublicKey",257 "KeyId",258 "KeyPairSpec",259 ]260 result = select_attributes(result, attrs)261 return GenerateDataKeyPairResponse(**result)262 @handler("GenerateDataKeyPairWithoutPlaintext", expand=False)263 def generate_data_key_pair_without_plaintext(264 self,265 context: RequestContext,266 generate_data_key_pair_without_plaintext_request: GenerateDataKeyPairWithoutPlaintextRequest,267 ) -> GenerateDataKeyPairWithoutPlaintextResponse:268 result = _generate_data_key_pair(generate_data_key_pair_without_plaintext_request)269 result = select_attributes(270 result, ["PrivateKeyCiphertextBlob", "PublicKey", "KeyId", "KeyPairSpec"]271 )272 return GenerateDataKeyPairResponse(**result)273 def sign(274 self,275 context: RequestContext,276 key_id: KeyIdType,277 message: PlaintextType,278 signing_algorithm: SigningAlgorithmSpec,279 message_type: MessageType = None,280 grant_tokens: GrantTokenList = None,281 ) -> SignResponse:282 region_details = KMSBackend.get()283 key_pair = region_details.key_pairs.get(key_id)284 if not key_pair:285 raise NotFoundException(f"Key ID {key_id} not found for signing")286 kwargs = {}287 if signing_algorithm.startswith("RSA"):288 if "PKCS" in signing_algorithm:289 kwargs["padding"] = padding.PKCS1v15()290 elif "PSS" in signing_algorithm:291 kwargs["padding"] = padding.PSS(292 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH293 )294 else:295 LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)296 if "SHA_256" in signing_algorithm:297 kwargs["algorithm"] = hashes.SHA256()298 elif "SHA_384" in signing_algorithm:299 kwargs["algorithm"] = hashes.SHA384()300 elif "SHA_512" in signing_algorithm:301 kwargs["algorithm"] = hashes.SHA512()302 else:303 LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", signing_algorithm)304 if signing_algorithm.startswith("ECDSA"):305 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))306 # generate signature307 signature = key_pair["_key_"].sign(data=message, **kwargs)308 result = {309 "KeyId": key_id,310 "Signature": signature,311 "SigningAlgorithm": signing_algorithm,312 }313 return SignResponse(**result)314 def encrypt(315 self,316 context: RequestContext,317 key_id: KeyIdType,318 plaintext: PlaintextType,319 encryption_context: EncryptionContextType = None,320 grant_tokens: GrantTokenList = None,321 encryption_algorithm: EncryptionAlgorithmSpec = None,322 ) -> EncryptResponse:323 # check if we have imported custom key material for this key324 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]325 if not matching:326 return call_moto(context)327 key_obj = kms_backends[context.region].keys.get(key_id)328 ciphertext_blob = encrypt(key_obj.key_material, plaintext)329 return EncryptResponse(330 CiphertextBlob=ciphertext_blob, KeyId=key_id, EncryptionAlgorithm=encryption_algorithm331 )332 def decrypt(333 self,334 context: RequestContext,335 ciphertext_blob: CiphertextType,336 encryption_context: EncryptionContextType = None,337 grant_tokens: GrantTokenList = None,338 key_id: KeyIdType = None,339 encryption_algorithm: EncryptionAlgorithmSpec = None,340 ) -> DecryptResponse:341 # check if we have imported custom key material for this key342 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]343 if not matching:344 return call_moto(context)345 key_obj = kms_backends[context.region].keys.get(key_id)346 plaintext = decrypt(key_obj.key_material, ciphertext_blob)347 return DecryptResponse(348 KeyId=key_id, Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm349 )350 def get_parameters_for_import(351 self,352 context: RequestContext,353 key_id: KeyIdType,354 wrapping_algorithm: AlgorithmSpec,355 wrapping_key_spec: WrappingKeySpec,356 ) -> GetParametersForImportResponse:357 key = _generate_data_key_pair(358 {"KeySpec": wrapping_key_spec}, create_cipher=False, add_to_keys=False359 )360 import_token = short_uid()361 import_state = KeyImportState(362 key_id=key_id,363 import_token=import_token,364 private_key=key["PrivateKeyPlaintext"],365 public_key=key["PublicKey"],366 wrapping_algo=wrapping_algorithm,367 key_obj=key["_key_"],368 )369 KMSBackend.get().imports[import_token] = import_state370 expiry_date = datetime.datetime.now() + datetime.timedelta(days=100)371 return GetParametersForImportResponse(372 KeyId=key_id,373 ImportToken=to_bytes(import_state.import_token),374 PublicKey=import_state.public_key,375 ParametersValidTo=expiry_date,376 )377 def import_key_material(378 self,379 context: RequestContext,380 key_id: KeyIdType,381 import_token: CiphertextType,382 encrypted_key_material: CiphertextType,383 valid_to: DateType = None,384 expiration_model: ExpirationModelType = None,385 ) -> ImportKeyMaterialResponse:386 import_token = to_str(import_token)387 import_state = KMSBackend.get().imports.get(import_token)388 if not import_state:389 raise NotFoundException(f"Unable to find key import token '{import_token}'")390 key_obj = kms_backends[context.region].keys.get(key_id)391 if not key_obj:392 raise NotFoundException(f"Unable to find key '{key_id}'")393 key_material = import_state.key_obj.decrypt(encrypted_key_material, padding.PKCS1v15())394 key_obj.key_material = key_material395 return ImportKeyMaterialResponse()396 def _verify_key_exists(self, key_id):397 try:398 kms_backends[aws_stack.get_region()].describe_key(key_id)399 except Exception:400 raise ValidationError(f"Invalid key ID '{key_id}'")401 def _validate_grant(self, data: Dict):402 if KEY_ID not in data or GRANTEE_PRINCIPAL not in data or OPERATIONS not in data:403 raise ValidationError("Grant ID, key ID and grantee principal must be specified")404 for operation in data[OPERATIONS]:405 if operation not in VALID_OPERATIONS:406 raise ValidationError(407 f"Value {[OPERATIONS]} at 'operations' failed to satisfy constraint: Member must satisfy"408 f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"409 )410 self._verify_key_exists(data[KEY_ID])411# ---------------412# UTIL FUNCTIONS413# ---------------414def _generate_data_key_pair(data, create_cipher=True, add_to_keys=True):415 region_details = KMSBackend.get()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful