How to use get_path_status method in avocado

Best Python code snippet using avocado_python

multipath.py

Source:multipath.py Github

copy

Full Screen

...160 if not process.system("multipath -c /dev/%s" % disk_path, sudo=True,161 ignore_status=True, shell=True):162 return True163 return False164def get_path_status(disk_path):165 """166 Return the status of a path in multipath.167 :param disk_path: disk path. Example: sda, sdb.168 :return: Tuple in the format of (dm status, dev status, checker status)169 """170 mpath_op = get_multipath_details()171 if not mpath_op:172 return ('', '', '')173 for maps in mpath_op['maps']:174 for path_groups in maps['path_groups']:175 for paths in path_groups['paths']:176 if paths['dev'] == disk_path:177 return(paths['dm_st'], paths['dev_st'], paths['chk_st'])178def fail_path(path):179 """180 Fail the individual paths.181 :param str path: disk path. Example: sda, sdb.182 :return: True if succeeded, False otherwise183 :rtype: bool184 """185 def is_failed():186 path_stat = get_path_status(path)187 if path_stat[0] == 'failed' and path_stat[2] == 'faulty':188 return True189 return False190 cmd = 'multipathd -k"fail path %s"' % path191 if process.system(cmd) == 0:192 return wait.wait_for(is_failed, timeout=10) or False193 return False194def reinstate_path(path):195 """196 Reinstate the individual paths.197 :param str path: disk path. 
Example: sda, sdb.198 :return: True if succeeded, False otherwise199 """200 def is_reinstated():201 path_stat = get_path_status(path)202 if path_stat[0] == 'active' and path_stat[2] == 'ready':203 return True204 return False205 cmd = 'multipathd -k"reinstate path %s"' % path206 if process.system(cmd) == 0:207 return wait.wait_for(is_reinstated, timeout=10) or False208 return False209def get_policy(wwid):210 """211 Gets path_checker policy, given a multipath wwid.212 :return: path checker policy.213 :rtype: str214 """215 if device_exists(wwid):216 cmd = "multipath -ll %s" % wwid217 lines = process.run(cmd, sudo=True).stdout_text.strip("\n")218 for line in lines.split("\n"):219 if 'policy' in line:220 return line.split("'")[1].split()[0]221def get_size(wwid):222 """223 Gets size of device, given a multipath wwid.224 :return: size of multipath device.225 :rtype: str226 """227 if device_exists(wwid):228 cmd = "multipath -ll %s" % wwid229 lines = process.run(cmd, sudo=True).stdout_text.strip("\n")230 for line in lines.split("\n"):231 if 'size' in line:232 return line.split("=")[1].split()[0]233def flush_path(path_name):234 """235 Flushes the given multipath.236 :return: Returns False if command fails, True otherwise.237 """238 cmd = "multipath -f %s" % path_name239 if process.system(cmd, ignore_status=True, sudo=True, shell=True):240 return False241 return True242def get_mpath_status(mpath):243 """244 Get the status of mpathX of multipaths.245 :param mpath: mpath names. Example: mpatha, mpathb.246 :return: state of mpathX eg: Active, Suspend, None247 """248 cmd = 'multipathd -k"show maps status" | grep -i %s' % mpath249 mpath_status = process.getoutput(cmd).split()[-2]250 return mpath_status251def suspend_mpath(mpath):252 """253 Suspend the given mpathX of multipaths.254 :param mpath: mpath names. 
Example: mpatha, mpathb.255 :return: True or False256 """257 def is_mpath_suspended():258 if get_mpath_status(mpath) == 'suspend':259 return True260 return False261 cmd = 'multipathd -k"suspend map %s"' % mpath262 if process.system(cmd) == 0:263 return wait.wait_for(is_mpath_suspended, timeout=10) or False264 return False265def resume_mpath(mpath):266 """267 Resume the suspended mpathX of multipaths.268 :param mpath_name: mpath names. Example: mpatha, mpathb.269 :return: True or False270 """271 def is_mpath_resumed():272 if get_mpath_status(mpath) == 'active':273 return True274 return False275 cmd = 'multipathd -k"resume map %s"' % mpath276 if process.system(cmd) == 0:277 return wait.wait_for(is_mpath_resumed, timeout=10) or False278 return False279def remove_mpath(mpath):280 """281 Remove the mpathX of multipaths.282 :param mpath_name: mpath names. Example: mpatha, mpathb.283 :return: True or False284 """285 def is_mpath_removed():286 if device_exists(mpath):287 return False288 return True289 cmd = 'multipathd -k"remove map %s"' % mpath290 if process.system(cmd) == 0:291 return wait.wait_for(is_mpath_removed, timeout=10) or False292 return False293def add_mpath(mpath):294 """295 Add back the removed mpathX of multipath.296 :param mpath_name: mpath names. Example: mpatha, mpathb.297 :return: True or False298 """299 def is_mpath_added():300 if device_exists(mpath):301 return True302 return False303 cmd = 'multipathd -k"add map %s"' % mpath304 if process.system(cmd) == 0:305 return wait.wait_for(is_mpath_added, timeout=10) or False306 return False307def remove_path(path):308 """309 Remove the individual paths.310 :param disk_path: disk path. 
Example: sda, sdb.311 :return: True or False312 """313 def is_path_removed():314 if get_path_status(path) is None:315 return True316 return False317 cmd = 'multipathd -k"remove path %s"' % path318 if process.system(cmd) == 0:319 return wait.wait_for(is_path_removed, timeout=10) or False320 return False321def add_path(path):322 """323 Add back the removed individual paths.324 :param str path: disk path. Example: sda, sdb.325 :return: True or False326 """327 def is_path_added():328 if get_path_status(path) is None:329 return False330 return True331 cmd = 'multipathd -k"add path %s"' % path332 if process.system(cmd) == 0:333 return wait.wait_for(is_path_added, timeout=10) or False...

Full Screen

Full Screen

transaction_dao.py

Source:transaction_dao.py Github

copy

Full Screen

...15 """16 __slots__ = ('__path',)17 def __init__(self, file_path: str = DEFAULT_TRANSACTION_FILE_PATH) -> None:18 self.__path = file_path19 ps = get_path_status(file_path)20 if ps == PathStatus.NOT_EXIST:21 self.reset_all()22 elif ps == PathStatus.UNREADABLE:23 raise AppError(f'{file_path} 不是文件,无法覆盖或读取。')24 def reset_all(self) -> None:25 with open(self.__path, 'w', encoding=UNIFIED_ENCODING) as f:26 json.dump([], f)27 def load_transaction_set(self) -> Set[Transaction]:28 with open(self.__path, 'r', encoding=UNIFIED_ENCODING) as f:29 content = json.load(f)30 return set(Transaction._make(x) for x in content)31 def store_transactions(self, trans: Iterable[Transaction]) -> None:32 trans_list = list(trans)33 with open(self.__path, 'w', encoding=UNIFIED_ENCODING) as f:...

Full Screen

Full Screen

file_util.py

Source:file_util.py Github

copy

Full Screen

...8 # 该路径存在文件且可以读写9 READABLE = 110 # 该路径存在同名文件夹等,无法读写11 UNREADABLE = 212def get_path_status(path: str) -> int:13 """14 获取某个路径的状态。返回 PathStatus 的某个属性。详情参考 PathStatus 的注释。15 :param path: 文件路径16 :return: PathStatus 的某个属性17 """18 logger.debug(f'get_path_status({path})')19 path = Path(path)20 # 根据文件是否存在进行初始化逻辑21 if path.exists():22 # 如果存在但不是文件,就抛出错误23 if not path.is_file():24 return PathStatus.UNREADABLE25 return PathStatus.READABLE26 else:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful