Best Python code snippet using autotest_python
nxagent-helper
Source:nxagent-helper  
#!/usr/bin/python2.4
#
# Copyright 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#
# Author: diamond@google.com (Stephen Shirley)

"""Helper that launches nxagent for one NX session and tracks its state.

Reads a single command ("start <sessionid>" or "resume <sessionid>") from
stdin. For "start" it spawns nxagent with the arguments stored in the session
directory, then follows the agent's session log and reports state changes on
stdout as "NX> ..." lines.
"""

import os
import re
import signal
import subprocess
import sys
import time
import traceback

# The project-local modules below live outside the default module path, so the
# path must be extended before they are imported.
sys.path.append('/usr/freenx/lib')

import nxlog
import nxloadconfig

prog_name = "nxagent-helper"

# Commands accepted on stdin (compared case-insensitively).
_COMMANDS = ("start", "resume")

# Session state name -> regex matching the nxagent log line announcing it.
state_lines = {
    'starting': re.compile(r'^Session: Starting session at '),
    'running': re.compile(r'^Session: Session (started|resumed) at '),
    'suspending': re.compile(r'^Session: Suspending session at '),
    'suspended': re.compile(r'^Session: Session suspended at '),
    'terminating': re.compile(r'^Session: (Terminat|Abort)ing session at '),
    'terminated': re.compile(r'^Session: Session (terminat|abort)ed at '),
}

# Info id -> regex matching the nxagent log line carrying that information.
info_lines = {
    'watchdog_pid':
        re.compile(r"^Info: Watchdog running with pid '(?P<pid>\d+)'."),
    'kill_watchdog':
        re.compile(r"^Info: Waiting the watchdog process to complete."),
    'agent_pid':
        re.compile(r"^Info: Agent running with pid '(?P<pid>\d+)'."),
    'general_error':
        re.compile(r"^Error: (?P<error>.*)$"),
}

# Session-wide state shared by the functions below: the parsed nxagent options
# plus runtime entries ('dir', 'p', 'agent_pid', 'watchdog_pid', 'app_pid').
options = {}


def main():
  """Do setup, then read and handle a single command."""
  basic_setup()
  cmd, sessionid = read_command()
  handle_command(cmd, sessionid)


def basic_setup():
  """Set up logging and read the configuration."""
  nxlog.setup(prog_name)
  nxlog.log(nxlog.LOG_DEBUG, "started with pid %d\n" % os.getpid())
  nxloadconfig.setup(prog_name)
  level = nxloadconfig.conf.get('LOG_LEVEL', '5')  # Default to LOG_NOTICE
  try:
    nxlog.set_log_level(level)
  except ValueError:
    # A bad level is reported but is not fatal.
    nxlog.log(nxlog.LOG_ERR, "Invalid log level: %s\n" % level)
  nxlog.log(nxlog.LOG_INFO, "config parsed\n")


def read_command():
  """Read a single command from stdin, validate it, and return it.

  Return:
    A (command, sessionid) pair; the command is lower-cased and is one of
    _COMMANDS. Words after the second one are ignored.

  Exits with status 1 if the line has fewer than two words or the command is
  not recognised.
  """
  raw_cmd = raw_input()
  cmd = raw_cmd.split()
  if len(cmd) < 2:
    nxlog.log(nxlog.LOG_CRIT, "Command has invalid format: %s\n" %
        repr(raw_cmd))
    sys.exit(1)
  name = cmd[0].lower()
  if name not in _COMMANDS:
    nxlog.log(nxlog.LOG_CRIT, "Unknown command given: %s\n" % repr(raw_cmd))
    sys.exit(1)
  return name, cmd[1]


def handle_command(cmd, sessionid):
  """Handle the command received.

  Args:
    cmd: 'start' or 'resume'
    sessionid: id of the session; selects the session directory

  'start' spawns nxagent and blocks following its log until the agent exits.
  'resume' is not supported and exits with status 1.
  """
  nxlog.log(nxlog.LOG_DEBUG, "command: %s %s\n" % (cmd, sessionid))
  if cmd == 'start':
    nxlog.log(nxlog.LOG_NOTICE, "Starting session %s\n" % sessionid)
    #FIXME: should be using a better, non-hardcoded location
    session_dir = '/tmp/nx/S-%s' % sessionid
    log_path = os.path.join(session_dir, 'session.log')
    options_path = os.path.join(session_dir, 'options')
    try:
      args_file = open(os.path.join(session_dir, 'args'))
      try:
        # One argument per line. Strip only the newline, so a final line
        # without one does not lose its last character.
        args = [line.rstrip('\n') for line in args_file]
      finally:
        args_file.close()
      nxlog.log(nxlog.LOG_NOTICE, "Session args: %s\n" % repr(args))
      # Open the session log to pass to nxagent
      nxagent_log = open(log_path, 'w')
      opt_read(options_path)
    except IOError:
      # sys.exc_info() instead of "except X, e" so the syntax parses on both
      # old and new interpreters.
      e = sys.exc_info()[1]
      nxlog.log(nxlog.LOG_ERR, "Session file error: %s\n" % e)
      sys.exit(1)
    options['dir'] = session_dir
    # Clear out X lock/socket files for this display before starting.
    cleanup()
    os.environ['LD_LIBRARY_PATH'] = '/usr/NX/lib'
    os.environ['DISPLAY'] = 'nx/nx,options=%s:%s' % \
        (options_path, options['display_num'])
    os.environ['X_AUTHORITY'] = os.path.join(session_dir, 'authority')
    setup_xauth()
    try:
      options['p'] = subprocess.Popen(['/usr/freenx/bin/nxagent'] + args,
          close_fds=True, stdin=None, stdout=nxagent_log,
          stderr=subprocess.STDOUT)
    finally:
      # The child holds its own copy of the descriptor; the parent only ever
      # reads the log, so release the write handle here.
      nxagent_log.close()
    options['agent_pid'] = options['p'].pid
    write_agent_pid()
    nxlog.log(nxlog.LOG_NOTICE, "Spawned nxagent pid %d\n" % options['p'].pid)
    # Don't want to propagate this to other children.
    del os.environ['LD_LIBRARY_PATH']
    os.environ['DISPLAY'] = ':%s' % options['display_num']
    nxagent_log = open(log_path, 'r')
    try:
      follow_log(nxagent_log)
    finally:
      nxagent_log.close()
  elif cmd == 'resume':
    nxlog.log(nxlog.LOG_NOTICE, "Resuming session %s\n" % sessionid)
    nxlog.log(nxlog.LOG_CRIT, "Session resumption not yet supported\n")
    sys.exit(1)


def follow_log(log):
  """Follow and parse the log of nxagent until the agent exits.

  FIXME: Keeps track of the state of nxagent, performs actions on certain
  state changes.

  Args:
    log: file object open for reading on the agent's session log

  Any exception is re-raised after reporting the session as terminated and
  running cleanup().
  """
  try:
    nxlog.log(nxlog.LOG_DEBUG, "Following agent log\n")
    while True:
      line = log.readline()
      if not line:
        # Nothing new in the log: stop once the agent has exited, otherwise
        # wait a little and poll again.
        if options['p'].poll() is not None:
          ret = options['p'].returncode
          if ret < 0:
            nxlog.log(nxlog.LOG_NOTICE, "Nxagent has exited, "
                "killed by signal %d\n" % -ret)
          else:
            nxlog.log(nxlog.LOG_NOTICE, "Nxagent has exited: %s\n" % ret)
          break
        time.sleep(0.5)
        continue
      line = line.rstrip()  # Remove trailing newline
      if match_state(line):
        nxlog.log(nxlog.LOG_DEBUG, "Matched state: %s\n" % line)
      elif match_info(line):
        nxlog.log(nxlog.LOG_DEBUG, "Matched info: %s\n" % line)
  # If any problems occur, we want to cleanup first. When we're done cleaning
  # up, re-raise the exception. The bare except is deliberate: nothing is
  # swallowed, and cleanup must run whatever the cause.
  except:
    nxlog.log(nxlog.LOG_ERR, "Got exception, cleaning up\n")
    sys.stdout.write("NX> 1009 Session status: terminated\n")
    cleanup()
    raise
  nxlog.log(nxlog.LOG_DEBUG, "Finished following agent log\n")


def match_state(line):
  """Try to match the given line against a session state.

  The last matched state is remembered between calls as an attribute on this
  function.

  Return:
    True: the line matched
    False: the line didn't match
  """
  # Faking a static variable
  old_state = getattr(match_state, 'cur_state', None)
  match_state.cur_state = old_state
  if not line.startswith('Session: '):
    return False
  for state, rx in state_lines.items():
    if rx.search(line):
      match_state.cur_state = state
      handle_state_change(state, old_state)
      return True
  return False


def match_info(line):
  """Try to match the given line against an info line regex.

  Return:
    True: the line matched
    False: the line didn't match
  """
  for info, rx in info_lines.items():
    m = rx.search(line)
    if m:
      handle_info(info, m)
      return True
  return False


def handle_state_change(cur_state, old_state):
  """Compare the current state to the previous state, and handle as
  appropriate.

  Args:
    cur_state: current state name
    old_state: previous state name
  Return:
    None
  """
  if cur_state == old_state:
    return
  nxlog.log(nxlog.LOG_NOTICE, "Nxagent state was: %s Now: %s\n" %
      (old_state, cur_state))
  sys.stdout.write("NX> 1009 Session status: %s\n" % cur_state)
  sys.stdout.flush()
  if cur_state == 'starting' and old_state is None:
    start_app()
  elif cur_state == 'terminated':
    # old_state cannot also be 'terminated' here: equal states returned above.
    nxlog.log(nxlog.LOG_NOTICE, "Nxagent finished, cleaning up")
    cleanup()


def handle_info(info, m):
  """Execute the required response to a given info line match.

  Args:
    info: The id of the info line
    m: The match object from the info line's regex
  Return:
    None
  """
  if info == 'watchdog_pid':
    options['watchdog_pid'] = int(m.group('pid'))
    nxlog.log(nxlog.LOG_NOTICE, "matched info watchdog, pid %s\n" %
        options['watchdog_pid'])
  elif info == 'agent_pid':
    real_agent_pid = int(m.group('pid'))
    nxlog.log(nxlog.LOG_NOTICE, "matched info agent_pid, pid %s\n" %
        real_agent_pid)
    if options['agent_pid'] != real_agent_pid:
      # Probably caused by nxagent being a shell script
      nxlog.log(nxlog.LOG_WARNING, "Agent pid (%s) doesn't match "
          "spawned pid (%s)\n" % (options['agent_pid'], real_agent_pid))
      options['agent_pid'] = real_agent_pid
      write_agent_pid()
  elif info == 'kill_watchdog':
    if 'watchdog_pid' not in options:
      nxlog.log(nxlog.LOG_ERR, "matched info kill_watchdog, "
          "but no known watchdog pid\n")
    else:
      try:
        os.kill(options['watchdog_pid'], signal.SIGTERM)
      except OSError:
        err = sys.exc_info()[1]
        nxlog.log(nxlog.LOG_WARNING, "matched info kill_watchdog, "
            "got error from kill[%d]: %s\n" % (err.errno, err.strerror))
      else:
        nxlog.log(nxlog.LOG_NOTICE, "matched info kill_watchdog, sent TERM.\n")
  elif info == 'general_error':
    nxlog.log(nxlog.LOG_ERR, "Agent error: %s" % m.group('error'))
  else:
    # If none of the above handlers match...
    nxlog.log(nxlog.LOG_ERR, "matched info %s, but failed to "
        "find handler for it\n" % info)


def opt_read(opt_path):
  """Parse the nxagent options file into the global options dict.

  The file holds comma-separated name=value pairs followed by ':<display>'.
  The display number is stored under 'display_num' and a bare 'nx/nx' entry
  is skipped. The global options dict is replaced, not updated, so entries
  set earlier are dropped.

  Args:
    opt_path: path of the options file
  Raises:
    IOError: the file can't be read
    ValueError: there is no ':' separator, or a pair has no '='
  """
  global options
  options = {}
  #FIXME: this needs to be much more robust
  opt_file = open(opt_path)
  try:
    opt_str = opt_file.read().rstrip()
  finally:
    opt_file.close()
  opt_str, options['display_num'] = opt_str.rsplit(':', 1)
  for pair in opt_str.split(','):
    if pair == 'nx/nx':
      continue
    # Split on the first '=' only, so values may themselves contain '='.
    name, val = pair.split('=', 1)
    options[name] = val


def setup_xauth():
  """Set up the Xauthority entries for the session using its cookie.

  Reads 'display_num' and 'cookie' from options.
  """
  # NOTE(review): xauth conventionally reads XAUTHORITY (no underscore) and no
  # -f is passed here; confirm that X_AUTHORITY is really what is intended.
  # The redirect is spelled "> file 2>&1" because os.system runs /bin/sh, and
  # the bash-only "&>" form would background the command on a POSIX shell.
  os.system('xauth add localhost:%(display_num)s MIT-MAGIC-COOKIE-1 '
      '%(cookie)s > $X_AUTHORITY.log 2>&1' % options)
  os.system('xauth add :%(display_num)s MIT-MAGIC-COOKIE-1 %(cookie)s' %
      options)


def start_app():
  """Start the session application (startkde/gnome-session/etc).

  The command comes from the configuration for 'gnome' and 'kde' sessions and
  from the 'application' option for 'application' sessions. Output goes to
  app.log in the session directory.

  Raises:
    ValueError: the session type is unknown or yields no command
  """
  session_type = options['type']
  app = None
  if session_type in ('gnome', 'kde'):
    app = nxloadconfig.conf['COMMAND_START_%s' % session_type.upper()]
  elif session_type == 'application':
    app = options['application']
  if not app:
    # Raised explicitly: an assert vanishes under -O, and an unknown type used
    # to surface as an unrelated unbound-variable error.
    raise ValueError("No application to start for session type %r" %
        session_type)
  cmd = 'nxstart %d %s &' % (options['agent_pid'], app)
  nxlog.log(nxlog.LOG_INFO, "Nxagent ready, launching app: %s" % cmd)
  log = open(os.path.join(options['dir'], 'app.log'), 'w')
  try:
    s = subprocess.Popen(cmd, shell=True, close_fds=True,
        stdin=None, stdout=log, stderr=subprocess.STDOUT)
  finally:
    # The child keeps its own copy of the descriptor.
    log.close()
  # NOTE(review): with shell=True and a trailing '&' this is presumably the
  # pid of the launching shell rather than of the application; confirm.
  options['app_pid'] = s.pid
  #FIXME(diamond): check for errors here


def cleanup():
  """Cleanup after various things (X11 locks etc).

  Sends SIGTERM to every known helper process (watchdog, agent, app), then
  removes the X lock file and socket for the session's display.
  """
  for proc in ('watchdog', 'agent', 'app'):
    pid_name = "%s_pid" % proc
    if pid_name not in options:
      continue
    try:
      os.kill(options[pid_name], signal.SIGTERM)
    except OSError:
      err = sys.exc_info()[1]
      nxlog.log(nxlog.LOG_WARNING, "Cleanup handler got error %d when "
          "killing %s: %s\n" % (err.errno, proc, err.strerror))
    else:
      nxlog.log(nxlog.LOG_WARNING, "Cleanup handler sent TERM to %s\n" % proc)
  stale_files = ('/tmp/.X%(display_num)s-lock' % options,
      '/tmp/.X11-unix/X%(display_num)s' % options)
  for f in stale_files:
    if os.path.exists(f):
      os.remove(f)


def write_agent_pid():
  """Send the agent pid to nxserver-inner."""
  sys.stdout.write('NX> 8888 agentpid: %s\n' % options['agent_pid'])
  sys.stdout.flush()


if __name__ == '__main__':
  try:
    main()
  except SystemExit:
    # Deliberate exits keep their exit code. On old interpreters SystemExit
    # derives from Exception, so it must be let through before the catch-all.
    raise
  except Exception:
    trace = traceback.format_exc()
    nxlog.log(nxlog.LOG_ERR, 'Going down because exception caught '
                             'at the top level.')
    for line in trace.split('\n'):
      nxlog.log(nxlog.LOG_ERR, '%s' % line)
    # NOTE(review): the listing this was recovered from is truncated after
    # this loop; confirm whether a non-zero exit is meant to follow.
Source:watchdog.py  
...26                time.sleep(self.watchdog_timeout_sec)27            else:28                logger.log('DEBUG', 'Watchdog disabled')29                break30    def kill_watchdog(self):31        """ Permanently halts operation of the watchdog task """32        self.watchdog_timeout_sec = 033    def watchdog_task(self):34        """ Called once each watchdog period """35        logger.log('DEBUG', f'Watchdog({self.watchdog_name}) Timeout(sec)={self.watchdog_timeout_sec}')36if __name__ == '__main__':...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
