Best Python code snippet using lisa_python
system_pm.py
Source:system_pm.py  
...65        if mode_lower not in allowed_modes:66            raise ConanException("CONAN_SYSREQUIRES_MODE=%s is not allowed, allowed modes=%r"67                                 % (mode, allowed_modes))68        return mode_lower69    def add_repository(self, repository, repo_key=None, update=True):70        self._tool.add_repository(repository, repo_key=repo_key)71        if update:72            self.update()73    def update(self):74        """75            Get the system package tool update command76        """77        mode = self._get_sysrequire_mode()78        if mode in ("disabled", "verify"):79            self._output.info("Not updating system_requirements. CONAN_SYSREQUIRES_MODE=%s" % mode)80            return81        self._is_up_to_date = True82        self._tool.update()83    def install(self, packages, update=True, force=False, arch_names=None):84        """ Get the system package tool install command and install one package85        :param packages: String with a package to be installed or a list with its variants e.g. "libusb-dev libxusb-devel"86        :param update: Run update command before to install87        :param force: Force installing all packages88        :param arch_names: Package suffix/prefix name used by installer tool e.g. 
{"x86_64": "amd64"}89        :return: None90        """91        packages = [packages] if isinstance(packages, str) else list(packages)92        packages = self._get_package_names(packages, arch_names)93        mode = self._get_sysrequire_mode()94        if mode in ("verify", "disabled"):95            # Report to output packages need to be installed96            if mode == "disabled":97                self._output.info("The following packages need to be installed:\n %s"98                                  % "\n".join(packages))99                return100            if mode == "verify" and not self._installed(packages):101                self._output.error("The following packages need to be installed:\n %s"102                                   % "\n".join(packages))103                raise ConanException("Aborted due to CONAN_SYSREQUIRES_MODE=%s. "104                                     "Some system packages need to be installed" % mode)105        if not force and self._installed(packages):106            return107        # From here system packages can be updated/modified108        if update and not self._is_up_to_date:109            self.update()110        self._install_any(packages)111    def install_packages(self, packages, update=True, force=False, arch_names=None):112        """ Get the system package tool install command and install all packages and/or variants.113            Inputs:114            "pkg-variant1"  # (1) Install only one package115            ["pkg-variant1", "otherpkg", "thirdpkg"] # (2) All must be installed116            [("pkg-variant1", "pkg-variant2"), "otherpkg", "thirdpkg"] # (3) Install only one variant117                                                                             and all other packages118            "pkg1 pkg2", "pkg3 pkg4" # (4) Invalid119            ["pkg1 pkg2", "pkg3 pkg4"] # (5) Invalid120        :param packages: Supports multiple formats (string,list,tuple). 
Lists and tuples into a list121        are considered variants and is processed just like self.install(). A list of string is122        considered a list of packages to be installed (only not installed yet).123        :param update: Run update command before to install124        :param force: Force installing all packages, including all installed.125        :param arch_names: Package suffix/prefix name used by installer tool e.g. {"x86_64": "amd64"}126        :return: None127        """128        packages = [packages] if isinstance(packages, six.string_types) else list(packages)129        # only one (first) variant will be installed130        list_variants = list(filter(lambda x: isinstance(x, (tuple, list)), packages))131        # all packages will be installed132        packages = list(filter(lambda x: not isinstance(x, (tuple, list)), packages))133        if [pkg for pkg in packages if " " in pkg]:134            raise ConanException("Each string must contain only one package to be installed. "135                                 "Use a list instead e.g. ['foo', 'bar'].")136        for variant in list_variants:137            self.install(variant, update=update, force=force, arch_names=arch_names)138        packages = self._get_package_names(packages, arch_names)139        mode = self._get_sysrequire_mode()140        if mode in ("verify", "disabled"):141            # Report to output packages need to be installed142            if mode == "disabled":143                self._output.info("The following packages need to be installed:\n %s"144                                  % "\n".join(packages))145                return146            if mode == "verify" and not self._installed(packages):147                self._output.error("The following packages need to be installed:\n %s"148                                   % "\n".join(packages))149                raise ConanException("Aborted due to CONAN_SYSREQUIRES_MODE=%s. 
"150                                     "Some system packages need to be installed" % mode)151        packages = packages if force else self._to_be_installed(packages)152        if not force and not packages:153            return154        # From here system packages can be updated/modified155        if update and not self._is_up_to_date:156            self.update()157        self._install_all(packages)158    def _get_package_names(self, packages, arch_names):159        """ Parse package names according it architecture160        :param packages: list with all package to be installed e.g. ["libusb-dev libfoobar-dev"]161        :param arch_names: Package suffix/prefix name used by installer tool162        :return: list with all parsed names e.g. ["libusb-dev:armhf libfoobar-dev:armhf"]163        """164        if self._conanfile and self._conanfile.settings and cross_building(self._conanfile):165            _, build_arch, _, host_arch = get_cross_building_settings(self._conanfile)166            arch = host_arch or build_arch167            parsed_packages = []168            for package in packages:169                if isinstance(package, (tuple, list)):170                    parsed_packages.append(tuple(self._get_package_names(package, arch_names)))171                else:172                    for package_name in package.split(" "):173                        parsed_packages.append(self._tool.get_package_name(package_name, arch,174                                                                           arch_names))175            return parsed_packages176        return packages177    def installed(self, package_name):178        return self._tool.installed(package_name)179    def _to_be_installed(self, packages):180        """ Returns a list with all not installed packages.181        """182        not_installed = [pkg for pkg in packages if not self._tool.installed(pkg)]183        return not_installed184    def _installed(self, packages):185        """ Return True if 
at least one of the packages is installed.186        """187        if not packages:188            return True189        for pkg in packages:190            if self._tool.installed(pkg):191                self._output.info("Package already installed: %s" % pkg)192                return True193        return False194    def _install_all(self, packages):195        self._tool.install(" ".join(sorted(packages)))196    def _install_any(self, packages):197        if len(packages) == 1:198            return self._tool.install(packages[0])199        for pkg in packages:200            try:201                return self._tool.install(pkg)202            except ConanException:203                pass204        raise ConanException("Could not install any of %s" % packages)205class BaseTool(object):206    def __init__(self, output=None):207        self._output = default_output(output, 'conans.client.tools.system_pm.BaseTool')208    def get_package_name(self, package, arch, arch_names):209        """ Retrieve package name to installed according the target arch.210        :param package: Regular package name e.g libusb-dev211        :param arch: Host arch from Conanfile.settings212        :param arch_names: Dictionary with suffix/prefix names e.g {"x86_64": "amd64"}213        :return: Package name for Tool e.g. 
libusb-dev:i386214        """215        return package216class NullTool(BaseTool):217    def add_repository(self, repository, repo_key=None):218        pass219    def update(self):220        pass221    def install(self, package_name):222        self._output.warn("Only available for linux with apt-get, yum, or pacman or OSX with brew or"223                          " FreeBSD with pkg or Solaris with pkgutil")224    def installed(self, package_name):225        return False226class AptTool(BaseTool):227    def add_repository(self, repository, repo_key=None):228        _run(self._runner, "%sapt-add-repository %s" % (self._sudo_str, repository),229             output=self._output)230        if repo_key:231            _run(self._runner, "wget -qO - %s | %sapt-key add -" % (repo_key, self._sudo_str),232                 output=self._output)233    def update(self):234        _run(self._runner, "%sapt-get update" % self._sudo_str, output=self._output)235    def install(self, package_name):236        recommends_str = '' if self._recommends else '--no-install-recommends '237        _run(self._runner,238             "%sapt-get install -y %s%s" % (self._sudo_str, recommends_str, package_name),239             output=self._output)240    def installed(self, package_name):241        exit_code = self._runner("dpkg-query -W -f='${Status}' %s | grep -q \"ok installed\""242                                 % package_name, None)243        return exit_code == 0244    def get_package_name(self, package, arch, arch_names):245        if arch_names is None:246            arch_names = {"x86_64": "amd64",247                         "x86": "i386",248                         "ppc32": "powerpc",249                         "ppc64le": "ppc64el",250                         "armv7": "arm",251                         "armv7hf": "armhf",252                         "armv8": "arm64",253                         "s390x": "s390x"}254        if arch in arch_names:255            return "%s:%s" % (package, 
arch_names[arch])256        return package257class YumTool(BaseTool):258    def add_repository(self, repository, repo_key=None):259        raise ConanException("YumTool::add_repository not implemented")260    def update(self):261        _run(self._runner, "%syum check-update -y" % self._sudo_str, accepted_returns=[0, 100],262             output=self._output)263    def install(self, package_name):264        _run(self._runner, "%syum install -y %s" % (self._sudo_str, package_name),265             output=self._output)266    def installed(self, package_name):267        exit_code = self._runner("rpm -q %s" % package_name, None)268        return exit_code == 0269    def get_package_name(self, package, arch, arch_names):270        if arch_names is None:271            arch_names = {"x86_64": "x86_64",272                         "x86": "i?86",273                         "ppc32": "powerpc",274                         "ppc64le": "ppc64le",275                         "armv7": "armv7",276                         "armv7hf": "armv7hl",277                         "armv8": "aarch64",278                         "s390x": "s390x"}279        if arch in arch_names:280            return "%s.%s" % (package, arch_names[arch])281        return package282class DnfTool(YumTool):283    def add_repository(self, repository, repo_key=None):284        raise ConanException("DnfTool::add_repository not implemented")285    def update(self):286        _run(self._runner, "%sdnf check-update -y" % self._sudo_str, accepted_returns=[0, 100],287             output=self._output)288    def install(self, package_name):289        _run(self._runner, "%sdnf install -y %s" % (self._sudo_str, package_name),290             output=self._output)291class BrewTool(BaseTool):292    def add_repository(self, repository, repo_key=None):293        raise ConanException("BrewTool::add_repository not implemented")294    def update(self):295        _run(self._runner, "brew update", output=self._output)296    def install(self, 
package_name):297        _run(self._runner, "brew install %s" % package_name, output=self._output)298    def installed(self, package_name):299        exit_code = self._runner('test -n "$(brew ls --versions %s)"' % package_name, None)300        return exit_code == 0301class PkgTool(BaseTool):302    def add_repository(self, repository, repo_key=None):303        raise ConanException("PkgTool::add_repository not implemented")304    def update(self):305        _run(self._runner, "%spkg update" % self._sudo_str, output=self._output)306    def install(self, package_name):307        _run(self._runner, "%spkg install -y %s" % (self._sudo_str, package_name),308             output=self._output)309    def installed(self, package_name):310        exit_code = self._runner("pkg info %s" % package_name, None)311        return exit_code == 0312class PkgUtilTool(BaseTool):313    def add_repository(self, repository, repo_key=None):314        raise ConanException("PkgUtilTool::add_repository not implemented")315    def update(self):316        _run(self._runner, "%spkgutil --catalog" % self._sudo_str, output=self._output)317    def install(self, package_name):318        _run(self._runner, "%spkgutil --install --yes %s" % (self._sudo_str, package_name),319             output=self._output)320    def installed(self, package_name):321        exit_code = self._runner('test -n "`pkgutil --list %s`"' % package_name, None)322        return exit_code == 0323class ChocolateyTool(BaseTool):324    def add_repository(self, repository, repo_key=None):325        raise ConanException("ChocolateyTool::add_repository not implemented")326    def update(self):327        _run(self._runner, "choco outdated", output=self._output)328    def install(self, package_name):329        _run(self._runner, "choco install --yes %s" % package_name, output=self._output)330    def installed(self, package_name):331        exit_code = self._runner('choco search --local-only --exact %s | '332                                 
'findstr /c:"1 packages installed."' % package_name, None)333        return exit_code == 0334class PacManTool(BaseTool):335    def add_repository(self, repository, repo_key=None):336        raise ConanException("PacManTool::add_repository not implemented")337    def update(self):338        _run(self._runner, "%spacman -Syyu --noconfirm" % self._sudo_str, output=self._output)339    def install(self, package_name):340        _run(self._runner, "%spacman -S --noconfirm %s" % (self._sudo_str, package_name),341             output=self._output)342    def installed(self, package_name):343        exit_code = self._runner("pacman -Qi %s" % package_name, None)344        return exit_code == 0345    def get_package_name(self, package, arch, arch_names):346        if arch_names is None:347            arch_names = {"x86": "lib32"}348        if arch in arch_names:349            return "%s-%s" % (arch_names[arch], package)350        return package351class ZypperTool(BaseTool):352    def add_repository(self, repository, repo_key=None):353        raise ConanException("ZypperTool::add_repository not implemented")354    def update(self):355        _run(self._runner, "%szypper --non-interactive ref" % self._sudo_str, output=self._output)356    def install(self, package_name):357        _run(self._runner, "%szypper --non-interactive in %s" % (self._sudo_str, package_name),358             output=self._output)359    def installed(self, package_name):360        exit_code = self._runner("rpm -q %s" % package_name, None)361        return exit_code == 0362    def get_package_name(self, package, arch, arch_names):363        if arch_names is None:364            arch_names = {"x86": "i586"}365        if arch in arch_names:366            return "%s.%s" % (arch_names[arch], package)...scope_map.py
Source:scope_map.py  
1# --------------------------------------------------------------------------------------------2# Copyright (c) Microsoft Corporation. All rights reserved.3# Licensed under the MIT License. See License.txt in the project root for license information.4# --------------------------------------------------------------------------------------------5from enum import Enum6from azure.cli.core.util import CLIError7from ._utils import get_resource_group_name_by_registry_name, parse_actions_from_repositories8class ScopeMapActions(Enum):9    CONTENT_DELETE = 'content/delete'10    CONTENT_READ = 'content/read'11    CONTENT_WRITE = 'content/write'12    METADATA_READ = 'metadata/read'13    METADATA_WRITE = 'metadata/write'14def acr_scope_map_create(cmd,15                         client,16                         registry_name,17                         scope_map_name,18                         repository_actions_list,19                         resource_group_name=None,20                         description=None):21    resource_group_name = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)22    actions = parse_actions_from_repositories(repository_actions_list)23    return client.create(24        resource_group_name,25        registry_name,26        scope_map_name,27        actions,28        description29    )30def acr_scope_map_delete(cmd,31                         client,32                         registry_name,33                         scope_map_name,34                         yes=None,35                         resource_group_name=None):36    if not yes:37        from knack.prompting import prompt_y_n38        confirmation = prompt_y_n("Deleting the scope map '{}' will remove its permissions with associated tokens. 
"39                                  "Proceed?".format(scope_map_name))40        if not confirmation:41            return None42    resource_group_name = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)43    return client.delete(resource_group_name, registry_name, scope_map_name)44def acr_scope_map_update(cmd,45                         client,46                         registry_name,47                         scope_map_name,48                         add_repository=None,49                         remove_repository=None,50                         resource_group_name=None,51                         description=None):52    if not (add_repository or remove_repository or description):53        raise CLIError('No scope map properties to update.')54    resource_group_name = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)55    current_scope_map = acr_scope_map_show(cmd, client, registry_name, scope_map_name, resource_group_name)56    current_actions = current_scope_map.actions57    if add_repository or remove_repository:58        add_actions_set = set(parse_actions_from_repositories(add_repository)) if add_repository else set()59        remove_actions_set = set(parse_actions_from_repositories(remove_repository)) if remove_repository else set()60        # Duplicate actions can lead to inconsistency based on order of operations (set subtraction isn't associative).61        # Eg: ({A, B} - {B}) U {B, C} = {A, B, C},  ({A, B} U {B, C}) - {B}  = {A, C}62        duplicate_actions = set.intersection(add_actions_set, remove_actions_set)63        if duplicate_actions:64            # Display these actions to users: remove 'repositories/' prefix from 'repositories/<repo>/<action>'65            errors = sorted(map(lambda action: action[action.find('/') + 1:], duplicate_actions))66            raise CLIError(67                'Update ambiguity. 
Duplicate actions were provided with --add and --remove arguments.\n{}'68                .format(errors))69        final_actions_set = set(current_scope_map.actions).union(add_actions_set).difference(remove_actions_set)70        current_actions = list(final_actions_set)71    return client.update(72        resource_group_name,73        registry_name,74        scope_map_name,75        description,76        current_actions77    )78def acr_scope_map_show(cmd,79                       client,80                       registry_name,81                       scope_map_name,82                       resource_group_name=None):83    resource_group_name = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)84    return client.get(85        resource_group_name,86        registry_name,87        scope_map_name88    )89def acr_scope_map_list(cmd,90                       client,91                       registry_name,92                       resource_group_name=None):93    resource_group_name = get_resource_group_name_by_registry_name(cmd.cli_ctx, registry_name, resource_group_name)94    return client.list(95        resource_group_name,96        registry_name...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
