How to use flush_path method in avocado

Best Python code snippets using avocado_python, sourced from GitHub


Full Screen

1"""2MDS admin socket scrubbing-related tests.3"""4import json5import logging6import errno7import time8from teuthology.exceptions import CommandFailedError9import os10from tasks.cephfs.cephfs_test_case import CephFSTestCase11log = logging.getLogger(__name__)12class TestScrubChecks(CephFSTestCase):13 """14 Run flush and scrub commands on the specified files in the filesystem. This15 task will run through a sequence of operations, but it is not comprehensive16 on its own -- it doesn't manipulate the mds cache state to test on both17 in- and out-of-memory parts of the hierarchy. So it's designed to be run18 multiple times within a single test run, so that the test can manipulate19 memory state.20 Usage:21 mds_scrub_checks:22 mds_rank: 023 path: path/to/test/dir24 client: 025 run_seq: [0-9]+26 Increment the run_seq on subsequent invocations within a single test run;27 it uses that value to generate unique folder and file names.28 """29 MDSS_REQUIRED = 130 CLIENTS_REQUIRED = 131 def test_scrub_checks(self):32 self._checks(0)33 self._checks(1)34 def _checks(self, run_seq):35 mds_rank = 036 test_dir = "scrub_test_path"37 abs_test_path = "/{0}".format(test_dir)38"mountpoint: {0}".format(self.mount_a.mountpoint))39 client_path = os.path.join(self.mount_a.mountpoint, test_dir)40"client_path: {0}".format(client_path))41"Cloning repo into place")42 repo_path = self.clone_repo(self.mount_a, client_path)43"Initiating mds_scrub_checks on mds.{id_}, " +44 "test_path {path}, run_seq {seq}".format(45 id_=mds_rank, path=abs_test_path, seq=run_seq)46 )47 success_validator = lambda j, r: self.json_validator(j, r, "return_code", 0)48 nep = "{test_path}/i/dont/exist".format(test_path=abs_test_path)49 self.asok_command(mds_rank, "flush_path {nep}".format(nep=nep),50 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))51 self.asok_command(mds_rank, "scrub_path {nep}".format(nep=nep),52 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))53 test_repo_path = 
"{test_path}/ceph-qa-suite".format(test_path=abs_test_path)54 dirpath = "{repo_path}/suites".format(repo_path=test_repo_path)55 if run_seq == 0:56"First run: flushing {dirpath}".format(dirpath=dirpath))57 command = "flush_path {dirpath}".format(dirpath=dirpath)58 self.asok_command(mds_rank, command, success_validator)59 command = "scrub_path {dirpath}".format(dirpath=dirpath)60 self.asok_command(mds_rank, command, success_validator)61 filepath = "{repo_path}/suites/fs/verify/validater/valgrind.yaml".format(62 repo_path=test_repo_path)63 if run_seq == 0:64"First run: flushing {filepath}".format(filepath=filepath))65 command = "flush_path {filepath}".format(filepath=filepath)66 self.asok_command(mds_rank, command, success_validator)67 command = "scrub_path {filepath}".format(filepath=filepath)68 self.asok_command(mds_rank, command, success_validator)69 filepath = "{repo_path}/suites/fs/basic/clusters/fixed-3-cephfs.yaml". \70 format(repo_path=test_repo_path)71 command = "scrub_path {filepath}".format(filepath=filepath)72 self.asok_command(mds_rank, command,73 lambda j, r: self.json_validator(j, r, "performed_validation",74 False))75 if run_seq == 0:76"First run: flushing base dir /")77 command = "flush_path /"78 self.asok_command(mds_rank, command, success_validator)79 command = "scrub_path /"80 self.asok_command(mds_rank, command, success_validator)81 new_dir = "{repo_path}/new_dir_{i}".format(repo_path=repo_path, i=run_seq)82 test_new_dir = "{repo_path}/new_dir_{i}".format(repo_path=test_repo_path,83 i=run_seq)84 self.mount_a.run_shell(["mkdir", new_dir])85 command = "flush_path {dir}".format(dir=test_new_dir)86 self.asok_command(mds_rank, command, success_validator)87 new_file = "{repo_path}/new_file_{i}".format(repo_path=repo_path,88 i=run_seq)89 test_new_file = "{repo_path}/new_file_{i}".format(repo_path=test_repo_path,90 i=run_seq)91 self.mount_a.write_n_mb(new_file, 1)92 command = "flush_path {file}".format(file=test_new_file)93 self.asok_command(mds_rank, 
command, success_validator)94 # check that scrub fails on errors95 ino = self.mount_a.path_to_ino(new_file)96 rados_obj_name = "{ino:x}.00000000".format(ino=ino)97 command = "scrub_path {file}".format(file=test_new_file)98 # Missing parent xattr -> ENODATA99 self.fs.rados(["rmxattr", rados_obj_name, "parent"], pool=self.fs.get_data_pool_name())100 self.asok_command(mds_rank, command,101 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENODATA))102 # Missing object -> ENOENT103 self.fs.rados(["rm", rados_obj_name], pool=self.fs.get_data_pool_name())104 self.asok_command(mds_rank, command,105 lambda j, r: self.json_validator(j, r, "return_code", -errno.ENOENT))106 command = "flush_path /"107 self.asok_command(mds_rank, command, success_validator)108 def test_scrub_repair(self):109 mds_rank = 0110 test_dir = "scrub_repair_path"111 self.mount_a.run_shell(["sudo", "mkdir", test_dir])112 self.mount_a.run_shell(["sudo", "touch", "{0}/file".format(test_dir)])113 dir_objname = "{:x}.00000000".format(self.mount_a.path_to_ino(test_dir))114 self.mount_a.umount_wait()115 # flush journal entries to dirfrag objects, and expire journal116 self.fs.mds_asok(['flush', 'journal'])117 self.fs.mds_stop()118 # remove the dentry from dirfrag, cause incorrect fragstat/rstat119 self.fs.rados(["rmomapkey", dir_objname, "file_head"],120 pool=self.fs.get_metadata_pool_name())121 self.fs.mds_fail_restart()122 self.fs.wait_for_daemons()123 self.mount_a.mount()124 self.mount_a.wait_until_mounted()125 # fragstat indicates the directory is not empty, rmdir should fail126 with self.assertRaises(CommandFailedError) as ar:127 self.mount_a.run_shell(["sudo", "rmdir", test_dir])128 self.assertEqual(ar.exception.exitstatus, 1)129 self.asok_command(mds_rank, "scrub_path /{0} repair".format(test_dir),130 lambda j, r: self.json_validator(j, r, "return_code", 0))131 # wait a few second for background repair132 time.sleep(10)133 # fragstat should be fixed134 self.mount_a.run_shell(["sudo", 
"rmdir", test_dir])135 @staticmethod136 def json_validator(json_out, rc, element, expected_value):137 if rc != 0:138 return False, "asok command returned error {rc}".format(rc=rc)139 element_value = json_out.get(element)140 if element_value != expected_value:141 return False, "unexpectedly got {jv} instead of {ev}!".format(142 jv=element_value, ev=expected_value)143 return True, "Succeeded"144 def asok_command(self, mds_rank, command, validator):145"Running command '{command}'".format(command=command))146 command_list = command.split()147 # we just assume there's an active mds for every rank148 mds_id = self.fs.get_active_names()[mds_rank]149 proc = self.fs.mon_manager.admin_socket('mds', mds_id,150 command_list, check_status=False)151 rout = proc.exitstatus152 sout = proc.stdout.getvalue()153 if sout.strip():154 jout = json.loads(sout)155 else:156 jout = None157"command '{command}' got response code " +158 "'{rout}' and stdout '{sout}'".format(159 command=command, rout=rout, sout=sout))160 success, errstring = validator(jout, rout)161 if not success:162 raise AsokCommandFailedError(command, rout, jout, errstring)163 return jout164 def clone_repo(self, client_mount, path):165 repo = "ceph-qa-suite"166 repo_path = os.path.join(path, repo)167 client_mount.run_shell(["mkdir", "-p", path])168 try:169 client_mount.stat(repo_path)170 except CommandFailedError:171 client_mount.run_shell([172 "git", "clone", '--branch', 'giant',173 "{repo}".format(repo=repo),174 "{path}/{repo}".format(path=path, repo=repo)175 ])176 return repo_path177class AsokCommandFailedError(Exception):178 """179 Exception thrown when we get an unexpected response180 on an admin socket command181 """182 def __init__(self, command, rc, json_out, errstring):183 self.command = command184 self.rc = rc185 self.json = json_out186 self.errstring = errstring187 def __str__(self):188 return "Admin socket: {command} failed with rc={rc}," + \189 "json output={json}, because '{es}'".format(190 
command=self.command, rc=self.rc,...

Full Screen

Full Screen Github


Full Screen

import io
import logging
import os
import pickle as pickle
import resource
import sys
from time import time

# psutil is optional: log_stats already guards its use with a
# `'psutil' in sys.modules` check, so don't hard-fail when it is absent.
try:
    import psutil
except ImportError:
    psutil = None

# NOTE(review): the logging-call targets in this module were lost in
# transcription; a module-level logger is the most plausible reconstruction —
# confirm against the original source.
log = logging.getLogger(__name__)


def log_stats(cmd=None, flush_file=None, flush_path=None):
    """
    Dump some statistics (system, timers, etc) at the end of the run.

    :param cmd: command line that was executed, logged verbatim if given
    :param flush_file: log file name; also used to derive the timer/memory
        pickle names via ``.replace('.log', ...)``
    :param flush_path: directory to flush the buffered main log into
        (created if missing). Flushing happens only if BOTH flush_file and
        flush_path are given.
    :return:
    """
    log.info('************************************************************')
    log.info('************************ JOB STATS *************************')
    log.info('************************************************************')
    if cmd:
        log.info('Calling command: {}'.format(cmd))

    # system
    log.info('')
    log.info('System & Resources:')
    memory = {
        'psutil': None if 'psutil' not in sys.modules else memory_usage_psutil(),
        'peak-total-resource': memory_usage_resource(),
        'peak-self-resource': memory_usage_resource(with_children=False),
    }
    log.info('\t\tMemory usage (MB, psutil): {}'.format(memory['psutil']))
    log.info('\t\tPeak total memory usage (MB): {}'.format(memory['peak-total-resource']))
    log.info('\t\tPeak (self) memory usage (MB): {}'.format(memory['peak-self-resource']))

    # timing
    log.info('')
    log.info('Timers:')
    global log_timer
    for name, timer in log_timer.get_all_timers().items():
        log.info('\t\t{}: {} s'.format(name.capitalize(), timer['duration']))

    # flush to a main file
    if flush_file and flush_path:
        global main_log
        # exist_ok replaces the old try/except OSError + isdir re-raise dance
        os.makedirs(flush_path, exist_ok=True)
        with open(os.path.join(flush_path, flush_file), 'w') as f:
            f.write(main_log.getvalue())
        with open(os.path.join(flush_path, flush_file.replace('.log', '_timers.pkl')), 'wb') as f:
            pickle.dump(log_timer, f)
        with open(os.path.join(flush_path, flush_file.replace('.log', '_memory.pkl')), 'wb') as f:
            pickle.dump(memory, f)


def update_log_handles(main=True, job_name=None, path=None):
    """
    Update log streams / files (including paths) according to how the program
    is run. If running from main, we buffer all output to `main_log` and will
    flush to file at the end. If this is a job (on a cluster), write directly
    to a file.

    :param main: if running from main, log to buffer and flush later
    :param job_name: job identifier, used in the per-job log file name
    :param path: directory for the per-job log file
    :return:
    """
    if main:
        global main_log
        handler = logging.StreamHandler(main_log)
    else:
        handler = logging.FileHandler('{}/job_{}.log'.format(path, job_name))
    handler_format = logging.Formatter('[%(name)s - %(levelname)s] %(message)s')
    handler.setFormatter(handler_format)
    handler.setLevel(logging.INFO)

    for name, logger_ in logging.Logger.manager.loggerDict.items():
        # loggerDict also holds logging.PlaceHolder entries, which have no
        # addHandler(); the original guarded only the `main` branch, which
        # made the job branch raise AttributeError on placeholders.
        if not isinstance(logger_, logging.Logger):
            continue
        logger_.addHandler(handler)
        if not main:
            # submitted cluster job: log only to its own file
            logger_.propagate = False

    global run_local
    run_local = False


def get_logger(name):
    """
    Initialize a new logger called `name`.

    :param name: Logger name
    :return: logging.Logger object
    """
    # fixed: the format string had no placeholder, so .format(name) was a no-op
    logging.basicConfig(format='[%(filename)s:%(lineno)d - %(levelname)s] %(message)s',
                        level=logging.INFO)
    logger_ = logging.getLogger(name)
    global main_log_handler
    logger_.addHandler(main_log_handler)
    return logger_


def memory_usage_psutil():
    """Return this process's resident set size in MB (requires psutil)."""
    process = psutil.Process(os.getpid())
    mem = process.memory_info()[0] / float(2 ** 20)
    return mem


def memory_usage_resource(with_children=True):
    """Return peak memory usage (MB) from getrusage; ru_maxrss is in KB on Linux.

    :param with_children: include reaped child processes' peak usage
    """
    self = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
    children = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss / 1024.
    if with_children:
        return self + children
    else:
        return self


# initialize logging buffer as a stream handler
main_log = io.StringIO()
main_log_handler = logging.StreamHandler(main_log)
main_log_handler.setFormatter(logging.Formatter('[%(name)s:%(lineno)d - %(levelname)s] %(message)s'))
main_log_handler.setLevel(logging.INFO)


class Timer:
    """
    Registry of named wall-clock timers, each a dict with 'start', 'stop'
    and 'duration' keys.
    """

    def __init__(self):
        self.timers = {}

    def start(self, name):
        """
        Starts a new timer at the current timepoint. If the timer already
        exists, the start time will be overwritten, otherwise a new timer
        entry is created.

        :param name:
        :return:
        """
        self.timers[name] = {
            'start': time(),
            'stop': 0.,
            'duration': 0.
        }

    def restart(self, name):
        """
        Resets the start time of an existing timer to the current timepoint
        and sets the stop time to None. If the timer doesn't yet exist, simply
        creates a new one by calling self.start().

        :param name:
        :return:
        """
        if name not in self.timers:
            self.start(name)
            return
        self.timers[name]['start'] = time()
        self.timers[name]['stop'] = None

    def stop(self, name):
        """Stop `name` and record its duration; no-op for unknown timers."""
        if name in self.timers:
            # single timestamp so 'stop' and 'duration' are consistent
            # (the original called time() twice)
            now = time()
            self.timers[name]['stop'] = now
            self.timers[name]['duration'] = now - self.timers[name]['start']

    def accumulate(self, name):
        """Add the elapsed time since 'start' onto `name`'s running duration.

        Callers are expected to restart() between accumulation intervals.
        """
        if name in self.timers:
            now = time()
            self.timers[name]['stop'] = now
            self.timers[name]['duration'] += now - self.timers[name]['start']

    def get_all_timers(self):
        """Return the underlying name -> timer-record dict (not a copy)."""
        return self.timers


log_timer = Timer()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?