How to use the _getPredicate method in autotest

Best Python code snippet using autotest_python

DynamicGroupsPlugin.py

Source:DynamicGroupsPlugin.py Github

copy

Full Screen

##############################################################################
#
# Copyright (c) 2001 Zope Foundation and Contributors
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: DynamicGroupsPlugin
$Id$
"""
import copy
from Acquisition import aq_inner, aq_parent
from AccessControl import ClassSecurityInfo
from AccessControl.requestmethod import postonly
from OFS.SimpleItem import SimpleItem
from OFS.PropertyManager import PropertyManager
from OFS.Folder import Folder
from OFS.Cache import Cacheable
from App.class_init import InitializeClass
from zope.interface import Interface
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PageTemplates.Expressions import getEngine
from Products.PluggableAuthService.interfaces.plugins \
    import IGroupsPlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IGroupEnumerationPlugin
from Products.PluggableAuthService.permissions import ManageGroups
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import createViewName
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.utils import csrf_only


class IDynamicGroupsPlugin(Interface):
    """ Marker interface.
    """


manage_addDynamicGroupsPluginForm = PageTemplateFile(
    'www/dgpAdd', globals(), __name__= 'manage_addDynamicGroupsPluginForm' )


def addDynamicGroupsPlugin( dispatcher, id, title='', RESPONSE=None ):
    """ Add a DGP to 'dispatcher'.

    o If 'RESPONSE' is supplied, redirect to the dispatcher's 'manage_main'
      with a status message.
    """
    dgp = DynamicGroupsPlugin( id, title )
    dispatcher._setObject( id, dgp )

    if RESPONSE is not None:
        # The query key must be spelled 'manage_tabs_message' -- the same
        # key the other redirects in this module use.
        RESPONSE.redirect( '%s/manage_main?manage_tabs_message=%s'
                         % ( dispatcher.absolute_url()
                           , 'DGP+added.'
                           )
                         )


class DynamicGroupDefinition( SimpleItem, PropertyManager ):
    """ Represent a single dynamic group.

    o Calling the definition evaluates its compiled predicate for a
      principal (see '__call__').
    """
    meta_type = 'Dynamic Group Definition'
    security = ClassSecurityInfo()
    security.declareObjectProtected( ManageGroups )

    # Compiled form of 'predicate'; built lazily by '_getPredicate' and
    # dropped by '_setPredicate' whenever the expression text changes.
    _v_compiled = None

    _properties = ( { 'id' : 'id'
                    , 'type' : 'string'
                    , 'mode' : ''
                    }
                  , { 'id' : 'predicate'
                    , 'type' : 'string'
                    , 'mode' : 'w'
                    }
                  , { 'id' : 'title'
                    , 'type' : 'string'
                    , 'mode' : 'w'
                    }
                  , { 'id' : 'description'
                    , 'type' : 'text'
                    , 'mode' : 'w'
                    }
                  , { 'id' : 'active'
                    , 'type' : 'boolean'
                    , 'mode' : 'w'
                    }
                  )

    def __init__( self, id, predicate, title, description, active ):
        self._setId( id )
        self._setPredicate( predicate )
        self.title = title
        self.description = description
        self.active = bool( active )

    def __call__( self, principal, request=None ):
        """ Evaluate our expression to determine whether 'principal' belongs.

        o The expression context exposes 'request', 'nothing', 'principal',
          'group' (a mapping of this definition's properties) and 'plugin'
          (our acquisition parent).

        o If evaluation yields an exception instance, raise it rather than
          returning it.
        """
        predicate = self._getPredicate()
        plugin = aq_parent( aq_inner( self ) )
        properties = dict( self.propertyItems() )
        data = getEngine().getContext( { 'request' : request
                                       , 'nothing' : None
                                       , 'principal' : principal
                                       , 'group' : properties
                                       , 'plugin' : plugin
                                       } )
        result = predicate( data )
        if isinstance( result, Exception ):
            raise result
        return result

    security.declarePrivate( '_setPredicate' )
    def _setPredicate( self, predicate ):
        """ Store the expression text and discard any compiled form.
        """
        self.predicate = predicate
        if self._v_compiled is not None:
            # Remove the instance attribute so the class-level None shows
            # through again and '_getPredicate' recompiles on next use.
            del self._v_compiled

    security.declarePrivate( '_getPredicate' )
    def _getPredicate( self ):
        """ Return the compiled predicate, compiling it on first use.
        """
        if self._v_compiled is None:
            self._v_compiled = getEngine().compile( self.predicate )
        return self._v_compiled

    security.declarePrivate( '_updateProperty' )
    def _updateProperty( self, id, value ):
        """ Route 'predicate' updates through '_setPredicate'.

        o Every other property is delegated to PropertyManager.
        """
        if id == 'predicate':
            self._setPredicate( value )
        else:
            PropertyManager._updateProperty( self, id, value )

    #
    #   ZMI
    #
    manage_options = ( PropertyManager.manage_options
                     + SimpleItem.manage_options
                     )

InitializeClass( DynamicGroupDefinition )


class DynamicGroupsPlugin( Folder, BasePlugin, Cacheable ):
    """ Define groups via business rules.

    o Membership in a candidate group is established via a predicate,
      expressed as a TALES expression. Names available to the predicate
      include:

      'group' -- a mapping of the dynamic group definition's properties

      'plugin' -- this plugin object

      'principal' -- the principal being tested.

      'request' -- the request object.
    """
    meta_type = 'Dynamic Groups Plugin'
    security = ClassSecurityInfo()

    def __init__( self, id, title='' ):
        self._setId( id )
        self.title = title

    #
    #   Plugin implementations
    #
    security.declareProtected( ManageGroups, 'getGroupsForPrincipal' )
    def getGroupsForPrincipal( self, principal, request=None ):
        """ See IGroupsPlugin.

        o Return the prefixed IDs of every active group definition whose
          predicate evaluates true for 'principal'.
        """
        grps = []
        DGD = DynamicGroupDefinition.meta_type
        for group in self.objectValues( DGD ):
            if group.active and group( principal, request ):
                grps.append('%s%s' % (self.prefix, group.getId()))
        return grps

    security.declareProtected( ManageGroups, 'enumerateGroups' )
    def enumerateGroups( self
                       , id=None
                       , exact_match=False
                       , sort_by=None
                       , max_results=None
                       , **kw
                       ):
        """ See IGroupEnumerationPlugin.

        o Results are cached under a view name derived from 'id', keyed by
          all search arguments.

        o 'sort_by' and 'max_results' only take part in the cache key; this
          implementation does not otherwise apply them.
        """
        group_info = []
        group_ids = []
        plugin_id = self.getId()
        view_name = createViewName('enumerateGroups', id)

        # Look in the cache first...
        keywords = copy.deepcopy(kw)
        keywords.update( { 'id' : id
                         , 'exact_match' : exact_match
                         , 'sort_by' : sort_by
                         , 'max_results' : max_results
                         }
                       )
        cached_info = self.ZCacheable_get( view_name=view_name
                                         , keywords=keywords
                                         , default=None
                                         )
        if cached_info is not None:
            return tuple(cached_info)

        if isinstance( id, str ):
            id = [ id ]

        if exact_match and id:
            group_ids.extend( id )

        if group_ids:
            group_filter = None
        else:   # Searching
            group_ids = self.listGroupIds()
            group_filter = _DynamicGroupFilter( id, **kw )

        for group_id in group_ids:
            g_info = self.getGroupInfo(group_id, raise_keyerror=False)
            if g_info is None:
                continue

            url = '/%s/%s/manage_propertiesForm' % (
                      self.absolute_url(1), group_id)
            # Reuse the mapping we already fetched instead of looking the
            # group up a second time.
            info = dict( g_info )
            info[ 'pluginid' ] = plugin_id
            info[ 'properties_url' ] = url
            info[ 'members_url' ] = url
            info[ 'id' ] = '%s%s' % (self.prefix, info['id'])

            if not group_filter or group_filter( info ):
                if info[ 'active' ]:
                    group_info.append( info )

        # Put the computed value into the cache
        self.ZCacheable_set(group_info, view_name=view_name, keywords=keywords)
        return tuple( group_info )

    #
    #   Housekeeping
    #
    security.declareProtected( ManageGroups, 'listGroupIds' )
    def listGroupIds( self ):
        """ Return a list of IDs for the dynamic groups we manage.
        """
        return self.objectIds( DynamicGroupDefinition.meta_type )

    security.declareProtected( ManageGroups, 'getGroupInfo' )
    def getGroupInfo( self, group_id, raise_keyerror=True ):
        """ Return a mapping describing one dynamic group we manage.

        o If 'raise_keyerror' is True, raise KeyError if we don't have an
          existing group definition for 'group_id'. Otherwise, return
          None.

        o 'group_id' may be given with or without our prefix.

        o Keys include:

          'id' -- the group's ID

          'predicate' -- the TALES expression defining group membership

          'active' -- boolean flag: is the group currently active?
        """
        try:
            original = self._getOb( group_id )
        except AttributeError:
            try:
                # Retry with our prefix stripped from the front of the ID.
                original = self._getOb( group_id[len(self.prefix):] )
            except AttributeError:
                if raise_keyerror:
                    raise KeyError( group_id )
                else:
                    return None

        if not isinstance( original, DynamicGroupDefinition ):
            if raise_keyerror:
                raise KeyError( group_id )
            else:
                return None

        return dict( original.propertyItems() )

    security.declareProtected( ManageGroups, 'listGroupInfo' )
    def listGroupInfo( self ):
        """ Return a list of mappings describing the dynamic groups we manage.

        o Keys include:

          'id' -- the group's ID

          'predicate' -- the TALES expression defining group membership

          'active' -- boolean flag: is the group currently active?
        """
        return [ self.getGroupInfo( x ) for x in self.listGroupIds() ]

    security.declarePrivate( 'addGroup' )
    def addGroup( self
                , group_id
                , predicate
                , title=''
                , description=''
                , active=True
                ):
        """ Add a group definition.

        o Raise KeyError if we have an existing group definition
          for 'group_id'.
        """
        if group_id in self.listGroupIds():
            raise KeyError( 'Duplicate group ID: %s' % group_id )

        info = DynamicGroupDefinition( group_id
                                     , predicate
                                     , title
                                     , description
                                     , active
                                     )
        self._setObject( group_id, info )

        # This method changes the enumerateGroups return value
        view_name = createViewName('enumerateGroups')
        self.ZCacheable_invalidate(view_name=view_name)

    security.declarePrivate( 'updateGroup' )
    def updateGroup( self
                   , group_id
                   , predicate
                   , title=None
                   , description=None
                   , active=None
                   ):
        """ Update a group definition.

        o Raise KeyError if we don't have an existing group definition
          for 'group_id'.

        o Don't update 'title', 'description', or 'active' unless supplied.
        """
        if group_id not in self.listGroupIds():
            raise KeyError( 'Invalid group ID: %s' % group_id )

        group = self._getOb( group_id )
        group._setPredicate( predicate )

        if title is not None:
            group.title = title

        if description is not None:
            group.description = description

        if active is not None:
            group.active = active

        # This method changes the enumerateGroups return value
        view_name = createViewName('enumerateGroups')
        self.ZCacheable_invalidate(view_name=view_name)
        view_name = createViewName('enumerateGroups', group_id)
        self.ZCacheable_invalidate(view_name=view_name)

    security.declarePrivate( 'removeGroup' )
    def removeGroup( self, group_id ):
        """ Remove a group definition.

        o Raise KeyError if we don't have an existing group definition
          for 'group_id'.
        """
        if group_id not in self.listGroupIds():
            raise KeyError( 'Invalid group ID: %s' % group_id )

        self._delObject( group_id )

        # This method changes the enumerateGroups return value
        view_name = createViewName('enumerateGroups')
        self.ZCacheable_invalidate(view_name=view_name)
        view_name = createViewName('enumerateGroups', group_id)
        self.ZCacheable_invalidate(view_name=view_name)

    #
    #   ZMI
    #
    manage_options = ( ( { 'label' : 'Groups'
                         , 'action' : 'manage_groups'
                         }
                       ,
                       )
                     + Folder.manage_options[:1]
                     + BasePlugin.manage_options[:1]
                     + Folder.manage_options[1:]
                     + Cacheable.manage_options
                     )

    manage_groups = PageTemplateFile( 'www/dgpGroups'
                                    , globals()
                                    , __name__='manage_groups'
                                    )

    security.declareProtected( ManageGroups, 'manage_addGroup' )
    @csrf_only
    @postonly
    def manage_addGroup( self
                       , group_id
                       , title
                       , description
                       , predicate
                       , active=True
                       , RESPONSE=None
                       , REQUEST=None
                       ):
        """ Add a group via the ZMI.
        """
        self.addGroup( group_id
                     , predicate
                     , title
                     , description
                     , active
                     )

        message = 'Group+%s+added' % group_id

        if RESPONSE is not None:
            RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                             % ( self.absolute_url(), message )
                             )

    security.declareProtected( ManageGroups, 'manage_updateGroup' )
    @csrf_only
    @postonly
    def manage_updateGroup( self
                          , group_id
                          , predicate
                          , title=None
                          , description=None
                          , active=True
                          , RESPONSE=None
                          , REQUEST=None
                          ):
        """ Update a group via the ZMI.
        """
        self.updateGroup( group_id
                        , predicate
                        , title
                        , description
                        , active
                        )

        message = 'Group+%s+updated' % group_id

        if RESPONSE is not None:
            RESPONSE.redirect( ( '%s/manage_groups?group_id=%s&'
                               + 'manage_tabs_message=%s'
                               ) % ( self.absolute_url(), group_id, message )
                             )

    security.declareProtected( ManageGroups, 'manage_removeGroups' )
    @csrf_only
    @postonly
    def manage_removeGroups( self
                           , group_ids
                           , RESPONSE=None
                           , REQUEST=None
                           ):
        """ Remove one or more groups via the ZMI.
        """
        # Build a real list: the emptiness test below must not depend on
        # 'filter' returning a list (a lazy filter object is always true).
        group_ids = [ group_id for group_id in group_ids if group_id ]

        if not group_ids:
            message = 'no+groups+selected'
        else:
            for group_id in group_ids:
                self.removeGroup( group_id )

            message = 'Groups+removed'

        if RESPONSE is not None:
            RESPONSE.redirect( '%s/manage_groups?manage_tabs_message=%s'
                             % ( self.absolute_url(), message )
                             )

classImplements( DynamicGroupsPlugin
               , IDynamicGroupsPlugin
               , IGroupsPlugin
               , IGroupEnumerationPlugin
               )

InitializeClass( DynamicGroupsPlugin )


class _DynamicGroupFilter:

    def __init__( self
                , id=None
                , **kw
                ):
        self._filter_ids = id

    def __call__( self, group_info ):
        if self._filter_ids:
            key = 'id'
        else:
            return 1 # TODO: try using 'kw'

        value = group_info.get( key )

        if not value:
            return 0

        for id in self._filter_ids:
            if value.find( id ) >= 0:
                return 1
        # ... (the source listing is truncated at this point)

Full Screen

Full Screen

Xmp.py

Source:Xmp.py Github

copy

Full Screen

# __BEGIN_LICENSE__
#Copyright (c) 2015, United States Government, as represented by the
#Administrator of the National Aeronautics and Space Administration.
#All rights reserved.
# __END_LICENSE__
import sys
import os
import re
import subprocess
from cStringIO import StringIO
import tempfile
import traceback
import rdflib
from rdflib.Graph import Graph
import iso8601
import pytz
import PIL
from django.conf import settings

IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def findImageName(xmpName):
    """Return the path of the image file that sits next to *xmpName*.

    The extension of *xmpName* is swapped for each entry of
    IMAGE_EXTENSIONS in turn; the first path that exists is returned.
    Raises Exception when none of the candidates exists.
    """
    base = os.path.splitext(xmpName)[0]
    for ext in IMAGE_EXTENSIONS:
        if os.path.exists(base + ext):
            return base + ext
    raise Exception('no image corresponding to xmp file %s' % xmpName)


class Xmp(object):
    """RDF graph of XMP metadata loaded from an image or an XMP file."""

    def __init__(self, fname):
        """Load metadata from *fname*.

        Files whose extension is in IMAGE_EXTENSIONS go through exiftool
        (see parseImageHeader); anything else is parsed as XMP text.
        """
        self.graph = Graph()
        if os.path.splitext(fname)[1].lower() in IMAGE_EXTENSIONS:
            self.parseImageHeader(fname)
        else:
            self.parseXmp(fname)
        self.xmpName = fname

    def parseImageHeader(self, fname):
        """Extract the tags of image *fname* with exiftool and parse them.

        exiftool writes into a temporary XMP file, which is removed again
        afterwards even when parsing fails.
        """
        fd, xmpFname = tempfile.mkstemp('-parseImageHeader.xmp')
        os.close(fd)
        try:
            # Argument list, no shell: file names containing spaces or
            # shell metacharacters are passed through verbatim, and the
            # '>' inside the tag arguments needs no quoting.
            subprocess.call(['exiftool', '-fast', '-tagsfromfile', fname,
                             '-all>xmp:all', '-xmp:all>xmp:all', xmpFname])
            self.parseXmp(xmpFname)
        finally:
            try:
                os.unlink(xmpFname)
            except OSError:
                # Best effort: report the leftover file but do not fail.
                traceback.print_exc()
                sys.stderr.write('[parseImageHeader: could not delete %s]\n'
                                 % xmpFname)

    def parseXmp(self, xmpFile):
        """Parse the <rdf:RDF> element found in *xmpFile* into the graph.

        Raises ValueError when the file contains no <rdf:RDF> element.
        """
        with open(xmpFile, 'r') as xmpStream:
            xmp = xmpStream.read()
        match = re.search('<rdf:RDF.*</rdf:RDF>', xmp, re.DOTALL)
        if match is None:
            raise ValueError('no <rdf:RDF> element found in %s' % xmpFile)
        self.graph.parse(StringIO(match.group(0)))

    def _getPredicate(self, field):
        """Map a 'prefix:attr' field name to a URIRef.

        Returns None when the graph has no namespace bound to the prefix.
        """
        prefix, attr = field.split(':', 1)
        nsuri = self.graph.namespace_manager.store.namespace(prefix)
        if nsuri is None:
            return None
        else:
            return rdflib.URIRef(nsuri + attr)

    def get(self, field, dflt='ERROR'):
        """Return the value of *field* as a string.

        When the graph has no value for *field*, return *dflt*; with the
        default sentinel 'ERROR', raise KeyError(field) instead.
        """
        subject = rdflib.URIRef('')
        predicate = self._getPredicate(field)
        value = self.graph.value(subject, predicate, None)
        if value is None:
            if dflt == 'ERROR':
                raise KeyError(field)
            else:
                return dflt
        else:
            return str(value)

    def getDegMin(self, field, dirValues):
        """Return *field* as signed decimal degrees, or None if absent.

        The value is read as '<degrees>,<minutes><dir>'. A <dir> equal to
        dirValues[0] gives a positive result and dirValues[1] a negative
        one; any other <dir> raises ValueError.
        """
        val = self.get(field, None)
        if val is None:
            return None
        degMin = val[:-1]
        degS, minS = degMin.split(',')
        deg = float(degS)
        minutes = float(minS)
        dirS = val[-1]
        if dirS == dirValues[0]:
            sign = 1
        elif dirS == dirValues[1]:
            sign = -1
        else:
            raise ValueError('expected dir in %s, got %s' % (dirValues, dirS))
        return sign * (deg + minutes / 60.)

    @staticmethod
    def getRational(s):
        """Convert 'num/denom' or a plain number string to a float.

        Returns None when the denominator is zero.
        """
        m = re.search(r'(-?\d+)/(-?\d+)', s)
        if m:
            num = int(m.group(1))
            denom = int(m.group(2))
            if denom == 0:
                return None
            else:
                return float(num) / denom
        else:
            return float(s)

    @staticmethod
    def normalizeYaw(yaw, yawRef):
        '''Assumes yaw is a float, a string representation of a float, or None.
        Values 0 and -999 get mapped to None.

        Returns (yaw, yawRef); (None, '') when yaw is missing.'''
        if yaw is not None and not isinstance(yaw, float):
            yaw = Xmp.getRational(yaw)
        yaw = Xmp.checkMissing(yaw)
        yawRef = Xmp.checkMissing(yawRef)
        if yaw is None:
            return (None, '')
        # todo: correct for magnetic declination here
        # (if yawRef == 'M', apply correction and set
        # yawRef to 'T')
        if yaw < 0:
            yaw = yaw + 360
        elif yaw > 360:
            yaw = yaw - 360
        return (yaw, yawRef)

    def getYaw(self):
        """Return the normalized (yaw, yawRef) from the EXIF GPS tags."""
        yawStr = self.get('exif:GPSImgDirection', None)
        yawRefStr = self.get('exif:GPSImgDirectionRef', None)
        return self.normalizeYaw(yawStr, yawRefStr)

    @staticmethod
    def normalizeAltitude(altitude, altitudeRef):
        '''Assumes altitude is a float, a string representation of a float,
        or None. Values 0 and -999 get mapped to None, as do values whose
        magnitude exceeds 20000.

        Returns (altitude, altitudeRef); (None, '') when altitude is
        missing.'''
        if altitude is not None and not isinstance(altitude, float):
            altitude = Xmp.getRational(altitude)
        altitude = Xmp.checkMissing(altitude)
        altitudeRef = Xmp.checkMissing(altitudeRef)
        if altitude is not None and abs(altitude) > 20000:
            # Ricoh Caplio sometimes outputs huge bogus altitude values
            altitude = None
        if altitude is None:
            return (altitude, '')
        # to do: possible transforms here
        return (altitude, altitudeRef)

    def getAltitude(self):
        """Return the normalized (altitude, altitudeRef) from the EXIF tags."""
        altitudeStr = self.get('exif:GPSAltitude', None)
        altitudeRefStr = 'S'  # EXIF always uses sea-level as reference
        return self.normalizeAltitude(altitudeStr, altitudeRefStr)

    @staticmethod
    def checkMissing(val):
        """Map the 'missing' markers 0, -999 and '' to None."""
        if val in (0, -999, ''):
            return None
        else:
            return val

    def getTime(self, field):
        """Return *field* parsed as a naive UTC datetime, or None if absent.

        Timestamps without a zone are interpreted in settings.TIME_ZONE.
        """
        # Honour the requested field instead of a hard-coded tag name.
        timeStr = self.get(field, None)
        if timeStr is None:
            return None
        t = iso8601.parse_date(timeStr,
                               default_timezone=pytz.timezone(settings.TIME_ZONE))
        return t.replace(tzinfo=None) - t.utcoffset()  # normalize to utc

    def getDict(self):
        """Collect timestamp, position, yaw, altitude and image size.

        Entries that checkMissing() treats as missing are left out. When
        the graph lacks the TIFF size tags, the size is read from the image
        file located by findImageName().
        """
        timestamp = self.getTime('exif:DateTimeOriginal')
        lat = self.checkMissing(self.getDegMin('exif:GPSLatitude', 'NS'))
        lon = self.checkMissing(self.getDegMin('exif:GPSLongitude', 'EW'))
        yaw, yawRef = self.getYaw()
        altitude, altitudeRef = self.getAltitude()
        widthStr = self.get('tiff:ImageWidth', None)
        heightStr = self.get('tiff:ImageLength', None)
        if widthStr is not None and heightStr is not None:
            widthPixels, heightPixels = int(widthStr), int(heightStr)
        else:
            # 'import PIL' alone does not load the Image submodule.
            from PIL import Image
            im = Image.open(findImageName(self.xmpName))
            widthPixels, heightPixels = im.size
            del im
        vals0 = dict(timestamp=timestamp,
                     latitude=lat,
                     longitude=lon,
                     yaw=yaw,
                     yawRef=yawRef,
                     altitude=altitude,
                     altitudeRef=altitudeRef,
                     widthPixels=widthPixels,
                     heightPixels=heightPixels)
        return dict((k, v) for k, v in vals0.items()
                    if self.checkMissing(v) is not None)

    def copyToPlacemark(self, td):
        vals = self.getDict()
        for k, v in vals.iteritems():
            ...  # remainder of this method is truncated in the source listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful