How to use get_contexts method in robotframework-appiumlibrary

Best Python code snippet using robotframework-appiumlibrary_python

xdp_case.py

Source:xdp_case.py Github

copy

Full Screen

...11def usingCustomLoader(test):12 """13 Skips test if not using a network.14 """15 if XDPCase.get_contexts().get_local_main().xdp_mode is None:16 return unittest.skip("Custom loader skipped when not using network.")17 return test18class SendResult:19 def __init__(self, captured_local: List[Packet],20 captured_remote: List[List[Packet]]):21 self.captured_local = captured_local22 self.captured_remote = captured_remote23def _prog_test_run(fd, pkt):24 lib = ctypes.CDLL("libbcc.so.0", use_errno=True)25 lib.bpf_prog_test_run.argtype = [26 ctypes.c_int, ctypes.c_int,27 ctypes.c_void_p, ctypes.c_uint32,28 ctypes.c_void_p, ctypes.c_uint32,29 ctypes.c_uint32, ctypes.c_uint32,30 ]31 lib.bpf_prog_test_run.restype = ctypes.c_int32 """33 LIBBPF_API int bpf_prog_test_run(34 int prog_fd, int repeat,35 void *data, __u32 size,36 void *data_out, __u32 *size_out,37 __u32 *retval, __u32 *duration38 );39 """40 # Maximum size of ether frame size is 1522B.41 out_size = ctypes.c_int(2048)42 out = ctypes.create_string_buffer(out_size.value)43 ret = ctypes.c_int()44 dur = ctypes.c_int()45 pkt = bytes(pkt)46 res = lib.bpf_prog_test_run(47 fd, 1,48 pkt, len(pkt),49 ctypes.byref(out), ctypes.byref(out_size),50 ctypes.byref(ret), ctypes.byref(dur)51 )52 if res != 0:53 raise RuntimeError("bpf_prog_test_run failed, returned", res,54 "because", errno.errorcode[ctypes.get_errno()])55 out = bytes(out[:out_size.value])56 pkt_out = Ether(out)57 return (ret.value, pkt_out)58def _describe_packet(packet):59 if hasattr(packet, "summary"):60 return f"{packet.summary()} ({bytes(packet)})"61 return f"{bytes(packet)}"62def _describe_packet_container(container):63 if len(container) == 0:64 return "[]"65 if len(container) <= 5:66 descriptions = []67 for i in container:68 descriptions.append(_describe_packet(i))69 return "[\n\t" + ",\n\t".join(descriptions) + "\n]"70 return str(container)71class XDPCase(unittest.TestCase):72 @classmethod73 def set_context(cls, ctxs: context.ContextClientList):74 """Set 
ContextClientList to be used for testing."""75 cls.contexts = ctxs76 @classmethod77 def get_contexts(cls) -> context.ContextClientList:78 """79 Return ContextClientList,80 containing contexts of testing interfaces.81 """82 return cls.contexts83 @classmethod84 def load_bpf(cls, *args, **kwargs):85 """Set a BPF program to be used for testing."""86 pass87 def attach_xdp(self, section: bytes):88 """89 Set a function to be used for testing.90 Requires load_bpf to be called first.91 """92 raise NotImplementedError93 def send_packets(self, packets: Iterable[Packet]) -> SendResult:94 """Process packets by selected XDP function."""95 raise NotImplementedError96 @classmethod97 def prepare_class(cls):98 """Initialize the static members of XDPCase."""99 pass100 def assertPacketIn(self,101 packet: Packet,102 container: Iterable[Packet]):103 """Check that packet is in container."""104 for i in container:105 if bytes(packet) == bytes(i):106 return107 self.fail(f"Packet {_describe_packet(packet)} "108 f"unexpectedly not found in "109 f"{_describe_packet_container(container)}.")110 def assertPacketsIn(self,111 packets: Iterable[Packet],112 container: Iterable[Packet]):113 """Check that every packet from packets is in container."""114 container = list(map(bytes, container))115 for i in packets:116 self.assertPacketIn(i, container)117 container.remove(bytes(i))118 def assertPacketNotIn(self,119 packet: Packet,120 container: Iterable[Packet]):121 """Check that packet is not in container."""122 for i in container:123 if bytes(packet) == bytes(i):124 self.fail(f"Packet {_describe_packet(packet)} "125 f"unexpectedly found in "126 f"{_describe_packet_container(container)}.")127 def assertPacketsNotIn(self,128 packets: Iterable[Packet],129 container: Iterable[Packet]):130 """Check that no packet from packets is in container."""131 for i in packets:132 self.assertPacketNotIn(i, container)133 def assertPacketContainerEmpty(self, container: Iterable[Packet]):134 """Check that the container is 
empty."""135 if len(container) == 0:136 return137 self.fail(f"Packet {_describe_packet(container[0])} "138 f"found in list expected to be empty.")139 @classmethod140 def generate_default_packets(141 cls,142 src_port: int = 50000, dst_port: int = 50000,143 src_inet: Optional[str] = None, dst_inet: Optional[str] = None,144 src_ether: Optional[str] = None, dst_ether: Optional[str] = None,145 layer_4: str = "udp",146 amount: int = 5,147 use_inet6: bool = False,148 ) -> List[Packet]:149 """Generate a list of predefined UDP packets using context."""150 dst_ctx = cls.get_contexts().get_local_main()151 src_ctx = cls.get_contexts().get_remote_main()152 if use_inet6:153 assert(src_inet or src_ctx.inet6 is not None)154 assert(dst_inet or dst_ctx.inet6 is not None)155 ip_layer = IPv6(src=src_inet if src_inet else src_ctx.inet6,156 dst=dst_inet if dst_inet else dst_ctx.inet6)157 else:158 ip_layer = IP(src=src_inet if src_inet else src_ctx.inet,159 dst=dst_inet if dst_inet else dst_ctx.inet)160 if layer_4 == "udp":161 transport_layer = UDP(sport=src_port, dport=dst_port)162 elif layer_4 == "tcp":163 transport_layer = TCP(sport=src_port, dport=dst_port)164 else:165 assert(False)166 to_send = [167 Ether(src=src_ether if src_ether else src_ctx.ether,168 dst=dst_ether if dst_ether else dst_ctx.ether) /169 ip_layer /170 transport_layer /171 Raw(f"This is message number {i}.") for i in range(amount)172 ]173 return [Ether(p.build()) for p in to_send]174class XDPCaseBPTR(XDPCase):175 @classmethod176 def setUpClass(cls):177 cls.__fd = None178 cls.__prog = None179 @classmethod180 def prepare_class(cls):181 cls.probe_counter = BPF(182 src_file=os.path.dirname(__file__) + "/bptr_probe_counter.c"183 )184 # Using kprobes since tracepoints do not get activated with bptr.185 cls.probe_counter.attach_kprobe(event=b"bpf_xdp_redirect_map",186 fn_name=b"bpf_xdp_redirect_map")187 cls.probe_counter.attach_kprobe(event=b"bpf_xdp_redirect",188 fn_name=b"bpf_xdp_redirect")189 @classmethod190 def 
load_bpf(cls, *args, **kwargs):191 cls.__prog = BPF(*args, **kwargs)192 return cls.__prog193 def attach_xdp(self, section):194 if self.__prog is None:195 self.fail(196 "A BPF program needs to be loaded before attaching function."197 )198 self.__fd = self.__prog.load_func(section.encode(), BPF.XDP).fd199 def send_packets(self, packets):200 passed = []201 redirected = [[] for i in range(self.get_contexts().server_count())]202 if self.__fd is None:203 self.fail(204 "Sending packets without attaching an XDP program."205 )206 for i in packets:207 (ret_val, pkt) = _prog_test_run(self.__fd, i)208 if ret_val == BPF.XDP_PASS:209 passed.append(pkt)210 elif ret_val == BPF.XDP_TX:211 redirected[0].append(pkt)212 elif ret_val == BPF.XDP_REDIRECT:213 self.__handle_redirect(pkt, passed, redirected)214 elif ret_val == BPF.XDP_ABORTED:215 pass216 elif ret_val == BPF.XDP_DROP:217 pass218 return SendResult(passed, redirected)219 def __handle_redirect(self, pkt, passed, redirected):220 redirect_activated = self.probe_counter[b"redirect_activated"][0]221 redirect_map_activated = self.probe_counter[b"redirect_map_activated"][0]222 if redirect_activated == redirect_map_activated:223 self.fail("Unexpectedly, both or neither map "224 "or regular redirect got activated.")225 if redirect_activated:226 redirect_info = self.probe_counter[b"redirect_info"][0]227 ifindex = redirect_info.ifindex228 ifindex = self.get_contexts().iface_index_to_id(ifindex)229 redirected[ifindex].append(pkt)230 elif redirect_map_activated:231 redirect_map_info = self.probe_counter[b"redirect_map_info"][0]232 map_type = utils.BPFMapType(redirect_map_info.map_type)233 if map_type == utils.BPFMapType.BPF_MAP_TYPE_DEVMAP:234 ifindex = redirect_map_info.ifindex235 map_name = redirect_map_info.map_name236 ifindex = self.__prog[map_name][ifindex].value237 ifindex = self.get_contexts().iface_index_to_id(ifindex)238 redirected[ifindex].append(pkt)239 elif map_type == utils.BPFMapType.BPF_MAP_TYPE_CPUMAP:240 
passed.append(pkt)241 elif map_type == utils.BPFMapType.BPF_MAP_TYPE_SOCKMAP:242 pass243 elif map_type == utils.BPFMapType.BPF_MAP_TYPE_XSKMAP:244 pass245 else:246 self.fail("used something else than devmap/cpumap redirect")247class XDPCaseNetwork(XDPCase):248 @classmethod249 def setUpClass(cls):250 cls.__prog = None251 cls.__pass_prog = BPF(text=b"""252 int pass_all(struct xdp_md *ctx) { return XDP_PASS; }253 """)254 cls.__pass_fn = cls.__pass_prog.load_func(b"pass_all", BPF.XDP)255 main_ctx = cls.get_contexts().get_local_main()256 for i in range(cls.get_contexts().server_count()):257 ctx = cls.get_contexts().get_local(i)258 if ctx == main_ctx or ctx.xdp_mode is None:259 continue260 cls.__pass_prog.attach_xdp(ctx.iface.encode(),261 cls.__pass_fn,262 ctx.xdp_mode)263 return super().setUpClass()264 @classmethod265 def tearDownClass(cls):266 if cls.__prog is None:267 return268 for i in range(cls.get_contexts().server_count()):269 ctx = cls.get_contexts().get_local(i)270 cls.__prog.remove_xdp(ctx.iface.encode())271 @classmethod272 def prepare_class(cls):273 ctx = cls.get_contexts()274 for i in range(ctx.server_count()):275 try:276 conn = cls.__connect(ctx.comms[i], 1000)277 conn.send((utils.ServerCommand.INTRODUCE, ))278 remote = conn.recv()279 # Custom context is prefered.280 # if ctx.remotes[i] is None:281 ctx.remotes[i] = remote282 conn.close()283 except Exception as exception:284 raise RuntimeError("Could not contact server.",285 ctx.comms[i]) from exception286 ctx.get_local(i).fill_missing()287 @classmethod288 def load_bpf(cls, *args, **kwargs):289 cls.__prog = BPF(*args, **kwargs)290 return cls.__prog291 def attach_xdp(self, section):292 if self.__prog is None:293 self.fail(294 "A BPF program needs to be loaded before attaching function."295 )296 self.__prog.attach_xdp(297 self.get_contexts().get_local_main().iface.encode(),298 self.__prog.load_func(section.encode(), BPF.XDP),299 self.get_contexts().get_local_main().xdp_mode300 )301 def send_packets(self, 
packets):302 sniffer = utils.wait_for_async_sniffing(303 iface=self.get_contexts().get_local_main().iface304 )305 conn_list = []306 for comm in self.get_contexts().comms:307 conn_list.append(self.__connect(comm))308 main_conn = conn_list[0]309 for conn in conn_list[1:]:310 conn.send((utils.ServerCommand.WATCH, ))311 main_conn.send((utils.ServerCommand.SEND, packets))312 # Packets are being send here.313 response = main_conn.recv()314 if response != utils.ServerResponse.FINISHED:315 self.fail(316 "Unexpected situation while sending packets: " + str(response))317 time.sleep(0.1)318 server_results = []319 for conn in conn_list:320 conn.send(utils.ServerCommand.STOP)...

Full Screen

Full Screen

context.py

Source:context.py Github

copy

Full Screen

...22 self.head_setup = parse_heads_from_composition(self.adapter_setup)23 self._empty = ignore_empty and self.adapter_setup is None and self.head_setup is None24 def __enter__(self):25 if not self._empty:26 AdapterSetup.get_contexts().append(self)27 return self28 def __exit__(self, type, value, traceback):29 if not self._empty:30 AdapterSetup.get_contexts().pop()31 @classmethod32 def get_contexts(cls):33 if not hasattr(cls.storage, "contexts"):34 cls.storage.contexts = []35 return cls.storage.contexts36 @classmethod37 def get_context(cls):38 try:39 return cls.get_contexts()[-1]40 except IndexError:41 return None42 @classmethod43 def get_context_head_setup(cls):44 context = cls.get_context()45 if context:46 return context.head_setup47 return None48class ForwardContext:49 """50 Holds context information during a forward pass through a model. This class should be used via the51 ``ForwardContext.wrap()`` method.52 Note that the context is thread-local.53 """54 # thread-local storage that holds a stack of active contexts55 storage = threading.local()56 def __init__(self, model, *args, **kwargs):57 # If the model has a method ``forward_context()``, use it to create the context.58 if hasattr(model, "forward_context"):59 model.forward_context(self, *args, **kwargs)60 def __enter__(self):61 ForwardContext.get_contexts().append(self)62 return self63 def __exit__(self, type, value, traceback):64 ForwardContext.get_contexts().pop()65 @classmethod66 def wrap(cls, f):67 """68 Decorator method that wraps a ``forward()`` function of a model class.69 """70 @functools.wraps(f)71 def wrapper_func(self, *args, **kwargs):72 if self.config.adapters is not None:73 with cls(self, *args, **kwargs):74 results = f(self, *args, **kwargs)75 return results76 else:77 return f(self, *args, **kwargs)78 return wrapper_func79 @classmethod80 def get_contexts(cls):81 if not hasattr(cls.storage, "contexts"):82 cls.storage.contexts = []83 return cls.storage.contexts84 @classmethod85 def 
get_context(cls):86 try:87 return cls.get_contexts()[-1]88 except IndexError:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run robotframework-appiumlibrary automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful