Best Python code snippet using autotest_python
monitors_util.py
Source:monitors_util.py  
...111        for regex, callback in alert_hooks:112            match = re.match(regex, line.strip())113            if match:114                callback(*match.groups())115def lookup_lastlines(lastlines_dirpath, path):116    """Retrieve last lines seen for path.117    Open corresponding lastline file for path118    If there isn't one or isn't a match return None119    Args:120      lastlines_dirpath: str; Dirpath to store lastlines files to.121      path: str; Filepath to source file that lastlines came from.122    Returns:123      str; Last lines seen if they exist124      - Or -125      None; Otherwise126    """127    underscored = path.replace('/', '_')128    try:129        lastlines_file = open(os.path.join(lastlines_dirpath, underscored))130    except (OSError, IOError):131        return132    lastlines = lastlines_file.read()133    lastlines_file.close()134    os.remove(lastlines_file.name)135    if not lastlines:136        return137    try:138        target_file = open(path)139    except (OSError, IOError):140        return141    # Load it all in for now142    target_data = target_file.read()143    target_file.close()144    # Get start loc in the target_data string, scanning from right145    loc = target_data.rfind(lastlines)146    if loc == -1:147        return148    # Then translate this into a reverse line number149    # (count newlines that occur afterward)150    reverse_lineno = target_data.count('\n', loc + len(lastlines))151    return reverse_lineno152def write_lastlines_file(lastlines_dirpath, path, data):153    """Write data to lastlines file for path.154    Args:155      lastlines_dirpath: str; Dirpath to store lastlines files to.156      path: str; Filepath to source file that data comes from.157      data: str;158    Returns:159      str; Filepath that lastline data was written to.160    """161    underscored = path.replace('/', '_')162    dest_path = os.path.join(lastlines_dirpath, underscored)163    open(dest_path, 'w').write(data)164    return dest_path165def nonblocking(pipe):166    """Set python file object to nonblocking mode.167    This allows us to take advantage of pipe.read()168    where we don't have to specify a buflen.169    Cuts down on a few lines we'd have to maintain.170    Args:171      pipe: file; File object to modify172    Returns: pipe173    """174    flags = fcntl.fcntl(pipe, fcntl.F_GETFL)175    fcntl.fcntl(pipe, fcntl.F_SETFL, flags| os.O_NONBLOCK)176    return pipe177def launch_tails(follow_paths, lastlines_dirpath=None):178    """Launch a tail process for each follow_path.179    Args:180      follow_paths: list;181      lastlines_dirpath: str;182    Returns:183      tuple; (procs, pipes) or184          ({path: subprocess.Popen, ...}, {file: path, ...})185    """186    if lastlines_dirpath and not os.path.exists(lastlines_dirpath):187        os.makedirs(lastlines_dirpath)188    tail_cmd = ('/usr/bin/tail', '--retry', '--follow=name')189    procs = {}  # path -> tail_proc190    pipes = {}  # tail_proc.stdout -> path191    for path in follow_paths:192        cmd = list(tail_cmd)193        if lastlines_dirpath:194            reverse_lineno = lookup_lastlines(lastlines_dirpath, path)195            if reverse_lineno is None:196                reverse_lineno = 1197            cmd.append('--lines=%d' % reverse_lineno)198        cmd.append(path)199        tail_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)200        procs[path] = tail_proc201        pipes[nonblocking(tail_proc.stdout)] = path202    return procs, pipes203def poll_tail_pipes(pipes, lastlines_dirpath=None, waitsecs=5):204    """Wait on tail pipes for new data for waitsecs, return any new lines.205    Args:206      pipes: dict; {subprocess.Popen: follow_path, ...}207      lastlines_dirpath: str; Path to write lastlines to.208      waitsecs: int; Timeout to pass to select...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
