How to use serve_flask_app method in localstack

Best Python code snippet using localstack_python

main.py

Source:main.py Github

copy

Full Screen

# ... (excerpt starts at line 22 of main.py; imports and earlier definitions
# such as `app`, `logger`, `ENDPOINTS` and `PORT` are not shown)
UNHEALTHY_BLOCKHEIGHT_DIFF = 15
DATA_DIR = 'data'
UPSTREAM_DOWN_TOLERANCE_SECONDS = 30
# Timestamp (time.time()) of the last health check that saw a non-zero
# upstream height; 0 until the first such check.
_last_successful_trusted_fetch = 0


def serve_flask_app(app: Flask, port: int, allow_remote_connections: bool = False,
                    allow_multiple_listeners: bool = False):
    """Serve ``app`` on ``port`` with a WSGIServer; blocks in ``serve_forever()``.

    Args:
        app: The Flask application to serve.
        port: TCP port to listen on.
        allow_remote_connections: When true, bind to ``''`` (all interfaces);
            otherwise bind to ``'localhost'``.
        allow_multiple_listeners: When true, create and bind the listening
            socket here with SO_REUSEADDR and SO_REUSEPORT set, instead of
            handing the server a plain ``(host, port)`` address.
    """
    listener: Union[socket.socket, Tuple[str, int]]
    hostname = '' if allow_remote_connections else 'localhost'
    listener = (hostname, port)
    if allow_multiple_listeners:
        # NOTE(review): SO_REUSEPORT is not defined on every platform - confirm
        # the deployment targets before relying on this branch.
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        listener.bind((hostname, port))
        listener.listen()
    # NOTE(review): WSGIServer is presumably gevent's (gevent is used below);
    # its import is outside this excerpt.
    server = WSGIServer(listener, app)
    server.serve_forever()


def api_endpoint(f):
    """Decorator that wraps a handler's result or exception in a JSON envelope.

    On success the response is ``{'status': 'OK', 'result': <return value>}``.
    On any exception a warning is logged and a ``{'status': 'Error', ...}``
    body is returned with HTTP status 500.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
            return jsonify({'status': 'OK',
                            'result': result})
        except Exception as e:
            logger.warning('Error in handler %s', f, exc_info=True)
            # NOTE(review): the error body exposes the traceback and a
            # jsonpickle-encoded exception to the caller - confirm that this
            # endpoint is only reachable by trusted clients.
            return jsonify({'status': 'Error',
                            'error': repr(e),
                            'pickled_exception': jsonpickle.encode(e),
                            'traceback': traceback.format_exc()}), 500
    return wrapped


@app.route('/status')
@api_endpoint
def get_validator_status():
    """Return the slot reported by every configured endpoint."""
    return get_all_slots()


@app.route('/health')
@api_endpoint
def get_health_status():
    """Return all slots, raising if the local validator looks unhealthy.

    Raises:
        Exception: If the highest upstream slot is 0 and no non-zero upstream
            slot has been seen for more than UPSTREAM_DOWN_TOLERANCE_SECONDS,
            or if the local slot trails the highest upstream slot by more than
            the configured threshold.
    """
    global _last_successful_trusted_fetch
    slots = get_all_slots()
    logger.info(f'slots: {slots}')
    local = slots['local']
    # Every entry other than 'local' is treated as an upstream.
    upstream_height = max([v for k, v in slots.items() if k != 'local'])
    if upstream_height == 0 and _last_successful_trusted_fetch < time.time() - UPSTREAM_DOWN_TOLERANCE_SECONDS:
        raise Exception(
            f'Both upstreams have been returning errors for more than {UPSTREAM_DOWN_TOLERANCE_SECONDS} seconds'
        )
    elif upstream_height > 0:
        _last_successful_trusted_fetch = time.time()
    behind = upstream_height - local
    if behind < 0:
        logger.info(f'Local block height is greater than upstreams. '
                    f'Current block height: {local}, '
                    f'Upstream block height: {upstream_height}')
    # A value stored in the data file overrides the built-in threshold.
    unhealthy_blockheight_diff = load_data_file_locally('unhealthy_block_threshold') or UNHEALTHY_BLOCKHEIGHT_DIFF
    if behind > int(unhealthy_blockheight_diff):
        raise Exception(f'Local validator is behind trusted validator by more than {unhealthy_blockheight_diff} blocks.')
    return slots


def load_data_file_locally(filename: str, mode='r') -> Optional[str]:
    """Return the contents of ``DATA_DIR/filename``, or None if it does not exist."""
    file_path = Path(DATA_DIR) / filename
    if file_path.exists():
        with file_path.open(mode=mode) as f:
            return f.read()
    return None


def get_all_slots() -> Dict[str, int]:
    """Fetch the slot of every entry in ENDPOINTS concurrently.

    One greenlet is spawned per endpoint; the result maps each ENDPOINTS key
    to the value returned by ``get_slot`` for its URL.
    """
    futures = {k: gevent.spawn(get_slot, v) for k, v in ENDPOINTS.items()}
    return {k: v.get() for k, v in futures.items()}


def get_slot(url: str) -> int:
    """Return the ``absoluteSlot`` reported by ``url``, or 0 on any error."""
    try:
        return get_epoch_info(url)['result']['absoluteSlot']
    except Exception as e:
        # Best effort: a failing endpoint is reported as slot 0, not raised.
        logger.info(f'Received error fetching blockheight from {url}')
        logger.info(e)
        return 0


def get_epoch_info(url: str):
    """POST a JSON-RPC ``getEpochInfo`` request to ``url`` and return the decoded JSON.

    Uses a 1-second timeout and raises (via ``raise_for_status``) on an HTTP
    error status.
    """
    res = requests.post(
        url,
        json={
            'jsonrpc': '2.0',
            'id': 1,
            'method': 'getEpochInfo',
            'params': [{'commitment': 'single'}],
        },
        timeout=1,
    )
    res.raise_for_status()
    return res.json()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # NOTE(review): the excerpt is truncated after the last argument below;
    # the closing of this call is reconstructed - confirm against main.py.
    serve_flask_app(
        app, PORT, allow_remote_connections=True, allow_multiple_listeners=True
    )
# ... (excerpt ends here)

Full Screen

Full Screen

es_api.py

Source:es_api.py Github

copy

Full Screen

...65 result = get_domain_status(domain_name, deleted=True)66 ES_DOMAINS.pop(domain_name)67 return jsonify(result)68def serve(port, quiet=True):...

Full Screen

Full Screen

localstack_patched.py

Source:localstack_patched.py Github

copy

Full Screen

...18 kwargs.setdefault('host', 'localhost')19 orig_init(self, *args, **kwargs)20 generic_proxy.GenericProxy.__init__ = new_init21# Same kind of monkey patching is needed for serve_flask_app.22def patch_serve_flask_app():23 orig_serve = generic_proxy.serve_flask_app24 def new_serve(*args, **kwargs):25 kwargs.setdefault('host', 'localhost')26 orig_serve(*args, **kwargs)27 generic_proxy.serve_flask_app = new_serve28patch_GenericProxy_init()29patch_serve_flask_app()30# Also change BIND_HOST to use localhost instead of '0.0.0.0' (which also triggers the warning).31constants.BIND_HOST = 'localhost'32# Once patched, we run the localstack command-line script as if it were run directly....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful