How to use the _restore_lro method in lisa

Best Python code snippet using lisa_python

xdpdump.py

Source: xdpdump.py (GitHub)

copy

Full Screen

...93 sudo=True,94 cwd=self._code_path,95 )96 except Exception as identifier:97 self._restore_lro(nic_name)98 raise identifier99 # wait to receive at least one packet or 1 second to make sure the100 # process started.101 xdpdump_process.wait_output("IP", timeout=1, error_on_missing=False)102 return xdpdump_process103 def start(self, nic_name: str = "", timeout: int = 5) -> ExecutableResult:104 process = self.start_async(nic_name=nic_name, timeout=timeout)105 return self.wait_result(nic_name=nic_name, process=process)106 def wait_result(self, nic_name: str, process: Process) -> ExecutableResult:107 try:108 result = process.wait_result()109 finally:110 self._restore_lro(nic_name)111 return result112 def test_by_ping(113 self,114 nic_name: str = "",115 timeout: int = 5,116 build_type: Optional[BuildType] = None,117 remote_address: str = "",118 expected_ping_success: bool = True,119 ping_package_size: Optional[int] = None,120 # the ping command can be triggered from different node121 ping_source_node: Optional[Node] = None,122 ) -> str:123 """124 Test with ICMP ping packets125 """126 if not nic_name:127 nic_name = self.node.nics.default_nic128 if not ping_source_node:129 ping_source_node = self.node130 self.make_by_build_type(build_type=build_type)131 # if there is an remote address defined, test it in async mode, and132 # check the ping result.133 if remote_address:134 ping = ping_source_node.tools[Ping]135 xdpdump_process = self.start_async(nic_name=nic_name, timeout=timeout)136 if remote_address:137 is_success = ping.ping(138 remote_address,139 nic_name=nic_name,140 ignore_error=True,141 package_size=ping_package_size,142 )143 assert_that(is_success).described_as(144 "ping result is not expected."145 ).is_equal_to(expected_ping_success)146 result = self.wait_result(nic_name=nic_name, process=xdpdump_process)147 return result.stdout148 def make_by_build_type(self, build_type: Optional[BuildType] = None) -> None:149 env_variables: Dict[str, str] = {}150 # if no build 
type specified, rebuild it with default behavior.151 if build_type:152 cflags = f"-D __{build_type.name}__ -I../libbpf/src/root/usr/include"153 # no output log to improve perf with high volume data.154 if build_type in [BuildType.PERF_DROP, BuildType.TX_FWD, BuildType.PERF]:155 cflags = f"{cflags} -D __PERF__"156 env_variables["CFLAGS"] = cflags157 make = self.node.tools[Make]158 make.make(159 arguments="",160 cwd=self._code_path,161 is_clean=True,162 update_envs=env_variables,163 )164 # discard local changes after built, it's used to cleanup changes from165 # the forwarder role.166 git = self.node.tools[Git]167 git.discard_local_changes(cwd=self._code_path)168 def make_on_forwarder_role(169 self,170 forwarder_nic: NicInfo,171 receiver_nic: NicInfo,172 ) -> None:173 sed = self.node.tools[Sed]174 # replace hard code mac and ip addresses in code, the changes will be175 # reset after built.176 forwarder_mac = self._convert_mac_to_c_style(forwarder_nic.mac_addr)177 forwarder_ip = self._convert_ip_to_c_style(forwarder_nic.ip_addr)178 receiver_mac = self._convert_mac_to_c_style(receiver_nic.mac_addr)179 receiver_ip = self._convert_ip_to_c_style(receiver_nic.ip_addr)180 sed.substitute(181 regexp=r"unsigned char newethsrc \[\] = "182 "{ 0x00, 0x22, 0x48, 0x4c, 0xc4, 0x4d };",183 replacement=r"unsigned char newethsrc \[\] = {" + forwarder_mac + "};",184 file=f"{self._code_path}/xdpdump_kern.c",185 )186 sed.substitute(187 regexp=r"unsigned char newethdest \[\] = "188 "{ 0x00, 0x22, 0x48, 0x4c, 0xc0, 0xfd };",189 replacement=r"unsigned char newethdest \[\] = {" + receiver_mac + "};",190 file=f"{self._code_path}/xdpdump_kern.c",191 )192 sed.substitute(193 regexp=r"__u8 newsrc \[\] = { 10, 0, 1, 5 };",194 replacement=r"__u8 newsrc \[\] = {" + forwarder_ip + "};",195 file=f"{self._code_path}/xdpdump_kern.c",196 )197 sed.substitute(198 regexp=r"__u8 newdest \[\] = { 10, 0, 1, 4 };",199 replacement=r"__u8 newdest \[\] = {" + receiver_ip + "};",200 
file=f"{self._code_path}/xdpdump_kern.c",201 )202 self.make_by_build_type(build_type=BuildType.TX_FWD)203 def _convert_mac_to_c_style(self, mac: str) -> str:204 """205 convert 00:22:48:7a:ed:28 to 0x00, 0x22, 0x48, 0x7a, 0xed, 0x28206 """207 bytes_list = [f"0x{x}" for x in mac.split(":")]208 return ", ".join(bytes_list)209 def _convert_ip_to_c_style(self, ip: str) -> str:210 """211 convert 10.0.0.1 to 10, 0, 0, 1212 """213 return ", ".join(ip.split("."))214 def _disable_lro(self, nic_name: str) -> None:215 ethtool = self.node.tools[Ethtool]216 gro_lro_settings = self._get_gro_lro_settings(nic_name)217 if gro_lro_settings.lro_setting is False:218 return219 # disable LRO (RSC), because XDP program cannot run with it. Restore220 # it after test completed.221 ethtool.change_device_gro_lro_settings(222 nic_name,223 gro_setting=gro_lro_settings.gro_setting,224 lro_setting=False,225 )226 def _restore_lro(self, nic_name: str) -> None:227 # recover settings228 ethtool = self.node.tools[Ethtool]229 current_settings = ethtool.get_device_gro_lro_settings(nic_name, force_run=True)230 original_settings = self._get_gro_lro_settings(nic_name)231 if original_settings.lro_setting == current_settings.lro_setting:232 return233 ethtool.change_device_gro_lro_settings(234 nic_name,235 gro_setting=original_settings.gro_setting,236 lro_setting=original_settings.lro_setting,237 )238 def _get_gro_lro_settings(self, nic_name: str) -> DeviceGroLroSettings:239 gro_lro_settings = self._gro_lro_settings.get(nic_name, None)240 ethtool = self.node.tools[Ethtool]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful