Best Python code snippet using stestr_python
alignStats.py
Source: alignStats.py
#!/usr/bin/env python
"""
Extract alignment statistics from a SAM/BAM file.
Adapted from the Celloline stats script
available at: https://github.com/Teichlab/celloline/blob/master/lib/stats.py
"""
import os
import sys
import re
import argparse
import pysam
import logging
import cPickle as pickle
from collections import Counter, defaultdict, OrderedDict
from intervaltree import IntervalTree
from joblib import Parallel, delayed


# LOAD GTF FILE
def load_gtf(gtf_path):
    """
    Load a GTF annotation and create an index using IntervalTrees.
    Args:
        gtf_path: Path to the GTF file to load.
    Returns:
        Dictionary containing IntervalTree indexes of the annotation.
    """
    gtf_index = defaultdict()
    with open(gtf_path) as gtf_file:
        for line in gtf_file:
            if not line.startswith("#"):
                entry = line.split("\t")
                entry_addition = entry[8]
                entry_addition = entry_addition.split(";")
                entry_addition = entry_addition[0].split(" ")
                gene_id = entry_addition[1]
                feature = entry[2]
                # TYPE (gene, exon etc.), START, END, STRAND, gene_ID
                info = [feature, entry[3], entry[4], entry[6], gene_id]
                # Build GTF INDEX
                if feature != "" and entry[3] != entry[4]:
                    if entry[0] in gtf_index:
                        index = gtf_index[entry[0]]
                    else:
                        index = IntervalTree()
                    index.addi(int(info[1]), int(info[2]), info)
                    gtf_index[entry[0]] = index
    return gtf_index


def gen_stats(input_file, input_type, sample_name, gtf_dict):
    """
    Generate alignment stats from a SAM/BAM file.
    Loop over alignments in a SAM/BAM file and extract statistics such as the
    number of reads aligned to introns, exons, intergenic regions etc.
    Args:
        input_file: An open BAM or SAM file.
        input_type: Whether the file is 'bam' or 'sam'.
        sample_name: A name relating to this file.
        gtf_dict: Dictionary containing GTF index.
    Returns:
        Dictionary containing alignment statistics.
    """
    logger = logging.getLogger("stats." + sample_name[0:10])
    # OUTPUT TABLE CONTAINING STATS
    output_table = OrderedDict()
    # Dict indicating to which genes a specific read maps to
    # It is a temporary dict
    exonic_mappings_temp = defaultdict(str)
    # Dict indicating which read is multi-mapped
    # It is a temporary dict
    exonic_multi_table = defaultdict(str)
    # Sample
    output_table["sample"] = sample_name
    # MAPPABILITY
    output_table["total"] = 0
    output_table["mapped"] = 0
    output_table["unmapped"] = 0
    output_table["unique"] = 0
    output_table["multi"] = 0
    # CODING VERSUS NON-CODING REGIONS
    output_table["intergenic"] = 0
    output_table["intragenic"] = 0
    output_table["exonic"] = 0
    output_table["intronic"] = 0
    output_table["ambigious"] = 0
    # CODING REGIONS MAPPABILITY
    output_table["exonicU"] = 0
    output_table["exonicM"] = 0
    # ALIGNMENT CODING VS NONCODING
    output_table["alignments"] = 0
    output_table["multi-intergenic"] = 0
    output_table["multi-intragenic"] = 0
    output_table["multi-exonic"] = 0
    output_table["multi-intronic"] = 0
    output_table["multi-ambigious"] = 0
    # ERROR
    output_table["perfect"] = 0
    output_table["partly_perfect"] = 0
    output_table["mapped_no_correct"] = 0
    for i in range(0, 10):
        output_table["S_" + str(i)] = 0
    output_table["S_10+"] = 0
    output_table["I"] = 0
    output_table["D"] = 0
    output_table["INDEL"] = 0
    reads = Counter()
    if input_type == "bam":
        ref_map = input_file.references
        input_file = input_file.fetch(until_eof=True)
    line_count = 0
    for line in input_file:
        line_count += 1
        if input_type == "bam":                 # BAM input line
            split = str(line).split("\t")
            split[2] = ref_map[int(split[2])]
            split[3] = int(split[3]) + 1
        elif not line.startswith("@"):          # SAM input line
            split = line.split("\t")
        else:
            continue
        read_name = split[0]
        flag_code = int(split[1])
        chrom = split[2]
        pos = split[3]
        errors = split[5]
        errors_a = list(errors)
        number = ""
        num = 0
        error_table = defaultdict(int)
        name_and_flag = read_name
        # CHECK IF READ MAPPED OR UNMAPPED
        # IT IS UNMAPPED
        if flag_code & 0x0004 != 0:
            output_table["unmapped"] += 1
            output_table["total"] += 1
            error_table["*"] += 1
        # IT IS MAPPED
        else:
            if flag_code & 0x0001 != 0:         # This is paired end
                if flag_code & 0x0040 != 0:     # 1st read
                    name_and_flag += ";first"
                if flag_code & 0x0080 != 0:     # 2nd read
                    name_and_flag += ";second"
            # CHECK TO WHICH GENE(S) IT MAPPED TO
            genes_info, num_genes, num_exons = get_gene(gtf_dict, [chrom, pos])
            output_table["alignments"] += 1.0
            # STATS
            if name_and_flag not in reads:
                reads[name_and_flag] += 1
                output_table["unique"] += 1
                output_table["total"] += 1
                output_table["mapped"] += 1
                if num_genes == 0:
                    output_table["intergenic"] += 1
                elif num_genes == 1:
                    output_table["intragenic"] += 1
                    if num_exons == 0:
                        output_table["intronic"] += 1
                    else:
                        output_table["exonic"] += 1
                        output_table["exonicU"] += 1
                        exons = []
                        if name_and_flag in exonic_mappings_temp:
                            exons = exonic_mappings_temp[name_and_flag]
                        exons.append([genes_info[0], chrom, pos])
                        exonic_mappings_temp[name_and_flag] = exons
                elif num_genes > 1:
                    output_table["ambigious"] += 1
            # READ IS MULTI-MAPPED
            else:
                if reads[name_and_flag] == 1:
                    output_table["unique"] -= 1
                    output_table["exonicU"] -= 1
                    output_table["multi"] += 1
                reads[name_and_flag] += 1
                exons = []
                # GET KNOWLEDGE IF FIRST MAPPING EXONIC OR INTRONIC
                if name_and_flag in exonic_mappings_temp:
                    exons = exonic_mappings_temp[name_and_flag]
                if num_genes == 0:
                    output_table["multi-intergenic"] += 1
                elif num_genes == 1:
                    output_table["multi-intragenic"] += 1
                    if num_exons == 0:
                        output_table["multi-intronic"] += 1
                    else:
                        output_table["multi-exonic"] += 1
                        exons.append([genes_info[0], chrom, pos])
                elif num_genes > 1:
                    output_table["multi-ambigious"] += 1
                # IF AT LEAST ONE EXONIC ALIGNMENT
                if len(exons) > 0:
                    exonic_multi_table[name_and_flag] = exons
            # PARSE MAPPING ERRORS
            for i in errors_a:
                if re.match("[0-9]", i):
                    number += i
                elif re.match("[A-Z]", i):
                    num = int(number)
                    error_table[i] += num
                    number = ""
            # TABLE OF HOW MANY READS MAP PERFECT, PARTLY PERFECT ETC
            if "M" in error_table and len(error_table) == 1:
                output_table["perfect"] += 1
            elif "M" in error_table and len(error_table) > 1:
                output_table["partly_perfect"] += 1
            elif "M" not in error_table and "*" not in error_table:
                output_table["mapped_no_correct"] += 1
            if "S" in error_table:
                if int(error_table["S"]) < 10:
                    output_table["S_" + str(error_table["S"])] += 1
                else:
                    output_table["S_10+"] += 1
            elif "S" not in error_table:
                output_table["S_0"] += 1
            if "I" in error_table:
                output_table["I"] += 1
            if "D" in error_table:
                output_table["D"] += 1
            if "I" in error_table or "D" in error_table:
                output_table["INDEL"] += 1
        if (line_count % 1000000) == 0:
            logger.debug(sample_name + " line " + str(line_count) + "...")
    output_table["exonicM"] = len(exonic_multi_table.keys())
    return output_table


def get_stats_line(stats_table):
    """
    Get an output line from a stats table.
    Args:
        stats_table: Dictionary of alignment statistics.
    Returns:
        String representing the results for one file.
    """
    logger = logging.getLogger("stats.extract")
    out_line = ""
    for stat, value in stats_table.iteritems():
        if stat in ["unique", "multi", "intragenic", "intergenic",
                    "exonic", "intronic", "ambigious", "exonicM", "exonicU"]:
            value = (value + 0.0) / (stats_table["mapped"] + 0.0)
            value = "%.2f" % (100.0 * value)
        elif stat in ["multi-intragenic", "multi-intergenic", "multi-exonic",
                      "multi-intronic", "multi-ambigious"]:
            value = (value + 0.0)
            if stats_table["alignments"] != 0:
                value = value / (stats_table["alignments"] + 0.0)
            value = "%.2f" % (100.0 * value)
        value = str(value)
        if not stat == "sample":
            out_line += "," + value
        else:
            out_line += value
        logger.debug(stat + " : " + value)
    out_line += "\n"
    return out_line


def write_stats(output_path, stats_list):
    """
    Write a series of results to a file.
    Args:
        output_path: Path to write results to.
        stats_list: List of dictionaries containing results from input files.
    """
    cols = stats_list[0].keys()
    with open(output_path, "w") as out_file:
        out_file.write(",".join(cols) + "\n")
        for stats_table in stats_list:
            stats_line = get_stats_line(stats_table)
            out_file.write(stats_line)


def get_gene(gtf_dict, pos_pair):
    """
    Identify which genes overlap a given position.
    Args:
        gtf_dict: Dictionary containing GTF index.
        pos_pair: Tuple containing genomic position (chrom, pos).
    Returns:
        Tuple containing the list of overlapping genes, the number of
        overlapping genes and the number of overlapping exons.
    """
    num_genes = 0
    num_exons = 0
    if pos_pair[0] not in gtf_dict:
        #print ("Ignored pos: " + pos_pair[0])
        return ([], num_genes, num_exons)
    entries = gtf_dict[pos_pair[0]]
    pos = int(pos_pair[1])
    found = []
    found = entries.search(pos)
    gene_list = []
    for entry in found:
        info = entry[2]
        if info[0] == "gene":
            gene_list.append(info)
            num_genes += 1
        elif info[0] == "exon":
            num_exons += 1
    return (gene_list, num_genes, num_exons)


def process_file(input_file, input_type, index, is_parallel):
    """
    Process an individual SAM/BAM file.
    How we want to process the file depends on the input type and whether we
    are operating in parallel. If in parallel the index must be loaded for each
    input file. If the input is a BAM file it needs to be read using Pysam, if
    SAM it can be read directly as a text file.
    Args:
        input_file: Path to the input file.
        input_type: Whether the file is 'bam' or 'sam'.
        index: If operating in parallel a path to the index file, if not the
               loaded GTF index dictionary.
        is_parallel: Whether to operate in parallel.
    Returns:
        Dictionary containing alignment statistics for the input file.
    """
    sample_name = input_file.split("/")[-1]
    logger = logging.getLogger("stats." + sample_name[0:10])
    logger.info("Processing " + sample_name + "...")
    if is_parallel:
        logger.info("Loading index...")
        with open(index, "rb") as index_file:
            loaded_index = pickle.load(index_file)
        logger.info("Loaded.")
    else:
        loaded_index = index
    if input_type == "sam":
        logger.info("Parsing SAM file...")
        with open(input_file) as sam:
            output_table = gen_stats(sam, input_type, sample_name, loaded_index)
    elif input_type == "bam":
        logger.info("Parsing BAM file...")
        bam = pysam.AlignmentFile(input_file, "rb")
        output_table = gen_stats(bam, input_type, sample_name, loaded_index)
    logger.info("Finished " + sample_name)
    return output_table


def get_index(args):
    """
    Load a GTF index if available or create from GTF file if not found.
    If a valid path to an index file is given that file will be loaded. If no
    index file was specified or the file does not exist the annotation will be
    read from a GTF file. It will then be pickled if an index file is specified.
    When running in parallel the path to the index file is returned rather than
    the index dictionary itself.
    Args:
        args: Options from the command line.
    Returns:
        Dictionary containing GTF index or path to index file if in parallel.
    """
    logger = logging.getLogger("stats.index")
    if args.index and os.path.isfile(args.index):
        logger.info("Index found at " + args.index)
        if not args.is_parallel:
            logger.info("Loading index...")
            with open(args.index, "rb") as index_file:
                index = pickle.load(index_file)
            logger.info("Loaded.")
        else:
            index = args.index
    elif args.gtf and os.path.isfile(args.gtf):
        logger.info("No index file found.")
        logger.info("Loading GTF file...")
        gtf_dict = load_gtf(args.gtf)
        logger.info("Loaded.")
        if args.index:
            logger.info("Saving index to " + args.index + "...")
            with open(args.index, "wb") as index_file:
                pickle.dump(gtf_dict, index_file, -1)
            logger.info("Saved.")
        if not args.is_parallel:
            index = gtf_dict
        else:
            index = args.index
    return index


def get_args():
    """
    Read arguments from the command line and check they are valid.
    """
    logger = logging.getLogger("stats.args")
    parser = argparse.ArgumentParser(
        description="Extract alignment statistics from a SAM/BAM file")
    parser.add_argument("inputs",
                        metavar="SAM/BAM",
                        nargs="+",
                        help="Input SAM or BAM files")
    parser.add_argument("-o", "--out",
                        help="Output file",
                        required=True)
    parser.add_argument("-g", "--gtf",
                        help="GTF annotation file")
    parser.add_argument("-i", "--index",
                        help="""Annotation index file. Required when
                                operating in parallel.""")
    parser.add_argument("-t", "--type",
                        choices=["sam", "bam"],
                        help="Type of input file",
                        required=True)
    parser.add_argument("-p", "--parallel",
                        type=int,
                        default=1,
                        help="""Number of files to process in parallel.
                             Requires N + 1 threads if greater than 1.""")
    args = parser.parse_args()
    args.is_parallel = False
    if args.parallel < 1:
        logger.error("Number of parallel files must be positive")
        sys.exit()
    elif args.parallel > 1:
        args.is_parallel = True
        logger.info("Running with " + str(args.parallel) + " jobs")
    if args.is_parallel and not args.index:
        logger.error("Index file is required when running in parallel.")
        sys.exit()
    if not (args.index and os.path.isfile(args.index)):
        if not (args.gtf and os.path.isfile(args.gtf)):
            logger.error("No GTF or index file found.")
            sys.exit()
    return args


def setup_logging():
    """
    Setup logging system.
    Log is written to 'alignmentStats.log'.
    """
    logger = logging.getLogger("stats")
    logger.setLevel(logging.DEBUG)
    # create file handler which logs even debug messages
    file_handler = logging.FileHandler('alignmentStats.log')
    file_handler.setLevel(logging.INFO)
    # create console handler with a higher log level
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    # create formatter and add it to the handlers
    format_str = "[%(asctime)s] %(levelname)s %(name)s: %(message)s"
    formatter = logging.Formatter(format_str, "%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(formatter)
    format_str = "[%(asctime)s] %(message)s"
    formatter = logging.Formatter(format_str, "%H:%M:%S")
    console_handler.setFormatter(formatter)
    # add the handlers to logger
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)


def main():
    """
    Main function.
    1. Setup logging
    2. Get arguments
    3. Get index
    4. Process files
    5. Write output
    """
    setup_logging()
    logger = logging.getLogger("stats." + __name__)
    args = get_args()
    index = get_index(args)
    logger.warning("Positions not in annotation will be ignored.")
    logger.info("Found " + str(len(args.inputs)) + " input file(s):")
    for input_file in sorted(args.inputs):
        logger.debug(input_file)
    if args.is_parallel:
        stats = Parallel(n_jobs=args.parallel,
                         verbose=100,
                         batch_size=1)(delayed(process_file)(input_file,
                                                             args.type,
                                                             index,
                                                             args.is_parallel)
                                       for input_file in args.inputs)
    else:
        stats = []
        for input_file in args.inputs:
            output_table = process_file(input_file, args.type, index,
                                        args.is_parallel)
            stats.append(output_table)
    write_stats(args.out, stats)


if __name__ == "__main__":
...
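A minimal usage sketch for the script above, assuming it is saved as alignStats.py and that pysam, intervaltree and joblib are installed; the file names are invented for illustration. From the shell the equivalent call would be something like: python alignStats.py -t bam -g annotation.gtf -o stats.csv sample1.bam

# Programmatic sketch (hypothetical paths): build the IntervalTree index once,
# then collect stats for a single BAM file and write them out as CSV.
import alignStats

gtf_index = alignStats.load_gtf("annotation.gtf")   # {chrom: IntervalTree}
stats = alignStats.process_file("sample1.bam", "bam", gtf_index,
                                is_parallel=False)  # index is the loaded dict
alignStats.write_stats("stats.csv", [stats])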
FECfilterService.py
Source: FECfilterService.py
from PyQt5.QtWidgets import QTableWidget, QComboBox, QLabel
from PyQt5.QtCore import Qt
import random


class FECfilterService:
    FEC = "FEC baseline"  # EN_2.1
    FEC_specific = "FEC baseline (specific)"  # EN_2.2
    FEC_future = "FEC future"  # EN_2.3
    FEC_future_specific = "FEC future (specific)"  # EN_2.4
    FEC_future_variation = "FEC variation"  # EN_2.5
    FEC_future_saving = "FEC saving"  # EN_2.6
    YEOH_Base = "YEOH baseline"  # EN_15.1
    YEOH_Future = "YEOH future"  # EN_15.2
    YEOH_Variation = "YEOH variation"  # EN_15.3

    def __init__(self, fec_filter_combo_box: QComboBox, description_filter_label: QLabel, mode="future"):
        self.fec_filter_combo_box = fec_filter_combo_box
        self.description_filter_label = description_filter_label
        self.fec_filter_combo_box.clear()
        self.fec_filter_combo_box.insertItem(0, "Select filter:")
        self.fec_filter_combo_box.setItemData(0, 0, Qt.UserRole - 1)
        self.fec_filter_combo_box.insertItem(1, self.FEC)
        self.fec_filter_combo_box.insertItem(2, self.FEC_specific)
        self.fec_filter_combo_box.insertItem(3, self.YEOH_Base)
        if mode == "future":
            self.fec_filter_combo_box.insertItem(4, self.FEC_future)
            self.fec_filter_combo_box.insertItem(5, self.FEC_future_specific)
            self.fec_filter_combo_box.insertItem(6, self.FEC_future_variation)
            self.fec_filter_combo_box.insertItem(7, self.FEC_future_saving)
            self.fec_filter_combo_box.insertItem(8, self.YEOH_Future)
            self.fec_filter_combo_box.insertItem(9, self.YEOH_Variation)

    def update_label(self, new_text):
        self.description_filter_label.setText(new_text + ":")

    def get_filtered_table(self, KPIs):
        if self.fec_filter_combo_box.currentText() == self.FEC:
            return self.apply_filter_FEC(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_specific:
            return self.apply_filter_FEC_specific(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future:
            return self.apply_filter_FEC_future(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_specific:
            return self.apply_filter_FEC_future_specific(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_variation:
            return self.apply_filter_FEC_variation(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_saving:
            return self.apply_filter_FEC_saving(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Base:
            return self.apply_filter_YEOH_Base(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Future:
            return self.apply_filter_YEOH_Future(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Variation:
            return self.apply_filter_YEOH_Variation(KPIs)

    def apply_filter_FEC(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.1R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.1T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.1_s" + str(index))
        return output_table

    def apply_filter_FEC_specific(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.2R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.2T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.2_s" + str(index))
        return output_table

    def apply_filter_FEC_future(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.3R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.3T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.3_s" + str(index))
        return output_table

    def apply_filter_FEC_future_specific(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.4R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.4T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.4_s" + str(index))
        return output_table

    def apply_filter_FEC_variation(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.5R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.5T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.5_s" + str(index))
        return output_table

    def apply_filter_FEC_saving(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.6R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.6T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.6_s" + str(index))
        return output_table

    def apply_filter_YEOH_Base(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.1")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table

    def apply_filter_YEOH_Future(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.2")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table

    def apply_filter_YEOH_Variation(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.3")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table

    @staticmethod
    def get_from_key(dictionary, key):
        try:
            item = dictionary[key]
            if isinstance(item, float):
                return "{:.2f}".format(item)
            return item
        except KeyError:
            return None

    def example_table(self):
        output_table = {}
        output_table["Fonte 0"] = {}
        output_table["Fonte 1"] = {}
        output_table["Fonte 2"] = {}
        output_table["Fonte 0"]["R"] = 42.00
        output_table["Fonte 0"]["T"] = 42.00
        output_table["Fonte 0"]["TOT"] = 84.00
        output_table["Fonte 1"]["R"] = 21.00
        output_table["Fonte 1"]["T"] = 21.00
        output_table["Fonte 1"]["TOT"] = 42.00
        output_table["Fonte 2"]["R"] = 11.00
        output_table["Fonte 2"]["T"] = 22.00
        output_table["Fonte 2"]["TOT"] = 33.00
        return output_table

    def example_KPIs(self):
        KPIs = {}
        KPIs["sources"] = ["Heating Oil", "Natural gas", "Electricity", "Deep geothermal",
                           "Geothermal - Shallow - Ground heat extraction",
                           "Geothermal - Shallow - Ground cold extraction", "Solar thermal", "Excess heat Industry",
                           "Excess heat - Data centers",
                           "Excess heat - Supermarkets", "Excess heat - Refrigerated storage facilities",
                           "Excess heat - Indoor carparkings",
                           "Excess heat - Subway networks", "Urban waste water treatment plant",
                           "Water - Waste water - Sewer system",
                           "Water - Surface water - Rivers cold extraction heat pump",
                           "Water - Surface water - Rivers cold extraction from free cooling HEX",
                           "Water - Surface water - Lakes heat extraction with heat pump",
                           "Water - Surface water - Lakes cold extraction with heat pump",
                           "Water - Surface water - Rivers heat extraction heat pump",
                           "LNG terminals excess cooling", "Biomass forestry",
                           "generic source"]
        for i in range(23):
            KPIs["EN_2.1_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.1R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.1T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6T_s" + str(i)] = round(random.random(), 2)
...
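A minimal usage sketch for the service above, assuming PyQt5 is installed and that example_KPIs() returns the KPIs dict it builds (its tail is truncated in the listing). The widgets and the printed key are only for illustration.

import sys
from PyQt5.QtWidgets import QApplication, QComboBox, QLabel

app = QApplication(sys.argv)                 # Qt widgets require a QApplication
combo = QComboBox()
label = QLabel()
service = FECfilterService(combo, label, mode="future")
kpis = service.example_KPIs()                # randomly generated demo KPI values
combo.setCurrentIndex(1)                     # item 1 is "FEC baseline" (EN_2.1)
table = service.get_filtered_table(kpis)     # {source: {"R": ..., "T": ..., "TOT": ...}}
print(table["generic source"])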
getGNPS_library_annotations.py
Source: getGNPS_library_annotations.py
#!/usr/bin/python
import sys
import os
import ming_fileio_library
import ming_gnps_library
from collections import defaultdict


def usage():
    print "<input clusterinfosummary file> <input edges file> <output file> "


def main():
    input_result_filename = sys.argv[1]
    output_result_filename = sys.argv[2]
    input_rows, input_table = ming_fileio_library.parse_table_with_headers(input_result_filename)
    output_table = defaultdict(list)
    output_headers = ["SpectrumID", "Compound_Name", "Ion_Source", "Instrument", "Compound_Source", "PI", "Data_Collector", "Adduct"]
    output_headers += ["Precursor_MZ", "ExactMass", "Charge", "CAS_Number", "Pubmed_ID", "Smiles", "INCHI", "INCHI_AUX", "Library_Class"]
    output_headers += ["IonMode", "UpdateWorkflowName", "LibraryQualityString", "#Scan#", "SpectrumFile", "MQScore", "Organism"]
    output_headers += ["TIC_Query", "RT_Query", "MZErrorPPM", "SharedPeaks", "MassDiff", "LibMZ", "SpecMZ", "SpecCharge"]
    for header in output_headers:
        output_table[header] = []
    for i in range(input_rows):
        spectrum_id = input_table["LibrarySpectrumID"][i]
        score = input_table["MQScore"][i]
        filename = input_table["SpectrumFile"][i]
        libfilename = input_table["LibraryName"][i]
        scan = input_table["#Scan#"][i]
        TIC_Query = input_table["UnstrictEvelopeScore"][i]
        RT_Query = input_table["p-value"][i]
        SpecCharge = input_table["Charge"][i]
        SpecMZ = input_table["SpecMZ"][i]
        MZErrorPPM = input_table["mzErrorPPM"][i]
        SharedPeaks = input_table["LibSearchSharedPeaks"][i]
        MassDiff = input_table["ParentMassDiff"][i]
        print(spectrum_id)
        gnps_library_spectrum = None
        try:
            gnps_library_spectrum = ming_gnps_library.get_library_spectrum(spectrum_id)
        except KeyboardInterrupt:
            raise
        except:
            continue
        output_table["SpectrumID"].append(spectrum_id)
        output_table["Compound_Name"].append(gnps_library_spectrum["annotations"][0]["Compound_Name"])
        output_table["Ion_Source"].append(gnps_library_spectrum["annotations"][0]["Ion_Source"])
        output_table["Instrument"].append(gnps_library_spectrum["annotations"][0]["Instrument"])
        output_table["Compound_Source"].append(gnps_library_spectrum["annotations"][0]["Compound_Source"])
        output_table["PI"].append(gnps_library_spectrum["annotations"][0]["PI"])
        output_table["Data_Collector"].append(gnps_library_spectrum["annotations"][0]["Data_Collector"])
        output_table["Adduct"].append(gnps_library_spectrum["annotations"][0]["Adduct"])
        output_table["Precursor_MZ"].append(gnps_library_spectrum["annotations"][0]["Precursor_MZ"])
        output_table["ExactMass"].append(gnps_library_spectrum["annotations"][0]["ExactMass"])
        output_table["Charge"].append(gnps_library_spectrum["annotations"][0]["Charge"])
        output_table["CAS_Number"].append(gnps_library_spectrum["annotations"][0]["CAS_Number"])
        output_table["Pubmed_ID"].append(gnps_library_spectrum["annotations"][0]["Pubmed_ID"])
        output_table["Smiles"].append(gnps_library_spectrum["annotations"][0]["Smiles"])
        output_table["INCHI"].append(gnps_library_spectrum["annotations"][0]["INCHI"])
        output_table["INCHI_AUX"].append(gnps_library_spectrum["annotations"][0]["INCHI_AUX"])
        output_table["Library_Class"].append(gnps_library_spectrum["annotations"][0]["Library_Class"])
        output_table["IonMode"].append(gnps_library_spectrum["annotations"][0]["Ion_Mode"])
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "1":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-GOLD")
            output_table["LibraryQualityString"].append("Gold")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "2":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-SILVER")
            output_table["LibraryQualityString"].append("Silver")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "3":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Bronze")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "4":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Insilico")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "10":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Challenge")
        output_table["#Scan#"].append(scan)
        output_table["SpectrumFile"].append(filename)
        output_table["LibraryName"].append(libfilename)
        output_table["MQScore"].append(score)
        output_table["Organism"].append(gnps_library_spectrum["spectruminfo"]["library_membership"])
        output_table["TIC_Query"].append(TIC_Query)
        output_table["RT_Query"].append(RT_Query)
        output_table["MZErrorPPM"].append(MZErrorPPM)
        output_table["SharedPeaks"].append(SharedPeaks)
        output_table["MassDiff"].append(MassDiff)
        output_table["LibMZ"].append(gnps_library_spectrum["annotations"][0]["Precursor_MZ"])
        output_table["SpecMZ"].append(SpecMZ)
        output_table["SpecCharge"].append(SpecCharge)
        tag_string = ""
        for tag in gnps_library_spectrum["spectrum_tags"]:
            tag_string += tag["tag_desc"].replace("\t", "") + "||"
        if len(tag_string) > 3:
            tag_string = tag_string[:-2]
        output_table["tags"].append(tag_string)
    ming_fileio_library.write_dictionary_table_data(output_table, output_result_filename)


if __name__ == "__main__":
...
