How to use the output_table method in stestr

Best Python code snippets using stestr_python
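
Each of the snippets below builds a variable named output_table: a plain Python mapping from column names to values (or to lists of values) that is eventually written out as a delimited table. It is a coding pattern, not an stestr API call. As a minimal, illustrative sketch of that shared pattern (names and values here are invented):

from collections import OrderedDict

# Invented example data: an ordered mapping keeps the CSV columns
# in insertion order, which the snippets below rely on.
output_table = OrderedDict()
output_table["sample"] = "sample_01"
output_table["total"] = 100
output_table["mapped"] = 87

print(",".join(output_table.keys()))
print(",".join(str(v) for v in output_table.values()))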

alignStats.py

Source: alignStats.py (GitHub)

#!/usr/bin/env python
"""
Extract alignment statistics from a SAM/BAM file.
Adapted from the Celloline stats script
available at: https://github.com/Teichlab/celloline/blob/master/lib/stats.py
"""
import os
import sys
import re
import argparse
import pysam
import logging
import cPickle as pickle
from collections import Counter, defaultdict, OrderedDict
from intervaltree import IntervalTree
from joblib import Parallel, delayed
#LOAD GTF FILE
def load_gtf(gtf_path):
    """
    Load a GTF annotation and create an index using IntervalTrees.
    Args:
        gtf_path: Path to the GTF file to load.
    Returns:
        Dictionary containing IntervalTree indexes of the annotation.
    """
    gtf_index = defaultdict()
    with open(gtf_path) as gtf_file:
        for line in gtf_file:
            if not line.startswith("#"):
                entry = line.split("\t")
                entry_addition = entry[8]
                entry_addition = entry_addition.split(";")
                entry_addition = entry_addition[0].split(" ")
                gene_id = entry_addition[1]
                feature = entry[2]
                #TYPE(Gene, exon etc.), START, END, STRAND, gene_ID
                info = [feature, entry[3], entry[4], entry[6], gene_id]
                #Build GTF INDEX
                if feature != "" and entry[3] != entry[4]:
                    if entry[0] in gtf_index:
                        index = gtf_index[entry[0]]
                    else:
                        index = IntervalTree()
                    index.addi(int(info[1]), int(info[2]), info)
                    gtf_index[entry[0]] = index
    return gtf_index
def gen_stats(input_file, input_type, sample_name, gtf_dict):
    """
    Generate alignment stats from a SAM/BAM file.
    Loop over alignments in a SAM/BAM file and extract statistics such as the
    number of reads aligned to introns, exons, intergenic regions etc.
    Args:
        input_file: An open BAM or SAM file.
        input_type: Whether the file is 'bam' or 'sam'.
        sample_name: A name relating to this file.
        gtf_dict: Dictionary containing GTF index.
    Returns:
        Dictionary containing alignment statistics.
    """
    logger = logging.getLogger("stats." + sample_name[0:10])
    #OUTPUT TABLE CONTAINING STATS
    output_table = OrderedDict()
    #Dict indicating to which genes a specific read maps
    #It is a temporary dict
    exonic_mappings_temp = defaultdict(str)
    #Dict indicating which read is multi-mapped
    #It is a temporary dict
    exonic_multi_table = defaultdict(str)
    # Sample
    output_table["sample"] = sample_name
    #MAPPABILITY
    output_table["total"] = 0
    output_table["mapped"] = 0
    output_table["unmapped"] = 0
    output_table["unique"] = 0
    output_table["multi"] = 0
    #CODING VERSUS NON-CODING REGIONS
    output_table["intergenic"] = 0
    output_table["intragenic"] = 0
    output_table["exonic"] = 0
    output_table["intronic"] = 0
    output_table["ambigious"] = 0
    #CODING REGIONS MAPPABILITY
    output_table["exonicU"] = 0
    output_table["exonicM"] = 0
    #ALIGNMENT CODING VS NONCODING
    output_table["alignments"] = 0
    output_table["multi-intergenic"] = 0
    output_table["multi-intragenic"] = 0
    output_table["multi-exonic"] = 0
    output_table["multi-intronic"] = 0
    output_table["multi-ambigious"] = 0
    #ERROR
    output_table["perfect"] = 0
    output_table["partly_perfect"] = 0
    output_table["mapped_no_correct"] = 0
    for i in range(0, 10):
        output_table["S_" + str(i)] = 0
    output_table["S_10+"] = 0
    output_table["I"] = 0
    output_table["D"] = 0
    output_table["INDEL"] = 0
    reads = Counter()
    if input_type == "bam":
        ref_map = input_file.references
        input_file = input_file.fetch(until_eof=True)
    line_count = 0
    for line in input_file:
        line_count += 1
        if input_type == "bam":  # BAM input line
            split = str(line).split("\t")
            split[2] = ref_map[int(split[2])]
            split[3] = int(split[3]) + 1
        elif not line.startswith("@"):  # SAM input line
            split = line.split("\t")
        else:
            continue
        read_name = split[0]
        flag_code = int(split[1])
        chrom = split[2]
        pos = split[3]
        errors = split[5]
        errors_a = list(errors)
        number = ""
        num = 0
        error_table = defaultdict(int)
        name_and_flag = read_name
        #CHECK IF READ MAPPED OR UNMAPPED
        #IT IS UNMAPPED
        if flag_code & 0x0004 != 0:
            output_table["unmapped"] += 1
            output_table["total"] += 1
            error_table["*"] += 1
        #IT IS MAPPED
        else:
            if flag_code & 0x0001 != 0:  # This is paired end
                if flag_code & 0x0040 != 0:  # 1st read
                    name_and_flag += ";first"
                if flag_code & 0x0080 != 0:  # 2nd read
                    name_and_flag += ";second"
            # CHECK TO WHICH GENE(S) IT MAPPED
            genes_info, num_genes, num_exons = get_gene(gtf_dict, [chrom, pos])
            output_table["alignments"] += 1.0
            #STATS
            if name_and_flag not in reads:
                reads[name_and_flag] += 1
                output_table["unique"] += 1
                output_table["total"] += 1
                output_table["mapped"] += 1
                if num_genes == 0:
                    output_table["intergenic"] += 1
                elif num_genes == 1:
                    output_table["intragenic"] += 1
                    if num_exons == 0:
                        output_table["intronic"] += 1
                    else:
                        output_table["exonic"] += 1
                        output_table["exonicU"] += 1
                        exons = []
                        if name_and_flag in exonic_mappings_temp:
                            exons = exonic_mappings_temp[name_and_flag]
                        exons.append([genes_info[0], chrom, pos])
                        exonic_mappings_temp[name_and_flag] = exons
                elif num_genes > 1:
                    output_table["ambigious"] += 1
            #READ IS MULTI-MAPPED
            else:
                if reads[name_and_flag] == 1:
                    output_table["unique"] -= 1
                    output_table["exonicU"] -= 1
                    output_table["multi"] += 1
                reads[name_and_flag] += 1
                exons = []
                #GET KNOWLEDGE IF FIRST MAPPING EXONIC OR INTRONIC
                if name_and_flag in exonic_mappings_temp:
                    exons = exonic_mappings_temp[name_and_flag]
                if num_genes == 0:
                    output_table["multi-intergenic"] += 1
                elif num_genes == 1:
                    output_table["multi-intragenic"] += 1
                    if num_exons == 0:
                        output_table["multi-intronic"] += 1
                    else:
                        output_table["multi-exonic"] += 1
                        exons.append([genes_info[0], chrom, pos])
                elif num_genes > 1:
                    output_table["multi-ambigious"] += 1
                #IF AT LEAST ONE EXONIC ALIGNMENT
                if len(exons) > 0:
                    exonic_multi_table[name_and_flag] = exons
            #PARSE MAPPING ERRORS
            for i in errors_a:
                if re.match("[0-9]", i):
                    number += i
                elif re.match("[A-Z]", i):
                    num = int(number)
                    error_table[i] += num
                    number = ""
            #TABLE OF HOW MANY READS MAP PERFECT, PARTLY PERFECT ETC
            if "M" in error_table and len(error_table) == 1:
                output_table["perfect"] += 1
            elif "M" in error_table and len(error_table) > 1:
                output_table["partly_perfect"] += 1
            elif "M" not in error_table and "*" not in error_table:
                output_table["mapped_no_correct"] += 1
            if "S" in error_table:
                if int(error_table["S"]) < 10:
                    output_table["S_" + str(error_table["S"])] += 1
                else:
                    output_table["S_10+"] += 1
            elif "S" not in error_table:
                output_table["S_0"] += 1
            if "I" in error_table:
                output_table["I"] += 1
            if "D" in error_table:
                output_table["D"] += 1
            if "I" in error_table or "D" in error_table:
                output_table["INDEL"] += 1
        if (line_count % 1000000) == 0:
            logger.debug(sample_name + " line " + str(line_count) + "...")
    output_table["exonicM"] = len(exonic_multi_table.keys())
    return output_table
def get_stats_line(stats_table):
    """
    Get an output line from a stats table.
    Args:
        stats_table: Dictionary of alignment statistics.
    Returns:
        String representing the results for one file.
    """
    logger = logging.getLogger("stats.extract")
    out_line = ""
    for stat, value in stats_table.iteritems():
        if stat in ["unique", "multi", "intragenic", "intergenic",
                    "exonic", "intronic", "ambigious", "exonicM", "exonicU"]:
            value = (value + 0.0) / (stats_table["mapped"] + 0.0)
            value = "%.2f" % (100.0 * value)
        elif stat in ["multi-intragenic", "multi-intergenic", "multi-exonic",
                      "multi-intronic", "multi-ambigious"]:
            value = value + 0.0
            if stats_table["alignments"] != 0:
                value = value / (stats_table["alignments"] + 0.0)
            value = "%.2f" % (100.0 * value)
        value = str(value)
        if not stat == "sample":
            out_line += "," + value
        else:
            out_line += value
        logger.debug(stat + " : " + value)
    out_line += "\n"
    return out_line
def write_stats(output_path, stats_list):
    """
    Write a series of results to a file.
    Args:
        output_path: Path to write results to.
        stats_list: List of dictionaries containing results from input files.
    """
    cols = stats_list[0].keys()
    with open(output_path, "w") as out_file:
        out_file.write(",".join(cols) + "\n")
        for stats_table in stats_list:
            stats_line = get_stats_line(stats_table)
            out_file.write(stats_line)
def get_gene(gtf_dict, pos_pair):
    """
    Identify which genes overlap a given position.
    Args:
        gtf_dict: Dictionary containing GTF index.
        pos_pair: Tuple containing genomic position (chrom, pos).
    Returns:
        Tuple containing the list of overlapping genes, the number of
        overlapping genes and the number of overlapping exons.
    """
    num_genes = 0
    num_exons = 0
    if pos_pair[0] not in gtf_dict:
        #print ("Ignored pos: " + pos_pair[0])
        return ([], num_genes, num_exons)
    entries = gtf_dict[pos_pair[0]]
    pos = int(pos_pair[1])
    found = entries.search(pos)
    gene_list = []
    for entry in found:
        info = entry[2]
        if info[0] == "gene":
            gene_list.append(info)
            num_genes += 1
        elif info[0] == "exon":
            num_exons += 1
    return (gene_list, num_genes, num_exons)
def process_file(input_file, input_type, index, is_parallel):
    """
    Process an individual SAM/BAM file.
    How we want to process the file depends on the input type and whether we
    are operating in parallel. If in parallel the index must be loaded for each
    input file. If the input is a BAM file it needs to be read using Pysam, if
    SAM it can be read directly as a text file.
    Args:
        input_file: Path to the input file.
        input_type: Whether the file is 'bam' or 'sam'.
        index: If operating in parallel a string to the index file, if not the
            loaded GTF index dictionary.
        is_parallel: Whether to operate in parallel.
    Returns:
        Dictionary containing alignment statistics for the input file.
    """
    sample_name = input_file.split("/")[-1]
    logger = logging.getLogger("stats." + sample_name[0:10])
    logger.info("Processing " + sample_name + "...")
    if is_parallel:
        logger.info("Loading index...")
        with open(index, "rb") as index_file:
            loaded_index = pickle.load(index_file)
        logger.info("Loaded.")
    else:
        loaded_index = index
    if input_type == "sam":
        logger.info("Parsing SAM file...")
        with open(input_file) as sam:
            output_table = gen_stats(sam, input_type, sample_name, loaded_index)
    elif input_type == "bam":
        logger.info("Parsing BAM file...")
        bam = pysam.AlignmentFile(input_file, "rb")
        output_table = gen_stats(bam, input_type, sample_name, loaded_index)
    logger.info("Finished " + sample_name)
    return output_table
def get_index(args):
    """
    Load a GTF index if available or create from GTF file if not found.
    If a valid path to an index file is given that file will be loaded. If no
    index file was specified or the file does not exist the annotation will be
    read from a GTF file. It will then be pickled if an index file is specified.
    When running in parallel the path to the index file is returned rather than
    the index dictionary itself.
    Args:
        args: Options from the command line.
    Returns:
        Dictionary containing GTF index or path to index file if in parallel.
    """
    logger = logging.getLogger("stats.index")
    if args.index and os.path.isfile(args.index):
        logger.info("Index found at " + args.index)
        if not args.is_parallel:
            logger.info("Loading index...")
            with open(args.index, "rb") as index_file:
                index = pickle.load(index_file)
            logger.info("Loaded.")
        else:
            index = args.index
    elif args.gtf and os.path.isfile(args.gtf):
        logger.info("No index file found.")
        logger.info("Loading GTF file...")
        gtf_dict = load_gtf(args.gtf)
        logger.info("Loaded.")
        if args.index:
            logger.info("Saving index to " + args.index + "...")
            with open(args.index, "wb") as index_file:
                pickle.dump(gtf_dict, index_file, -1)
            logger.info("Saved.")
        if not args.is_parallel:
            index = gtf_dict
        else:
            index = args.index
    return index
def get_args():
    """
    Read arguments from the command line and check they are valid.
    """
    logger = logging.getLogger("stats.args")
    parser = argparse.ArgumentParser(
        description="Extract alignment statistics from a SAM/BAM file")
    parser.add_argument("inputs",
                        metavar="SAM/BAM",
                        nargs="+",
                        help="Input SAM or BAM files")
    parser.add_argument("-o", "--out",
                        help="Output file",
                        required=True)
    parser.add_argument("-g", "--gtf",
                        help="GTF annotation file")
    parser.add_argument("-i", "--index",
                        help="""Annotation index file. Required when
                        operating in parallel.""")
    parser.add_argument("-t", "--type",
                        choices=["sam", "bam"],
                        help="Type of input file",
                        required=True)
    parser.add_argument("-p", "--parallel",
                        type=int,
                        default=1,
                        help="""Number of files to process in parallel.
                        Requires N + 1 threads if greater than 1.""")
    args = parser.parse_args()
    args.is_parallel = False
    if args.parallel < 1:
        logger.error("Number of parallel files must be positive")
        sys.exit()
    elif args.parallel > 1:
        args.is_parallel = True
        logger.info("Running with " + str(args.parallel) + " jobs")
    if args.is_parallel and not args.index:
        logger.error("Index file is required when running in parallel.")
        sys.exit()
    if not (args.index and os.path.isfile(args.index)):
        if not (args.gtf and os.path.isfile(args.gtf)):
            logger.error("No GTF or index file found.")
            sys.exit()
    return args
def setup_logging():
    """
    Setup logging system.
    Log is written to 'alignmentStats.log'.
    """
    logger = logging.getLogger("stats")
    logger.setLevel(logging.DEBUG)
    # create file handler which logs even debug messages
    file_handler = logging.FileHandler('alignmentStats.log')
    file_handler.setLevel(logging.INFO)
    # create console handler with a higher log level
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    # create formatter and add it to the handlers
    format_str = "[%(asctime)s] %(levelname)s %(name)s: %(message)s"
    formatter = logging.Formatter(format_str, "%Y-%m-%d %H:%M:%S")
    file_handler.setFormatter(formatter)
    format_str = "[%(asctime)s] %(message)s"
    formatter = logging.Formatter(format_str, "%H:%M:%S")
    console_handler.setFormatter(formatter)
    # add the handlers to logger
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
def main():
    """
    Main function.
    1. Setup logging
    2. Get arguments
    3. Get index
    4. Process files
    5. Write output
    """
    setup_logging()
    logger = logging.getLogger("stats." + __name__)
    args = get_args()
    index = get_index(args)
    logger.warning("Positions not in annotation will be ignored.")
    logger.info("Found " + str(len(args.inputs)) + " input file(s):")
    for input_file in sorted(args.inputs):
        logger.debug(input_file)
    if args.is_parallel:
        stats = Parallel(n_jobs=args.parallel,
                         verbose=100,
                         batch_size=1)(delayed(process_file)(input_file,
                                                             args.type,
                                                             index,
                                                             args.is_parallel)
                                       for input_file in args.inputs)
    else:
        stats = []
        for input_file in args.inputs:
            output_table = process_file(input_file, args.type, index,
                                        args.is_parallel)
            stats.append(output_table)
    write_stats(args.out, stats)
if __name__ == "__main__":
    main()
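
Here output_table is an OrderedDict of named counters that write_stats later flattens into a CSV, one row per input file. The classification machinery behind those counters hinges on one technique: load_gtf builds an IntervalTree per chromosome, and get_gene answers "which gene or exon spans this position?" with a point query. Below is a self-contained sketch of that idea using toy coordinates rather than a real GTF; note that intervaltree 3.x renamed search() to at(), while the script above uses the older 2.x name.

from intervaltree import IntervalTree

# Toy annotation: one gene containing one exon on a single chromosome.
tree = IntervalTree()
tree.addi(100, 500, ["gene", "100", "500", "+", "GENE_A"])
tree.addi(150, 200, ["exon", "150", "200", "+", "GENE_A"])

# Point query: every interval containing position 170.
for interval in tree[170]:
    feature, gene_id = interval.data[0], interval.data[4]
    print(feature, gene_id)  # prints both the gene and the exon hit

get_gene then simply counts the "gene" and "exon" hits to classify an alignment as intergenic, intronic, or exonic. Going by get_args(), a typical run would look like python alignStats.py -t bam -g annotation.gtf -o stats.csv sample.bam (file names are placeholders).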

FECfilterService.py

Source: FECfilterService.py (GitHub)

from PyQt5.QtWidgets import QTableWidget, QComboBox, QLabel
from PyQt5.QtCore import Qt
import random
class FECfilterService:
    FEC = "FEC baseline"  # EN_2.1
    FEC_specific = "FEC baseline (specific)"  # EN_2.2
    FEC_future = "FEC future"  # EN_2.3
    FEC_future_specific = "FEC future (specific)"  # EN_2.4
    FEC_future_variation = "FEC variation"  # EN_2.5
    FEC_future_saving = "FEC saving"  # EN_2.6
    YEOH_Base = "YEOH baseline"  # EN_15.1
    YEOH_Future = "YEOH future"  # EN_15.2
    YEOH_Variation = "YEOH variation"  # EN_15.3
    def __init__(self, fec_filter_combo_box: QComboBox, description_filter_label: QLabel, mode="future"):
        self.fec_filter_combo_box = fec_filter_combo_box
        self.description_filter_label = description_filter_label
        self.fec_filter_combo_box.clear()
        self.fec_filter_combo_box.insertItem(0, "Select filter:")
        self.fec_filter_combo_box.setItemData(0, 0, Qt.UserRole - 1)
        self.fec_filter_combo_box.insertItem(1, self.FEC)
        self.fec_filter_combo_box.insertItem(2, self.FEC_specific)
        self.fec_filter_combo_box.insertItem(3, self.YEOH_Base)
        if mode == "future":
            self.fec_filter_combo_box.insertItem(4, self.FEC_future)
            self.fec_filter_combo_box.insertItem(5, self.FEC_future_specific)
            self.fec_filter_combo_box.insertItem(6, self.FEC_future_variation)
            self.fec_filter_combo_box.insertItem(7, self.FEC_future_saving)
            self.fec_filter_combo_box.insertItem(8, self.YEOH_Future)
            self.fec_filter_combo_box.insertItem(9, self.YEOH_Variation)
    def update_label(self, new_text):
        self.description_filter_label.setText(new_text + ":")
    def get_filtered_table(self, KPIs):
        if self.fec_filter_combo_box.currentText() == self.FEC:
            return self.apply_filter_FEC(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_specific:
            return self.apply_filter_FEC_specific(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future:
            return self.apply_filter_FEC_future(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_specific:
            return self.apply_filter_FEC_future_specific(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_variation:
            return self.apply_filter_FEC_variation(KPIs)
        if self.fec_filter_combo_box.currentText() == self.FEC_future_saving:
            return self.apply_filter_FEC_saving(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Base:
            return self.apply_filter_YEOH_Base(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Future:
            return self.apply_filter_YEOH_Future(KPIs)
        if self.fec_filter_combo_box.currentText() == self.YEOH_Variation:
            return self.apply_filter_YEOH_Variation(KPIs)
    def apply_filter_FEC(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.1R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.1T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.1_s" + str(index))
        return output_table
    def apply_filter_FEC_specific(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.2R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.2T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.2_s" + str(index))
        return output_table
    def apply_filter_FEC_future(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.3R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.3T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.3_s" + str(index))
        return output_table
    def apply_filter_FEC_future_specific(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.4R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.4T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.4_s" + str(index))
        return output_table
    def apply_filter_FEC_variation(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.5R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.5T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.5_s" + str(index))
        return output_table
    def apply_filter_FEC_saving(self, KPIs):
        output_table = {}
        sources_names = FECfilterService.get_from_key(KPIs, "sources")
        for index, source in enumerate(sources_names):
            output_table[source] = {}
            output_table[source]["R"] = FECfilterService.get_from_key(KPIs, "EN_2.6R_s" + str(index))
            output_table[source]["T"] = FECfilterService.get_from_key(KPIs, "EN_2.6T_s" + str(index))
            output_table[source]["TOT"] = FECfilterService.get_from_key(KPIs, "EN_2.6_s" + str(index))
        return output_table
    def apply_filter_YEOH_Base(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.1")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table
    def apply_filter_YEOH_Future(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.2")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table
    def apply_filter_YEOH_Variation(self, KPIs):
        output_table = {}
        YEOHbase = FECfilterService.get_from_key(KPIs, "EN_15.3")
        if YEOHbase is None:
            return output_table
        for index, key in enumerate(YEOHbase):
            output_table[key] = {}
            output_table[key]["R"] = ""
            output_table[key]["T"] = ""
            fec_pow = FECfilterService.get_from_key(YEOHbase, key)
            try:
                output_table[key]["TOT"] = round(float(fec_pow[0]) / float(fec_pow[1]), 2)
            except (ZeroDivisionError, ValueError, IndexError) as e:
                output_table[key]["TOT"] = "Nan"
        return output_table
    @staticmethod
    def get_from_key(dictionary, key):
        try:
            item = dictionary[key]
            if isinstance(item, float):
                return "{:.2f}".format(item)
            return item
        except KeyError:
            return None
    def example_table(self):
        output_table = {}
        output_table["Fonte 0"] = {}
        output_table["Fonte 1"] = {}
        output_table["Fonte 2"] = {}
        output_table["Fonte 0"]["R"] = 42.00
        output_table["Fonte 0"]["T"] = 42.00
        output_table["Fonte 0"]["TOT"] = 84.00
        output_table["Fonte 1"]["R"] = 21.00
        output_table["Fonte 1"]["T"] = 21.00
        output_table["Fonte 1"]["TOT"] = 42.00
        output_table["Fonte 2"]["R"] = 11.00
        output_table["Fonte 2"]["T"] = 22.00
        output_table["Fonte 2"]["TOT"] = 33.00
        return output_table
    def example_KPIs(self):
        KPIs = {}
        KPIs["sources"] = ["Heating Oil", "Natural gas", "Electricity", "Deep geothermal",
                           "Geothermal - Shallow - Ground heat extraction",
                           "Geothermal - Shallow - Ground cold extraction", "Solar thermal",
                           "Excess heat Industry", "Excess heat - Data centers",
                           "Excess heat - Supermarkets", "Excess heat - Refrigerated storage facilities",
                           "Excess heat - Indoor carparkings",
                           "Excess heat - Subway networks", "Urban waste water treatment plant",
                           "Water - Waste water - Sewer system",
                           "Water - Surface water - Rivers cold extraction heat pump",
                           "Water - Surface water - Rivers cold extraction from free cooling HEX",
                           "Water - Surface water - Lakes heat extraction with heat pump",
                           "Water - Surface water - Lakes cold extraction with heat pump",
                           "Water - Surface water - Rivers heat extraction heat pump",
                           "LNG terminals excess cooling", "Biomass forestry",
                           "generic source"]
        for i in range(23):
            KPIs["EN_2.1_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.1R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.1T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.2T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.3T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.4T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.5T_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6R_s" + str(i)] = round(random.random(), 2)
            KPIs["EN_2.6T_s" + str(i)] = round(random.random(), 2)
        ...

getGNPS_library_annotations.py

Source: getGNPS_library_annotations.py (GitHub)

#!/usr/bin/python
import sys
import os
import ming_fileio_library
import ming_gnps_library
from collections import defaultdict
def usage():
    print "<input clusterinfosummary file> <input edges file> <output file>"
def main():
    input_result_filename = sys.argv[1]
    output_result_filename = sys.argv[2]
    input_rows, input_table = ming_fileio_library.parse_table_with_headers(input_result_filename)
    output_table = defaultdict(list)
    output_headers = ["SpectrumID", "Compound_Name", "Ion_Source", "Instrument", "Compound_Source", "PI", "Data_Collector", "Adduct"]
    output_headers += ["Precursor_MZ", "ExactMass", "Charge", "CAS_Number", "Pubmed_ID", "Smiles", "INCHI", "INCHI_AUX", "Library_Class"]
    output_headers += ["IonMode", "UpdateWorkflowName", "LibraryQualityString", "#Scan#", "SpectrumFile", "MQScore", "Organism"]
    output_headers += ["TIC_Query", "RT_Query", "MZErrorPPM", "SharedPeaks", "MassDiff", "LibMZ", "SpecMZ", "SpecCharge"]
    for header in output_headers:
        output_table[header] = []
    for i in range(input_rows):
        spectrum_id = input_table["LibrarySpectrumID"][i]
        score = input_table["MQScore"][i]
        filename = input_table["SpectrumFile"][i]
        libfilename = input_table["LibraryName"][i]
        scan = input_table["#Scan#"][i]
        TIC_Query = input_table["UnstrictEvelopeScore"][i]
        RT_Query = input_table["p-value"][i]
        SpecCharge = input_table["Charge"][i]
        SpecMZ = input_table["SpecMZ"][i]
        MZErrorPPM = input_table["mzErrorPPM"][i]
        SharedPeaks = input_table["LibSearchSharedPeaks"][i]
        MassDiff = input_table["ParentMassDiff"][i]
        print(spectrum_id)
        gnps_library_spectrum = None
        try:
            gnps_library_spectrum = ming_gnps_library.get_library_spectrum(spectrum_id)
        except KeyboardInterrupt:
            raise
        except:
            continue
        output_table["SpectrumID"].append(spectrum_id)
        output_table["Compound_Name"].append(gnps_library_spectrum["annotations"][0]["Compound_Name"])
        output_table["Ion_Source"].append(gnps_library_spectrum["annotations"][0]["Ion_Source"])
        output_table["Instrument"].append(gnps_library_spectrum["annotations"][0]["Instrument"])
        output_table["Compound_Source"].append(gnps_library_spectrum["annotations"][0]["Compound_Source"])
        output_table["PI"].append(gnps_library_spectrum["annotations"][0]["PI"])
        output_table["Data_Collector"].append(gnps_library_spectrum["annotations"][0]["Data_Collector"])
        output_table["Adduct"].append(gnps_library_spectrum["annotations"][0]["Adduct"])
        output_table["Precursor_MZ"].append(gnps_library_spectrum["annotations"][0]["Precursor_MZ"])
        output_table["ExactMass"].append(gnps_library_spectrum["annotations"][0]["ExactMass"])
        output_table["Charge"].append(gnps_library_spectrum["annotations"][0]["Charge"])
        output_table["CAS_Number"].append(gnps_library_spectrum["annotations"][0]["CAS_Number"])
        output_table["Pubmed_ID"].append(gnps_library_spectrum["annotations"][0]["Pubmed_ID"])
        output_table["Smiles"].append(gnps_library_spectrum["annotations"][0]["Smiles"])
        output_table["INCHI"].append(gnps_library_spectrum["annotations"][0]["INCHI"])
        output_table["INCHI_AUX"].append(gnps_library_spectrum["annotations"][0]["INCHI_AUX"])
        output_table["Library_Class"].append(gnps_library_spectrum["annotations"][0]["Library_Class"])
        output_table["IonMode"].append(gnps_library_spectrum["annotations"][0]["Ion_Mode"])
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "1":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-GOLD")
            output_table["LibraryQualityString"].append("Gold")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "2":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-SILVER")
            output_table["LibraryQualityString"].append("Silver")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "3":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Bronze")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "4":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Insilico")
        if gnps_library_spectrum["annotations"][0]["Library_Class"] == "10":
            output_table["UpdateWorkflowName"].append("UPDATE-SINGLE-ANNOTATED-BRONZE")
            output_table["LibraryQualityString"].append("Challenge")
        output_table["#Scan#"].append(scan)
        output_table["SpectrumFile"].append(filename)
        output_table["LibraryName"].append(libfilename)
        output_table["MQScore"].append(score)
        output_table["Organism"].append(gnps_library_spectrum["spectruminfo"]["library_membership"])
        output_table["TIC_Query"].append(TIC_Query)
        output_table["RT_Query"].append(RT_Query)
        output_table["MZErrorPPM"].append(MZErrorPPM)
        output_table["SharedPeaks"].append(SharedPeaks)
        output_table["MassDiff"].append(MassDiff)
        output_table["LibMZ"].append(gnps_library_spectrum["annotations"][0]["Precursor_MZ"])
        output_table["SpecMZ"].append(SpecMZ)
        output_table["SpecCharge"].append(SpecCharge)
        tag_string = ""
        for tag in gnps_library_spectrum["spectrum_tags"]:
            tag_string += tag["tag_desc"].replace("\t", "") + "||"
        if len(tag_string) > 3:
            tag_string = tag_string[:-2]
        output_table["tags"].append(tag_string)
    ming_fileio_library.write_dictionary_table_data(output_table, output_result_filename)
if __name__ == "__main__":
    main()
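
Unlike alignStats.py, which keeps one dict per input file, this script builds output_table as a defaultdict(list) of columns and appends one value per column for each matched spectrum. A stripped-down sketch of that column-oriented pattern, independent of the ming_* libraries (the rows are invented):

from collections import defaultdict

output_table = defaultdict(list)
rows = [
    {"SpectrumID": "ID_1", "MQScore": "0.91"},
    {"SpectrumID": "ID_2", "MQScore": "0.85"},
]
for row in rows:
    # Append to every column on every iteration so all column lists
    # stay the same length and the written table stays rectangular.
    for key in ("SpectrumID", "MQScore"):
        output_table[key].append(row.get(key, ""))

print(output_table["SpectrumID"])  # ['ID_1', 'ID_2']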

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation testing minutes FREE!

Next-Gen App & Browser Testing Cloud
