How to use _verify_key_exists method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...149 key_id = list_grants_request.get(KEY_ID)150 if not key_id:151 raise ValidationError(f"Required input parameter '{KEY_ID}' not specified")152 region_details = KMSBackend.get()153 self._verify_key_exists(key_id)154 limit = list_grants_request.get("Limit", 50)155 if "Marker" in list_grants_request:156 filtered = region_details.markers.get(list_grants_request["Marker"], [])157 else:158 filtered = [159 grant160 for grant in region_details.grants.values()161 if grant[KEY_ID] == key_id162 and filter_grant_id(grant, list_grants_request)163 and filter_grantee_principal(grant, list_grants_request)164 ]165 # filter out attributes166 filtered = [remove_attributes(dict(grant), ["GrantTokens"]) for grant in filtered]167 if len(filtered) <= limit:168 return ListGrantsResponse(Grants=filtered, Truncated=False)169 in_limit = filtered[:limit]170 out_limit = filtered[limit:]171 marker_id = long_uid()172 region_details.markers[marker_id] = out_limit173 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)174 def revoke_grant(175 self, context: RequestContext, key_id: KeyIdType, grant_id: GrantIdType176 ) -> None:177 grants = KMSBackend.get().grants178 if grants[grant_id][KEY_ID] != key_id:179 raise ValidationError(f"Invalid {KEY_ID}={key_id} specified for grant {grant_id}")180 grants.pop(grant_id)181 def retire_grant(182 self,183 context: RequestContext,184 grant_token: GrantTokenType = None,185 key_id: KeyIdType = None,186 grant_id: GrantIdType = None,187 ) -> None:188 region_details = KMSBackend.get()189 grants = region_details.grants190 if grant_id and grants[grant_id][KEY_ID] == key_id:191 grants.pop(grant_id)192 elif grant_token:193 region_details.grants = {194 grant_id: grant195 for grant_id, grant in grants.items()196 if grant_token not in grant[GRANT_TOKENS]197 }198 else:199 raise InvalidGrantTokenException("Grant token OR (grant ID, key ID) must be specified")200 def list_retirable_grants(201 self,202 context: RequestContext,203 retiring_principal: PrincipalIdType,204 limit: LimitType = None,205 marker: MarkerType = None,206 ) -> ListGrantsResponse:207 region_details = KMSBackend.get()208 grants = region_details.grants209 if not retiring_principal:210 raise ValidationError(f"Required input parameter '{RETIRING_PRINCIPAL}' not specified")211 limit = limit or 50212 if marker:213 markers = region_details.markers214 filtered = markers.get(marker, [])215 else:216 filtered = [217 grant218 for grant in grants.values()219 if RETIRING_PRINCIPAL in grant and grant[RETIRING_PRINCIPAL] == retiring_principal220 ]221 if len(filtered) <= limit:222 return ListGrantsResponse(Grants=filtered, Truncated=False)223 markers = region_details.markers224 in_limit = filtered[:limit]225 out_limit = filtered[limit:]226 marker_id = long_uid()227 markers[marker_id] = out_limit228 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)229 def get_public_key(230 self, context: RequestContext, key_id: KeyIdType, grant_tokens: GrantTokenList = None231 ) -> GetPublicKeyResponse:232 region_details = KMSBackend.get()233 result = region_details.key_pairs.get(key_id)234 if not result:235 raise NotFoundException()236 attrs = [237 "KeyId",238 "PublicKey",239 "KeySpec",240 "KeyUsage",241 "EncryptionAlgorithms",242 "SigningAlgorithms",243 ]244 result = select_attributes(result, attrs)245 return GetPublicKeyResponse(**result)246 @handler("GenerateDataKeyPair", expand=False)247 def generate_data_key_pair(248 self,249 context: RequestContext,250 generate_data_key_pair_request: GenerateDataKeyPairRequest,251 ) -> GenerateDataKeyPairResponse:252 result = _generate_data_key_pair(generate_data_key_pair_request)253 attrs = [254 "PrivateKeyCiphertextBlob",255 "PrivateKeyPlaintext",256 "PublicKey",257 "KeyId",258 "KeyPairSpec",259 ]260 result = select_attributes(result, attrs)261 return GenerateDataKeyPairResponse(**result)262 @handler("GenerateDataKeyPairWithoutPlaintext", expand=False)263 def generate_data_key_pair_without_plaintext(264 self,265 context: RequestContext,266 generate_data_key_pair_without_plaintext_request: GenerateDataKeyPairWithoutPlaintextRequest,267 ) -> GenerateDataKeyPairWithoutPlaintextResponse:268 result = _generate_data_key_pair(generate_data_key_pair_without_plaintext_request)269 result = select_attributes(270 result, ["PrivateKeyCiphertextBlob", "PublicKey", "KeyId", "KeyPairSpec"]271 )272 return GenerateDataKeyPairResponse(**result)273 def sign(274 self,275 context: RequestContext,276 key_id: KeyIdType,277 message: PlaintextType,278 signing_algorithm: SigningAlgorithmSpec,279 message_type: MessageType = None,280 grant_tokens: GrantTokenList = None,281 ) -> SignResponse:282 region_details = KMSBackend.get()283 key_pair = region_details.key_pairs.get(key_id)284 if not key_pair:285 raise NotFoundException(f"Key ID {key_id} not found for signing")286 kwargs = {}287 if signing_algorithm.startswith("RSA"):288 if "PKCS" in signing_algorithm:289 kwargs["padding"] = padding.PKCS1v15()290 elif "PSS" in signing_algorithm:291 kwargs["padding"] = padding.PSS(292 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH293 )294 else:295 LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)296 if "SHA_256" in signing_algorithm:297 kwargs["algorithm"] = hashes.SHA256()298 elif "SHA_384" in signing_algorithm:299 kwargs["algorithm"] = hashes.SHA384()300 elif "SHA_512" in signing_algorithm:301 kwargs["algorithm"] = hashes.SHA512()302 else:303 LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", signing_algorithm)304 if signing_algorithm.startswith("ECDSA"):305 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))306 # generate signature307 signature = key_pair["_key_"].sign(data=message, **kwargs)308 result = {309 "KeyId": key_id,310 "Signature": signature,311 "SigningAlgorithm": signing_algorithm,312 }313 return SignResponse(**result)314 def encrypt(315 self,316 context: RequestContext,317 key_id: KeyIdType,318 plaintext: PlaintextType,319 encryption_context: EncryptionContextType = None,320 grant_tokens: GrantTokenList = None,321 encryption_algorithm: EncryptionAlgorithmSpec = None,322 ) -> EncryptResponse:323 # check if we have imported custom key material for this key324 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]325 if not matching:326 return call_moto(context)327 key_obj = kms_backends[context.region].keys.get(key_id)328 ciphertext_blob = encrypt(key_obj.key_material, plaintext)329 return EncryptResponse(330 CiphertextBlob=ciphertext_blob, KeyId=key_id, EncryptionAlgorithm=encryption_algorithm331 )332 def decrypt(333 self,334 context: RequestContext,335 ciphertext_blob: CiphertextType,336 encryption_context: EncryptionContextType = None,337 grant_tokens: GrantTokenList = None,338 key_id: KeyIdType = None,339 encryption_algorithm: EncryptionAlgorithmSpec = None,340 ) -> DecryptResponse:341 # check if we have imported custom key material for this key342 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]343 if not matching:344 return call_moto(context)345 key_obj = kms_backends[context.region].keys.get(key_id)346 plaintext = decrypt(key_obj.key_material, ciphertext_blob)347 return DecryptResponse(348 KeyId=key_id, Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm349 )350 def get_parameters_for_import(351 self,352 context: RequestContext,353 key_id: KeyIdType,354 wrapping_algorithm: AlgorithmSpec,355 wrapping_key_spec: WrappingKeySpec,356 ) -> GetParametersForImportResponse:357 key = _generate_data_key_pair(358 {"KeySpec": wrapping_key_spec}, create_cipher=False, add_to_keys=False359 )360 import_token = short_uid()361 import_state = KeyImportState(362 key_id=key_id,363 import_token=import_token,364 private_key=key["PrivateKeyPlaintext"],365 public_key=key["PublicKey"],366 wrapping_algo=wrapping_algorithm,367 key_obj=key["_key_"],368 )369 KMSBackend.get().imports[import_token] = import_state370 expiry_date = datetime.datetime.now() + datetime.timedelta(days=100)371 return GetParametersForImportResponse(372 KeyId=key_id,373 ImportToken=to_bytes(import_state.import_token),374 PublicKey=import_state.public_key,375 ParametersValidTo=expiry_date,376 )377 def import_key_material(378 self,379 context: RequestContext,380 key_id: KeyIdType,381 import_token: CiphertextType,382 encrypted_key_material: CiphertextType,383 valid_to: DateType = None,384 expiration_model: ExpirationModelType = None,385 ) -> ImportKeyMaterialResponse:386 import_token = to_str(import_token)387 import_state = KMSBackend.get().imports.get(import_token)388 if not import_state:389 raise NotFoundException(f"Unable to find key import token '{import_token}'")390 key_obj = kms_backends[context.region].keys.get(key_id)391 if not key_obj:392 raise NotFoundException(f"Unable to find key '{key_id}'")393 key_material = import_state.key_obj.decrypt(encrypted_key_material, padding.PKCS1v15())394 key_obj.key_material = key_material395 return ImportKeyMaterialResponse()396 def _verify_key_exists(self, key_id):397 try:398 kms_backends[aws_stack.get_region()].describe_key(key_id)399 except Exception:400 raise ValidationError(f"Invalid key ID '{key_id}'")401 def _validate_grant(self, data: Dict):402 if KEY_ID not in data or GRANTEE_PRINCIPAL not in data or OPERATIONS not in data:403 raise ValidationError("Grant ID, key ID and grantee principal must be specified")404 for operation in data[OPERATIONS]:405 if operation not in VALID_OPERATIONS:406 raise ValidationError(407 f"Value {[OPERATIONS]} at 'operations' failed to satisfy constraint: Member must satisfy"408 f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"409 )410 self._verify_key_exists(data[KEY_ID])411# ---------------412# UTIL FUNCTIONS413# ---------------414def _generate_data_key_pair(data, create_cipher=True, add_to_keys=True):415 region_details = KMSBackend.get()416 kms = aws_stack.connect_to_service("kms")417 key_id = data.get("KeyId")418 key_spec = data.get("KeyPairSpec") or data.get("KeySpec") or data.get("CustomerMasterKeySpec")419 key = None420 public_format = None421 if key_spec.startswith("RSA"):422 rsa_key_sizes = {423 "RSA_2048": 2048,424 "RSA_3072": 3072,...

Full Screen

Full Screen

py_convert_alert.py

Source:py_convert_alert.py Github

copy

Full Screen

...87 if not self._check_for_labels():88 raise(MissingLabelsKey(f"mapper must contain 'labels' key at "89 f"outer most level: {self.mapper}"))90 return self.mapper91 def _verify_key_exists(self, key, lookup_dict):92 """93 Return True if the specified key exists in our lookup_dict.94 This is a protected method and should not be used outside of the public95 convert_it method. This method is responsible for checking to see if we96 get a hit on a specified key in a lookup on our dict. This helps us97 ensure that during conversion, if any labels annotations or otherwise98 will be unaffected by the conversion and thusly not return empty lists.99 Parameters100 ----------101 key : str102 The key we are looking up in lookup_dict.103 lookup_dict : str104 The lookup_dict is in our case an alert from an arbitrary alerting105 system to be converted to an alert that Alert manager will accept.106 Returns107 -------108 exists : boolean109 Simple flag, True or False, to let us know if our search was110 successful.111 """112 exists = False113 if get_occurrence_of_key(lookup_dict, key) > 0:114 exists = True115 return exists116 def _key_list_search(self, keys_list, lookup_dict):117 """118 Return the final value returned after iterating over keys_list.119 In order to handle situations where a key might exist twice in the120 alert on which we run our lookup, we can specify a list of keys to121 iterate over. Essentially, if a list is provided as the value to one of122 our corresponding alert manager alert keys, we will search for the123 first key in the list, return it's value, then search for the next key124 etc. Finally when we reach the end of our list, the value returned125 should be the result of all searches. This is a protected method and126 should only be used in the public convert_it method.127 Parameters128 ----------129 keys_list : list of str130 This is the list of keys to drill down on. It will be iterated over131 until we reach the end of the list, providing the more specific key132 lookup to prevent multiple values for a single lookup.133 lookup_dict : dict134 This is the arbitrary alert that we are searching through for keys135 to add to our alert manager alert.136 Returns137 -------138 value : str139 This value should be the result of all of the searches for the keys140 provided.141 """142 for index, key in enumerate(keys_list):143 result = nested_lookup(key, lookup_dict)144 try:145 value = nested_lookup(keys_list[index + 1], result)146 except IndexError:147 pass148 return value149 def _add_found_values(self, transform_dict, transform_key,150 lookup_key, lookup_dict):151 """152 Set value for the specified key in our transform dict to our lookup.153 This is a protected method that shouldn't be used outside of the154 public convert_it method. This method is responsible for all of our155 lookups and assigning the corresponding found value to the appropriate156 key in our alert manager alert.157 Parameters158 ----------159 transform_dict : dict160 The transform_dict is our self.mapping values. The dict is161 transformed by the iterating over it in the convert_it method and162 adding values here.163 transform_key : str164 This is the key we want to target in our transform dict. Ultimately165 the value returne from the lookup will be assigned as the value of166 this key.167 lookup_key : str168 This is the key we are looking up in our arbitrary alert json169 structure to be assigned as the value of the transform_key.170 lookup_dict : dict171 This is the arbitrary alert we are searching for values to be172 assigned to our transform_dict.173 Returns174 -------175 transform_dict : dict176 Return the modified transform dict back to the convert_it method.177 This transform dict should now have the keys we specified in our178 mapper. There corresponding values should now be changed to the179 result of searching for what was the key's previous value (our180 lookup_key is replaced by an actual value).181 """182 try:183 if self._verify_key_exists(lookup_key, lookup_dict):184 transform_dict[transform_key] = \185 ''.join(nested_lookup(lookup_key, lookup_dict))186 except TypeError:187 pass188 if isinstance(lookup_key, list):189 transform_dict[transform_key] = \190 ''.join(self._key_list_search(lookup_key, lookup_dict))191 return transform_dict192 def _map_to_mapper(self, convert_alert, alert):193 for key, value in convert_alert.items():194 if isinstance(value, dict):195 for k, v in value.items():196 value = self._add_found_values(value, k, v, alert)197 return value...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful