How to use override_log_level method in Slash

Best Python code snippet using slash

config.py

Source:config.py Github

copy

Full Screen

#!/usr/bin/env python3
"""
This module handles access to the configuration files.  The configuration
files--including the environment files--are accessed by the other python
scripts through this file.

This is setup such that other files need only call the `get()` functions, and
all the loading and caching will happen automatically internal to this file.

As of right now, this is hard-coded to access configuration files at a
specific name and path.

Module Attributes:
  logger (Logger): Logger for this module.

(C) Copyright 2020 Jonathan Casey.  All Rights Reserved Worldwide.
"""
import configparser
from enum import Enum
import itertools
import logging
import logging.config
import os.path

from grand_trade_auto.general import dirs

logger = logging.getLogger(__name__)


def read_conf_file_fake_header(conf_rel_file, conf_base_dir=None,
        fake_section='fake'):
    """
    Read config file in configparser format, but insert a fake header for
    first section.  This is aimed at files that are close to configparser
    format, but do not have a section header for the first section.

    The fake section name is not important.

    Args:
      conf_rel_file (str): Relative file path to config file.
      conf_base_dir (str): Base file path to use with relative path.  If not
        provided, this will use the project conf dir.
      fake_section (str): Fake section name, if needed.

    Returns:
      parser (ConfigParser): ConfigParser for file loaded.
    """
    # Fix: previously `conf_base_dir=dirs.get_conf_path()` was evaluated once
    # at import time as a default argument; resolve it per call instead.
    if conf_base_dir is None:
        conf_base_dir = dirs.get_conf_path()
    conf_file = os.path.join(conf_base_dir, conf_rel_file)
    parser = configparser.ConfigParser()
    with open(conf_file, encoding="utf_8") as file:
        # Prepend a fake section header so configparser accepts the file.
        parser.read_file(itertools.chain(['[' + fake_section + ']'], file))
    return parser


def read_conf_file(conf_rel_file, conf_base_dir=None):
    """
    Read config file in configparser format.

    Args:
      conf_rel_file (str): Relative file path to config file.
      conf_base_dir (str): Base file path to use with relative path.  If not
        provided, this will use the project conf dir.

    Returns:
      parser (ConfigParser): ConfigParser for file loaded.
    """
    # Fix: default resolved per call rather than once at import time.
    if conf_base_dir is None:
        conf_base_dir = dirs.get_conf_path()
    conf_file = os.path.join(conf_base_dir, conf_rel_file)
    parser = configparser.ConfigParser()
    parser.read(conf_file)
    return parser


def get_matching_secrets_id(secrets_cp, submod, main_id):
    """
    Retrieves the section name (ID) for in the .secrets.conf that matches the
    submodule and main config ID provided.

    Args:
      secrets_cp (ConfigParser): A config parser for the .secrets.conf file
        already loaded.
      submod (str): The name of the submodule that should be the prefix in
        the section name for this in the .secrets.conf file.
      main_id (str): The name of section from the relevant submodule's config
        to ID this element.

    Returns:
      (str or None): The name of the matching section in the .secrets.conf;
        or None if no match.
    """
    for secrets_section_name in secrets_cp:
        try:
            submod_found, id_found = secrets_section_name.split('::')
            if submod_found.strip().lower() == submod.strip().lower() \
                    and id_found.strip().lower() == main_id.strip().lower():
                return secrets_section_name
        except ValueError:
            # Section name does not split into exactly "submod::id" -- skip
            continue
    return None


class CastType(Enum):
    """
    Enum of cast types.

    These are used to specify a target type when casting in `cast_var()`.
    """
    INT = 'int'
    FLOAT = 'float'
    STRING = 'string'


def cast_var(var, cast_type, fallback_to_original=False):
    """
    Cast variable to the specified type.

    Args:
      var (*): Variable of an unknown type.
      cast_type (CastType): Type that var should be cast to, if possible.
      fallback_to_original (bool): If true, will return original var if cast
        fails; otherwise, failed cast will raise exception.

    Returns:
      var (CastType, or ?): Same as var provided, but of the type specified
        by CastType; but if cast failed and fallback to original was true,
        will return original var in original type.

    Raises:
      (TypeError): Cannot cast because type specified is not supported.
      (ValueError): Cast failed and fallback to original was not True.
    """
    try:
        if cast_type is CastType.INT:
            return int(var)
        if cast_type is CastType.FLOAT:
            return float(var)
        if cast_type is CastType.STRING:
            return str(var)
        raise TypeError('Cast failed -- unsupported type.')
    except (TypeError, ValueError):
        if fallback_to_original:
            return var
        raise


def parse_list_from_conf_string(conf_str, val_type, delim=',',
        strip_quotes=False):
    """
    Parse a string into a list of items based on the provided specifications.

    Args:
      conf_str (str): The string to be split.
      val_type (CastType): The type to cast each element to.
      delim (str): The delimiter on which to split conf_str.
      strip_quotes (bool): Whether or not there are quotes to be stripped
        from each item after split and strip.

    Returns:
      list_out (list of val_type): List of all elements found in conf_str
        after splitting on delim.  Each element will be of val_type.  This
        will silently skip any element that cannot be cast.
    """
    if not conf_str:
        return []
    list_out = []
    for val in conf_str.split(delim):
        try:
            if strip_quotes:
                val = val.strip().strip('\'"')
            list_out.append(cast_var(val.strip(), val_type))
        except (ValueError, TypeError):
            # may have been a blank line without a delim
            pass
    return list_out


class LevelFilter(logging.Filter):   # pylint: disable=too-few-public-methods
    """
    A logging filter for the level to set min and max log levels for a
    handler.  While the min level is redundant given logging already
    implements this with the base level functionality, the max level adds a
    new control.

    Class Attributes:
      N/A

    Instance Attributes:
      _min_exc_levelno (int or None): The min log level above which is to be
        included (exclusive).  Can be None to skip min level check.
      _max_inc_levelno (int or None): The max log level below which is to be
        included (inclusive).  Can be None to skip max level check.
    """
    def __init__(self, min_exc_level=None, max_inc_level=None):
        """
        Creates the level filter.

        Args:
          min_exc_level (int/str/None): The min log level above which is to
            be included (exclusive).  Can be provided as the int level number
            or as the level name.  Can be omitted/None to disable filtering
            the min level.
          max_inc_level (int/str/None): The max log level below which is to
            be included (inclusive).  Can be provided as the int level number
            or as the level name.  Can be omitted/None to disable filtering
            the max level.
        """
        # int(None) raises TypeError -> disabled; int('DEBUG') raises
        # ValueError -> resolve by name.
        try:
            self._min_exc_levelno = int(min_exc_level)
        except ValueError:
            # Level name dict is bi-directional lookup -- see python source
            self._min_exc_levelno = logging.getLevelName(
                    min_exc_level.upper())
        except TypeError:
            self._min_exc_levelno = None
        try:
            self._max_inc_levelno = int(max_inc_level)
        except ValueError:
            # Level name dict is bi-directional lookup -- see python source
            self._max_inc_levelno = logging.getLevelName(
                    max_inc_level.upper())
        except TypeError:
            self._max_inc_levelno = None
        super().__init__()

    def filter(self, record):
        """
        Filters the provided record according to the logic in this method.

        Args:
          record (LogRecord): The log record that is being checked whether to
            log.

        Returns:
          (bool): True if should log; False to drop.
        """
        if self._min_exc_levelno is not None \
                and record.levelno <= self._min_exc_levelno:
            return False
        if self._max_inc_levelno is not None \
                and record.levelno > self._max_inc_levelno:
            return False
        return True


def find_existing_handler_from_config(logger_cp, handler_name):
    """
    Finds the handler already existing in the root logger that matches the
    configuration specified in the provided config parser and with the given
    handler name.

    Args:
      logger_cp (ConfigParser): The config parser for the logger.conf file
        loaded that is the exact same one as used to init the logger with
        fileConfig().
      handler_name (str): The name of the handler to try to match.  Should
        exist in [handlers] > keys as well as have a
        [handler_{handler_name}] section in the logger_cp.

    Returns:
      h_existing (Handler or None): Returns the first handler loaded into the
        root logger that matches the provided handler based on the config
        file.  Not a perfect match, but a best guess, so can have false
        positives if certain criteria are identical for multiple handlers.
        None if no match found.
    """
    root_logger = logging.getLogger()
    for h_existing in root_logger.handlers:
        # Until v3.10, handler name not stored from fileConfig :(
        # Will attempt match on some other parameters, but not perfectly
        try:
            h_conf = logger_cp[f'handler_{handler_name}']
        except KeyError:
            # Fix: lazy % formatting instead of eager f-string + concat
            logger.warning('Handler \'%s\' provided in'
                    ' logging.conf > [handlers] > keys, but missing'
                    ' matching handler section.', handler_name)
            continue
        if type(h_existing).__name__ != h_conf['class'] \
                and f'handlers.{type(h_existing).__name__}' \
                    != h_conf['class']:
            continue
        if logging.getLevelName(h_existing.level) \
                != h_conf['level'].strip().upper():
            continue
        h_conf_fmt = logger_cp[
                f'formatter_{h_conf["formatter"]}']['format'].strip()
        # Fix: guard against handlers with no formatter set (was an
        # unguarded attribute access).
        if h_existing.formatter is None \
                or h_existing.formatter._fmt \
                    != h_conf_fmt:  # pylint: disable=protected-access
            continue
        return h_existing
    return None


def init_logger(override_log_level=None):
    """
    Initializes the logger(s).  This is meant to be called once per main
    entry.

    This does not alter that each module should be getting the logger for
    their module name, most likely from the root logger.

    This will apply the override log level if applicable.

    It will also check for the boundary level between stdout and stderr
    handlers, if applicable, and set a filter level.  This must be set in the
    `logging.conf` and must have both a stdout and a stderr handler; but once
    true, will apply to ALL stdout and ALL stderr StreamHandlers!

    Args:
      override_log_level (str): The log level to override and set for the
        root logger as well as the specified handlers from the
        `cli arg level override` section of `logger.conf`.  In addition to
        the standard logger level names and the `disabled` level added by
        this app, the names `all` and `verbose` can also be used for
        `notset` to get everything.
    """
    logging.addLevelName(99, 'DISABLED')
    logging.config.fileConfig(
            os.path.join(dirs.get_conf_path(), 'logger.conf'),
            disable_existing_loggers=False)
    root_logger = logging.getLogger()
    if override_log_level is not None:
        try:
            new_levelno = int(override_log_level)
            new_level = logging.getLevelName(new_levelno)
        except ValueError:
            new_level = override_log_level.upper()
            if new_level in ['ALL', 'VERBOSE']:
                new_level = 'NOTSET'
            new_levelno = logging.getLevelName(new_level)
        root_logger.setLevel(new_level)
    conf_file = os.path.join(dirs.get_conf_path(), 'logger.conf')
    logger_cp = configparser.RawConfigParser()
    logger_cp.read(conf_file)
    handler_names = [h.strip()
            for h in logger_cp['handlers']['keys'].split(',')]
    for h_name in handler_names:
        h_existing = find_existing_handler_from_config(logger_cp, h_name)
        if h_existing is None:
            continue
        if override_log_level is not None:
            lower_level_override = logger_cp.getboolean(f'handler_{h_name}',
                    'allow level override lower', fallback=False)
            raise_level_override = logger_cp.getboolean(f'handler_{h_name}',
                    'allow level override raise', fallback=False)
            if lower_level_override and not raise_level_override:
                if new_levelno < h_existing.level:
                    h_existing.setLevel(new_level)
            elif raise_level_override and not lower_level_override:
                if new_levelno > h_existing.level:
                    h_existing.setLevel(new_level)
            # Skip both -- would only allow to set to level it already was
        max_level = logger_cp.get(f'handler_{h_name}', 'max level',
                fallback=None)
        if max_level is not None:
            # NOTE(review): the original listing is truncated at this point.
            # Applying the configured 'max level' via LevelFilter is the
            # apparent intent -- confirm against the upstream source.
            h_existing.addFilter(LevelFilter(max_inc_level=max_level))

Full Screen

Full Screen

test_utils.py

Source:test_utils.py Github

copy

Full Screen

...80def test_parse_header_links(value, expected):81 assert parse_header_links(value) == expected82@pytest.mark.asyncio83async def test_logs_debug(server, capsys):84 with override_log_level("debug"):85 async with httpx.AsyncClient() as client:86 response = await client.get(server.url)87 assert response.status_code == 20088 stderr = capsys.readouterr().err89 assert 'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"' in stderr90@pytest.mark.asyncio91async def test_logs_trace(server, capsys):92 with override_log_level("trace"):93 async with httpx.AsyncClient() as client:94 response = await client.get(server.url)95 assert response.status_code == 20096 stderr = capsys.readouterr().err97 assert 'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"' in stderr98@pytest.mark.asyncio99async def test_logs_redirect_chain(server, capsys):100 with override_log_level("debug"):101 async with httpx.AsyncClient(follow_redirects=True) as client:102 response = await client.get(server.url.copy_with(path="/redirect_301"))103 assert response.status_code == 200104 stderr = capsys.readouterr().err.strip()105 redirected_request_line, ok_request_line = stderr.split("\n")106 assert redirected_request_line.endswith(107 "HTTP Request: GET http://127.0.0.1:8000/redirect_301 "108 '"HTTP/1.1 301 Moved Permanently"'109 )110 assert ok_request_line.endswith(111 'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"'112 )113def test_get_ssl_cert_file():114 # Two environments is not set....

Full Screen

Full Screen

cmd_status.py

Source:cmd_status.py Github

copy

Full Screen

...77 print_status(ServiceStatus.UP, 'repository', repo_folder)78 # Getting the postgres status by trying to get a database cursor79 database_data = [profile.database_username, profile.database_hostname, profile.database_port]80 try:81 with override_log_level(): # temporarily suppress noisy logging82 backend = manager.get_backend()83 backend.cursor()84 except IncompatibleDatabaseSchema:85 message = 'Database schema version is incompatible with the code: run `verdi database migrate`.'86 print_status(ServiceStatus.DOWN, 'postgres', message)87 exit_code = ExitCode.CRITICAL88 except Exception as exc:89 message = 'Unable to connect as {}@{}:{}'.format(*database_data)90 print_status(ServiceStatus.DOWN, 'postgres', message, exception=exc, print_traceback=print_traceback)91 exit_code = ExitCode.CRITICAL92 else:93 print_status(ServiceStatus.UP, 'postgres', 'Connected as {}@{}:{}'.format(*database_data))94 # Getting the rmq status95 if not no_rmq:96 try:97 with Capturing(capture_stderr=True):98 with override_log_level(): # temporarily suppress noisy logging99 comm = manager.create_communicator(with_orm=False)100 comm.close()101 except Exception as exc:102 message = f'Unable to connect to rabbitmq with URL: {profile.get_rmq_url()}'103 print_status(ServiceStatus.ERROR, 'rabbitmq', message, exception=exc, print_traceback=print_traceback)104 exit_code = ExitCode.CRITICAL105 else:106 print_status(ServiceStatus.UP, 'rabbitmq', f'Connected as {profile.get_rmq_url()}')107 # Getting the daemon status108 try:109 client = manager.get_daemon_client()110 delete_stale_pid_file(client)111 daemon_status = get_daemon_status(client)112 daemon_status = daemon_status.split('\n')[0] # take only the first line...

Full Screen

Full Screen

__init__.py

Source:__init__.py Github

copy

Full Screen

#!/usr/bin/env python
"""Hummingbot package bootstrap: paths, shared executor, and logging setup."""
from typing import (
    List,
    Optional,
)
import logging
import subprocess
from concurrent.futures import ThreadPoolExecutor
from os import (
    listdir,
    path,
)
import sys

from hummingbot.logger.struct_logger import (
    StructLogRecord,
    StructLogger,
)

STRUCT_LOGGER_SET = False
DEV_STRATEGY_PREFIX = "dev"

# Lazily-initialized module-level caches.
_prefix_path = None
_shared_executor = None
_data_path = None
_independent_package: Optional[bool] = None

# Do not raise exceptions during log handling
logging.setLogRecordFactory(StructLogRecord)
logging.setLoggerClass(StructLogger)


def root_path() -> str:
    """Return the absolute path two directories above this file."""
    # Fix: use the already-imported `path` rather than re-importing
    # realpath/join locally.
    return path.realpath(path.join(__file__, "../../"))


def get_executor() -> ThreadPoolExecutor:
    """Return the shared thread pool executor, creating it on first use."""
    global _shared_executor
    if _shared_executor is None:
        _shared_executor = ThreadPoolExecutor()
    return _shared_executor


def prefix_path() -> str:
    """Return the cached install prefix, defaulting to the repository root."""
    global _prefix_path
    if _prefix_path is None:
        _prefix_path = path.realpath(path.join(__file__, "../../"))
    return _prefix_path


def set_prefix_path(path: str):
    """Override the install prefix used by `prefix_path()`."""
    # NOTE: the parameter name `path` shadows the module-level os.path import
    # inside this function; kept for backward compatibility with keyword
    # callers.
    global _prefix_path
    _prefix_path = path


def data_path() -> str:
    """Return the data directory under the prefix, creating it if needed."""
    global _data_path
    if _data_path is None:
        import os
        _data_path = path.realpath(path.join(prefix_path(), "data"))
        if not os.path.exists(_data_path):
            os.makedirs(_data_path)
    return _data_path


def set_data_path(path: str):
    """Override the data directory used by `data_path()`."""
    global _data_path
    _data_path = path


def is_independent_package() -> bool:
    """Return True when running from a frozen/standalone executable."""
    global _independent_package
    import os
    if _independent_package is None:
        # A standalone build's interpreter is not named "python*".
        _independent_package = \
            not os.path.basename(sys.executable).startswith("python")
    return _independent_package


def check_dev_mode():
    """Return True when running from a git checkout on a non-master branch.

    Returns a falsy value (False or None) otherwise, including on any error.
    """
    try:
        if is_independent_package():
            return False
        if not path.isdir(".git"):
            return False
        current_branch = subprocess.check_output(
            ["git", "symbolic-ref", "--short", "HEAD"]
        ).decode("utf8").rstrip()
        if current_branch != "master":
            return True
    except Exception:
        return False


def chdir_to_data_directory():
    """For standalone builds, chdir into the per-user app data directory."""
    if not is_independent_package():
        # Do nothing.
        return
    import appdirs
    import os
    app_data_dir: str = appdirs.user_data_dir("Hummingbot", "hummingbot.io")
    os.makedirs(os.path.join(app_data_dir, "logs"), 0o711, exist_ok=True)
    os.makedirs(os.path.join(app_data_dir, "conf"), 0o711, exist_ok=True)
    os.chdir(app_data_dir)
    set_prefix_path(app_data_dir)


def add_remote_logger_handler(loggers):
    """Attach the remote reporting handler to the root and named loggers."""
    from hummingbot.logger.reporting_proxy_handler import ReportingProxyHandler
    root_logger = logging.getLogger()
    try:
        remote_logger = ReportingProxyHandler(
            level="ERROR",
            proxy_url="https://api.coinalpha.com/reporting-proxy",
            capacity=5)
        root_logger.addHandler(remote_logger)
        for logger_name in loggers:
            logger = logging.getLogger(logger_name)
            logger.addHandler(remote_logger)
    except Exception:
        root_logger.error("Error adding remote log handler.", exc_info=True)


def init_logging(conf_filename: str,
                 override_log_level: Optional[str] = None,
                 dev_mode: bool = False,
                 strategy_file_path: str = "hummingbot"):
    """Load the YAML logging config, apply overrides, and install it.

    Args:
      conf_filename: File name under `<prefix>/conf` to load.
      override_log_level: If given, overrides the level of whitelisted
        loggers.
      dev_mode: When True, also attach the remote reporting handler.
      strategy_file_path: Substituted for $STRATEGY_FILE_PATH in the config.
    """
    import io
    import logging.config
    from typing import Dict
    import pandas as pd
    from ruamel.yaml import YAML
    from hummingbot.client.config.global_config_map import global_config_map
    # Fix: removed redundant re-import of StructLogRecord/StructLogger --
    # they are already imported at module level.

    global STRUCT_LOGGER_SET
    if not STRUCT_LOGGER_SET:
        logging.setLogRecordFactory(StructLogRecord)
        logging.setLoggerClass(StructLogger)
        STRUCT_LOGGER_SET = True
    # Do not raise exceptions during log handling
    logging.raiseExceptions = False

    file_path: str = path.join(prefix_path(), "conf", conf_filename)
    yaml_parser: YAML = YAML()
    with open(file_path) as fd:
        yml_source: str = fd.read()
    # Expand template placeholders before parsing.
    yml_source = yml_source.replace("$PROJECT_DIR", prefix_path())
    yml_source = yml_source.replace(
        "$DATETIME", pd.Timestamp.now().strftime("%Y-%m-%d-%H-%M-%S"))
    yml_source = yml_source.replace(
        "$STRATEGY_FILE_PATH", strategy_file_path.replace(".yml", ""))
    io_stream: io.StringIO = io.StringIO(yml_source)
    config_dict: Dict = yaml_parser.load(io_stream)
    if override_log_level is not None and "loggers" in config_dict:
        for logger in config_dict["loggers"]:
            if global_config_map["logger_override_whitelist"].value and \
                    logger in global_config_map[
                        "logger_override_whitelist"].value:
                config_dict["loggers"][logger]["level"] = override_log_level
    logging.config.dictConfig(config_dict)
    # add remote logging to logger if in dev mode
    if dev_mode:
        add_remote_logger_handler(config_dict.get("loggers", []))


def get_strategy_list() -> List[str]:
    """
    Search `hummingbot.strategy` folder for all available strategies
    Automatically hide all strategies that starts with "dev" if on master
    branch
    """
    try:
        folder = path.realpath(path.join(__file__, "../strategy"))
        # Only include valid directories
        strategies = [d for d in listdir(folder)
                      if path.isdir(path.join(folder, d))
                      and not d.startswith("__")]
        if not check_dev_mode():
            strategies = [s for s in strategies
                          if not s.startswith(DEV_STRATEGY_PREFIX)]
        return sorted(strategies)
    except Exception as e:
        logging.getLogger().warning(f"Error getting strategy set: {str(e)}")
        # NOTE(review): the original listing is truncated here; returning an
        # empty list on failure -- confirm against the upstream source.
        return []

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful