Best Python code snippet using slash
loadwsgi.py
Source:loadwsgi.py  
# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
from __future__ import with_statement
import os
import sys
import re
import pkg_resources
from paste.deploy.compat import ConfigParser, unquote, iteritems, dictkeys
from paste.deploy.util import fix_call, lookup_object
__all__ = ['loadapp', 'loadserver', 'loadfilter', 'appconfig']
############################################################
## Utility functions
############################################################
def import_string(s):
    return pkg_resources.EntryPoint.parse("x=" + s).load(False)
def _aslist(obj):
    """
    Turn object into a list; lists and tuples are left as-is, None
    becomes [], and everything else turns into a one-element list.
    """
    if obj is None:
        return []
    elif isinstance(obj, (list, tuple)):
        return obj
    else:
        return [obj]
def _flatten(lst):
    """
    Flatten a nested list.
    """
    if not isinstance(lst, (list, tuple)):
        return [lst]
    result = []
    for item in lst:
        result.extend(_flatten(item))
    return result
class NicerConfigParser(ConfigParser):
    def __init__(self, filename, *args, **kw):
        ConfigParser.__init__(self, *args, **kw)
        self.filename = filename
        if hasattr(self, '_interpolation'):
            self._interpolation = self.InterpolateWrapper(self._interpolation)
    read_file = getattr(ConfigParser, 'read_file', ConfigParser.readfp)
    def defaults(self):
        """Return the defaults, with their values interpolated (with the
        defaults dict itself)
        Mainly to support defaults using values such as %(here)s
        """
        defaults = ConfigParser.defaults(self).copy()
        for key, val in iteritems(defaults):
            defaults[key] = self.get('DEFAULT', key) or val
        return defaults
    def _interpolate(self, section, option, rawval, vars):
        # Python < 3.2
        try:
            return ConfigParser._interpolate(
                self, section, option, rawval, vars)
        except Exception:
            e = sys.exc_info()[1]
            args = list(e.args)
            args[0] = 'Error in file %s: %s' % (self.filename, e)
            e.args = tuple(args)
            e.message = args[0]
            raise
    class InterpolateWrapper(object):
        # Python >= 3.2
        def __init__(self, original):
            self._original = original
        def __getattr__(self, name):
            return getattr(self._original, name)
        def before_get(self, parser, section, option, value, defaults):
            try:
                return self._original.before_get(parser, section, option,
                                                 value, defaults)
            except Exception:
                e = sys.exc_info()[1]
                args = list(e.args)
                args[0] = 'Error in file %s: %s' % (parser.filename, e)
                e.args = tuple(args)
                e.message = args[0]
                raise
############################################################
## Object types
############################################################
class _ObjectType(object):
    name = None
    egg_protocols = None
    config_prefixes = None
    def __init__(self):
        # Normalize these variables:
        self.egg_protocols = [_aslist(p) for p in _aslist(self.egg_protocols)]
        self.config_prefixes = [_aslist(p) for p in _aslist(self.config_prefixes)]
    def __repr__(self):
        return '<%s protocols=%r prefixes=%r>' % (
            self.name, self.egg_protocols, self.config_prefixes)
    def invoke(self, context):
        assert context.protocol in _flatten(self.egg_protocols)
        return fix_call(context.object,
                        context.global_conf, **context.local_conf)
class _App(_ObjectType):
    name = 'application'
    egg_protocols = ['paste.app_factory', 'paste.composite_factory',
                     'paste.composit_factory']
    config_prefixes = [['app', 'application'], ['composite', 'composit'],
                       'pipeline', 'filter-app']
    def invoke(self, context):
        if context.protocol in ('paste.composit_factory',
                                'paste.composite_factory'):
            return fix_call(context.object,
                            context.loader, context.global_conf,
                            **context.local_conf)
        elif context.protocol == 'paste.app_factory':
            return fix_call(context.object, context.global_conf, **context.local_conf)
        else:
            assert 0, "Protocol %r unknown" % context.protocol
APP = _App()
class _Filter(_ObjectType):
    name = 'filter'
    egg_protocols = [['paste.filter_factory', 'paste.filter_app_factory']]
    config_prefixes = ['filter']
    def invoke(self, context):
        if context.protocol == 'paste.filter_factory':
            return fix_call(context.object,
                            context.global_conf, **context.local_conf)
        elif context.protocol == 'paste.filter_app_factory':
            def filter_wrapper(wsgi_app):
                # This should be an object, so it has a nicer __repr__
                return fix_call(context.object,
                                wsgi_app, context.global_conf,
                                **context.local_conf)
            return filter_wrapper
        else:
            assert 0, "Protocol %r unknown" % context.protocol
FILTER = _Filter()
class _Server(_ObjectType):
    name = 'server'
    egg_protocols = [['paste.server_factory', 'paste.server_runner']]
    config_prefixes = ['server']
    def invoke(self, context):
        if context.protocol == 'paste.server_factory':
            return fix_call(context.object,
                            context.global_conf, **context.local_conf)
        elif context.protocol == 'paste.server_runner':
            def server_wrapper(wsgi_app):
                # This should be an object, so it has a nicer __repr__
                return fix_call(context.object,
                                wsgi_app, context.global_conf,
                                **context.local_conf)
            return server_wrapper
        else:
            assert 0, "Protocol %r unknown" % context.protocol
SERVER = _Server()
# Virtual type: (@@: There's clearly something crufty here;
# this probably could be more elegant)
class _PipeLine(_ObjectType):
    name = 'pipeline'
    def invoke(self, context):
        app = context.app_context.create()
        filters = [c.create() for c in context.filter_contexts]
        filters.reverse()
        for filter in filters:
            app = filter(app)
        return app
PIPELINE = _PipeLine()
class _FilterApp(_ObjectType):
    name = 'filter_app'
    def invoke(self, context):
        next_app = context.next_context.create()
        filter = context.filter_context.create()
        return filter(next_app)
FILTER_APP = _FilterApp()
class _FilterWith(_App):
    name = 'filtered_with'
    def invoke(self, context):
        filter = context.filter_context.create()
        filtered = context.next_context.create()
        if context.next_context.object_type is APP:
            return filter(filtered)
        else:
            # filtering a filter
            def composed(app):
                return filter(filtered(app))
            return composed
FILTER_WITH = _FilterWith()
############################################################
## Loaders
############################################################
def loadapp(uri, name=None, **kw):
    return loadobj(APP, uri, name=name, **kw)
def loadfilter(uri, name=None, **kw):
    return loadobj(FILTER, uri, name=name, **kw)
def loadserver(uri, name=None, **kw):
    return loadobj(SERVER, uri, name=name, **kw)
def appconfig(uri, name=None, relative_to=None, global_conf=None):
    context = loadcontext(APP, uri, name=name,
                          relative_to=relative_to,
                          global_conf=global_conf)
    return context.config()
_loaders = {}
def loadobj(object_type, uri, name=None, relative_to=None,
            global_conf=None):
    context = loadcontext(
        object_type, uri, name=name, relative_to=relative_to,
        global_conf=global_conf)
    return context.create()
def loadcontext(object_type, uri, name=None, relative_to=None,
                global_conf=None):
    if '#' in uri:
        if name is None:
            uri, name = uri.split('#', 1)
        else:
            # @@: Ignore fragment or error?
            uri = uri.split('#', 1)[0]
    if name is None:
        name = 'main'
    if ':' not in uri:
        raise LookupError("URI has no scheme: %r" % uri)
    scheme, path = uri.split(':', 1)
    scheme = scheme.lower()
    if scheme not in _loaders:
        raise LookupError(
            "URI scheme not known: %r (from %s)"
            % (scheme, ', '.join(_loaders.keys())))
    return _loaders[scheme](
        object_type,
        uri, path, name=name, relative_to=relative_to,
        global_conf=global_conf)
def _loadconfig(object_type, uri, path, name, relative_to,
                global_conf):
    isabs = os.path.isabs(path)
    # De-Windowsify the paths:
    path = path.replace('\\', '/')
    if not isabs:
        if not relative_to:
            raise ValueError(
                "Cannot resolve relative uri %r; no relative_to keyword "
                "argument given" % uri)
        relative_to = relative_to.replace('\\', '/')
        if relative_to.endswith('/'):
            path = relative_to + path
        else:
            path = relative_to + '/' + path
    if path.startswith('///'):
        path = path[2:]
    path = unquote(path)
    loader = ConfigLoader(path)
    if global_conf:
        loader.update_defaults(global_conf, overwrite=False)
    return loader.get_context(object_type, name, global_conf)
_loaders['config'] = _loadconfig
def _loadegg(object_type, uri, spec, name, relative_to,
             global_conf):
    loader = EggLoader(spec)
    return loader.get_context(object_type, name, global_conf)
_loaders['egg'] = _loadegg
def _loadfunc(object_type, uri, spec, name, relative_to,
              global_conf):
    loader = FuncLoader(spec)
    return loader.get_context(object_type, name, global_conf)
_loaders['call'] = _loadfunc
############################################################
## Loaders
############################################################
class _Loader(object):
    def get_app(self, name=None, global_conf=None):
        return self.app_context(
            name=name, global_conf=global_conf).create()
    def get_filter(self, name=None, global_conf=None):
        return self.filter_context(
            name=name, global_conf=global_conf).create()
    def get_server(self, name=None, global_conf=None):
        return self.server_context(
            name=name, global_conf=global_conf).create()
    def app_context(self, name=None, global_conf=None):
        return self.get_context(
            APP, name=name, global_conf=global_conf)
    def filter_context(self, name=None, global_conf=None):
        return self.get_context(
            FILTER, name=name, global_conf=global_conf)
    def server_context(self, name=None, global_conf=None):
        return self.get_context(
            SERVER, name=name, global_conf=global_conf)
    _absolute_re = re.compile(r'^[a-zA-Z]+:')
    def absolute_name(self, name):
        """
        Returns true if the name includes a scheme
        """
        if name is None:
            return False
        return self._absolute_re.search(name)
class ConfigLoader(_Loader):
    def __init__(self, filename):
        self.filename = filename = filename.strip()
        defaults = {
            'here': os.path.dirname(os.path.abspath(filename)),
            '__file__': os.path.abspath(filename)
            }
        self.parser = NicerConfigParser(filename, defaults=defaults)
        self.parser.optionxform = str  # Don't lower-case keys
        with open(filename) as f:
            self.parser.read_file(f)
    def update_defaults(self, new_defaults, overwrite=True):
        for key, value in iteritems(new_defaults):
            if not overwrite and key in self.parser._defaults:
                continue
            self.parser._defaults[key] = value
    def get_context(self, object_type, name=None, global_conf=None):
        if self.absolute_name(name):
            return loadcontext(object_type, name,
                               relative_to=os.path.dirname(self.filename),
                               global_conf=global_conf)
        section = self.find_config_section(
            object_type, name=name)
        if global_conf is None:
            global_conf = {}
        else:
            global_conf = global_conf.copy()
        defaults = self.parser.defaults()
        global_conf.update(defaults)
        local_conf = {}
        global_additions = {}
        get_from_globals = {}
        for option in self.parser.options(section):
            if option.startswith('set '):
                name = option[4:].strip()
                global_additions[name] = global_conf[name] = (
                    self.parser.get(section, option))
            elif option.startswith('get '):
                name = option[4:].strip()
                get_from_globals[name] = self.parser.get(section, option)
            else:
                if option in defaults:
                    # @@: It's a global option (?), so skip it
                    continue
                local_conf[option] = self.parser.get(section, option)
        for local_var, glob_var in get_from_globals.items():
            local_conf[local_var] = global_conf[glob_var]
        if object_type in (APP, FILTER) and 'filter-with' in local_conf:
            filter_with = local_conf.pop('filter-with')
        else:
            filter_with = None
        if 'require' in local_conf:
            for spec in local_conf['require'].split():
                pkg_resources.require(spec)
            del local_conf['require']
        if section.startswith('filter-app:'):
            context = self._filter_app_context(
                object_type, section, name=name,
                global_conf=global_conf, local_conf=local_conf,
                global_additions=global_additions)
        elif section.startswith('pipeline:'):
            context = self._pipeline_app_context(
                object_type, section, name=name,
                global_conf=global_conf, local_conf=local_conf,
                global_additions=global_additions)
        elif 'use' in local_conf:
            context = self._context_from_use(
                object_type, local_conf, global_conf, global_additions,
                section)
        else:
            context = self._context_from_explicit(
                object_type, local_conf, global_conf, global_additions,
                section)
        if filter_with is not None:
            filter_with_context = LoaderContext(
                obj=None,
                object_type=FILTER_WITH,
                protocol=None,
                global_conf=global_conf, local_conf=local_conf,
                loader=self)
            filter_with_context.filter_context = self.filter_context(
                name=filter_with, global_conf=global_conf)
            filter_with_context.next_context = context
            return filter_with_context
        return context
    def _context_from_use(self, object_type, local_conf, global_conf,
                          global_additions, section):
        use = local_conf.pop('use')
        context = self.get_context(
            object_type, name=use, global_conf=global_conf)
        context.global_conf.update(global_additions)
        context.local_conf.update(local_conf)
        if '__file__' in global_conf:
            # use sections shouldn't overwrite the original __file__
            context.global_conf['__file__'] = global_conf['__file__']
        # @@: Should loader be overwritten?
        context.loader = self
        if context.protocol is None:
            # Determine protocol from section type
            section_protocol = section.split(':', 1)[0]
            if section_protocol in ('application', 'app'):
                context.protocol = 'paste.app_factory'
            elif section_protocol in ('composit', 'composite'):
                context.protocol = 'paste.composit_factory'
            else:
                # This will work with 'server' and 'filter', otherwise it
                # could fail but there is an error message already for
                # bad protocols
                context.protocol = 'paste.%s_factory' % section_protocol
        return context
    def _context_from_explicit(self, object_type, local_conf, global_conf,
                               global_addition, section):
        possible = []
        for protocol_options in object_type.egg_protocols:
            for protocol in protocol_options:
                if protocol in local_conf:
                    possible.append((protocol, local_conf[protocol]))
                    break
        if len(possible) > 1:
            raise LookupError(
                "Multiple protocols given in section %r: %s"
                % (section, possible))
        if not possible:
            raise LookupError(
                "No loader given in section %r" % section)
        found_protocol, found_expr = possible[0]
        del local_conf[found_protocol]
        value = import_string(found_expr)
        context = LoaderContext(
            value, object_type, found_protocol,
            global_conf, local_conf, self)
        return context
    def _filter_app_context(self, object_type, section, name,
                            global_conf, local_conf, global_additions):
        if 'next' not in local_conf:
            raise LookupError(
                "The [%s] section in %s is missing a 'next' setting"
                % (section, self.filename))
        next_name = local_conf.pop('next')
        context = LoaderContext(None, FILTER_APP, None, global_conf,
                                local_conf, self)
        context.next_context = self.get_context(
            APP, next_name, global_conf)
        if 'use' in local_conf:
            context.filter_context = self._context_from_use(
                FILTER, local_conf, global_conf, global_additions,
                section)
        else:
            context.filter_context = self._context_from_explicit(
                FILTER, local_conf, global_conf, global_additions,
                section)
        return context
    def _pipeline_app_context(self, object_type, section, name,
                              global_conf, local_conf, global_additions):
        if 'pipeline' not in local_conf:
            raise LookupError(
                "The [%s] section in %s is missing a 'pipeline' setting"
                % (section, self.filename))
        pipeline = local_conf.pop('pipeline').split()
        if local_conf:
            raise LookupError(
                "The [%s] pipeline section in %s has extra "
                "(disallowed) settings: %s"
                % (', '.join(local_conf.keys())))
        context = LoaderContext(None, PIPELINE, None, global_conf,
                                local_conf, self)
        context.app_context = self.get_context(
            APP, pipeline[-1], global_conf)
        context.filter_contexts = [
            self.get_context(FILTER, name, global_conf)
            for name in pipeline[:-1]]
        return context
    def find_config_section(self, object_type, name=None):
        """
        Return the section name with the given name prefix (following the
        same pattern as ``protocol_desc`` in ``config``.  It must have the
        given name, or for ``'main'`` an empty name is allowed.  The
        prefix must be followed by a ``:``.
        Case is *not* ignored.
        """
        possible = []
        for name_options in object_type.config_prefixes:
            for name_prefix in name_options:
                found = self._find_sections(
                    self.parser.sections(), name_prefix, name)
                if found:
                    possible.extend(found)
                    break
        if not possible:
            raise LookupError(
                "No section %r (prefixed by %s) found in config %s"
                % (name,
                   ' or '.join(map(repr, _flatten(object_type.config_prefixes))),
                   self.filename))
        if len(possible) > 1:
            raise LookupError(
                "Ambiguous section names %r for section %r (prefixed by %s) "
                "found in config %s"
                % (possible, name,
                   ' or '.join(map(repr, _flatten(object_type.config_prefixes))),
                   self.filename))
        return possible[0]
    def _find_sections(self, sections, name_prefix, name):
        found = []
        if name is None:
            if name_prefix in sections:
                found.append(name_prefix)
            name = 'main'
        for section in sections:
            if section.startswith(name_prefix + ':'):
                if section[len(name_prefix) + 1:].strip() == name:
                    found.append(section)
        return found
class EggLoader(_Loader):
    def __init__(self, spec):
        self.spec = spec
    def get_context(self, object_type, name=None, global_conf=None):
        if self.absolute_name(name):
            return loadcontext(object_type, name,
                               global_conf=global_conf)
        entry_point, protocol, ep_name = self.find_egg_entry_point(
            object_type, name=name)
        return LoaderContext(
            entry_point,
            object_type,
            protocol,
            global_conf or {}, {},
            self,
            distribution=pkg_resources.get_distribution(self.spec),
            entry_point_name=ep_name)
    def find_egg_entry_point(self, object_type, name=None):
        """
        Returns the (entry_point, protocol) for the with the given
        ``name``.
        """
        if name is None:
            name = 'main'
        possible = []
        for protocol_options in object_type.egg_protocols:
            for protocol in protocol_options:
                pkg_resources.require(self.spec)
                entry = pkg_resources.get_entry_info(
                    self.spec,
                    protocol,
                    name)
                if entry is not None:
                    possible.append((entry.load(), protocol, entry.name))
                    break
        if not possible:
            # Better exception
            dist = pkg_resources.get_distribution(self.spec)
            raise LookupError(
                "Entry point %r not found in egg %r (dir: %s; protocols: %s; "
                "entry_points: %s)"
                % (name, self.spec,
                   dist.location,
                   ', '.join(_flatten(object_type.egg_protocols)),
                   ', '.join(_flatten([
                dictkeys(pkg_resources.get_entry_info(self.spec, prot, name) or {})
                for prot in protocol_options] or '(no entry points)'))))
        if len(possible) > 1:
            raise LookupError(
                "Ambiguous entry points for %r in egg %r (protocols: %s)"
                % (name, self.spec, ', '.join(_flatten(protocol_options))))
        return possible[0]
class FuncLoader(_Loader):
    """ Loader that supports specifying functions inside modules, without
    using eggs at all. Configuration should be in the format:
        use = call:my.module.path:function_name

    Dot notation is supported in both the module and function name, e.g.:
        use = call:my.module.path:object.method
    """
    def __init__(self, spec):
        self.spec = spec
        if not ':' in spec:
            raise LookupError("Configuration not in format module:function")
    def get_context(self, object_type, name=None, global_conf=None):
        obj = lookup_object(self.spec)
        return LoaderContext(
            obj,
            object_type,
            None, # determine protocol from section type
            global_conf or {},
            {},
            self,
            )
class LoaderContext(object):
    def __init__(self, obj, object_type, protocol,
                 global_conf, local_conf, loader,
                 distribution=None, entry_point_name=None):
        self.object = obj
        self.object_type = object_type
        self.protocol = protocol
        #assert protocol in _flatten(object_type.egg_protocols), (
        #    "Bad protocol %r; should be one of %s"
        #    % (protocol, ', '.join(map(repr, _flatten(object_type.egg_protocols)))))
        self.global_conf = global_conf
        self.local_conf = local_conf
        self.loader = loader
        self.distribution = distribution
        self.entry_point_name = entry_point_name
    def create(self):
        return self.object_type.invoke(self)
    def config(self):
        conf = AttrDict(self.global_conf)
        conf.update(self.local_conf)
        conf.local_conf = self.local_conf
        conf.global_conf = self.global_conf
        conf.context = self
        return conf
class AttrDict(dict):
    """
    A dictionary that can be assigned to.
    """
...
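To make the loader machinery above concrete, here is a minimal, hypothetical usage sketch. The development.ini contents and the call:myapp.wsgi:app_factory target are illustrative placeholders (not part of the snippet), and the factory is assumed to accept (global_conf, **local_conf) as the paste.app_factory protocol expects.

# Hypothetical development.ini understood by ConfigLoader above:
#
#   [app:main]
#   use = call:myapp.wsgi:app_factory
#   set debug = false
#   greeting = hello
#
from paste.deploy import loadapp, appconfig

# Build the WSGI app from [app:main]; a relative config path needs relative_to.
app = loadapp('config:development.ini', relative_to='/etc/myapp')

# appconfig() returns the merged settings (LoaderContext.config() -> AttrDict)
# without instantiating the app; the split pieces stay reachable as attributes.
conf = appconfig('config:development.ini', relative_to='/etc/myapp')
print(conf['greeting'], conf.global_conf['debug'], conf.local_conf)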
SwaggerToSdkNewCLI.py
Source:SwaggerToSdkNewCLI.py
"""Swagger to SDK"""
import os
import shutil
import logging
import json
from pathlib import Path
import tempfile
from git import Repo, GitCommandError
from .SwaggerToSdkCore import (
    read_config_from_github,
    DEFAULT_COMMIT_MESSAGE,
    get_input_paths,
    extract_conf_from_readmes,
    get_readme_files_from_git_object,
    build_file_content,
    solve_relative_path,
    this_conf_will_generate_for_this_pr
)
from .autorest_tools import (
    execute_simple_command,
    generate_code,
    merge_options,
)
from azure_devtools.ci_tools.git_tools import (
    checkout_and_create_branch,
    do_commit,
)
from azure_devtools.ci_tools.github_tools import (
    configure_user,
    manage_git_folder,
)
_LOGGER = logging.getLogger(__name__)
def move_wrapper_files_or_dirs(src_root, dst_root, global_conf, local_conf):
    """Save wrapper files somewhere for replace them after generation.
    """
    src_relative_path = local_conf.get('output_dir', '')
    src_abs_path = Path(src_root, src_relative_path)
    dst_abs_path = Path(dst_root, src_relative_path)
    wrapper_files_or_dirs = merge_options(global_conf, local_conf, "wrapper_filesOrDirs") or []
    for wrapper_file_or_dir in wrapper_files_or_dirs:
        for file_path in src_abs_path.glob(wrapper_file_or_dir):
            relative_file_path = file_path.relative_to(src_abs_path)
            file_path_dest = Path(dst_abs_path, relative_file_path)
            if file_path.is_file():
                file_path_dest.parent.mkdir(parents=True, exist_ok=True)
            _LOGGER.info("Moving %s to %s", str(file_path), str(file_path_dest))
            # This does not work in Windows if generatd and dest are not in the same drive
            # file_path.replace(file_path_dest)
            shutil.move(file_path, file_path_dest)
def delete_extra_files(sdk_root, global_conf, local_conf):
    src_relative_path = local_conf.get('output_dir', '')
    src_abs_path = Path(sdk_root, src_relative_path)
    delete_files_or_dirs = merge_options(global_conf, local_conf, "delete_filesOrDirs") or []
    for delete_file_or_dir in delete_files_or_dirs:
        for file_path in src_abs_path.glob(delete_file_or_dir):
            if file_path.is_file():
                file_path.unlink()
            else:
                shutil.rmtree(str(file_path))
def move_autorest_files(client_generated_path, sdk_root, global_conf, local_conf):
    """Update data from generated to final folder.
    This is one only if output_dir is set, otherwise it's considered generated in place
    and does not required moving
    """
    dest = local_conf.get('output_dir', None)
    if not dest:
        return
    destination_folder = get_local_path_dir(sdk_root, dest)
    generated_relative_base_directory = local_conf.get('generated_relative_base_directory') or \
        global_conf.get('generated_relative_base_directory')
    if generated_relative_base_directory:
        client_possible_path = [elt for elt in client_generated_path.glob(generated_relative_base_directory) if elt.is_dir()]
        try:
            client_generated_path = client_possible_path.pop()
        except IndexError:
            err_msg = "Incorrect generated_relative_base_directory folder: {}\n".format(generated_relative_base_directory)
            err_msg += "Base folders were: : {}\n".format([f.relative_to(client_generated_path) for f in client_generated_path.iterdir()])
            _LOGGER.critical(err_msg)
            raise ValueError(err_msg)
        if client_possible_path:
            err_msg = "generated_relative_base_directory parameter is ambiguous: {} {}".format(
                client_generated_path,
                client_possible_path
            )
            _LOGGER.critical(err_msg)
            raise ValueError(err_msg)
    shutil.rmtree(str(destination_folder))
    # This does not work in Windows if generatd and dest are not in the same drive
    # client_generated_path.replace(destination_folder)
    shutil.move(client_generated_path, destination_folder)
def write_build_file(sdk_root, local_conf):
    build_dir = local_conf.get('build_dir')
    if build_dir:
        build_folder = get_local_path_dir(sdk_root, build_dir)
        build_file = Path(build_folder, "build.json")
        with open(build_file, 'w') as build_fd:
            json.dump(build_file_content(), build_fd, indent=2)
def execute_after_script(sdk_root, global_conf, local_conf):
    after_scripts = merge_options(global_conf, local_conf, "after_scripts", keep_list_order=True) or []
    local_envs = dict(os.environ)
    local_envs.update(global_conf.get("envs", {}))
    for script in after_scripts:
        _LOGGER.info("Execute after script: %s", script)
        execute_simple_command(script, cwd=sdk_root, shell=True, env=local_envs)
def get_local_path_dir(root, relative_path):
    build_folder = Path(root, relative_path)
    if not build_folder.is_dir():
        err_msg = "Folder does not exist or is not accessible: {}".format(
            build_folder)
        _LOGGER.critical(err_msg)
        raise ValueError(err_msg)
    return build_folder
def build_project(temp_dir, project, absolute_markdown_path, sdk_folder, global_conf, local_conf, autorest_bin=None):
    absolute_generated_path = Path(temp_dir, project)
    absolute_save_path = Path(temp_dir, "save")
    move_wrapper_files_or_dirs(sdk_folder, absolute_save_path, global_conf, local_conf)
    generate_code(absolute_markdown_path,
                  global_conf,
                  local_conf,
                  absolute_generated_path if "output_dir" in local_conf else None,
                  autorest_bin)
    move_autorest_files(absolute_generated_path, sdk_folder, global_conf, local_conf)
    move_wrapper_files_or_dirs(absolute_save_path, sdk_folder, global_conf, local_conf)
    delete_extra_files(sdk_folder, global_conf, local_conf)
    write_build_file(sdk_folder, local_conf)
    execute_after_script(sdk_folder, global_conf, local_conf)
def build_libraries(config, skip_callback, restapi_git_folder, sdk_repo, temp_dir, autorest_bin=None):
    """Main method of the the file"""
    global_conf = config["meta"]
    global_conf["autorest_options"] = solve_relative_path(global_conf.get("autorest_options", {}), sdk_repo.working_tree_dir)
    global_conf["envs"] = solve_relative_path(global_conf.get("envs", {}), sdk_repo.working_tree_dir)
    global_conf["advanced_options"] = solve_relative_path(global_conf.get("advanced_options", {}), sdk_repo.working_tree_dir)
    for project, local_conf in config.get("projects", {}).items():
        if skip_callback(project, local_conf):
            _LOGGER.info("Skip project %s", project)
            continue
        local_conf["autorest_options"] = solve_relative_path(local_conf.get("autorest_options", {}), sdk_repo.working_tree_dir)
        markdown_relative_path, optional_relative_paths = get_input_paths(global_conf, local_conf)
        _LOGGER.info(f"Markdown input: {markdown_relative_path}")
        _LOGGER.info(f"Optional inputs: {optional_relative_paths}")
        absolute_markdown_path = None
        if markdown_relative_path:
            absolute_markdown_path = Path(restapi_git_folder, markdown_relative_path).resolve()
        if optional_relative_paths:
            local_conf.setdefault('autorest_options', {})['input-file'] = [
                Path(restapi_git_folder, input_path).resolve()
                for input_path
                in optional_relative_paths
            ]
        sdk_folder = sdk_repo.working_tree_dir
        build_project(
            temp_dir,
            project,
            absolute_markdown_path,
            sdk_folder,
            global_conf,
            local_conf,
            autorest_bin
        )
def generate_sdk_from_git_object(git_object, branch_name, restapi_git_id, sdk_git_id, base_branch_names, *, fallback_base_branch_name="master", sdk_tag=None):
    """Generate SDK from a commit or a PR object.
    git_object is the initial commit/PR from the RestAPI repo. If git_object is a PR, prefer to checkout Github PR "merge_commit_sha"
    restapi_git_id explains where to clone the repo.
    sdk_git_id explains where to push the commit.
    sdk_tag explains what is the tag used in the Readme for the swagger-to-sdk section. If not provided, use sdk_git_id.
    branch_name is the expected branch name in the SDK repo.
    - If this branch exists, use it.
    - If not, use the base branch to create that branch (base branch is where I intend to do my PR)
    - If base_branch_names is not provided, use fallback_base_branch_name as base
    - If this base branch is provided and does not exists, create this base branch first using fallback_base_branch_name (this one is required to exist)
    WARNING:
    This method might push to "branch_name" and "base_branch_name". No push will be made to "fallback_base_branch_name"
    """
    gh_token = os.environ["GH_TOKEN"]
    message_template = DEFAULT_COMMIT_MESSAGE
    autorest_bin = None
    if sdk_tag is None:
        sdk_tag = sdk_git_id
    try:  # Checkout the sha if commit obj
        branched_rest_api_id = restapi_git_id+'@'+git_object.sha
        pr_number = None
    except (AttributeError, TypeError):  # This is a PR, don't clone the fork but "base" repo and PR magic commit
        if git_object.merge_commit_sha:
            branched_rest_api_id = git_object.base.repo.full_name+'@'+git_object.merge_commit_sha
        else:
            branched_rest_api_id = git_object.base.repo.full_name
        pr_number = git_object.number
    # Always clone SDK from fallback branch that is required to exist
    branched_sdk_git_id = sdk_git_id+'@'+fallback_base_branch_name
    # I don't know if the destination branch exists, try until it works
    config = None
    branch_list = base_branch_names + [branch_name] + [fallback_base_branch_name]
    for branch in branch_list:
        try:
            config = read_config_from_github(sdk_git_id, branch, gh_token)
        except Exception:
            pass
        else:
            break
    if config is None:
        raise ValueError("Unable to locate configuration in {}".format(branch_list))
    global_conf = config["meta"]
    # If PR is only about a language that this conf can't handle, skip fast
    if not this_conf_will_generate_for_this_pr(git_object, global_conf):
        _LOGGER.info("Skipping this job based on conf not impacted by Git object")
        return
    with tempfile.TemporaryDirectory() as temp_dir:
        clone_dir = Path(temp_dir) / Path(global_conf.get("advanced_options", {}).get("clone_dir", "sdk"))
        _LOGGER.info("Clone dir will be: %s", clone_dir)
        with manage_git_folder(gh_token, Path(temp_dir) / Path("rest"), branched_rest_api_id, pr_number=pr_number) as restapi_git_folder, \
            manage_git_folder(gh_token, clone_dir, branched_sdk_git_id) as sdk_folder:
            readme_files_infered = get_readme_files_from_git_object(git_object, restapi_git_folder)
            _LOGGER.info("Readmes files infered from PR: %s ", readme_files_infered)
            if not readme_files_infered:
                _LOGGER.info("No Readme in PR, quit")
                return
            # SDK part
            sdk_repo = Repo(str(sdk_folder))
            for base_branch in base_branch_names:
                _LOGGER.info('Checkout and create %s', base_branch)
                checkout_and_create_branch(sdk_repo, base_branch)
            _LOGGER.info('Try to checkout destination branch %s', branch_name)
            try:
                sdk_repo.git.checkout(branch_name)
                _LOGGER.info('The branch exists.')
            except GitCommandError:
                _LOGGER.info('Destination branch does not exists')
                # Will be created by do_commit
            configure_user(gh_token, sdk_repo)
            # Look for configuration in Readme
            _LOGGER.info('Extract conf from Readmes for target: %s', sdk_git_id)
            extract_conf_from_readmes(readme_files_infered, restapi_git_folder, sdk_tag, config)
            _LOGGER.info('End of extraction')
            def skip_callback(project, local_conf):
                # We know "project" is based on Path in "readme_files_infered"
                if Path(project) in readme_files_infered:
                    return False
                # Might be a regular project
                markdown_relative_path, optional_relative_paths = get_input_paths(global_conf, local_conf)
                if not (
                        markdown_relative_path in readme_files_infered or
                        any(input_file in readme_files_infered for input_file in optional_relative_paths)):
                    _LOGGER.info(f"In project {project} no files involved in this commit")
                    return True
                return False
            build_libraries(config, skip_callback, restapi_git_folder,
                            sdk_repo, temp_dir, autorest_bin)
            try:
                commit_for_sha = git_object.commit   # Commit
            except AttributeError:
                commit_for_sha = list(git_object.get_commits())[-1].commit  # PR
            message = message_template + "\n\n" + commit_for_sha.message
            commit_sha = do_commit(sdk_repo, message, branch_name, commit_for_sha.sha)
            if commit_sha:
                for base_branch in base_branch_names:
                    sdk_repo.git.push('origin', base_branch, set_upstream=True)
                sdk_repo.git.push('origin', branch_name, set_upstream=True)
...
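For orientation, here is a sketch (with invented project names and paths) of the config dict that build_libraries consumes: "meta" becomes global_conf and each "projects" entry becomes a local_conf. Only keys that the functions above actually read are shown; the input-path keys are resolved by get_input_paths, which lives in SwaggerToSdkCore and is not part of this snippet, so treat them as assumptions.

# Illustrative swagger-to-sdk-style configuration (all values are placeholders).
config = {
    "meta": {
        "autorest_options": {"python": ""},
        "envs": {"PYTHONPATH": "./sdk"},            # exported to after_scripts
        "advanced_options": {"clone_dir": "sdk"},    # read by generate_sdk_from_git_object
        "wrapper_filesOrDirs": ["setup.py"],         # saved and restored around generation
        "delete_filesOrDirs": ["**/__pycache__"],    # cleaned up by delete_extra_files
        "after_scripts": ["python ./scripts/update_version.py"],
    },
    "projects": {
        "azure-mgmt-example": {                      # hypothetical project name
            "markdown": "specification/example/readme.md",
            "output_dir": "sdk/example/azure-mgmt-example",
            "build_dir": "sdk/example/azure-mgmt-example",
            "autorest_options": {"package-version": "1.0.0"},
        },
    },
}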
test.py
Source:test.py
# Copyright (C) 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import tempfile
import unittest
from devstack_local_conf import LocalConf
from collections import OrderedDict
class TestDevstackLocalConf(unittest.TestCase):
    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()
    def tearDown(self):
        shutil.rmtree(self.tmpdir)
    def test_plugins(self):
        "Test that plugins without dependencies work"
        localrc = {'test_localrc': '1'}
        local_conf = {'install':
                      {'nova.conf':
                       {'main':
                        {'test_conf': '2'}}}}
        services = {'cinder': True}
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        plugins = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
            ('baz', 'git://git.openstack.org/openstack/baz-plugin'),
            ])
        p = dict(localrc=localrc,
                 local_conf=local_conf,
                 base_services=[],
                 services=services,
                 plugins=plugins,
                 base_dir='./test',
                 path=os.path.join(self.tmpdir, 'test.local.conf'))
        lc = LocalConf(p.get('localrc'),
                       p.get('local_conf'),
                       p.get('base_services'),
                       p.get('services'),
                       p.get('plugins'),
                       p.get('base_dir'))
        lc.write(p['path'])
        plugins = []
        with open(p['path']) as f:
            for line in f:
                if line.startswith('enable_plugin'):
                    plugins.append(line.split()[1])
        self.assertEqual(['bar', 'baz', 'foo'], plugins)
    def test_plugin_deps(self):
        "Test that plugins with dependencies work"
        os.makedirs(os.path.join(self.tmpdir, 'foo-plugin', 'devstack'))
        os.makedirs(os.path.join(self.tmpdir, 'foo-plugin', '.git'))
        os.makedirs(os.path.join(self.tmpdir, 'bar-plugin', 'devstack'))
        os.makedirs(os.path.join(self.tmpdir, 'bar-plugin', '.git'))
        with open(os.path.join(
                self.tmpdir,
                'foo-plugin', 'devstack', 'settings'), 'w') as f:
            f.write('define_plugin foo\n')
        with open(os.path.join(
                self.tmpdir,
                'bar-plugin', 'devstack', 'settings'), 'w') as f:
            f.write('define_plugin bar\n')
            f.write('plugin_requires bar foo\n')
        localrc = {'test_localrc': '1'}
        local_conf = {'install':
                      {'nova.conf':
                       {'main':
                        {'test_conf': '2'}}}}
        services = {'cinder': True}
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        plugins = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
            ])
        p = dict(localrc=localrc,
                 local_conf=local_conf,
                 base_services=[],
                 services=services,
                 plugins=plugins,
                 base_dir=self.tmpdir,
                 path=os.path.join(self.tmpdir, 'test.local.conf'))
        lc = LocalConf(p.get('localrc'),
                       p.get('local_conf'),
                       p.get('base_services'),
                       p.get('services'),
                       p.get('plugins'),
                       p.get('base_dir'))
        lc.write(p['path'])
        plugins = []
        with open(p['path']) as f:
            for line in f:
                if line.startswith('enable_plugin'):
                    plugins.append(line.split()[1])
        self.assertEqual(['foo', 'bar'], plugins)
    def test_plugin_circular_deps(self):
        "Test that plugins with circular dependencies fail"
        os.makedirs(os.path.join(self.tmpdir, 'foo-plugin', 'devstack'))
        os.makedirs(os.path.join(self.tmpdir, 'foo-plugin', '.git'))
        os.makedirs(os.path.join(self.tmpdir, 'bar-plugin', 'devstack'))
        os.makedirs(os.path.join(self.tmpdir, 'bar-plugin', '.git'))
        with open(os.path.join(
                self.tmpdir,
                'foo-plugin', 'devstack', 'settings'), 'w') as f:
            f.write('define_plugin foo\n')
            f.write('plugin_requires foo bar\n')
        with open(os.path.join(
                self.tmpdir,
                'bar-plugin', 'devstack', 'settings'), 'w') as f:
            f.write('define_plugin bar\n')
            f.write('plugin_requires bar foo\n')
        localrc = {'test_localrc': '1'}
        local_conf = {'install':
                      {'nova.conf':
                       {'main':
                        {'test_conf': '2'}}}}
        services = {'cinder': True}
        # We use ordereddict here to make sure the plugins are in the
        # *wrong* order for testing.
        plugins = OrderedDict([
            ('bar', 'git://git.openstack.org/openstack/bar-plugin'),
            ('foo', 'git://git.openstack.org/openstack/foo-plugin'),
            ])
        p = dict(localrc=localrc,
                 local_conf=local_conf,
                 base_services=[],
                 services=services,
                 plugins=plugins,
                 base_dir=self.tmpdir,
                 path=os.path.join(self.tmpdir, 'test.local.conf'))
        with self.assertRaises(Exception):
            lc = LocalConf(p.get('localrc'),
                           p.get('local_conf'),
                           p.get('base_services'),
                           p.get('services'),
                           p.get('plugins'),
                           p.get('base_dir'))
            lc.write(p['path'])
if __name__ == '__main__':
...
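The assertions above come down to the ordering of the enable_plugin lines that LocalConf.write emits. As a rough picture (line layout per devstack convention, URLs taken from the plugins dict in the test, so treat the exact format as an assumption), test_plugin_deps expects foo to be enabled before bar because bar-plugin's settings declare "plugin_requires bar foo".

# Expected tail of test.local.conf in test_plugin_deps (order matters):
#
#   enable_plugin foo git://git.openstack.org/openstack/foo-plugin
#   enable_plugin bar git://git.openstack.org/openstack/bar-plugin
#
# The same read-back idiom the tests use, factored into a helper:
def enabled_plugins(path):
    with open(path) as f:
        return [line.split()[1] for line in f if line.startswith('enable_plugin')]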
test_storage.py
Source:test_storage.py
import os
from paste.deploy import appconfig
import paste.fixture
from ckan.config.middleware import make_app
import ckan.model as model
from ckan.tests import conf_dir, url_for, CreateTestData
from ckan.controllers.admin import get_sysadmins
from ckan.controllers.storage import create_pairtree_marker
class TestStorageAPIController:
    @classmethod
    def setup_class(cls):
        config = appconfig('config:test.ini', relative_to=conf_dir)
        for key in config.local_conf.keys():
            if key.startswith('ofs'):
                del config.local_conf[key]
        config.local_conf['ofs.impl'] = 'pairtree'
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.storage_dir'] = '/tmp/ckan-test-ckanext-storage'
        create_pairtree_marker( config.local_conf['ofs.storage_dir'] )
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        CreateTestData.create_test_user()
    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()
    def test_index(self):
        url = url_for('storage_api')
        res = self.app.get(url)
        out = res.json
        assert len(res.json) == 3
    def test_authz(self):
        url = url_for('storage_api_auth_form', label='abc')
        # Non logged in users can not upload
        res = self.app.get(url, status=[302,401])

        # Logged in users can upload
        res = self.app.get(url, status=[200], extra_environ={'REMOTE_USER':'tester'})

        # TODO: ? test for non-authz case
        # url = url_for('storage_api_auth_form', label='abc')
        # res = self.app.get(url, status=[302,401])
class TestStorageAPIControllerLocal:
    @classmethod
    def setup_class(cls):
        config = appconfig('config:test.ini', relative_to=conf_dir)
        for key in config.local_conf.keys():
            if key.startswith('ofs'):
                del config.local_conf[key]
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.impl'] = 'pairtree'
        config.local_conf['ofs.storage_dir'] = '/tmp/ckan-test-ckanext-storage'
        create_pairtree_marker( config.local_conf['ofs.storage_dir'] )
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        CreateTestData.create()
        model.Session.remove()
        user = model.User.by_name('tester')
        cls.extra_environ = {'Authorization': str(user.apikey)}
    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()
    def test_auth_form(self):
        url = url_for('storage_api_auth_form', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['action'] == u'/storage/upload_handle', res.json
        assert res.json['fields'][-1]['value'] == 'abc', res
        url = url_for('storage_api_auth_form', label='abc/xxx')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc/xxx'
    def test_metadata(self):
        url = url_for('storage_api_get_metadata', label='abc')
        res = self.app.get(url, status=404)
        # TODO: test get metadata on real setup ...
        label = 'abc'
        url = url_for('storage_api_set_metadata',
            extra_environ=self.extra_environ,
            label=label,
            data=dict(
                label=label
                )
            )
        # res = self.app.get(url, status=404)
# Disabling because requires access to google storage to run (and this is not
# generally available to devs ...)
class _TestStorageAPIControllerGoogle:
    @classmethod
    def setup_class(cls):
        config = appconfig('config:test.ini', relative_to=conf_dir)
        config.local_conf['ckan.storage.bucket'] = 'ckantest'
        config.local_conf['ofs.impl'] = 'google'
        if 'ofs.gs_secret_access_key' not in config.local_conf:
            raise Exception('You will need to configure access to google storage to run this test')
        # You will need these configured in your
        # config.local_conf['ofs.gs_access_key_id'] = 'GOOGCABCDASDASD'
        # config.local_conf['ofs.gs_secret_access_key'] = '134zsdfjkw4234addad'
        # need to ensure not configured for local as breaks google setup
        # (and cannot delete all ofs keys as need the gs access codes)
        if 'ofs.storage_dir' in config.local_conf:
            del config.local_conf['ofs.storage_dir']
        wsgiapp = make_app(config.global_conf, **config.local_conf)
        cls.app = paste.fixture.TestApp(wsgiapp)
        # setup test data including testsysadmin user
        CreateTestData.create()
        model.Session.remove()
        user = model.User.by_name('tester')
        cls.extra_environ = {'Authorization': str(user.apikey)}
    @classmethod
    def teardown_class(cls):
        CreateTestData.delete()
    def test_auth_form(self):
        url = url_for('storage_api_auth_form', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc', res
        url = url_for('storage_api_auth_form', label='abc/xxx')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['fields'][-1]['value'] == 'abc/xxx'
        url = url_for('storage_api_auth_form', label='abc',
                success_action_redirect='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        fields = dict([ (x['name'], x['value']) for x in res.json['fields'] ])
        assert fields['success_action_redirect'] == u'http://localhost/storage/upload/success_empty?label=abc'
    # TODO: re-enable
    # Disabling as there seems to be a mismatch between OFS and more recent
    # versions of boto (e.g. >= 2.1.1)
    # Specifically fill_in_auth method on Connection objects has gone away
    def _test_auth_request(self):
        url = url_for('storage_api_auth_request', label='abc')
        res = self.app.get(url, extra_environ=self.extra_environ, status=200)
        assert res.json['method'] == 'POST'
...
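These CKAN fixtures lean on the appconfig behaviour from loadwsgi.py above: the object it returns is a merged dict that still exposes global_conf and local_conf separately, so settings can be rewritten before the app is built. A condensed, hypothetical illustration follows; the config path is a placeholder, and the real tests use relative_to=conf_dir instead.

import paste.fixture
from paste.deploy import appconfig
from ckan.config.middleware import make_app

# Placeholder location of the CKAN test config.
config = appconfig('config:test.ini', relative_to='/path/to/ckan')

# Tweak storage backend options on the split local_conf before building the app,
# exactly as the setup_class methods above do.
config.local_conf['ofs.impl'] = 'pairtree'
config.local_conf['ofs.storage_dir'] = '/tmp/ckan-test-ckanext-storage'

app = paste.fixture.TestApp(make_app(config.global_conf, **config.local_conf))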
