Best Python code snippet using autotest_python
package_test.py
Source:package_test.py  
"""Packaging tests for the signalfx-agent deb/rpm packages.

Each test installs (or upgrades to) the package under test inside an init-system
container, then drives the agent service through its init system and checks that
datapoints reach the fake backend.
"""
import difflib
import os
import re
from functools import partial as p

import pytest

from tests.helpers.assertions import has_datapoint
from tests.helpers.util import (
    copy_file_into_container,
    get_container_file_content,
    path_exists_in_container,
    print_lines,
    wait_for,
)
from tests.packaging.common import (
    AGENT_YAML_PATH,
    INIT_SYSTEMD,
    INIT_SYSV,
    INIT_UPSTART,
    PIDFILE_PATH,
    get_agent_logs,
    get_deb_package_to_test,
    get_rpm_package_to_test,
    is_agent_running_as_non_root,
    run_init_system_image,
)

pytestmark = pytest.mark.packaging

# Package extension -> command-line tool used to verify the installed package.
PACKAGE_UTIL = {".deb": "dpkg", ".rpm": "rpm"}

# Timeouts are in seconds; a restart is budgeted as a stop followed by a start.
INIT_START_TIMEOUT = 5
INIT_STOP_TIMEOUT = 11
INIT_RESTART_TIMEOUT = INIT_STOP_TIMEOUT + INIT_START_TIMEOUT

INIT_START_COMMAND = {
    INIT_SYSV: "service signalfx-agent start",
    INIT_UPSTART: "/etc/init.d/signalfx-agent start",
    INIT_SYSTEMD: "systemctl start signalfx-agent",
}

INIT_RESTART_COMMAND = {
    INIT_SYSV: "service signalfx-agent restart",
    INIT_UPSTART: "/etc/init.d/signalfx-agent restart",
    INIT_SYSTEMD: "systemctl restart signalfx-agent",
}

INIT_STOP_COMMAND = {
    INIT_SYSV: "service signalfx-agent stop",
    INIT_UPSTART: "/etc/init.d/signalfx-agent stop",
    INIT_SYSTEMD: "systemctl stop signalfx-agent",
}

INIT_LIST_COMMAND = {
    INIT_SYSV: "service --status-all",
    INIT_UPSTART: "bash -c 'chkconfig --list || service --status-all'",
    INIT_SYSTEMD: "systemctl list-unit-files --type=service --no-pager",
}

INIT_STATUS_COMMAND = {
    INIT_SYSV: "service signalfx-agent status",
    INIT_UPSTART: "/etc/init.d/signalfx-agent status",
    INIT_SYSTEMD: "systemctl status signalfx-agent",
}

# Substring expected in the status command output for each service state.
INIT_STATUS_OUTPUT = {
    INIT_SYSV: {"active": "Running with pid", "inactive": "Not running"},
    INIT_UPSTART: {"active": "Running with pid", "inactive": "Not running"},
    INIT_SYSTEMD: {"active": "Active: active (running)", "inactive": "Active: inactive (dead)"},
}

INIT_SCRIPT_PATH = "/etc/init.d/signalfx-agent"


def get_agent_pid(container):
    """Return the pgrep output for the agent process, or None if pgrep exits non-zero.

    The pid is returned as the decoded, stripped string printed by pgrep.
    """
    command = "pgrep -u signalfx-agent -f /usr/bin/signalfx-agent"
    code, output = container.exec_run(command)
    output = output.decode("utf-8").strip()
    if code != 0:
        return None
    assert re.match(r"\d+", output), "Unexpected output from command '%s':\n%s" % (command, output)
    return output


def agent_has_new_pid(container, old_pid):
    """Wait up to INIT_RESTART_TIMEOUT for the agent to run under a pid other than old_pid."""

    def _new_pid():
        pid = get_agent_pid(container)
        return bool(pid) and pid != old_pid

    return wait_for(_new_pid, timeout_seconds=INIT_RESTART_TIMEOUT)


def get_agent_yaml_diff(old_agent_yaml, new_agent_yaml):
    """Return a unified diff of the two agent.yaml contents (empty string if identical)."""
    diff_lines = difflib.unified_diff(
        old_agent_yaml.splitlines(),
        new_agent_yaml.splitlines(),
        fromfile="%s.orig" % AGENT_YAML_PATH,
        tofile=AGENT_YAML_PATH,
        lineterm="",
    )
    return "\n".join(diff_lines).strip()


def update_agent_yaml(container, backend, hostname="test-hostname"):
    """Point the agent config in the container at the test backend.

    Sets ``hostname`` (when truthy), ``ingestUrl`` and ``apiUrl`` in AGENT_YAML_PATH and
    returns the resulting file content.
    """

    def set_option(name, value):
        # Replace the option in place if it is already present, otherwise append it
        # (after a blank line so it never lands on an unterminated last line).
        code, _ = container.exec_run("grep '^%s:' %s" % (name, AGENT_YAML_PATH))
        if code == 0:
            container.exec_run("sed -i 's|^%s:.*|%s: %s|' %s" % (name, name, value, AGENT_YAML_PATH))
        else:
            container.exec_run("bash -ec 'echo >> %s'" % AGENT_YAML_PATH)
            container.exec_run("bash -ec 'echo \"%s: %s\" >> %s'" % (name, value, AGENT_YAML_PATH))

    assert path_exists_in_container(container, AGENT_YAML_PATH), "File %s does not exist!" % AGENT_YAML_PATH
    if hostname:
        set_option("hostname", hostname)
    set_option("ingestUrl", "http://%s:%d" % (backend.ingest_host, backend.ingest_port))
    set_option("apiUrl", "http://%s:%d" % (backend.api_host, backend.api_port))
    return get_container_file_content(container, AGENT_YAML_PATH)


def install_deps(cont, base_image):
    """Install the extra OS packages needed on debian/ubuntu and opensuse images.

    Other base images need nothing and are left untouched.
    """
    if "debian" in base_image or "ubuntu" in base_image:
        cmd = "sh -ec 'apt-get update && apt-get install -y libcap2-bin'"
    elif "opensuse" in base_image:
        cmd = "zypper install -y -l libcap2 libcap-progs libpcap1 shadow"
    else:
        return
    code, output = cont.exec_run(cmd)
    assert code == 0, "Failed to install dependencies:\n%s" % output.decode("utf-8")


def _test_install_package(cont, cmd):
    """Run the install command in the container and assert that it exits with 0."""
    code, output = cont.exec_run(cmd)
    print("Output of package install:")
    print_lines(output)
    assert code == 0, "Package could not be installed!"


def _test_service_status(container, init_system, expected_status):
    """Assert the status command output contains the marker for expected_status.

    expected_status is a key of INIT_STATUS_OUTPUT[init_system] ("active" or "inactive").
    The exit code is deliberately not checked; only the output text is.
    """
    _, output = container.exec_run(INIT_STATUS_COMMAND[init_system])
    print("%s status command output:" % init_system)
    print_lines(output)
    expected = INIT_STATUS_OUTPUT[init_system][expected_status]
    assert expected in output.decode("utf-8"), "'%s' expected in status output" % expected


def _test_service_list(container, init_system, service_name="signalfx-agent"):
    """Assert that service_name appears in the init system's service listing."""
    code, output = container.exec_run(INIT_LIST_COMMAND[init_system])
    print("%s list command output:" % init_system)
    print_lines(output)
    assert code == 0, "Failed to get service list"
    assert service_name in output.decode("utf-8"), "Agent service not registered"


def _test_service_start(container, init_system, backend, user="signalfx-agent"):
    """Start the service and assert it runs as ``user`` and sends datapoints."""
    code, output = container.exec_run(INIT_START_COMMAND[init_system])
    print("%s start command output:" % init_system)
    print_lines(output)
    backend.reset_datapoints()
    assert code == 0, "Agent could not be started"
    assert wait_for(
        p(is_agent_running_as_non_root, container, user=user), timeout_seconds=INIT_START_TIMEOUT
    ), "Agent is not running as %s after service start" % user
    assert wait_for(p(has_datapoint, backend, metric_name="disk.utilization")), "Datapoints didn't come through"


def _test_service_restart(container, init_system, backend):
    """Restart the service and assert it comes back with a new pid and sends datapoints."""
    old_pid = get_agent_pid(container)
    code, output = container.exec_run(INIT_RESTART_COMMAND[init_system])
    print("%s restart command output:" % init_system)
    print_lines(output)
    backend.reset_datapoints()
    assert code == 0, "Agent could not be restarted"
    assert wait_for(
        p(is_agent_running_as_non_root, container), timeout_seconds=INIT_RESTART_TIMEOUT
    ), "Agent is not running as non-root after service restart"
    assert agent_has_new_pid(container, old_pid), "Agent pid the same after service restart"
    assert wait_for(p(has_datapoint, backend, metric_name="disk.utilization")), "Datapoints didn't come through"


def _test_service_stop(container, init_system, backend):
    """Stop the service and assert the process (and, for sysv/upstart, the pidfile) is gone."""
    code, output = container.exec_run(INIT_STOP_COMMAND[init_system])
    print("%s stop command output:" % init_system)
    print_lines(output)
    assert code == 0, "Agent could not be stopped"
    assert wait_for(
        lambda: not get_agent_pid(container), timeout_seconds=INIT_STOP_TIMEOUT
    ), "Timed out waiting for agent process to stop"
    if init_system in (INIT_SYSV, INIT_UPSTART):
        assert not path_exists_in_container(container, PIDFILE_PATH), "%s exists when agent is stopped" % PIDFILE_PATH
    backend.reset_datapoints()


def _test_system_restart(container, init_system, backend):
    """Stop and start the whole container and assert the agent comes back up by itself."""
    print("Restarting container")
    container.stop(timeout=3)
    backend.reset_datapoints()
    container.start()
    assert wait_for(
        p(is_agent_running_as_non_root, container), timeout_seconds=INIT_RESTART_TIMEOUT
    ), "Agent is not running as non-root after container restart"
    _test_service_status(container, init_system, "active")
    assert wait_for(p(has_datapoint, backend, metric_name="disk.utilization")), "Datapoints didn't come through"


def _test_service_status_redirect(container):
    """Assert the sysv-style status command succeeds and reports the enabled systemd unit.

    Only called for systemd containers.
    """
    code, output = container.exec_run(INIT_STATUS_COMMAND[INIT_SYSV])
    print("%s status command output:" % INIT_SYSV)
    print_lines(output)
    assert code == 0, "'%s' failed" % INIT_STATUS_COMMAND[INIT_SYSV]
    assert "/lib/systemd/system/signalfx-agent.service; enabled" in output.decode("utf-8")


def _test_agent_status(container):
    """Assert that ``agent-status`` and each of its sub-commands exit with 0."""
    for arg in ["", "all", "monitors", "config", "endpoints"]:
        cmd = f"agent-status {arg}" if arg else "agent-status"
        print(f"Running '{cmd}' ...")
        code, output = container.exec_run(cmd)
        assert code == 0, output.decode("utf-8")


def _test_package_verify(container, package_ext, base_image):
    """Run the package manager's verify command for the installed agent package.

    Unknown package extensions are ignored. opensuse images verify with --nodeps.
    """
    util = PACKAGE_UTIL.get(package_ext)
    if util is None:
        return
    cmd = "%s --verify signalfx-agent" % util
    if util == "rpm" and "opensuse" in base_image:
        cmd = "rpm --verify --nodeps signalfx-agent"
    code, output = container.exec_run(cmd)
    assert code == 0, "%s verify failed!\n%s" % (util, output.decode("utf-8"))


def _test_service_override(container, init_system, backend):
    """Reconfigure the service to run as ``test-user`` and assert it starts as that user."""
    _test_service_stop(container, init_system, backend)

    code, output = container.exec_run(
        "useradd --system --user-group --no-create-home --shell /sbin/nologin test-user"
    )
    assert code == 0, output.decode("utf-8")

    if init_system == INIT_SYSTEMD:
        override_path = "/etc/systemd/system/signalfx-agent.service.d/override.conf"
        config = "[Service]\nUser=test-user\nGroup=test-user"
    else:
        override_path = "/etc/default/signalfx-agent"
        config = "user=test-user\ngroup=test-user"

    # Fail here rather than later in the service start if the override cannot be written.
    code, output = container.exec_run(f"mkdir -p {os.path.dirname(override_path)}")
    assert code == 0, output.decode("utf-8")
    code, output = container.exec_run(f'''bash -c "echo -e '{config}' > {override_path}"''')
    assert code == 0, output.decode("utf-8")

    if init_system == INIT_SYSTEMD:
        # override tmpfile with the new user/group
        tmpfile_override_path = "/etc/tmpfiles.d/signalfx-agent.conf"
        tmpfile_override_config = "D /run/signalfx-agent 0755 test-user test-user - -"
        code, output = container.exec_run(
            f'''bash -c "echo '{tmpfile_override_config}' > {tmpfile_override_path}"'''
        )
        assert code == 0, output.decode("utf-8")
        code, output = container.exec_run(f"systemd-tmpfiles --create --remove {tmpfile_override_path}")
        assert code == 0, output.decode("utf-8")
        code, output = container.exec_run("systemctl daemon-reload")
        assert code == 0, output.decode("utf-8")

    _test_service_start(container, init_system, backend, user="test-user")
    _test_service_status(container, init_system, "active")


def _assert_init_script_layout(cont, init_system):
    """Assert the init.d script is absent under systemd and present otherwise."""
    has_init_script = path_exists_in_container(cont, INIT_SCRIPT_PATH)
    if init_system == INIT_SYSTEMD:
        assert not has_init_script, "%s should not exist with systemd" % INIT_SCRIPT_PATH
    else:
        assert has_init_script, "%s does not exist" % INIT_SCRIPT_PATH


def _run_service_tests(cont, init_system, backend):
    """Run the service lifecycle checks shared by the install and upgrade tests.

    Whatever the outcome, prints the service status and agent log if the container
    is still running, to help diagnose failures.
    """
    try:
        _test_service_list(cont, init_system)
        _test_service_restart(cont, init_system, backend)
        _test_service_status(cont, init_system, "active")
        _test_service_stop(cont, init_system, backend)
        _test_service_status(cont, init_system, "inactive")
        _test_service_start(cont, init_system, backend)
        _test_service_status(cont, init_system, "active")
        _test_service_stop(cont, init_system, backend)
        _test_system_restart(cont, init_system, backend)
        if init_system == INIT_SYSTEMD:
            _test_service_status_redirect(cont)
        _test_agent_status(cont)
        _test_service_override(cont, init_system, backend)
    finally:
        cont.reload()
        if cont.status.lower() == "running":
            print("Agent service status:")
            print_lines(cont.exec_run(INIT_STATUS_COMMAND[init_system]).output)
            print("Agent log:")
            print_lines(get_agent_logs(cont, init_system))


INSTALL_COMMAND = {
    ".rpm": "yum --nogpgcheck localinstall -y /opt/signalfx-agent.rpm",
    ".deb": "dpkg -i /opt/signalfx-agent.deb",
}


def _test_package_install(base_image, package_path, init_system):
    """Install the package in a fresh container and run the service lifecycle checks."""
    with run_init_system_image(base_image, with_socat=False) as [cont, backend]:
        _, package_ext = os.path.splitext(package_path)
        copy_file_into_container(package_path, cont, "/opt/signalfx-agent%s" % package_ext)
        install_deps(cont, base_image)

        if "opensuse" in base_image:
            install_cmd = "rpm -ivh --nodeps /opt/signalfx-agent.rpm"
        else:
            install_cmd = INSTALL_COMMAND[package_ext]
        _test_install_package(cont, install_cmd)
        _test_package_verify(cont, package_ext, base_image)
        _assert_init_script_layout(cont, init_system)

        cont.exec_run("bash -ec 'echo -n testing123 > /etc/signalfx/token'")
        update_agent_yaml(cont, backend, hostname="test-" + base_image)

        _run_service_tests(cont, init_system, backend)


OLD_RPM_URL = "https://dl.signalfx.com/rpms/signalfx-agent/archive/release/signalfx-agent-3.0.1-1.x86_64.rpm"
OLD_RPM_NAME = OLD_RPM_URL.rsplit("/", maxsplit=1)[-1]
OLD_SUSE_RPM_URL = "https://splunk.jfrog.io/splunk/signalfx-agent-rpm/release/signalfx-agent-4.7.7-1.x86_64.rpm"
OLD_SUSE_RPM_NAME = OLD_SUSE_RPM_URL.rsplit("/", maxsplit=1)[-1]
OLD_DEB_URL = "https://dl.signalfx.com/debs/signalfx-agent/archive/release/signalfx-agent_3.0.1-1_amd64.deb"
OLD_DEB_NAME = OLD_DEB_URL.rsplit("/", maxsplit=1)[-1]

OLD_INSTALL_COMMAND = {
    ".rpm": f"yum install -y {OLD_RPM_URL}",
    ".deb": f"bash -ec 'wget -nv {OLD_DEB_URL} && dpkg -i {OLD_DEB_NAME}'",
}

UPGRADE_COMMAND = {
    ".rpm": "yum --nogpgcheck update -y /opt/signalfx-agent.rpm",
    ".deb": "dpkg -i --force-confold /opt/signalfx-agent.deb",
}


def _test_package_upgrade(base_image, package_path, init_system):
    """Install an old release, upgrade to the package under test, then run the lifecycle checks.

    Also asserts that the upgrade leaves the customised agent.yaml unchanged.
    """
    with run_init_system_image(base_image, with_socat=False) as [cont, backend]:
        _, package_ext = os.path.splitext(package_path)
        copy_file_into_container(package_path, cont, "/opt/signalfx-agent%s" % package_ext)
        install_deps(cont, base_image)

        if "opensuse" in base_image:
            install_cmd = f"bash -ec 'wget -nv {OLD_SUSE_RPM_URL} && rpm -ivh --nodeps {OLD_SUSE_RPM_NAME}'"
            upgrade_cmd = "rpm -Uvh --nodeps /opt/signalfx-agent.rpm"
        else:
            install_cmd = OLD_INSTALL_COMMAND[package_ext]
            upgrade_cmd = UPGRADE_COMMAND[package_ext]
        _test_install_package(cont, install_cmd)

        cont.exec_run("bash -ec 'echo -n testing123 > /etc/signalfx/token'")
        old_agent_yaml = update_agent_yaml(cont, backend, hostname="test-" + base_image)
        cont.exec_run("cp -f %s %s.orig" % (AGENT_YAML_PATH, AGENT_YAML_PATH))

        code, output = cont.exec_run(upgrade_cmd)
        print("Output of package upgrade:")
        print_lines(output)
        assert code == 0, "Package could not be upgraded!"

        _test_package_verify(cont, package_ext, base_image)
        _assert_init_script_layout(cont, init_system)

        new_agent_yaml = get_container_file_content(cont, AGENT_YAML_PATH)
        diff = get_agent_yaml_diff(old_agent_yaml, new_agent_yaml)
        assert not diff, "%s different after upgrade!\n%s" % (AGENT_YAML_PATH, diff)

        _run_service_tests(cont, init_system, backend)


# (base_image, init_system) pairs shared by the install and upgrade tests.
RPM_BASE_IMAGES = [
    ("amazonlinux1", INIT_UPSTART),
    ("amazonlinux2", INIT_SYSTEMD),
    ("centos7", INIT_SYSTEMD),
    ("centos8", INIT_SYSTEMD),
    ("opensuse15", INIT_SYSTEMD),
]

DEB_BASE_IMAGES = [
    ("debian-8-jessie", INIT_SYSTEMD),
    ("debian-9-stretch", INIT_SYSTEMD),
    ("ubuntu1404", INIT_UPSTART),
    ("ubuntu1604", INIT_SYSTEMD),
    ("ubuntu1804", INIT_SYSTEMD),
]


@pytest.mark.rpm
@pytest.mark.parametrize("base_image,init_system", RPM_BASE_IMAGES)
def test_rpm_package(base_image, init_system):
    _test_package_install(base_image, get_rpm_package_to_test(), init_system)


@pytest.mark.deb
@pytest.mark.parametrize("base_image,init_system", DEB_BASE_IMAGES)
def test_deb_package(base_image, init_system):
    _test_package_install(base_image, get_deb_package_to_test(), init_system)


@pytest.mark.rpm
@pytest.mark.upgrade
@pytest.mark.parametrize("base_image,init_system", RPM_BASE_IMAGES)
def test_rpm_package_upgrade(base_image, init_system):
    _test_package_upgrade(base_image, get_rpm_package_to_test(), init_system)


@pytest.mark.deb
@pytest.mark.upgrade
@pytest.mark.parametrize("base_image,init_system", DEB_BASE_IMAGES)
def test_deb_package_upgrade(base_image, init_system):
    # NOTE(review): the body of this test is elided ("...") in the excerpt this module was
    # recovered from. It presumably mirrors test_rpm_package_upgrade using
    # get_deb_package_to_test() -- TODO confirm against the upstream file.
    ...


# keepalivedlvs.py
Source:keepalivedlvs.py  
...76            b = stream.read(BUFFER)77            while b:78                f.write(b)79                b = stream.read(BUFFER)80        init_system = util.get_os_init_system()81        file_path = util.keepalived_lvs_init_path(init_system, listener_id)82        if init_system == consts.INIT_SYSTEMD:83            template = SYSTEMD_TEMPLATE84            # Render and install the network namespace systemd service85            util.install_netns_systemd_service()86            util.run_systemctl_command(87                consts.ENABLE, consts.AMP_NETNS_SVC_PREFIX)88        elif init_system == consts.INIT_UPSTART:89            template = UPSTART_TEMPLATE90        elif init_system == consts.INIT_SYSVINIT:91            template = SYSVINIT_TEMPLATE92        else:93            raise util.UnknownInitError()94        # Render and install the keepalivedlvs init script95        if init_system == consts.INIT_SYSTEMD:96            # mode 0064497            mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH98        else:99            # mode 00755100            mode = (stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |101                    stat.S_IROTH | stat.S_IXOTH)102        keepalived_pid, vrrp_pid, check_pid = util.keepalived_lvs_pids_path(103            listener_id)104        if not os.path.exists(file_path):105            with os.fdopen(os.open(file_path, flags, mode), 'w') as text_file:106                text = template.render(107                    keepalived_pid=keepalived_pid,108                    vrrp_pid=vrrp_pid,109                    check_pid=check_pid,110                    keepalived_cmd=consts.KEEPALIVED_CMD,111                    keepalived_cfg=util.keepalived_lvs_cfg_path(listener_id),112                    amphora_nsname=consts.AMPHORA_NAMESPACE,113                    amphora_netns=consts.AMP_NETNS_SVC_PREFIX114                )115                text_file.write(text)116        # Make sure the keepalivedlvs service is enabled on boot117        
if init_system == consts.INIT_SYSTEMD:118            util.run_systemctl_command(119                consts.ENABLE, "octavia-keepalivedlvs-%s" % str(listener_id))120        elif init_system == consts.INIT_SYSVINIT:121            init_enable_cmd = "insserv {file}".format(file=file_path)122            try:123                subprocess.check_output(init_enable_cmd.split(),124                                        stderr=subprocess.STDOUT)125            except subprocess.CalledProcessError as e:126                LOG.debug('Failed to enable '127                          'octavia-keepalivedlvs service: '128                          '%(err)s', {'err': e})129                return webob.Response(json=dict(130                    message="Error enabling "131                            "octavia-keepalivedlvs service",132                    details=e.output), status=500)133        if NEED_CHECK:134            # inject the check script for keepalived process135            script_path = os.path.join(util.keepalived_check_scripts_dir(),136                                       KEEPALIVED_CHECK_SCRIPT_NAME)137            if not os.path.exists(script_path):138                with os.fdopen(os.open(script_path, flags, stat.S_IEXEC),139                               'w') as script_file:140                    text = check_script_file_template.render(141                        consts=consts,142                        init_system=init_system,143                        keepalived_lvs_pid_dir=util.keepalived_lvs_dir()144                    )145                    script_file.write(text)146        res = webob.Response(json={'message': 'OK'}, status=200)147        res.headers['ETag'] = stream.get_md5()148        return res149    def _check_udp_listener_exists(self, listener_id):150        if not os.path.exists(util.keepalived_lvs_cfg_path(listener_id)):151            raise exceptions.HTTPException(152                response=webob.Response(json=dict(153                    message='UDP 
Listener Not Found',154                    details="No UDP listener with UUID: {0}".format(155                        listener_id)), status=404))156    def get_udp_listener_config(self, listener_id):157        """Gets the keepalivedlvs config158        :param listener_id: the id of the listener159        """160        self._check_udp_listener_exists(listener_id)161        with open(util.keepalived_lvs_cfg_path(listener_id), 'r') as file:162            cfg = file.read()163            resp = webob.Response(cfg, content_type='text/plain')164            return resp165    def manage_udp_listener(self, listener_id, action):166        action = action.lower()167        if action not in [consts.AMP_ACTION_START,168                          consts.AMP_ACTION_STOP,169                          consts.AMP_ACTION_RELOAD]:170            return webob.Response(json=dict(171                message='Invalid Request',172                details="Unknown action: {0}".format(action)), status=400)173        self._check_udp_listener_exists(listener_id)174        if action == consts.AMP_ACTION_RELOAD:175            if consts.OFFLINE == self._check_udp_listener_status(listener_id):176                action = consts.AMP_ACTION_START177        cmd = ("/usr/sbin/service "178               "octavia-keepalivedlvs-{listener_id} "179               "{action}".format(listener_id=listener_id, action=action))180        try:181            subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)182        except subprocess.CalledProcessError as e:183            LOG.debug('Failed to %s keepalivedlvs listener %s',184                      listener_id + ' : ' + action, e)185            return webob.Response(json=dict(186                message=("Failed to {0} keepalivedlvs listener {1}"187                         .format(action, listener_id)),188                details=e.output), status=500)189        return webob.Response(190            json=dict(message='OK',191                      
details='keepalivedlvs listener {listener_id}'192                              '{action}ed'.format(listener_id=listener_id,193                                                  action=action)),194            status=202)195    def _check_udp_listener_status(self, listener_id):196        if os.path.exists(util.keepalived_lvs_pids_path(listener_id)[0]):197            if os.path.exists(os.path.join(198                    '/proc', util.get_keepalivedlvs_pid(listener_id))):199                # Check if the listener is disabled200                with open(util.keepalived_lvs_cfg_path(listener_id),201                          'r') as file:202                    cfg = file.read()203                    m = re.search('virtual_server', cfg)204                    if m:205                        return consts.ACTIVE206                    return consts.OFFLINE207            return consts.ERROR208        return consts.OFFLINE209    def get_all_udp_listeners_status(self):210        """Gets the status of all UDP listeners211        Gets the status of all UDP listeners on the amphora.212        """213        listeners = list()214        for udp_listener in util.get_udp_listeners():215            status = self._check_udp_listener_status(udp_listener)216            listeners.append({217                'status': status,218                'uuid': udp_listener,219                'type': 'UDP',220            })221        return listeners222    def get_udp_listener_status(self, listener_id):223        """Gets the status of a UDP listener224        This method will consult the stats socket225        so calling this method will interfere with226        the health daemon with the risk of the amphora227        shut down228        :param listener_id: The id of the listener229        """230        self._check_udp_listener_exists(listener_id)231        status = self._check_udp_listener_status(listener_id)232        if status != consts.ACTIVE:233            stats = dict(234                
status=status,235                uuid=listener_id,236                type='UDP'237            )238            return webob.Response(json=stats)239        stats = dict(240            status=status,241            uuid=listener_id,242            type='UDP'243        )244        try:245            pool = keepalivedlvs_query.get_udp_listener_pool_status(246                listener_id)247        except subprocess.CalledProcessError as e:248            return webob.Response(json=dict(249                message="Error getting kernel lvs status for udp listener "250                        "{}".format(listener_id),251                details=e.output), status=500)252        stats['pools'] = [pool]253        return webob.Response(json=stats)254    def delete_udp_listener(self, listener_id):255        try:256            self._check_udp_listener_exists(listener_id)257        except exceptions.HTTPException:258            return webob.Response(json={'message': 'OK'})259        # check if that keepalived is still running and if stop it260        keepalived_pid, vrrp_pid, check_pid = util.keepalived_lvs_pids_path(261            listener_id)262        if os.path.exists(keepalived_pid) and os.path.exists(263                os.path.join('/proc',264                             util.get_keepalivedlvs_pid(listener_id))):265            cmd = ("/usr/sbin/service "266                   "octavia-keepalivedlvs-{0} stop".format(listener_id))267            try:268                subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)269            except subprocess.CalledProcessError as e:270                LOG.error("Failed to stop keepalivedlvs service: %s", e)271                return webob.Response(json=dict(272                    message="Error stopping keepalivedlvs",273                    details=e.output), status=500)274        # Since the lvs check script based on the keepalived pid file for275        # checking whether it is alived. 
So here, we had stop the keepalived276        # process by the previous step, must make sure the pid files are not277        # exist.278        if (os.path.exists(keepalived_pid) or279                os.path.exists(vrrp_pid) or os.path.exists(check_pid)):280            for pid in [keepalived_pid, vrrp_pid, check_pid]:281                os.remove(pid)282        # disable the service283        init_system = util.get_os_init_system()284        init_path = util.keepalived_lvs_init_path(init_system, listener_id)285        if init_system == consts.INIT_SYSTEMD:286            util.run_systemctl_command(287                consts.DISABLE, "octavia-keepalivedlvs-%s" % str(listener_id))288        elif init_system == consts.INIT_SYSVINIT:289            init_disable_cmd = "insserv -r {file}".format(file=init_path)290        elif init_system != consts.INIT_UPSTART:291            raise util.UnknownInitError()292        if init_system == consts.INIT_SYSVINIT:293            try:294                subprocess.check_output(init_disable_cmd.split(),295                                        stderr=subprocess.STDOUT)296            except subprocess.CalledProcessError as e:297                LOG.error("Failed to disable "...util.py
Source:util.py  
# ... (earlier part of util.py is not shown in this excerpt)


def is_udp_listener_running(listener_id):
    """Return True if the listener's keepalived pid file and its /proc entry both exist.

    :param listener_id: the id of the UDP listener
    """
    pid_file = keepalived_lvs_pids_path(listener_id)[0]
    if not os.path.exists(pid_file):
        return False
    return os.path.exists(
        os.path.join('/proc', get_keepalivedlvs_pid(listener_id)))


def get_os_init_system():
    """Detect the init system from the process name in consts.INIT_PROC_COMM_PATH.

    Returns consts.INIT_SYSTEMD when the name matches it. When the name is
    'init' and consts.INIT_PATH exists, runs it with ``--version`` and returns
    consts.INIT_UPSTART if that string appears in the output, otherwise
    consts.INIT_SYSVINIT. Every other case returns consts.INIT_UNKOWN.

    :raises subprocess.CalledProcessError: if the ``--version`` call exits
        with a non-zero status
    """
    if not os.path.exists(consts.INIT_PROC_COMM_PATH):
        return consts.INIT_UNKOWN

    with open(consts.INIT_PROC_COMM_PATH, 'r') as init_comm:
        init_proc_name = init_comm.read().rstrip('\n')

    if init_proc_name == consts.INIT_SYSTEMD:
        return consts.INIT_SYSTEMD

    if init_proc_name == 'init' and os.path.exists(consts.INIT_PATH):
        # Both upstart and sysvinit report their process name as 'init';
        # the version banner is what tells them apart.
        init_version = subprocess.check_output(
            [consts.INIT_PATH, '--version'], shell=False)
        if consts.INIT_UPSTART in str(init_version, 'utf-8'):
            return consts.INIT_UPSTART
        return consts.INIT_SYSVINIT

    return consts.INIT_UNKOWN


# ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
