Best Python code snippet using fMBT_python
pythonscript.py
Source:pythonscript.py  
# XScript implementation for python
import uno
import unohelper
import sys
import os
import imp
import time


class LogLevel:
    """Log verbosity constants; the active level is stored in ``LogLevel.use``."""
    NONE = 0
    ERROR = 1
    DEBUG = 2

# Configuration ----------------------------------------------------
LogLevel.use = LogLevel.NONE                # production level
#LogLevel.use = LogLevel.ERROR               # for script developers
#LogLevel.use = LogLevel.DEBUG               # for script framework developers
LOG_STDOUT = True                           # True, writes to stdout (difficult on windows)
                                            # False, writes to user/Scripts/python/log.txt
ENABLE_EDIT_DIALOG=False                    # offers a minimal editor for editing.
#-------------------------------------------------------------------

def encfile(uni):
    """Encode *uni* with the file system encoding."""
    return uni.encode( sys.getfilesystemencoding())

def lastException2String():
    """Return type, value and printable stack trace of the exception being handled."""
    (excType,excInstance,excTraceback) = sys.exc_info()
    ret = str(excType) + ": "+str(excInstance) + "\n" + \
          uno._uno_extract_printable_stacktrace( excTraceback )
    return ret

def logLevel2String( level ):
    """Return the 5 character label used in log lines for *level*."""
    ret = " NONE"
    if level == LogLevel.ERROR:
        ret = "ERROR"
    elif level >= LogLevel.DEBUG:
        ret = "DEBUG"
    return ret

def getLogTarget():
    """Return the stream log messages are written to.

    This is sys.stdout when LOG_STDOUT is set, otherwise the file
    <user>/Scripts/python/log.txt opened for appending. When the file cannot
    be opened, the problem is printed and sys.stdout is returned.
    """
    ret = sys.stdout
    if not LOG_STDOUT:
        try:
            pathSubst = uno.getComponentContext().ServiceManager.createInstance(
                "com.sun.star.util.PathSubstitution" )
            userInstallation =  pathSubst.getSubstituteVariableValue( "user" )
            if len( userInstallation ) > 0:
                systemPath = uno.fileUrlToSystemPath( userInstallation + "/Scripts/python/log.txt" )
                ret = open( systemPath , "a" )
        except Exception:
            print("Exception during creation of pythonscript logfile: "+ lastException2String() + "\n, delagating log to stdout\n")
    return ret

class Logger(LogLevel):
    """Writes timestamped messages to *target*, filtered by ``LogLevel.use``."""
    def __init__(self , target ):
        self.target = target

    def isDebugLevel( self ):
        return self.use >= self.DEBUG

    def debug( self, msg ):
        if self.isDebugLevel():
            self.log( self.DEBUG, msg )

    def isErrorLevel( self ):
        return self.use >= self.ERROR

    def error( self, msg ):
        if self.isErrorLevel():
            self.log( self.ERROR, msg )

    def log( self, level, msg ):
        """Write *msg* when the configured level is at least *level*.

        A failing write is reported with print and never propagated.
        """
        if self.use >= level:
            try:
                self.target.write(
                    time.asctime() +
                    " [" +
                    logLevel2String( level ) +
                    "] " +
                    encfile(msg) +
                    "\n" )
                self.target.flush()
            except Exception:
                print("Error during writing to stdout: " +lastException2String() + "\n")

log = Logger( getLogTarget() )

log.debug( "pythonscript loading" )

#from com.sun.star.lang import typeOfXServiceInfo, typeOfXTypeProvider
from com.sun.star.uno import RuntimeException
from com.sun.star.lang import XServiceInfo
# raised by the getScript() methods of the browse nodes below
from com.sun.star.lang import IllegalArgumentException
from com.sun.star.io import IOException
from com.sun.star.ucb import CommandAbortedException, XCommandEnvironment, XProgressHandler
from com.sun.star.task import XInteractionHandler
from com.sun.star.beans import XPropertySet
from com.sun.star.container import XNameContainer
from com.sun.star.xml.sax import XDocumentHandler, InputSource
from com.sun.star.uno import Exception as UnoException
from com.sun.star.script import XInvocation
from com.sun.star.awt import XActionListener

from com.sun.star.script.provider import XScriptProvider, XScript, XScriptContext, ScriptFrameworkErrorException
from com.sun.star.script.browse import XBrowseNode
from com.sun.star.script.browse.BrowseNodeTypes import SCRIPT, CONTAINER, ROOT
from com.sun.star.util import XModifyListener

LANGUAGENAME = "Python"
GLOBAL_SCRIPTCONTEXT_NAME = "XSCRIPTCONTEXT"
CALLABLE_CONTAINER_NAME =  "g_exportedScripts"

# pythonloader looks for a static g_ImplementationHelper variable
g_ImplementationHelper = unohelper.ImplementationHelper()
g_implName = "org.openoffice.pyuno.LanguageScriptProviderFor"+LANGUAGENAME



BLOCK_SIZE = 65536
def readTextFromStream( inputStream ):
    """Read *inputStream* to its end and return the content.

    The stream is read in chunks of BLOCK_SIZE bytes and closed afterwards,
    also when reading fails.
    """
    code = uno.ByteSequence( "" )
    try:
        while True:
            read,out = inputStream.readBytes( None , BLOCK_SIZE )
            code = code + out
            # a short read marks the end of the stream
            if read < BLOCK_SIZE:
                break
    finally:
        inputStream.closeInput()
    return code.value

def toIniName( str ):
    """Return the platform specific ini file name for the base name *str*."""
    # TODO: what is the official way to get to know whether i am on the windows platform ?
    if( hasattr(sys , "dllhandle") ):
        return str + ".ini"
    return str + "rc"


class MyUriHelper:
    """Converts between script URIs and storage URIs.

    definition: storageURI is the system dependent, absolute file url, where the script is stored on disk
                scriptURI is the system independent uri
    """

    def __init__( self, ctx, location ):
        self.s_UriMap = \
        { "share" : "vnd.sun.star.expand:${$BRAND_BASE_DIR/program/" +  toIniName( "bootstrap") + "::BaseInstallation}/share/Scripts/python" , \
          "share:uno_packages" : "vnd.sun.star.expand:$UNO_SHARED_PACKAGES_CACHE/uno_packages", \
          "user" : "vnd.sun.star.expand:${$BRAND_BASE_DIR/program/" + toIniName( "bootstrap") + "::UserInstallation}/user/Scripts/python" , \
          "user:uno_packages" : "vnd.sun.star.expand:$UNO_USER_PACKAGES_CACHE/uno_packages" }
        self.m_uriRefFac = ctx.ServiceManager.createInstanceWithContext("com.sun.star.uri.UriReferenceFactory",ctx)
        if location.startswith( "vnd.sun.star.tdoc" ):
            # scripts stored within a document
            self.m_baseUri = location + "/Scripts/python"
            self.m_scriptUriLocation = "document"
        else:
            self.m_baseUri = expandUri( self.s_UriMap[location] )
            self.m_scriptUriLocation = location
        log.isDebugLevel() and log.debug( "initialized urihelper with baseUri="+self.m_baseUri + ",m_scriptUriLocation="+self.m_scriptUriLocation )

    def getRootStorageURI( self ):
        return self.m_baseUri

    def getStorageURI( self, scriptURI ):
        return self.scriptURI2StorageUri(scriptURI)

    def getScriptURI( self, storageURI ):
        return self.storageURI2ScriptUri(storageURI)

    def storageURI2ScriptUri( self, storageURI ):
        """Return the script URI for *storageURI*.

        Raises RuntimeException when *storageURI* does not start with the
        base URI of this helper.
        """
        if not storageURI.startswith( self.m_baseUri ):
            message = "pythonscript: storage uri '" + storageURI + "' not in base uri '" + self.m_baseUri + "'"
            log.isDebugLevel() and log.debug( message )
            raise RuntimeException( message )

        ret = "vnd.sun.star.script:" + \
              storageURI[len(self.m_baseUri)+1:].replace("/","|") + \
              "?language=" + LANGUAGENAME + "&location=" + self.m_scriptUriLocation
        log.isDebugLevel() and log.debug( "converting storageURI="+storageURI + " to scriptURI=" + ret )
        return ret

    def scriptURI2StorageUri( self, scriptURI ):
        """Return the storage URI for *scriptURI*.

        Every failure is logged and reported as RuntimeException.
        """
        try:
            myUri = self.m_uriRefFac.parse(scriptURI)
            ret = self.m_baseUri + "/" + myUri.getName().replace( "|", "/" )
            log.isDebugLevel() and log.debug( "converting scriptURI="+scriptURI + " to storageURI=" + ret )
            return ret
        except UnoException as e:
            log.error( "error during converting scriptURI="+scriptURI + ": " + e.Message)
            # the message is read from the Message attribute, as in the log call above
            raise RuntimeException( "pythonscript:scriptURI2StorageUri: " +e.Message, None )
        except Exception as e:
            log.error( "error during converting scriptURI="+scriptURI + ": " + str(e))
            raise RuntimeException( "pythonscript:scriptURI2StorageUri: " + str(e), None )


class ModuleEntry:
    """A loaded module together with the modification date it was read with."""
    def __init__( self, lastRead, module ):
        self.lastRead = lastRead
        self.module = module

def hasChanged( oldDate, newDate ):
    """Return True when *newDate* is later than *oldDate*.

    Both arguments need the attributes Year, Month, Day, Hours, Minutes,
    Seconds and HundredthSeconds. The fields are compared lexicographically,
    so a smaller year is never outweighed by a larger month or day.
    """
    def asTuple( date ):
        return ( date.Year, date.Month, date.Day, date.Hours,
                 date.Minutes, date.Seconds, date.HundredthSeconds )
    return asTuple( newDate ) > asTuple( oldDate )

def ensureSourceState( code ):
    """Return *code* with a trailing newline and without carriage returns."""
    if not code.endswith( "\n" ):
        code = code + "\n"
    code = code.replace( "\r", "" )
    return code


def checkForPythonPathBesideScript( url ):
    """Append pythonpath.zip and pythonpath below *url* to sys.path.

    Only file: urls are considered; an entry is added when it exists and is
    not yet part of sys.path.
    """
    if url.startswith( "file:" ):
        for name in ( "/pythonpath.zip", "/pythonpath" ):
            path = unohelper.fileUrlToSystemPath( url+name )
            log.log( LogLevel.DEBUG,  "checking for existence of " + path )
            if os.access( encfile(path), os.F_OK) and not path in sys.path:
                log.log( LogLevel.DEBUG, "adding " + path + " to sys.path" )
                sys.path.append( path )


class ScriptContext(unohelper.Base):
    """Object handed to scripts under the name GLOBAL_SCRIPTCONTEXT_NAME."""
    def __init__( self, ctx, doc ):
        self.ctx = ctx
        self.doc = doc

   # XScriptContext
    def getDocument(self):
        # note: returns the desktop's current component, self.doc is not consulted
        return self.getDesktop().getCurrentComponent()

    def getDesktop(self):
        return self.ctx.ServiceManager.createInstanceWithContext(
            "com.sun.star.frame.Desktop", self.ctx )

    def getComponentContext(self):
        return self.ctx

#----------------------------------
# Global Module Administration
# does not fit together with script
# engine lifetime management
#----------------------------------
#g_scriptContext = ScriptContext( uno.getComponentContext(), None )
#g_modules = {}
#def getModuleByUrl( url, sfa ):
#    entry =  g_modules.get(url)
#    load = True
#    lastRead = sfa.getDateTimeModified( url )
#    if entry:
#        if hasChanged( entry.lastRead, lastRead ):
#            log.isDebugLevel() and log.debug("file " + url + " has changed, reloading")
#        else:
#            load = False
#
#    if load:
#        log.isDebugLevel() and log.debug( "opening >" + url + "<" )
#
#        code = readTextFromStream( sfa.openFileRead( url ) )
#
#        # execute the module
#        entry = ModuleEntry( lastRead, imp.new_module("ooo_script_framework") )
#        entry.module.__dict__[GLOBAL_SCRIPTCONTEXT_NAME] = g_scriptContext
#        entry.module.__file__ = url
#        exec code in entry.module.__dict__
#        g_modules[ url ] = entry
#        log.isDebugLevel() and log.debug( "mapped " + url + " to " + str( entry.module ) )
#    return entry.module

class ProviderContext:
    """State shared by the browse nodes of one script provider.

    Holds the file access object, the uri helper, the script context, the
    cache of loaded modules and, for package storages, the mapping from
    package name to Package.
    """
    def __init__( self, storageType, sfa, uriHelper, scriptContext ):
        self.storageType = storageType
        self.sfa = sfa
        self.uriHelper = uriHelper
        self.scriptContext = scriptContext
        self.modules = {}
        self.rootUrl = None
        self.mapPackageName2Path = None

    def getTransientPartFromUrl( self, url ):
        """Return the first path element of *url* below rootUrl."""
        rest = url.replace( self.rootUrl , "",1 ).replace( "/","",1)
        return rest[0:rest.find("/")]

    def getPackageNameFromUrl( self, url ):
        """Return the second path element of *url* below rootUrl."""
        rest = url.replace( self.rootUrl , "",1 ).replace( "/","",1)
        start = rest.find("/") +1
        return rest[start:rest.find("/",start)]


    def removePackageByUrl( self, url ):
        """Remove the first package that lists *url* among its pathes."""
        # iterate over a copy, the mapping is modified inside the loop
        for name, package in list( self.mapPackageName2Path.items() ):
            if url in package.pathes:
                self.mapPackageName2Path.pop(name)
                break

    def addPackageByUrl( self, url ):
        """Register *url* with its package, creating the package when needed."""
        packageName = self.getPackageNameFromUrl( url )
        transientPart = self.getTransientPartFromUrl( url )
        log.isDebugLevel() and log.debug( "addPackageByUrl : " + packageName + ", " + transientPart + "("+url+")" + ", rootUrl="+self.rootUrl )
        if packageName in self.mapPackageName2Path:
            package = self.mapPackageName2Path[ packageName ]
            package.pathes = package.pathes + (url, )
        else:
            package = Package( (url,), transientPart)
            self.mapPackageName2Path[ packageName ] = package

    def isUrlInPackage( self, url ):
        """Return True when any registered package lists *url*."""
        for package in self.mapPackageName2Path.values():
            if url in package.pathes:
                return True
        return False

    def setPackageAttributes( self, mapPackageName2Path, rootUrl ):
        self.mapPackageName2Path = mapPackageName2Path
        self.rootUrl = rootUrl

    def getPersistentUrlFromStorageUrl( self, url ):
        """Return *url* without the path element that directly follows rootUrl.

        *url* is returned unchanged when no rootUrl is set.
        """
        # package name is the second directory
        ret = url
        if self.rootUrl:
            pos = len( self.rootUrl) +1
            ret = url[0:pos]+url[url.find("/",pos)+1:len(url)]
        log.isDebugLevel() and log.debug( "getPersistentUrlFromStorageUrl " + url +  " -> "+ ret)
        return ret

    def getStorageUrlFromPersistentUrl( self, url):
        """Return *url* with the package's transient path element inserted after rootUrl.

        *url* is returned unchanged when no rootUrl is set.
        """
        ret = url
        if self.rootUrl:
            pos = len(self.rootUrl)+1
            packageName = url[pos:url.find("/",pos+1)]
            package = self.mapPackageName2Path[ packageName ]
            ret = url[0:pos]+ package.transientPathElement + "/" + url[pos:len(url)]
        log.isDebugLevel() and log.debug( "getStorageUrlFromPersistentUrl " + url + " -> "+ ret)
        return ret

    def getModuleByUrl( self, url ):
        """Return the module for the script file *url*.

        The file is (re)loaded when it is not cached yet or when its
        modification date is newer than the cached one.
        """
        entry =  self.modules.get(url)
        load = True
        lastRead = self.sfa.getDateTimeModified( url )
        if entry:
            if hasChanged( entry.lastRead, lastRead ):
                log.isDebugLevel() and log.debug( "file " + url + " has changed, reloading" )
            else:
                load = False

        if load:
            log.isDebugLevel() and log.debug( "opening >" + url + "<" )

            src = readTextFromStream( self.sfa.openFileRead( url ) )
            checkForPythonPathBesideScript( url[0:url.rfind('/')] )
            src = ensureSourceState( src )

            # execute the module
            entry = ModuleEntry( lastRead, imp.new_module("ooo_script_framework") )
            entry.module.__dict__[GLOBAL_SCRIPTCONTEXT_NAME] = self.scriptContext

            code = None
            if url.startswith( "file:" ):
                code = compile( src, encfile(uno.fileUrlToSystemPath( url ) ), "exec" )
            else:
                code = compile( src, url, "exec" )
            exec(code, entry.module.__dict__)
            entry.module.__file__ = url
            self.modules[ url ] = entry
            log.isDebugLevel() and log.debug( "mapped " + url + " to " + str( entry.module ) )
        return  entry.module

#--------------------------------------------------
def isScript( candidate ):
    """Return True when *candidate* is a plain python function."""
    return isinstance( candidate, type(isScript) )

#-------------------------------------------------------
class ScriptBrowseNode( unohelper.Base, XBrowseNode , XPropertySet, XInvocation, XActionListener ):
    """Browse node for a single function of a script file."""
    def __init__( self, provCtx, uri, fileName, funcName, func ):
        self.fileName = fileName
        self.funcName = funcName
        self.provCtx = provCtx
        self.func = func
        self.uri = uri

    def getName( self ):
        return self.funcName

    def getChildNodes(self):
        return ()

    def hasChildNodes(self):
        return False

    def getType( self):
        return SCRIPT

    def getPropertyValue( self, name ):
        """Return the value of "URI", "Description" or "Editable", else None.

        "Editable" is only answered when ENABLE_EDIT_DIALOG is set. Errors
        are logged and re-raised.
        """
        ret = None
        try:
            if name == "URI":
                ret = self.provCtx.uriHelper.getScriptURI(
                    self.provCtx.getPersistentUrlFromStorageUrl( self.uri + "$" + self.funcName ) )
            elif name == "Description":
                ret = getattr( self.func, "__doc__", None )
            elif name == "Editable" and ENABLE_EDIT_DIALOG:
                ret = not self.provCtx.sfa.isReadOnly( self.uri )

            log.isDebugLevel() and log.debug( "ScriptBrowseNode.getPropertyValue called for " + name + ", returning " + str(ret) )
        except Exception:
            log.error( "ScriptBrowseNode.getPropertyValue error " + lastException2String())
            raise

        return ret
    def setPropertyValue( self, name, value ):
        log.isDebugLevel() and log.debug( "ScriptBrowseNode.setPropertyValue called " + name + "=" +str(value ) )
    def getPropertySetInfo( self ):
        log.isDebugLevel() and log.debug( "ScriptBrowseNode.getPropertySetInfo called "  )
        return None

    def getIntrospection( self ):
        return None

    def invoke( self, name, params, outparamindex, outparams ):
        """Open the macro editor dialog with the script source for name "Editable"."""
        if name == "Editable":
            servicename = "com.sun.star.awt.DialogProvider"
            ctx = self.provCtx.scriptContext.getComponentContext()
            dlgprov = ctx.ServiceManager.createInstanceWithContext(
                servicename, ctx )

            self.editor = dlgprov.createDialog(
                "vnd.sun.star.script:" +
                "ScriptBindingLibrary.MacroEditor?location=application")

            code = readTextFromStream(self.provCtx.sfa.openFileRead(self.uri))
            code = ensureSourceState( code )
            self.editor.getControl("EditorTextField").setText(code)

            self.editor.getControl("RunButton").setActionCommand("Run")
            self.editor.getControl("RunButton").addActionListener(self)
            self.editor.getControl("SaveButton").setActionCommand("Save")
            self.editor.getControl("SaveButton").addActionListener(self)

            self.editor.execute()

        return None

    def actionPerformed( self, event ):
        """Handle the "Run" and "Save" buttons of the editor dialog.

        "Run" executes the editor text in a fresh module and calls the first
        script function found. "Save" writes the editor text back to the
        script file. Errors are logged, not propagated.
        """
        try:
            if event.ActionCommand == "Run":
                code = self.editor.getControl("EditorTextField").getText()
                code = ensureSourceState( code )
                mod = imp.new_module("ooo_script_framework")
                mod.__dict__[GLOBAL_SCRIPTCONTEXT_NAME] = self.provCtx.scriptContext
                exec(code, mod.__dict__)
                values = mod.__dict__.get( CALLABLE_CONTAINER_NAME , None )
                if not values:
                    values = mod.__dict__.values()

                for i in values:
                    if isScript( i ):
                        i()
                        break

            elif event.ActionCommand == "Save":
                toWrite = uno.ByteSequence(
                    str(
                    self.editor.getControl("EditorTextField").getText().encode(
                    sys.getdefaultencoding())) )
                # keep the old file as <uri>.orig until the new content is written
                copyUrl = self.uri + ".orig"
                self.provCtx.sfa.move( self.uri, copyUrl )
                out = self.provCtx.sfa.openFileWrite( self.uri )
                out.writeBytes( toWrite )
                # NOTE(review): XOutputStream declares closeOutput(); confirm
                # that close() is available on the stream returned here
                out.close()
                self.provCtx.sfa.kill( copyUrl )
#                log.isDebugLevel() and log.debug("Save is not implemented yet")
#                text = self.editor.getControl("EditorTextField").getText()
#                log.isDebugLevel() and log.debug("Would save: " + text)
        except Exception:
            # TODO: add an error box here !
            log.error( lastException2String() )


    def setValue( self, name, value ):
        return None

    def getValue( self, name ):
        return None

    def hasMethod( self, name ):
        return False

    def hasProperty( self, name ):
        return False


#-------------------------------------------------------
class FileBrowseNode( unohelper.Base, XBrowseNode ):
    """Browse node for a script file; its children are the script functions."""
    def __init__( self, provCtx, uri , name ):
        self.provCtx = provCtx
        self.uri = uri
        self.name = name
        self.module = None

    def getName( self ):
        return self.name

    def getChildNodes(self):
        """Return one ScriptBrowseNode per exported function of the file.

        The functions are taken from the tuple CALLABLE_CONTAINER_NAME of the
        module or, when that is not a tuple, from all module members. Errors
        are logged and re-raised.
        """
        ret = ()
        try:
            self.module = self.provCtx.getModuleByUrl( self.uri )
            values = self.module.__dict__.get( CALLABLE_CONTAINER_NAME , None )

            # no g_exportedScripts, export every function
            if not isinstance(values, type(())):
                values = self.module.__dict__.values()

            scriptNodeList = []
            for i in values:
                if isScript( i ):
                    scriptNodeList.append(
                        ScriptBrowseNode(
                        self.provCtx, self.uri, self.name, i.__name__, i  ))
            ret = tuple( scriptNodeList )
            # must compile  !
            log.isDebugLevel() and log.debug( "returning " +str(len(ret)) + " ScriptChildNodes on " + self.uri )
        except Exception:
            text = lastException2String()
            log.error( "Error while evaluating " + self.uri + ":" + text )
            raise
        return ret

    def hasChildNodes(self):
        try:
            return len(self.getChildNodes()) > 0
        except Exception:
            return False

    def getType( self):
        return CONTAINER



class DirBrowseNode( unohelper.Base, XBrowseNode ):
    """Browse node for a directory containing script files and sub directories."""
    def __init__( self, provCtx, name, rootUrl ):
        self.provCtx = provCtx
        self.name = name
        self.rootUrl = rootUrl

    def getName( self ):
        return self.name

    def getChildNodes( self ):
        """Return nodes for the .py files and folders below rootUrl.

        Folders ending with /pythonpath are skipped. On errors the problem is
        logged and an empty tuple is returned.
        """
        try:
            log.isDebugLevel() and log.debug( "DirBrowseNode.getChildNodes called for " + self.rootUrl )
            contents = self.provCtx.sfa.getFolderContents( self.rootUrl, True )
            browseNodeList = []
            for i in contents:
                if i.endswith( ".py" ):
                    log.isDebugLevel() and log.debug( "adding filenode " + i )
                    browseNodeList.append(
                        FileBrowseNode( self.provCtx, i, i[i.rfind("/")+1:len(i)-3] ) )
                elif self.provCtx.sfa.isFolder( i ) and not i.endswith("/pythonpath"):
                    log.isDebugLevel() and log.debug( "adding DirBrowseNode " + i )
                    browseNodeList.append( DirBrowseNode( self.provCtx, i[i.rfind("/")+1:len(i)],i))
            return tuple( browseNodeList )
        except Exception as e:
            text = lastException2String()
            log.error( "DirBrowseNode error: " + str(e) + " while evaluating " + self.rootUrl)
            log.error( text)
            return ()

    def hasChildNodes( self ):
        return True

    def getType( self ):
        return CONTAINER

    def getScript( self, uri ):
        log.debug( "DirBrowseNode getScript " + uri + " invoked" )
        raise IllegalArgumentException( "DirBrowseNode couldn't instantiate script " + uri , self , 0 )


class ManifestHandler( XDocumentHandler, unohelper.Base ):
    """SAX handler collecting the script directories listed in a manifest.xml."""
    def __init__( self, rootUrl ):
        self.rootUrl = rootUrl

    def startDocument( self ):
        self.urlList = []

    def endDocument( self ):
        pass

    def startElement( self , name, attlist):
        if name == "manifest:file-entry":
            if attlist.getValueByName( "manifest:media-type" ) == "application/vnd.sun.star.framework-script":
                self.urlList.append(
                    self.rootUrl + "/" + attlist.getValueByName( "manifest:full-path" ) )

    def endElement( self, name ):
        pass

    def characters ( self, chars ):
        pass

    def ignorableWhitespace( self, chars ):
        pass

    # misspelled name of ignorableWhitespace, kept for existing callers
    ignoreableWhitespace = ignorableWhitespace

    def processingInstruction( self, target, data ):
        pass

    def setDocumentLocator( self, locator ):
        pass

def isPyFileInPath( sfa, path ):
    """Return True when the folder *path* or one of its sub folders holds a .py file."""
    ret = False
    contents = sfa.getFolderContents( path, True )
    for i in contents:
        if sfa.isFolder(i):
            ret = isPyFileInPath(sfa,i)
        else:
            if i.endswith(".py"):
                ret = True
        if ret:
            break
    return ret

# extracts META-INF directory from
def getPathesFromPackage( rootUrl, sfa ):
    """Return the script directories of the package at *rootUrl* as a tuple.

    The directories are read from META-INF/manifest.xml; directories without
    any .py file are left out. An empty tuple is returned when a UNO
    exception occurs.
    """
    ret = ()
    fileUrl = rootUrl + "/META-INF/manifest.xml"
    try:
        inputStream = sfa.openFileRead( fileUrl )
        parser = uno.getComponentContext().ServiceManager.createInstance( "com.sun.star.xml.sax.Parser" )
        handler = ManifestHandler( rootUrl )
        parser.setDocumentHandler( handler )
        parser.parseStream( InputSource( inputStream , "", fileUrl, fileUrl ) )
        ret = tuple( [ i for i in handler.urlList if isPyFileInPath( sfa, i ) ] )
    except UnoException:
        text = lastException2String()
        log.debug( "getPathesFromPackage " + fileUrl + " Exception: " +text )
    return ret


class Package:
    """Script directories of a deployed package and its transient path element."""
    def __init__( self, pathes, transientPathElement ):
        self.pathes = pathes
        self.transientPathElement = transientPathElement

class DummyInteractionHandler( unohelper.Base, XInteractionHandler ):
    """Interaction handler that only logs the request."""
    def __init__( self ):
        pass
    def handle( self, event):
        log.isDebugLevel() and log.debug( "pythonscript: DummyInteractionHandler.handle " + str( event ) )

class DummyProgressHandler( unohelper.Base, XProgressHandler ):
    """Progress handler that only logs the calls."""
    def __init__( self ):
        pass

    def push( self,status ):
        log.isDebugLevel() and log.debug( "pythonscript: DummyProgressHandler.push " + str( status ) )
    def update( self,status ):
        log.isDebugLevel() and log.debug( "pythonscript: DummyProgressHandler.update " + str( status ) )
    def pop( self ):
        # pop takes no argument, so there is nothing but the call itself to log
        log.isDebugLevel() and log.debug( "pythonscript: DummyProgressHandler.pop" )

class CommandEnvironment(unohelper.Base, XCommandEnvironment):
    """Command environment built from the dummy handlers above."""
    def __init__( self ):
        self.progressHandler = DummyProgressHandler()
        self.interactionHandler = DummyInteractionHandler()
    def getInteractionHandler( self ):
        return self.interactionHandler
    def getProgressHandler( self ):
        return self.progressHandler

#maybe useful for debugging purposes
#class ModifyListener( unohelper.Base, XModifyListener ):
#    def __init__( self ):
#        pass
#    def modified( self, event ):
#        log.isDebugLevel() and log.debug( "pythonscript: ModifyListener.modified " + str( event ) )
#    def disposing( self, event ):
#        log.isDebugLevel() and log.debug( "pythonscript: ModifyListener.disposing " + str( event ) )

def mapStorageType2PackageContext( storageType ):
    """Map a ...:uno_packages storage type to its package manager context name."""
    ret = storageType
    if( storageType == "share:uno_packages" ):
        ret = "shared"
    if( storageType == "user:uno_packages" ):
        ret = "user"
    return ret

def getPackageName2PathMap( sfa, storageType ):
    """Return a dict mapping package name to Package for *storageType*.

    Only deployed packages with at least one script directory are included.
    """
    ret = {}
    packageManagerFactory = uno.getComponentContext().getValueByName(
        "/singletons/com.sun.star.deployment.thePackageManagerFactory" )
    packageManager = packageManagerFactory.getPackageManager(
        mapStorageType2PackageContext(storageType))
#    packageManager.addModifyListener( ModifyListener() )
    log.isDebugLevel() and log.debug( "pythonscript: getPackageName2PathMap start getDeployedPackages" )
    packages = packageManager.getDeployedPackages(
        packageManager.createAbortChannel(), CommandEnvironment( ) )
    log.isDebugLevel() and log.debug( "pythonscript: getPackageName2PathMap end getDeployedPackages (" + str(len(packages))+")" )

    for i in packages:
        log.isDebugLevel() and log.debug( "inspecting package " + i.Name + "("+i.Identifier.Value+")" )
        transientPathElement = penultimateElement( i.URL )
        j = expandUri( i.URL )
        pathes = getPathesFromPackage( j, sfa )
        if len( pathes ) > 0:
            # map package name to url, we need this later
            log.isErrorLevel() and log.error( "adding Package " + transientPathElement + " " + str( pathes ) )
            ret[ lastElement( j ) ] = Package( pathes, transientPathElement )
    return ret

def penultimateElement( aStr ):
    """Return the path element of *aStr* before the last slash.

    Raises ValueError (from rindex) when the slashes are not found.
    """
    lastSlash = aStr.rindex("/")
    penultimateSlash = aStr.rindex("/",0,lastSlash-1)
    return  aStr[ penultimateSlash+1:lastSlash ]

def lastElement( aStr):
    """Return the part of *aStr* behind the last slash."""
    return aStr[ aStr.rfind( "/" )+1:len(aStr)]

class PackageBrowseNode( unohelper.Base, XBrowseNode ):
    """Root browse node for the script directories of all deployed packages."""
    def __init__( self, provCtx, name, rootUrl ):
        self.provCtx = provCtx
        self.name = name
        self.rootUrl = rootUrl

    def getName( self ):
        return self.name

    def getChildNodes( self ):
        """Return one DirBrowseNode per script directory of every package.

        A package with a single directory is named after the package, else
        the nodes are named <package>.<last path element>.
        """
        items = self.provCtx.mapPackageName2Path.items()
        browseNodeList = []
        for i in items:
            if len( i[1].pathes ) == 1:
                browseNodeList.append(
                    DirBrowseNode( self.provCtx, i[0], i[1].pathes[0] ))
            else:
                for j in i[1].pathes:
                    browseNodeList.append(
                        DirBrowseNode( self.provCtx, i[0]+"."+lastElement(j), j ) )
        return tuple( browseNodeList )

    def hasChildNodes( self ):
        # the package map is kept by the provider context, not by this node
        return len( self.provCtx.mapPackageName2Path ) > 0

    def getType( self ):
        return CONTAINER

    def getScript( self, uri ):
        log.debug( "DirBrowseNode getScript " + uri + " invoked" )
        raise IllegalArgumentException( "PackageBrowseNode couldn't instantiate script " + uri , self , 0 )




class PythonScript( unohelper.Base, XScript ):
    """XScript wrapper around a python function *func* of module *mod*."""
    def __init__( self, func, mod ):
        self.func = func
        self.mod = mod

    def _describeFailure( self ):
        # builds the error text for the exception currently being handled
        return "Error during invoking function " + \
            str(self.func.__name__) + " in module " + \
            self.mod.__file__ + " (" + lastException2String() + ")"

    def invoke(self, args, out, outindex ):
        """Call the wrapped function with *args*.

        Returns the tuple (result, (), ()). UNO exceptions are re-raised with
        an extended message, all other exceptions are converted to
        RuntimeException.
        """
        log.isDebugLevel() and log.debug( "PythonScript.invoke " + str( args ) )
        try:
            ret = self.func( *args )
        except UnoException as e:
            # UNO Exception continue to fly ...
            complete = self._describeFailure()
            log.isDebugLevel() and log.debug( complete )
            # some people may beat me up for modifying the exception text,
            # but otherwise office just shows
            # the type name and message text with no more information,
            # this is really bad for most users.
            e.Message = e.Message + " (" + complete + ")"
            raise
        except Exception:
            # General python exception are converted to uno RuntimeException
            complete = self._describeFailure()
            log.isDebugLevel() and log.debug( complete )
            raise RuntimeException( complete , self )
        log.isDebugLevel() and log.debug( "PythonScript.invoke ret = " + str( ret ) )
        return ret, (), ()

def expandUri(  uri ):
    """Expand a vnd.sun.star.expand: uri and absolutize file: uris."""
    if uri.startswith( "vnd.sun.star.expand:" ):
        uri = uri.replace( "vnd.sun.star.expand:", "",1)
        uri = uno.getComponentContext().getByName(
                    "/singletons/com.sun.star.util.theMacroExpander" ).expandMacros( uri )
    if uri.startswith( "file:" ):
        uri = uno.absolutize("",uri)   # necessary to get rid of .. in uri
    return uri

#--------------------------------------------------------------
class PythonScriptProvider( unohelper.Base, XBrowseNode, XScriptProvider, XNameContainer):
    """Script provider and root browse node for python scripts.

    args[0] is either the storage type as unicode string or an object with a
    SCRIPTING_DOC_URI attribute.
    """
    def __init__( self, ctx, *args ):
        if log.isDebugLevel():
            mystr = ""
            for i in args:
                if len(mystr) > 0:
                    mystr = mystr +","
                mystr = mystr + str(i)
            log.debug( "Entering PythonScriptProvider.ctor" + mystr )

        storageType = ""
        if isinstance(args[0],unicode ):
            storageType = args[0]
        else:
            storageType = args[0].SCRIPTING_DOC_URI
        isPackage = storageType.endswith( ":uno_packages" )

        try:
#            urlHelper = ctx.ServiceManager.createInstanceWithArgumentsAndContext(
#                "com.sun.star.script.provider.ScriptURIHelper", (LANGUAGENAME, storageType), ctx)
            urlHelper = MyUriHelper( ctx, storageType )
            log.isDebugLevel() and log.debug( "got urlHelper " + str( urlHelper ) )

            rootUrl = expandUri( urlHelper.getRootStorageURI() )
            log.isDebugLevel() and log.debug( storageType + " transformed to " + rootUrl )

            ucbService = "com.sun.star.ucb.SimpleFileAccess"
            sfa = ctx.ServiceManager.createInstanceWithContext( ucbService, ctx )
            if not sfa:
                log.debug("PythonScriptProvider couldn't instantiate " +ucbService)
                raise RuntimeException(
                    "PythonScriptProvider couldn't instantiate " +ucbService, self)
            self.provCtx = ProviderContext(
                storageType, sfa, urlHelper, ScriptContext( uno.getComponentContext(), None ) )
            if isPackage:
                mapPackageName2Path = getPackageName2PathMap( sfa, storageType )
                self.provCtx.setPackageAttributes( mapPackageName2Path , rootUrl )
                self.dirBrowseNode = PackageBrowseNode( self.provCtx, LANGUAGENAME, rootUrl )
            else:
                self.dirBrowseNode = DirBrowseNode( self.provCtx, LANGUAGENAME, rootUrl )

        except Exception:
            text = lastException2String()
            log.debug( "PythonScriptProvider could not be instantiated because of : " + text )
            # bare raise keeps the original traceback
            raise

    def getName( self ):
        return self.dirBrowseNode.getName()

    def getChildNodes( self ):
        return self.dirBrowseNode.getChildNodes()

    def hasChildNodes( self ):
        return self.dirBrowseNode.hasChildNodes()

    def getType( self ):
        return self.dirBrowseNode.getType()

    def getScript( self, scriptUri ):
        """Return a PythonScript for the function addressed by *scriptUri*.

        The part of the storage uri behind "$" is the function name, the part
        before it the script file. Errors are logged and re-raised.
        """
        try:
            log.isDebugLevel() and log.debug( "getScript " + scriptUri + " invoked")

            storageUri = self.provCtx.getStorageUrlFromPersistentUrl(
                self.provCtx.uriHelper.getStorageURI(scriptUri) )
            log.isDebugLevel() and log.debug( "getScript: storageUri = " + storageUri)
            fileUri = storageUri[0:storageUri.find( "$" )]
            funcName = storageUri[storageUri.find( "$" )+1:len(storageUri)]

            mod = self.provCtx.getModuleByUrl( fileUri )
            log.isDebugLevel() and log.debug( " got mod " + str(mod) )

            func = mod.__dict__[ funcName ]

            log.isDebugLevel() and log.debug( "got func " + str( func ) )
            return PythonScript( func, mod )
        except Exception:
            text = lastException2String()
            log.error( text )
            raise
ScriptFrameworkErrorException( text, self, scriptUri, LANGUAGENAME, 0 )892        893894    # XServiceInfo895    def getSupportedServices( self ):896        return g_ImplementationHelper.getSupportedServices(g_implName)897898    def supportsService( self, ServiceName ):899        return g_ImplementationHelper.supportsService( g_implName, ServiceName )900901    def getImplementationName(self):902        return g_implName903904    def getByName( self, name ):905        log.debug( "getByName called" + str( name ))906        return None907908        909    def getElementNames( self ):910        log.debug( "getElementNames called")911        return ()912    913    def hasByName( self, name ):914        try:915            log.debug( "hasByName called " + str( name ))916            uri = expandUri(name)917            ret = self.provCtx.isUrlInPackage( uri )918            log.debug( "hasByName " + uri + " " +str( ret ) )919            return ret920        except Exception, e:921            text = lastException2String()922            log.debug( "Error in hasByName:" +  text )923            return False924925    def removeByName( self, name ):926        log.debug( "removeByName called" + str( name ))927        uri = expandUri( name )928        if self.provCtx.isUrlInPackage( uri ):929            self.provCtx.removePackageByUrl( uri )930        else:931            log.debug( "removeByName unknown uri " + str( name ) + ", ignoring" )932            raise NoSuchElementException( uri + "is not in package" , self )933        log.debug( "removeByName called" + str( uri ) + " successful" )934        935    def insertByName( self, name, value ):936        log.debug( "insertByName called " + str( name ) + " " + str( value ))937        uri = expandUri( name )938        if isPyFileInPath( self.provCtx.sfa, uri ):939            self.provCtx.addPackageByUrl( uri )940        else:941            # package is no python package ...942            log.debug( "insertByName: no python files in " 
+ str( uri ) + ", ignoring" )943            raise IllegalArgumentException( uri + " does not contain .py files", self, 1 )944        log.debug( "insertByName called " + str( uri ) + " successful" )945946    def replaceByName( self, name, value ):947        log.debug( "replaceByName called " + str( name ) + " " + str( value ))948        removeByName( name )949        insertByName( name )950        log.debug( "replaceByName called" + str( uri ) + " successful" )951952    def getElementType( self ):953        log.debug( "getElementType called" )954        return uno.getTypeByName( "void" )955    956    def hasElements( self ):957        log.debug( "hasElements got called")958        return False959    960g_ImplementationHelper.addImplementation( \961	PythonScriptProvider,g_implName, \962    ("com.sun.star.script.provider.LanguageScriptProvider",963     "com.sun.star.script.provider.ScriptProviderFor"+ LANGUAGENAME,),)964965966log.debug( "pythonscript finished intializing" )
...extApi.py
Source:extApi.py  
...36                response.status = falcon.HTTP_20037                response.body = self.api.getResults(sessionId)38            except Exception as e:39                error = "/get_result error: {}".format(40                    MultiProcessingLog.exception2string(e)41                )42                logging.info(error)43                response.body = json.dumps({"error": error})44    class delResult(object):45        cors = None46        def __init__(self, api):47            extApi.delResult.cors = api.cors_allow_all48            self.api = api49        def on_get(self, request, response, sessionId):50            error = None51            try:52                if os.path.exists(self.api.results[sessionId]["photo"]):53                    os.remove(self.api.results[sessionId]["photo"])54                self.api.results.pop(sessionId)55                response.status = falcon.HTTP_20056                response.body = json.dumps({"removed": sessionId})57            except Exception as e:58                error = "/del_result error: {}".format(59                    MultiProcessingLog.exception2string(e)60                )61                logging.info(error)62                response.body = json.dumps({"error": error})63    class setPhoto(object):64        cors = None65        def __init__(self, api):66            extApi.setPhoto.cors = api.cors_allow_all67            self.api = api68        def observeDel(self, info):69            finished = True70            res = json.loads(self.api.getResults(info["id"]))71            for k in info["methods"]:72                if res.get(k, None) is None:73                    finished = False74                    break75            if finished and os.path.exists(info["photo"]):76                os.remove(info["photo"])77        def infoProc(self, info):78            try:79                obj = fdf.Photos.All.INFO()80                info["results"]["info"] = obj.check(info["photo"]).as_dict81                
self.observeDel(info)82            except Exception as e:83                if not info.get("results", None):84                    info["results"]["error"] = MultiProcessingLog.exception2string(e)85        def cnnProc(self, info):86            try:87                obj = fdf.Photos.All.CNN(88                    json.dumps(89                        {90                            "modelPath": "{}/fdf/m88-1.pb".format(91                                os.path.dirname(os.path.abspath(__file__))92                            ),93                            "modelInput": "input_input",94                            "modelOutput": "softmax_tensor/Softmax",95                        }96                    )97                )98                info["results"]["cnn"] = obj.check(info["photo"]).as_dict99                self.observeDel(info)100            except Exception as e:101                if not info.get("results", None):102                    info["results"]["error"] = MultiProcessingLog.exception2string(e)103        def pcaProc(self, info):104            try:105                obj = fdf.Photos.All.PCA('{ "paranoidLevel": 1.0 }')106                info["results"]["pca"] = obj.check(info["photo"]).as_dict107                self.observeDel(info)108            except Exception as e:109                if not info.get("results", None):110                    info["results"]["error"] = MultiProcessingLog.exception2string(e)111        def benfordProc(self, info):112            try:113                obj = fdf.Photos.Jpeg.BENFORD('{ "paranoidLevel": 1.0 }')114                info["results"]["benford"] = obj.check(info["photo"]).as_dict115                self.observeDel(info)116            except Exception as e:117                if not info.get("results", None):118                    info["results"]["error"] = MultiProcessingLog.exception2string(e)119        def exifProc(self, info):120            try:121                obj = fdf.Photos.Jpeg.EXIF('{ "paranoidLevel": 1.0 }')122      
          info["results"]["exif"] = obj.check(info["photo"]).as_dict123                self.observeDel(info)124            except Exception as e:125                if not info.get("results", None):126                    info["results"]["error"] = MultiProcessingLog.exception2string(e)127        def qualityProc(self, info):128            try:129                obj = fdf.Photos.Jpeg.QUALITY('{ "paranoidLevel": 1.0 }')130                info["results"]["quality"] = obj.check(info["photo"]).as_dict131                self.observeDel(info)132            except Exception as e:133                if not info.get("results", None):134                    info["results"]["error"] = MultiProcessingLog.exception2string(e)135 136        def on_post(self, request, response, methods):137            error = None138            sessionId = str(uuid.uuid4())139            try:140                self.api.results[sessionId] = self.api.manager.dict()141                upload = cgi.FieldStorage(fp=request.stream, environ=request.env)142                destination = os.path.join(143                    os.path.dirname(os.path.abspath(__file__)),144                    self.api.dataFolder,145                    "{}_{}".format(sessionId, upload["photo"].filename),146                )147                info = {148                    "id": sessionId,149                    "photo": destination,150                    "methods": methods.split("_"),151                    "results": self.api.results[sessionId],152                }153                with open(destination, "wb") as output_file:154                    shutil.copyfileobj(upload["photo"].file, output_file)155                for m in info["methods"]:156                    self.api.results[sessionId][m] = None157                    if m == "info":158                        p = Process(target=self.infoProc, args=(info,))159                        p.start()160                    elif m == "cnn":161                        p = 
Process(target=self.cnnProc, args=(info,))162                        p.start()163                    elif m == "pca":164                        p = Process(target=self.pcaProc, args=(info,))165                        p.start()166                    elif m == "benford":167                        p = Process(target=self.benfordProc, args=(info,))168                        p.start()169                    elif m == "exif":170                        p = Process(target=self.exifProc, args=(info,))171                        p.start()172                    elif m == "quality":173                        p = Process(target=self.qualityProc, args=(info,))174                        p.start()175                    else:176                        pass177                response.body = json.dumps({"sessionId": str(sessionId)})178            except Exception as e:179                error = "/set_photo error: {}".format(180                    MultiProcessingLog.exception2string(e)181                )182                logging.info(error)183                response.body = json.dumps(184                    {"error": error, "sessionId": str(sessionId)}...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
