Best Python code snippet using localstack_python
debug2.py
Source:debug2.py  
# -*- coding: utf-8 -*-
#
# Univention Debug2
#  debug2.py
#
# Copyright 2008-2012 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
"""Debug logging with numeric legacy levels and categories on top of ``logging``.

Callers address log categories by small integers (``MAIN``, ``LDAP``, ...)
and severities by the legacy levels ``ERROR`` .. ``ALL``; both are mapped to
named ``logging`` loggers and numeric ``logging`` levels below.
"""
import logging
import sys
#import logging.handlers

# Legacy severity levels (lower number == more severe).
ERROR = 0
WARN = 1
PROCESS = 2
INFO = 3
ALL = 4
DEFAULT = WARN

# The default levels provided are DEBUG(10), INFO(20), WARNING(30), ERROR(40) and CRITICAL(50).
# Mapping old levels to new ones
_map_lvl_old2new = {
    0: logging.ERROR,    # 40
    1: logging.WARNING,  # 30
    2: 25,               # 25
    3: logging.INFO,     # 20
    4: 15,               # 15
}

# Legacy category ids.
MAIN = 0x00
LDAP = 0x01
USERS = 0x02
NETWORK = 0x03
SSL = 0x04
SLAPD = 0x05
SEARCH = 0x06
TRANSFILE = 0x07
LISTENER = 0x08
POLICY = 0x09
ADMIN = 0x0A
CONFIG = 0x0B
LICENSE = 0x0C
KERBEROS = 0x0D
DHCP = 0x0E

# Category id -> name of the ``logging`` logger used for it.
_map_id_old2new = {
    MAIN: "MAIN",
    LDAP: "LDAP",
    USERS: "USERS",
    NETWORK: "NETWORK",
    SSL: "SSL",
    SLAPD: "SLAPD",
    SEARCH: "SEARCH",
    TRANSFILE: "TRANSFILE",
    LISTENER: "LISTENER",
    POLICY: "POLICY",
    ADMIN: "ADMIN",
    CONFIG: "CONFIG",
    LICENSE: "LICENSE",
    KERBEROS: "KERBEROS",
    DHCP: "DHCP",
}

#13.08.08 13:13:57  LISTENER    ( ERROR   ) : listener: 1
#13.08.08 13:13:57  LISTENER    ( WARN    ) : received signal 2
#13.08.08 13:14:02  DEBUG_INIT
_outfmt = '%(asctime)s,%(msecs)d %(name)-11s (%(levelname)-7s): %(message)s'
_outfmt_syslog = '%(name)-11s (%(levelname)-7s): %(message)s'
_datefmt = '%d.%m.%Y %H:%M:%S'

# Module state, (re)configured by init().
_logfilename = None
_handler_console = None
_handler_file = None
_handler_syslog = None
_do_flush = False
_enable_function = False
_enable_syslog = False
_logger_level = {}

# set default level for each logger
for key in _map_id_old2new.values():
    _logger_level[key] = _map_lvl_old2new[DEFAULT]


def _clamp_level(level):
    """Return *level* limited to the legacy range ``ERROR`` .. ``ALL``."""
    return max(ERROR, min(ALL, level))


def _discard_handler(handler):
    """Detach *handler* from the root logger and close it; always return None.

    Closing matters for file handlers: without it every reopen would leave
    the previous log file descriptor open.
    """
    if handler is not None:
        logging.getLogger('').removeHandler(handler)
        handler.close()
    return None


def _install_handler(handler, formatter):
    """Configure *handler*, attach it to the root logger and return it."""
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(formatter)
    logging.getLogger('').addHandler(handler)
    return handler


def _flush_handlers():
    """Flush every configured handler if flushing was requested in init()."""
    if not _do_flush:
        return
    for handler in (_handler_console, _handler_file, _handler_syslog):
        if handler is not None:
            handler.flush()


def init(logfilename, do_flush=0, enable_function=0, enable_syslog=0):
    """Set up logging to *logfilename* and write a ``DEBUG_INIT`` marker.

    :param logfilename: ``'stdout'`` or ``'stderr'`` for a console stream,
        anything else is opened as a log file in append mode.
    :param do_flush: if true, handlers are flushed after every message.
    :param enable_function: if true, :class:`function` objects log
        begin/end markers.
    :param enable_syslog: only stored; the syslog handler code below is
        commented out.

    A failure to open the target is reported with ``print`` and otherwise
    ignored, so that a broken log target never stops the caller.
    """
    global _logfilename, _handler_console, _handler_file
    global _do_flush, _enable_function, _enable_syslog

    _logfilename = logfilename

    # create root logger
    logging.basicConfig(level=logging.DEBUG,
                        filename='/dev/null',  # disabled
                        format=_outfmt,
                        datefmt=_datefmt)
    formatter = logging.Formatter(_outfmt, _datefmt)

    try:
        if logfilename in ('stderr', 'stdout'):
            # add stderr or stdout handler
            _handler_console = _discard_handler(_handler_console)
            stream = sys.stdout if logfilename == 'stdout' else sys.stderr
            _handler_console = _install_handler(
                logging.StreamHandler(stream), formatter)
        else:
            # add file handler
            _handler_file = _discard_handler(_handler_file)
            _handler_file = _install_handler(
                logging.FileHandler(logfilename, 'a+'), formatter)
    except Exception:
        # best effort: report and carry on without this handler
        print('opening %s failed' % logfilename)

#     if enable_syslog:
#         try:
#             # add syslog handler
#             _handler_syslog = logging.handlers.SysLogHandler( ('localhost', 514), logging.handlers.SysLogHandler.LOG_ERR )
#             _handler_syslog.setLevel( _map_lvl_old2new[ERROR] )
#             _handler_syslog.setFormatter(formatter)
#             logging.getLogger('').addHandler(_handler_syslog)
#         except:
#             raise
#             print 'opening syslog failed'

    logging.addLevelName(25, 'PROCESS')
    logging.addLevelName(15, 'ALL')
    logging.addLevelName(100, '------')
    logging.getLogger('MAIN').log(100, 'DEBUG_INIT')

    _do_flush = do_flush
    _enable_function = enable_function
    _enable_syslog = enable_syslog


def reopen():
    """Log a ``DEBUG_REINIT`` marker and re-run init() with the stored settings."""
    logging.getLogger('MAIN').log(100, 'DEBUG_REINIT')
    init(_logfilename, _do_flush, _enable_function, _enable_syslog)


def set_level(id, level):
    """Set the threshold of category *id* to legacy *level*.

    Unknown ids fall back to ``MAIN``; *level* is clamped to
    ``ERROR`` .. ``ALL``.
    """
    new_id = _map_id_old2new.get(id, 'MAIN')
    _logger_level[new_id] = _map_lvl_old2new[_clamp_level(level)]


def set_function(activated):
    """Enable or disable the begin/end markers written by :class:`function`."""
    global _enable_function
    _enable_function = activated


def debug(id, level, msg, utf8=True):
    """Log *msg* for category *id* if *level* passes that category's threshold.

    Unknown ids fall back to ``MAIN``.  *level* is clamped to
    ``ERROR`` .. ``ALL`` (as set_level() does) so an out-of-range value
    cannot raise from inside a logging call.  *utf8* is accepted for
    interface compatibility and not used.
    """
    new_id = _map_id_old2new.get(id, 'MAIN')
    new_level = _map_lvl_old2new[_clamp_level(level)]
    if new_level >= _logger_level[new_id]:
        logging.getLogger(new_id).log(new_level, msg)
        # flush if requested
        _flush_handlers()


class function:
    """Log a BEGIN marker on creation and an END marker on destruction.

    Markers are only written while function logging is enabled (see
    set_function() / the *enable_function* argument of init()).
    *utf8* is accepted for interface compatibility and not used.
    """

    def __init__(self, text, utf8=True):
        self.text = text
        if _enable_function:
            logging.getLogger('MAIN').log(100, 'UNIVENTION_DEBUG_BEGIN : ' + self.text)
            # flush if requested
            _flush_handlers()

    def __del__(self):
        if _enable_function:
            logging.getLogger('MAIN').log(100, 'UNIVENTION_DEBUG_END   : ' + self.text)
            # flush if requested
            # NOTE(review): the source listing is cut off inside this method;
            # the flush mirrors __init__ -- confirm against the upstream file.
            _flush_handlers()

# ... (the listing is truncated here; the page continues with batches.py)
Source:batches.py  
...91        if self._tref is None:     # first request starts flush timer.92            self._tref = timer2.apply_interval(self.flush_interval * 1000,93                                               self._do_flush)94        if not self._count() % self.flush_every:95            self._do_flush()96    def _do_flush(self):97        self.debug("Wake-up to flush buffer...")98        requests = None99        if self._buffer.qsize():100            requests = list(consume_queue(self._buffer))101            if requests:102                self.debug("Buffer complete: %s" % (len(requests), ))103                self.flush(requests)104        if not requests:105            self.debug("Cancelling timer: Nothing in buffer.")106            self._tref.cancel()  # cancel timer.107            self._tref = None108    def apply_buffer(self, requests, args=(), kwargs={}):109        acks_late = [], []110        [acks_late[r.task.acks_late].append(r) for r in requests]...server.py
Source:server.py  
"""XML-RPC server that limits how many client processes may run at once.

Clients identify themselves by pid.  ``request_ok`` admits a pid when a slot
is free (and it is at the head of the waiting queue), ``release`` gives the
slot back.  All RPC entry points are serialised through ``thelock``.
"""
import os
import threading

try:  # Python 2 module names
    import SocketServer
    import BaseHTTPServer
    from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
except ImportError:  # Python 3 renamed these modules
    import socketserver as SocketServer
    import http.server as BaseHTTPServer
    from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer

from utils import synchronized

###


class ThreadingXMLRPCServer(SocketServer.ThreadingMixIn,
                            SimpleXMLRPCServer):
    """XML-RPC server that handles each request in its own thread."""
    pass


_server = None


def create_server(port):
    """Create the singleton server on localhost:*port* and register the RPCs.

    Must be called at most once; a second call trips the assertion.
    """
    global _server
    assert _server is None
    _server = ThreadingXMLRPCServer(('localhost', port),
                                    SimpleXMLRPCRequestHandler)
    _server.register_function(request_ok, 'request_ok')
    _server.register_function(release, 'release')
    _server.register_function(set_queuesize, 'set_queuesize')
    _server.register_function(set_flush, 'set_flush')
    _server.register_function(zero, 'zero')
    return _server


def get_server():
    """Return the server made by create_server(), or None if there is none."""
    return _server


###

###

thelock = threading.Lock()

_do_flush = True                        # adjust for testing/debugging
_queuesize = 1
active_pids = set()
waiting_pids = []


def _pid_alive(pid):
    """Return True if ``os.getpgid(pid)`` succeeds, i.e. the pid still exists."""
    try:
        os.getpgid(pid)
    except OSError:
        # pid no longer active
        return False
    return True


def _flush_pid_queues():
    """Drop pids of vanished processes from the active set and waiting list.

    Does nothing when flushing is switched off via set_flush().
    """
    if not _do_flush:
        return

    # Both containers are pruned in place so the module-level objects stay
    # the same ones other code may hold references to.
    active_pids.difference_update(
        [pid for pid in active_pids if not _pid_alive(pid)])
    waiting_pids[:] = [pid for pid in waiting_pids if _pid_alive(pid)]

    print('ACTIVE %s' % (active_pids,))
    print('WAITING %s' % (waiting_pids,))


@synchronized(thelock)
def request_ok(pid):
    """Return 1 if *pid* may run now, else queue it and return 0.

    A pid is admitted when free slots exist and either the waiting list is
    shorter than the number of free slots or the pid is among the first
    ``allow`` waiters.
    """
    print('REQUEST OK: %s %s %s %s'
          % (pid, active_pids, waiting_pids, _queuesize))

    if pid in active_pids:
        return 1

    _flush_pid_queues()

    allow = _queuesize - len(active_pids)
    print('ALLOW: %s' % (allow,))

    # `allow` goes negative when the queue size was lowered below the number
    # of active pids; only a positive value means there is a free slot.
    if allow > 0:
        ok = False
        if len(waiting_pids) >= allow and pid in waiting_pids[:allow]:
            print('IN')
            ok = True
        elif len(waiting_pids) < allow:
            print('SHORT')
            ok = True

        if ok:
            print('OK')
            # An admitted pid must leave the waiting list on every path,
            # otherwise its stale entry keeps occupying a head-of-queue slot.
            if pid in waiting_pids:
                waiting_pids.remove(pid)
            active_pids.add(pid)
            return 1

    if pid not in waiting_pids:
        waiting_pids.append(pid)

    print('NOK')
    return 0


@synchronized(thelock)
def release(pid):
    """Forget *pid*; return 1 if it held an active slot, else 0."""
    was_active = pid in active_pids
    if was_active:
        active_pids.remove(pid)
    if pid in waiting_pids:
        waiting_pids.remove(pid)
    return 1 if was_active else 0


@synchronized(thelock)
def set_queuesize(n):
    """Set the number of pids allowed to be active at once; return 1.

    :raises ValueError: if *n* is not a positive integer value.
    """
    global _queuesize
    n = int(n)
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if n <= 0:
        raise ValueError('queue size must be positive, got %d' % n)

    _queuesize = n
    return 1


@synchronized(thelock)
def set_flush(flag):
    """Switch dead-pid pruning on or off; return the new setting as 0/1."""
    global _do_flush
    _do_flush = bool(flag)
    return int(_do_flush)


@synchronized(thelock)
def zero():
    """Reset flushing, queue size and both pid collections to their defaults."""
    global _do_flush, _queuesize, active_pids, waiting_pids

    _do_flush = True
    _queuesize = 1
    active_pids = set()
    waiting_pids = []
    # NOTE(review): the source listing is cut off right after the line above.
    # Returning 1 mirrors the sibling RPC functions and keeps the result
    # marshallable over XML-RPC -- confirm against the upstream file.
    return 1

# ... (listing truncated here)
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
