Best Python code snippet using autotest_python
RunJobAnselm.py
Source:RunJobAnselm.py  
# Class definition:
#   RunJobAnselm
#   [Add description here]
#   Instances are generated with RunJobFactory via pUtil::getRunJob()
#   Implemented as a singleton class
#   http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern

# Import relevant python/pilot modules
from RunJobHPC import RunJobHPC                  # Parent RunJob class
import os, sys, commands, time
import traceback
import atexit, signal
import saga

# Pilot modules
import Site, pUtil, Job, Node, RunJobUtilities
from pUtil import tolog, isAnalysisJob, readpar, getExperiment
from FileStateClient import updateFileStates, dumpFileStates
from ErrorDiagnosis import ErrorDiagnosis # import here to avoid issues seen at BU with missing module
from PilotErrors import PilotErrors
from datetime import datetime
from processes import get_cpu_consumption_time


class RunJobAnselm(RunJobHPC):

    # private data members
    __runjob = "RunJobAnselm"                   # String defining the subclass
    __instance = None                           # Boolean used by subclasses to become a Singleton
    #__error = PilotErrors()                    # PilotErrors object

    # Required methods

    def __init__(self):
        """ Default initialization """
        # e.g. self.__errorLabel = errorLabel
        pass

    def __new__(cls, *args, **kwargs):
        """ Override the __new__ method to make the class a singleton """
        if not cls.__instance:
            cls.__instance = super(RunJobHPC, cls).__new__(cls, *args, **kwargs)
        return cls.__instance

    def getRunJob(self):
        """ Return a string with the execution class name """
        return self.__runjob

    def getRunJobFileName(self):
        """ Return the filename of the module """
        return super(RunJobAnselm, self).getRunJobFileName()

    # def argumentParser(self):  <-- see example in RunJob.py

    def allowLoopingJobKiller(self):
        """ Should the pilot search for looping jobs? """
        # The pilot has the ability to monitor the payload work directory. If there are no updated files within a certain
        # time limit, the pilot will consider the job as stuck (looping) and will kill it. The looping time limits are set
        # in environment.py (see e.g. loopingLimitDefaultProd)
        return False

    def jobStateChangeNotification(self, src_obj, fire_on, value):
        tolog("Job state changed to '%s'" % value)
        return True

    def executePayload(self, thisExperiment, runCommandList, job, repeat_num=0):
        """ Execute the payload """

        t0 = os.times()
        res_tuple = (0, 'Undefined')

        setup_commands = ['module load openmpi']

        cpu_number = self.cpu_number_per_node * self.nodes
        nodes = self.nodes

        # loop over all run commands (only >1 for multi-trfs)
        current_job_number = 0
        getstatusoutput_was_interrupted = False
        number_of_jobs = len(runCommandList)
        for cmd in runCommandList:
            current_job_number += 1
            try:
                to_script = "\n".join(cmd['environment'])
                to_script = to_script + "\n" + "\n".join(setup_commands)
                to_script = "%s\nmpiexec -n %d %s %s" % (to_script, cpu_number, cmd["payload"], cmd["parameters"])

                thisExperiment.updateJobSetupScript(job.workdir, to_script=to_script)

                # Simple SAGA fork variant
                tolog("******* SAGA call to execute payload *********")
                try:
                    js = saga.job.Service("pbs://localhost")
                    jd = saga.job.Description()
                    jd.project = self.project_id     # should be taken from resource description (pandaqueue)
                    jd.wall_time_limit = self.walltime
                    jd.executable      = to_script
                    jd.total_cpu_count = cpu_number
                    jd.output = job.stdout
                    jd.error = job.stderr
                    jd.queue = self.executed_queue   # should be taken from resource description (pandaqueue)
                    jd.working_directory = job.workdir

                    fork_job = js.create_job(jd)
                    fork_job.add_callback(saga.STATE, self.jobStateChangeNotification)

                    #tolog("\n(PBS) Command: %s\n" % to_script)
                    fork_job.run()
                    tolog("Local job ID: %s" % fork_job.id)
                    fork_job.wait()
                    tolog("Job State              : %s" % (fork_job.state))
                    tolog("Exitcode               : %s" % (fork_job.exit_code))
                    tolog("Create time            : %s" % (fork_job.created))
                    tolog("Start time             : %s" % (fork_job.started))
                    tolog("End time               : %s" % (fork_job.finished))
                    tolog("Walltime limit         : %s (min)" % (jd.wall_time_limit))
                    tolog("Allocated nodes (cores): %s (%s)" % (nodes, cpu_number))
                    cons_time = datetime.strptime(fork_job.finished, '%c') - datetime.strptime(fork_job.started, '%c')
                    cons_time_sec = (cons_time.microseconds + (cons_time.seconds + cons_time.days * 24 * 3600) * 10**6) / 10**6
                    tolog("Execution time         : %s (sec. %s)" % (str(cons_time), cons_time_sec))
                    #job.timeExe = int(fork_job.finished - fork_job.started)

                    res_tuple = (fork_job.exit_code, "Look into: %s" % job.stdout)

                    ####################################################
                except saga.SagaException, ex:
                    # Catch all saga exceptions
                    tolog("An exception occurred: (%s) %s" % (ex.type, str(ex)))
                    # Trace back the exception. That can be helpful for debugging.
                    tolog(" \n*** Backtrace:\n %s" % ex.traceback)
                    break

                tolog("**********************************************")
                tolog("******* SAGA call finished *******************")
                tolog("**********************************************")

            except Exception, e:
                tolog("!!FAILED!!3000!! Failed to run command %s" % str(e))
                getstatusoutput_was_interrupted = True
                if self.getFailureCode():
                    job.result[2] = self.getFailureCode()
                    tolog("!!FAILED!!3000!! Failure code: %d" % (self.getFailureCode()))
                    break

            if res_tuple[0] == 0:
                tolog("Job command %d/%d finished" % (current_job_number, number_of_jobs))
            else:
                tolog("Job command %d/%d failed: res = %s" % (current_job_number, number_of_jobs, str(res_tuple)))
                break

        t1 = os.times()
        cpuconsumptiontime = get_cpu_consumption_time(t0)
        job.cpuConsumptionTime = int(cpuconsumptiontime)
        job.cpuConsumptionUnit = 's'
        job.cpuConversionFactor = 1.0
        tolog("Job CPU usage: %s %s" % (job.cpuConsumptionTime, job.cpuConsumptionUnit))
        tolog("Job CPU conversion factor: %1.10f" % (job.cpuConversionFactor))
        job.timeExe = int(round(t1[4] - t0[4]))

        tolog("Original exit code: %s" % (res_tuple[0]))
        if res_tuple[0] != None:
            tolog("Exit code: %s (returned from OS)" % (res_tuple[0] % 255))
            res0, exitAcronym, exitMsg = self.getTrfExitInfo(res_tuple[0], job.workdir)
        else:
            tolog("Exit code: None (returned from OS, job was canceled)")
            res0 = None
            exitMsg = "Job was canceled by internal call"

        # check the job report for any exit code that should replace the res_tuple[0]
        res = (res0, res_tuple[1], exitMsg)

        # dump an extract of the payload output
        if number_of_jobs > 1:
            _stdout = job.stdout
            _stderr = job.stderr
            _stdout = _stdout.replace(".txt", "_N.txt")
            _stderr = _stderr.replace(".txt", "_N.txt")
            tolog("NOTE: For %s output, see files %s, %s (N = [1, %d])" % (job.payload, _stdout, _stderr, number_of_jobs))
        else:
            tolog("NOTE: For %s output, see files %s, %s" % (job.payload, job.stdout, job.stderr))

        # JEM job-end callback
        try:
            from JEMstub import notifyJobEnd2JEM
            notifyJobEnd2JEM(job, tolog)
        except:
            pass # don't care (fire and forget)

        return res, job, getstatusoutput_was_interrupted, current_job_number


if __name__ == "__main__":

    tolog("Starting RunJobAnselm")

    # Get error handler
    error = PilotErrors()

    # Get runJob object
    runJob = RunJobAnselm()

    # Setup HPC specific parameters for Anselm
    runJob.cpu_number_per_node = 16
    runJob.walltime = 30
    runJob.max_nodes = 2000
    runJob.number_of_threads = 8  # 1 - one thread per task
    runJob.min_walltime = 30
    runJob.waittime = 5
    runJob.nodes = 2
    runJob.project_id = "DD-13-2"
    runJob.executed_queue = readpar('localqueue')

    # Define a new parent group
    os.setpgrp()

    # Protect the runJob code with exception handling
    hP_ret = False
    try:
        # always use this filename as the new jobDef module name
        import newJobDef

        jobSite = Site.Site()

        return_tuple = runJob.argumentParser()
        tolog("argumentParser returned: %s" % str(return_tuple))
        jobSite.setSiteInfo(return_tuple)
#        jobSite.setSiteInfo(argParser(sys.argv[1:]))

        # reassign workdir for this job
        jobSite.workdir = jobSite.wntmpdir

        if runJob.getPilotLogFilename() != "":
            pUtil.setPilotlogFilename(runJob.getPilotLogFilename())

        # set node info
        node = Node.Node()
        node.setNodeName(os.uname()[1])
        node.collectWNInfo(jobSite.workdir)

        # redirect stderr
        sys.stderr = open("%s/runjob.stderr" % (jobSite.workdir), "w")

        tolog("Current job workdir is: %s" % os.getcwd())
        tolog("Site workdir is: %s" % jobSite.workdir)

        # get the experiment object
        thisExperiment = getExperiment(runJob.getExperiment())
        tolog("RunJob will serve experiment: %s" % (thisExperiment.getExperiment()))

        # set the cache (used e.g. by LSST)
        #if runJob.getCache():
        #    thisExperiment.setCache(runJob.getCache())

        #JR = JobRecovery()
        try:
            job = Job.Job()
            job.setJobDef(newJobDef.job)
            job.workdir = jobSite.workdir
            job.experiment = runJob.getExperiment()
            # figure out and set payload file names
            job.setPayloadName(thisExperiment.getPayloadName(job))
        except Exception, e:
            pilotErrorDiag = "Failed to process job info: %s" % str(e)
            tolog("!!WARNING!!3000!! %s" % (pilotErrorDiag))
            runJob.failJob(0, error.ERR_UNKNOWN, job, pilotErrorDiag=pilotErrorDiag)

        # prepare for the output file data directory
        # (will only be created for jobs that end up in a 'holding' state)
        job.datadir = runJob.getParentWorkDir() + "/PandaJob_%s_data" % (job.jobId)

        # register cleanup function
        atexit.register(runJob.cleanup, job)

        # trigger an exception so that the SIGTERM signal can make the cleanup function run,
        # because by default a signal terminates the process without cleanup.
        def sig2exc(sig, frm):
            """ signal handler """

            error = PilotErrors()
            runJob.setGlobalPilotErrorDiag("!!FAILED!!3000!! SIGTERM Signal %s is caught in child pid=%d!\n" % (sig, os.getpid()))
            tolog(runJob.getGlobalPilotErrorDiag())
            if sig == signal.SIGTERM:
                runJob.setGlobalErrorCode(error.ERR_SIGTERM)
            elif sig == signal.SIGQUIT:
                runJob.setGlobalErrorCode(error.ERR_SIGQUIT)
            elif sig == signal.SIGSEGV:
                runJob.setGlobalErrorCode(error.ERR_SIGSEGV)
            elif sig == signal.SIGXCPU:
                runJob.setGlobalErrorCode(error.ERR_SIGXCPU)
            elif sig == signal.SIGBUS:
                runJob.setGlobalErrorCode(error.ERR_SIGBUS)
            elif sig == signal.SIGUSR1:
                runJob.setGlobalErrorCode(error.ERR_SIGUSR1)
            else:
                runJob.setGlobalErrorCode(error.ERR_KILLSIGNAL)
            runJob.setFailureCode(runJob.getGlobalErrorCode())
            # print to stderr
            print >> sys.stderr, runJob.getGlobalPilotErrorDiag()
            raise SystemError(sig)

        signal.signal(signal.SIGTERM, sig2exc)
        signal.signal(signal.SIGQUIT, sig2exc)
        signal.signal(signal.SIGSEGV, sig2exc)
        signal.signal(signal.SIGXCPU, sig2exc)
        signal.signal(signal.SIGBUS, sig2exc)

        # see if it's an analysis job or not
        analysisJob = isAnalysisJob(job.trf.split(",")[0])
        if analysisJob:
            tolog("User analysis job")
        else:
            tolog("Production job")
        tolog("runJob received a job with prodSourceLabel=%s" % (job.prodSourceLabel))

        # setup starts here ................................................................................

        # update the job state file
        job.jobState = "setup"
        #_retjs = JR.updateJobStateTest(job, jobSite, node, mode="test")

        # send [especially] the process group back to the pilot
        job.setState([job.jobState, 0, 0])
        rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort())

        # prepare the setup and get the run command list
        ec, runCommandList, job, multi_trf = runJob.setup(job, jobSite, thisExperiment)
        if ec != 0:
            tolog("!!WARNING!!2999!! runJob setup failed: %s" % (job.pilotErrorDiag))
            runJob.failJob(0, ec, job, pilotErrorDiag=job.pilotErrorDiag)
        tolog("Setup has finished successfully")

        # job has been updated, display it again
        job.displayJob()

        # (setup ends here) ................................................................................

        tolog("Setting stage-in state until all input files have been copied")
        job.setState(["stagein", 0, 0])
        # send the special setup string back to the pilot (needed for the log transfer on xrdcp systems)
        rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort())

        # stage-in .........................................................................................

        # update the job state file
        job.jobState = "stagein"
        #_retjs = JR.updateJobStateTest(job, jobSite, node, mode="test")

        # update copysetup[in] for production jobs if brokerage has decided that remote I/O should be used
        if job.transferType == 'direct':
            tolog('Brokerage has set transfer type to "%s" (remote I/O will be attempted for input files, any special access mode will be ignored)' %\
                  (job.transferType))
            RunJobUtilities.updateCopysetups('', transferType=job.transferType)

        # stage-in all input files (if necessary)
        job, ins, statusPFCTurl, usedFAXandDirectIO = runJob.stageIn(job, jobSite, analysisJob)
        if job.result[2] != 0:
            tolog("Failing job with ec: %d" % (ec))
            runJob.failJob(0, job.result[2], job, ins=ins, pilotErrorDiag=job.pilotErrorDiag)

        # after stageIn, all file transfer modes are known (copy_to_scratch, file_stager, remote_io).
        # consult the FileState file dictionary if cmd3 should be updated (--directIn should not be set if all
        # remote_io modes have been changed to copy_to_scratch as can happen with ByteStream files)
        # and update the run command list if necessary.
        # in addition to the above, if FAX is used as a primary site mover and direct access is enabled, then
        # the run command should not contain the --oldPrefix, --newPrefix options but use --usePFCTurl
        hasInput = job.inFiles != ['']
        if hasInput:
            runCommandList = RunJobUtilities.updateRunCommandList(runCommandList, runJob.getParentWorkDir(), job.jobId, statusPFCTurl, analysisJob, usedFAXandDirectIO, hasInput, job.prodDBlockToken)

        # (stage-in ends here) .............................................................................

        # change to running state since all input files have been staged
        tolog("Changing to running state since all input files have been staged")
        job.setState(["running", 0, 0])
        rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort())

        # update the job state file
        job.jobState = "running"
        #_retjs = JR.updateJobStateTest(job, jobSite, node, mode="test")

        # run the job(s) ...................................................................................

        # Set ATLAS_CONDDB if necessary, and other env vars
        RunJobUtilities.setEnvVars(jobSite.sitename)

        # execute the payload
        res, job, getstatusoutput_was_interrupted, current_job_number = runJob.executePayload(thisExperiment, runCommandList, job)

        # if the payload leaves the input files, delete them explicitly
        if ins:
            ec = pUtil.removeFiles(job.workdir, ins)

        # payload error handling
        ed = ErrorDiagnosis()
        if res[0] == None:
            job.jobState = "cancelled"
            job.setState(["cancelled", 0, 0])
            rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort())
        else:
            job = ed.interpretPayload(job, res, getstatusoutput_was_interrupted, current_job_number, runCommandList, runJob.getFailureCode())

        if job.result[1] != 0 or job.result[2] != 0:
            runJob.failJob(job.result[1], job.result[2], job, pilotErrorDiag=job.pilotErrorDiag)

        # stage-out ........................................................................................

        # update the job state file
        tolog(runJob.getOutputDir())

        job.jobState = "stageout"
        #_retjs = JR.updateJobStateTest(job, jobSite, node, mode="test")

        # verify and prepare the output files for transfer
        ec, pilotErrorDiag, outs, outsDict = RunJobUtilities.prepareOutFiles(job.outFiles, job.logFile, job.workdir)
        if ec:
            # missing output file (only error code from prepareOutFiles)
            runJob.failJob(job.result[1], ec, job, pilotErrorDiag=pilotErrorDiag)
        tolog("outsDict: %s" % str(outsDict))

        # update the current file states
        updateFileStates(outs, runJob.getParentWorkDir(), job.jobId, mode="file_state", state="created")
        dumpFileStates(runJob.getParentWorkDir(), job.jobId)

        # create xml string to pass to dispatcher for atlas jobs
        outputFileInfo = {}
        if outs or (job.logFile and job.logFile != ''):
            # get the datasets for the output files
            dsname, datasetDict = runJob.getDatasets(job)

            # re-create the metadata.xml file, putting guids of ALL output files into it.
            # output files that miss guids from the job itself will get guids in PFCxml function

            # first rename and copy the trf metadata file for non-build jobs
            if not pUtil.isBuildJob(outs):
                runJob.moveTrfMetadata(job.workdir, job.jobId)

            # create the metadata for the output + log files
            ec, job, outputFileInfo = runJob.createFileMetadata(list(outs), job, outsDict, dsname, datasetDict, jobSite.sitename, analysisJob=analysisJob)
            if ec:
                runJob.failJob(0, ec, job, pilotErrorDiag=job.pilotErrorDiag)

        # move output files from workdir to local DDM area
        finalUpdateDone = False
        if outs:
            tolog("Setting stage-out state until all output files have been copied")
            job.setState(["stageout", 0, 0])
            rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort())

            # stage-out output files
            ec, job, rf, latereg = runJob.stageOut(job, jobSite, outs, analysisJob, dsname, datasetDict, outputFileInfo)
            # error handling
            if job.result[0] == "finished" or ec == error.ERR_PUTFUNCNOCALL:
                rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort(), final=True)
            else:
                rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort(), final=True, latereg=latereg)
            if ec == error.ERR_NOSTORAGE:
                # update the current file states for all files since nothing could be transferred
                updateFileStates(outs, runJob.getParentWorkDir(), job.jobId, mode="file_state", state="not_transferred")
                dumpFileStates(runJob.getParentWorkDir(), job.jobId)

            finalUpdateDone = True
            if ec != 0:
                runJob.sysExit(job, rf)
            # (stage-out ends here) .......................................................................

        job.setState(["finished", 0, 0])
        if not finalUpdateDone:
            rt = RunJobUtilities.updatePilotServer(job, runJob.getPilotServer(), runJob.getPilotPort(), final=True)
        runJob.sysExit(job)

    except Exception, errorMsg:

        error = PilotErrors()

        if runJob.getGlobalPilotErrorDiag() != "":
            pilotErrorDiag = "Exception caught in runJobAnselm: %s" % (runJob.getGlobalPilotErrorDiag())
        else:
            pilotErrorDiag = "Exception caught in runJobAnselm: %s" % str(errorMsg)

        if 'format_exc' in traceback.__all__:
            pilotErrorDiag += ", " + traceback.format_exc()

        try:
            tolog("!!FAILED!!3001!! %s" % (pilotErrorDiag))
        except Exception, e:
            if len(pilotErrorDiag) > 10000:
                pilotErrorDiag = pilotErrorDiag[:10000]
                tolog("!!FAILED!!3001!! Truncated (%s): %s" % (e, pilotErrorDiag))
            else:
                pilotErrorDiag = "Exception caught in runJob: %s" % (e)
                tolog("!!FAILED!!3001!! %s" % (pilotErrorDiag))

#        # restore the proxy if necessary
#        if hP_ret:
#            rP_ret = proxyguard.restoreProxy()
#            if not rP_ret:
#                tolog("Warning: Problems with storage can occur since proxy could not be restored")
#            else:
#                hP_ret = False
#                tolog("ProxyGuard has finished successfully")

        tolog("sys.path=%s" % str(sys.path))
        cmd = "pwd;ls -lF %s;ls -lF;ls -lF .." % (runJob.getPilotInitDir())
        tolog("Executing command: %s" % (cmd))
        out = commands.getoutput(cmd)
        tolog("%s" % (out))

        job = Job.Job()
        job.setJobDef(newJobDef.job)
        job.pilotErrorDiag = pilotErrorDiag
        job.result[0] = "failed"
        if runJob.getGlobalErrorCode() != 0:
            job.result[2] = runJob.getGlobalErrorCode()
        else:
            job.result[2] = error.ERR_RUNJOBEXC
        tolog("Failing job with error code: %d" % (job.result[2]))
        # fail the job without calling sysExit/cleanup (will be called anyway)
        runJob.failJob(0, job.result[2], job, pilotErrorDiag=pilotErrorDiag, docleanup=False)
...
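The __new__ override above follows the singleton idiom linked in the header comment: the first call stores the instance on the class, and every later call returns that same object. A minimal self-contained sketch of the pattern (the class names here are illustrative, not part of the pilot code):

class RunJobBase(object):
    """ Stand-in for a parent class such as RunJobHPC """
    pass

class SingletonRunJob(RunJobBase):
    __instance = None  # holds the single instance once created

    def __new__(cls, *args, **kwargs):
        # build the object only on the first call; afterwards reuse it
        if not cls.__instance:
            cls.__instance = super(SingletonRunJob, cls).__new__(cls)
        return cls.__instance

a = SingletonRunJob()
b = SingletonRunJob()
assert a is b  # both names refer to the same instance

Note that RunJobAnselm passes its parent RunJobHPC, rather than itself, as the first argument to super(), so the __new__ lookup starts above RunJobHPC in the MRO; the usual form names the defining class, as in the sketch.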
run_job.py
Source:run_job.py
#!/usr/bin/env python
# Description: run job
# Derived from topcons2_workflow_run_job.py
# how to create md5
# import hashlib
# md5_key = hashlib.md5(string).hexdigest()
# subfolder = md5_key[:2]
#
import os
import sys
import subprocess
import time
import myfunc
import glob
import hashlib
import shutil
import json
import webserver_common

os.environ["PATH"] += os.pathsep + "/usr/local/bin" # this solved the problem for CentOS6.4

progname = os.path.basename(sys.argv[0])
wspace = ''.join([" "]*len(progname))
rundir = os.path.dirname(os.path.realpath(__file__))
suq_basedir = "/tmp"
if os.path.exists("/scratch"):
    suq_basedir = "/scratch"
elif os.path.exists("/tmp"):
    suq_basedir = "/tmp"
runscript = "%s/%s"%(rundir, "soft/proq3/run_proq3.sh")
pdb2aa_script = "%s/%s"%(rundir, "soft/proq3/bin/aa321CA.pl")
basedir = os.path.realpath("%s/.."%(rundir)) # path of the application, i.e. pred/
path_md5cache = "%s/static/md5"%(basedir)
path_profile_cache = "%s/static/result/profilecache/"%(basedir)
contact_email = "nanjiang.shu@scilifelab.se"

# note that here the url should be without http://
usage_short="""
Usage: %s dumped-model-file [-fasta seqfile]
       %s -jobid JOBID -outpath DIR -tmpdir DIR
       %s -email EMAIL -baseurl BASE_WWW_URL
       %s [-force]
"""%(progname, wspace, wspace, wspace)

usage_ext="""\
Description:
    run job

OPTIONS:
  -force        Do not use cached result
  -h, --help    Print this help message and exit

Created 2016-02-02, updated 2017-10-09, Nanjiang Shu
"""
usage_exp="""
Examples:
    %s /data3/tmp/tmp_dkgSD/query.pdb -outpath /data3/result/rst_mXLDGD -tmpdir /data3/tmp/tmp_dkgSD
"""%(progname)

def PrintHelp(fpout=sys.stdout):#{{{
    print(usage_short, file=fpout)
    print(usage_ext, file=fpout)
    print(usage_exp, file=fpout)#}}}

def CreateProfile(seqfile, outpath_profile, outpath_result, tmp_outpath_result, timefile, runjob_errfile):#{{{
    (seqid, seqanno, seq) = myfunc.ReadSingleFasta(seqfile)
    subfoldername_profile = os.path.basename(outpath_profile)
    tmp_outpath_profile = "%s/%s"%(tmp_outpath_result, subfoldername_profile)
    isSkip = False
    rmsg = ""
    if not g_params['isForceRun']:
        md5_key = hashlib.md5(seq).hexdigest()
        subfoldername = md5_key[:2]
        md5_link = "%s/%s/%s"%(path_md5cache, subfoldername, md5_key)
        if os.path.exists(md5_link):
            # create a symlink to the cache
            rela_path = os.path.relpath(md5_link, outpath_result) #relative path
            os.chdir(outpath_result)
            os.symlink(rela_path, subfoldername_profile)
            isSkip = True
    if not isSkip:
        # build profiles
        if not os.path.exists(tmp_outpath_profile):
            try:
                os.makedirs(tmp_outpath_profile)
            except OSError:
                msg = "Failed to create folder %s"%(tmp_outpath_profile)
                myfunc.WriteFile(msg+"\n", runjob_errfile, "a")
                return 1
        cmd = [runscript, "-fasta", seqfile, "-outpath", tmp_outpath_profile, "-only-build-profile"]
        g_params['runjob_log'].append(" ".join(cmd))
        begin_time = time.time()
        cmdline = " ".join(cmd)
        #os.system("%s >> %s 2>&1"%(cmdline, runjob_errfile)) #DEBUG
        try:
            rmsg = subprocess.check_output(cmd)
            g_params['runjob_log'].append("profile_building:\n"+rmsg+"\n")
        except subprocess.CalledProcessError as e:
            g_params['runjob_err'].append(str(e)+"\n")
            g_params['runjob_err'].append("cmdline: "+cmdline+"\n")
            g_params['runjob_err'].append("profile_building:\n"+rmsg + "\n")
            pass
        end_time = time.time()
        runtime_in_sec = end_time - begin_time
        msg = "%s\t%f\n"%(subfoldername_profile, runtime_in_sec)
        myfunc.WriteFile(msg, timefile, "a")
        if os.path.exists(tmp_outpath_profile):
            md5_key = hashlib.md5(seq).hexdigest()
            md5_subfoldername = md5_key[:2]
            subfolder_profile_cache = "%s/%s"%(path_profile_cache, md5_subfoldername)
            outpath_profile_cache = "%s/%s"%(subfolder_profile_cache, md5_key)
            if os.path.exists(outpath_profile_cache):
                shutil.rmtree(outpath_profile_cache)
            if not os.path.exists(subfolder_profile_cache):
                os.makedirs(subfolder_profile_cache)
            cmd = ["mv", "-f", tmp_outpath_profile, outpath_profile_cache]
            isCmdSuccess = False
            try:
                subprocess.check_output(cmd)
                isCmdSuccess = True
            except subprocess.CalledProcessError as e:
                msg = "Failed to build the profile for the target sequence %s"%(seq)
                g_params['runjob_err'].append(msg)
                g_params['runjob_err'].append(str(e)+"\n")
                pass
            if isCmdSuccess and webserver_common.IsFrontEndNode(g_params['base_www_url']):
                # make zip folder for the cached profile
                cwd = os.getcwd()
                os.chdir(subfolder_profile_cache)
                cmd = ["zip", "-rq", "%s.zip"%(md5_key), md5_key]
                try:
                    subprocess.check_output(cmd)
                except subprocess.CalledProcessError as e:
                    g_params['runjob_err'].append(str(e))
                    pass
                os.chdir(cwd)
                # create soft links for the profile and for md5
                # first create a soft link for outpath_profile to outpath_profile_cache
                rela_path = os.path.relpath(outpath_profile_cache, outpath_result) #relative path
                try:
                    os.chdir(outpath_result)
                    os.symlink(rela_path, subfoldername_profile)
                except:
                    pass
                # then create a soft link for md5 to outpath_profile_cache
                md5_subfolder = "%s/%s"%(path_md5cache, md5_subfoldername)
                md5_link = "%s/%s/%s"%(path_md5cache, md5_subfoldername, md5_key)
                if os.path.exists(md5_link):
                    try:
                        os.unlink(md5_link)
                    except:
                        pass
                if not os.path.exists(md5_subfolder):
                    try:
                        os.makedirs(md5_subfolder)
                    except:
                        pass
                rela_path = os.path.relpath(outpath_profile_cache, md5_subfolder) #relative path
                try:
                    os.chdir(md5_subfolder)
                    os.symlink(rela_path, md5_key)
                except:
                    pass
#}}}

def GetProQ3Option(query_para):#{{{
    """Return the proq3opt as a list
    """
    yes_or_no_opt = {}
    for item in ['isDeepLearning', 'isRepack', 'isKeepFiles']:
        if query_para[item]:
            yes_or_no_opt[item] = "yes"
        else:
            yes_or_no_opt[item] = "no"
    proq3opt = [
            "-r", yes_or_no_opt['isRepack'],
            "-deep", yes_or_no_opt['isDeepLearning'],
            "-k", yes_or_no_opt['isKeepFiles'],
            "-quality", query_para['method_quality'],
            "-output_pdbs", "yes"         #always output PDB file (with proq3 written at the B-factor column)
            ]
    if 'targetlength' in query_para:
        proq3opt += ["-t", str(query_para['targetlength'])]
    return proq3opt
#}}}

def ScoreModel(query_para, model_file, outpath_this_model, profilename, outpath_result, #{{{
        tmp_outpath_result, timefile, runjob_errfile):
    subfoldername_this_model = os.path.basename(outpath_this_model)
    modelidx = int(subfoldername_this_model.split("model_")[1])
    try:
        method_quality = query_para['method_quality']
    except KeyError:
        method_quality = 'sscore'
    rmsg = ""
    tmp_outpath_this_model = "%s/%s"%(tmp_outpath_result, subfoldername_this_model)
    proq3opt = GetProQ3Option(query_para)
    cmd = [runscript, "-profile", profilename, "-outpath",
            tmp_outpath_this_model, model_file
            ] + proq3opt
    g_params['runjob_log'].append(" ".join(cmd))
    cmdline = " ".join(cmd)
    begin_time = time.time()
    try:
        rmsg = subprocess.check_output(cmd)
        g_params['runjob_log'].append("model scoring:\n"+rmsg+"\n")
    except subprocess.CalledProcessError as e:
        g_params['runjob_err'].append(str(e)+"\n")
        g_params['runjob_err'].append("cmdline: "+ cmdline + "\n")
        g_params['runjob_err'].append("model scoring:\n" + rmsg + "\n")
        pass
    end_time = time.time()
    runtime_in_sec = end_time - begin_time
    msg = "%s\t%f\n"%(subfoldername_this_model, runtime_in_sec)
    myfunc.WriteFile(msg, timefile, "a")
    if os.path.exists(tmp_outpath_this_model):
        cmd = ["mv", "-f", tmp_outpath_this_model, outpath_this_model]
        isCmdSuccess = False
        try:
            subprocess.check_output(cmd)
            isCmdSuccess = True
        except subprocess.CalledProcessError as e:
            msg = "Failed to move result from %s to %s."%(tmp_outpath_this_model, outpath_this_model)
            g_params['runjob_err'].append(msg)
            g_params['runjob_err'].append(str(e)+"\n")
            pass
    modelfile = "%s/query_%d.pdb"%(outpath_this_model, modelidx)
    globalscorefile = "%s.proq3.%s.global"%(modelfile, method_quality)
    if not os.path.exists(globalscorefile):
        globalscorefile = "%s.proq3.global"%(modelfile)
    (globalscore, itemList) = webserver_common.ReadProQ3GlobalScore(globalscorefile)
    modelseqfile = "%s/query_%d.pdb.fasta"%(outpath_this_model, modelidx)
    modellength = myfunc.GetSingleFastaLength(modelseqfile)
    modelinfo = [subfoldername_this_model, str(modellength), str(runtime_in_sec)]
    if globalscore:
        for i in range(len(itemList)):
            modelinfo.append(str(globalscore[itemList[i]]))
    return modelinfo
#}}}

def RunJob(modelfile, seqfile, outpath, tmpdir, email, jobid, g_params):#{{{
    all_begin_time = time.time()
    rootname = os.path.basename(os.path.splitext(modelfile)[0])
    starttagfile   = "%s/runjob.start"%(outpath)
    runjob_errfile = "%s/runjob.err"%(outpath)
    runjob_logfile = "%s/runjob.log"%(outpath)
    finishtagfile  = "%s/runjob.finish"%(outpath)
    rmsg = ""
    query_parafile = "%s/query.para.txt"%(outpath)
    query_para = {}
    content = myfunc.ReadFile(query_parafile)
    if content != "":
        query_para = json.loads(content)
    resultpathname = jobid
    outpath_result = "%s/%s"%(outpath, resultpathname)
    tarball = "%s.tar.gz"%(resultpathname)
    zipfile = "%s.zip"%(resultpathname)
    tarball_fullpath = "%s.tar.gz"%(outpath_result)
    zipfile_fullpath = "%s.zip"%(outpath_result)
    mapfile = "%s/seqid_index_map.txt"%(outpath_result)
    finished_model_file = "%s/finished_models.txt"%(outpath_result)
    timefile = "%s/time.txt"%(outpath_result)
    tmp_outpath_result = "%s/%s"%(tmpdir, resultpathname)
    isOK = True
    if os.path.exists(tmp_outpath_result):
        shutil.rmtree(tmp_outpath_result)
    try:
        os.makedirs(tmp_outpath_result)
        isOK = True
    except OSError:
        msg = "Failed to create folder %s"%(tmp_outpath_result)
        myfunc.WriteFile(msg+"\n", runjob_errfile, "a")
        isOK = False
        pass
    if os.path.exists(outpath_result):
        shutil.rmtree(outpath_result)
    try:
        os.makedirs(outpath_result)
        isOK = True
    except OSError:
        msg = "Failed to create folder %s"%(outpath_result)
        myfunc.WriteFile(msg+"\n", runjob_errfile, "a")
        isOK = False
        pass
    if isOK:
        try:
            open(finished_model_file, 'w').close()
        except:
            pass
# first get results from the caches
# cache profiles for sequences, but do not cache predictions for models
        webserver_common.WriteDateTimeTagFile(starttagfile, runjob_logfile, runjob_errfile)
# ==================================
        numModel = 0
        modelFileList = []
        if seqfile != "": # if the fasta sequence is supplied, all models should use this sequence
            subfoldername_profile = "profile_%d"%(0)
            outpath_profile = "%s/%s"%(outpath_result, subfoldername_profile)
            CreateProfile(seqfile, outpath_profile, outpath_result, tmp_outpath_result, timefile, runjob_errfile)
            # run proq3 for models
            modelList = myfunc.ReadPDBModel(modelfile)
            numModel = len(modelList)
            for ii in range(len(modelList)):
                model = modelList[ii]
                tmp_model_file = "%s/query_%d.pdb"%(tmp_outpath_result, ii)
                myfunc.WriteFile(model+"\n", tmp_model_file)
                profilename = "%s/%s"%(outpath_profile, "query.fasta")
                subfoldername_this_model = "model_%d"%(ii)
                outpath_this_model = "%s/%s"%(outpath_result, subfoldername_this_model)
                modelinfo = ScoreModel(query_para, tmp_model_file, outpath_this_model, profilename,
                        outpath_result, tmp_outpath_result, timefile,
                        runjob_errfile)
                myfunc.WriteFile("\t".join(modelinfo)+"\n", finished_model_file, "a")
                modelFileList.append("%s/%s"%(outpath_this_model, "query_%d.pdb"%(ii)))
        else: # no seqfile supplied, sequences are obtained from the model file
            modelList = myfunc.ReadPDBModel(modelfile)
            numModel = len(modelList)
            for ii in range(len(modelList)):
                model = modelList[ii]
                tmp_model_file = "%s/query_%d.pdb"%(tmp_outpath_result, ii)
                myfunc.WriteFile(model+"\n", tmp_model_file)
                subfoldername_this_model = "model_%d"%(ii)
                tmp_outpath_this_model = "%s/%s"%(tmp_outpath_result, subfoldername_this_model)
                if not os.path.exists(tmp_outpath_this_model):
                    os.makedirs(tmp_outpath_this_model)
                tmp_seqfile = "%s/query.fasta"%(tmp_outpath_this_model)
                cmd = [pdb2aa_script, tmp_model_file]
                g_params['runjob_log'].append(" ".join(cmd))
                try:
                    rmsg = subprocess.check_output(cmd)
                    g_params['runjob_log'].append("extracting sequence from modelfile:\n"+rmsg+"\n")
                except subprocess.CalledProcessError as e:
                    g_params['runjob_err'].append(str(e)+"\n")
                    g_params['runjob_err'].append(rmsg + "\n")
                if rmsg != "":
                    myfunc.WriteFile(">seq\n"+rmsg.strip(), tmp_seqfile)
                subfoldername_profile = "profile_%d"%(ii)
                outpath_profile = "%s/%s"%(outpath_result, subfoldername_profile)
                CreateProfile(tmp_seqfile, outpath_profile, outpath_result, tmp_outpath_result, timefile, runjob_errfile)
                outpath_this_model = "%s/%s"%(outpath_result, subfoldername_this_model)
                profilename = "%s/%s"%(outpath_profile, "query.fasta")
                modelinfo = ScoreModel(query_para, tmp_model_file, outpath_this_model, profilename,
                        outpath_result, tmp_outpath_result, timefile,
                        runjob_errfile)
                myfunc.WriteFile("\t".join(modelinfo)+"\n", finished_model_file, "a")
                modelFileList.append("%s/%s"%(outpath_this_model, "query_%d.pdb"%(ii)))

        all_end_time = time.time()
        all_runtime_in_sec = all_end_time - all_begin_time

        if len(g_params['runjob_log']) > 0:
            rt_msg = myfunc.WriteFile("\n".join(g_params['runjob_log'])+"\n", runjob_logfile, "a")
            if rt_msg:
                g_params['runjob_err'].append(rt_msg)
        webserver_common.WriteDateTimeTagFile(finishtagfile, runjob_logfile, runjob_errfile)

# now write the text output to a single file
        #statfile = "%s/%s"%(outpath_result, "stat.txt")
        statfile = ""
        dumped_resultfile = "%s/%s"%(outpath_result, "query.proq3.txt")
        proq3opt = GetProQ3Option(query_para)
        webserver_common.WriteProQ3TextResultFile(dumped_resultfile, query_para, modelFileList,
                all_runtime_in_sec, g_params['base_www_url'], proq3opt, statfile=statfile)

        # now making zip instead (for windows users)
        # note that zip -rq will zip the real data for symbolic links
        os.chdir(outpath)
#       cmd = ["tar", "-czf", tarball, resultpathname]
        cmd = ["zip", "-rq", zipfile, resultpathname]
        try:
            subprocess.check_output(cmd)
        except subprocess.CalledProcessError as e:
            g_params['runjob_err'].append(str(e))
            pass

    isSuccess = False
    if (os.path.exists(finishtagfile) and os.path.exists(zipfile_fullpath)):
        isSuccess = True
        flist = glob.glob("%s/*.out"%(tmpdir))
        if len(flist) > 0:
            outfile_runscript = flist[0]
        else:
            outfile_runscript = ""
        if os.path.exists(outfile_runscript):
            shutil.move(outfile_runscript, outpath)
        # delete the tmpdir if succeeded
        shutil.rmtree(tmpdir) #DEBUG, keep tmpdir
    else:
        isSuccess = False
        failedtagfile = "%s/runjob.failed"%(outpath)
        webserver_common.WriteDateTimeTagFile(failedtagfile, runjob_logfile, runjob_errfile)

# send the result by email
# do not send mail from the cloud VM
    if ( webserver_common.IsFrontEndNode(g_params['base_www_url']) and
            myfunc.IsValidEmailAddress(email)):
        from_email = "proq3@proq3.bioinfo.se"
        to_email = email
        subject = "Your result for ProQ3 JOBID=%s"%(jobid)
        if isSuccess:
            bodytext = """
Your result is ready at %s/pred/result/%s

Thanks for using ProQ3

        """%(g_params['base_www_url'], jobid)
        else:
            bodytext="""
We are sorry that your job with jobid %s failed.

Please contact %s if you have any questions.

Attached below is the error message:
%s
            """%(jobid, contact_email, "\n".join(g_params['runjob_err']))
        g_params['runjob_log'].append("Sendmail %s -> %s, %s"% (from_email, to_email, subject)) #debug
        rtValue = myfunc.Sendmail(from_email, to_email, subject, bodytext)
        if rtValue != 0:
            g_params['runjob_err'].append("Sendmail to {} failed with status {}".format(to_email, rtValue))

    if len(g_params['runjob_err']) > 0:
        rt_msg = myfunc.WriteFile("\n".join(g_params['runjob_err'])+"\n", runjob_errfile, "w")
        return 1
    return 0
#}}}

def main(g_params):#{{{
    argv = sys.argv
    numArgv = len(argv)
    if numArgv < 2:
        PrintHelp()
        return 1

    outpath = ""
    modelfile = ""
    seqfile = ""
    tmpdir = ""
    email = ""
    jobid = ""

    i = 1
    isNonOptionArg = False
    while i < numArgv:
        if isNonOptionArg == True:
            modelfile = argv[i]
            isNonOptionArg = False
            i += 1
        elif argv[i] == "--":
            isNonOptionArg = True
            i += 1
        elif argv[i][0] == "-":
            if argv[i] in ["-h", "--help"]:
                PrintHelp()
                return 1
            elif argv[i] in ["-outpath", "--outpath"]:
                (outpath, i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-tmpdir", "--tmpdir"]:
                (tmpdir, i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-jobid", "--jobid"]:
                (jobid, i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-fasta", "--fasta"]:
                (seqfile, i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-baseurl", "--baseurl"]:
                (g_params['base_www_url'], i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-email", "--email"]:
                (email, i) = myfunc.my_getopt_str(argv, i)
            elif argv[i] in ["-q", "--q"]:
                g_params['isQuiet'] = True
                i += 1
            elif argv[i] in ["-force", "--force"]:
                g_params['isForceRun'] = True
                i += 1
            else:
                print("Error! Wrong argument:", argv[i], file=sys.stderr)
                return 1
        else:
            modelfile = argv[i]
            i += 1

    if jobid == "":
        print("%s: jobid not set. exit"%(sys.argv[0]), file=sys.stderr)
        return 1
    if myfunc.checkfile(modelfile, "modelfile") != 0:
        return 1
    if outpath == "":
        print("outpath not set. exit", file=sys.stderr)
        return 1
    elif not os.path.exists(outpath):
        try:
            subprocess.check_output(["mkdir", "-p", outpath])
        except subprocess.CalledProcessError as e:
            print(e, file=sys.stderr)
            return 1
    if tmpdir == "":
        print("tmpdir not set. exit", file=sys.stderr)
        return 1
    elif not os.path.exists(tmpdir):
        try:
            subprocess.check_output(["mkdir", "-p", tmpdir])
        except subprocess.CalledProcessError as e:
            print(e, file=sys.stderr)
            return 1

    g_params['debugfile'] = "%s/debug.log"%(outpath)
    if not os.path.exists(path_profile_cache):
        os.makedirs(path_profile_cache)
    return RunJob(modelfile, seqfile, outpath, tmpdir, email, jobid, g_params)
#}}}

def InitGlobalParameter():#{{{
    g_params = {}
    g_params['isQuiet'] = True
    g_params['runjob_log'] = []
    g_params['runjob_err'] = []
    g_params['isForceRun'] = False
    g_params['base_www_url'] = ""
    return g_params
#}}}

if __name__ == '__main__':
    g_params = InitGlobalParameter()
...
Create.py
Source:Create.py
...
            """ALTER TABLE bl_runjob ADD
               (CONSTRAINT bl_runjob_fk1 FOREIGN KEY(wmbs_id)
               REFERENCES wmbs_job(id) ON DELETE CASCADE)"""
        self.constraints["01_idx_bl_runjob"] = \
            """CREATE INDEX idx_bl_runjob_wmbs ON bl_runjob(wmbs_id) %s""" % tablespaceIndex
        self.constraints["02_fk_bl_runjob"] = \
            """ALTER TABLE bl_runjob ADD
               (CONSTRAINT bl_runjob_fk2 FOREIGN KEY(sched_status)
               REFERENCES bl_status(id) ON DELETE CASCADE)"""
        self.constraints["02_idx_bl_runjob"] = \
            """CREATE INDEX idx_bl_runjob_status ON bl_runjob(sched_status) %s""" % tablespaceIndex
        self.constraints["03_fk_bl_runjob"] = \
            """ALTER TABLE bl_runjob ADD
               (CONSTRAINT bl_runjob_fk3 FOREIGN KEY(user_id)
               REFERENCES wmbs_users(id) ON DELETE CASCADE)"""
        self.constraints["03_idx_bl_runjob"] = \
            """CREATE INDEX idx_bl_runjob_users ON bl_runjob(user_id) %s""" % tablespaceIndex
        self.constraints["04_fk_bl_runjob"] = \
            """ALTER TABLE bl_runjob ADD
               (CONSTRAINT bl_runjob_fk4 FOREIGN KEY(location)
               REFERENCES wmbs_location(id) ON DELETE CASCADE)"""
        self.constraints["04_idx_bl_runjob"] = \
            """CREATE INDEX idx_bl_runjob_location ON bl_runjob(location) %s""" % tablespaceIndex
        j = 50
        for i in self.sequence_tables:
            seqname = '%s_SEQ' % i
            self.create["%s%s" % (j, seqname)] = \
                "CREATE SEQUENCE %s start with 1 increment by 1 nomaxvalue cache 100" \
                % seqname
        return

    def execute(self, conn=None, transaction=None):
        """
        _execute_

        Check to make sure that all required tables have been defined.  If
        everything is in place have the DBCreator make everything.
        """
        for requiredTable in self.requiredTables:
...
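The fragment above accumulates DDL strings in dictionaries whose keys carry numeric prefixes (01_, 02_, ...), which gives a deterministic creation order when the keys are iterated in sorted order. A minimal sketch of that pattern, assuming sort-ordered execution and using sqlite3 purely for illustration (WMCore's DBCreator targets Oracle/MySQL, and the table definitions here are cut down):

import sqlite3

create = {}
create["01_bl_status"] = "CREATE TABLE bl_status (id INTEGER PRIMARY KEY, name TEXT)"
create["02_bl_runjob"] = "CREATE TABLE bl_runjob (wmbs_id INTEGER, sched_status INTEGER)"

constraints = {}
constraints["01_idx_bl_runjob"] = "CREATE INDEX idx_bl_runjob_wmbs ON bl_runjob(wmbs_id)"

conn = sqlite3.connect(":memory:")
# the numeric key prefixes let sorted() reproduce the intended order:
# tables first, then the indexes/constraints that depend on them
for key in sorted(create):
    conn.execute(create[key])
for key in sorted(constraints):
    conn.execute(constraints[key])
conn.close()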
