How to use _init_sysinfo method in avocado

Best Python code snippet using avocado_python Github


Full Screen

...46 'instance_name': ('SRNd', 11)47 }48 self.config = self.init_srnd_config(def_config)49 self._use_psutil = psutil_import_result and not self.config['use_chroot']50 self._init_sysinfo()51 # install / update plugins52 self.log(self.logger.INFO, "installing / updating plugins")53 for directory in os.listdir('install_files'):54 copy_tree(os.path.join('install_files', directory), os.path.join(self.config['data_dir'], directory), preserve_times=True, update=True)55 #add data_dir in syspath and fix permission56 sys.path.append(os.path.abspath(self.config['data_dir']))57 # create jail58 os.chdir(self.config['data_dir'])59 # get initial owner60 init_owner = (os.geteuid(), os.getegid())61 # get owner62 if self.config['setuid'] != '':63 owner = (self.config['uid'], self.config['gid'])64 else:65 owner = init_owner66 # init feeds67 self.feeds = FeedsManager(log=self.log, logger=self.logger, infeed_config=self._load_infeeds_config(), infeed_debuglevel=self.config['infeed_debuglevel'])68 # test and fixing plugin dir permissions69 for directory in os.listdir('plugins'):70 dir_ = os.path.join('plugins', directory)71 try:72 self._permission_fix(0o755, owner, dir_)73 except OSError as e:74 if e.errno == 1:75 # FIXME what does this errno actually mean? write actual descriptions for error codes -.-76 self.log(self.logger.WARNING, "couldn't change owner of %s. %s will likely fail to create own directories." % (dir_, directory))77 else:78 # FIXME: exit might not allow logger to actually output the message.79 self.log(self.logger.CRITICAL, "trying to chown plugin directory %s failed: %s" % (dir_, e))80 exit(1)81 # create some directories82 for directory in ('filesystem', 'outfeeds', 'plugins'):83 dir_ = os.path.join('config', 'hooks', directory)84 if not os.path.exists(dir_):85 os.makedirs(dir_)86 # check for directory structure87 directories = (88 'incoming',89 os.path.join('incoming', 'tmp'),90 os.path.join('incoming', 'spam'),91 'articles',92 os.path.join('articles', 'censored'),93 os.path.join('articles', 'restored'),94 os.path.join('articles', 'invalid'),95 os.path.join('articles', 'duplicate'),96 'groups',97 'hooks',98 'stats',99 self.config['db_dir'])100 for directory in directories:101 if not os.path.exists(directory):102 os.mkdir(directory)103 self._permission_fix(0o755, owner, directory)104 # migrate db105 self._auto_db_migration()106 # fix all permission if owner change. it is a long time107 if old_owner != owner:108 self.log(self.logger.INFO, "onwer change, fixing all permissions...")109 self._deep_permission_fix(owner, '.', False)110 # protect some files from changes, if srnd dropping privileges111 if owner != init_owner:112 self._protect_files(init_owner)113 # base fixing permissions114 self._deep_permission_fix(owner, os.path.join('config', 'hooks', 'filesystem'), False)115 self._deep_permission_fix(owner, self.config['db_dir'], False)116 # init db manager117 self._db_manager = __import__('srnd.db_utils').db_utils.DatabaseManager(self.config['db_dir'])118 # importing plugins119 # we need to do this before chrooting because plugins may need to import other libraries120 self.plugins = dict()121 self.update_plugins()122 # start listening123 if self.config['bind_use_ipv6']:124 self.socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)125 else:126 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)127 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)128 try:129 self.log(self.logger.INFO, 'start listening at %s:%i' % (self.config['bind_ip'], self.config['bind_port']))130 self.socket.bind((self.config['bind_ip'], self.config['bind_port']))131 except socket.error as e:132 if e.errno == 13:133 # FIXME: exit might not allow logger to actually output the message.134 self.log(self.logger.CRITICAL, '''[error] current user account does not have CAP_NET_BIND_SERVICE: %s135 You have three options:136 - run SRNd as root137 - assign CAP_NET_BIND_SERVICE to the user you intend to use138 - use a port > 1024 by setting bind_port at %s''' % (e, os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')))139 exit(2)140 elif e.errno == 98:141 # FIXME: exit might not allow logger to actually output the message.142 self.log(self.logger.CRITICAL, '%s:%i already in use, change to a different port by setting bind_port at %s' % (self.config['bind_ip'], self.config['bind_port'], os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')))143 exit(2)144 else:145 raise e146 self.socket.listen(5)147 if self.config['use_chroot']:148 self.log(self.logger.INFO, 'chrooting..')149 try:150 os.chroot('.')151 except OSError as e:152 if e.errno == 1:153 print "[error] current user account does not have CAP_SYS_CHROOT."154 print " You have three options:"155 print " - run SRNd as root"156 print " - assign CAP_SYS_CHROOT to the user you intend to use"157 print " - disable chroot in {0} by setting chroot=False".format(os.path.join(self.config['data_dir'], 'config', 'SRNd.conf'))158 exit(3)159 else:160 raise e161 if self.config['setuid'] != '':162 self.log(self.logger.INFO, 'dropping privileges..')163 try:164 os.setgid(self.config['gid'])165 os.setuid(self.config['uid'])166 except OSError as e:167 if e.errno == 1:168 print "[error] current user account does not have CAP_SETUID/CAP_SETGID: ", e169 print " You have three options:"170 print " - run SRNd as root"171 print " - assign CAP_SETUID and CAP_SETGID to the user you intend to use"172 print " - disable setuid in {0} by setting setuid=".format(os.path.join(self.config['data_dir'], 'config', 'SRNd.conf'))173 exit(4)174 else:175 raise e176 threading.Thread.__init__(self)177 = "SRNd-listener"178 self.dropper = dropper.dropper(179 thread_name='SRNd-dropper',180 logger=self.logger,181 master=self,182 debug=self.config['dropper_debuglevel'],183 db_connector=self._db_manager.connect,184 instance_name=self.config['instance_name']185 )186 self.start_up_timestamp = -1187 self.ctl_socket_handlers = dict()188 self.ctl_socket_handlers["status"] = self.ctl_socket_handler_status189 self.ctl_socket_handlers["log"] = self.ctl_socket_handler_logger190 self.ctl_socket_handlers["stats"] = self.ctl_socket_handler_stats191 self.hooks = dict()192 self.hook_blacklist = dict()193 self._feeds_lock = threading.Lock()194 def _auto_db_migration(self):195 # Work only for default plugins and db locations196 targets = (197 ('censor.db3', 'censor.db3'), ('dropper.db3', 'dropper.db3'), ('hashes.db3', 'hashes.db3'), ('postman.db3', 'postman.db3'),198 ('overchan.db3', os.path.join('plugins', 'overchan', 'overchan.db3')), ('pastes.db3', os.path.join('plugins', 'paste', 'pastes.db3'))199 )200 for target in targets:201 new_location = os.path.join(self.config['db_dir'], target[0])202 if os.path.isfile(target[1]) and os.path.isfile(new_location):203 self.log(self.logger.ERROR, 'DB migrator: {0} duplicate found {1}. If you copy {2} manually, delete {1}. If not - WTF!'.format(new_location, target[1], target[0]))204 elif os.path.isfile(target[1]):205 try:206 os.rename(target[1], new_location)207 except OSError as e:208 self.log(self.logger.ERROR, 'DB migrator: Error move {} to {}: {}'.format(target[1], new_location, e))209 def _deep_permission_fix(self, owner, path, only_dir=True):210 """Set permission and owner to all files and directories"""211 #TODO: 755 or 777? Or maybe 700212 mode_dir = 0o755 # rwxrwx-r-x213 mode_file = 0o664 # rw-rw-r--214 for dirpath, _, filenames in os.walk(path):215 self._permission_fix(mode_dir, owner, dirpath)216 if not only_dir:217 for file_ in filenames:218 file_link = os.path.join(dirpath, file_)219 if not os.path.islink(file_link):220 # ignore symlinks221 self._permission_fix(mode_file, owner, file_link)222 def _permission_fix(self, mode, owner, path):223 try:224 os.chmod(path, mode)225 os.chown(path, owner[0], owner[1])226 except OSError as e:227 username = pwd.getpwuid(self.config['uid']).pw_name if self.config['setuid'] else pwd.getpwuid(os.geteuid()).pw_name228 if e.errno == 1:229 warnings = (230 "can't change ownership of {}.".format(path),231 "If you want to run as '{}' user modify {} :".format(username, os.path.join(self.config['data_dir'], 'config', 'SRNd.conf')),232 "set use_chroot=False, setuid={0}, start SRNd root privileges(su, sudo etc.), stop, wait, and starting SRNd as {0} without root privileges,".format(username),233 "or change ownership manually eg. 'sudo chown -R {0}:{0} data/', or delete the data directory".format(username),234 "die."235 )236 self.log(self.logger.CRITICAL, '\n'.join(warnings))237 else:238 self.log(self.logger.CRITICAL, "couldn't change owner or permission of {}: {}".format(path, e))239 exit(1)240 def _protect_files(self, owner):241 mode_file = 0o664242 # Prevent add and load plugin after dropping privileges243 for dir_ in ('config', 'srnd'):244 self._deep_permission_fix(owner, dir_, False)245 # Prevent modify files and templates files - plugin can be reload246 # TODO: Protect all plugin files and directories, without tmp and out. Also, templates directory may be renamed in plugin config247 for plugin_dir in os.listdir('plugins'):248 for target in os.listdir(os.path.join('plugins', plugin_dir)):249 path = os.path.join('plugins', plugin_dir, target)250 if os.path.isfile(path):251 self._permission_fix(mode_file, owner, path)252 elif target == 'templates':253 self._deep_permission_fix(owner, path, False)254 def get_info(self, data=None):255 if data is not None and data.get('command', None) in self.ctl_socket_handlers:256 return self.ctl_socket_handlers[data['command']](data)257 else:258 return None259 def _get_sysinfo(self, target):260 result = None261 if target == 'cpu':262 if self._use_psutil:263 result = self._sysinfo['psutil'].cpu_percent(interval=None)264 else:265 result = 0266 elif target == 'ram':267 if self._use_psutil:268 result = self._sysinfo['psutil'].memory_info()[0]269 else:270 if self._sysinfo['ramfile'] is not None:271 self._sysinfo['ramfile'].seek(0)272 result = int(self._sysinfo['ramfile'].read().split(' ')[1]) * self._sysinfo['pagesize']273 else:274 result = 0275 elif target == 'disk_free':276 result = self._sysinfo['statvfs'].f_bavail * self._sysinfo['statvfs'].f_frsize277 elif target == 'disk_used':278 result = (self._sysinfo['statvfs'].f_blocks - self._sysinfo['statvfs'].f_bfree) * self._sysinfo['statvfs'].f_frsize279 return result280 def _init_sysinfo(self):281 self._sysinfo = dict()282 if self._use_psutil:283 self._sysinfo['psutil'] = psutil.Process()284 else:285 try:286 self._sysinfo['ramfile'] = open('/proc/self/statm', 'r')287 except Exception as e:288 self.log(self.logger.WARNING, 'can\'t open ram stat file at /proc/self/statm: %s' % e)289 self._sysinfo['ramfile'] = None290 if 'SC_PAGESIZE' in os.sysconf_names:291 self._sysinfo['pagesize'] = os.sysconf('SC_PAGESIZE')292 elif 'SC_PAGE_SIZE' in os.sysconf_names:293 self._sysinfo['pagesize'] = os.sysconf('SC_PAGE_SIZE')294 elif '_SC_PAGESIZE' in os.sysconf_names:...

Full Screen

Full Screen Github


Full Screen

...114 description = 'Collects system information before/after the job is run'115 def __init__(self, config):116 self.sysinfo = None117 self.sysinfo_enabled = config.get('sysinfo.collect.enabled')118 def _init_sysinfo(self, job_logdir):119 if self.sysinfo is None:120 basedir = path.init_dir(job_logdir, 'sysinfo')121 self.sysinfo = sysinfo.SysInfo(basedir=basedir)122 def pre_tests(self, job):123 if not self.sysinfo_enabled:124 return125 self._init_sysinfo(job.logdir)126 self.sysinfo.start()127 def post_tests(self, job):128 if not self.sysinfo_enabled:129 return130 self._init_sysinfo(job.logdir)131 self.sysinfo.end()132class SysInfo(CLICmd):133 """134 Collect system information135 """136 name = 'sysinfo'137 description = 'Collect system information'138 def configure(self, parser):139 """140 Add the subparser for the run action.141 :param parser: The Avocado command line application parser142 :type parser: :class:`avocado.core.parser.ArgumentParser`143 """144 parser = super(SysInfo, self).configure(parser)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?