Best Python code snippet using slash
speaker.py
Source:speaker.py  
...195    def set_log_level(l): Speaker.level= l196    @staticmethod197    def get_log_level(*a, **kw): return Speaker.level198    @staticmethod199    def set_log_color(*a, **kw): pass200    @staticmethod201    def get_log_color(): return 1202        #return Speaker.colored203    @staticmethod204    def log_raise(*args):205        """ log_raise(*args): --> logging and Raising206        207        args[0:-1] is the un-formatted string.208        args[-1] is the Exception to be raised.209        """ 210        exc= args[-1]211        s= (args[0] % args[1:-1])212        Speaker.log_error(s)213        raise exc(s)214    @staticmethod215    def log_int_raise(*args):216        s= (args[0] % args[1:])217        Speaker.log_error(s)218        raise InternalError(s)219#220##221### logging initialization222##223#224def init_speaker(conf= {}):225    # prevent multiple initilization226    if not hasattr(Speaker, '_spkr_initialized'):227        Speaker.log_info("Logger already initialized. If you "228                         "want to change log level or colored "229                         "logs, use set_log_level, set_log_color.")230        return231    sdefs= SpeakerDefaults()232    cf_ok= True233    #smtp_ok= True234    # need to read conf file or already done?235    if isinstance(conf, dict):236        conf_dict= conf237    else:238        conf_dict= {}239        try:240            execfile(conf, globals(), conf_dict)241        except Exception, cf_ko:242            # Remember the error: after logger243            # initialization it will be emitted244            cf_ok= False245    sdefs.update(conf_dict)246    #colors= sdefs.get('LOG_COLOR', False)247    #Speaker.set_log_color(colors)248    pref= sdefs['LOG_PREFIX']249    lvl= sdefs['LOG_LEVEL']250    Speaker.level= lvl251    if sdefs['LOG_TO_STDOUT']:252        _root_logger= Speaker.init_logging(lvl, pref)253        _handler= logging.StreamHandler()254        _root_logger.addHandler(_handler)255        
_handler.setFormatter(_fmt_log)256    logfile= sdefs['LOG_TO_FILE']257    if logfile:258        frot= sdefs['LOG_FILE_ROTATE']259        fnum= sdefs['LOG_FILE_NUMBER']260        if frot:261            from logging.handlers import TimedRotatingFileHandler...server.py
Source:server.py  
...11from .. import hooks12from ..conf import config13from .tests_distributer import TestsDistributer14_logger = logbook.Logger(__name__)15log.set_log_color(_logger.name, logbook.NOTICE, 'blue')16NO_MORE_TESTS = "NO_MORE_TESTS"17PROTOCOL_ERROR = "PROTOCOL_ERROR"18WAITING_FOR_CLIENTS = "WAITING_FOR_CLIENTS"19class ServerStates(Enum):20    NOT_INITIALIZED = 121    WAIT_FOR_CLIENTS = 222    WAIT_FOR_COLLECTION_VALIDATION = 323    SERVE_TESTS = 424    STOP_TESTS_SERVING = 525    STOP_SERVE = 626class KeepaliveServer(object):27    def __init__(self):28        super(KeepaliveServer, self).__init__()29        self.clients_last_communication_time = {}...runner.py
Source:runner.py  
...11from .core.exclusions import is_excluded12from .core import requirements13from .utils.iteration import PeekableIterator14_logger = logbook.Logger(__name__)15log.set_log_color(_logger.name, logbook.NOTICE, 'blue')16def run_tests(iterable, stop_on_error=None):17    """18    Runs tests from an iterable using the current session19    """20    # pylint: disable=maybe-no-member21    def should_stop_on_error():22        if stop_on_error is None:23            return config.root.run.stop_on_error24        return stop_on_error25    if context.session is None or not context.session.started:26        raise NoActiveSession("A session is not currently started")27    test_iterator = PeekableIterator(iterable)28    last_filename = None29    complete = False...parallel_manager.py
Source:parallel_manager.py  
...9from ..conf import config10from .server import Server, ServerStates, KeepaliveServer11from .worker_configuration import TmuxWorkerConfiguration, ProcessWorkerConfiguration12_logger = logbook.Logger(__name__)13log.set_log_color(_logger.name, logbook.NOTICE, 'blue')14TIME_BETWEEN_CHECKS = 215MAX_CONNECTION_RETRIES = 20016def get_xmlrpc_proxy(address, port):17    return xmlrpc_client.ServerProxy('http://{}:{}'.format(address, port))18class ParallelManager(object):19    def __init__(self, args):20        super(ParallelManager, self).__init__()21        self.server = None22        self.workers_error_dircetory = mkdtemp()23        self.args = args24        self.workers_num = config.root.parallel.num_workers25        self.workers = {}26        self.server_thread = None27        self.keepalive_server = None...Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
