How to use poll_condition method in localstack

Best Python code snippet using localstack_python

query.py

Source:query.py Github

copy

Full Screen

# ... (lines 1-4 of query.py are not shown in this excerpt)
from gremlin_python.structure.graph import Graph
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

NEPTUNE_ENDPOINT = os.environ.get('NEPTUNE_ENDPOINT') or 'http://localhost:4566'
CLUSTER_ID = 'cluster123'


def poll_condition(condition, timeout: float = None, interval: float = 0.5) -> bool:
    """Repeatedly call ``condition`` until it returns a truthy value.

    :param condition: zero-argument callable that is evaluated on every attempt
    :param timeout: maximum number of seconds to keep polling; ``None`` polls
        until the condition holds
    :param interval: number of seconds to sleep between two attempts
    :return: ``True`` as soon as the condition holds, ``False`` if the timeout
        elapsed first
    """
    # Track a wall-clock deadline instead of counting sleep intervals, so that
    # time spent inside condition() itself also counts towards the timeout.
    deadline = None if timeout is None else time.monotonic() + timeout

    while not condition():
        if deadline is None:
            time.sleep(interval)
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline; the loop re-checks the condition once
        # more after this final (possibly shortened) sleep.
        time.sleep(min(interval, remaining))
    return True


def run_queries(cluster):
    """Run sample Gremlin queries against the given cluster.

    Exercises the ``Client`` API first and the ``DriverRemoteConnection`` API
    afterwards, asserting on the results of each step.

    :param cluster: cluster description; its ``'Port'`` entry is used to build
        the websocket URL
    """
    cluster_url = 'ws://localhost:%s/gremlin' % cluster['Port']

    # test Client API
    print('Connecting to Neptune Graph DB cluster URL: %s' % cluster_url)
    graph_client = gremlin_client.Client(cluster_url, 'g')
    try:
        values = '[1,2,3,4]'
        print('Submitting values: %s' % values)
        result_set = graph_client.submit(values)
        future_results = result_set.all()
        results = future_results.result()
        print('Received values from cluster: %s' % results)
        assert results == [1, 2, 3, 4]

        future_result_set = graph_client.submitAsync('[1,2,3,4]')
        result_set = future_result_set.result()
        result = result_set.one()
        assert result == [1, 2, 3, 4]
        poll_condition(lambda: result_set.done.done(), timeout=3)
        assert result_set.done.done()
    finally:
        # Close the client even when a request or an assertion above fails.
        graph_client.close()

    # test DriverRemoteConnection API
    graph = Graph()
    conn = DriverRemoteConnection(cluster_url, 'g')
    try:
        g = graph.traversal().withRemote(conn)
        vertices_before = g.V().toList()
        print('Existing vertices in the graph: %s' % vertices_before)
        print('Adding new vertices "v1" and "v2" to the graph')
        g.addV().property('id', 'v1').property('name', 'Vertex 1').next()
        g.addV().property('id', 'v2').property('name', 'Vertex 2').next()
        vertices_after = g.V().toList()
        print('New list of vertices in the graph: %s' % vertices_after)
        new_vertices = set(vertices_after) - set(vertices_before)
        assert len(new_vertices) == 2
    finally:
        # Close the remote connection even when a query or an assertion fails.
        conn.close()


def create_graph_db():
    """Create the Neptune cluster ``CLUSTER_ID`` and wait until it is available.

    :return: the ``DBCluster`` entry of the ``create_db_cluster`` response
    :raises Exception: if the cluster is not reported as available in time
    """
    print('Creating Neptune Graph DB cluster "%s" - this may take a few moments ...' % CLUSTER_ID)
    client = connect_neptune()
    cluster = client.create_db_cluster(DBClusterIdentifier=CLUSTER_ID, Engine='neptune')['DBCluster']

    def is_cluster_available():
        # Query the cluster created above explicitly; the first entry of an
        # unfiltered listing may belong to an unrelated cluster.
        clusters = client.describe_db_clusters(DBClusterIdentifier=CLUSTER_ID)["DBClusters"]
        return bool(clusters) and clusters[0]["Status"] == "available"

    cluster_available = poll_condition(is_cluster_available, timeout=15, interval=1)
    if not cluster_available:
        raise Exception("The server took too much time to start")
    return cluster


def delete_db(cluster=None):
    """Delete a Neptune cluster.

    :param cluster: cluster description whose ``'DBClusterIdentifier'`` is
        deleted; when omitted (or falsy), ``CLUSTER_ID`` is deleted instead
    """
    cluster_id = cluster['DBClusterIdentifier'] if cluster else CLUSTER_ID
    print('Deleting Neptune Graph DB cluster "%s"' % cluster_id)
    client = connect_neptune()
    client.delete_db_cluster(DBClusterIdentifier=cluster_id)


def connect_neptune():
    """Return a boto3 Neptune client that talks to ``NEPTUNE_ENDPOINT``."""
    return boto3.client('neptune', endpoint_url=NEPTUNE_ENDPOINT)


def main():
    ...  # body is not shown in this excerpt

Full Screen

Full Screen

test_http2_server.py

Source:test_http2_server.py Github

copy

Full Screen

...13 thread = run_server(port=port, bind_address=host, asynchronous=True)14 try:15 url = f"http://{host}:{port}"16 self.assertTrue(17 poll_condition(lambda: is_port_open(url, http_path="/"), timeout=15),18 "gave up waiting for port %d " % port,19 )20 finally:21 LOG.info("%.2f stopping server on port %d", time.time(), port)22 thread.stop()23 LOG.info("%.2f waiting on server to shut down", time.time())24 thread.join(timeout=15)25 self.assertFalse(is_port_open(port), "port is still open after stop")26 LOG.info("%.2f port stopped %d", time.time(), port)27 def test_run_and_stop_server_from_different_threads(self):28 port = get_free_tcp_port()29 host = "127.0.0.1"30 LOG.info("%.2f starting server on port %d", time.time(), port)31 thread = run_server(port=port, bind_address=host, asynchronous=True)32 try:33 url = f"http://{host}:{port}"34 self.assertTrue(35 poll_condition(lambda: is_port_open(url, http_path="/"), timeout=15),36 "gave up waiting for port %d " % port,37 )38 finally:39 LOG.info("%.2f stopping server on port %d", time.time(), port)40 threading.Thread(target=thread.stop).start()41 LOG.info("%.2f waiting on server to shut down", time.time())42 thread.join(timeout=15)43 self.assertFalse(is_port_open(port), "port is still open after stop")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful