How to use add_ipv4_address method in lisa

Best Python code snippet using lisa_python

sync_collector_info.py

Source:sync_collector_info.py Github

copy

Full Screen

1#!/usr/bin/python32"""3this script updates the collector source code to the latest execution priorities based on the collector table in4README.md5"""6__author__ = "Lukas Reiter"7__license__ = "GPL v3.0"8__copyright__ = """Copyright 2018 Lukas Reiter9This program is free software: you can redistribute it and/or modify10it under the terms of the GNU General Public License as published by11the Free Software Foundation, either version 3 of the License, or12(at your option) any later version.13This program is distributed in the hope that it will be useful,14but WITHOUT ANY WARRANTY; without even the implied warranty of15MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the16GNU General Public License for more details.17You should have received a copy of the GNU General Public License18along with this program. If not, see <http://www.gnu.org/licenses/>.19"""20__version__ = 0.121import os22import re23import fnmatch24import sys25import argparse26class ReadmeEntry:27 def __init__(self, collector_name: str, priority: int):28 self.collector_name = collector_name29 self.priority = priority30class ReadmeParser(dict):31 def __init__(self):32 super().__init__()33 self._readme_path = os.path.abspath("../README.md")34 self._re_entry = re.compile("^\|\s*(?P<priority>[0-9]+)\s*\|\s*(?P<collector_name>[a-zA-Z0-9]+)\s*\|.+$")35 self._parse()36 def _parse(self):37 with open(self._readme_path, "r") as file:38 for line in file.readlines():39 line = line.strip()40 match = self._re_entry.match(line)41 if match:42 collector_name = match.group("collector_name")43 priority = int(match.group("priority"))44 if collector_name not in self:45 self[collector_name] = ReadmeEntry(collector_name, priority)46 else:47 raise ValueError("collector '{}' exists already".format(collector_name))48class AnalyzerBase(dict):49 def __init__(self):50 super().__init__()51 self._home_dir = os.path.abspath("../kis/collectors/os/modules/")52 def _get_collector_files(self, pattern: str = "*.py") -> list:53 for 
root, dirs, files in os.walk(self._home_dir):54 for basename in files:55 if fnmatch.fnmatch(basename, pattern):56 filename = os.path.join(root, basename)57 yield filename58class CollectorFileUpdater(AnalyzerBase):59 def __init__(self, readme_parser: ReadmeParser):60 super().__init__()61 self._readme_parser = readme_parser62 self._load_modules()63 def _load_modules(self):64 for file in self._get_collector_files():65 collector = os.path.splitext(os.path.basename(file))[0]66 if collector in self._readme_parser:67 if collector not in self:68 self[collector] = file69 else:70 raise ValueError("collector name '{}' is not unique".format(collector))71 elif collector not in ["core", "__init__"]:72 print("ignoring file: {}".format(file), file=sys.stderr)73 @staticmethod74 def _replace(path: str, priority: int) -> None:75 with open(path, "r") as file:76 content = file.read()77 new_content = re.sub("priority=[0-9]+", "priority={}".format(priority), content)78 if new_content != content:79 with open(path, "w") as file:80 file.write(new_content)81 def run(self):82 for collector, value in self._readme_parser.items():83 file = self[collector]84 self._replace(file, value.priority)85class AnalyzeCollectorTableUpdates(AnalyzerBase):86 def __init__(self):87 super().__init__()88 self._method_table_mapping = {"add_additional_info": ["additional_info", "source"],89 "add_certificate": ["domain_name",90 "host_name",91 "source",92 "file",93 "company",94 "email"],95 "add_company": ["company", "source"],96 "add_credential": ["credential"],97 "add_domain_name": ["domain_name", "host_name", "source"],98 "add_email": ["email", "domain_name", "host_name", "source"],99 "add_execution_info_enum": ["command"],100 "add_execution_info_str": ["command"],101 "add_file_content": ["file"],102 "add_hint": ["command"],103 "add_host_host_name_mapping": ["host_host_name_mapping", "source"],104 "add_host_name": ["domain_name", "host_name", "source"],105 "add_host_name_host_name_mapping": 
["host_name_host_name_mapping", "source"],106 "add_ipv4_address": ["host", "source"],107 "add_ipv4_network": ["network", "source"],108 "add_path": ["path", "source", "query"],109 "add_query": ["query"],110 "add_robots_txt": ["path", "source"],111 "add_service": ["service", "source"],112 "add_service_method": ["service_method", "source"],113 "add_source": ["source"],114 "add_url": ["path", "source"]}115 self._collector_methods = {"ftpdotdotpwn": ["add_path"],116 "httpdotdotpwn": ["add_path"],117 "rdphydra": ["add_credential"],118 "smbhydra": ["add_credential"],119 "sshhydra": ["add_credential"],120 "ftphydra": ["add_credential"],121 "mssqlhydra": ["add_credential"],122 "pgsqlhydra": ["add_credential"],123 "rdphydra": ["add_credential"],124 "httphydra": ["add_credential"],125 "mysqlhydra": ["add_credential"],126 "smbmedusa": ["add_credential"],127 "securitytrails": ["add_domain_name"],128 "hunter": ["add_email"],129 "haveibeenbreach": ["add_additional_info"],130 "haveibeenpaste": ["add_additional_info"],131 "tcpnmapnetwork": ["add_ipv4_address",132 "add_service",133 "add_domain_name",134 "add_additional_info",135 "add_path",136 "add_service_method",137 "add_certificate"],138 "tcpnmapdomain": ["add_ipv4_address",139 "add_service",140 "add_domain_name",141 "add_additional_info",142 "add_path",143 "add_service_method",144 "add_certificate"],145 "udpnmapnetwork": ["add_ipv4_address",146 "add_service",147 "add_domain_name",148 "add_additional_info",149 "add_path",150 "add_service_method",151 "add_certificate"],152 "udpnmapdomain": ["add_ipv4_address",153 "add_service",154 "add_domain_name",155 "add_additional_info",156 "add_path",157 "add_service_method",158 "add_certificate"],159 "mssqlnmap": ["add_service"],160 "smbnmap": ["add_path", "add_additional_info", "add_domain_name"],161 "smtpnmap": ["add_service_method", "add_domain_name"],162 "rpcnmap": ["add_service", "add_domain_name"],163 "msrpcenum": ["add_path", "add_additional_info", "add_domain_name"],164 "sshnmap": 
["add_additional_info"],165 "tftpnmap": ["add_path"],166 "certnmap": ["add_certificate"],167 "httpnmap": ["add_source",168 "add_additional_info",169 "add_url",170 "add_service_method"],171 "tcpmasscannetwork": ["add_ipv4_address", "add_service"],172 "tcptraceroute": ["add_ipv4_address", "add_host_name"],173 "sshchangeme": ["add_credential"],174 "httpchangeme": ["add_credential", "add_path"]175 }176 self._re_method = re.compile("^.*\.(?P<method>add_.+?)\(.*$")177 def _analyze(self, collector: str, path: str):178 if collector not in self:179 self[collector] = {}180 if collector in self._collector_methods:181 for method in self._collector_methods[collector]:182 for item in self._method_table_mapping[method]:183 self[collector][item] = None184 else:185 with open(path, "r") as file:186 content = file.readlines()187 for line in content:188 line = line.strip()189 line = line.split("#")[0]190 if line:191 match = self._re_method.match(line)192 if match:193 method = match.group("method")194 if method in self._method_table_mapping:195 for item in self._method_table_mapping[method]:196 self[collector][item] = None197 def run(self):198 # analyze199 for file in self._get_collector_files():200 collector = os.path.splitext(os.path.basename(file))[0]201 if collector not in ["core", "__init__"]:202 self._analyze(collector, file)203 # report204 for collector, methods in self.items():205 print("{}:{}".format(collector, ", ".join(methods.keys())))206if __name__ == "__main__":207 parser = argparse.ArgumentParser(description=__doc__)208 mutex_group = parser.add_mutually_exclusive_group(required=True)209 mutex_group.add_argument("-u", "--update",210 action='store_true',211 help="update collector priorities based on README.md")212 mutex_group.add_argument("-a", "--analyze",213 action='store_true',214 help="report which collectors use which update methods")215 args = parser.parse_args()216 if len(sys.argv) <= 1:217 parser.print_help()218 sys.exit(1)219 if args.update:220 parser = 
ReadmeParser()221 updater = CollectorFileUpdater(parser)222 updater.run()223 elif args.analyze:224 analyzer = AnalyzeCollectorTableUpdates()...

Full Screen

Full Screen

ip.py

Source:ip.py Github

copy

Full Screen

...57 if match:58 d = match.groupdict()59 address = ipaddress.IPv4Interface(d['address'])60 yield (address, None)61 def add_ipv4_address(self, net_interface, address, broadcast=None):62 """Adds an ipv4 address to a net_interface.63 Args:64 net_interface: string, The network interface65 to get the new ipv4 (eg. wlan0).66 address: ipaddress.IPv4Interface, The new ipaddress and netmask67 to add to an interface.68 broadcast: ipaddress.IPv4Address, The broadcast address to use for69 this net_interfaces subnet.70 """71 if broadcast:72 self._runner.run('ip addr add %s broadcast %s dev %s' %73 (address, broadcast, net_interface))74 else:75 self._runner.run('ip addr add %s dev %s' %76 (address, net_interface))77 def remove_ipv4_address(self, net_interface, address, ignore_status=False):78 """Remove an ipv4 address.79 Removes an ipv4 address from a network interface.80 Args:81 net_interface: string, The network interface to remove the82 ipv4 address from (eg. wlan0).83 address: ipaddress.IPv4Interface or ipaddress.IPv4Address,84 The ip address to remove from the net_interface.85 ignore_status: True if the exit status can be ignored86 Returns:87 The job result from a the command88 """89 return self._runner.run(90 'ip addr del %s dev %s' % (address, net_interface),91 ignore_status=ignore_status)92 def set_ipv4_address(self, net_interface, address, broadcast=None):93 """Set the ipv4 address.94 Sets the ipv4 address of a network interface. If the network interface95 has any other ipv4 addresses these will be cleared.96 Args:97 net_interface: string, The network interface to set the ip address98 on (eg. 
wlan0).99 address: ipaddress.IPv4Interface, The ip address and subnet to give100 the net_interface.101 broadcast: ipaddress.IPv4Address, The broadcast address to use for102 the subnet.103 """104 self.clear_ipv4_addresses(net_interface)105 self.add_ipv4_address(net_interface, address, broadcast)106 def clear_ipv4_addresses(self, net_interface):107 """Clears all ipv4 addresses registered to a net_interface.108 Args:109 net_interface: string, The network interface to clear addresses from110 (eg. wlan0).111 """112 ip_info = self.get_ipv4_addresses(net_interface)113 for address, _ in ip_info:114 result = self.remove_ipv4_address(net_interface, address,115 ignore_status=True)116 # It is possible that the address has already been removed by the117 # time this command has been called. In such a case, we would get118 # this error message.119 error_msg = 'RTNETLINK answers: Cannot assign requested address'...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful