How to use patch_urllib3_connection_pool method in localstack

Best Python code snippet using localstack_python

infra.py

Source: infra.py on GitHub

copy

Full Screen

# ... (excerpt begins here; the earlier lines of infra.py are not shown)
# The preceding definition, which starts above this excerpt, ends with the statement:
#     config_listener.start_listener()

# ---------------
# HELPER METHODS
# ---------------


def patch_urllib3_connection_pool(**constructor_kwargs):
    """
    Override the default parameters of HTTPConnectionPool, e.g., set the pool size via maxsize=16

    Registers subclasses of urllib3's HTTP and HTTPS connection pools in
    ``poolmanager.pool_classes_by_scheme``. Each subclass merges ``constructor_kwargs``
    into the keyword arguments of every pool it constructs (overriding caller values).

    The patch is best-effort: any failure (e.g., urllib3 cannot be imported) is logged
    at debug level and otherwise ignored.
    """
    try:
        from urllib3 import connectionpool, poolmanager

        class MyHTTPSConnectionPool(connectionpool.HTTPSConnectionPool):
            def __init__(self, *args, **kwargs):
                kwargs.update(constructor_kwargs)
                super().__init__(*args, **kwargs)

        poolmanager.pool_classes_by_scheme["https"] = MyHTTPSConnectionPool

        class MyHTTPConnectionPool(connectionpool.HTTPConnectionPool):
            def __init__(self, *args, **kwargs):
                kwargs.update(constructor_kwargs)
                super().__init__(*args, **kwargs)

        poolmanager.pool_classes_by_scheme["http"] = MyHTTPConnectionPool
    except Exception:
        # deliberately non-fatal, but leave a trace instead of failing silently
        LOG.debug("Unable to patch urllib3 connection pool classes", exc_info=True)


def patch_instance_tracker_meta():
    """
    Avoid instance collection for moto dashboard

    Replaces ``__new__`` on ``moto_core.models.InstanceTrackerMeta`` and on
    ``moto_core.models.BaseModel`` with the simplified versions defined below.
    """

    def new_instance(meta, name, bases, dct):
        cls = super(moto_core.models.InstanceTrackerMeta, meta).__new__(meta, name, bases, dct)
        if name == "BaseModel":
            return cls
        # every class other than BaseModel gets its own (initially empty) instances list
        cls.instances = []
        return cls

    moto_core.models.InstanceTrackerMeta.__new__ = new_instance

    def new_basemodel(cls, *args, **kwargs):
        # create the object via the parent class only; nothing is appended to `instances`
        # NOTE(review): presumably the original moto implementation tracked instances here
        instance = super(moto_core.models.BaseModel, cls).__new__(cls)
        return instance

    moto_core.models.BaseModel.__new__ = new_basemodel


def get_multiserver_or_free_service_port():
    """Return the multiserver moto port if FORWARD_EDGE_INMEM is set, else a free TCP port."""
    if config.FORWARD_EDGE_INMEM:
        return multiserver.get_moto_server_port()
    return get_free_tcp_port()


def register_signal_handlers():
    """Install SIGTERM/SIGINT handlers that call stop_infra(); only done once per process."""
    global SIGNAL_HANDLERS_SETUP
    if SIGNAL_HANDLERS_SETUP:
        return

    # register signal handlers
    def signal_handler(sig, frame):
        LOG.debug("[shutdown] signal received %s", sig)
        stop_infra()
        if config.FORCE_SHUTDOWN:
            sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    SIGNAL_HANDLERS_SETUP = True


def do_run(
    cmd: Union[str, List],
    asynchronous: bool,
    print_output: bool = None,
    env_vars: Dict[str, str] = None,
    auto_restart=False,
    strip_color: bool = False,
):
    """
    Run the given command, either in a background thread or synchronously.

    :param cmd: the command to run (string or list)
    :param asynchronous: if truthy, run the command in a ShellCommandThread and return the thread
    :param print_output: whether to pipe the output of the asynchronous command; if None,
        it is enabled when config.DEBUG is set
    :param env_vars: environment variables passed to the command (defaults to an empty dict)
    :param auto_restart: passed through to ShellCommandThread (asynchronous mode only)
    :param strip_color: passed through to ShellCommandThread (asynchronous mode only)
    :return: the started ShellCommandThread if asynchronous, otherwise the result of run(...)
    """
    # use a fresh dict per call instead of a shared mutable default argument
    env_vars = {} if env_vars is None else env_vars
    sys.stdout.flush()
    if asynchronous:
        if config.DEBUG and print_output is None:
            print_output = True
        outfile = subprocess.PIPE if print_output else None
        t = ShellCommandThread(
            cmd,
            outfile=outfile,
            env_vars=env_vars,
            auto_restart=auto_restart,
            strip_color=strip_color,
        )
        t.start()
        # keep a reference to the thread in the module-level TMP_THREADS list
        TMP_THREADS.append(t)
        return t
    return run(cmd, env_vars=env_vars)


class MotoServerProperties:
    """Simple holder for the thread and the port of a moto server."""

    moto_thread: FuncThread
    service_port: int

    def __init__(self, moto_thread: FuncThread, service_port: int):
        self.moto_thread = moto_thread
        self.service_port = service_port


def start_proxy_for_service(
    service_name, port, backend_port, update_listener, quiet=False, params=None
):
    """
    Register a proxy listener for the service, or start a dedicated proxy for it.

    If FORWARD_EDGE_INMEM is enabled (and the service is not "elasticsearch"), the listener
    is only registered in PROXY_LISTENERS (when backend_port is set) and None is returned.
    Otherwise a proxy is started on `port` which forwards to the backend URL, taken from the
    environment variable <SERVICE_NAME>_BACKEND if set, else built from DEFAULT_BACKEND_HOST
    and backend_port.

    :return: the result of start_proxy(...), or None if no separate proxy is started
    """
    # use a fresh dict per call instead of a shared mutable default argument
    params = {} if params is None else params
    # TODO: remove special switch for Elasticsearch (see also note in service_port(...) in config.py)
    if config.FORWARD_EDGE_INMEM and service_name != "elasticsearch":
        if backend_port:
            PROXY_LISTENERS[service_name] = (
                service_name,
                backend_port,
                update_listener,
            )
        return
    # check if we have a custom backend configured
    custom_backend_url = os.environ.get("%s_BACKEND" % service_name.upper())
    backend_url = custom_backend_url or ("http://%s:%s" % (DEFAULT_BACKEND_HOST, backend_port))
    return start_proxy(
        port,
        backend_url=backend_url,
        update_listener=update_listener,
        quiet=quiet,
        params=params,
    )


def start_proxy(port, backend_url=None, update_listener=None, quiet=False, params=None, use_ssl=None):
    """
    Start a proxy server on the given port via start_proxy_server(...).

    :param use_ssl: if None, the value of config.USE_SSL is used
    :return: the value returned by start_proxy_server(...)
    """
    # use a fresh dict per call instead of a shared mutable default argument
    params = {} if params is None else params
    use_ssl = config.USE_SSL if use_ssl is None else use_ssl
    proxy_thread = start_proxy_server(
        port=port,
        forward_url=backend_url,
        use_ssl=use_ssl,
        update_listener=update_listener,
        quiet=quiet,
        params=params,
    )
    return proxy_thread


def start_moto_server(
    key, port, name=None, backend_port=None, asynchronous=False, update_listener=None
) -> MotoServerProperties:
    """
    Log the startup message for the service and, if applicable, set up a proxy for it.

    :return: MotoServerProperties with the thread and port of motoserver.get_moto_server()
    """
    # TODO: refactor this method! the name and parameters suggest that a server is started, but it actually only adds
    #  a proxy listener around the already started motoserver singleton.
    # TODO: remove asynchronous parameter (from all calls to this function)
    # TODO: re-think backend_port parameter (still needed since determined by motoserver singleton?)
    if not name:
        name = key
    log_startup_message(name)
    if not backend_port:
        if config.FORWARD_EDGE_INMEM:
            backend_port = motoserver.get_moto_server().port
        elif config.USE_SSL or update_listener:
            backend_port = get_free_tcp_port()
    if backend_port or config.FORWARD_EDGE_INMEM:
        start_proxy_for_service(key, port, backend_port, update_listener)

    server = motoserver.get_moto_server()
    return MotoServerProperties(server._thread, server.port)


def start_moto_server_separate(key, port, name=None, backend_port=None, asynchronous=False):
    """
    Run a separate `moto_server` process for the given service key.

    The executable is taken from LOCALSTACK_VENV_FOLDER if it exists there, otherwise it is
    looked up via `which moto_server`. The server listens on backend_port if set, else on port.

    :return: MotoServerProperties holding the result of do_run(...) and the server port
    """
    moto_server_cmd = "%s/bin/moto_server" % LOCALSTACK_VENV_FOLDER
    if not os.path.exists(moto_server_cmd):
        moto_server_cmd = run("which moto_server").strip()
    server_port = backend_port or port
    cmd = "VALIDATE_LAMBDA_S3=0 %s %s -p %s -H %s" % (
        moto_server_cmd,
        key,
        server_port,
        constants.BIND_HOST,
    )
    return MotoServerProperties(do_run(cmd, asynchronous), server_port)


def add_service_proxy_listener(api: str, listener: ProxyListener, port=None):
    """Register the listener for the API in PROXY_LISTENERS (port defaults to get_service_port(api))."""
    PROXY_LISTENERS[api] = (api, port or get_service_port(api), listener)


def start_local_api(name, port, api, method, asynchronous=False):
    """
    Start a local API by calling `method` with the port to use.

    If FORWARD_EDGE_INMEM is enabled, a free TCP port replaces `port` and is registered in
    PROXY_LISTENERS for the API.

    :return: the thread started via start_thread(...) if asynchronous, otherwise None
    """
    log_startup_message(name)
    if config.FORWARD_EDGE_INMEM:
        port = get_free_tcp_port()
        PROXY_LISTENERS[api] = (api, port, None)
    if asynchronous:
        thread = start_thread(method, port, quiet=True)
        return thread
    else:
        method(port)


def stop_infra():
    """
    Stop the infrastructure: stop all services and clean up files and resources.

    Does nothing if common.INFRA_STOPPED is already set. Unless config.FORCE_SHUTDOWN is
    set, waits for the infrastructure to shut down. SHUTDOWN_INFRA is set in any case.
    """
    if common.INFRA_STOPPED:
        return
    common.INFRA_STOPPED = True

    event_publisher.fire_event(event_publisher.EVENT_STOP_INFRA)
    analytics.log.event("infra_stop")

    try:
        generic_proxy.QUIET = True
        LOG.debug("[shutdown] Cleaning up services ...")
        SERVICE_PLUGINS.stop_all_services()
        LOG.debug("[shutdown] Cleaning up files ...")
        common.cleanup(files=True, quiet=True)
        LOG.debug("[shutdown] Cleaning up resources ...")
        common.cleanup_resources()

        if config.FORCE_SHUTDOWN:
            LOG.debug("[shutdown] Force shutdown, not waiting for infrastructure to shut down")
            return

        LOG.debug("[shutdown] Waiting for infrastructure to shut down ...")
        wait_for_infra_shutdown()
        LOG.debug("[shutdown] Infrastructure is shut down")
    finally:
        # signal waiters (see start_infra) even on early return or error
        SHUTDOWN_INFRA.set()


def log_startup_message(service):
    """Log an info message that the mock service is starting."""
    LOG.info("Starting mock %s service on %s ...", service, edge_ports_info())


def check_aws_credentials():
    """
    Set the internal AWS credentials in the environment and verify that boto3 resolves credentials.

    :raises AssertionError: if a new boto3 session yields no credentials
    """
    session = boto3.Session()
    # hardcode credentials here, to allow us to determine internal API calls made via boto3
    os.environ["AWS_ACCESS_KEY_ID"] = constants.INTERNAL_AWS_ACCESS_KEY_ID
    os.environ["AWS_SECRET_ACCESS_KEY"] = constants.INTERNAL_AWS_SECRET_ACCESS_KEY
    try:
        # best-effort lookup on the session created before the env variables were set;
        # the result is not used
        session.get_credentials()
    except Exception:
        LOG.debug("Initial lookup of AWS credentials failed", exc_info=True)
    session = boto3.Session()
    credentials = session.get_credentials()
    # raise explicitly instead of using `assert`, which is stripped when running with -O
    if not credentials:
        raise AssertionError("Unable to obtain AWS credentials from the boto3 session")


def terminate_all_processes_in_docker():
    """
    Kill (SIGKILL) the processes listed by `ps`, then exit the current process.

    Only runs inside Docker. Lines of the `ps` output matching supervisor, docker-entrypoint.sh,
    "make infra" and localstack_infra.log are excluded.
    """
    if not in_docker():
        # make sure we only run this inside docker!
        return
    print("INFO: Received command to restart all processes ...")
    cmd = (
        'ps aux | grep -v supervisor | grep -v docker-entrypoint.sh | grep -v "make infra" | '
        "grep -v localstack_infra.log | awk '{print $1}' | grep -v PID"
    )
    # NOTE(review): this assumes the first column of `ps aux` is the PID - confirm for the
    # `ps` implementation used in the container
    output = run(cmd)
    # only parse numeric tokens, to avoid a ValueError on empty or unexpected output
    pids = [int(pid) for pid in output.split() if pid.isdigit()]
    this_pid = os.getpid()
    for pid in pids:
        if pid != this_pid:
            try:
                # kill spawned process
                os.kill(pid, signal.SIGKILL)
            except Exception:
                # best effort: the process may already be gone
                pass
    # kill the process itself
    sys.exit(0)


# -------------
# MAIN STARTUP
# -------------


def print_runtime_information(in_docker=False):
    """Print the LocalStack version and, if available, container id, build date and git hash."""
    # FIXME: this is legacy code from the old CLI, reconcile with new CLI and runtime output
    print()
    print("LocalStack version: %s" % constants.VERSION)
    if in_docker:
        container_id = get_main_container_id()
        if container_id:
            # only the first 12 characters of the container id are printed
            print("LocalStack Docker container id: %s" % container_id[:12])

    if config.LOCALSTACK_BUILD_DATE:
        print("LocalStack build date: %s" % config.LOCALSTACK_BUILD_DATE)

    if config.LOCALSTACK_BUILD_GIT_HASH:
        print("LocalStack build git hash: %s" % config.LOCALSTACK_BUILD_GIT_HASH)

    print()


def start_infra(asynchronous=False, apis=None):
    """
    Print startup warnings and runtime information, apply patches, load plugins and start the infrastructure.

    :return: the value returned by do_start_infra(...); in synchronous mode, the call blocks
        on SHUTDOWN_INFRA before returning
    """
    try:
        os.environ[LOCALSTACK_INFRA_PROCESS] = "1"

        is_in_docker = in_docker()
        # print a warning if we're not running in Docker but using Docker based LAMBDA_EXECUTOR
        if not is_in_docker and "docker" in config.LAMBDA_EXECUTOR and not is_linux():
            print(
                (
                    "!WARNING! - Running outside of Docker with $LAMBDA_EXECUTOR=%s can lead to "
                    "problems on your OS. The environment variable $LOCALSTACK_HOSTNAME may not "
                    "be properly set in your Lambdas."
                )
                % config.LAMBDA_EXECUTOR
            )

        if (
            is_in_docker
            and not config.LAMBDA_REMOTE_DOCKER
            and not os.environ.get("HOST_TMP_FOLDER")
        ):
            print(
                "!WARNING! - Looks like you have configured $LAMBDA_REMOTE_DOCKER=0 - "
                "please make sure to configure $HOST_TMP_FOLDER to point to your host's $TMPDIR"
            )

        print_runtime_information(is_in_docker)

        # apply patches
        patch_urllib3_connection_pool(maxsize=128)
        patch_instance_tracker_meta()

        # load plugins
        load_plugins()

        # with plugins loaded, now start the infrastructure
        thread = do_start_infra(asynchronous, apis, is_in_docker)

        if not asynchronous and thread:
            # We're making sure that we stay in the execution context of the
            # main thread, otherwise our signal handlers don't work
            SHUTDOWN_INFRA.wait()

        return thread

    except KeyboardInterrupt:
        print("Shutdown")
    except Exception as e:
        print("Error starting infrastructure: %s %s" % (e, traceback.format_exc()))
# ... (excerpt ends here; any remaining lines of the file are not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful