Best Python code snippet using avocado_python
SRNd.py
Source:SRNd.py  
...46        'instance_name': ('SRNd', 11)47    }48    self.config = self.init_srnd_config(def_config)49    self._use_psutil = psutil_import_result and not self.config['use_chroot']50    self._init_sysinfo()51    # install / update plugins52    self.log(self.logger.INFO, "installing / updating plugins")53    for directory in os.listdir('install_files'):54      copy_tree(os.path.join('install_files', directory), os.path.join(self.config['data_dir'], directory), preserve_times=True, update=True)55    #add data_dir in syspath and fix permission56    sys.path.append(os.path.abspath(self.config['data_dir']))57    # create jail58    os.chdir(self.config['data_dir'])59    # get initial owner60    init_owner = (os.geteuid(), os.getegid())61    # get owner62    if self.config['setuid'] != '':63      owner = (self.config['uid'], self.config['gid'])64    else:65      owner = init_owner66    # init feeds67    self.feeds = FeedsManager(log=self.log, logger=self.logger, infeed_config=self._load_infeeds_config(), infeed_debuglevel=self.config['infeed_debuglevel'])68    # test and fixing plugin dir permissions69    for directory in os.listdir('plugins'):70      dir_ = os.path.join('plugins', directory)71      try:72        self._permission_fix(0o755, owner, dir_)73      except OSError as e:74        if e.errno == 1:75          # FIXME what does this errno actually mean? write actual descriptions for error codes -.-76          self.log(self.logger.WARNING, "couldn't change owner of %s. %s will likely fail to create own directories." 
% (dir_, directory))77        else:78          # FIXME: exit might not allow logger to actually output the message.79          self.log(self.logger.CRITICAL, "trying to chown plugin directory %s failed: %s" % (dir_, e))80          exit(1)81    # create some directories82    for directory in ('filesystem', 'outfeeds', 'plugins'):83      dir_ = os.path.join('config', 'hooks', directory)84      if not os.path.exists(dir_):85        os.makedirs(dir_)86    # check for directory structure87    directories = (88        'incoming',89        os.path.join('incoming', 'tmp'),90        os.path.join('incoming', 'spam'),91        'articles',92        os.path.join('articles', 'censored'),93        os.path.join('articles', 'restored'),94        os.path.join('articles', 'invalid'),95        os.path.join('articles', 'duplicate'),96        'groups',97        'hooks',98        'stats',99        self.config['db_dir'])100    for directory in directories:101      if not os.path.exists(directory):102        os.mkdir(directory)103        self._permission_fix(0o755, owner, directory)104    # migrate db105    self._auto_db_migration()106    # fix all permission if owner change. 
it is a long time107    if old_owner != owner:108      self.log(self.logger.INFO, "onwer change, fixing all permissions...")109      self._deep_permission_fix(owner, '.', False)110    # protect some files from changes, if srnd dropping privileges111    if owner != init_owner:112      self._protect_files(init_owner)113    # base fixing permissions114    self._deep_permission_fix(owner, os.path.join('config', 'hooks', 'filesystem'), False)115    self._deep_permission_fix(owner, self.config['db_dir'], False)116    # init db manager117    self._db_manager = __import__('srnd.db_utils').db_utils.DatabaseManager(self.config['db_dir'])118    # importing plugins119    # we need to do this before chrooting because plugins may need to import other libraries120    self.plugins = dict()121    self.update_plugins()122    # start listening123    if self.config['bind_use_ipv6']:124      self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)125    else:126      self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)127    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)128    try:129      self.log(self.logger.INFO, 'start listening at %s:%i' % (self.config['bind_ip'], self.config['bind_port']))130      self.socket.bind((self.config['bind_ip'], self.config['bind_port']))131    except socket.error as e:132      if e.errno == 13:133        # FIXME: exit might not allow logger to actually output the message.134        self.log(self.logger.CRITICAL, '''[error] current user account does not have CAP_NET_BIND_SERVICE: %s135        You have three options:136         - run SRNd as root137         - assign CAP_NET_BIND_SERVICE to the user you intend to use138         - use a port > 1024 by setting bind_port at %s''' % (e, os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')))139        exit(2)140      elif e.errno == 98:141        # FIXME: exit might not allow logger to actually output the message.142        self.log(self.logger.CRITICAL, '%s:%i 
already in use, change to a different port by setting bind_port at %s' % (self.config['bind_ip'], self.config['bind_port'], os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')))143        exit(2)144      else:145        raise e146    self.socket.listen(5)147    if self.config['use_chroot']:148      self.log(self.logger.INFO, 'chrooting..')149      try:150        os.chroot('.')151      except OSError as e:152        if e.errno == 1:153          print "[error] current user account does not have CAP_SYS_CHROOT."154          print "        You have three options:"155          print "         - run SRNd as root"156          print "         - assign CAP_SYS_CHROOT to the user you intend to use"157          print "         - disable chroot in {0} by setting chroot=False".format(os.path.join(self.config['data_dir'], 'config', 'SRNd.conf'))158          exit(3)159        else:160          raise e161    if self.config['setuid'] != '':162      self.log(self.logger.INFO, 'dropping privileges..')163      try:164        os.setgid(self.config['gid'])165        os.setuid(self.config['uid'])166      except OSError as e:167        if e.errno == 1:168          print "[error] current user account does not have CAP_SETUID/CAP_SETGID: ", e169          print "        You have three options:"170          print "         - run SRNd as root"171          print "         - assign CAP_SETUID and CAP_SETGID to the user you intend to use"172          print "         - disable setuid in {0} by setting setuid=".format(os.path.join(self.config['data_dir'], 'config', 'SRNd.conf'))173          exit(4)174        else:175          raise e176    threading.Thread.__init__(self)177    self.name = "SRNd-listener"178    self.dropper = dropper.dropper(179        thread_name='SRNd-dropper',180        logger=self.logger,181        master=self,182        debug=self.config['dropper_debuglevel'],183        db_connector=self._db_manager.connect,184        instance_name=self.config['instance_name']185    
)186    self.start_up_timestamp = -1187    self.ctl_socket_handlers = dict()188    self.ctl_socket_handlers["status"] = self.ctl_socket_handler_status189    self.ctl_socket_handlers["log"] = self.ctl_socket_handler_logger190    self.ctl_socket_handlers["stats"] = self.ctl_socket_handler_stats191    self.hooks = dict()192    self.hook_blacklist = dict()193    self._feeds_lock = threading.Lock()194  def _auto_db_migration(self):195    # Work only for default plugins and db locations196    targets = (197        ('censor.db3', 'censor.db3'), ('dropper.db3', 'dropper.db3'), ('hashes.db3', 'hashes.db3'), ('postman.db3', 'postman.db3'),198        ('overchan.db3', os.path.join('plugins', 'overchan', 'overchan.db3')), ('pastes.db3', os.path.join('plugins', 'paste', 'pastes.db3'))199    )200    for target in targets:201      new_location = os.path.join(self.config['db_dir'], target[0])202      if os.path.isfile(target[1]) and os.path.isfile(new_location):203        self.log(self.logger.ERROR, 'DB migrator: {0} duplicate found {1}. If you copy {2} manually, delete {1}. If not - WTF!'.format(new_location, target[1], target[0]))204      elif os.path.isfile(target[1]):205        try:206          os.rename(target[1], new_location)207        except OSError as e:208          self.log(self.logger.ERROR, 'DB migrator: Error move {} to {}: {}'.format(target[1], new_location, e))209  def _deep_permission_fix(self, owner, path, only_dir=True):210    """Set permission and owner to all files and directories"""211    #TODO: 755 or 777? 
Or maybe 700212    mode_dir = 0o755 # rwxrwx-r-x213    mode_file = 0o664 # rw-rw-r--214    for dirpath, _, filenames in os.walk(path):215      self._permission_fix(mode_dir, owner, dirpath)216      if not only_dir:217        for file_ in filenames:218          file_link = os.path.join(dirpath, file_)219          if not os.path.islink(file_link):220            # ignore symlinks221            self._permission_fix(mode_file, owner, file_link)222  def _permission_fix(self, mode, owner, path):223    try:224      os.chmod(path, mode)225      os.chown(path, owner[0], owner[1])226    except OSError as e:227      username = pwd.getpwuid(self.config['uid']).pw_name if self.config['setuid'] else pwd.getpwuid(os.geteuid()).pw_name228      if e.errno == 1:229        warnings = (230            "can't change ownership of {}.".format(path),231            "If you want to run as '{}' user modify {} :".format(username, os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')),232            "set use_chroot=False, setuid={0}, start SRNd root privileges(su, sudo etc.), stop, wait, and starting SRNd as {0} without root privileges,".format(username),233            "or change ownership manually eg. 'sudo chown -R {0}:{0} data/', or delete the data directory".format(username),234            "die."235        )236        self.log(self.logger.CRITICAL, '\n'.join(warnings))237      else:238        self.log(self.logger.CRITICAL, "couldn't change owner or permission of {}: {}".format(path, e))239      exit(1)240  def _protect_files(self, owner):241    mode_file = 0o664242    # Prevent add and load plugin after dropping privileges243    for dir_ in ('config', 'srnd'):244      self._deep_permission_fix(owner, dir_, False)245    # Prevent modify files and templates files - plugin can be reload246    # TODO: Protect all plugin files and directories, without tmp and out. 
Also, templates directory may be renamed in plugin config247    for plugin_dir in os.listdir('plugins'):248      for target in os.listdir(os.path.join('plugins', plugin_dir)):249        path = os.path.join('plugins', plugin_dir, target)250        if os.path.isfile(path):251          self._permission_fix(mode_file, owner, path)252        elif target == 'templates':253          self._deep_permission_fix(owner, path, False)254  def get_info(self, data=None):255    if data is not None and data.get('command', None) in self.ctl_socket_handlers:256      return self.ctl_socket_handlers[data['command']](data)257    else:258      return None259  def _get_sysinfo(self, target):260    result = None261    if target == 'cpu':262      if self._use_psutil:263        result = self._sysinfo['psutil'].cpu_percent(interval=None)264      else:265        result = 0266    elif target == 'ram':267      if self._use_psutil:268        result = self._sysinfo['psutil'].memory_info()[0]269      else:270        if self._sysinfo['ramfile'] is not None:271          self._sysinfo['ramfile'].seek(0)272          result = int(self._sysinfo['ramfile'].read().split(' ')[1]) * self._sysinfo['pagesize']273        else:274          result = 0275    elif target == 'disk_free':276      result = self._sysinfo['statvfs'].f_bavail * self._sysinfo['statvfs'].f_frsize277    elif target == 'disk_used':278      result = (self._sysinfo['statvfs'].f_blocks - self._sysinfo['statvfs'].f_bfree) * self._sysinfo['statvfs'].f_frsize279    return result280  def _init_sysinfo(self):281    self._sysinfo = dict()282    if self._use_psutil:283      self._sysinfo['psutil'] = psutil.Process()284    else:285      try:286        self._sysinfo['ramfile'] = open('/proc/self/statm', 'r')287      except Exception as e:288        self.log(self.logger.WARNING, 'can\'t open ram stat file at /proc/self/statm: %s' % e)289        self._sysinfo['ramfile'] = None290      if 'SC_PAGESIZE' in os.sysconf_names:291        
self._sysinfo['pagesize'] = os.sysconf('SC_PAGESIZE')292      elif 'SC_PAGE_SIZE' in os.sysconf_names:293        self._sysinfo['pagesize'] = os.sysconf('SC_PAGE_SIZE')294      elif '_SC_PAGESIZE' in os.sysconf_names:...sysinfo.py
Source:sysinfo.py  
...114    description = 'Collects system information before/after the job is run'115    def __init__(self, config):116        self.sysinfo = None117        self.sysinfo_enabled = config.get('sysinfo.collect.enabled')118    def _init_sysinfo(self, job_logdir):119        if self.sysinfo is None:120            basedir = path.init_dir(job_logdir, 'sysinfo')121            self.sysinfo = sysinfo.SysInfo(basedir=basedir)122    def pre_tests(self, job):123        if not self.sysinfo_enabled:124            return125        self._init_sysinfo(job.logdir)126        self.sysinfo.start()127    def post_tests(self, job):128        if not self.sysinfo_enabled:129            return130        self._init_sysinfo(job.logdir)131        self.sysinfo.end()132class SysInfo(CLICmd):133    """134    Collect system information135    """136    name = 'sysinfo'137    description = 'Collect system information'138    def configure(self, parser):139        """140        Add the subparser for the run action.141        :param parser: The Avocado command line application parser142        :type parser: :class:`avocado.core.parser.ArgumentParser`143        """144        parser = super(SysInfo, self).configure(parser)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
