Best Python code snippet using slash
config.py
Source:config.py  
1#!/usr/bin/env python32"""3This module handles access to the configuration files.  The configuration4files--including the environment files--are accessed by the other python scripts5through this file.6This is setup such that other files need only call the `get()` functions, and7all the loading and caching will happen automatically internal to this file.8As of right now, this is hard-coded to access configuration files at a specific9name and path.10Module Attributes:11  logger (Logger): Logger for this module.12(C) Copyright 2020 Jonathan Casey.  All Rights Reserved Worldwide.13"""14import configparser15from enum import Enum16import itertools17import logging18import logging.config19import os.path20from grand_trade_auto.general import dirs21logger = logging.getLogger(__name__)22def read_conf_file_fake_header(conf_rel_file,23        conf_base_dir=dirs.get_conf_path(), fake_section='fake',):24    """25    Read config file in configparser format, but insert a fake header for26    first section.  This is aimed at files that are close to configparser27    format, but do not have a section header for the first section.28    The fake section name is not important.29    Args:30      conf_rel_file (str): Relative file path to config file.31      conf_base_dir (str): Base file path to use with relative path.  
If not32        provided, this will use the absolute path of this module.33      fake_section (str): Fake section name, if needed.34    Returns:35      parser (ConfigParser): ConfigParser for file loaded.36    """37    conf_file = os.path.join(conf_base_dir, conf_rel_file)38    parser = configparser.ConfigParser()39    with open(conf_file, encoding="utf_8") as file:40        parser.read_file(itertools.chain(['[' + fake_section + ']'], file))41    return parser42def read_conf_file(conf_rel_file, conf_base_dir=dirs.get_conf_path()):43    """44    Read config file in configparser format.45    Args:46      conf_rel_file (str): Relative file path to config file.47      conf_base_dir (str): Base file path to use with relative path.  If not48        provided, this will use the absolute path of this module.49    Returns:50      parser (ConfigParser): ConfigParser for file loaded.51    """52    conf_file = os.path.join(conf_base_dir, conf_rel_file)53    parser = configparser.ConfigParser()54    parser.read(conf_file)55    return parser56def get_matching_secrets_id(secrets_cp, submod, main_id):57    """58    Retrieves the section name (ID) for in the .secrets.conf that matches the59    submodule and main config ID provided.60    Args:61      secrets_cp (ConfigParser): A config parser for the .secrets.conf file62        already loaded.63      submod (str): The name of the submodule that should be the prefix in the64        section name for this in the .secrets.conf file.65      main_id (str): The name of section from the relevant submodule's config to66        ID this element.67    Returns:68      (str or None): The name of the matching section in the .secrets.conf; or69        None if no match.70    """71    for secrets_section_name in secrets_cp:72        try:73            submod_found, id_found = secrets_section_name.split('::')74            if submod_found.strip().lower() == submod.strip().lower() \75                    and id_found.strip().lower() == 
main_id.strip().lower():76                return secrets_section_name77        except ValueError:78            continue79    return None80class CastType(Enum):81    """82    Enum of cast types.83    These are used to specify a target type when casting in `castVar()`.84    """85    INT = 'int'86    FLOAT = 'float'87    STRING = 'string'88def cast_var(var, cast_type, fallback_to_original=False):89    """90    Cast variable to the specified type.91    Args:92      var (*): Variable of an unknown type.93      cast_type (CastType): Type that var should be cast to, if possible.94      fallback_to_original (bool): If true, will return original var if cast95        fails; otherwise, failed cast will raise exception.96    Returns:97      var (CastType, or ?): Same as var provided, but of the type specified by98        CastType; but if cast failed and fallback to original was true, will99        return original var in original type.100    Raises:101      (TypeError): Cannot cast because type specified is not supported.102      (ValueError): Cast failed and fallback to original was not True.103    """104    try:105        if cast_type == CastType.INT:106            return int(var)107        if cast_type == CastType.FLOAT:108            return float(var)109        if cast_type == CastType.STRING:110            return str(var)111        raise TypeError('Cast failed -- unsupported type.')112    except (TypeError, ValueError):113        if fallback_to_original:114            return var115        raise116def parse_list_from_conf_string(conf_str, val_type, delim=',',117        strip_quotes=False):118    """119    Parse a string into a list of items based on the provided specifications.120    Args:121      conf_str (str): The string to be split.122      val_type (CastType): The type to cast each element to.123      delim (str): The delimiter on which to split conf_str.124      strip_quotes (bool): Whether or not there are quotes to be stripped from125        each item after split 
and strip.126    Returns:127      list_out (list of val_type): List of all elements found in conf_str after128        splitting on delim.  Each element will be of val_type.  This will129        silently skip any element that cannot be cast.130    """131    if not conf_str:132        return []133    val_raw_list = conf_str.split(delim)134    list_out = []135    for val in val_raw_list:136        try:137            if strip_quotes:138                val = val.strip().strip('\'"')139            cast_val = cast_var(val.strip(), val_type)140            list_out.append(cast_val)141        except (ValueError, TypeError):142            # may have been a blank line without a delim143            pass144    return list_out145class LevelFilter(logging.Filter):      # pylint: disable=too-few-public-methods146    """147    A logging filter for the level to set min and max log levels for a handler.148    While the min level is redundant given logging already implements this with149    the base level functionality, the max level adds a new control.150    Class Attributes:151      N/A152    Instance Attributes:153      _min_exc_levelno (int or None): The min log level above which is to be154        included (exclusive).  Can be None to skip min level check.155      _max_inc_levelno (int or None): The max log level below which is to be156        included (inclusive).  Can be None to skip max level check.157    """158    def __init__(self, min_exc_level=None, max_inc_level=None):159        """160        Creates the level filter.161        Args:162          min_exc_level (int/str/None): The min log level above which is to be163            inclued (exclusive).  Can be provided as the int level number or as164            the level name.  Can be omitted/None to disable filtering the min165            level.166          max_inc_level (int/str/None): The max log level below which is to be167            inclued (inclusive).  
Can be provided as the int level number or as168            the level name.  Can be omitted/None to disable filtering the max169            level.170        """171        try:172            self._min_exc_levelno = int(min_exc_level)173        except ValueError:174            # Level name dict is bi-directional lookup -- See python source175            self._min_exc_levelno = logging.getLevelName(min_exc_level.upper())176        except TypeError:177            self._min_exc_levelno = None178        try:179            self._max_inc_levelno = int(max_inc_level)180        except ValueError:181            # Level name dict is bi-directional lookup -- See python source182            self._max_inc_levelno = logging.getLevelName(max_inc_level.upper())183        except TypeError:184            self._max_inc_levelno = None185        super().__init__()186    def filter(self, record):187        """188        Filters the provided record according to the logic in this method.189        Args:190          record (LogRecord): The log record that is being checked whether to191            log.192        Returns:193          (bool): True if should log; False to drop.194        """195        if self._min_exc_levelno is not None \196                and record.levelno <= self._min_exc_levelno:197            return False198        if self._max_inc_levelno is not None \199                and record.levelno > self._max_inc_levelno:200            return False201        return True202def find_existing_handler_from_config(logger_cp, handler_name):203    """204    Finds the handler already existing in the root logger that matches the205    configuration specified in the provided config parser and with the given206    handler name.207    Args:208      logger_cp (ConfigParser): The config parser for the logger.conf file209        loaded that is the exact same one as used to init the looger with210        fileConfig().211      handler_name (str): The name of the handler to try to match.  
Should exist212        in [handler] > keys as well as have a [handler_{handler_name}] section213        in the logger_cp.214    Returns:215      h_existing (Handler or None): Returns the first handler loaded into the216        root logger that matches the provided handler based on the config file.217        Not a perfect match, but a best guess, so can have false positives if218        certain criteria are identical for multiple handlers.  None if no match219        found.220    """221    root_logger = logging.getLogger()222    for h_existing in root_logger.handlers:223        # Until v3.10, handler name not stored from fileConfig :(224        # Will attempt match on some other parameters, but not perfectly225        try:226            h_conf = logger_cp[f'handler_{handler_name}']227        except KeyError:228            logger.warning(          # pylint: disable=logging-not-lazy229                    f'Handler \'{handler_name}\' provided in'230                    + ' logging.conf > [handlers] > keys, but missing'231                    + ' matching handler section.')232            continue233        if type(h_existing).__name__ != h_conf['class'] \234                and f'handlers.{type(h_existing).__name__}' \235                    != h_conf['class']:236            continue237        if logging.getLevelName(h_existing.level) \238                != h_conf['level'].strip().upper():239            continue240        h_conf_fmt = logger_cp[ \241                f'formatter_{h_conf["formatter"]}']['format'].strip()242        if h_existing.formatter._fmt \243                != h_conf_fmt:                # pylint: disable=protected-access244            continue245        return h_existing246    return None247def init_logger(override_log_level=None):248    """249    Initializes the logger(s).  
This is meant to be called once per main entry.250    This does not alter that each module should be getting the logger for their251    module name, most likely from the root logger.252    This will apply the override log level if applicable.253    It will also check for the boundary level between stdout and stderr254    handlers, if applicable, and set a filter level.  This must be set in the255    `logging.conf` and must have both a stdout and a stderr handler; but once256    true, will apply to ALL stdout and ALL stderr StreamHandlers!257    Args:258      override_log_level (str): The log level to override and set for the root259        logger as well as the specified handlers from the260        `cli arg level override` section of `logger.conf`.  In addition to the261        standard logger level names and the `disabled` level added by this app,262        the names `all` and `verbose` can also be used for `notset` to get263        everything.264    """265    logging.addLevelName(99, 'DISABLED')266    logging.config.fileConfig(os.path.join(dirs.get_conf_path(), 'logger.conf'),267            disable_existing_loggers=False)268    root_logger = logging.getLogger()269    if override_log_level is not None:270        try:271            new_levelno = int(override_log_level)272            new_level =  logging.getLevelName(new_levelno)273        except ValueError:274            new_level = override_log_level.upper()275            if new_level in ['ALL', 'VERBOSE']:276                new_level = 'NOTSET'277            new_levelno = logging.getLevelName(new_level)278        root_logger.setLevel(new_level)279    conf_file = os.path.join(dirs.get_conf_path(), 'logger.conf')280    logger_cp = configparser.RawConfigParser()281    logger_cp.read(conf_file)282    handler_names = [h.strip() \283            for h in logger_cp['handlers']['keys'].split(',')]284    for h_name in handler_names:285        h_existing = find_existing_handler_from_config(logger_cp, h_name)286        if 
h_existing is None:287            continue288        if override_log_level is not None:289            lower_level_override = logger_cp.getboolean(f'handler_{h_name}',290                    'allow level override lower', fallback=False)291            raise_level_override = logger_cp.getboolean(f'handler_{h_name}',292                    'allow level override raise', fallback=False)293            if lower_level_override and not raise_level_override:294                if new_levelno < h_existing.level:295                    h_existing.setLevel(new_level)296            elif raise_level_override and not lower_level_override:297                if new_levelno > h_existing.level:298                    h_existing.setLevel(new_level)299            # Skip both -- would only allow to set to level it already was300        max_level = logger_cp.get(f'handler_{h_name}', 'max level',301                fallback=None)302        if max_level is not None:...test_utils.py
Source:test_utils.py  
...80def test_parse_header_links(value, expected):81    assert parse_header_links(value) == expected82@pytest.mark.asyncio83async def test_logs_debug(server, capsys):84    with override_log_level("debug"):85        async with httpx.AsyncClient() as client:86            response = await client.get(server.url)87            assert response.status_code == 20088    stderr = capsys.readouterr().err89    assert 'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"' in stderr90@pytest.mark.asyncio91async def test_logs_trace(server, capsys):92    with override_log_level("trace"):93        async with httpx.AsyncClient() as client:94            response = await client.get(server.url)95            assert response.status_code == 20096    stderr = capsys.readouterr().err97    assert 'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"' in stderr98@pytest.mark.asyncio99async def test_logs_redirect_chain(server, capsys):100    with override_log_level("debug"):101        async with httpx.AsyncClient(follow_redirects=True) as client:102            response = await client.get(server.url.copy_with(path="/redirect_301"))103            assert response.status_code == 200104    stderr = capsys.readouterr().err.strip()105    redirected_request_line, ok_request_line = stderr.split("\n")106    assert redirected_request_line.endswith(107        "HTTP Request: GET http://127.0.0.1:8000/redirect_301 "108        '"HTTP/1.1 301 Moved Permanently"'109    )110    assert ok_request_line.endswith(111        'HTTP Request: GET http://127.0.0.1:8000/ "HTTP/1.1 200 OK"'112    )113def test_get_ssl_cert_file():114    # Two environments is not set....cmd_status.py
Source:cmd_status.py  
...77        print_status(ServiceStatus.UP, 'repository', repo_folder)78    # Getting the postgres status by trying to get a database cursor79    database_data = [profile.database_username, profile.database_hostname, profile.database_port]80    try:81        with override_log_level():  # temporarily suppress noisy logging82            backend = manager.get_backend()83            backend.cursor()84    except IncompatibleDatabaseSchema:85        message = 'Database schema version is incompatible with the code: run `verdi database migrate`.'86        print_status(ServiceStatus.DOWN, 'postgres', message)87        exit_code = ExitCode.CRITICAL88    except Exception as exc:89        message = 'Unable to connect as {}@{}:{}'.format(*database_data)90        print_status(ServiceStatus.DOWN, 'postgres', message, exception=exc, print_traceback=print_traceback)91        exit_code = ExitCode.CRITICAL92    else:93        print_status(ServiceStatus.UP, 'postgres', 'Connected as {}@{}:{}'.format(*database_data))94    # Getting the rmq status95    if not no_rmq:96        try:97            with Capturing(capture_stderr=True):98                with override_log_level():  # temporarily suppress noisy logging99                    comm = manager.create_communicator(with_orm=False)100                    comm.close()101        except Exception as exc:102            message = f'Unable to connect to rabbitmq with URL: {profile.get_rmq_url()}'103            print_status(ServiceStatus.ERROR, 'rabbitmq', message, exception=exc, print_traceback=print_traceback)104            exit_code = ExitCode.CRITICAL105        else:106            print_status(ServiceStatus.UP, 'rabbitmq', f'Connected as {profile.get_rmq_url()}')107    # Getting the daemon status108    try:109        client = manager.get_daemon_client()110        delete_stale_pid_file(client)111        daemon_status = get_daemon_status(client)112        daemon_status = daemon_status.split('\n')[0]  # take only the first line...__init__.py
Source:__init__.py  
1#!/usr/bin/env python2from typing import (3    List,4    Optional5)6import logging7import subprocess8from concurrent.futures import ThreadPoolExecutor9from os import (10    listdir,11    path,12)13import sys14from hummingbot.logger.struct_logger import (15    StructLogRecord,16    StructLogger17)18STRUCT_LOGGER_SET = False19DEV_STRATEGY_PREFIX = "dev"20_prefix_path = None21# Do not raise exceptions during log handling22logging.setLogRecordFactory(StructLogRecord)23logging.setLoggerClass(StructLogger)24_shared_executor = None25_data_path = None26def root_path() -> str:27    from os.path import realpath, join28    return realpath(join(__file__, "../../"))29def get_executor() -> ThreadPoolExecutor:30    global _shared_executor31    if _shared_executor is None:32        _shared_executor = ThreadPoolExecutor()33    return _shared_executor34def prefix_path() -> str:35    global _prefix_path36    if _prefix_path is None:37        from os.path import (38            realpath,39            join40        )41        _prefix_path = realpath(join(__file__, "../../"))42    return _prefix_path43def set_prefix_path(path: str):44    global _prefix_path45    _prefix_path = path46def data_path() -> str:47    global _data_path48    if _data_path is None:49        from os.path import (50            realpath,51            join52        )53        _data_path = realpath(join(prefix_path(), "data"))54    import os55    if not os.path.exists(_data_path):56        os.makedirs(_data_path)57    return _data_path58def set_data_path(path: str):59    global _data_path60    _data_path = path61_independent_package: Optional[bool] = None62def is_independent_package() -> bool:63    global _independent_package64    import os65    if _independent_package is None:66        _independent_package = not os.path.basename(sys.executable).startswith("python")67    return _independent_package68def check_dev_mode():69    try:70        if is_independent_package():71            return False72        if not 
path.isdir(".git"):73            return False74        current_branch = subprocess.check_output(["git", "symbolic-ref", "--short", "HEAD"]).decode("utf8").rstrip()75        if current_branch != "master":76            return True77    except Exception:78        return False79def chdir_to_data_directory():80    if not is_independent_package():81        # Do nothing.82        return83    import appdirs84    import os85    app_data_dir: str = appdirs.user_data_dir("Hummingbot", "hummingbot.io")86    os.makedirs(os.path.join(app_data_dir, "logs"), 0o711, exist_ok=True)87    os.makedirs(os.path.join(app_data_dir, "conf"), 0o711, exist_ok=True)88    os.chdir(app_data_dir)89    set_prefix_path(app_data_dir)90def add_remote_logger_handler(loggers):91    from hummingbot.logger.reporting_proxy_handler import ReportingProxyHandler92    root_logger = logging.getLogger()93    try:94        remote_logger = ReportingProxyHandler(level="ERROR",95                                              proxy_url="https://api.coinalpha.com/reporting-proxy",96                                              capacity=5)97        root_logger.addHandler(remote_logger)98        for logger_name in loggers:99            logger = logging.getLogger(logger_name)100            logger.addHandler(remote_logger)101    except Exception:102        root_logger.error("Error adding remote log handler.", exc_info=True)103def init_logging(conf_filename: str,104                 override_log_level: Optional[str] = None,105                 dev_mode: bool = False,106                 strategy_file_path: str = "hummingbot"):107    import io108    import logging.config109    from os.path import join110    import pandas as pd111    from typing import Dict112    from ruamel.yaml import YAML113    from hummingbot.client.config.global_config_map import global_config_map114    from hummingbot.logger.struct_logger import (115        StructLogRecord,116        StructLogger117    )118    global STRUCT_LOGGER_SET119    if not 
STRUCT_LOGGER_SET:120        logging.setLogRecordFactory(StructLogRecord)121        logging.setLoggerClass(StructLogger)122        STRUCT_LOGGER_SET = True123    # Do not raise exceptions during log handling124    logging.raiseExceptions = False125    file_path: str = join(prefix_path(), "conf", conf_filename)126    yaml_parser: YAML = YAML()127    with open(file_path) as fd:128        yml_source: str = fd.read()129        yml_source = yml_source.replace("$PROJECT_DIR", prefix_path())130        yml_source = yml_source.replace("$DATETIME", pd.Timestamp.now().strftime("%Y-%m-%d-%H-%M-%S"))131        yml_source = yml_source.replace("$STRATEGY_FILE_PATH", strategy_file_path.replace(".yml", ""))132        io_stream: io.StringIO = io.StringIO(yml_source)133        config_dict: Dict = yaml_parser.load(io_stream)134        if override_log_level is not None and "loggers" in config_dict:135            for logger in config_dict["loggers"]:136                if global_config_map["logger_override_whitelist"].value and \137                        logger in global_config_map["logger_override_whitelist"].value:138                    config_dict["loggers"][logger]["level"] = override_log_level139        logging.config.dictConfig(config_dict)140        # add remote logging to logger if in dev mode141        if dev_mode:142            add_remote_logger_handler(config_dict.get("loggers", []))143def get_strategy_list() -> List[str]:144    """145    Search `hummingbot.strategy` folder for all available strategies146    Automatically hide all strategies that starts with "dev" if on master branch147    """148    try:149        folder = path.realpath(path.join(__file__, "../strategy"))150        # Only include valid directories151        strategies = [d for d in listdir(folder) if path.isdir(path.join(folder, d)) and not d.startswith("__")]152        on_dev_mode = check_dev_mode()153        if not on_dev_mode:154            strategies = [s for s in strategies if not 
s.startswith(DEV_STRATEGY_PREFIX)]155        return sorted(strategies)156    except Exception as e:157        logging.getLogger().warning(f"Error getting strategy set: {str(e)}")...Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
