Best Python code snippet using stestr_python
data_integrity.py
Source:data_integrity.py  
#!/usr/bin/env python3
# Check the integrity of a q2_manifest / q2_metadata pair and of the gzipped
# FASTQ files the manifest points to, write a per-sample report
# (data_integrity.txt) and produce sample-sorted copies of both input files.
from __future__ import print_function

import argparse
import gzip
import multiprocessing as mp
import os
import re
import shlex
import subprocess
import sys

try:
    from Bio import SeqIO
except ImportError:
    # Biopython is only needed to parse FASTQ records. Keeping the module
    # importable without it lets the pure-Python checks be reused; main() and
    # read_fastq() report the missing dependency explicitly.
    SeqIO = None


# For errors / warnings
def eprint(*args, **kwargs):
    """Print to standard error; accepts the same arguments as print()."""
    print(*args, file=sys.stderr, **kwargs)


def getArgs():
    """Parse the command line and return the argparse namespace.

    The namespace carries: manifest, metadata, primer (minimum primer
    percentage), control (comma separated sample ids), readtype
    ('paired' or 'single') and threads.
    """
    parser = argparse.ArgumentParser(description="")
    parser.add_argument('-a', dest="manifest", type=str, required=True,
                        help='q2_manifest file')
    parser.add_argument('-e', dest="metadata", type=str, required=True,
                        help='q2_metadata file')
    parser.add_argument('-p', dest="primer", type=int, default=70,
                        help='Percentage of primers supposed to be found in raw reads [%(default)s]')
    parser.add_argument('-c', dest="control", type=str, default='',
                        help='Delimited list of control (comma separated)')
    parser.add_argument('-r', dest="readtype", type=str, default='paired',
                        choices=['paired', 'single'],
                        help='Sequencing format [%(default)s]')
    parser.add_argument('-t', dest="threads", type=int, default=4,
                        help='Number of threads [%(default)s]')
    return parser.parse_args()


def collect_check_metadata(metadata_file, data_type):
    """Read the tab separated metadata file into a per-sample dictionary.

    Columns are read positionally: sample id, barcode, forward primer, then
    (paired data only) reverse primer; every remaining column is stored under
    'vars'. Lines starting with 'sampleid' are treated as the header and
    blank lines are ignored.

    Args:
        metadata_file: path of the q2_metadata file.
        data_type: 'paired' or anything else for single-end data.

    Returns:
        (collect_data, data_type) where collect_data maps each sample id to
        a dict with the keys 'barcode', 'primerF', 'primerR' (paired only)
        and 'vars'.

    Raises:
        SystemExit: (code 1) when a row has too few columns or when a
            variable is empty or 'NA'.
    """
    # sampleid, barcode, primerF (+ primerR when paired) precede the variables
    first_var = 4 if data_type == 'paired' else 3
    collect_data = {}
    with open(metadata_file, 'r') as handle:
        for line in handle:
            line = line.rstrip('\r\n')
            # ignore blank lines and the header
            if not line or line.startswith('sampleid'):
                continue
            fields = line.split('\t')
            if len(fields) < first_var:
                eprint('ERROR: ' + fields[0] + ' has ' + str(len(fields)) +
                       ' column(s) in q2_metadata, at least ' +
                       str(first_var) + ' are expected')
                sys.exit(1)
            sampleid = fields[0]
            entry = {'barcode': fields[1], 'primerF': fields[2]}
            if data_type == 'paired':
                entry['primerR'] = fields[3]
            entry['vars'] = fields[first_var:]
            # check that var(s) didn't contains any NA
            if any(var in ('NA', '') for var in entry['vars']):
                eprint('ERROR: ' + sampleid + ' has NA value(s) in q2_metadata, please remove them before running SAMBA')
                sys.exit(1)
            collect_data[sampleid] = entry
    return collect_data, data_type


def collect_manifest(manifest_file, collect_data, data_type):
    """Add the FASTQ paths listed in the manifest to collect_data.

    Rows are expected to be tab separated (sample id, R1 and, for paired
    data, R2). A row that does not have the expected number of tab separated
    columns is re-read as whitespace separated, which keeps previously
    accepted single-end manifests working. Lines starting with 'sample-id'
    are treated as the header and blank lines are ignored.

    Args:
        manifest_file: path of the q2_manifest file.
        collect_data: dict returned by collect_check_metadata (updated in
            place with 'R1' / 'R2' entries).
        data_type: 'paired' or anything else for single-end data.

    Returns:
        The updated collect_data dict.

    Raises:
        SystemExit: (code 1) on a wrong column count, a missing FASTQ file,
            a sample absent from the metadata, or when manifest and metadata
            do not describe the same number of samples.
    """
    read_keys = ('R1', 'R2') if data_type == 'paired' else ('R1',)
    expected_columns = len(read_keys) + 1
    manifest_size = 0
    with open(manifest_file, 'r') as handle:
        for line in handle:
            line = line.rstrip('\r\n')
            # ignore blank lines and the header
            if not line or line.startswith('sample-id'):
                continue
            fields = line.split('\t')
            if len(fields) != expected_columns:
                size = len(fields)
                fields = line.split()
                if len(fields) != expected_columns:
                    eprint('ERROR: q2_manifest contains ' + str(size) +
                           ' column(s) instead of ' + str(expected_columns))
                    sys.exit(1)
            sampleid, paths = fields[0], fields[1:]
            # check that file path are good
            for path in paths:
                check_fastq_path(path)
            if sampleid not in collect_data:
                eprint('ERROR: ' + sampleid + ' from q2_manifest is absent in q2_metadata')
                sys.exit(1)
            collect_data[sampleid].update(zip(read_keys, paths))
            manifest_size += 1
    # check that manifest have the same size as metadata
    if manifest_size != len(collect_data):
        eprint('ERROR: q2_manifest and q2_metadata did not have the same number of lines')
        sys.exit(1)
    return collect_data


def check_fastq(collect_data, sample, data_type):
    """Collect read, instrument, barcode and primer statistics for a sample.

    For R1 (and R2 when data_type is 'paired') the FASTQ file is scanned with
    read_fastq and the results are stored in collect_data[sample] under keys
    suffixed with the read name (reads_count_, instrument_, nb_instrument_,
    index_, primer_, perc_primer_, nb_barcode_, barcode_seq_).

    Args:
        collect_data: mapping that contains at least the entry for sample.
        sample: sample id to analyse.
        data_type: 'paired' or anything else for single-end data.

    Returns:
        A one-item dict {sample: updated entry}.
    """
    print("\tanalyse sample: " + sample)
    entry = collect_data[sample]
    barcode = entry['barcode']
    # by default, single-end; R2 is matched against the reverse primer
    read_primers = [('R1', 'primerF')]
    if data_type == 'paired':
        read_primers.append(('R2', 'primerR'))
    for read, primer_key in read_primers:
        instrument, index, reads_count, primer_hits = read_fastq(
            entry[read], entry[primer_key])
        entry['reads_count_' + read] = reads_count
        entry['instrument_' + read] = instrument
        entry['nb_instrument_' + read] = len(instrument)
        entry['index_' + read] = index
        entry['primer_' + read] = primer_hits
        # an empty FASTQ has no reads: report 0% instead of dividing by zero
        entry['perc_primer_' + read] = (
            round(primer_hits * 100 / reads_count, 2) if reads_count else 0.0)
        entry['nb_barcode_' + read] = len(index)
        # TRUE only when a single barcode was seen and it matches the metadata
        entry['barcode_seq_' + read] = 'TRUE' if index == [barcode] else 'FALSE'
    return {sample: entry}


def read_fastq(fastq, primer):
    """Scan a gzipped FASTQ file.

    Each record description is split on whitespace into exactly two parts:
    the text before the first ':' of the first part is recorded as the
    instrument name and the fourth ':' separated field of the second part as
    the index. Ambiguity letters (R, Y, S, W, K, M, B, D, H, V, N, I) in the
    primer are turned into the regex wildcard '.' before searching each read.

    Args:
        fastq: path of a gzip-compressed FASTQ file.
        primer: primer sequence to look for in the reads.

    Returns:
        (instrument, index, reads_count, primers_count): the distinct
        instrument names and indexes in order of first appearance, the number
        of reads and the number of reads containing the primer.

    Raises:
        ImportError: when Biopython is not installed.
        ValueError: when a record description does not split into two parts.
    """
    if SeqIO is None:
        raise ImportError('Biopython (Bio.SeqIO) is required to parse FASTQ files')
    # compile once: the same pattern is applied to every read
    primer_pattern = re.compile(re.sub(r"([RYSWKMBDHVNI])", r".", primer))
    reads_count = 0
    primers_count = 0
    instrument = []
    index = []
    with gzip.open(fastq, "rt") as handle:
        for record in SeqIO.parse(handle, "fastq"):
            h_1, h_2 = record.description.split()
            # check instrument name
            instrument_name = h_1.split(':')[0]
            if instrument_name not in instrument:
                instrument.append(instrument_name)
            # check index
            sequence_index = h_2.split(':')[3]
            if sequence_index not in index:
                index.append(sequence_index)
            # check primers
            if primer_pattern.search(str(record.seq)):
                primers_count += 1
            reads_count += 1
    return instrument, index, reads_count, primers_count


def check_fastq_path(path):
    """Exit with code 1 when path is not an existing regular file."""
    if not os.path.isfile(path):
        eprint('ERROR: ' + path + ' from q2_manifest does not exist. Wrong path?')
        sys.exit(1)


def write_report(collect_data, data_type):
    """Write the per-sample statistics to 'data_integrity.txt'.

    The file is tab separated, created in the current working directory and
    starts with a header line; the column set depends on data_type.

    Args:
        collect_data: mapping sample id -> statistics produced by
            check_fastq.
        data_type: 'paired' or anything else for single-end data.
    """
    header_S = ['SampleID', 'Reads_R1', 'Barcode', 'Uniq_in_R1', 'Same_as_ref_R1', 'Uniq_sequencer_R1', 'PrimerF_in_R1', 'Perc_primerF_R1']
    header_P = ['SampleID', 'Reads_R1', 'Reads_R2', 'Barcode', 'Uniq_in_R1', 'Same_as_ref_R1', 'Uniq_in_R2', 'Same_as_ref_R2', 'Uniq_sequencer_R1', 'Uniq_sequencer_R2', 'PrimerF_in_R1', 'Perc_primerF_R1', 'PrimerR_in_R2', 'Perc_primerR_R2']
    row_S = '{reads_count_R1}\t{barcode}\t{nb_barcode_R1}\t{barcode_seq_R1}\t{nb_instrument_R1}\t{primer_R1}\t{perc_primer_R1}\n'
    row_P = '{reads_count_R1}\t{reads_count_R2}\t{barcode}\t{nb_barcode_R1}\t{barcode_seq_R1}\t{nb_barcode_R2}\t{barcode_seq_R2}\t{nb_instrument_R1}\t{nb_instrument_R2}\t{primer_R1}\t{perc_primer_R1}\t{primer_R2}\t{perc_primer_R2}\n'
    if data_type == 'paired':
        header, row = header_P, row_P
    else:
        header, row = header_S, row_S
    # the context manager guarantees the report is flushed and closed even
    # though validation may terminate the process right afterwards
    with open('data_integrity.txt', 'w') as report:
        report.write('\t'.join(header) + '\n')
        for sample, val in collect_data.items():
            report.write(sample + '\t' + row.format(**val))


def integrity_validation(collect_data_out, control_list, data_type, primer_threshold):
    """Stop the program when a sample fails one of the integrity checks.

    Checks, in order, for every sample: identical read counts in R1 and R2
    (paired only), exactly one instrument name per read file, and a primer
    percentage of at least primer_threshold (skipped for control samples).

    Args:
        collect_data_out: mapping sample id -> statistics from check_fastq.
        control_list: comma separated control sample ids; surrounding
            whitespace around each id is ignored.
        data_type: 'paired' or anything else for single-end data.
        primer_threshold: minimum accepted primer percentage.

    Raises:
        SystemExit: (code 1) on the first failed check.
    """
    controls = {control.strip() for control in control_list.split(',')}
    reads = ('R1', 'R2') if data_type == 'paired' else ('R1',)
    for sample_id, val in collect_data_out.items():
        # 1 - check reads count R1 vs R2
        if data_type == 'paired' and val['reads_count_R1'] != val['reads_count_R2']:
            eprint('ERROR: different number of reads between R1 and R2 for ' + sample_id)
            sys.exit(1)
        # 2 - check single sequencing instrument in reads
        if any(val['nb_instrument_' + read] != 1 for read in reads):
            eprint('ERROR: multiple sequencing machines detected in ' + sample_id)
            sys.exit(1)
        # 3 - check primer percentage found, except for control(s)
        if sample_id in controls:
            continue
        if any(val['perc_primer_' + read] < primer_threshold for read in reads):
            eprint('ERROR: ' + sample_id + " did not reach the minimum threshold for primer percentage [" + str(primer_threshold) + "%]")
            sys.exit(1)


def sort_process(input_file, output_file):
    """Copy input_file to output_file with every line but the first sorted.

    The work is delegated to the shell (head / tail / sort); both file names
    are quoted so that paths containing spaces or shell metacharacters are
    passed through literally.

    Raises:
        Exception: with the command's stderr when the shell command fails.
    """
    cmd_sort = '(head -n 1 {in_file} && tail -n +2 {in_file} | sort) > {out_file}'.format(
        in_file=shlex.quote(input_file), out_file=shlex.quote(output_file))
    p = subprocess.Popen(cmd_sort, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    _, stderr = p.communicate()
    if p.returncode != 0:
        eprint('ERROR: sort metadata/manifest failed')
        raise Exception(stderr.decode(errors='replace'))


def main(args):
    """Run the six steps: parse, collect, scan, report, validate and sort.

    Args:
        args: namespace returned by getArgs().
    """
    if SeqIO is None:
        eprint('ERROR: Biopython (Bio.SeqIO) is required to read the fastq files')
        sys.exit(1)
    # 1 - Collect metadata infos and check variable names
    print("Step 1 - parse and collect data from q2_metadata")
    collect_data, data_type = collect_check_metadata(args.metadata, args.readtype)
    # 2 - Collect reads location from manifest
    print("Step 2 - parse and collect data from q2_manifest")
    collect_data = collect_manifest(args.manifest, collect_data, data_type)
    # 3 - Check fastq integrity
    print("Step 3 - collect fastq integrity")
    # each task only reads its own entry, so ship just that entry to the
    # worker instead of pickling the whole dictionary once per sample
    tasks = [({sample: entry}, sample, data_type)
             for sample, entry in collect_data.items()]
    with mp.Pool(args.threads) as pool:
        collect_data_para = pool.starmap(check_fastq, tasks)
    # merge the one-item dicts returned by the workers
    collect_data_out = {}
    for result in collect_data_para:
        collect_data_out.update(result)
    # 4 - Report fastq intergrity before validation
    # Allow exploration by user for a better understanding
    print("Step 4 - write integrity report")
    write_report(collect_data_out, data_type)
    # 5 - Validate manifest, metadata and fastq
    print("Step 5 - validation of data")
    integrity_validation(collect_data_out, args.control, data_type, args.primer)
    # 6 - Sort manifest and metadata
    print("Step 6 - Sort manifest and metadata by sample id")
    print("\tsort manifest...")
    sort_process(args.manifest, args.manifest + ".sort")
    print("\tsort metadata...")
    sort_process(args.metadata, args.metadata + ".sort")


if __name__ == '__main__':
    args = getArgs()
    # NOTE(review): the listing this script was taken from is truncated right
    # after the statement above; the rest of the entry point (presumably a
    # call to main(args)) is not visible here - confirm against the full file.
Source:main_buttons.py  
...85    Args:86        window (QMainWindow): Ventana principal87    """88    window.btn_header.clicked.connect(89        lambda: collect_data(window, "Header"),90    )91    window.btn_free.clicked.connect(92        lambda: collect_data(window, "Free"),93    )94    window.btn_comment.clicked.connect(95        lambda: collect_data(window, "Comment"),96    )97    window.btn_subroutine.clicked.connect(98        lambda: collect_data(window, "Subroutine")99    )100    window.btn_collect.clicked.connect(101        lambda: collect_data(window, "Collect"),102    )103    window.btn_end.clicked.connect(104        lambda: collect_data(window, "End"),105    )106    window.btn_tool_call.clicked.connect(107        lambda: collect_data(window, "Tool_call"),108    )109    window.btn_tool_close.clicked.connect(110        lambda: collect_data(window, "Tool_close"),111    )112    window.btn_spindle.clicked.connect(113        lambda: collect_data(window, "Spindle"),114    )115    window.btn_spindle_index.clicked.connect(116        lambda: collect_data(window, "Spindle_index"),117    )118    window.btn_misc.clicked.connect(119        lambda: collect_data(window, "Misc"),120    )121    window.btn_turn_ini.clicked.connect(122        lambda: collect_data(window, "Turn_ini"),123    )124    window.btn_lineal_turn.clicked.connect(125        lambda: collect_data(window, "Lineal_turn"),126    )127    window.btn_radial_turn.clicked.connect(128        lambda: collect_data(window, "Radial_turn"),129    )130    window.btn_thread.clicked.connect(131        lambda: collect_data(window, "Thread"),132    )133    window.btn_cutoff.clicked.connect(134        lambda: collect_data(window, "Cutoff"),135    )136    window.btn_mill_ini.clicked.connect(137        lambda: collect_data(window, "Mill_ini"),138    )139    window.btn_mill_end.clicked.connect(140        lambda: collect_data(window, "Mill_end"),141    )142    window.btn_lineal_mill.clicked.connect(143        lambda: collect_data(window, 
"Lineal_mill"),144    )145    window.btn_radial_mill.clicked.connect(146        lambda: collect_data(window, "Radial_mill"),147    )148    window.btn_drill_ini.clicked.connect(149        lambda: collect_data(window, "Drill_ini"),150    )151    window.btn_drill_end.clicked.connect(152        lambda: collect_data(window, "Drill_end"),153    )154    window.btn_center.clicked.connect(155        lambda: collect_data(window, "Center_drill"),156    )157    window.btn_drill.clicked.connect(158        lambda: collect_data(window, "Drill"),159    )160    window.btn_csink.clicked.connect(161        lambda: collect_data(window, "Csink"),162    )163    window.btn_tapping.clicked.connect(164        lambda: collect_data(window, "Tapping"),165    )166    window.btn_rough_turn_cycle.clicked.connect(167        lambda: collect_data(window, "Rough_turn_cycle"),168    )169    window.btn_rough_turn_cycle_end.clicked.connect(170        lambda: collect_data(window, "Rough_turn_cycle_end"),171    )172    window.btn_flat_mill.clicked.connect(173        lambda: collect_data(window, "Flat_mill"),174    )175    window.btn_face_mill.clicked.connect(176        lambda: collect_data(window, "Face_mill"),...collect_main.py
Source:collect_main.py  
import os

# Shell commands launched one after another, in this order.
COMMANDS = (
    "python ./collect_data/collect_data.py 0.1 1",
    "python ./collect_data/collect_data.py 0.5 2",
    "python ./collect_data/collect_data.py 1.0 3",
    "python ./collect_data/collect_data.py 2.0 4",
)

# os.system blocks until each command finishes; its exit status is ignored.
for command in COMMANDS:
    os.system(command)
...Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
