How to use the plugin_type method in Avocado

Best Python code snippet using avocado_python

PluginRegistry.py

Source:PluginRegistry.py Github

copy

Full Screen

##############################################################################
#
# Copyright (c) 2001 Zope Foundation and Contributors
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
""" Classes: PluginRegistry
"""
import logging
import os

from AccessControl import ClassSecurityInfo
from AccessControl.Permissions import manage_users as ManageUsers
from Acquisition import aq_parent
from Acquisition import aq_inner
from App.class_init import default__class_init__ as InitializeClass
from App.Common import package_home
from App.ImageFile import ImageFile
from OFS.SimpleItem import SimpleItem
from Persistence import PersistentMapping
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from webdav.interfaces import IWriteLock
from zope.interface import implements

from Products.PluginRegistry.interfaces import IPluginRegistry

try:
    from Products.PluginRegistry.exportimport import _updatePluginRegistry
except ImportError:
    _HAS_GENERIC_SETUP = False
else:
    _HAS_GENERIC_SETUP = True

product_dir = package_home(globals())
# NOTE(review): os.path.join() is handed a single tuple here rather than
# unpacked path components; this looks unintended (probably meant
# ``os.path.join(*...)``).  Left unchanged because it alters the type of a
# module-level name -- confirm nobody relies on it before fixing.
product_prefix = os.path.join(os.path.split(product_dir)[:-1])
_wwwdir = os.path.join( product_dir, 'www' )

logger = logging.getLogger('PluginRegistry')


class PluginRegistry(SimpleItem):

    """ Implement IPluginRegistry as an independent, ZMI-manageable object.

    o Each plugin type holds an ordered list of (id, wrapper) tuples.
    """
    implements(IPluginRegistry, IWriteLock)

    security = ClassSecurityInfo()

    meta_type = 'Plugin Registry'

    # Mapping of plugin type -> tuple of active plugin ids; created lazily
    # by _getPlugins() on first access.
    _plugins = None

    def __init__(self, plugin_type_info=()):
        """ Set up the known plugin types.

        o 'plugin_type_info' is a sequence whose items are indexed as
          (interface, id, title, description).

        o Raise ValueError if a string is passed instead of a sequence.
        """
        if isinstance(plugin_type_info, basestring):
            # some tool is passing us our ID.
            raise ValueError('Must pass a sequence of plugin info dicts!')

        self._plugin_types = [x[0] for x in plugin_type_info]
        self._plugin_type_info = PersistentMapping()
        for interface in plugin_type_info:
            self._plugin_type_info[interface[0]] = {
                  'id': interface[1]
                , 'title': interface[2]
                , 'description': interface[3]
                }

    #
    #   IPluginRegistry implementation
    #
    security.declareProtected(ManageUsers, 'listPluginTypeInfo')
    def listPluginTypeInfo(self):
        """ See IPluginRegistry.

        o Return one info dict per registered plugin type, in registration
          order; each is a copy of the stored info extended with the
          'interface' and 'methods' keys.
        """
        result = []

        for ptype in self._plugin_types:
            info = self._plugin_type_info[ptype].copy()
            info['interface'] = ptype
            info['methods'] = ptype.names()
            result.append(info)

        return result

    security.declareProtected(ManageUsers, 'listPlugins')
    def listPlugins(self, plugin_type):
        """ See IPluginRegistry.

        o Return a list of (plugin_id, plugin) tuples for the active plugins
          of 'plugin_type'; plugins which no longer satisfy the interface
          are logged at debug level and left out.
        """
        result = []

        parent = aq_parent(aq_inner(self))

        for plugin_id in self._getPlugins(plugin_type):
            plugin = parent._getOb(plugin_id)
            if not _satisfies(plugin, plugin_type):
                logger.debug('Active plugin %s no longer implements %s'
                                % (plugin_id, plugin_type)
                            )
            else:
                result.append((plugin_id, plugin))

        return result

    security.declareProtected(ManageUsers, 'getPluginInfo')
    def getPluginInfo(self, plugin_type):
        """ See IPluginRegistry.

        o 'plugin_type' is the string id of the plugin type;  raise KeyError
          if it is unknown.
        """
        plugin_type = self._getInterfaceFromName(plugin_type)
        return self._plugin_type_info[plugin_type]

    security.declareProtected(ManageUsers, 'listPluginIds')
    def listPluginIds(self, plugin_type):
        """ See IPluginRegistry.

        o Return the tuple of active plugin ids for 'plugin_type'.
        """
        return self._getPlugins(plugin_type)

    security.declareProtected(ManageUsers, 'activatePlugin')
    def activatePlugin(self, plugin_type, plugin_id):
        """ See IPluginRegistry.

        o Raise KeyError if the plugin is already active for the type.

        o Raise ValueError if the plugin does not satisfy 'plugin_type'.
        """
        plugins = list(self._getPlugins(plugin_type))

        if plugin_id in plugins:
            raise KeyError('Duplicate plugin id: %s' % plugin_id)

        parent = aq_parent(aq_inner(self))
        plugin = parent._getOb(plugin_id)

        if not _satisfies(plugin, plugin_type):
            raise ValueError('Plugin does not implement %s' % plugin_type)

        plugins.append(plugin_id)
        self._plugins[plugin_type] = tuple(plugins)

    security.declareProtected(ManageUsers, 'deactivatePlugin')
    def deactivatePlugin(self, plugin_type, plugin_id):
        """ See IPluginRegistry.

        o Raise KeyError if the plugin is not active for the type.
        """
        plugins = list(self._getPlugins(plugin_type))

        if plugin_id not in plugins:
            raise KeyError('Invalid plugin id: %s' % plugin_id)

        plugins = [x for x in plugins if x != plugin_id]
        self._plugins[plugin_type] = tuple(plugins)

    security.declareProtected(ManageUsers, 'movePluginsUp')
    def movePluginsUp(self, plugin_type, ids_to_move):
        """ See IPluginRegistry.

        o Swap each listed plugin with its predecessor;  a plugin already
          at the top stays where it is.
        """
        ids = list(self._getPlugins(plugin_type))
        count = len(ids)

        indexes = list(map(ids.index, ids_to_move))
        # Process from the top down so that adjacent selected plugins keep
        # their relative order.
        indexes.sort()

        for i1 in indexes:

            if i1 < 0 or i1 >= count:
                raise IndexError(i1)

            i2 = i1 - 1
            if i2 < 0:
                # i1 is already on top
                continue

            ids[i2], ids[i1] = ids[i1], ids[i2]

        self._plugins[plugin_type] = tuple(ids)

    security.declareProtected(ManageUsers, 'movePluginsDown')
    def movePluginsDown(self, plugin_type, ids_to_move):
        """ See IPluginRegistry.

        o Swap each listed plugin with its successor;  a plugin already
          at the bottom stays where it is.
        """
        ids = list(self._getPlugins(plugin_type))
        count = len(ids)

        indexes = list(map(ids.index, ids_to_move))
        # Process from the bottom up so that adjacent selected plugins keep
        # their relative order.
        indexes.sort()
        indexes.reverse()

        for i1 in indexes:

            if i1 < 0 or i1 >= count:
                raise IndexError(i1)

            i2 = i1 + 1
            if i2 == len(ids):
                # i1 is already on the bottom
                continue

            ids[i2], ids[i1] = ids[i1], ids[i2]

        self._plugins[plugin_type] = tuple(ids)

    #
    #   ZMI
    #
    arrow_right_gif = ImageFile('www/arrow-right.gif', globals())
    arrow_left_gif = ImageFile('www/arrow-left.gif', globals())
    arrow_up_gif = ImageFile('www/arrow-up.gif', globals())
    arrow_down_gif = ImageFile('www/arrow-down.gif', globals())

    security.declareProtected(ManageUsers, 'manage_activatePlugins')
    def manage_activatePlugins(self
                             , plugin_type
                             , plugin_ids
                             , RESPONSE
                             ):
        """ Shim into ZMI.

        o Activate each of 'plugin_ids' for the type named 'plugin_type',
          then redirect back to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        for plugin_id in plugin_ids:
            self.activatePlugin(interface, plugin_id)
        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s'
                            % (self.absolute_url(), plugin_type)
                         )

    security.declareProtected(ManageUsers, 'manage_deactivatePlugins')
    def manage_deactivatePlugins(self
                               , plugin_type
                               , plugin_ids
                               , RESPONSE
                               ):
        """ Shim into ZMI.

        o Deactivate each of 'plugin_ids' for the type named 'plugin_type',
          then redirect back to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        for plugin_id in plugin_ids:
            self.deactivatePlugin(interface, plugin_id)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s'
                            % (self.absolute_url(), plugin_type)
                         )

    security.declareProtected(ManageUsers, 'manage_movePluginsUp')
    def manage_movePluginsUp(self
                           , plugin_type
                           , plugin_ids
                           , RESPONSE
                           ):
        """ Shim into ZMI.

        o Move 'plugin_ids' up within the type named 'plugin_type', then
          redirect back to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        self.movePluginsUp(interface, plugin_ids)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s'
                            % (self.absolute_url(), plugin_type)
                         )

    security.declareProtected(ManageUsers, 'manage_movePluginsDown')
    def manage_movePluginsDown(self
                             , plugin_type
                             , plugin_ids
                             , RESPONSE
                             ):
        """ Shim into ZMI.

        o Move 'plugin_ids' down within the type named 'plugin_type', then
          redirect back to 'manage_plugins'.
        """
        interface = self._getInterfaceFromName(plugin_type)
        self.movePluginsDown(interface, plugin_ids)

        RESPONSE.redirect('%s/manage_plugins?plugin_type=%s'
                            % (self.absolute_url(), plugin_type)
                         )

    security.declareProtected(ManageUsers, 'getAllPlugins')
    def getAllPlugins(self, plugin_type):
        """ Return a mapping segregating active / available plugins.

        'plugin_type' is the __name__ of the interface.
        """
        interface = self._getInterfaceFromName(plugin_type)

        active = self._getPlugins(interface)
        available = []

        for item_id, value in aq_parent(aq_inner(self)).objectItems():
            if _satisfies(value, interface):
                if item_id not in active:
                    available.append(item_id)

        return { 'active' : active, 'available' : available }

    security.declareProtected(ManageUsers, 'removePluginById')
    def removePluginById(self, plugin_id):
        """ Remove a plugin from any plugin types which have it configured.
        """
        for plugin_type in self._plugin_types:

            if plugin_id in self._getPlugins(plugin_type):
                self.deactivatePlugin(plugin_type, plugin_id)

    security.declareProtected(ManageUsers, 'manage_plugins')
    manage_plugins = PageTemplateFile('plugins', _wwwdir)

    security.declareProtected(ManageUsers, 'manage_active')
    manage_active = PageTemplateFile('active_plugins', _wwwdir)

    manage_twoLists = PageTemplateFile('two_lists', _wwwdir)

    manage_options=(({ 'label' : 'Plugins'
                     , 'action' : 'manage_plugins'
                   # , 'help' : ('PluggableAuthService'
                   #            , 'plugins.stx')
                     }
                    , { 'label' : 'Active'
                      , 'action' : 'manage_active'
                      }
                    )
                   + SimpleItem.manage_options
                   )

    # The export / import machinery is only offered when the optional
    # 'exportimport' module could be imported at module load time.
    if _HAS_GENERIC_SETUP:
        security.declareProtected(ManageUsers, 'manage_exportImportForm')
        manage_exportImportForm = PageTemplateFile('export_import', _wwwdir)

        security.declareProtected(ManageUsers, 'getConfigAsXML')
        def getConfigAsXML(self):
            """ Return XML representing the registry's configuration.
            """
            # Absolute import: the same module is already imported this way
            # at the top of the file, and the bare 'exportimport' form only
            # resolves under Python 2 implicit relative imports.
            from Products.PluginRegistry.exportimport \
                import PluginRegistryExporter
            pre = PluginRegistryExporter(self).__of__(self)
            return pre.generateXML()

        security.declareProtected(ManageUsers, 'manage_exportImport')
        def manage_exportImport(self, updated_xml, should_purge, RESPONSE):
            """ Parse XML and update the registry.
            """
            #XXX encoding?
            _updatePluginRegistry(self, updated_xml, should_purge)
            RESPONSE.redirect('%s/manage_exportImportForm'
                              '?manage_tabs_message=Registry+updated.'
                                % self.absolute_url())

        security.declareProtected(ManageUsers, 'manage_FTPget')
        def manage_FTPget(self, REQUEST, RESPONSE):
            """ Return the registry's configuration as XML.
            """
            return self.getConfigAsXML()

        security.declareProtected(ManageUsers, 'PUT')
        def PUT(self, REQUEST, RESPONSE):
            """ Update the registry from the XML request body, purging first.
            """
            xml = REQUEST['BODYFILE'].read()
            _updatePluginRegistry(self, xml, True)

        # Insert the extra tab right after 'Plugins' and 'Active'.
        manage_options = (manage_options[:2]
                         + ({ 'label' : 'Export / Import'
                            , 'action' : 'manage_exportImportForm'
                            },)
                         + manage_options[2:]
                         )

    #
    #   Helper methods
    #
    security.declarePrivate('_getPlugins')
    def _getPlugins(self, plugin_type):
        """ Return the tuple of active plugin ids for 'plugin_type'.

        o Raise KeyError if 'plugin_type' is not a registered type.

        o Create the '_plugins' mapping, and the (empty) entry for the
          type, on first access.
        """
        if plugin_type not in self._plugin_types:
            raise KeyError(plugin_type)

        if self._plugins is None:
            self._plugins = PersistentMapping()

        return self._plugins.setdefault(plugin_type, ())

    security.declarePrivate('_getInterfaceFromName')
    def _getInterfaceFromName(self, plugin_type_name):
        """ Convert the string name to an interface.

        o Raise KeyError if no such interface is known, or if more than
          one registered type carries that id.
        """
        found = [x[0] for x in self._plugin_type_info.items()
                                if x[1]['id'] == plugin_type_name]
        if not found:
            raise KeyError(plugin_type_name)

        if len(found) > 1:
            raise KeyError('Waaa!: %s' % plugin_type_name)

        return found[0]

InitializeClass(PluginRegistry)


def _satisfies(plugin, iface):
    """ Return whether 'plugin' provides the interface 'iface'.
    """
    checker = getattr(iface, 'providedBy', None)

    if checker is None: # BBB for Zope 2.7?
        checker = iface.isImplementedBy

    return checker(plugin)


def emptyPluginRegistry(ignored):
    """ Return empty registry, for filling from setup profile.
    """
    # NOTE: the listing this file was recovered from ends with an ellipsis
    # at this point;  the body of this function is not shown there.

Full Screen

Full Screen

meta_runtime.py

Source:meta_runtime.py Github

copy

Full Screen

1# Copyright (c) 2020, Felix Fontein <felix@fontein.de>2# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)3# SPDX-License-Identifier: GPL-3.0-or-later4from __future__ import (absolute_import, division, print_function)5__metaclass__ = type6import os7from .ansible import PLUGIN_TYPES8from .yaml import load_yaml9def record_redirect(plugin_type, record, source, destination):10 if source in record:11 if record[source] != destination:12 raise Exception(13 'ERROR: {plugin_type} {source} maps to '14 'both {destination_1} and {destination_2}'.format(15 plugin_type=plugin_type,16 source=source,17 destination_1=record[source],18 destination_2=destination,19 ))20 record[source] = destination21def path_to_name(path, base_dir, remove_extension=True):22 if remove_extension:23 path = path[:-len('.py')]24 redirect_name = os.path.normpath(os.path.relpath(path, base_dir)).replace(os.path.sep, '.')25 return redirect_name26def scan_file_redirects(redirects, collection_root='.', remove=False):27 for plugin_type in PLUGIN_TYPES:28 if plugin_type in ('test', 'filter'):29 # Test and filter plugins are not coupled to filenames30 continue31 plugin_redirects = redirects[plugin_type]32 base_dir = os.path.join(collection_root, 'plugins', plugin_type)33 for root, dirnames, filenames in os.walk(base_dir):34 for filename in filenames:35 if not filename.endswith('.py'):36 continue37 if filename in ('__init__', ):38 continue39 path = os.path.join(root, filename)40 if os.path.islink(path):41 dest = os.readlink(path)42 if not dest.endswith('.py'):43 print('WARNING: link {link} does not point to Python file, '44 'but to {dest}'.format(link=path, dest=dest))45 continue46 dest_path = os.path.join(root, dest)47 source = path_to_name(path, base_dir)48 destination = path_to_name(dest_path, base_dir)49 record_redirect(plugin_type, plugin_redirects, source, destination)50 if remove:51 os.unlink(path)52def scan_flatmap_redirects(redirects, 
collection_root='.'):53 for plugin_type in ('action', 'modules'):54 plugin_redirects = redirects[plugin_type]55 base_dir = os.path.join(collection_root, 'plugins', plugin_type)56 for root, dirnames, filenames in os.walk(base_dir):57 if root == base_dir:58 continue59 for filename in filenames:60 if not filename.endswith('.py'):61 continue62 if filename in ('__init__', ):63 continue64 source = path_to_name(os.path.join(base_dir, filename), base_dir)65 destination = path_to_name(os.path.join(root, filename), base_dir)66 record_redirect(plugin_type, plugin_redirects, source, destination)67def scan_plugins(plugins, redirects, runtime, collection_root='.', all_plugins=False):68 plugin_routing = runtime.get('plugin_routing') or {}69 for plugin_type in PLUGIN_TYPES:70 plugins_set = plugins[plugin_type]71 plugin_redirects = redirects[plugin_type]72 for source, destination in plugin_redirects.items():73 plugins_set.add(source)74 plugins_set.add(destination)75 base_dir = os.path.join(collection_root, 'plugins', plugin_type)76 for root, dirnames, filenames in os.walk(base_dir):77 if plugin_type == 'module_utils':78 for dirname in dirnames:79 path = os.path.join(root, dirname)80 plugins_set.add(path_to_name(path, base_dir, remove_extension=False))81 for filename in filenames:82 if not filename.endswith('.py'):83 continue84 if filename in ('__init__', ):85 continue86 path = os.path.join(root, filename)87 plugins_set.add(path_to_name(path, base_dir))88 if plugin_type in plugin_routing:89 for plugin_name, plugin_data in plugin_routing[plugin_type].items():90 if 'tombstone' in plugin_data or all_plugins:91 plugins_set.add(plugin_name)92def name_to_path(name, base_dir):93 return os.path.join(base_dir, name.replace('.', os.path.sep) + '.py')94def add_file_redirects(redirects, collection_root='.'):95 for plugin_type in PLUGIN_TYPES:96 if plugin_type in ('test', 'filter'):97 # Test and filter plugins are not coupled to filenames98 continue99 base_dir = os.path.join(collection_root, 
'plugins', plugin_type)100 for source, destination in redirects[plugin_type].items():101 src = name_to_path(source, base_dir)102 dst = name_to_path(destination, base_dir)103 rel_dst = os.path.normpath(os.path.relpath(dst, os.path.dirname(src)))104 if os.path.islink(src):105 if os.path.normpath(os.readlink(src)) == rel_dst:106 continue107 os.symlink(rel_dst, src)108def extract_meta_redirects(redirects, runtime, collection_name, remove=False):109 plugin_routing = runtime.get('plugin_routing') or {}110 collection_prefix = '{name}.'.format(name=collection_name)111 for plugin_type in PLUGIN_TYPES:112 plugins = plugin_routing.get(plugin_type)113 plugin_redirects = redirects[plugin_type]114 if plugins:115 for plugin_name, plugin_data in plugins.items():116 redirect = plugin_data.get('redirect')117 if redirect and redirect.startswith(collection_prefix):118 record_redirect(119 plugin_type, plugin_redirects, plugin_name,120 redirect[len(collection_prefix):])121 if remove and plugin_type not in ('test', 'filter'):122 del plugin_data['redirect']123def add_meta_redirects(redirects, runtime, collection_name):124 plugin_routing = runtime.get('plugin_routing')125 for plugin_type in PLUGIN_TYPES:126 for source, destination in redirects[plugin_type].items():127 if plugin_routing is None:128 runtime['plugin_routing'] = plugin_routing = dict()129 if plugin_type not in plugin_routing:130 plugin_routing[plugin_type] = dict()131 if source not in plugin_routing[plugin_type]:132 plugin_routing[plugin_type][source] = dict()133 plugin_routing[plugin_type][source]['redirect'] = (134 '{collection_name}.{destination}'.format(135 collection_name=collection_name,136 destination=destination,137 )138 )139def sort_plugin_routing(runtime):140 plugin_routing = runtime.get('plugin_routing')141 if not plugin_routing:142 return143 for plugin_type in PLUGIN_TYPES:144 plugins = plugin_routing.get(plugin_type)145 if not plugins:146 continue147 plugin_routing[plugin_type] = 
dict(sorted(plugins.items()))148def load_ansible_core_runtime():149 # Load ansible-core's ansible.builtin runtime150 from ansible import release as ansible_release151 ansible_builtin_runtime_path = os.path.join(152 os.path.dirname(ansible_release.__file__), 'config', 'ansible_builtin_runtime.yml')...

Full Screen

Full Screen

registry.py

Source:registry.py Github

copy

Full Screen

from collections import namedtuple

import pkg_resources

from plover.oslayer.config import PLUGINS_PLATFORM
from plover import log


class Plugin:
    """A registered plugin: its type, its name and the loaded object."""

    def __init__(self, plugin_type, name, obj):
        self.plugin_type = plugin_type
        self.name = name
        self.obj = obj
        # Expose the wrapped object's documentation on the wrapper itself.
        self.__doc__ = obj.__doc__ or ''

    def __str__(self):
        return f'{self.plugin_type}:{self.name}'


# A distribution together with the set of Plugin objects it provides.
PluginDistribution = namedtuple('PluginDistribution', 'dist plugins')


class Registry:
    """Registry of plugins, grouped by plugin type.

    Plugin names are stored and looked up lower-cased, so lookups are
    case-insensitive.
    """

    PLUGIN_TYPES = (
        'command',
        'dictionary',
        'extension',
        'gui',
        'gui.qt.machine_option',
        'gui.qt.tool',
        'machine',
        'macro',
        'meta',
        'system',
    )

    def __init__(self, suppress_errors=True):
        """Create an empty registry.

        When ``suppress_errors`` is true, a plugin that fails to load from
        its entry point is logged and skipped instead of raising.
        """
        self._plugins = {}
        self._distributions = {}
        self._suppress_errors = suppress_errors
        for plugin_type in self.PLUGIN_TYPES:
            self._plugins[plugin_type] = {}

    def register_plugin(self, plugin_type, name, obj):
        """Register ``obj`` under ``name`` and return the new ``Plugin``.

        An existing plugin with the same (case-insensitive) name and type
        is replaced.  Raises ``KeyError`` for an unknown ``plugin_type``.
        """
        plugin = Plugin(plugin_type, name, obj)
        self._plugins[plugin_type][name.lower()] = plugin
        return plugin

    def register_plugin_from_entrypoint(self, plugin_type, entrypoint):
        """Load ``entrypoint`` and register the result as a plugin.

        A load failure is always logged; it is re-raised only when the
        registry was created with ``suppress_errors=False``.
        """
        log.info('%s: %s (from %s in %s)', plugin_type, entrypoint.name,
                 entrypoint.dist, entrypoint.dist.location)
        try:
            obj = entrypoint.load()
        # Not a bare ``except:`` -- KeyboardInterrupt and SystemExit must
        # propagate even when plugin errors are being suppressed.
        except Exception:
            log.error('error loading %s plugin: %s (from %s)', plugin_type,
                      entrypoint.name, entrypoint.module_name, exc_info=True)
            if not self._suppress_errors:
                raise
        else:
            plugin = self.register_plugin(plugin_type, entrypoint.name, obj)
            # Keep track of distributions providing plugins.
            dist_id = str(entrypoint.dist)
            dist = self._distributions.get(dist_id)
            if dist is None:
                dist = PluginDistribution(entrypoint.dist, set())
                self._distributions[dist_id] = dist
            dist.plugins.add(plugin)

    def get_plugin(self, plugin_type, plugin_name):
        """Return the plugin registered as ``plugin_name`` (any case).

        Raises ``KeyError`` if the type or the name is unknown.
        """
        return self._plugins[plugin_type][plugin_name.lower()]

    def list_plugins(self, plugin_type):
        """Return the plugins of ``plugin_type`` sorted by their name."""
        return sorted(self._plugins[plugin_type].values(),
                      key=lambda p: p.name)

    def list_distributions(self):
        """Return the known ``PluginDistribution`` items, sorted by id."""
        return [dist for dist_id, dist in sorted(self._distributions.items())]

    def update(self):
        """Discover and register plugins from the installed entry points."""
        # Is support for the QT GUI available?
        try:
            pkg_resources.load_entry_point('plover', 'plover.gui', 'qt')
        except (pkg_resources.ResolutionError, ImportError):
            has_gui_qt = False
        else:
            has_gui_qt = True
        # Register available plugins.
        for plugin_type in self.PLUGIN_TYPES:
            if plugin_type.startswith('gui.qt.') and not has_gui_qt:
                continue
            entrypoint_type = f'plover.{plugin_type}'
            for entrypoint in pkg_resources.iter_entry_points(entrypoint_type):
                if 'gui_qt' in entrypoint.extras and not has_gui_qt:
                    continue
                self.register_plugin_from_entrypoint(plugin_type, entrypoint)
            if PLUGINS_PLATFORM is None:
                continue
            # Platform-specific plugins live in their own entry point group.
            entrypoint_type = f'plover.{PLUGINS_PLATFORM}.{plugin_type}'
            for entrypoint in pkg_resources.iter_entry_points(entrypoint_type):
                self.register_plugin_from_entrypoint(plugin_type, entrypoint)

# NOTE: the listing this file was recovered from ends with an ellipsis at
# this point; any further module content is not shown there.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful