Best Python code snippet using fMBT_python
pycosh.py
Source:pycosh.py  
...76def _output(s):77    sys.stdout.write(s)78    sys.stdout.write("\n")79    sys.stdout.flush()80def _shell_soe(cmd):81    try:82        p = subprocess.Popen(cmd, shell=True,83                             stdin=subprocess.PIPE,84                             stdout=subprocess.PIPE,85                             stderr=subprocess.PIPE)86        out, err = p.communicate()87        status = p.returncode88    except OSError:89        status, out, err = None, None, None90    if out != None and sys.stdout.encoding:91        out = out.decode(sys.stdout.encoding).encode("utf-8")92    if err != None and sys.stderr.encoding:93        err = err.decode(sys.stderr.encoding).encode("utf-8")94    return status, out, err95def cmd2py(cmdline):96    if "|" in cmdline:97        cmd_left, cmd_right = cmdline.split("|", 1)98        funccall = "pipe(%s, %s)" % (repr(cmd2py(cmd_left)),99                                     repr(cmd2py(cmd_right)))100    elif ">" in cmdline:101        cmd_left, filename = cmdline.split(">", 1)102        funccall = "pipe(%s, %s)" % (103            repr(cmd2py(cmd_left)),104            repr("redir('%s')" % (filename.strip(),)))105    else:106        cmdline_list = shlex.split(cmdline.strip())107        funcname = cmdline_list[0]108        args = cmdline_list[1:]109        funccall = funcname + repr(tuple(args))110    return funccall111def prompt():112    """prompt113    print prompt"""114    try:115        user = getpass.getuser()116    except Exception:117        user = ""118    try:119        hostname = socket.gethostname()120    except Exception:121        hostname = ""122    return (user + "@" +123            hostname + ":" +124            os.getcwd() + ": ")125def awk(prog, *args):126    """awk PROG [FILE...]127    PROG syntax: [/REGEXP/]{print $N...}"""128    filenames = expand(*args, accept_pipe=True).splitlines()129    if not filenames:130        raise ValueError("missing input")131    rv = []132    awk_syntax = re.compile('(/([^/]*)/)?\{([^}]*)\}')133    parsed_prog = awk_syntax.match(prog)134    if not parsed_prog:135        raise ValueError('syntax error in awk program')136    awk_pattern = parsed_prog.group(2)137    if not awk_pattern is None:138        awk_pattern_re = re.compile(awk_pattern)139    else:140        awk_pattern_re = re.compile("")141    awk_statements = [s.strip() for s in parsed_prog.group(3).split(";")]142    awk_fieldno_re = re.compile("\$([0-9]+)")143    awk_fieldsep_re = re.compile("[ \n\t\r]*")144    for filename in filenames:145        for line in open(filename).xreadlines():146            if awk_pattern_re.search(line):147                for stmt in awk_statements:148                    if stmt.startswith("print"):149                        what = stmt[5:].strip()150                        if not what:151                            # plain "print" results in full line152                            what = "$0"153                        else:154                            # no variable handling for now...155                            what = what.replace('"', '')156                        fields = [int(n) for n in awk_fieldno_re.findall(what)]157                        translate = {}158                        if fields:159                            line_fields = [line.splitlines()[0]] + [160                                l for l in awk_fieldsep_re.split(line) if l]161                            for field in fields:162                                if field < len(line_fields):163                                    translate["$" + str(field)] = line_fields[field]164                                else:165                                    translate["$" + str(field)] = ""166                            for rep in reversed(sorted(translate.keys())):167                                # if not reversed, might replace $1 before $10168                                what = what.replace(rep, translate[rep])169                        rv.append(what)170    return "\n".join(rv)171def cd(dirname):172    """cd DIRNAME173    change current working directory"""174    d = expand(dirname, accept_pipe=False, min=1, exist=True).splitlines()175    if len(d) > 1:176        raise ValueError("unambiguous directory name")177    os.chdir(os.path.join(os.getcwd(), d[0]))178    return ""179def curl(*args):180    """curl [-x P][-o FILE] URL181    download URL (use proxy P), save to FILE"""182    opts, urls = _getopts(args, "x:o:")183    rv = []184    if not urls:185        raise ValueError("missing URL(s)")186    if "-x" not in opts and "http_proxy" in _g_pyenv:187        opts["-x"] = _g_pyenv.get("http_proxy")188    if "-x" in opts:189        proxy = urllib2.ProxyHandler({190            'http': opts["-x"],191            'https': opts["-x"],192            'ftp': opts["-x"]})193    else:194        proxy = urllib2.ProxyHandler({})195    opener = urllib2.build_opener(proxy)196    urllib2.install_opener(opener)197    for url in urls:198        data = urllib2.urlopen(url).read()199        if "-o" in opts:200            _file(opts["-o"], "a").write(data)201        else:202            rv.append(data)203    return "".join(rv)204def find(*args):205    """find [-n FILE] DIR206    find file(s) in directory"""207    opts, remainder = _getopts(args, "n:")208    if not remainder:209        raise ValueError("missing DIR")210    dirname = remainder[0]211    if "-n" in opts:212        findname = opts["-n"]213    else:214        findname = "*"215    dirname_ends_with_sep = dirname[-1] in ["/", "\\"]216    slash_only = not "\\" in dirname217    if slash_only:218        sep = "/"219    else:220        sep = os.path.sep221    rv = []222    # DIR + NAME forms a path without duplicate path separators223    for root, dirs, files in os.walk(dirname):224        if slash_only:225            root = root.replace("\\", "/")226        for name in dirs + files:227            if fnmatch.fnmatch(name, findname):228                if root == dirname:229                    if dirname_ends_with_sep:230                        rv.append(name)231                    else:232                        rv.append(sep + name)233                else:234                    rv.append(root[len(dirname):] + sep + name)235    return "\n".join(rv)236def date():237    """date238    print current date and time"""239    t = datetime.datetime.now()240    return t.strftime("%Y-%m-%d %H:%M:%S.") + str(t.microsecond)241def diff(*args):242    """diff FILE1 FILE2243    print differences between two files"""244    files = expand(*args).splitlines()245    try:246        file1, file2 = files247    except Exception:248        raise ValueError("exactly two files required")249    lines1 = _file(file1).readlines()250    lines2 = _file(file2).readlines()251    udiff = difflib.unified_diff(lines1, lines2, file1, file2)252    # append endlines to lines where it is missing253    difflines = [l + ["", "\n"][l[-1] != "\n"] for l in udiff]254    return "".join(difflines)255def du(*args):256    """du [-h] FILE...257    print [human readable] disk usage of FILEs"""258    opts, filenames = _getopts(args, "h")259    if "-h" in opts:260        size_formatter = _human_readable_size261    else:262        size_formatter = lambda size: str(size)263    filenames = expand(*filenames, accept_pipe=False, min=1).splitlines()264    total_size = 0265    retval = []266    for direntry in filenames:267        size = None268        if os.path.isdir(direntry):269            for root, dirs, filelist in os.walk(direntry):270               for filename in filelist:271                   fullname = os.path.join(root, filename)272                   size = os.stat(fullname).st_size273                   retval.append("%-8s %s" % (size_formatter(size), fullname))274                   total_size += size275        elif os.path.isfile(direntry):276            size = os.stat(direntry).st_size277            total_size += size278            retval.append("%-8s %s" % (size_formatter(size), direntry))279    retval.append(size_formatter(total_size))280    return "\n".join(retval)281def echo(*args):282    return " ".join(args)283def env():284    """env285    print environment variables"""286    rv = []287    for key in sorted(_g_pyenv.keys()):288        rv.append("%s=%s" % (key, repr(_g_pyenv[key])))289    return "\n".join(rv)290def expand(*filenames, **kwargs):291    accept_pipe = kwargs.get("accept_pipe", True)292    min_count = kwargs.get("min", 0)293    must_exist = kwargs.get("exist", False)294    rv = []295    if not filenames:296        if accept_pipe and _g_pipe_has_data:297            rv.append(_g_pipe_filename)298    else:299        for pattern in filenames:300            extends_to = glob.glob(pattern)301            if extends_to:302                for filepath in extends_to:303                    if "/" in filepath and "\\" in filepath:304                        filepath = filepath.replace('\\', '/')305                    rv.append(filepath)306            elif not must_exist:307                rv.append(pattern)308    if not min_count <= len(rv):309        raise ValueError("expected at least %s file(s), got %s" %310                         (min_count, len(rv)))311    return "\n".join(rv)312def export(assignment):313    """export VAR=VALUE314    assign VALUE to environment variable VAR"""315    if not "=" in assignment or not assignment.split("=")[0].strip():316        raise ValueError("expected VAR=VALUE")317    _g_pyenv.__setitem__(*assignment.split("=", 1))318    return ""319def grep(pattern, *filenames):320    """grep PATTERN [FILE...]321    show matching lines in file(s)"""322    matching_lines = []323    all_files = expand(*filenames).splitlines()324    for filename in all_files:325        for line in file(filename).xreadlines():326            if pattern in line:327                matching_lines.append(line)328    return "".join(matching_lines)329def head(*args):330    """head [-n NUM] [FILE...]331    show first NUM lines in file(s)"""332    opts, filenames = _getopts(args, "n:")333    all_files = expand(*filenames).splitlines()334    if "-n" in opts:335        lines = int(opts["-n"])336    else:337        lines = 10338    rv = []339    for filename in all_files:340        line_count = 0341        for line in file(filename).xreadlines():342            line_count += 1343            if line_count > lines:344                break345            rv.append(line)346    return "".join(rv)347def help(func=None):348    """help [COMMAND]349    print help (on COMMAND)"""350    if not func:351        rv = []352        for c in globals().keys():353            if c.startswith("_"):354                continue355            if not isinstance(globals()[c], types.FunctionType):356                continue357            if not globals()[c].__doc__:358                continue359            if len(globals()[c].__doc__.splitlines()) != 2:360                continue361            rv.append("%-26s%s" %362                      tuple([l.strip() for l in363                             globals()[c].__doc__.splitlines()]))364        rv = sorted(rv)365    elif isinstance(globals().get(func, None), types.FunctionType):366        rv = inspect.getsource(globals().get(func)).splitlines()367    return "\n".join(rv)368def kill(*pids):369    """kill PID...370    terminate processes"""371    for pid in pids:372        os.kill(int(pid), signal.SIGTERM)373    return ""374def ls(*args):375    """ls [-l]376    list files on current working directory"""377    opts, filenames = _getopts(args, "l")378    if filenames:379        files = sorted([f for f in expand(*filenames, exist=True).splitlines()])380    else:381        _, subdirs, files = os.walk(".").next()382        files = sorted([d + "/" for d in subdirs]) + sorted(files)383    if "-l" in opts:384        rv = []385        for f in files:386            fstat = os.stat(f)387            rv.append("%10s  %s  %s" % (388                fstat.st_size,389                time.strftime("%Y-%m-%d %H:%M", time.localtime(fstat.st_mtime)),390                f))391    else:392        rv = files393    return "\n".join(rv)394def nl(*filenames):395    """nl FILE...396    number lines"""397    all_files = expand(*filenames).splitlines()398    rv = []399    line_no = 0400    for filename in all_files:401        for line in file(filename).xreadlines():402            line_no += 1403            rv.append("%5s  %s" % (line_no, line))404    return "".join(rv)405def mkdir(*args):406    """mkdir [-p] DIRNAME...407    make directories, -p: intermediates if necessary"""408    args, dirnames = _getopts(args, "-p")409    for dirname in dirnames:410        if "-p" in args:411            os.makedirs(dirname)412        else:413            os.mkdir(dirname)414    return ""415def redir(dst_filename):416    # redirect data from input pipe to a file417    src_filename = expand(accept_pipe=True)418    if src_filename:419        file(dst_filename, "wb").write(420            file(src_filename, "rb").read())421    return ""422def rm(*args):423    """rm [-r] FILE...424    remove file"""425    args, filenames = _getopts(args, "-r")426    filenames = expand(*filenames, accept_pipe=False, min=1).splitlines()427    for filename in filenames:428        if "-r" in args and os.path.isdir(filename):429            shutil.rmtree(filename)430        else:431            os.remove(filename)432    return ""433def rmdir(dirname):434    """rmdir DIRNAME435    remove directory"""436    os.rmdir(dirname)437    return ""438def cat(*filenames):439    """cat FILE...440    concatenate contents of listed files"""441    return "".join([file(f).read() for f in expand(*filenames).splitlines()])442def df(*args):443    """df [-h] DIRNAME444    print [human readable] free space on DIRNAME"""445    args, dirnames = _getopts(args, "-h")446    if "-h" in args:447        human_readable = True448    else:449        human_readable = False450    try:451        dirname = dirnames[0]452    except IndexError:453        raise Exception("directory name missing")454    if os.name == "nt": # Windows455        cfree = ctypes.c_ulonglong(0)456        ctypes.windll.kernel32.GetDiskFreeSpaceExW(457            ctypes.c_wchar_p(dirname), None, None,458            ctypes.byref(cfree))459        free = cfree.value460    else:  # Posix461        st = os.statvfs(dirname)462        free = st.f_bavail * st.f_frsize463    if human_readable:464        retval = _human_readable_size(free)465    else:466        retval = str(free)467    return retval468def md5sum(*filenames):469    """md5sum FILE...470    print MD5 (128-bit) checksums."""471    rv = []472    for filename in expand(*filenames).splitlines():473        rv.append("%-34s%s" %474                  (md5.md5(file(filename, "rb").read()).hexdigest(),475                   filename))476    return "\n".join(rv)477def mv(src, dst):478    """mv SOURCE DEST479    move file or directory to destination"""480    shutil.move(src, dst)481    return ""482def cp(src, dst):483    """cp SOURCE DEST484    copy file or directory to destination"""485    shutil.copy(src, dst)486    return ""487def pipe(expr_left, expr_right):488    global _g_pipe_has_data489    try:490        pipe_in_data = eval(expr_left)491        file(_g_pipe_filename, "wb").write(pipe_in_data)492        del pipe_in_data493        _g_pipe_has_data = True494        rv = eval(expr_right)495    finally:496        try:497            os.remove(_g_pipe_filename)498        except Exception:499            pass500        _g_pipe_has_data = False501    return rv502def ps(*args):503    """ps [-v] [PID...]504    list processes (-v virtual memory)"""505    args, pids = _getopts(args, "-v")506    rv = []507    pids = set(pids)508    if os.name == "nt":509        if "-v" in args:510            opt_field = "PageFileUsage"511        else:512            opt_field = "parentprocessid"513        _, o, _ = _shell_soe(514            "wmic process get %s,processid,description,commandline" %515            (opt_field,))516        for line in o.splitlines():517            try:518                cmd_desc_almostlast, lastfield = line.rstrip().rsplit(" ", 1)519                cmd_desc, almostlast = (520                    cmd_desc_almostlast.rstrip().rsplit(" ", 1))521                cmd, desc = cmd_desc.rstrip().rsplit(" ", 1)522                if opt_field == "PageFileUsage":523                    pid = lastfield524                    pdata = almostlast # PageFileUsage525                else:526                    pid = lastfield527                    pdata = almostlast # parent pid528                if not desc.lower() in cmd.strip().lower():529                    cmd = "[%s] %s" % (desc.strip(), cmd.strip())530                if not pids or pid in pids:531                    rv.append("%8s %8s %s" %532                              (pid.strip(), pdata.strip(), cmd.strip()))533            except Exception:534                pass535    else:536        if "-v" in args:537            opt_field = "size"538        else:539            opt_field = "ppid"540        _, o, _ = _shell_soe("ps ax -o%s,pid,cmd" % (opt_field,))541        for line in o.splitlines():542            pdata, pid, cmd = line.strip().split(None, 2)543            if not pids or pid in pids:544                rv.append("%8s %8s %s" % (pid, pdata, cmd.strip()))545    return "\n".join(rv)546def psh(*cmd):547    """psh COMMAND548    run COMMAND in powershell (Windows)"""549    _, o, e = _shell_soe(550        ("powershell.exe",) + cmd)551    return o + e552_g_pspycosh_conn = None553def pspycosh(psconn):554    """pspycosh HOSTSPEC555    open pycosh shell on a pythonshare server"""556    global _g_pspycosh_conn557    if isinstance(psconn, pythonshare.client.Connection):558        _g_pspycosh_conn = psconn559    else:560        _g_pspycosh_conn = pythonshare.connect(psconn)561    _g_pspycosh_conn.exec_(_g_pycosh_source)562    return ""563def psput(psconn, pattern):564    """psput CONNSPEC FILE...565    upload files to pythonshare server"""566    if isinstance(psconn, pythonshare.client.Connection):567        conn = psconn568        close_connection = False569    else:570        conn = pythonshare.connect(psconn)571        close_connection = True572    conn.exec_("import base64")573    rv = []574    for filename in expand(pattern, accept_pipe=False).splitlines():575        data = file(filename).read()576        conn.eval_('file(%s, "wb").write(base64.b64decode(%s))' %577                   (repr(os.path.basename(filename)),578                    repr(base64.b64encode(data))))579        rv.append(filename)580    if close_connection:581        conn.close()582    return "\n".join(rv)583def psget(psconn, pattern):584    """psget CONNSPEC FILE...585    download files from pythonshare server"""586    if isinstance(psconn, pythonshare.client.Connection):587        conn = psconn588        close_connection = False589    else:590        conn = pythonshare.connect(psconn)591        close_connection = True592    conn.exec_("".join(inspect.getsourcelines(expand)[0]))593    conn.exec_("import glob")594    rv = []595    for filename in conn.eval_('expand(%s, accept_pipe=False)' %596                               repr(pattern)).splitlines():597        file(os.path.basename(filename), "w").write(598            conn.eval_("file(%s, 'rb').read()" % (repr(filename),)))599        rv.append(filename)600    return "\n".join(rv)601def pwd():602    """pwd603    print current working directory"""604    return os.getcwd()605def pye(*code):606    """pye CODE607    evaluate Python CODE"""608    code = " ".join(code)609    if _g_pipe_has_data:610        _g_pyenv["pipe_in"] = file(expand(accept_pipe=True), "rb")611    try:612        return str(eval(code, globals(), _g_pyenv))613    finally:614        if "pipe_in" in _g_pyenv:615            del _g_pyenv["pipe_in"]616def pyx(*code):617    """pyx CODE618    execute Python CODE"""619    code = " ".join(code)620    if _g_pipe_has_data:621        _g_pyenv["pipe_in"] = file(expand(accept_pipe=True), "rb")622    try:623        try:624            exec code in globals(), _g_pyenv625        except Exception, e:626            return str(e)627    finally:628        if "pipe_in" in _g_pyenv:629            del _g_pyenv["pipe_in"]630    return ""631def sed(cmd, *filenames):632    """sed s/P/R[/N] [FILE]633    replace P with R in FILE"""634    rv = []635    try:636        pattern, repl, count = re.findall("s/([^/]*)/([^/]*)/(.*)", cmd)[0]637        pattern = re.compile(pattern)638    except:639        raise ValueError('invalid command "%s"' % (cmd,))640    all_files = expand(*filenames).splitlines()641    for filename in all_files:642        for line in file(filename).readlines():643            try:644                count_arg = (int(count),)645            except:646                if count == "g":647                    count_arg = ()648                elif count == "":649                    count_arg = (1,)650                else:651                    raise ValueError('invalid count: "%s"' % (count,))652            rv.append(re.subn(* ((pattern, repl, line) + count_arg))[0])653    return "".join(rv)654def sh(*cmd):655    """sh COMMAND656    run COMMAND in shell"""657    s, o, e = _shell_soe(" ".join(cmd))658    return "[exit status: %s]\n%s" % (s, o+e)659def sleep(seconds):660    """sleep SECONDS661    sleep for SECONDS (float)"""662    time.sleep(float(seconds))663    return ""664def sort(*args):665    """sort [-n] [-k N] [FILE]666    sort lines [numerically] according to column N"""667    opts, filenames = _getopts(args, "k:n")668    filenames = expand(*filenames, accept_pipe=True).splitlines()669    rv = []670    for filename in filenames:671        lines = [[l.split(), l] for l in file(filename).readlines()]672        if "-k" in opts:673            k = int(opts["-k"]) - 1674            for line in lines:675                line[0][0], line[0][k] = line[0][k], line[0][0]676        if "-n" in opts:677            for line in lines:678                try:679                    line[0][0] = int(line[0][0])680                except:681                    pass682        lines.sort()683        rv.extend([line[1] for line in lines])684    return "".join(rv)685def sync():686    """sync687    flush system write back caches"""688    if os.name == "nt":689        retval = str(ctypes.windll.kernel32.SetSystemFileCacheSize(-1, -1, 0))690    else:691        _, _, retval = _shell_soe("sync")692    return retval693def tail(*args):694    """tail [-n NUM] [FILE...]695    show last NUM lines in file(s)"""696    opts, filenames = _getopts(args, "n:")697    all_files = expand(*filenames).splitlines()698    if "-n" in opts:699        lines = int(opts["-n"])700    else:701        lines = 10702    rv = []703    if lines > 0:704        for filename in all_files:705            rv.extend(file(filename).readlines()[-lines:])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
