How to use start_moto_server method in localstack

Best Python code snippet using localstack_python

infra.py

Source:infra.py Github

copy

Full Screen

...88# API ENTRY POINTS89# -----------------90def start_apigateway(port=None, asynchronous=False, update_listener=None):91 port = port or config.PORT_APIGATEWAY92 return start_moto_server('apigateway', port, name='API Gateway', asynchronous=asynchronous,93 backend_port=DEFAULT_PORT_APIGATEWAY_BACKEND, update_listener=update_listener)94def start_sns(port=None, asynchronous=False, update_listener=None):95 port = port or config.PORT_SNS96 return start_moto_server('sns', port, name='SNS', asynchronous=asynchronous,97 backend_port=DEFAULT_PORT_SNS_BACKEND, update_listener=update_listener)98def start_cloudwatch(port=None, asynchronous=False):99 port = port or config.PORT_CLOUDWATCH100 return start_moto_server('cloudwatch', port, name='CloudWatch', asynchronous=asynchronous)101def start_cloudwatch_logs(port=None, asynchronous=False):102 port = port or config.PORT_LOGS103 return start_moto_server('logs', port, name='CloudWatch Logs', asynchronous=asynchronous)104def start_events(port=None, asynchronous=False):105 port = port or config.PORT_EVENTS106 return start_moto_server('events', port, name='CloudWatch Events', asynchronous=asynchronous)107def start_sts(port=None, asynchronous=False):108 port = port or config.PORT_STS109 return start_moto_server('sts', port, name='STS', asynchronous=asynchronous)110def start_iam(port=None, asynchronous=False, update_listener=None):111 port = port or config.PORT_IAM112 return start_moto_server('iam', port, name='IAM', asynchronous=asynchronous,113 backend_port=DEFAULT_PORT_IAM_BACKEND, update_listener=update_listener)114def start_redshift(port=None, asynchronous=False):115 port = port or config.PORT_REDSHIFT116 return start_moto_server('redshift', port, name='Redshift', asynchronous=asynchronous)117def start_route53(port=None, asynchronous=False):118 port = port or config.PORT_ROUTE53119 return start_moto_server('route53', port, name='Route53', asynchronous=asynchronous)120def start_ses(port=None, asynchronous=False):121 port = port or config.PORT_SES122 return start_moto_server('ses', port, name='SES', asynchronous=asynchronous)123def start_elasticsearch_service(port=None, asynchronous=False):124 port = port or config.PORT_ES125 return start_local_api('ES', port, method=es_api.serve, asynchronous=asynchronous)126def start_firehose(port=None, asynchronous=False):127 port = port or config.PORT_FIREHOSE128 return start_local_api('Firehose', port, method=firehose_api.serve, asynchronous=asynchronous)129def start_dynamodbstreams(port=None, asynchronous=False):130 port = port or config.PORT_DYNAMODBSTREAMS131 return start_local_api('DynamoDB Streams', port, method=dynamodbstreams_api.serve, asynchronous=asynchronous)132def start_lambda(port=None, asynchronous=False):133 port = port or config.PORT_LAMBDA134 return start_local_api('Lambda', port, method=lambda_api.serve, asynchronous=asynchronous)135def start_ssm(port=None, asynchronous=False):136 port = port or config.PORT_SSM137 return start_moto_server('ssm', port, name='SSM', asynchronous=asynchronous)138def start_secretsmanager(port=None, asynchronous=False):139 port = port or config.PORT_SECRETSMANAGER140 return start_moto_server('secretsmanager', port, name='Secrets Manager', asynchronous=asynchronous)141def start_ec2(port=None, asynchronous=False):142 port = port or config.PORT_EC2143 return start_moto_server('ec2', port, name='EC2', asynchronous=asynchronous)144# ---------------145# HELPER METHODS146# ---------------147def restore_persisted_data(apis):148 for api in apis:149 persistence.restore_persisted_data(api)150def register_signal_handlers():151 global SIGNAL_HANDLERS_SETUP152 if SIGNAL_HANDLERS_SETUP:153 return154 # register signal handlers155 def signal_handler(signal, frame):156 stop_infra()157 os._exit(0)158 signal.signal(signal.SIGTERM, signal_handler)159 signal.signal(signal.SIGINT, signal_handler)160 SIGNAL_HANDLERS_SETUP = True161def do_run(cmd, asynchronous, print_output=False, env_vars={}):162 sys.stdout.flush()163 if asynchronous:164 if is_debug():165 print_output = True166 outfile = subprocess.PIPE if print_output else None167 t = ShellCommandThread(cmd, outfile=outfile, env_vars=env_vars)168 t.start()169 TMP_THREADS.append(t)170 return t171 return run(cmd)172def start_proxy_for_service(service_name, port, default_backend_port, update_listener, quiet=False, params={}):173 # check if we have a custom backend configured174 custom_backend_url = os.environ.get('%s_BACKEND' % service_name.upper())175 backend_url = custom_backend_url or ('http://%s:%s' % (DEFAULT_BACKEND_HOST, default_backend_port))176 return start_proxy(port, backend_url=backend_url, update_listener=update_listener, quiet=quiet, params=params)177def start_proxy(port, backend_url, update_listener, quiet=False, params={}):178 proxy_thread = GenericProxy(port=port, forward_url=backend_url,179 ssl=USE_SSL, update_listener=update_listener, quiet=quiet, params=params)180 proxy_thread.start()181 TMP_THREADS.append(proxy_thread)182 return proxy_thread183def start_moto_server(key, port, name=None, backend_port=None, asynchronous=False, update_listener=None):184 if not name:185 name = key186 print('Starting mock %s (%s port %s)...' % (name, get_service_protocol(), port))187 if USE_SSL and not backend_port:188 backend_port = get_free_tcp_port()189 if backend_port:190 start_proxy_for_service(key, port, backend_port, update_listener)191 if config.BUNDLE_API_PROCESSES:192 return multiserver.start_api_server(key, backend_port or port)193 return start_moto_server_separate(key, port, name=name, backend_port=backend_port, asynchronous=asynchronous)194def start_moto_server_separate(key, port, name=None, backend_port=None, asynchronous=False):195 moto_server_cmd = '%s/bin/moto_server' % LOCALSTACK_VENV_FOLDER196 if not os.path.exists(moto_server_cmd):197 moto_server_cmd = run('which moto_server').strip()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful