How to use wait_for_delivery_stream_ready method in localstack

Best Python code snippet using localstack_python

fixtures.py

Source:fixtures.py Github

copy

Full Screen

...583 ]584 poll_condition(is_stream_ready)585 return _wait_for_stream_ready586@pytest.fixture587def wait_for_delivery_stream_ready(firehose_client):588 def _wait_for_stream_ready(delivery_stream_name: str):589 def is_stream_ready():590 describe_stream_response = firehose_client.describe_delivery_stream(591 DeliveryStreamName=delivery_stream_name592 )593 return (594 describe_stream_response["DeliveryStreamDescription"]["DeliveryStreamStatus"]595 == "ACTIVE"596 )597 poll_condition(is_stream_ready)598 return _wait_for_stream_ready599@pytest.fixture600def wait_for_dynamodb_stream_ready(dynamodbstreams_client):601 def _wait_for_stream_ready(stream_arn: str):602 def is_stream_ready():603 describe_stream_response = dynamodbstreams_client.describe_stream(StreamArn=stream_arn)604 return describe_stream_response["StreamDescription"]["StreamStatus"] == "ENABLED"605 poll_condition(is_stream_ready)606 return _wait_for_stream_ready607@pytest.fixture()608def kms_create_key(kms_client):609 key_ids = []610 def _create_key(**kwargs):611 if "Description" not in kwargs:612 kwargs["Description"] = f"test description - {short_uid()}"613 if "KeyUsage" not in kwargs:614 kwargs["KeyUsage"] = "ENCRYPT_DECRYPT"615 key_metadata = kms_client.create_key(**kwargs)["KeyMetadata"]616 key_ids.append(key_metadata["KeyId"])617 return key_metadata618 yield _create_key619 for key_id in key_ids:620 try:621 kms_client.schedule_key_deletion(KeyId=key_id)622 except Exception as e:623 LOG.debug("error cleaning up KMS key %s: %s", key_id, e)624# kms_create_key fixture is used here not just to be able to create aliases without a key specified,625# but also to make sure that kms_create_key gets executed before and teared down after kms_create_alias -626# to make sure that we clean up aliases before keys get cleaned up.627@pytest.fixture()628def kms_create_alias(kms_client, kms_create_key):629 aliases = []630 def _create_alias(**kwargs):631 if "AliasName" not in kwargs:632 kwargs["AliasName"] = f"alias/{short_uid()}"633 if "TargetKeyId" not in kwargs:634 kwargs["TargetKeyId"] = kms_create_key()["KeyId"]635 kms_client.create_alias(**kwargs)636 aliases.append(kwargs["AliasName"])637 return kwargs["AliasName"]638 yield _create_alias639 for alias in aliases:640 try:641 kms_client.delete_alias(AliasName=alias)642 except Exception as e:643 LOG.debug("error cleaning up KMS alias %s: %s", alias, e)644@pytest.fixture645def kms_key(kms_create_key):646 return kms_create_key()647@pytest.fixture648def kms_grant_and_key(kms_client, kms_key, sts_client):649 user_arn = sts_client.get_caller_identity()["Arn"]650 return [651 kms_client.create_grant(652 KeyId=kms_key["KeyId"],653 GranteePrincipal=user_arn,654 Operations=["Decrypt", "Encrypt"],655 ),656 kms_key,657 ]658@pytest.fixture659def opensearch_wait_for_cluster(opensearch_client):660 def _wait_for_cluster(domain_name: str):661 def finished_processing():662 status = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"]663 return status["Processing"] is False664 assert poll_condition(665 finished_processing, timeout=5 * 60666 ), f"could not start domain: {domain_name}"667 return _wait_for_cluster668@pytest.fixture669def opensearch_create_domain(opensearch_client, opensearch_wait_for_cluster):670 domains = []671 def factory(**kwargs) -> str:672 if "DomainName" not in kwargs:673 kwargs["DomainName"] = f"test-domain-{short_uid()}"674 opensearch_client.create_domain(**kwargs)675 opensearch_wait_for_cluster(domain_name=kwargs["DomainName"])676 domains.append(kwargs["DomainName"])677 return kwargs["DomainName"]678 yield factory679 # cleanup680 for domain in domains:681 try:682 opensearch_client.delete_domain(DomainName=domain)683 except Exception as e:684 LOG.debug("error cleaning up domain %s: %s", domain, e)685@pytest.fixture686def opensearch_domain(opensearch_create_domain) -> str:687 return opensearch_create_domain()688@pytest.fixture689def opensearch_endpoint(opensearch_client, opensearch_domain) -> str:690 status = opensearch_client.describe_domain(DomainName=opensearch_domain)["DomainStatus"]691 assert "Endpoint" in status692 return f"https://{status['Endpoint']}"693@pytest.fixture694def opensearch_document_path(opensearch_client, opensearch_endpoint):695 document = {696 "first_name": "Boba",697 "last_name": "Fett",698 "age": 41,699 "about": "I'm just a simple man, trying to make my way in the universe.",700 "interests": ["mandalorian armor", "tusken culture"],701 }702 document_path = f"{opensearch_endpoint}/bounty/hunters/1"703 response = requests.put(704 document_path,705 data=json.dumps(document),706 headers={"content-type": "application/json", "Accept-encoding": "identity"},707 )708 assert response.status_code == 201, f"could not create document at: {document_path}"709 return document_path710# Cleanup fixtures711@pytest.fixture712def cleanup_stacks(cfn_client):713 def _cleanup_stacks(stacks: List[str]) -> None:714 stacks = ensure_list(stacks)715 for stack in stacks:716 try:717 cfn_client.delete_stack(StackName=stack)718 except Exception:719 LOG.debug(f"Failed to cleanup stack '{stack}'")720 return _cleanup_stacks721@pytest.fixture722def cleanup_changesets(cfn_client):723 def _cleanup_changesets(changesets: List[str]) -> None:724 changesets = ensure_list(changesets)725 for cs in changesets:726 try:727 cfn_client.delete_change_set(ChangeSetName=cs)728 except Exception:729 LOG.debug(f"Failed to cleanup changeset '{cs}'")730 return _cleanup_changesets731# Helpers for Cfn732# TODO: exports(!)733@dataclasses.dataclass(frozen=True)734class DeployResult:735 change_set_id: str736 stack_id: str737 stack_name: str738 change_set_name: str739 outputs: Dict[str, str]740 destroy: Callable[[], None]741@pytest.fixture742def deploy_cfn_template(743 cfn_client,744 lambda_client,745 cleanup_stacks,746 cleanup_changesets,747 is_change_set_created_and_available,748 is_change_set_finished,749):750 state = []751 def _deploy(752 *,753 is_update: Optional[bool] = False,754 stack_name: Optional[str] = None,755 change_set_name: Optional[str] = None,756 template: Optional[str] = None,757 template_path: Optional[str | os.PathLike] = None,758 template_mapping: Optional[Dict[str, any]] = None,759 parameters: Optional[Dict[str, str]] = None,760 max_wait: Optional[int] = None,761 ) -> DeployResult:762 if is_update:763 assert stack_name764 stack_name = stack_name or f"stack-{short_uid()}"765 change_set_name = change_set_name or f"change-set-{short_uid()}"766 if template_path is not None:767 template = load_template_file(template_path)768 template_rendered = render_template(template, **(template_mapping or {}))769 response = cfn_client.create_change_set(770 StackName=stack_name,771 ChangeSetName=change_set_name,772 TemplateBody=template_rendered,773 Capabilities=["CAPABILITY_AUTO_EXPAND", "CAPABILITY_IAM", "CAPABILITY_NAMED_IAM"],774 ChangeSetType=("UPDATE" if is_update else "CREATE"),775 Parameters=[776 {777 "ParameterKey": k,778 "ParameterValue": v,779 }780 for (k, v) in (parameters or {}).items()781 ],782 )783 change_set_id = response["Id"]784 stack_id = response["StackId"]785 assert wait_until(is_change_set_created_and_available(change_set_id), _max_wait=60)786 cfn_client.execute_change_set(ChangeSetName=change_set_id)787 # TODO: potentially poll for ExecutionStatus=ROLLBACK_COMPLETE here as well, to catch errors early on788 assert wait_until(is_change_set_finished(change_set_id), _max_wait=max_wait or 60)789 outputs = cfn_client.describe_stacks(StackName=stack_id)["Stacks"][0].get("Outputs", [])790 mapped_outputs = {o["OutputKey"]: o["OutputValue"] for o in outputs}791 state.append({"stack_id": stack_id, "change_set_id": change_set_id})792 def _destroy_stack():793 cfn_client.delete_stack(StackName=stack_id)794 def _await_stack_delete():795 return (796 cfn_client.describe_stacks(StackName=stack_id)["Stacks"][0]["StackStatus"]797 == "DELETE_COMPLETE"798 )799 assert wait_until(_await_stack_delete, _max_wait=max_wait or 60)800 # TODO: fix in localstack. stack should only be in DELETE_COMPLETE state after all resources have been deleted801 time.sleep(2)802 return DeployResult(803 change_set_id, stack_id, stack_name, change_set_name, mapped_outputs, _destroy_stack804 )805 yield _deploy806 for entry in state:807 entry_stack_id = entry.get("stack_id")808 entry_change_set_id = entry.get("change_set_id")809 try:810 entry_change_set_id and cleanup_changesets([entry_change_set_id])811 entry_stack_id and cleanup_stacks([entry_stack_id])812 except Exception as e:813 LOG.debug(814 f"Failed cleaning up change set {entry_change_set_id=} and stack {entry_stack_id=}: {e}"815 )816@pytest.fixture817def is_change_set_created_and_available(cfn_client):818 def _is_change_set_created_and_available(change_set_id: str):819 def _inner():820 change_set = cfn_client.describe_change_set(ChangeSetName=change_set_id)821 return (822 # TODO: CREATE_FAILED should also not lead to further retries823 change_set.get("Status") == "CREATE_COMPLETE"824 and change_set.get("ExecutionStatus") == "AVAILABLE"825 )826 return _inner827 return _is_change_set_created_and_available828@pytest.fixture829def is_change_set_failed_and_unavailable(cfn_client):830 def _is_change_set_created_and_available(change_set_id: str):831 def _inner():832 change_set = cfn_client.describe_change_set(ChangeSetName=change_set_id)833 return (834 # TODO: CREATE_FAILED should also not lead to further retries835 change_set.get("Status") == "FAILED"836 and change_set.get("ExecutionStatus") == "UNAVAILABLE"837 )838 return _inner839 return _is_change_set_created_and_available840@pytest.fixture841def is_stack_created(cfn_client):842 return _has_stack_status(cfn_client, ["CREATE_COMPLETE", "CREATE_FAILED"])843@pytest.fixture844def is_stack_updated(cfn_client):845 return _has_stack_status(cfn_client, ["UPDATE_COMPLETE", "UPDATE_FAILED"])846@pytest.fixture847def is_stack_deleted(cfn_client):848 return _has_stack_status(cfn_client, ["DELETE_COMPLETE"])849def _has_stack_status(cfn_client, statuses: List[str]):850 def _has_status(stack_id: str):851 def _inner():852 resp = cfn_client.describe_stacks(StackName=stack_id)853 s = resp["Stacks"][0] # since the lookup uses the id we can only get a single response854 return s.get("StackStatus") in statuses855 return _inner856 return _has_status857@pytest.fixture858def is_change_set_finished(cfn_client):859 def _is_change_set_finished(change_set_id: str, stack_name: Optional[str] = None):860 def _inner():861 kwargs = {"ChangeSetName": change_set_id}862 if stack_name:863 kwargs["StackName"] = stack_name864 check_set = cfn_client.describe_change_set(**kwargs)865 return check_set.get("ExecutionStatus") == "EXECUTE_COMPLETE"866 return _inner867 return _is_change_set_finished868@pytest.fixture869def wait_until_lambda_ready(lambda_client):870 def _wait_until_ready(function_name: str, qualifier: str = None, client=None):871 client = client or lambda_client872 def _is_not_pending():873 kwargs = {}874 if qualifier:875 kwargs["Qualifier"] = qualifier876 try:877 result = (878 client.get_function(FunctionName=function_name)["Configuration"]["State"]879 != "Pending"880 )881 LOG.debug(f"lambda state result: {result=}")882 return result883 except Exception as e:884 LOG.error(e)885 raise886 wait_until(_is_not_pending)887 return _wait_until_ready888role_assume_policy = """889{890 "Version": "2012-10-17",891 "Statement": [892 {893 "Effect": "Allow",894 "Principal": {895 "Service": "lambda.amazonaws.com"896 },897 "Action": "sts:AssumeRole"898 }899 ]900}901""".strip()902role_policy = """903{904 "Version": "2012-10-17",905 "Statement": [906 {907 "Effect": "Allow",908 "Action": [909 "logs:CreateLogGroup",910 "logs:CreateLogStream",911 "logs:PutLogEvents"912 ],913 "Resource": [914 "*"915 ]916 }917 ]918}919""".strip()920@pytest.fixture921def create_lambda_function(lambda_client, logs_client, iam_client, wait_until_lambda_ready):922 lambda_arns_and_clients = []923 role_names_policy_arns = []924 log_groups = []925 def _create_lambda_function(*args, **kwargs):926 client = kwargs.get("client") or lambda_client927 kwargs["client"] = client928 func_name = kwargs.get("func_name")929 assert func_name930 del kwargs["func_name"]931 if not kwargs.get("role"):932 role_name = f"lambda-autogenerated-{short_uid()}"933 role = iam_client.create_role(934 RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy935 )["Role"]936 policy_name = f"lambda-autogenerated-{short_uid()}"937 policy_arn = iam_client.create_policy(938 PolicyName=policy_name, PolicyDocument=role_policy939 )["Policy"]["Arn"]940 role_names_policy_arns.append((role_name, policy_arn))941 iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)942 kwargs["role"] = role["Arn"]943 def _create_function():944 resp = testutil.create_lambda_function(func_name, **kwargs)945 lambda_arns_and_clients.append((resp["CreateFunctionResponse"]["FunctionArn"], client))946 wait_until_lambda_ready(function_name=func_name, client=client)947 log_group_name = f"/aws/lambda/{func_name}"948 log_groups.append(log_group_name)949 return resp950 # @AWS, takes about 10s until the role/policy is "active", until then it will fail951 # localstack should normally not require the retries and will just continue here952 return retry(_create_function, retries=3, sleep=4)953 yield _create_lambda_function954 for arn, client in lambda_arns_and_clients:955 try:956 client.delete_function(FunctionName=arn)957 except Exception:958 LOG.debug(f"Unable to delete function {arn=} in cleanup")959 for role_name, policy_arn in role_names_policy_arns:960 try:961 iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)962 iam_client.delete_role(RoleName=role_name)963 except Exception:964 LOG.debug(f"Unable to delete role {role_name=} in cleanup")965 try:966 iam_client.delete_policy(PolicyArn=policy_arn)967 except Exception:968 LOG.debug(f"Unable to delete policy {policy_arn=} in cleanup")969 for log_group_name in log_groups:970 try:971 logs_client.delete_log_group(logGroupName=log_group_name)972 except Exception:973 LOG.debug(f"Unable to delete log group {log_group_name} in cleanup")974@pytest.fixture975def check_lambda_logs(logs_client):976 def _check_logs(func_name: str, expected_lines: List[str] = None):977 if not expected_lines:978 expected_lines = []979 log_events = get_lambda_logs(func_name, logs_client=logs_client)980 log_messages = [e["message"] for e in log_events]981 for line in expected_lines:982 if ".*" in line:983 found = [re.match(line, m, flags=re.DOTALL) for m in log_messages]984 if any(found):985 continue986 assert line in log_messages987 return _check_logs988@pytest.fixture989def create_policy(iam_client):990 policy_arns = []991 def _create_policy(*args, **kwargs):992 if "PolicyName" not in kwargs:993 kwargs["PolicyName"] = f"policy-{short_uid()}"994 response = iam_client.create_policy(*args, **kwargs)995 policy_arn = response["Policy"]["Arn"]996 policy_arns.append(policy_arn)997 return response998 yield _create_policy999 for policy_arn in policy_arns:1000 try:1001 iam_client.delete_policy(PolicyArn=policy_arn)1002 except Exception:1003 LOG.debug("Could not delete policy '%s' during test cleanup", policy_arn)1004@pytest.fixture1005def create_user(iam_client):1006 usernames = []1007 def _create_user(**kwargs):1008 if "UserName" not in kwargs:1009 kwargs["UserName"] = f"user-{short_uid()}"1010 response = iam_client.create_user(**kwargs)1011 usernames.append(response["User"]["UserName"])1012 return response1013 yield _create_user1014 for username in usernames:1015 inline_policies = iam_client.list_user_policies(UserName=username)["PolicyNames"]1016 for inline_policy in inline_policies:1017 try:1018 iam_client.delete_user_policy(UserName=username, PolicyName=inline_policy)1019 except Exception:1020 LOG.debug(1021 "Could not delete user policy '%s' from '%s' during cleanup",1022 inline_policy,1023 username,1024 )1025 attached_policies = iam_client.list_attached_user_policies(UserName=username)[1026 "AttachedPolicies"1027 ]1028 for attached_policy in attached_policies:1029 try:1030 iam_client.detach_user_policy(1031 UserName=username, PolicyArn=attached_policy["PolicyArn"]1032 )1033 except Exception:1034 LOG.debug(1035 "Error detaching policy '%s' from user '%s'",1036 attached_policy["PolicyArn"],1037 username,1038 )1039 try:1040 iam_client.delete_user(UserName=username)1041 except Exception:1042 LOG.debug("Error deleting user '%s' during test cleanup", username)1043@pytest.fixture1044def wait_and_assume_role(sts_client):1045 def _wait_and_assume_role(role_arn: str, session_name: str = None):1046 if not session_name:1047 session_name = f"session-{short_uid()}"1048 def assume_role():1049 return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)[1050 "Credentials"1051 ]1052 # need to retry a couple of times before we are allowed to assume this role in AWS1053 keys = retry(assume_role, sleep=5, retries=4)1054 return keys1055 return _wait_and_assume_role1056@pytest.fixture1057def create_role(iam_client):1058 role_names = []1059 def _create_role(**kwargs):1060 if not kwargs.get("RoleName"):1061 kwargs["RoleName"] = f"role-{short_uid()}"1062 result = iam_client.create_role(**kwargs)1063 role_names.append(result["Role"]["RoleName"])1064 return result1065 yield _create_role1066 for role_name in role_names:1067 # detach policies1068 attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)[1069 "AttachedPolicies"1070 ]1071 for attached_policy in attached_policies:1072 try:1073 iam_client.detach_role_policy(1074 RoleName=role_name, PolicyArn=attached_policy["PolicyArn"]1075 )1076 except Exception:1077 LOG.debug(1078 "Could not detach role policy '%s' from '%s' during cleanup",1079 attached_policy["PolicyArn"],1080 role_name,1081 )1082 role_policies = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]1083 for role_policy in role_policies:1084 try:1085 iam_client.delete_role_policy(RoleName=role_name, PolicyName=role_policy)1086 except Exception:1087 LOG.debug(1088 "Could not delete role policy '%s' from '%s' during cleanup",1089 role_policy,1090 role_name,1091 )1092 try:1093 iam_client.delete_role(RoleName=role_name)1094 except Exception:1095 LOG.debug("Could not delete role '%s' during cleanup", role_name)1096@pytest.fixture1097def create_parameter(ssm_client):1098 params = []1099 def _create_parameter(**kwargs):1100 params.append(kwargs["Name"])1101 return ssm_client.put_parameter(**kwargs)1102 yield _create_parameter1103 for param in params:1104 ssm_client.delete_parameter(Name=param)1105@pytest.fixture1106def create_secret(secretsmanager_client):1107 items = []1108 def _create_parameter(**kwargs):1109 create_response = secretsmanager_client.create_secret(**kwargs)1110 items.append(create_response["ARN"])1111 return create_response1112 yield _create_parameter1113 for item in items:1114 secretsmanager_client.delete_secret(SecretId=item)1115# TODO Figure out how to make cert creation tests pass against AWS.1116#1117# We would like to have localstack tests to pass not just against localstack, but also against AWS to make sure1118# our emulation is correct. Unfortunately, with certificate creation there are some issues.1119#1120# In AWS newly created ACM certificates have to be validated either by email or by DNS. The latter is1121# by adding some CNAME records as requested by ASW in response to a certificate request.1122# For testing purposes the DNS one seems to be easier, at least as long as DNS is handled by Region53 AWS DNS service.1123#1124# The other possible option is to use IAM certificates instead of ACM ones. Those just have to be uploaded from files1125# created by openssl etc. Not sure if there are other issues after that.1126#1127# The third option might be having in AWS some certificates created in advance - so they do not require validation1128# and can be easily used in tests. The issie with such an approach is that for AppSync, for example, in order to1129# register a domain name (https://docs.aws.amazon.com/appsync/latest/APIReference/API_CreateDomainName.html),1130# the domain name in the API request has to match the domain name used in certificate creation. Which means that with1131# pre-created certificates we would have to use specific domain names instead of random ones.1132@pytest.fixture1133def acm_request_certificate():1134 certificate_arns = []1135 def factory(**kwargs) -> str:1136 if "DomainName" not in kwargs:1137 kwargs["DomainName"] = f"test-domain-{short_uid()}.localhost.localstack.cloud"1138 region_name = kwargs.pop("region_name", None)1139 acm_client = _client("acm", region_name)1140 response = acm_client.request_certificate(**kwargs)1141 created_certificate_arn = response["CertificateArn"]1142 certificate_arns.append((created_certificate_arn, region_name))1143 return created_certificate_arn1144 yield factory1145 # cleanup1146 for certificate_arn, region_name in certificate_arns:1147 try:1148 acm_client = _client("acm", region_name)1149 acm_client.delete_certificate(CertificateArn=certificate_arn)1150 except Exception as e:1151 LOG.debug("error cleaning up certificate %s: %s", certificate_arn, e)1152@pytest.fixture1153def tmp_http_server():1154 test_port, invocations, proxy = start_http_server()1155 yield test_port, invocations, proxy1156 proxy.stop()1157role_policy_su = {1158 "Version": "2012-10-17",1159 "Statement": [{"Effect": "Allow", "Action": ["*"], "Resource": ["*"]}],1160}1161@pytest.fixture(scope="session")1162def lambda_su_role():1163 iam_client: IAMClient = _client("iam")1164 role_name = f"lambda-autogenerated-{short_uid()}"1165 role = iam_client.create_role(RoleName=role_name, AssumeRolePolicyDocument=role_assume_policy)[1166 "Role"1167 ]1168 policy_name = f"lambda-autogenerated-{short_uid()}"1169 policy_arn = iam_client.create_policy(1170 PolicyName=policy_name, PolicyDocument=json.dumps(role_policy_su)1171 )["Policy"]["Arn"]1172 iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)1173 if os.environ.get("TEST_TARGET", "") == "AWS_CLOUD": # dirty but necessary1174 time.sleep(10)1175 yield role["Arn"]1176 run_safe(iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn))1177 run_safe(iam_client.delete_role(RoleName=role_name))1178 run_safe(iam_client.delete_policy(PolicyArn=policy_arn))1179@pytest.fixture1180def create_iam_role_with_policy(iam_client):1181 roles = {}1182 def _create_role_and_policy(**kwargs):1183 role = kwargs["RoleName"]1184 policy = kwargs["PolicyName"]1185 role_policy = json.dumps(kwargs["RoleDefinition"])1186 result = iam_client.create_role(RoleName=role, AssumeRolePolicyDocument=role_policy)1187 role_arn = result["Role"]["Arn"]1188 policy_document = json.dumps(kwargs["PolicyDefinition"])1189 iam_client.put_role_policy(RoleName=role, PolicyName=policy, PolicyDocument=policy_document)1190 roles[role] = policy1191 return role_arn1192 yield _create_role_and_policy1193 for role_name, policy_name in roles.items():1194 iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)1195 iam_client.delete_role(RoleName=role_name)1196@pytest.fixture1197def firehose_create_delivery_stream(firehose_client, wait_for_delivery_stream_ready):1198 delivery_streams = {}1199 def _create_delivery_stream(**kwargs):1200 if "DeliveryStreamName" not in kwargs:1201 kwargs["DeliveryStreamName"] = f"test-delivery-stream-{short_uid()}"1202 delivery_stream = firehose_client.create_delivery_stream(**kwargs)1203 delivery_streams.update({kwargs["DeliveryStreamName"]: delivery_stream})1204 wait_for_delivery_stream_ready(kwargs["DeliveryStreamName"])1205 return delivery_stream1206 yield _create_delivery_stream1207 for delivery_stream_name in delivery_streams.keys():1208 firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)1209@pytest.fixture1210def events_create_rule(events_client):1211 rules = []1212 def _create_rule(**kwargs):1213 rule_name = kwargs["Name"]1214 bus_name = kwargs.get("EventBusName", "")1215 pattern = kwargs.get("EventPattern", {})1216 schedule = kwargs.get("ScheduleExpression", "")1217 rule_arn = events_client.put_rule(1218 Name=rule_name,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful