How to use get_socket_count method in lisa

Best Python code snippet using lisa_python

queue_mode.py

Source:queue_mode.py Github

copy

Full Screen

...38 acl=[access_control.make_acl_entry(1, access_control.AccessMask.READ),39 access_control.make_acl_entry(2, access_control.AccessMask.OWNER)])40 # first check if sockets stay online in non-queue mode41 time.sleep(self.max_transmit_wait() - 2)42 self.assertEqual(self.get_socket_count(), 2)43 time.sleep(4)44 self.assertEqual(self.get_socket_count(), 2)45 # put servers[0] into queue mode46 self.write_resource(self.servers[0], OID.Server, 1, RID.Server.Binding, b'UQ')47 self.assertDemoUpdatesRegistration(self.servers[0], binding='UQ', content=ANY)48 # Observe the Counter argument49 self.observe(self.servers[0], OID.Test, 0, RID.Test.Counter)50 time.sleep(self.max_transmit_wait() - 2)51 self.assertEqual(self.get_socket_count(), 2)52 time.sleep(4)53 self.assertEqual(self.get_socket_count(), 1)54 # Trigger Notification from the non-queue server55 self.execute_resource(self.servers[1], OID.Test, 0, RID.Test.IncrementCounter)56 self.assertDtlsReconnect(self.servers[0])57 pkt = self.servers[0].recv()58 self.assertIsInstance(pkt, Lwm2mNotify)59 self.assertEqual(self.get_socket_count(), 2)60 # "queued RPCs"61 self.read_resource(self.servers[0], OID.Test, 0, RID.Test.Timestamp)62 # cancel observation63 self.observe(self.servers[0], OID.Test, 0, RID.Test.Counter, observe=1)64 # assert queue mode operation again65 time.sleep(12)66 self.assertEqual(self.get_socket_count(), 2)67 time.sleep(4)...

Full Screen

Full Screen

einhorn.py

Source:einhorn.py Github

copy

Full Screen

...7 pass8def is_worker() -> bool:9 """Return if this process is an Einhorn worker."""10 return os.getppid() == int(os.environ.get("EINHORN_MASTER_PID", -1))11def get_socket_count() -> int:12 """Return how many sockets are bound."""13 if not is_worker():14 raise NotEinhornWorker15 return int(os.environ.get("EINHORN_FD_COUNT", 0))16def get_socket(index: int = 0) -> socket.socket:17 """Get an Einhorn-bound socket from the environment.18 Einhorn can bind multiple sockets (via multiple -b arguments), the19 ``index`` parameter can be used to choose which socket to retrieve. When20 sockets are bound, Einhorn provides several environment variables to child21 worker processes:22 - EINHORN_FD_COUNT: the number of sockets bound23 - EINHORN_FD_#: for each socket bound, the file descriptor for that socket24 - EINHORN_FD_FAMILY_#: for each socket bound, the protocol family of that25 socket (this is a recent addition, so if it's not present default to26 AF_INET)27 :param index: The socket number to get.28 """29 if not is_worker():30 raise NotEinhornWorker31 fd_count = get_socket_count()32 if not 0 <= index < fd_count:33 raise IndexError34 fileno = int(os.environ[f"EINHORN_FD_{index:d}"])35 family_name = os.environ.get(f"EINHORN_FD_FAMILY_{index:d}", "AF_INET")36 assert family_name.startswith("AF_"), "invalid socket family name"37 family = getattr(socket, family_name)38 return socket.fromfd(fileno, family, socket.SOCK_STREAM)39def ack_startup() -> None:40 """Send acknowledgement that we started up to the Einhorn master."""41 if not is_worker():42 raise NotEinhornWorker43 control_sock_name = os.environ["EINHORN_SOCK_PATH"]44 control_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)45 control_sock.connect(control_sock_name)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful