Best Python code snippet using fMBT_python
Source: blast_using_grid.py
#!/usr/bin/python
__author__ = "Kishori M Konwar"
__copyright__ = "Copyright 2013, MetaPathways"
__credits__ = ["r"]
__version__ = "1.0"
__maintainer__ = "Kishori M Konwar"
__status__ = "Release"

try:
   from optparse import make_option
   from os import makedirs, path, listdir, remove
   import os, sys, re, errno, shutil
   from glob import glob
   from datetime import date
   import traceback
   from libs.python_modules.utils.sysutil import getstatusoutput, pathDelim
   from libs.python_modules.parsers.parse import parse_metapaths_parameters
   from libs.python_modules.utils.metapathways_utils import fprintf, printf, WorkflowLogger, generate_log_fp
   from libs.python_modules.utils import *
   from libs.python_modules.grid.BlastService import BlastService
   from libs.python_modules.grid.GridParam import GridParam
   from libs.python_modules.grid.BlastBroker import BlastBroker
   import libs.python_scripts
except:
     print traceback.format_exc(10)
     print """ Could not load some user defined module functions"""
     print """ Make sure you typed \"source MetaPathwaysrc\""""
     print """ """
     sys.exit(3)

PATHDELIM = pathDelim()

def copyFile(src, dst):
    try:
        shutil.copytree(src, dst)
    except OSError as exc:  # python > 2.5
        if exc.errno == errno.ENOTDIR:
            shutil.copy(src, dst)
        else: raise

def dry_run_status( commands ):
    for command in commands:
        #printf("%s", command[0])
        if command[4] == True:
           printf("%s", " Required")
        else:
           printf("%s", " Not Required")
    printf("\n")

def format_db(formatdb_executable, seqType, refdb_sequence_file, algorithm):
     if algorithm=='BLAST':
         cmd='%s -dbtype %s -in %s' %(formatdb_executable, seqType, refdb_sequence_file)
     if algorithm=='LAST':
         dirname = os.path.dirname(refdb_sequence_file)
         cmd='%s -p -c %s  %s' %(formatdb_executable, refdb_sequence_file, refdb_sequence_file)

     result= getstatusoutput(cmd)
     if result[0]==0:
        return True
     else:
        return False

# decide if a command should be run if it is overlay,
# when results are already computed decide not to run
def shouldRunStep(run_type, expected_output):
    if  run_type =='overlay'  and  path.exists(expected_output):
        return False
    else:
        return True
    #print enable_flag

def formatted_db_exists(dbname):
    fileList = glob(dbname)
    if len(fileList) > 0:
       return True
    else:
       return False

def check_if_db_exists(dbname):
    if path.exists(dbname):
       return True
    else:
       return False

def  make_sure_map_file_exists(dbmapfile):
    if not doFilesExist( [dbmapfile ] ):
         print 'WARNING: ' + 'Creating the database map file'
         fullRefDbName = re.sub(r'-names.txt','',dbmapfile)
         mapfile = open(dbmapfile,'w')
         fullRefDbFile = open(fullRefDbName,'r')
         for line in fullRefDbFile:
             if re.match(r'>', line):
                 fprintf(mapfile, "%s\n",line.strip())
         mapfile.close()
         fullRefDbFile.close()

# gets the parameter value from a category as specified in the
# parameter file
def get_parameter(config_params, category, field, default = None):
    if config_params == None:
      return default
    if category in config_params:
        if field in config_params[category]:
            return config_params[category][field]
        else:
            return default
    return default

# parameter file
def get_make_parameter(config_params, category, field, default = False):
    if category in config_params:
        if field in config_params[category]:
            return config_params[category][field]
        else:
            return default
    return default

def get_pipeline_steps(steps_log_file):
    #print steps_log_file
    try:
       logfile = open(steps_log_file, 'r')
    except IOError:
       print "Did not find " + steps_log_file + "!"
       print "Try running in \'complete\' run-type"
    else:
       lines = logfile.readlines()
#       for line in lines:
#           print line
    pipeline_steps = None
    return pipeline_steps

# This function reads the pipeline configuration file and sets the
# paths to the different scripts and executables that the pipeline calls
def read_pipeline_configuration( file ):
    try:
       configfile = open(file, 'r')
    except IOError:
       print "Did not find pipeline config " + file + "!"
    else:
       lines = configfile.readlines()
    config_settings = {}
    for line in lines:
        if not re.match("#",line) and len(line.strip()) > 0 :
           line = line.strip()
           line = re.sub('\t+', ' ', line)
           line = re.sub('\s+', ' ', line)
           line = re.sub('\'', '', line)
           line = re.sub('\"', '', line)
           fields = re.split('\s', line)
           if not len(fields) == 2:
              print """     The following line in your config settings file is not set up yet"""
              print """     Please rerun the pipeline after setting up this line"""
              print """ Error in line :""" + line
              sys.exit(-1);

#           print fields[0] + "\t" + fields[1]
           if PATHDELIM=='\\':
              config_settings[fields[0]] = re.sub(r'/',r'\\',fields[1])
           else:
              config_settings[fields[0]] = re.sub(r'\\','/',fields[1])

    config_settings['METAPATHWAYS_PATH'] = config_settings['METAPATHWAYS_PATH'] + PATHDELIM
    config_settings['REFDBS'] = config_settings['REFDBS'] + PATHDELIM

    #check_config_settings(config_settings, file);
    return config_settings

# has the results to use
def hasResults(expected_output):
    if  path.exists(expected_output):
        return True
    else:
        return False

# has the results to use
def hasResults1(dir , expected_outputs):
    if  doFilesExist(expected_outputs, dir =  dir):
        return True
    else:
        return False

# if the directory is empty then there are no precomputed results
# and so you should decide to run the command
def shouldRunStepOnDirectory(run_type, dirName):
    dirName = dirName + PATHDELIM + '*'
    files = glob(dirName)
    if len(files)==0:
      return True
    else:
      return False

# if the command is "redo" then delete all the files
# in the folder and then delete the folder too
def removeDirOnRedo(command_Status, origFolderName):
    if command_Status=='redo' and path.exists(origFolderName) :
       folderName = origFolderName + PATHDELIM + '*'
       files = glob(folderName)
       for  f in files:
         remove(f)
       if path.exists(origFolderName):
         shutil.rmtree(origFolderName)

# if the command is "redo" then delete the file
def removeFileOnRedo(command_Status, fileName):
    if command_Status=='redo' and path.exists(fileName) :
        remove(fileName)
        return True
    else:
        return False

# remove all the files in the directory on Redo
def cleanDirOnRedo(command_Status, folderName):
    if command_Status=='redo':
       cleanDirectory(folderName)

# remove all the files in the directory
def cleanDirectory( folderName):
    folderName = folderName + PATHDELIM + '*'
    files = glob(folderName)
    for  f in files:
       remove(f)

# if folder does not exist then create one
def checkOrCreateFolder( folderName ):
    if not path.exists(folderName) :
        makedirs(folderName)
        return False
    else:
        return True

# does the file exist?
def doFilesExist( fileNames, dir="" ):
    for fileName in fileNames:
       file = fileName
       if dir!='':
         file = dir + PATHDELIM + fileName
       if not path.exists(file):
          return False
    return True

# check if all of the metapaths_steps have
# settings from the valid list [ yes, skip, stop, redo ]
def  checkParam_values(allcategorychoices, parameters):
     for category in allcategorychoices:
        for choice in allcategorychoices[category]:
           if choice in parameters:
#             print choice + " " + parameters[choice]
#             print allcategorychoices[category][choice]
             if not parameters[choice] in allcategorychoices[category][choice]:
                 print "ERROR: Incorrect setting in your parameter file"
                 print "       for step " + choice + " as  " + parameters[choice]
                 sys.exit(0)

def checkMetapaths_Steps(config_params):
     choices = { 'metapaths_steps':{}, 'annotation':{}, 'INPUT':{} }
     choices['INPUT']['format']  = ['fasta', 'gbk_unannotated', 'gbk_annotated', 'gff_unannotated', 'gff_annotated']
     choices['annotation']['algorithm'] =  ['last', 'blast']
     choices['metapaths_steps']['PREPROCESS_FASTA']   = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['ORF_PREDICTION']  = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['GFF_TO_AMINO']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['FILTERED_FASTA']  = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['COMPUTE_REFSCORE']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['BLAST_REFDB'] = ['yes', 'skip', 'stop', 'redo', 'grid']
     choices['metapaths_steps']['PARSE_BLAST'] = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['SCAN_rRNA']   = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['STATS_rRNA']  = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['ANNOTATE']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['PATHOLOGIC_INPUT']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['GENBANK_FILE']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['CREATE_SEQUIN_FILE']  = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['CREATE_REPORT_FILES']  = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['SCAN_tRNA']   = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['MLTREEMAP_CALCULATION']   = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['MLTREEMAP_IMAGEMAKER']    = ['yes', 'skip', 'stop', 'redo']
     choices['metapaths_steps']['PATHOLOGIC']  = ['yes', 'skip', 'stop', 'redo']
     if config_params['metapaths_steps']:
        checkParam_values(choices, config_params['metapaths_steps'])

def isValidInput(output_dir, samples_and_input, dbs, gridSettings, config_settings,  messagelogger):
    if not doesFolderExist(output_dir):
        messagelogger.write("ERROR: Output folder \"%s\" does not exist!\n" %(output_dir))
        return False
    else:
        messagelogger.write("OK: Output folder \"%s\" exists!\n" %(output_dir))
    for sample, inputfile in samples_and_input.iteritems():
       if not doesFolderExist(output_dir + PATHDELIM + sample):
          messagelogger.write("ERROR: Sample folder  \"%s\" for sample \"%s\" not found!\n" %(output_dir + PATHDELIM + sample, sample))
          return False
       else:
          messagelogger.write("OK: Sample folder  \"%s\" for sample \"%s\" exists!\n" %(output_dir + PATHDELIM + sample, sample))
       if not doesFileExist(inputfile):
          messagelogger.write("ERROR: Input file \"%s\" does not exist!\n" %(inputfile))
          return False
       else:
          messagelogger.write("OK: Input file \"%s\" exists!\n" %(inputfile))
    for db in  dbs:
       if not doesFileExist(config_settings['REFDBS'] + PATHDELIM + 'functional' + PATHDELIM + db):
          messagelogger.write("ERROR: Database file \"%s\" does not exist!\n" %(config_settings['REFDBS'] + PATHDELIM + db))
          return False
       else:
          messagelogger.write("OK: Database file \"%s\" found!\n" %(config_settings['REFDBS'] + PATHDELIM + db))
    for gridsetting in gridSettings:
         if 'server' in gridsetting:
            if not 'user' in gridsetting:
                 messagelogger.write("ERROR: User in grid servers \"%s\" not specified!\n" %(gridsetting['server']))
                 return False
            else:
                 messagelogger.write("OK: Specification for Grid \"%s\" with user \"%s\" found!\n" %(gridsetting['server'], gridsetting['user']))
    return True

#################################################################################
###########################  BLAST ##############################################
#################################################################################
def blast_in_grid(input_files, output_dir, config_params, metapaths_config, config_file, run_type):
    algorithm = get_parameter(config_params, 'annotation', 'algorithm', default='BLAST').upper()
    messagelogger = WorkflowLogger(generate_log_fp(output_dir, basefile_name='metapathways_messages', suffix='txt'),\
                    open_mode='w')
    command_Status=  get_parameter(config_params,'metapaths_steps','BLAST_REFDB')
    config_settings = read_pipeline_configuration( config_file )
#   preprocessed_dir = output_dir + PATHDELIM + "preprocessed" + PATHDELIM
    orf_prediction_dir =   "orf_prediction"
#   genbank_dir =  output_dir + PATHDELIM + "genbank"  + PATHDELIM
    output_run_statistics_dir = output_dir + PATHDELIM + "run_statistics"  + PATHDELIM
    blast_results_dir =  output_dir +  PATHDELIM + "blast_results"  + PATHDELIM
    output_results = output_dir + PATHDELIM + "results" + PATHDELIM

    #---
    # create the sample and input pairs
    samples_and_input = {}
    for input_file in input_files:
       sample_name = re.sub(r'[.][a-zA-Z]*$','',input_file)
       sample_name = path.basename(sample_name)
       sample_name = re.sub('[.]','_',sample_name)
       samples_and_input[sample_name] =  output_dir + PATHDELIM + sample_name + PATHDELIM + orf_prediction_dir + PATHDELIM +  sample_name + ".qced.faa"

    # BLAST THE ORFs AGAINST THE REFERENCE DATABASES FOR FUNCTIONAL ANNOTATION
    dbstring = get_parameter(config_params, 'annotation', 'dbs', default=None)
    dbs= dbstring.split(",")

    # parse the grid settings from the param file
    gridEnginePATTERN = re.compile(r'(grid_engine\d+)')
    trueOrYesPATTERN = re.compile(r'^[yYTt]')
    gridSettings = []
    for key in config_params:
       match = gridEnginePATTERN.match(key)
       if match ==None:
           continue
       if 'active' in config_params[key]:
           trueOrYes =  trueOrYesPATTERN.match(config_params[key]['active'])
           if trueOrYes:  # this grid is active
               # proceed with adding the grid
               match = gridEnginePATTERN.match(key)
               if match:
                  gridSettings.append(config_params[key])

    if not isValidInput(output_dir, samples_and_input, dbs, gridSettings, config_settings = config_settings,\
         messagelogger = messagelogger):
       sys.exit(0)

    blastbroker = BlastBroker(messagelogger)     # set up the broker with a message logger
    blastbroker.setBaseOutputFolder(output_dir)  # set up the output folder
    blastbroker.addSamples(samples_and_input)    # add the samples and the input files

    # add databases against the samples
    for sample in samples_and_input:
       for db in dbs:
          blastbroker.addDatabase(sample, db)
       blastbroker.addAlgorithm(sample, algorithm)   # add the algorithms

    # set up services and add them to the Broker
    for gridsetting in gridSettings:
        gridsetting['messagelogger']=messagelogger
        gridsetting['MetaPathwaysDir']=config_settings['METAPATHWAYS_PATH']
        gridsetting['base_output_folder']=blastbroker.base_output_folder
        gridsetting['blast_db_folder']=config_settings['REFDBS'] + PATHDELIM + 'functional'
        try:
          blastservice = BlastService(gridsetting)
        except:
          print traceback.format_exc(10)
        blastbroker.addService(blastservice)

    # create the work space folders
    if  blastbroker.are_working_folders_available():
       messagelogger.write("STATUS: Local working folders for Grid found!\n")
    elif blastbroker.create_working_folders():
       messagelogger.write("OK: Successfully created the grid related local working folders!\n")
    else:
       messagelogger.write("ERROR: Cannot create the grid working folders!\n")
       messagelogger.write("ERROR: Exiting blast in grid mode!\n")
       return

    # check if the input files are already split
    messagelogger.write("STATUS: Checking if input files are already split!\n")
#    for s in blastbroker.getSamples():
#       if not blastbroker.doesValidSplitExist(s):
#          messagelogger.write("STATUS: Did not find any previously split files for sample \"%s\"!\n" %(s))
#          if not blastbroker.splitInput(s): #if not then split
#             messagelogger.write("ERROR: Cannot split the files for some or all of the samples!\n")
#             sys.exit(0)
#          else:
#             messagelogger.write("SUCCESS: Successfully split the files for some or all of the samples!\n")
#       else:
#          messagelogger.write("OK: Found previously split files for sample \"%s\"!\n" %(s))
#
    messagelogger.write("STATUS: Completed checks for file splits!\n")
    batch_size = int(get_parameter(config_params, 'grid_submission', 'batch_size', default=1000))
    blastbroker.setBatchSize(batch_size)

    # check if the input files are already split
    for s in blastbroker.getSamples():
       if not blastbroker.doesValidSplitExist(s):
          messagelogger.write("STATUS: Did not find any previously split files for sample \"%s\"!\n" %(s))
          if not blastbroker.splitInput(s): # if not then split
             print ("ERROR: Cannot split the files for some or all of the samples!\n")
             messagelogger.write("ERROR: Cannot split the files for some or all of the samples!\n")
             sys.exit(0)
          else:
             messagelogger.write("SUCCESS: Successfully split the files for some or all of the samples!\n")
       else:
          messagelogger.write("OK: Found previously split files for sample \"%s\"!\n" %(s))

    # load the list of splits
    blastbroker.load_list_splits()
    messagelogger.write("SUCCESS: Successfully loaded the list of file splits!\n")

    # create the database and split combinations as jobs for each sample
    blastbroker.createJobs(redo=False)
    messagelogger.write("SUCCESS: Successfully created the (split, database) pairs!\n")

    # make sure you loaded the latest job lists on file
    blastbroker.load_job_lists()
    messagelogger.write("SUCCESS: Successfully recovered the old/existing job list!\n")

    # for each sample load the submitted and completed lists
    # and compute the load per server
    blastbroker.load_job_status_lists()
    messagelogger.write("SUCCESS: Successfully loaded the status of the jobs!\n")
    blastbroker.compute_performance()
    try:
       blastbroker.compute_server_loads()
    except:
       print traceback.format_exc(10)

    #print blastbroker.list_jobs_submitted
    #print blastbroker.list_jobs_completed
    #blastbroker.launch_AWS_grid()
    blastbroker.setupStatsVariables()
    messagelogger.write("STATUS: Getting ready to submit jobs to the servers!\n")
    blastbroker.Do_Work()
    #blastbroker.stop_AWS_grid()
    blastbroker.Delete_Remote_Directories()

    #print output_dir
    #print samples_and_input
    #print dbs
    #print gridSettings

    message = "\n6. Blasting using Grid ORFs against reference database - "
    #################################...
Source: processFileOps.py
import json
import logging
import os
import com.dataanalytics.utility.constants as const
import com.dataanalytics.utility.messageLogger as ml
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType


class ProcessFileOps:
    def __init__(self, spark):
        self.spark = spark
        self.messageLogger = ml.MessageLogger(const.getProjectName(__file__), "Logger Started")
        self.messageLogger.logInfo("Processing read and write operations from files")

    def processCSVoutput(self, df, file_name):
        fn_csv = const.processedFile + file_name + "_csv"
        msg = self.createFolder(fn_csv)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting csv write operation")
            # pandasDF = df.toPandas()
            # pandasDF.to_csv(fn_csv, index=False, line_terminator='\n')
            df.write.csv(fn_csv, header=True)
            self.messageLogger.logInfo("successfully loaded data into csv file")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to csv ::: " + str(e.__class__) + " occurred.")

    def processParquetOutput(self, df, file_name, partion_size, partition_col):
        print(partition_col)
        # fn_parquet = const.processedFile + file_name + ".gzip"
        fn_parquet = const.processedFile + file_name + "_parquet"
        msg = self.createFolder(fn_parquet)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting parquet write operation")
            # pandasDF = df.toPandas()
            # pandasDF.to_parquet(fn_parquet, compression='gzip')
            df.repartition(partion_size).write.partitionBy(partition_col).parquet(fn_parquet)
            self.messageLogger.logInfo("successfully loaded data into parquet")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to parquet ::: " + str(e.__class__) + " occurred.")
        # Test Parquet Data
        # a = pd.read_parquet(self.fn_parquet)
        # print(a.to_string(index=False))
        # pandasDF.to_parquet(self.fn_parquet,compression = 'gzip') #, index=False, line_terminator='\n')

    def processJsonOutput(self, df, file_name):
        # fn_json = open(const.processedFile + file_name + ".json", 'w+')
        fn_json = const.processedFile + file_name + "_json"
        msg = self.createFolder(fn_json)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting Json write operation")
            # pandasDF = df.toPandas().reset_index()
            # pandasDF.to_json(fn_json)
            df.write.json(fn_json)
            self.messageLogger.logInfo("successfully loaded data into json")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to Json ::: " + str(e.__class__) + " occurred.")

    def readCSV(self, file_name):
        df = const.createEmptyDF(self.spark)
        try:
            self.messageLogger.logInfo("Reading from CSV file.")
            df = self.spark.read.csv(file_name, header=True)
            self.messageLogger.logInfo("File reading finished successfully.")
        except Exception as e:
            self.messageLogger.logError(
                "unable to read the file, exception occurred: " + str(e.__class__) + " occurred.")
        return df

    def createFolder(self, fp):
        # removes any existing output folder at this path so the Spark writers can recreate it
        msg = ""
        try:
            import shutil
            shutil.rmtree(fp)
            msg = "folder successfully deleted"
        except Exception as e:
            msg = "folder not found"
        return msg

    def writeTextFile(self, file_name, text):
        with open(file_name, 'w+') as fp:
            fp.write(text)
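A minimal usage sketch for the ProcessFileOps class above, assuming a local Spark installation, that the com.dataanalytics.utility helper package is importable, that the listing is saved as processFileOps.py, and a hypothetical input file customers.csv with a country column:

# Minimal usage sketch (assumptions noted above; file and column names are hypothetical).
from pyspark.sql import SparkSession
from processFileOps import ProcessFileOps

spark = SparkSession.builder.master("local[*]").appName("process-file-ops-demo").getOrCreate()
ops = ProcessFileOps(spark)

df = ops.readCSV("customers.csv")                        # returns an empty DataFrame if the read fails
ops.processCSVoutput(df, "customers")                    # writes <processedFile>customers_csv
ops.processParquetOutput(df, "customers", 4, "country")  # 4 partitions, partitioned by 'country'
ops.processJsonOutput(df, "customers")

spark.stop()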
