How to use _shell_soe method in fMBT

Best Python code snippet using fMBT_python

pycosh.py

Source:pycosh.py Github

copy

Full Screen

...76def _output(s):77 sys.stdout.write(s)78 sys.stdout.write("\n")79 sys.stdout.flush()80def _shell_soe(cmd):81 try:82 p = subprocess.Popen(cmd, shell=True,83 stdin=subprocess.PIPE,84 stdout=subprocess.PIPE,85 stderr=subprocess.PIPE)86 out, err = p.communicate()87 status = p.returncode88 except OSError:89 status, out, err = None, None, None90 if out != None and sys.stdout.encoding:91 out = out.decode(sys.stdout.encoding).encode("utf-8")92 if err != None and sys.stderr.encoding:93 err = err.decode(sys.stderr.encoding).encode("utf-8")94 return status, out, err95def cmd2py(cmdline):96 if "|" in cmdline:97 cmd_left, cmd_right = cmdline.split("|", 1)98 funccall = "pipe(%s, %s)" % (repr(cmd2py(cmd_left)),99 repr(cmd2py(cmd_right)))100 elif ">" in cmdline:101 cmd_left, filename = cmdline.split(">", 1)102 funccall = "pipe(%s, %s)" % (103 repr(cmd2py(cmd_left)),104 repr("redir('%s')" % (filename.strip(),)))105 else:106 cmdline_list = shlex.split(cmdline.strip())107 funcname = cmdline_list[0]108 args = cmdline_list[1:]109 funccall = funcname + repr(tuple(args))110 return funccall111def prompt():112 """prompt113 print prompt"""114 try:115 user = getpass.getuser()116 except Exception:117 user = ""118 try:119 hostname = socket.gethostname()120 except Exception:121 hostname = ""122 return (user + "@" +123 hostname + ":" +124 os.getcwd() + ": ")125def awk(prog, *args):126 """awk PROG [FILE...]127 PROG syntax: [/REGEXP/]{print $N...}"""128 filenames = expand(*args, accept_pipe=True).splitlines()129 if not filenames:130 raise ValueError("missing input")131 rv = []132 awk_syntax = re.compile('(/([^/]*)/)?\{([^}]*)\}')133 parsed_prog = awk_syntax.match(prog)134 if not parsed_prog:135 raise ValueError('syntax error in awk program')136 awk_pattern = parsed_prog.group(2)137 if not awk_pattern is None:138 awk_pattern_re = re.compile(awk_pattern)139 else:140 awk_pattern_re = re.compile("")141 awk_statements = [s.strip() for s in parsed_prog.group(3).split(";")]142 awk_fieldno_re = re.compile("\$([0-9]+)")143 awk_fieldsep_re = re.compile("[ \n\t\r]*")144 for filename in filenames:145 for line in open(filename).xreadlines():146 if awk_pattern_re.search(line):147 for stmt in awk_statements:148 if stmt.startswith("print"):149 what = stmt[5:].strip()150 if not what:151 # plain "print" results in full line152 what = "$0"153 else:154 # no variable handling for now...155 what = what.replace('"', '')156 fields = [int(n) for n in awk_fieldno_re.findall(what)]157 translate = {}158 if fields:159 line_fields = [line.splitlines()[0]] + [160 l for l in awk_fieldsep_re.split(line) if l]161 for field in fields:162 if field < len(line_fields):163 translate["$" + str(field)] = line_fields[field]164 else:165 translate["$" + str(field)] = ""166 for rep in reversed(sorted(translate.keys())):167 # if not reversed, might replace $1 before $10168 what = what.replace(rep, translate[rep])169 rv.append(what)170 return "\n".join(rv)171def cd(dirname):172 """cd DIRNAME173 change current working directory"""174 d = expand(dirname, accept_pipe=False, min=1, exist=True).splitlines()175 if len(d) > 1:176 raise ValueError("unambiguous directory name")177 os.chdir(os.path.join(os.getcwd(), d[0]))178 return ""179def curl(*args):180 """curl [-x P][-o FILE] URL181 download URL (use proxy P), save to FILE"""182 opts, urls = _getopts(args, "x:o:")183 rv = []184 if not urls:185 raise ValueError("missing URL(s)")186 if "-x" not in opts and "http_proxy" in _g_pyenv:187 opts["-x"] = _g_pyenv.get("http_proxy")188 if "-x" in opts:189 proxy = urllib2.ProxyHandler({190 'http': opts["-x"],191 'https': opts["-x"],192 'ftp': opts["-x"]})193 else:194 proxy = urllib2.ProxyHandler({})195 opener = urllib2.build_opener(proxy)196 urllib2.install_opener(opener)197 for url in urls:198 data = urllib2.urlopen(url).read()199 if "-o" in opts:200 _file(opts["-o"], "a").write(data)201 else:202 rv.append(data)203 return "".join(rv)204def find(*args):205 """find [-n FILE] DIR206 find file(s) in directory"""207 opts, remainder = _getopts(args, "n:")208 if not remainder:209 raise ValueError("missing DIR")210 dirname = remainder[0]211 if "-n" in opts:212 findname = opts["-n"]213 else:214 findname = "*"215 dirname_ends_with_sep = dirname[-1] in ["/", "\\"]216 slash_only = not "\\" in dirname217 if slash_only:218 sep = "/"219 else:220 sep = os.path.sep221 rv = []222 # DIR + NAME forms a path without duplicate path separators223 for root, dirs, files in os.walk(dirname):224 if slash_only:225 root = root.replace("\\", "/")226 for name in dirs + files:227 if fnmatch.fnmatch(name, findname):228 if root == dirname:229 if dirname_ends_with_sep:230 rv.append(name)231 else:232 rv.append(sep + name)233 else:234 rv.append(root[len(dirname):] + sep + name)235 return "\n".join(rv)236def date():237 """date238 print current date and time"""239 t = datetime.datetime.now()240 return t.strftime("%Y-%m-%d %H:%M:%S.") + str(t.microsecond)241def diff(*args):242 """diff FILE1 FILE2243 print differences between two files"""244 files = expand(*args).splitlines()245 try:246 file1, file2 = files247 except Exception:248 raise ValueError("exactly two files required")249 lines1 = _file(file1).readlines()250 lines2 = _file(file2).readlines()251 udiff = difflib.unified_diff(lines1, lines2, file1, file2)252 # append endlines to lines where it is missing253 difflines = [l + ["", "\n"][l[-1] != "\n"] for l in udiff]254 return "".join(difflines)255def du(*args):256 """du [-h] FILE...257 print [human readable] disk usage of FILEs"""258 opts, filenames = _getopts(args, "h")259 if "-h" in opts:260 size_formatter = _human_readable_size261 else:262 size_formatter = lambda size: str(size)263 filenames = expand(*filenames, accept_pipe=False, min=1).splitlines()264 total_size = 0265 retval = []266 for direntry in filenames:267 size = None268 if os.path.isdir(direntry):269 for root, dirs, filelist in os.walk(direntry):270 for filename in filelist:271 fullname = os.path.join(root, filename)272 size = os.stat(fullname).st_size273 retval.append("%-8s %s" % (size_formatter(size), fullname))274 total_size += size275 elif os.path.isfile(direntry):276 size = os.stat(direntry).st_size277 total_size += size278 retval.append("%-8s %s" % (size_formatter(size), direntry))279 retval.append(size_formatter(total_size))280 return "\n".join(retval)281def echo(*args):282 return " ".join(args)283def env():284 """env285 print environment variables"""286 rv = []287 for key in sorted(_g_pyenv.keys()):288 rv.append("%s=%s" % (key, repr(_g_pyenv[key])))289 return "\n".join(rv)290def expand(*filenames, **kwargs):291 accept_pipe = kwargs.get("accept_pipe", True)292 min_count = kwargs.get("min", 0)293 must_exist = kwargs.get("exist", False)294 rv = []295 if not filenames:296 if accept_pipe and _g_pipe_has_data:297 rv.append(_g_pipe_filename)298 else:299 for pattern in filenames:300 extends_to = glob.glob(pattern)301 if extends_to:302 for filepath in extends_to:303 if "/" in filepath and "\\" in filepath:304 filepath = filepath.replace('\\', '/')305 rv.append(filepath)306 elif not must_exist:307 rv.append(pattern)308 if not min_count <= len(rv):309 raise ValueError("expected at least %s file(s), got %s" %310 (min_count, len(rv)))311 return "\n".join(rv)312def export(assignment):313 """export VAR=VALUE314 assign VALUE to environment variable VAR"""315 if not "=" in assignment or not assignment.split("=")[0].strip():316 raise ValueError("expected VAR=VALUE")317 _g_pyenv.__setitem__(*assignment.split("=", 1))318 return ""319def grep(pattern, *filenames):320 """grep PATTERN [FILE...]321 show matching lines in file(s)"""322 matching_lines = []323 all_files = expand(*filenames).splitlines()324 for filename in all_files:325 for line in file(filename).xreadlines():326 if pattern in line:327 matching_lines.append(line)328 return "".join(matching_lines)329def head(*args):330 """head [-n NUM] [FILE...]331 show first NUM lines in file(s)"""332 opts, filenames = _getopts(args, "n:")333 all_files = expand(*filenames).splitlines()334 if "-n" in opts:335 lines = int(opts["-n"])336 else:337 lines = 10338 rv = []339 for filename in all_files:340 line_count = 0341 for line in file(filename).xreadlines():342 line_count += 1343 if line_count > lines:344 break345 rv.append(line)346 return "".join(rv)347def help(func=None):348 """help [COMMAND]349 print help (on COMMAND)"""350 if not func:351 rv = []352 for c in globals().keys():353 if c.startswith("_"):354 continue355 if not isinstance(globals()[c], types.FunctionType):356 continue357 if not globals()[c].__doc__:358 continue359 if len(globals()[c].__doc__.splitlines()) != 2:360 continue361 rv.append("%-26s%s" %362 tuple([l.strip() for l in363 globals()[c].__doc__.splitlines()]))364 rv = sorted(rv)365 elif isinstance(globals().get(func, None), types.FunctionType):366 rv = inspect.getsource(globals().get(func)).splitlines()367 return "\n".join(rv)368def kill(*pids):369 """kill PID...370 terminate processes"""371 for pid in pids:372 os.kill(int(pid), signal.SIGTERM)373 return ""374def ls(*args):375 """ls [-l]376 list files on current working directory"""377 opts, filenames = _getopts(args, "l")378 if filenames:379 files = sorted([f for f in expand(*filenames, exist=True).splitlines()])380 else:381 _, subdirs, files = os.walk(".").next()382 files = sorted([d + "/" for d in subdirs]) + sorted(files)383 if "-l" in opts:384 rv = []385 for f in files:386 fstat = os.stat(f)387 rv.append("%10s %s %s" % (388 fstat.st_size,389 time.strftime("%Y-%m-%d %H:%M", time.localtime(fstat.st_mtime)),390 f))391 else:392 rv = files393 return "\n".join(rv)394def nl(*filenames):395 """nl FILE...396 number lines"""397 all_files = expand(*filenames).splitlines()398 rv = []399 line_no = 0400 for filename in all_files:401 for line in file(filename).xreadlines():402 line_no += 1403 rv.append("%5s %s" % (line_no, line))404 return "".join(rv)405def mkdir(*args):406 """mkdir [-p] DIRNAME...407 make directories, -p: intermediates if necessary"""408 args, dirnames = _getopts(args, "-p")409 for dirname in dirnames:410 if "-p" in args:411 os.makedirs(dirname)412 else:413 os.mkdir(dirname)414 return ""415def redir(dst_filename):416 # redirect data from input pipe to a file417 src_filename = expand(accept_pipe=True)418 if src_filename:419 file(dst_filename, "wb").write(420 file(src_filename, "rb").read())421 return ""422def rm(*args):423 """rm [-r] FILE...424 remove file"""425 args, filenames = _getopts(args, "-r")426 filenames = expand(*filenames, accept_pipe=False, min=1).splitlines()427 for filename in filenames:428 if "-r" in args and os.path.isdir(filename):429 shutil.rmtree(filename)430 else:431 os.remove(filename)432 return ""433def rmdir(dirname):434 """rmdir DIRNAME435 remove directory"""436 os.rmdir(dirname)437 return ""438def cat(*filenames):439 """cat FILE...440 concatenate contents of listed files"""441 return "".join([file(f).read() for f in expand(*filenames).splitlines()])442def df(*args):443 """df [-h] DIRNAME444 print [human readable] free space on DIRNAME"""445 args, dirnames = _getopts(args, "-h")446 if "-h" in args:447 human_readable = True448 else:449 human_readable = False450 try:451 dirname = dirnames[0]452 except IndexError:453 raise Exception("directory name missing")454 if os.name == "nt": # Windows455 cfree = ctypes.c_ulonglong(0)456 ctypes.windll.kernel32.GetDiskFreeSpaceExW(457 ctypes.c_wchar_p(dirname), None, None,458 ctypes.byref(cfree))459 free = cfree.value460 else: # Posix461 st = os.statvfs(dirname)462 free = st.f_bavail * st.f_frsize463 if human_readable:464 retval = _human_readable_size(free)465 else:466 retval = str(free)467 return retval468def md5sum(*filenames):469 """md5sum FILE...470 print MD5 (128-bit) checksums."""471 rv = []472 for filename in expand(*filenames).splitlines():473 rv.append("%-34s%s" %474 (md5.md5(file(filename, "rb").read()).hexdigest(),475 filename))476 return "\n".join(rv)477def mv(src, dst):478 """mv SOURCE DEST479 move file or directory to destination"""480 shutil.move(src, dst)481 return ""482def cp(src, dst):483 """cp SOURCE DEST484 copy file or directory to destination"""485 shutil.copy(src, dst)486 return ""487def pipe(expr_left, expr_right):488 global _g_pipe_has_data489 try:490 pipe_in_data = eval(expr_left)491 file(_g_pipe_filename, "wb").write(pipe_in_data)492 del pipe_in_data493 _g_pipe_has_data = True494 rv = eval(expr_right)495 finally:496 try:497 os.remove(_g_pipe_filename)498 except Exception:499 pass500 _g_pipe_has_data = False501 return rv502def ps(*args):503 """ps [-v] [PID...]504 list processes (-v virtual memory)"""505 args, pids = _getopts(args, "-v")506 rv = []507 pids = set(pids)508 if os.name == "nt":509 if "-v" in args:510 opt_field = "PageFileUsage"511 else:512 opt_field = "parentprocessid"513 _, o, _ = _shell_soe(514 "wmic process get %s,processid,description,commandline" %515 (opt_field,))516 for line in o.splitlines():517 try:518 cmd_desc_almostlast, lastfield = line.rstrip().rsplit(" ", 1)519 cmd_desc, almostlast = (520 cmd_desc_almostlast.rstrip().rsplit(" ", 1))521 cmd, desc = cmd_desc.rstrip().rsplit(" ", 1)522 if opt_field == "PageFileUsage":523 pid = lastfield524 pdata = almostlast # PageFileUsage525 else:526 pid = lastfield527 pdata = almostlast # parent pid528 if not desc.lower() in cmd.strip().lower():529 cmd = "[%s] %s" % (desc.strip(), cmd.strip())530 if not pids or pid in pids:531 rv.append("%8s %8s %s" %532 (pid.strip(), pdata.strip(), cmd.strip()))533 except Exception:534 pass535 else:536 if "-v" in args:537 opt_field = "size"538 else:539 opt_field = "ppid"540 _, o, _ = _shell_soe("ps ax -o%s,pid,cmd" % (opt_field,))541 for line in o.splitlines():542 pdata, pid, cmd = line.strip().split(None, 2)543 if not pids or pid in pids:544 rv.append("%8s %8s %s" % (pid, pdata, cmd.strip()))545 return "\n".join(rv)546def psh(*cmd):547 """psh COMMAND548 run COMMAND in powershell (Windows)"""549 _, o, e = _shell_soe(550 ("powershell.exe",) + cmd)551 return o + e552_g_pspycosh_conn = None553def pspycosh(psconn):554 """pspycosh HOSTSPEC555 open pycosh shell on a pythonshare server"""556 global _g_pspycosh_conn557 if isinstance(psconn, pythonshare.client.Connection):558 _g_pspycosh_conn = psconn559 else:560 _g_pspycosh_conn = pythonshare.connect(psconn)561 _g_pspycosh_conn.exec_(_g_pycosh_source)562 return ""563def psput(psconn, pattern):564 """psput CONNSPEC FILE...565 upload files to pythonshare server"""566 if isinstance(psconn, pythonshare.client.Connection):567 conn = psconn568 close_connection = False569 else:570 conn = pythonshare.connect(psconn)571 close_connection = True572 conn.exec_("import base64")573 rv = []574 for filename in expand(pattern, accept_pipe=False).splitlines():575 data = file(filename).read()576 conn.eval_('file(%s, "wb").write(base64.b64decode(%s))' %577 (repr(os.path.basename(filename)),578 repr(base64.b64encode(data))))579 rv.append(filename)580 if close_connection:581 conn.close()582 return "\n".join(rv)583def psget(psconn, pattern):584 """psget CONNSPEC FILE...585 download files from pythonshare server"""586 if isinstance(psconn, pythonshare.client.Connection):587 conn = psconn588 close_connection = False589 else:590 conn = pythonshare.connect(psconn)591 close_connection = True592 conn.exec_("".join(inspect.getsourcelines(expand)[0]))593 conn.exec_("import glob")594 rv = []595 for filename in conn.eval_('expand(%s, accept_pipe=False)' %596 repr(pattern)).splitlines():597 file(os.path.basename(filename), "w").write(598 conn.eval_("file(%s, 'rb').read()" % (repr(filename),)))599 rv.append(filename)600 return "\n".join(rv)601def pwd():602 """pwd603 print current working directory"""604 return os.getcwd()605def pye(*code):606 """pye CODE607 evaluate Python CODE"""608 code = " ".join(code)609 if _g_pipe_has_data:610 _g_pyenv["pipe_in"] = file(expand(accept_pipe=True), "rb")611 try:612 return str(eval(code, globals(), _g_pyenv))613 finally:614 if "pipe_in" in _g_pyenv:615 del _g_pyenv["pipe_in"]616def pyx(*code):617 """pyx CODE618 execute Python CODE"""619 code = " ".join(code)620 if _g_pipe_has_data:621 _g_pyenv["pipe_in"] = file(expand(accept_pipe=True), "rb")622 try:623 try:624 exec code in globals(), _g_pyenv625 except Exception, e:626 return str(e)627 finally:628 if "pipe_in" in _g_pyenv:629 del _g_pyenv["pipe_in"]630 return ""631def sed(cmd, *filenames):632 """sed s/P/R[/N] [FILE]633 replace P with R in FILE"""634 rv = []635 try:636 pattern, repl, count = re.findall("s/([^/]*)/([^/]*)/(.*)", cmd)[0]637 pattern = re.compile(pattern)638 except:639 raise ValueError('invalid command "%s"' % (cmd,))640 all_files = expand(*filenames).splitlines()641 for filename in all_files:642 for line in file(filename).readlines():643 try:644 count_arg = (int(count),)645 except:646 if count == "g":647 count_arg = ()648 elif count == "":649 count_arg = (1,)650 else:651 raise ValueError('invalid count: "%s"' % (count,))652 rv.append(re.subn(* ((pattern, repl, line) + count_arg))[0])653 return "".join(rv)654def sh(*cmd):655 """sh COMMAND656 run COMMAND in shell"""657 s, o, e = _shell_soe(" ".join(cmd))658 return "[exit status: %s]\n%s" % (s, o+e)659def sleep(seconds):660 """sleep SECONDS661 sleep for SECONDS (float)"""662 time.sleep(float(seconds))663 return ""664def sort(*args):665 """sort [-n] [-k N] [FILE]666 sort lines [numerically] according to column N"""667 opts, filenames = _getopts(args, "k:n")668 filenames = expand(*filenames, accept_pipe=True).splitlines()669 rv = []670 for filename in filenames:671 lines = [[l.split(), l] for l in file(filename).readlines()]672 if "-k" in opts:673 k = int(opts["-k"]) - 1674 for line in lines:675 line[0][0], line[0][k] = line[0][k], line[0][0]676 if "-n" in opts:677 for line in lines:678 try:679 line[0][0] = int(line[0][0])680 except:681 pass682 lines.sort()683 rv.extend([line[1] for line in lines])684 return "".join(rv)685def sync():686 """sync687 flush system write back caches"""688 if os.name == "nt":689 retval = str(ctypes.windll.kernel32.SetSystemFileCacheSize(-1, -1, 0))690 else:691 _, _, retval = _shell_soe("sync")692 return retval693def tail(*args):694 """tail [-n NUM] [FILE...]695 show last NUM lines in file(s)"""696 opts, filenames = _getopts(args, "n:")697 all_files = expand(*filenames).splitlines()698 if "-n" in opts:699 lines = int(opts["-n"])700 else:701 lines = 10702 rv = []703 if lines > 0:704 for filename in all_files:705 rv.extend(file(filename).readlines()[-lines:])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful