Best Python code snippet using lisa_python
osutils.py
Source:osutils.py  
1# Copyright 2017 Red Hat, Inc. All rights reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may4# not use this file except in compliance with the License. You may obtain5# a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations13# under the License.14import errno15import ipaddress16import os17import shutil18import stat19import subprocess20import distro21import jinja222from oslo_config import cfg23from oslo_log import log as logging24import six25import webob26from werkzeug import exceptions27from octavia.common import constants as consts28from octavia.common import exceptions as octavia_exceptions29from octavia.common import utils30CONF = cfg.CONF31LOG = logging.getLogger(__name__)32j2_env = jinja2.Environment(autoescape=True, loader=jinja2.FileSystemLoader(33    os.path.dirname(os.path.realpath(__file__)) + consts.AGENT_API_TEMPLATES))34class BaseOS(object):35    PACKAGE_NAME_MAP = {}36    def __init__(self, os_name):37        self.os_name = os_name38    @classmethod39    def _get_subclasses(cls):40        for subclass in cls.__subclasses__():41            for sc in subclass._get_subclasses():42                yield sc43            yield subclass44    @classmethod45    def get_os_util(cls):46        os_name = distro.id()47        for subclass in cls._get_subclasses():48            if subclass.is_os_name(os_name):49                return subclass(os_name)50        raise octavia_exceptions.InvalidAmphoraOperatingSystem(os_name=os_name)51    @classmethod52    def _map_package_name(cls, package_name):53        return cls.PACKAGE_NAME_MAP.get(package_name, package_name)54    def get_network_interface_file(self, interface):55        if CONF.amphora_agent.agent_server_network_file:56            return CONF.amphora_agent.agent_server_network_file57        if CONF.amphora_agent.agent_server_network_dir:58            return os.path.join(CONF.amphora_agent.agent_server_network_dir,59                                interface)60        network_dir = consts.UBUNTU_AMP_NET_DIR_TEMPLATE.format(61            netns=consts.AMPHORA_NAMESPACE)62        return os.path.join(network_dir, interface)63    def create_netns_dir(self, network_dir, netns_network_dir, ignore=None):64        # We need to setup the netns network directory so that the ifup65        # commands used here and in the startup scripts "sees" the right66        # interfaces and scripts.67        try:68            os.makedirs('/etc/netns/' + consts.AMPHORA_NAMESPACE)69            shutil.copytree(70                network_dir,71                '/etc/netns/{netns}/{net_dir}'.format(72                    netns=consts.AMPHORA_NAMESPACE,73                    net_dir=netns_network_dir),74                symlinks=True,75                ignore=ignore)76        except OSError as e:77            # Raise the error if it's not "File exists" otherwise pass78            if e.errno != errno.EEXIST:79                raise80    def write_vip_interface_file(self, interface_file_path,81                                 primary_interface, vip, ip, broadcast,82                                 netmask, gateway, mtu, vrrp_ip, vrrp_version,83                                 render_host_routes, template_vip):84        # write interface file85        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH86        # If we are using a consolidated interfaces file, just append87        # otherwise clear the per interface file as we are rewriting it88        # TODO(johnsom): We need a way to clean out old interfaces records89        if CONF.amphora_agent.agent_server_network_file:90            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND91        else:92            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC93        with os.fdopen(os.open(interface_file_path, flags, mode),94                       'w') as text_file:95            text = template_vip.render(96                consts=consts,97                interface=primary_interface,98                vip=vip,99                vip_ipv6=ip.version == 6,100                # For ipv6 the netmask is already the prefix101                prefix=(netmask if ip.version == 6102                        else utils.netmask_to_prefix(netmask)),103                broadcast=broadcast,104                netmask=netmask,105                gateway=gateway,106                network=utils.ip_netmask_to_cidr(vip, netmask),107                mtu=mtu,108                vrrp_ip=vrrp_ip,109                vrrp_ipv6=vrrp_version == 6,110                host_routes=render_host_routes,111                topology=CONF.controller_worker.loadbalancer_topology,112            )113            text_file.write(text)114    def write_port_interface_file(self, netns_interface, fixed_ips, mtu,115                                  interface_file_path, template_port):116        # write interface file117        # If we are using a consolidated interfaces file, just append118        # otherwise clear the per interface file as we are rewriting it119        # TODO(johnsom): We need a way to clean out old interfaces records120        if CONF.amphora_agent.agent_server_network_file:121            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND122        else:123            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC124        # mode 00644125        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH126        with os.fdopen(os.open(interface_file_path, flags, mode),127                       'w') as text_file:128            text = self._generate_network_file_text(129                netns_interface, fixed_ips, mtu, template_port)130            text_file.write(text)131    @classmethod132    def _generate_network_file_text(cls, netns_interface, fixed_ips, mtu,133                                    template_port):134        text = ''135        if fixed_ips is None:136            text = template_port.render(interface=netns_interface)137        else:138            for index, fixed_ip in enumerate(fixed_ips, -1):139                try:140                    ip_addr = fixed_ip['ip_address']141                    cidr = fixed_ip['subnet_cidr']142                    ip = ipaddress.ip_address(ip_addr if isinstance(143                        ip_addr, six.text_type) else six.u(ip_addr))144                    network = ipaddress.ip_network(145                        cidr if isinstance(146                            cidr, six.text_type) else six.u(cidr))147                    broadcast = network.broadcast_address.exploded148                    netmask = (network.prefixlen if ip.version == 6149                               else network.netmask.exploded)150                    host_routes = cls.get_host_routes(fixed_ip)151                except ValueError:152                    return webob.Response(153                        json=dict(message="Invalid network IP"), status=400)154                new_text = template_port.render(interface=netns_interface,155                                                ipv6=ip.version == 6,156                                                ip_address=ip.exploded,157                                                broadcast=broadcast,158                                                netmask=netmask,159                                                mtu=mtu,160                                                host_routes=host_routes)161                text = '\n'.join([text, new_text])162        return text163    @classmethod164    def get_host_routes(cls, fixed_ip):165        host_routes = []166        for hr in fixed_ip.get('host_routes', []):167            network = ipaddress.ip_network(168                hr['destination'] if isinstance(169                    hr['destination'], six.text_type) else170                six.u(hr['destination']))171            host_routes.append({'network': network, 'gw': hr['nexthop']})172        return host_routes173    @classmethod174    def _bring_if_up(cls, interface, what, flush=True):175        # Note, we are not using pyroute2 for this as it is not /etc/netns176        # aware.177        # Work around for bug:178        # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=845121179        int_up = "ip netns exec {ns} ip link set {int} up".format(180            ns=consts.AMPHORA_NAMESPACE, int=interface)181        addr_flush = "ip netns exec {ns} ip addr flush {int}".format(182            ns=consts.AMPHORA_NAMESPACE, int=interface)183        cmd = ("ip netns exec {ns} ifup {params}".format(184            ns=consts.AMPHORA_NAMESPACE, params=interface))185        try:186            out = subprocess.check_output(int_up.split(),187                                          stderr=subprocess.STDOUT)188            LOG.debug(out)189            if flush:190                out = subprocess.check_output(addr_flush.split(),191                                              stderr=subprocess.STDOUT)192                LOG.debug(out)193            out = subprocess.check_output(cmd.split(),194                                          stderr=subprocess.STDOUT)195            LOG.debug(out)196        except subprocess.CalledProcessError as e:197            LOG.error('Failed to ifup %s due to error: %s %s', interface, e,198                      e.output)199            raise exceptions.HTTPException(200                response=webob.Response(json=dict(201                    message='Error plugging {0}'.format(what),202                    details=e.output), status=500))203    @classmethod204    def _bring_if_down(cls, interface):205        # Note, we are not using pyroute2 for this as it is not /etc/netns206        # aware.207        cmd = ("ip netns exec {ns} ifdown {params}".format(208            ns=consts.AMPHORA_NAMESPACE, params=interface))209        try:210            subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)211        except subprocess.CalledProcessError as e:212            LOG.info('Ignoring failure to ifdown %s due to error: %s %s',213                     interface, e, e.output)214    @classmethod215    def bring_interfaces_up(cls, ip, primary_interface, secondary_interface):216        cls._bring_if_down(primary_interface)217        if secondary_interface:218            cls._bring_if_down(secondary_interface)219        cls._bring_if_up(primary_interface, 'VIP')220        if secondary_interface:221            cls._bring_if_up(secondary_interface, 'VIP', flush=False)222    def has_ifup_all(self):223        return True224class Ubuntu(BaseOS):225    ETH_X_PORT_CONF = 'plug_port_ethX.conf.j2'226    ETH_X_VIP_CONF = 'plug_vip_ethX.conf.j2'227    @classmethod228    def is_os_name(cls, os_name):229        return os_name in ['ubuntu']230    def cmd_get_version_of_installed_package(self, package_name):231        name = self._map_package_name(package_name)232        return "dpkg-query -W -f=${{Version}} {name}".format(name=name)233    def get_network_interface_file(self, interface):234        if CONF.amphora_agent.agent_server_network_file:235            return CONF.amphora_agent.agent_server_network_file236        if CONF.amphora_agent.agent_server_network_dir:237            return os.path.join(CONF.amphora_agent.agent_server_network_dir,238                                interface + '.cfg')239        network_dir = consts.UBUNTU_AMP_NET_DIR_TEMPLATE.format(240            netns=consts.AMPHORA_NAMESPACE)241        return os.path.join(network_dir, interface + '.cfg')242    def get_network_path(self):243        return '/etc/network'244    def get_netns_network_dir(self):245        network_dir = self.get_network_path()246        return os.path.basename(network_dir)247    def create_netns_dir(248            self, network_dir=None, netns_network_dir=None, ignore=None):249        if not netns_network_dir:250            netns_network_dir = self.get_netns_network_dir()251        if not network_dir:252            network_dir = self.get_network_path()253        if not ignore:254            ignore = shutil.ignore_patterns('eth0*', 'openssh*')255        super(Ubuntu, self).create_netns_dir(256            network_dir, netns_network_dir, ignore)257    def write_interfaces_file(self):258        name = '/etc/netns/{}/network/interfaces'.format(259            consts.AMPHORA_NAMESPACE)260        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC261        # mode 00644262        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH263        with os.fdopen(os.open(name, flags, mode), 'w') as int_file:264            int_file.write('auto lo\n')265            int_file.write('iface lo inet loopback\n')266            if not CONF.amphora_agent.agent_server_network_file:267                int_file.write('source /etc/netns/{}/network/'268                               'interfaces.d/*.cfg\n'.format(269                                   consts.AMPHORA_NAMESPACE))270    def write_vip_interface_file(self, interface_file_path,271                                 primary_interface, vip, ip, broadcast,272                                 netmask, gateway, mtu, vrrp_ip, vrrp_version,273                                 render_host_routes, template_vip=None):274        if not template_vip:275            template_vip = j2_env.get_template(self.ETH_X_VIP_CONF)276        super(Ubuntu, self).write_vip_interface_file(277            interface_file_path, primary_interface, vip, ip, broadcast,278            netmask, gateway, mtu, vrrp_ip, vrrp_version, render_host_routes,279            template_vip)280    def write_port_interface_file(self, netns_interface, fixed_ips, mtu,281                                  interface_file_path=None,282                                  template_port=None):283        if not interface_file_path:284            interface_file_path = self.get_network_interface_file(285                netns_interface)286        if not template_port:287            template_port = j2_env.get_template(self.ETH_X_PORT_CONF)288        super(Ubuntu, self).write_port_interface_file(289            netns_interface, fixed_ips, mtu, interface_file_path,290            template_port)291    def has_ifup_all(self):292        return True293class RH(BaseOS):294    ETH_X_PORT_CONF = 'rh_plug_port_ethX.conf.j2'295    ETH_X_VIP_CONF = 'rh_plug_vip_ethX.conf.j2'296    ETH_X_ALIAS_VIP_CONF = 'rh_plug_vip_ethX_alias.conf.j2'297    ROUTE_ETH_X_CONF = 'rh_route_ethX.conf.j2'298    RULE_ETH_X_CONF = 'rh_rule_ethX.conf.j2'299    # The reason of make them as jinja templates is the current scripts force300    # to add the iptables, so leave it now for future extending if possible.301    ETH_IFUP_LOCAL_SCRIPT = 'rh_plug_port_eth_ifup_local.conf.j2'302    ETH_IFDOWN_LOCAL_SCRIPT = 'rh_plug_port_eth_ifdown_local.conf.j2'303    @classmethod304    def is_os_name(cls, os_name):305        return os_name in ['fedora', 'rhel']306    def cmd_get_version_of_installed_package(self, package_name):307        name = self._map_package_name(package_name)308        return "rpm -q --queryformat %{{VERSION}} {name}".format(name=name)309    @staticmethod310    def _get_network_interface_file(prefix, interface):311        if CONF.amphora_agent.agent_server_network_file:312            return CONF.amphora_agent.agent_server_network_file313        if CONF.amphora_agent.agent_server_network_dir:314            network_dir = CONF.amphora_agent.agent_server_network_dir315        else:316            network_dir = consts.RH_AMP_NET_DIR_TEMPLATE.format(317                netns=consts.AMPHORA_NAMESPACE)318        return os.path.join(network_dir, prefix + interface)319    def get_network_interface_file(self, interface):320        return self._get_network_interface_file('ifcfg-', interface)321    def get_alias_network_interface_file(self, interface):322        return self.get_network_interface_file(interface + ':0')323    def get_static_routes_interface_file(self, interface, version):324        route = 'route6-' if version == 6 else 'route-'325        return self._get_network_interface_file(route, interface)326    def get_route_rules_interface_file(self, interface, version):327        rule = 'rule6-' if version == 6 else 'rule-'328        return self._get_network_interface_file(rule, interface)329    def get_network_path(self):330        return '/etc/sysconfig/network-scripts'331    def get_netns_network_dir(self):332        network_full_path = self.get_network_path()333        network_basename = os.path.basename(network_full_path)334        network_dirname = os.path.dirname(network_full_path)335        network_prefixdir = os.path.basename(network_dirname)336        return os.path.join(network_prefixdir, network_basename)337    def create_netns_dir(338            self, network_dir=None, netns_network_dir=None, ignore=None):339        if not netns_network_dir:340            netns_network_dir = self.get_netns_network_dir()341        if not network_dir:342            network_dir = self.get_network_path()343        if not ignore:344            ignore = shutil.ignore_patterns('ifcfg-eth0*', 'ifcfg-lo*')345        super(RH, self).create_netns_dir(346            network_dir, netns_network_dir, ignore)347        # Copy /etc/sysconfig/network file348        src = '/etc/sysconfig/network'349        dst = '/etc/netns/{netns}/sysconfig'.format(350            netns=consts.AMPHORA_NAMESPACE)351        shutil.copy2(src, dst)352    def write_interfaces_file(self):353        # No interfaces file in RH based flavors354        return355    def write_vip_interface_file(self, interface_file_path,356                                 primary_interface, vip, ip, broadcast,357                                 netmask, gateway, mtu, vrrp_ip, vrrp_version,358                                 render_host_routes, template_vip=None):359        if not template_vip:360            template_vip = j2_env.get_template(self.ETH_X_VIP_CONF)361        super(RH, self).write_vip_interface_file(362            interface_file_path, primary_interface, vip, ip, broadcast,363            netmask, gateway, mtu, vrrp_ip, vrrp_version, render_host_routes,364            template_vip)365        # keepalived will handle the VIP if we are on active/standby366        if (ip.version == 4 and367            CONF.controller_worker.loadbalancer_topology ==368                consts.TOPOLOGY_SINGLE):369            # Create an IPv4 alias interface, needed in RH based flavors370            alias_interface_file_path = self.get_alias_network_interface_file(371                primary_interface)372            template_vip_alias = j2_env.get_template(self.ETH_X_ALIAS_VIP_CONF)373            super(RH, self).write_vip_interface_file(374                alias_interface_file_path, primary_interface, vip, ip,375                broadcast, netmask, gateway, mtu, vrrp_ip, vrrp_version,376                render_host_routes, template_vip_alias)377        routes_interface_file_path = (378            self.get_static_routes_interface_file(primary_interface,379                                                  ip.version))380        template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)381        self.write_static_routes_interface_file(382            routes_interface_file_path, primary_interface,383            render_host_routes, template_routes, gateway, vip, netmask)384        # keepalived will handle the rule(s) if we are on actvie/standby385        if (CONF.controller_worker.loadbalancer_topology ==386                consts.TOPOLOGY_SINGLE):387            route_rules_interface_file_path = (388                self.get_route_rules_interface_file(primary_interface,389                                                    ip.version))390            template_rules = j2_env.get_template(self.RULE_ETH_X_CONF)391            self.write_static_routes_interface_file(392                route_rules_interface_file_path, primary_interface,393                render_host_routes, template_rules, gateway, vip, netmask)394        self._write_ifup_ifdown_local_scripts_if_possible()395    def write_static_routes_interface_file(self, interface_file_path,396                                           interface, host_routes,397                                           template_routes, gateway,398                                           vip, netmask):399        # write static routes interface file400        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH401        # TODO(johnsom): We need a way to clean out old interfaces records402        if CONF.amphora_agent.agent_server_network_file:403            flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND404        else:405            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC406        with os.fdopen(os.open(interface_file_path, flags, mode),407                       'w') as text_file:408            text = template_routes.render(409                consts=consts,410                interface=interface,411                host_routes=host_routes,412                gateway=gateway,413                network=utils.ip_netmask_to_cidr(vip, netmask),414                vip=vip,415                topology=CONF.controller_worker.loadbalancer_topology,416            )417            text_file.write(text)418    def write_port_interface_file(self, netns_interface, fixed_ips, mtu,419                                  interface_file_path=None,420                                  template_port=None):421        if not interface_file_path:422            interface_file_path = self.get_network_interface_file(423                netns_interface)424        if not template_port:425            template_port = j2_env.get_template(self.ETH_X_PORT_CONF)426        super(RH, self).write_port_interface_file(427            netns_interface, fixed_ips, mtu, interface_file_path,428            template_port)429        if fixed_ips:430            host_routes = []431            host_routes_ipv6 = []432            for fixed_ip in fixed_ips:433                ip_addr = fixed_ip['ip_address']434                ip = ipaddress.ip_address(ip_addr if isinstance(435                    ip_addr, six.text_type) else six.u(ip_addr))436                if ip.version == 6:437                    host_routes_ipv6.extend(self.get_host_routes(fixed_ip))438                else:439                    host_routes.extend(self.get_host_routes(fixed_ip))440            routes_interface_file_path = (441                self.get_static_routes_interface_file(netns_interface, 4))442            template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)443            self.write_static_routes_interface_file(444                routes_interface_file_path, netns_interface,445                host_routes, template_routes, None, None, None)446            routes_interface_file_path_ipv6 = (447                self.get_static_routes_interface_file(netns_interface, 6))448            template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)449            self.write_static_routes_interface_file(450                routes_interface_file_path_ipv6, netns_interface,451                host_routes_ipv6, template_routes, None, None, None)452            self._write_ifup_ifdown_local_scripts_if_possible()453    @classmethod454    def bring_interfaces_up(cls, ip, primary_interface, secondary_interface):455        if ip.version == 4:456            super(RH, cls).bring_interfaces_up(457                ip, primary_interface, secondary_interface)458        else:459            # Secondary interface is not present in IPv6 configuration460            cls._bring_if_down(primary_interface)461            cls._bring_if_up(primary_interface, 'VIP')462    def has_ifup_all(self):463        return False464    def _write_ifup_ifdown_local_scripts_if_possible(self):465        if self._check_ifup_ifdown_local_scripts_exists():466            template_ifup_local = j2_env.get_template(467                self.ETH_IFUP_LOCAL_SCRIPT)468            self.write_port_interface_if_local_scripts(template_ifup_local)469            template_ifdown_local = j2_env.get_template(470                self.ETH_IFDOWN_LOCAL_SCRIPT)471            self.write_port_interface_if_local_scripts(template_ifdown_local,472                                                       ifup=False)473    def _check_ifup_ifdown_local_scripts_exists(self):474        file_names = ['ifup-local', 'ifdown-local']475        target_dir = '/sbin/'476        res = []477        for file_name in file_names:478            if os.path.exists(os.path.join(target_dir, file_name)):479                res.append(True)480            else:481                res.append(False)482        # This means we only add the scripts when both of them are non-exists483        return not any(res)484    def write_port_interface_if_local_scripts(485            self, template_script, ifup=True):486        file_name = 'ifup' + '-local'487        mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH488        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC489        if not ifup:490            file_name = 'ifdown' + '-local'491        with os.fdopen(492                os.open(os.path.join(493                    '/sbin/', file_name), flags, mode), 'w') as text_file:494            text = template_script.render()495            text_file.write(text)496        os.chmod(os.path.join('/sbin/', file_name), stat.S_IEXEC)497class CentOS(RH):498    PACKAGE_NAME_MAP = {'haproxy': 'haproxy18'}499    @classmethod500    def is_os_name(cls, os_name):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
