How to use describe_replay method in localstack

Best Python code snippet using localstack_python

responses.py

Source:responses.py Github

copy

Full Screen

...283 result = self.events_backend.start_replay(284 name, description, source_arn, start_time, end_time, destination285 )286 return json.dumps(result), self.response_headers287 def describe_replay(self):288 name = self._get_param("ReplayName")289 result = self.events_backend.describe_replay(name)290 return json.dumps(result), self.response_headers291 def list_replays(self):292 name_prefix = self._get_param("NamePrefix")293 source_arn = self._get_param("EventSourceArn")294 state = self._get_param("State")295 result = self.events_backend.list_replays(name_prefix, source_arn, state)296 return json.dumps({"Replays": result}), self.response_headers297 def cancel_replay(self):298 name = self._get_param("ReplayName")299 result = self.events_backend.cancel_replay(name)...

Full Screen

Full Screen

integration.py

Source:integration.py Github

copy

Full Screen

1import boto32import botocore3import json4import config5import pprint, operator6import res.utils as utils7import res.glob as glob8# =======================================================================================================================9#10# Supported services : Amazon MQ, Simple Notification Service (SNS), Simple Queue Service (SQS), Step functions,11# Amazon AppFlow, Amazon EventBridge SWF12# Unsupported services : Apache Airflow 13#14# =======================================================================================================================15# ------------------------------------------------------------------------16#17# Simple Queue Service. Not sure that the API works well (strange responses in aws cli)18#19# ------------------------------------------------------------------------20def get_sqs_inventory(oId, profile, boto3_config, selected_regions):21 """22 Returns Simple Queue Service (SQS) details23 :param oId: ownerId (AWS account)24 :type oId: string25 :param profile: configuration profile name used for session26 :type profile: string27 :return: Simple Queue Service (SQS) inventory28 :rtype: json29 ..note:: http://boto3.readthedocs.io/en/latest/reference/services/sqs.html30 """ 31 32 return glob.get_inventory(33 ownerId = oId,34 profile = profile,35 boto3_config = boto3_config,36 selected_regions = selected_regions,37 aws_service = "sqs", 38 aws_region = "all", 39 function_name = "list_queues", 40 key_get = "QueueUrls",41 detail_function = "get_queue_attributes", 42 join_key = "QueueUrl", 43 detail_join_key = "QueueUrl", 44 detail_get_key = "Attributes"45 )46# ------------------------------------------------------------------------47#48# Amazon MQ49#50# ------------------------------------------------------------------------51def get_mq_inventory(oId, profile, boto3_config, selected_regions):52 """53 Returns Amazon MQ details54 :param oId: ownerId (AWS account)55 :type oId: string56 :param profile: configuration profile name used for session57 :type profile: string58 :return: Amazon MQ inventory59 :rtype: json60 ..note:: http://boto3.readthedocs.io/en/latest/reference/services/mq.html61 """ 62 mq_inventory = {}63 64 mq_inventory['brokers'] = glob.get_inventory(65 ownerId = oId,66 profile = profile,67 boto3_config = boto3_config,68 selected_regions = selected_regions,69 aws_service = "mq", 70 aws_region = "all", 71 function_name = "list_brokers", 72 key_get = "BrokerSummaries",73 detail_function = "describe_broker", 74 join_key = "BrokerId", 75 detail_join_key = "BrokerId", 76 detail_get_key = ""77 )78 mq_inventory['configurations'] = glob.get_inventory(79 ownerId = oId,80 profile = profile,81 boto3_config = boto3_config,82 selected_regions = selected_regions,83 aws_service = "mq", 84 aws_region = "all", 85 function_name = "list_configurations", 86 key_get = "Configurations"87 )88 89 return mq_inventory90# ------------------------------------------------------------------------91#92# Simple Notification Service (SNS)93#94# ------------------------------------------------------------------------95def get_sns_inventory(oId, profile, boto3_config, selected_regions):96 """97 Returns sns (topics, applications) details98 :param oId: ownerId (AWS account)99 :type oId: string100 :param profile: configuration profile name used for session101 :type profile: string102 :return: Amazon sns inventory103 :rtype: json104 ..note:: http://boto3.readthedocs.io/en/latest/reference/services/sns.html105 """ 106 sns_inventory = {}107 108 sns_inventory['topics'] = glob.get_inventory(109 ownerId = oId,110 profile = profile,111 boto3_config = boto3_config,112 selected_regions = selected_regions,113 aws_service = "sns", 114 aws_region = "all", 115 function_name = "list_topics", 116 key_get = "Topics",117 pagination = True118 )119 sns_inventory['applications'] = glob.get_inventory(120 ownerId = oId,121 profile = profile,122 boto3_config = boto3_config,123 selected_regions = selected_regions,124 aws_service = "sns", 125 aws_region = "all", 126 function_name = "list_platform_applications", 127 key_get = "PlatformApplications",128 pagination = True129 )130 131# ------------------------------------------------------------------------132#133# Step functions134#135# ------------------------------------------------------------------------136def get_stepfunctions_inventory(oId, profile, boto3_config, selected_regions):137 """138 Returns stepfunctions (machines, activities) details139 :param oId: ownerId (AWS account)140 :type oId: string141 :param profile: configuration profile name used for session142 :type profile: string143 :return: stepfunctions (machines, activities) inventory144 :rtype: json145 ..note:: not documented yet146 """ 147 inventory = {}148 149 inventory['machines'] = glob.get_inventory(150 ownerId = oId,151 profile = profile,152 boto3_config = boto3_config,153 selected_regions = selected_regions,154 aws_service = "stepfunctions", 155 aws_region = "all", 156 function_name = "list_state_machines", 157 key_get = "stateMachines",158 detail_function = "describe_state_machine",159 join_key = "stateMachineArn",160 detail_join_key = "stateMachineArn",161 detail_get_key = "",162 pagination_detail = False,163 pagination = True164 )165 inventory['activities'] = glob.get_inventory(166 ownerId = oId,167 profile = profile,168 boto3_config = boto3_config,169 selected_regions = selected_regions,170 aws_service = "stepfunctions", 171 aws_region = "all", 172 function_name = "list_activities", 173 key_get = "activities",174 pagination = True175 )176 177 return inventory178 179# ------------------------------------------------------------------------180#181# Appflow182#183# ------------------------------------------------------------------------184def get_appflow_inventory(oId, profile, boto3_config, selected_regions):185 """186 Returns appflow (flows, connectors) details187 :param oId: ownerId (AWS account)188 :type oId: string189 :param profile: configuration profile name used for session190 :type profile: string191 :return: appflow (flows, connectors) inventory192 :rtype: json193 ..note:: not documented yet194 """ 195 inventory = {}196 197 inventory['flows'] = glob.get_inventory(198 ownerId = oId,199 profile = profile,200 boto3_config = boto3_config,201 selected_regions = selected_regions,202 aws_service = "appflow", 203 aws_region = "all", 204 function_name = "list_flows", 205 key_get = "flows",206 detail_function = "describe_flow",207 join_key = "flowName",208 detail_join_key = "flowName",209 detail_get_key = "",210 pagination_detail = False,211 pagination = False212 )213 214 return inventory215 216# ------------------------------------------------------------------------217#218# EventBridge (archives, connections, event_buses, event_sources, replays, rules)219#220# ------------------------------------------------------------------------221def get_eventbridge_inventory(oId, profile, boto3_config, selected_regions):222 """223 Returns EventBridge (rules) details224 :param oId: ownerId (AWS account)225 :type oId: string226 :param profile: configuration profile name used for session227 :type profile: string228 :return: EventBridge (rules) inventory229 :rtype: json230 ..note:: not documented yet231 """ 232 inventory = {}233 inventory['archives'] = glob.get_inventory(234 ownerId = oId,235 profile = profile,236 boto3_config = boto3_config,237 selected_regions = selected_regions,238 aws_service = "events", 239 aws_region = "all", 240 function_name = "list_archives", 241 key_get = "Archives",242 detail_function = "describe_archive",243 join_key = "ArchiveName",244 detail_join_key = "ArchiveName",245 detail_get_key = "",246 pagination_detail = False,247 pagination = False248 )249 inventory['connections'] = glob.get_inventory(250 ownerId = oId,251 profile = profile,252 boto3_config = boto3_config,253 selected_regions = selected_regions,254 aws_service = "events", 255 aws_region = "all", 256 function_name = "list_connections", 257 key_get = "Connections",258 detail_function = "describe_connection",259 join_key = "Name",260 detail_join_key = "Name",261 detail_get_key = "",262 pagination_detail = False,263 pagination = False264 )265 inventory['event_buses'] = glob.get_inventory(266 ownerId = oId,267 profile = profile,268 boto3_config = boto3_config,269 selected_regions = selected_regions,270 aws_service = "events", 271 aws_region = "all", 272 function_name = "list_event_buses", 273 key_get = "EventBuses",274 detail_function = "describe_event_bus",275 join_key = "Name",276 detail_join_key = "Name",277 detail_get_key = "",278 pagination_detail = False,279 pagination = False280 )281 282 inventory['event_sources'] = glob.get_inventory(283 ownerId = oId,284 profile = profile,285 boto3_config = boto3_config,286 selected_regions = selected_regions,287 aws_service = "events", 288 aws_region = "all", 289 function_name = "list_event_sources", 290 key_get = "EventSources",291 detail_function = "describe_event_source",292 join_key = "Name",293 detail_join_key = "Name",294 detail_get_key = "",295 pagination_detail = False,296 pagination = False297 )298 299 inventory['replays'] = glob.get_inventory(300 ownerId = oId,301 profile = profile,302 boto3_config = boto3_config,303 selected_regions = selected_regions,304 aws_service = "events", 305 aws_region = "all", 306 function_name = "list_replays", 307 key_get = "ReplayName",308 detail_function = "describe_replay",309 join_key = "Name",310 detail_join_key = "ReplayName",311 detail_get_key = "",312 pagination_detail = False,313 pagination = False314 )315 316 inventory['rules'] = glob.get_inventory(317 ownerId = oId,318 profile = profile,319 boto3_config = boto3_config,320 selected_regions = selected_regions,321 aws_service = "events", 322 aws_region = "all", 323 function_name = "list_rules", 324 key_get = "Rules",325 detail_function = "describe_rule",326 join_key = "Name",327 detail_join_key = "Name",328 detail_get_key = "",329 pagination_detail = False,330 pagination = False331 )332 333 return inventory334#335# Hey, doc: we're in a module!336#337if (__name__ == '__main__'):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful