How to use the can_sudo method in Avocado

# ... (the beginning of the file is elided in this excerpt)
#
# NOTE(review): this excerpt was recovered from a garbled listing that had
# lost several tokens.  The following were restored and should be confirmed
# against the upstream file:
#   * the ``class Base`` header (name taken from the subclasses below; the
#     ``unittest.TestCase`` parent is assumed from the assert*/skipIf usage);
#   * the first argument of both ``os.path.join`` calls in ``Base.setUp``
#     (restored as ``self.tmpdir.name``);
#   * the ``proc_mounts_file.read()`` call inside every ``/proc/mounts`` block;
#   * the target of the first ``unittest.mock.patch`` in
#     ``test_force_unmount_get_pids_fail`` (restored as
#     ``'avocado.utils.partition.process.run'``).


class Base(unittest.TestCase):
    """
    Common setUp/tearDown for partition tests
    """

    @unittest.skipIf(not os.path.isfile('/proc/mounts'),
                     'system does not have /proc/mounts')
    @unittest.skipIf(not process.can_sudo('mount'),
                     'current user must be allowed to run "mount" under sudo')
    @unittest.skipIf(not process.can_sudo('mkfs.ext2 -V'),
                     'current user must be allowed to run "mkfs.ext2" under '
                     'sudo')
    @unittest.skipIf(os.getenv('TRAVIS') and
                     os.getenv('TRAVIS_CPU_ARCH') in ['arm64', 'ppc64le', 's390x'],
                     'TRAVIS Environment is unsuitable for these tests')
    def setUp(self):
        """
        Create a temporary directory with a "disk" mountpoint inside it and
        a Partition object that uses the "block" path in the same directory.
        """
        prefix = temp_dir_prefix(__name__, self, 'setUp')
        self.tmpdir = tempfile.TemporaryDirectory(prefix=prefix)
        self.mountpoint = os.path.join(self.tmpdir.name, "disk")
        os.mkdir(self.mountpoint)
        self.disk = partition.Partition(os.path.join(self.tmpdir.name, "block"), 1,
                                        self.mountpoint)

    def tearDown(self):
        """ Unmount the partition, then remove the temporary directory """
        # Unmount first so the mountpoint is released before it is deleted.
        self.disk.unmount()
        self.tmpdir.cleanup()


class TestPartition(Base):

    def test_basic(self):
        """ Test the basic workflow """
        self.assertIsNone(self.disk.get_mountpoint())
        self.disk.mkfs()
        self.disk.mount()
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertIn(self.mountpoint, proc_mounts)
        self.assertEqual(self.mountpoint, self.disk.get_mountpoint())
        self.disk.unmount()
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertNotIn(self.mountpoint, proc_mounts)


class TestPartitionMkfsMount(Base):
    """
    Tests that assume a filesystem and mounted partition
    """

    def setUp(self):
        """
        Run the common setUp, then create the filesystem, mount it and
        prepare the command used to keep the mountpoint busy.
        """
        super().setUp()
        self.disk.mkfs()
        self.disk.mount()
        self.use_mnt_file = os.path.join(self.mountpoint, 'file')
        # A Python one-liner that opens a file inside the mountpoint for
        # writing and then sleeps for 60 seconds with it still open.
        self.use_mnt_cmd = ("%s -c 'import time; f = open(\"%s\", \"w\"); "
                            "time.sleep(60)'" % (sys.executable,
                                                 self.use_mnt_file))

    def run_process_to_use_mnt(self):
        """
        Start ``self.use_mnt_cmd`` under sudo and wait for its file to appear.

        :returns: the started ``process.SubProcess`` object
        """
        proc = process.SubProcess(self.use_mnt_cmd, sudo=True)
        proc.start()
        self.assertTrue(wait.wait_for(lambda: os.path.exists(self.use_mnt_file),
                                      timeout=1, first=0.1, step=0.1),
                        "File was not created within mountpoint")
        return proc

    @unittest.skipIf(missing_binary('lsof'), "requires running lsof")
    @unittest.skipIf(not process.can_sudo(sys.executable + " -c ''"),
                     "requires running Python as a privileged user")
    @unittest.skipIf(not process.can_sudo('kill -l'),
                     "requires running kill as a privileged user")
    def test_force_unmount(self):
        """ Test force-unmount feature """
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertIn(self.mountpoint, proc_mounts)
        proc = self.run_process_to_use_mnt()
        self.assertTrue(self.disk.unmount())
        # Process should return -9, or when sudo is used
        # return code is 137
        self.assertIn(proc.poll(), [-9, 137],
                      "Unexpected return code when trying to kill process "
                      "using the mountpoint")
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertNotIn(self.mountpoint, proc_mounts)

    @unittest.skipIf(not process.can_sudo(sys.executable + " -c ''"),
                     "requires running Python as a privileged user")
    @unittest.skipUnless(missing_binary('lsof'), "requires not having lsof")
    def test_force_unmount_no_lsof(self):
        """ Checks that a force-unmount will fail on systems without lsof """
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertIn(self.mountpoint, proc_mounts)
        proc = self.run_process_to_use_mnt()
        self.assertRaises(partition.PartitionError, self.disk.unmount)
        proc.wait(timeout=1)

    @unittest.skipIf(not process.can_sudo(sys.executable + " -c ''"),
                     "requires running Python as a privileged user")
    def test_force_unmount_get_pids_fail(self):
        """ Checks PartitionError is raised if there's no lsof to get pids """
        with open("/proc/mounts") as proc_mounts_file:
            proc_mounts = proc_mounts_file.read()
        self.assertIn(self.mountpoint, proc_mounts)
        proc = self.run_process_to_use_mnt()
        # NOTE(review): the patch target below was empty in the excerpt;
        # confirm it against the upstream test before relying on it.
        with unittest.mock.patch('avocado.utils.partition.process.run',
                                 side_effect=process.CmdError):
            with unittest.mock.patch('avocado.utils.partition.process.system_output',
                                     side_effect=OSError) as mocked_system_output:
                self.assertRaises(partition.PartitionError, self.disk.unmount)
                mocked_system_output.assert_called_with('lsof ' + self.mountpoint,
                                                        sudo=True)

# ... (the rest of the file is elided in this excerpt)

# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.


from __future__ import absolute_import, division, print_function

from _rpm import Version


def ocp_check(plugin_context, ocp_version, cursor, new_cluster_config=None, new_clients=None, *args, **kwargs):
    """Report whether the OBProxy servers satisfy the OCP takeover checks.

    Three conditions are checked on the servers of the cluster config:
    the client user name is ``admin``, ``sudo whoami`` succeeds, and no
    more than one ``obproxy`` process is found by ``pgrep``.  Every
    violation is reported through ``plugin_context.stdio``.

    :param plugin_context: supplies ``stdio`` and, when no override is
        given, ``cluster_config`` and ``clients``.
    :param ocp_version: OCP version; it is wrapped in ``Version`` before
        being compared with the supported version ``3.1.1``.
    :param cursor: not used by the code visible in this excerpt.
    :param new_cluster_config: optional cluster config used instead of the
        one on ``plugin_context``; it also selects the success message.
    :param new_clients: optional clients mapping used instead of the one on
        ``plugin_context``.
    :returns: ``None`` on every path visible in this excerpt.
        NOTE(review): the excerpt is truncated right after the final
        ``stdio.print`` call, so a trailing return may be missing here.
    """
    cluster_config = new_cluster_config or plugin_context.cluster_config
    clients = new_clients or plugin_context.clients
    stdio = plugin_context.stdio

    # Only one OCP version is supported, so both bounds are the same object.
    lowest_supported = Version('3.1.1')
    highest_supported = lowest_supported
    ocp_version = Version(ocp_version)

    if ocp_version < lowest_supported:
        stdio.error('The current plugin version does not support OCP V%s' % ocp_version)
        return
    if ocp_version > highest_supported:
        # Newer OCP versions only get a warning; the checks still run.
        stdio.warn('The plugin library does not support OCP V%s. The takeover requirements are not applicable to the current check.' % ocp_version)

    sudo_probe = 'sudo whoami'
    # The probe exits non-zero when pgrep lists more than one obproxy process.
    single_proxy_probe = 'bash -c "if [ `pgrep obproxy | wc -l` -gt 1 ]; then exit 1; else exit 0;fi;"'

    wrong_user = False
    sudo_denied = False
    proxy_duplicated = False

    for server in cluster_config.servers:
        client = clients[server]
        # The user and sudo problems are reported once, on the first server
        # that shows them; later servers are not probed for them again.
        if not wrong_user and client.config.username != 'admin':
            wrong_user = True
            stdio.error('The current user must be the admin user. Run the edit-config command to modify the user.username field')
        if not sudo_denied and not client.execute_command(sudo_probe):
            sudo_denied = True
            stdio.error('The user must have the privilege to run sudo commands without a password.')
        # The OBProxy count is checked and reported for every server.
        if not client.execute_command(single_proxy_probe):
            proxy_duplicated = True
            stdio.error('%s Multiple OBProxies exist.' % server)

    if not (wrong_user or sudo_denied or proxy_duplicated):
        if new_cluster_config:
            message = 'Configurations of the OBProxy can be taken over by OCP after they take effect.'
        else:
            message = 'Configurations of the OBProxy can be taken over by OCP.'
        stdio.print(message)

# ... (the source excerpt is truncated here)

