How to use get_service_port_for_account method in localstack

Best Python code snippet using localstack_python

edge.py

Source:edge.py Github

copy

Full Screen

...113 # https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html114 try:115 credential_scope = auth_header.split(',')[0].split()[1]116 _, _, _, service, _ = credential_scope.split('/')117 result = service, get_service_port_for_account(service, headers)118 except Exception:119 pass120 result_before = result121 # Fallback rules and route customizations applied below122 if host.endswith('cloudfront.net'):123 path = path or '/'124 result = 'cloudfront', config.PORT_CLOUDFRONT125 elif target.startswith('AWSCognitoIdentityProviderService') or 'cognito-idp.' in host:126 result = 'cognito-idp', config.PORT_COGNITO_IDP127 elif target.startswith('AWSCognitoIdentityService') or 'cognito-identity.' in host:128 result = 'cognito-identity', config.PORT_COGNITO_IDENTITY129 elif result[0] == 's3' or re.match(r'.*s3(\-website)?\.([^\.]+\.)?amazonaws.com', host):130 host = re.sub(r's3-website\..*\.amazonaws', 's3.amazonaws', host)131 result = 's3', config.PORT_S3132 elif result[0] == 'states' in auth_header or host.startswith('states.'):133 result = 'stepfunctions', config.PORT_STEPFUNCTIONS134 elif result[0] == 'monitoring':135 result = 'cloudwatch', config.PORT_CLOUDWATCH136 elif result[0] == 'execute-api' or '.execute-api.' in host:137 result = 'apigateway', config.PORT_APIGATEWAY138 elif target.startswith('Firehose_'):139 result = 'firehose', config.PORT_FIREHOSE140 elif target.startswith('DynamoDB_'):141 result = 'dynamodb', config.PORT_DYNAMODB142 elif target.startswith('DynamoDBStreams') or host.startswith('streams.dynamodb.'):143 # Note: DDB streams requests use ../dynamodb/.. 
auth header, hence we also need to update result_before144 result = result_before = 'dynamodbstreams', config.PORT_DYNAMODBSTREAMS145 elif ls_target == 'web':146 result = 'web', config.PORT_WEB_UI147 elif result[0] == 'EventBridge':148 result = 'events', config.PORT_EVENTS149 return result[0], result_before[1] or result[1], path, host150def is_s3_form_data(data_bytes):151 if(to_bytes('key=') in data_bytes):152 return True153 if(to_bytes('Content-Disposition: form-data') in data_bytes and to_bytes('name="key"') in data_bytes):154 return True155 return False156def serve_health_endpoint(method, path, data):157 if method == 'GET':158 reload = 'reload' in path159 return plugins.get_services_health(reload=reload)160 if method == 'PUT':161 data = json.loads(to_str(data or '{}'))162 plugins.set_services_health(data)163 return {'status': 'OK'}164 if method == 'POST':165 data = json.loads(to_str(data or '{}'))166 # backdoor API to support restarting the instance167 if data.get('action') in ['kill', 'restart']:168 terminate_all_processes_in_docker()169 return {}170def terminate_all_processes_in_docker():171 if not in_docker():172 # make sure we only run this inside docker!173 return174 print('INFO: Received command to restart all processes ...')175 cmd = ('ps aux | grep -v supervisor | grep -v docker-entrypoint.sh | grep -v "make infra" | '176 "grep -v localstack_infra.log | awk '{print $1}' | grep -v PID")177 pids = run(cmd).strip()178 pids = re.split(r'\s+', pids)179 pids = [int(pid) for pid in pids]180 this_pid = os.getpid()181 for pid in pids:182 if pid != this_pid:183 try:184 # kill spawned process185 os.kill(pid, signal.SIGKILL)186 except Exception:187 pass188 # kill the process itself189 os._exit(0)190def serve_resource_graph(data):191 data = json.loads(to_str(data or '{}'))192 env = Environment.from_string(data.get('awsEnvironment'))193 graph = dashboard_infra.get_graph(name_filter=data.get('nameFilter') or '.*', env=env, region=data.get('awsRegion'))194 return 
graph195def get_api_from_custom_rules(method, path, data, headers):196 """ Determine backend port based on custom rules. """197 # detect S3 presigned URLs198 if 'AWSAccessKeyId=' in path or 'Signature=' in path:199 return 's3', config.PORT_S3200 # heuristic for SQS queue URLs201 if is_sqs_queue_url(path):202 return 'sqs', config.PORT_SQS203 # DynamoDB shell URLs204 if path.startswith('/shell') or path.startswith('/dynamodb/shell'):205 return 'dynamodb', config.PORT_DYNAMODB206 # API Gateway invocation URLs207 if ('/%s/' % PATH_USER_REQUEST) in path:208 return 'apigateway', config.PORT_APIGATEWAY209 data_bytes = to_bytes(data or '')210 if path == '/' and b'QueueName=' in data_bytes:211 return 'sqs', config.PORT_SQS212 if path.startswith('/2015-03-31/functions/'):213 return 'lambda', config.PORT_LAMBDA214 if b'Action=AssumeRoleWithWebIdentity' in data_bytes or 'Action=AssumeRoleWithWebIdentity' in path:215 return 'sts', config.PORT_STS216 if b'Action=AssumeRoleWithSAML' in data_bytes or 'Action=AssumeRoleWithSAML' in path:217 return 'sts', config.PORT_STS218 # TODO: move S3 public URLs to a separate port/endpoint, OR check ACLs here first219 stripped = path.strip('/')220 if method in ['GET', 'HEAD'] and '/' in stripped:221 # assume that this is an S3 GET request with URL path `/<bucket>/<key ...>`222 return 's3', config.PORT_S3223 # detect S3 URLs224 if stripped and '/' not in stripped:225 if method == 'HEAD':226 # assume that this is an S3 HEAD bucket request with URL path `/<bucket>`227 return 's3', config.PORT_S3228 if method == 'PUT':229 # assume that this is an S3 PUT bucket request with URL path `/<bucket>`230 return 's3', config.PORT_S3231 if method == 'POST' and is_s3_form_data(data_bytes):232 # assume that this is an S3 POST request with form parameters or multipart form in the body233 return 's3', config.PORT_S3234 if stripped.count('/') == 1 and method == 'PUT':235 # assume that this is an S3 PUT bucket object request with URL path `/<bucket>/object`236 
return 's3', config.PORT_S3237 # detect S3 requests sent from aws-cli using --no-sign-request option238 if 'aws-cli/' in headers.get('User-Agent', ''):239 return 's3', config.PORT_S3240 # S3 delete object requests241 if method == 'POST' and 'delete=' in path and b'<Delete' in data_bytes and b'<Key>' in data_bytes:242 return 's3', config.PORT_S3243 # SQS queue requests244 if ('QueueUrl=' in path and 'Action=' in path) or (b'QueueUrl=' in data_bytes and b'Action=' in data_bytes):245 return 'sqs', config.PORT_SQS246def get_service_port_for_account(service, headers):247 # assume we're only using a single account, hence return the static port mapping from config.py248 return config.service_port(service)249def do_start_edge(port, use_ssl, asynchronous=False):250 try:251 # start local DNS server, if present252 from localstack_ext.services import dns_server253 dns_server.start_servers()254 except Exception:255 pass256 # get port and start Edge257 print('Starting edge router (http%s port %s)...' % ('s' if use_ssl else '', port))258 # use use=True here because our proxy allows both, HTTP and HTTPS traffic259 proxy = start_proxy_server(port, use_ssl=True, update_listener=ProxyListenerEdge())260 if not asynchronous:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful