How to use collect_data method in stestr

Best Python code snippet using stestr_python

data_integrity.py

Source:data_integrity.py Github

copy

Full Screen

1#!/usr/bin/env python32from __future__ import print_function3import re4import os5import sys6import gzip7import argparse8import subprocess9import multiprocessing as mp10from Bio import SeqIO11# For errors / warnings12def eprint(*args, **kwargs):13 print(*args, file=sys.stderr, **kwargs)14def getArgs():15 parser = argparse.ArgumentParser(description="")16 parser.add_argument('-a',dest="manifest",type=str,required=True,help='q2_manifest file')17 parser.add_argument('-e',dest="metadata",type=str,required=True,help='q2_metadata file')18 parser.add_argument('-p',dest="primer",type=int,default=70,help='Percentage of primers supposed to be found in raw reads [%(default)s]')19 parser.add_argument('-c',dest="control",type=str, default='',help='Delimited list of control (comma separated)')20 parser.add_argument('-r',dest="readtype",type=str,default='paired',choices=['paired','single'],help='Sequencing format [%(default)s]')21 parser.add_argument('-t',dest="threads",type=int,default=4,help='Number of threads [%(default)s]')22 arg = parser.parse_args()23 return arg24def collect_check_metadata(metadata_file, data_type):25 collect_data = {}26 # read metadata line by line27 for l in open(metadata_file, 'r'):28 # ignore header29 if not l.startswith('sampleid'):30 sampleid = re.split(r'\t', l.rstrip('\n'))[0]31 collect_data[sampleid] = {}32 collect_data[sampleid]['barcode'] = re.split(r'\t', l.rstrip('\n'))[1]33 if data_type == 'paired':34 collect_data[sampleid]['primerF'] = re.split(r'\t', l.rstrip('\n'))[2]35 collect_data[sampleid]['primerR'] = re.split(r'\t', l.rstrip('\n'))[3]36 collect_data[sampleid]['vars'] = re.split(r'\t', l.rstrip('\n'))[4:]37 else:38 collect_data[sampleid]['primerF'] = re.split(r'\t', l.rstrip('\n'))[2]39 collect_data[sampleid]['vars'] = re.split(r'\t', l.rstrip('\n'))[3:]40 # check that var(s) didn't contains any NA41 for var in collect_data[sampleid]['vars']:42 if var == 'NA' or var == '':43 eprint('ERROR: '+sampleid+' has NA value(s) in q2_metadata, 
please remove them before running SAMBA')44 exit(1)45 # return results46 return collect_data, data_type47def collect_manifest(manifest_file, collect_data, data_type):48 # in order to check that metadata and manifest have the same number of lines49 metadata_size = len(collect_data)50 manifest_size = 051 # read manifest52 for l in open(manifest_file, 'r'):53 # ignore header54 if not l.startswith('sample-id'):55 if data_type == 'paired':56 try:57 sampleid, R1, R2 = re.split(r'\t', l.rstrip('\n'))58 except ValueError:59 size = len(re.split(r'\t', l.rstrip('\n')))60 eprint('ERROR: q2_manifest contains '+str(size)+ 'column(s) instead of 3')61 exit(1)62 # check that file path are good63 check_fastq_path(R1)64 check_fastq_path(R2)65 # add to collect_data66 try:67 collect_data[sampleid]['R1'] = R168 collect_data[sampleid]['R2'] = R269 except KeyError:70 eprint('ERROR: '+sampleid+' from q2_manifest is absent in q2_metadata')71 exit(1)72 else:73 try:74 sampleid, R1 = l.split()75 except ValueError:76 size = len(re.split(r'\t', l.rstrip('\n')))77 eprint('ERROR: q2_manifest contains '+str(size)+ 'column(s) instead of 2')78 exit(1)79 # check that file path are good80 check_fastq_path(R1)81 # add to collect_data82 try:83 collect_data[sampleid]['R1'] = R184 except KeyError:85 eprint('ERROR: '+sampleid+' from q2_manifest is absent in q2_metadata')86 exit(1)87 # increase line counter88 manifest_size += 189 # check that manifest have the same size as metadata90 if manifest_size != metadata_size:91 eprint('ERROR: q2_manifest and q2_metadata did not have the same number of lines')92 exit(1)93 # return update dict94 return collect_data95def check_fastq(collect_data, sample, data_type):96 out = {}97 print("\tanalyse sample: "+sample)98 barcode = collect_data[sample]['barcode']99 # by default, single-end100 reads = ['R1']101 # in case of paired, add R2 analysis102 if data_type == 'paired':103 reads.append('R2')104 # check fastq105 for r in reads:106 R=collect_data[sample][r]107 if r == 
'R1':108 primer=collect_data[sample]['primerF']109 else:110 primer=collect_data[sample]['primerR']111 instrument, index, reads_count, primer = read_fastq(R, primer)112 collect_data[sample]['reads_count_'+r] = reads_count113 collect_data[sample]['instrument_'+r] = instrument114 collect_data[sample]['nb_instrument_'+r] = len(instrument)115 collect_data[sample]['index_'+r] = index116 collect_data[sample]['primer_'+r] = primer117 collect_data[sample]['perc_primer_'+r] = round(primer * 100 / reads_count, 2)118 collect_data[sample]['nb_barcode_'+r] = len(index)119 collect_data[sample]['barcode_seq_'+r] = 'FALSE'120 # chech that barcode are the same as in metadata121 if len(index) == 1:122 if index[0] == barcode:123 collect_data[sample]['barcode_seq_'+r] = 'TRUE'124 # return updated dict125 out[sample] = collect_data[sample]126 return out127def read_fastq(fastq, primer):128 primer = re.sub(r"([RYSWKMBDHVNI])", r".", primer)129 reads_count = 0130 primers_count = 0131 instrument = []132 index = []133 with gzip.open(fastq, "rt") as handle:134 for record in SeqIO.parse(handle, "fastq"):135 # check instrument name136 h_1, h_2 = record.description.split()137 instrument_name = h_1.split(':')[0]138 if not instrument_name in instrument:139 instrument.append(instrument_name)140 # check index141 sequence_index = h_2.split(':')[3]142 if not sequence_index in index:143 index.append(sequence_index)144 # check primers145 if re.search(primer, str(record.seq)):146 primers_count += 1147 # increase read count148 reads_count += 1149 return instrument, index, reads_count, primers_count150def check_fastq_path(path):151 if not os.path.isfile(path):152 eprint('ERROR: ' + path + ' from q2_manifest not exit. 
Wrong path?')153 exit(1)154def write_report(collect_data, data_type):155 # open output file for writting156 report = open('data_integrity.txt', 'w')157 # header158 header_S = ['SampleID', 'Reads_R1', 'Barcode', 'Uniq_in_R1', 'Same_as_ref_R1', 'Uniq_sequencer_R1', 'PrimerF_in_R1', 'Perc_primerF_R1']159 header_P = ['SampleID', 'Reads_R1', 'Reads_R2', 'Barcode', 'Uniq_in_R1', 'Same_as_ref_R1', 'Uniq_in_R2', 'Same_as_ref_R2', 'Uniq_sequencer_R1', 'Uniq_sequencer_R2', 'PrimerF_in_R1', 'Perc_primerF_R1', 'PrimerR_in_R2', 'Perc_primerR_R2']160 if data_type == 'paired':161 report.write('\t'.join(header_P)+'\n')162 else:163 report.write('\t'.join(header_S)+'\n')164 for sample, val in collect_data.items():165 if data_type == 'paired':166 report.write(sample+'\t'+'{reads_count_R1}\t{reads_count_R2}\t{barcode}\t{nb_barcode_R1}\t{barcode_seq_R1}\t{nb_barcode_R2}\t{barcode_seq_R2}\t{nb_instrument_R1}\t{nb_instrument_R2}\t{primer_R1}\t{perc_primer_R1}\t{primer_R2}\t{perc_primer_R2}\n'.format(**val))167 else:168 report.write(sample+'\t'+'{reads_count_R1}\t{barcode}\t{nb_barcode_R1}\t{barcode_seq_R1}\t{nb_instrument_R1}\t{primer_R1}\t{perc_primer_R1}\n'.format(**val))169def integrity_validation(collect_data_out,control_list, data_type, primer_threshold):170 # collect control(s)171 controls = [str(control) for control in control_list.split(',')]172 # let's check the integrity173 for sample_id, val in collect_data_out.items():174 if data_type == 'paired':175 # 1 - check reads count R1 vs R2176 if val['reads_count_R1'] != val['reads_count_R2']:177 eprint('ERROR: different number of reads between R1 and R2 for ' + sample_id)178 exit(1)179 # 2 - check single sequencing instrument in reads180 if val['nb_instrument_R1'] != 1 or val['nb_instrument_R2'] != 1:181 eprint('ERROR: multiple sequencing machines detected in ' + sample_id)182 exit(1)183 # 3 - check primer percentage found, except for control(s)184 if not sample_id in controls:185 if val['perc_primer_R1'] < primer_threshold or 
val['perc_primer_R2'] < primer_threshold:186 eprint('ERROR: ' + sample_id + " did not reach the minimum threshold for primer percentage [" + str(primer_threshold) +"%]")187 exit(1)188 else:189 # 1 - check single sequencing instrument in reads190 if val['nb_instrument_R1'] != 1:191 eprint('ERROR: multiple sequencing machines detected in ' + sample_id)192 exit(1)193 # 2 - check primer percentage found, except for control(s)194 if not sample_id in controls:195 if val['perc_primer_R1'] < primer_threshold:196 eprint('ERROR: ' + sample_id + " did not reach the minimum threshold for primer percentage [" + str(primer_threshold) + "%]")197 exit(1)198def sort_process(input_file, output_file):199 cmd_sort = '(head -n 1 {in_file} && tail -n +2 {in_file} | sort) > {out_file}'.format(in_file=input_file, out_file=output_file)200 p = subprocess.Popen(cmd_sort, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)201 stdout, stderr = p.communicate()202 if p.returncode != 0:203 eprint('ERROR: sort metadata/manifest failed')204 raise Exception(stderr)205 exit(1)206def main(args):207 # 1 - Collect metadata infos and check variable names208 print("Step 1 - parse and collect data from q2_metadata")209 collect_data, data_type = collect_check_metadata(args.metadata, args.readtype)210 # 2 - Collect reads location from manifest211 print("Step 2 - parse and collect data from q2_manifest")212 collect_data = collect_manifest(args.manifest, collect_data, data_type)213 # 3 - Check fastq integrity214 print("Step 3 - collect fastq integrity")215 pool = mp.Pool(args.threads)216 collect_data_para = pool.starmap(check_fastq, [(collect_data, sample,data_type) for sample in collect_data.keys()])217 pool.close()218 # Clean all this mess219 collect_data_out = {}220 for result in collect_data_para:221 for sample, vals in result.items():222 collect_data_out[sample] = vals223 # 4 - Report fastq intergrity before validation224 # Allow exploration by user for a better understanding225 print("Step 4 - 
write integrity report")226 write_report(collect_data_out, data_type)227 # 5 - Validate manifest, metadata and fastq228 print("Step 5 - validation of data")229 integrity_validation(collect_data_out, args.control, data_type, args.primer)230 # 6 - Sort manifest and metadata231 print("Step 6 - Sort manifest and metadata by sample id")232 print("\tsort manifest...")233 output_file_manifest = args.manifest + ".sort"234 sort_process(args.manifest, output_file_manifest)235 print("\tsort metadata...")236 output_file_metadata = args.metadata + ".sort"237 sort_process(args.metadata, output_file_metadata)238if __name__ == '__main__':239 args = getArgs()...

Full Screen

Full Screen

main_buttons.py

Source:main_buttons.py Github

copy

Full Screen

...85 Args:86 window (QMainWindow): Ventana principal87 """88 window.btn_header.clicked.connect(89 lambda: collect_data(window, "Header"),90 )91 window.btn_free.clicked.connect(92 lambda: collect_data(window, "Free"),93 )94 window.btn_comment.clicked.connect(95 lambda: collect_data(window, "Comment"),96 )97 window.btn_subroutine.clicked.connect(98 lambda: collect_data(window, "Subroutine")99 )100 window.btn_collect.clicked.connect(101 lambda: collect_data(window, "Collect"),102 )103 window.btn_end.clicked.connect(104 lambda: collect_data(window, "End"),105 )106 window.btn_tool_call.clicked.connect(107 lambda: collect_data(window, "Tool_call"),108 )109 window.btn_tool_close.clicked.connect(110 lambda: collect_data(window, "Tool_close"),111 )112 window.btn_spindle.clicked.connect(113 lambda: collect_data(window, "Spindle"),114 )115 window.btn_spindle_index.clicked.connect(116 lambda: collect_data(window, "Spindle_index"),117 )118 window.btn_misc.clicked.connect(119 lambda: collect_data(window, "Misc"),120 )121 window.btn_turn_ini.clicked.connect(122 lambda: collect_data(window, "Turn_ini"),123 )124 window.btn_lineal_turn.clicked.connect(125 lambda: collect_data(window, "Lineal_turn"),126 )127 window.btn_radial_turn.clicked.connect(128 lambda: collect_data(window, "Radial_turn"),129 )130 window.btn_thread.clicked.connect(131 lambda: collect_data(window, "Thread"),132 )133 window.btn_cutoff.clicked.connect(134 lambda: collect_data(window, "Cutoff"),135 )136 window.btn_mill_ini.clicked.connect(137 lambda: collect_data(window, "Mill_ini"),138 )139 window.btn_mill_end.clicked.connect(140 lambda: collect_data(window, "Mill_end"),141 )142 window.btn_lineal_mill.clicked.connect(143 lambda: collect_data(window, "Lineal_mill"),144 )145 window.btn_radial_mill.clicked.connect(146 lambda: collect_data(window, "Radial_mill"),147 )148 window.btn_drill_ini.clicked.connect(149 lambda: collect_data(window, "Drill_ini"),150 )151 window.btn_drill_end.clicked.connect(152 lambda: 
collect_data(window, "Drill_end"),153 )154 window.btn_center.clicked.connect(155 lambda: collect_data(window, "Center_drill"),156 )157 window.btn_drill.clicked.connect(158 lambda: collect_data(window, "Drill"),159 )160 window.btn_csink.clicked.connect(161 lambda: collect_data(window, "Csink"),162 )163 window.btn_tapping.clicked.connect(164 lambda: collect_data(window, "Tapping"),165 )166 window.btn_rough_turn_cycle.clicked.connect(167 lambda: collect_data(window, "Rough_turn_cycle"),168 )169 window.btn_rough_turn_cycle_end.clicked.connect(170 lambda: collect_data(window, "Rough_turn_cycle_end"),171 )172 window.btn_flat_mill.clicked.connect(173 lambda: collect_data(window, "Flat_mill"),174 )175 window.btn_face_mill.clicked.connect(176 lambda: collect_data(window, "Face_mill"),...

Full Screen

Full Screen

collect_main.py

Source:collect_main.py Github

copy

Full Screen

import os

# Argument pairs handed to collect_data.py, in execution order.
_RUN_ARGUMENTS = (
    ("0.1", "1"),
    ("0.5", "2"),
    ("1.0", "3"),
    ("2.0", "4"),
)

# Launch collect_data.py once per argument pair, one run after the other.
for _first, _second in _RUN_ARGUMENTS:
    os.system("python ./collect_data/collect_data.py " + _first + " " + _second)
# NOTE(review): the listing ends with "..." in the source page, so further
# lines may be missing here.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run stestr automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful