How to use python_logger method in Robotframework

Best Python code snippet using robotframework

aptest.py

Source:aptest.py Github

copy

Full Screen

import subprocess
import time
from runner.utils.sonicauto import SonicAuto
from runner.settings import python_logger


class UploadAptestException(Exception):
    """Raised when uploading results to ApTest has to be skipped."""

    def __init__(self, info):
        super().__init__(f"Upload result to ApTest is skipped: {info}")


class ApTest:
    """Upload test-case results to an ApTest server through its RPC scripts.

    Every RPC is issued by spawning ``curl``; responses are newline-separated
    text whose first line is a status value.
    """

    # Options passed to every curl invocation.
    _CURL_OPTIONS = ['--connect-timeout', '20', '--silent', '-k']

    # ApTest suite name keyed by platform (used when the product id is '1').
    _PLATFORM_SUITES = {
        'GMS': 'GMSVP',
        'LICENSEMANAGER': 'License_Manager',
        'MYSONICWALL': 'Latte',
        'SANDBOX': 'Latte',
        'SNOWSHOE': 'Latte',
        'CFC': 'Desktop_Clients',
    }

    # ApTest suite name keyed by product id (as a string).
    _PRODUCT_SUITES = {
        '2': 'SMA_1000_AO_Phase-6',
        '3': 'EmailSecurity',
        '5': 'SMA_100_AO_Phase6',
        '6': 'firmware_sonicos',
        '7': 'wxa',
        '8': 'Web_Application_Firewall',
        '9': 'SMA_1000_AO_Phase-6',
        '10': 'CloudWAF',
        '11': 'Capture_Client',
        '12': 'Latte',
    }

    def __init__(self, host='10.208.1.8', token='qatoken', user_name='automation', retry=5):
        self.host = host
        self.token = token
        self.user_name = user_name
        self.retry = retry
        # Maps test-case uuid -> session number; filled by
        # get_session_numbers_from_group_id().
        self.uuid_map = {}

    def _run_rpc(self, rpc_url):
        """Log *rpc_url*, fetch it with curl and return the response lines."""
        python_logger.write("DEBUG: RPC URL is: " + rpc_url + '\n')
        command = ['curl'] + self._CURL_OPTIONS + [rpc_url]
        raw_output = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
        return raw_output.decode('ASCII').split('\n')

    def upload_aptest(self, results, platform, scmlabel):
        """Upload every result in *results* that has a known uuid.

        Args:
            results: iterable of dicts; entries without a ``'uuid'`` key are
                skipped, the others are forwarded to update_results().
            platform: platform name used to look up the product id and suite.
            scmlabel: build label used to look up the session group.

        Returns:
            The number of test cases for which the RPC returned ``'0'``.
            Failures are logged, never raised.
        """
        num_of_case_uploaded = 0
        try:
            # get session_group_id
            sa = SonicAuto()
            product_id = str(sa.get_product_id_from_platform(platform))
            # product_id is a string at this point, so compare against '0'
            # (the former comparison against the int 0 could never match).
            if product_id == '0':
                raise UploadAptestException("product id is not found")
            session_group_id = sa.get_session_group_name(scmlabel, product_id)
            if session_group_id == '':
                raise UploadAptestException("session group id is not found")

            # get ApTest suite name from product_name
            if product_id == '1':
                aptest_suite_name = self._PLATFORM_SUITES[platform]
            else:
                aptest_suite_name = self._PRODUCT_SUITES[product_id]

            # A truthy return value means the session lookup failed; retry
            # (with a pause after each failure) until the budget runs out.
            retry = self.retry
            while retry:
                if not self.get_session_numbers_from_group_id(aptest_suite_name, session_group_id):
                    break
                retry -= 1
                time.sleep(20)

            for tc in results:
                if 'uuid' not in tc:
                    python_logger.write("INFO: Testcase uuid not defined. Results not uploaded to ApTest" + '\n')
                    continue
                uuid = tc['uuid']
                if uuid in self.uuid_map:
                    session_number = self.uuid_map[uuid]
                    if self.update_results(scmlabel, platform, aptest_suite_name, session_number, **tc) == '0':
                        num_of_case_uploaded += 1
        except UploadAptestException as e:
            python_logger.write(e.args[0] + '\n')
        except Exception as e:
            # args may be empty or hold a non-string, so never index or
            # concatenate it blindly inside the handler.
            detail = e.args[0] if e.args else repr(e)
            python_logger.write(str(detail) + '\n')
        return num_of_case_uploaded

    def get_session_numbers_from_group_id(self, aptest_suite_name, session_group_id):
        """Populate ``self.uuid_map`` for every session in a session group.

        Returns:
            ``None`` on success, ``1`` when any RPC or parsing step fails.
        """
        rpc_query_string = '?suite=' + aptest_suite_name + '&sessiongroupid=' + session_group_id
        rpc_url = 'https://' + self.host + '/sw-getSessionsWithVar.pl' + rpc_query_string
        try:
            session_lines = self._run_rpc(rpc_url)
            rpc_return_value = session_lines.pop(0)
            python_logger.write("DEBUG: RPC result value is: " + rpc_return_value + '\n')
            for session_setname in session_lines:
                if session_setname == '':
                    continue
                session_number, set_name = session_setname.split(',')
                rpc_query_string = '?suite=' + aptest_suite_name + '&set=' + set_name + '&session=' + session_number
                rpc_url = 'https://' + self.host + '/sw-autotclist.pl' + rpc_query_string
                uuid_lines = self._run_rpc(rpc_url)
                rpc_return_code = uuid_lines.pop(0)
                python_logger.write("DEBUG: TC list result code is: " + rpc_return_code + '\n')
                # The code is text read from the response, so compare with
                # the string '1' (comparing with the int 1 never matched).
                if rpc_return_code.strip() == '1':
                    python_logger.write("Failed return status from getUUIDsFromSessionNumber" + '\n')
                elif len(uuid_lines) == 1:
                    python_logger.write("No corresponding testcases for the given session number" + '\n')
                else:
                    for testcase_data in uuid_lines:
                        if not testcase_data:
                            # Blank lines carry no uuid.
                            continue
                        self.uuid_map[testcase_data.split(',')[0]] = session_number
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            python_logger.error("Unable to run rpc query")
            return 1

    def update_results(self, scmlabel, platform, aptest_suite_name, session_number, **tc):
        """Send one test-case result to ApTest.

        Args:
            scmlabel: build label, recorded in the note.
            platform: platform name, recorded in the note.
            aptest_suite_name: target ApTest suite.
            session_number: session that owns the test case.
            **tc: must contain ``uuid``; may contain ``result``, ``note``,
                ``custom_key`` and ``custom_val``.

        Returns:
            The first line of the RPC response, or ``None`` when the call or
            the response parsing fails.
        """
        uuid = tc['uuid']
        result = tc.get('result', '')
        note = tc.get('note', '')
        custom_key = tc.get('custom_key', '')
        custom_val = tc.get('custom_val', '')

        note_default = ''
        if scmlabel != '1.1.1.1' and platform != 'TestProd':
            note_default = 'BuildVersion:' + scmlabel + '__Product' + platform
        # NOTE(review): the nesting of this branch was reconstructed from a
        # flattened listing; confirm it is not meant to sit inside the
        # build-version check above.
        if note:
            note_default = note_default + '__Note:' + note

        transform_result = {'PASSED': 'pass', 'FAILED': 'fail', 'SKIPPED': 'untested'}
        result = transform_result.get(result, result)

        execdata = 'EXECDATA' + '_' + custom_key
        rpc_query_string = (
            '?rpctoken=' + self.token
            + '&username=' + self.user_name
            + '&suite=' + aptest_suite_name
            + '&command=result&sess=' + session_number
            + '&uuid=' + uuid
            + '&result=' + result
            + '&' + execdata + '=' + custom_val
            + '&note=' + note_default
        )
        # Backslash-escape square brackets in the URL handed to curl.
        # Written as '\\[' because '\[' is an invalid escape sequence.
        rpc_query_string = rpc_query_string.replace('[', '\\[')
        rpc_query_string = rpc_query_string.replace(']', '\\]')
        rpc_url = 'https://' + self.host + '/run/rpc.mpl' + rpc_query_string
        try:
            rpc_result_list = self._run_rpc(rpc_url)
            rpc_return_value = rpc_result_list[0]
            rpc_return_message = rpc_result_list[1]
            python_logger.write("DEBUG: RPC result value is: " + rpc_return_value + '\n')
            python_logger.write("DEBUG: RPC result message is: " + rpc_return_message + '\n')
            return rpc_return_value
        except Exception:
            python_logger.error("ERROR: Unable to run rpc query")
        return None

# if __name__ == '__main__':
#     ap = ApTest()
#     cts_result = [{'uuid': '8A271988-0464-11DE-860E-445A00F93527', 'result': 'PASSED'},
#                   {'uuid': '8A39711E-0464-11DE-860E-445A00F93527', 'result': 'PASSED', 'note': '[trialrun]'}]
#     scmlabel = '6.5.2.2-44n'
#     platform = '5600'

Full Screen

Full Screen

Logger.py

Source:Logger.py Github

copy

Full Screen

#!/usr/bin/env python
"""This is a simple class for wrapping functions in the
native python logging module."""

import logging
import sys
import os
import shutil
import logging.handlers
from mini_utility import get_spkg_path, make_path
from static_data import OK, FAIL, LOG_MAX_SIZE, LOGS_TO_KEEP, LOG_FILE

FORMAT_STRING = '%(asctime)s|%(levelname)s|%(message)s|'
LOGGER_NAME = "bombardier"


class LoggerClass:

    """This class implements a simple interface that other logging
    type classes also implement. This is the most complete
    implementation. """

    def __init__(self):
        spkg_path = get_spkg_path()
        self.python_logger = None
        self.formatter = None
        self.std_err_handler = None
        self.file_handler = None
        self.log_path = None
        if spkg_path:
            self.log_path = make_path(spkg_path, LOG_FILE)
            if self.check_log_size() == FAIL:
                self.cycle_log()
        # Build the logger even without a spkg path (it then logs to stderr)
        # so the info()/debug()/... wrappers never dereference None.
        # NOTE(review): the original nesting of this call was lost in the
        # flattened listing; confirm against upstream.
        self.make_logger()

    def check_log_size(self):
        """ We were dealing with Windows, and couldn't use Python's
        fancy rolling logger effectively. We therefore had to see
        if the log is too big and deal with it periodically.
        - pbanka

        Returns FAIL when the log file is larger than LOG_MAX_SIZE, OK when
        it is not, and None when the log file does not exist.
        """
        if not os.path.isfile(self.log_path):
            return
        if os.path.getsize(self.log_path) > LOG_MAX_SIZE:
            return FAIL
        return OK

    def cycle_log(self):
        "Manually cycle logs"
        # Drop the oldest generation; give up if it cannot be removed.
        oldest_log_file = "%s.%s" % (self.log_path, LOGS_TO_KEEP)
        if os.path.isfile(oldest_log_file):
            try:
                os.unlink(oldest_log_file)
            except OSError:
                return
        # Shift every remaining generation up by one, newest last.
        for i in range(LOGS_TO_KEEP - 1, 0, -1):
            source_path = "%s.%s" % (self.log_path, i)
            if os.path.isfile(source_path):
                dest_path = "%s.%s" % (self.log_path, (i + 1))
                shutil.copyfile(source_path, dest_path)
                os.unlink(source_path)
        # The live log becomes generation 1.
        shutil.copyfile(self.log_path, "%s.1" % self.log_path)
        try:
            os.unlink(self.log_path)
        except OSError:
            return

    def _open_log_handler(self):
        "Return a file handler for log_path, or a stderr handler as fallback"
        if self.log_path is None:
            return logging.StreamHandler(sys.stderr)
        # Two attempts, as before, then fall back to stderr.
        for _attempt in range(2):
            try:
                return logging.FileHandler(self.log_path)
            except IOError:
                continue
        # Parenthesised single argument: same output on Python 2 and 3.
        print("Unable to log to %s" % self.log_path)
        return logging.StreamHandler(sys.stderr)

    def make_logger(self):
        "use the logging subsystem to obtain a logger"
        self.python_logger = logging.getLogger(LOGGER_NAME)
        self.file_handler = self._open_log_handler()
        self.formatter = logging.Formatter(FORMAT_STRING)
        self.file_handler.setFormatter(self.formatter)
        self.python_logger.addHandler(self.file_handler)
        self.python_logger.setLevel(logging.DEBUG)

    def info(self, msg):
        "wrap python logging facility"
        self.python_logger.info(msg)

    def debug(self, msg):
        "wrap python logging facility"
        self.python_logger.debug(msg)

    def warning(self, msg):
        "wrap python logging facility"
        self.python_logger.warning(msg)

    def error(self, msg):
        "wrap python logging facility"
        self.python_logger.error(msg)

    def critical(self, msg):
        "wrap python logging facility"
        self.python_logger.critical(msg)

    def add_std_err_logging(self):
        "Get stderr logging in addition to file logging"
        self.std_err_handler = logging.StreamHandler(sys.stderr)
        self.std_err_handler.setFormatter(self.formatter)
        self.python_logger.addHandler(self.std_err_handler)

    def rm_file_logging(self):
        "Sometimes you don't want to log to a file"
        if self.file_handler:
            msg = "Removing logging to %s" % self.log_path
            self.python_logger.info(msg)
            self.python_logger.removeHandler(self.file_handler)
        else:
            msg = "Being told to remove nonexistent file logging handler."
            self.python_logger.warning(msg)

    def rm_std_err_logging(self):
        "Sometimes you're tired of seeing stderr logging"
        if self.std_err_handler:
            self.python_logger.info("Removing logging to standard error")
            self.python_logger.removeHandler(self.std_err_handler)
        else:
            msg = "Being told to remove nonexistent stderr handler."
            self.python_logger.warning(msg)

Full Screen

Full Screen

ServerLogger.py

Source:ServerLogger.py Github

copy

Full Screen

try:
    import StringIO  # Python 2 module
except ImportError:
    # Python 3 has no StringIO module; io provides a compatible StringIO class.
    import io as StringIO
import syslog
syslog.openlog("dispatcher", syslog.LOG_PID, syslog.LOG_USER)
# NOTE(review): LOG_UPTO() only computes a priority mask and its result is
# discarded here, so this statement has no effect. Presumably
# syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_INFO)) was intended; confirm
# before changing, since it would start filtering DEBUG messages.
syslog.LOG_UPTO(syslog.LOG_INFO)
import logging
import sys
import os
import shutil
import logging.handlers

FORMAT_STRING = '%(asctime)s|%(levelname)s|%(message)s|'


class ServerLogger:
    """Logger that writes to a named Python logger and, optionally, syslog.

    An in-memory buffer can be attached with add_stringio_handle() so that
    callers can poll for log lines produced since the previous poll.
    """

    def __init__(self, name, use_syslog = False):
        self.name = name
        self.python_logger = logging.getLogger(self.name)
        self.python_logger.setLevel(logging.DEBUG)
        self.formatter = logging.Formatter(FORMAT_STRING)
        self.file_handle = None      # in-memory buffer, see add_stringio_handle()
        self.use_syslog = use_syslog
        self.output_pointer = None   # offset of the first unread character

    def add_stringio_handle(self):
        """Attach an in-memory buffer that captures formatted log records."""
        self.file_handle = StringIO.StringIO()
        self.output_pointer = 0
        file_log_handler = logging.StreamHandler(self.file_handle)
        file_log_handler.setFormatter(self.formatter)
        self.python_logger.addHandler(file_log_handler)

    def add_std_err(self):
        """Also send formatted log records to standard error."""
        std_err_handler = logging.StreamHandler(sys.stderr)
        std_err_handler.setFormatter(self.formatter)
        self.python_logger.addHandler(std_err_handler)

    def get_new_logs(self):
        """Return the complete lines logged since the previous call.

        A trailing partial line is left in the buffer for the next call.
        Returns an empty list when no in-memory buffer is attached.
        """
        if self.file_handle is None:
            return []
        self.file_handle.seek(self.output_pointer)
        new_output = self.file_handle.read()
        output_list = new_output.split('\n')
        # Advance past everything except the unfinished last line.
        self.output_pointer += len(new_output) - len(output_list[-1])
        return output_list[:-1]

    def get_final_logs(self):
        """Return all unread output split on newlines (pointer not advanced).

        Returns an empty list when no in-memory buffer is attached.
        """
        if self.file_handle is None:
            return []
        self.file_handle.seek(self.output_pointer)
        return self.file_handle.read().split('\n')

    def get_complete_log(self):
        """Return the whole buffer as one string ('' when none is attached)."""
        if self.file_handle is None:
            return ""
        self.file_handle.seek(0)
        return self.file_handle.read()

    def log_message(self, level, message, username=None):
        """Log *message* at the level named by *level*.

        *level* is one of "DEBUG", "INFO", "WARNING" or "ERROR"; any other
        value is silently ignored.
        """
        if not username:
            username = self.name
        level_map = {"DEBUG": self.debug,
                     "INFO": self.info,
                     "WARNING": self.warning,
                     "ERROR": self.error
                    }
        if level in level_map:
            level_map[level](message, username)

    def _emit(self, syslog_level, python_log_method, msg, username):
        """Send *msg* to syslog (if enabled), then to the Python logger."""
        username = username or self.name
        self.log(syslog_level, msg, username)
        python_log_method("%s|%s" % (username, msg))

    def debug(self, msg, username=None):
        """Log *msg* at DEBUG level, attributed to *username* (default: name)."""
        self._emit(syslog.LOG_DEBUG, self.python_logger.debug, msg, username)

    def info(self, msg, username=None):
        """Log *msg* at INFO level, attributed to *username* (default: name)."""
        self._emit(syslog.LOG_INFO, self.python_logger.info, msg, username)

    def warning(self, msg, username=None):
        """Log *msg* at WARNING level, attributed to *username* (default: name)."""
        self._emit(syslog.LOG_WARNING, self.python_logger.warning, msg, username)

    def error(self, msg, username=None):
        """Log *msg* at ERROR level, attributed to *username* (default: name)."""
        self._emit(syslog.LOG_ERR, self.python_logger.error, msg, username)

    def log(self, level, msg, username=None):
        """Write *msg* to syslog at *level*; a no-op unless use_syslog is set."""
        if not username:
            username = self.name
        if self.use_syslog:
            syslog.syslog(level, "%-15s|dispatcher: %s" % (username, msg))


def test1():
    a = ServerLogger("test1")
    a.add_std_err()
    a.info("hello", "test_user")


def test2():
    a = ServerLogger("test1")
    a.info("hello2", "test_user")


if __name__ == "__main__":
    test1()

Full Screen

Full Screen

logging.py

Source:logging.py Github

copy

Full Screen

import logging
from logging.handlers import TimedRotatingFileHandler
from .utils import make_random_string


def configure_logging(log_filepath=None, when=None, interval=None, backup_count=None):
    """Attach a handler to the 'aggregator' logger and wrap it in a Logger.

    Args:
        log_filepath: if given, log to this file through a
            TimedRotatingFileHandler; otherwise log through a StreamHandler.
        when, interval, backup_count: rotation settings forwarded to
            TimedRotatingFileHandler. Arguments left as None are not
            forwarded, so the handler's own defaults apply.

    Returns:
        A ``(application_logger, handler)`` tuple.
    """
    formatter = DispatchingFormatter({
        'aggregator': logging.Formatter('%(asctime)s - %(name)s - %(req_id)s - %(subsystem)s - %(levelname)s - %(message)s'),
        'quart.serving': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'),
        'quart.app': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s in %(module)s: %(message)s'),
    }, logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    if log_filepath:
        # Passing None through makes the handler fail (it calls
        # when.upper() and multiplies by interval), so forward only the
        # settings the caller actually supplied.
        supplied = {'when': when, 'interval': interval, 'backupCount': backup_count}
        rotation_options = {key: value for key, value in supplied.items() if value is not None}
        handler = TimedRotatingFileHandler(log_filepath, **rotation_options)
    else:
        handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    # Our own internal application logger wrapper
    logger = logging.getLogger('aggregator')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    application_logger = Logger(logger, subsystem='root')
    return application_logger, handler


def configure_logging_for_tests():
    """Return a Logger around the 'aggregator' logger without adding a handler."""
    logger = logging.getLogger('aggregator')
    logger.setLevel(logging.DEBUG)
    return Logger(logger, subsystem='root')


class Logger(object):
    """Wrapper that attaches a fixed set of ``extra`` fields to every record.

    The ``subsystem`` and ``req_id`` fields always exist (defaulting to ''
    and '__n/a__'); per-call keyword arguments override the stored ones.
    """

    def __init__(self, python_logger, **extra):
        extra.setdefault('subsystem', '')
        extra.setdefault('req_id', '__n/a__')
        self.python_logger = python_logger
        self.extra = extra

    def _merged(self, overrides):
        """Return a copy of the stored extra fields updated with *overrides*."""
        merged = self.extra.copy()
        merged.update(overrides)
        return merged

    def info(self, msg, **extra):
        """Log *msg* at INFO level with the merged extra fields."""
        self.python_logger.info(msg, extra=self._merged(extra))

    def error(self, msg, exc_info=None, **extra):
        """Log *msg* at ERROR level with the merged extra fields."""
        self.python_logger.error(msg, exc_info=exc_info, extra=self._merged(extra))

    def exception(self, msg, exc_info=True, **extra):
        """Log *msg* through the wrapped logger's exception() method."""
        self.python_logger.exception(msg, exc_info=exc_info, extra=self._merged(extra))

    def getLogger(self, **extra):
        """Return a new Logger sharing the Python logger, with extra fields merged."""
        return Logger(self.python_logger, **self._merged(extra))

    def getLoggerWithRandomReqId(self, prefix):
        """Return a new Logger whose req_id is ``<prefix>-<random string>``."""
        new_extra = self.extra.copy()
        new_extra['req_id'] = f'{prefix}-{make_random_string(7)}'
        return Logger(self.python_logger, **new_extra)


# From https://stackoverflow.com/a/34626685
class DispatchingFormatter:
    """Select a formatter by the name of the record's logger or its ancestors."""

    def __init__(self, formatters, default_formatter):
        self._formatters = formatters
        self._default_formatter = default_formatter

    def format(self, record):
        # Search from record's logger up to its parents:
        logger = logging.getLogger(record.name)
        while logger:
            # Check if suitable formatter for current logger exists:
            if logger.name in self._formatters:
                formatter = self._formatters[logger.name]
                break
            else:
                logger = logger.parent
        else:
            # If no formatter found, just use default:
            formatter = self._default_formatter
        # NOTE(review): the listing is cut off at this point. Presumably the
        # method ends by returning formatter.format(record); confirm against
        # the original file before relying on this class.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful