Best Python code snippet using avocado_python
marginphase_chunk_merging.py
Source:marginphase_chunk_merging.py  
import os
import time
import tarfile
from contextlib import closing
import pysam
from marginphase_core import *


def merge_chunks__determine_chunk_splitting(job, chunk_identifier, left_chunk_location, right_chunk_location, chunk_boundary):
    # get read and phase block info
    l_reads, l_phase_blocks = merge_chunks__read_chunk(job, chunk_identifier, left_chunk_location)
    r_reads, r_phase_blocks = merge_chunks__read_chunk(job, chunk_identifier, right_chunk_location)
    # organize chunk comparison
    all_phase_blocks, perfect_matches, inverted_matches, shared_phase_blocks = merge_chunks__organize_reads_and_blocks(
        job, chunk_identifier, l_reads, l_phase_blocks, r_reads, r_phase_blocks)
    # recommend strategy
    split_position, phase_block, invert_right, decision_summary = merge_chunks__recommend_merge_strategy(
        job, chunk_identifier, chunk_boundary, perfect_matches, inverted_matches, shared_phase_blocks)
    # implement strategy
    left_reads_writing, right_reads_writing, vcf_split_pos, vcf_right_phase_action = merge_chunks__specify_split_action(
        job, chunk_identifier, split_position, phase_block, invert_right,
        l_reads, l_phase_blocks, r_reads, r_phase_blocks)
    # log summarization
    job.fileStore.logToMaster(("{}:merge_chunks:determine_chunk_splitting: "
                               "read merge action: write {} from left, {} from right")
                              .format(chunk_identifier, len(left_reads_writing), len(right_reads_writing)))
    job.fileStore.logToMaster(("{}:merge_chunks:determine_chunk_splitting: call merge action: "
                               "split at {}, right action {}")
                              .format(chunk_identifier, vcf_split_pos, vcf_right_phase_action))
    # return
    return left_reads_writing, right_reads_writing, vcf_split_pos, vcf_right_phase_action, decision_summary


def merge_chunks__organize_reads_and_blocks(job, chunk_identifier, l_reads, l_phase_blocks, r_reads, r_phase_blocks):
    # get all phase blocks
    all_phase_blocks = list(set(l_phase_blocks.keys()).union(set(r_phase_blocks.keys())))
    all_phase_block_count = len(all_phase_blocks)
    # get all phase blocks with same start pos
    shared_phase_blocks = list(set(l_phase_blocks.keys()).intersection(set(r_phase_blocks.keys())))
    shared_phase_blocks.sort()
    # get phase blocks by start and end pos
    l_pb_uniq_ids = list(map(lambda x: "{}-{}".format(x[PB_START_POS], x[PB_END_POS]), l_phase_blocks.values()))
    r_pb_uniq_ids = set(map(lambda x: "{}-{}".format(x[PB_START_POS], x[PB_END_POS]), r_phase_blocks.values()))
    # find all matches
    perfect_matches = list()
    inverted_matches = list()
    for l_pb_uniq in l_pb_uniq_ids:
        if l_pb_uniq in r_pb_uniq_ids:
            # we know phase block positions align
            shared_phase_block = int(l_pb_uniq.split("-")[0])
            phase_block_median_pos = int((int(l_pb_uniq.split("-")[0]) + int(l_pb_uniq.split("-")[1])) / 2.0)
            # get all reads in phase block (get from both blocks, in case some were skipped)
            reads_in_phase_block = set()
            for reads in [l_phase_blocks[shared_phase_block][PB_HAP1_READS], l_phase_blocks[shared_phase_block][PB_HAP2_READS],
                          r_phase_blocks[shared_phase_block][PB_HAP1_READS], r_phase_blocks[shared_phase_block][PB_HAP2_READS]]:
                for read in reads:
                    reads_in_phase_block.add(read)
            # get reads spanning position
            reads_spanning_median = list(filter(
                lambda x: l_reads[x][RD_ALN_START] <= phase_block_median_pos <= l_reads[x][RD_ALN_END]
                          if x in l_reads else
                          r_reads[x][RD_ALN_START] <= phase_block_median_pos <= r_reads[x][RD_ALN_END],
                reads_in_phase_block))
            # perfect match?
            perfect_matched_reads = list(filter(
                lambda x: l_reads[x][RD_HAPLOTYPE_TAG] == r_reads[x][RD_HAPLOTYPE_TAG]
                          if x in l_reads and x in r_reads else False,
                reads_spanning_median
            ))
            if len(perfect_matched_reads) == len(reads_spanning_median):
                perfect_matches.append(l_pb_uniq)
                continue
            # inverted match?
            inverted_matched_reads = list(filter(
                lambda x: l_reads[x][RD_HAPLOTYPE_TAG].replace("h1", "<TMP>").replace("h2", "h1").replace("<TMP>", "h2")
                          == r_reads[x][RD_HAPLOTYPE_TAG] if x in l_reads and x in r_reads else False,
                reads_spanning_median
            ))
            if len(inverted_matched_reads) == len(reads_spanning_median):
                inverted_matches.append(l_pb_uniq)
                continue
    # log it
    job.fileStore.logToMaster("{}:merge_chunks:organize_reads_and_blocks: Found {} distinct phase blocks"
                              .format(chunk_identifier, all_phase_block_count))
    job.fileStore.logToMaster("{}:merge_chunks:organize_reads_and_blocks: Found {} ({}%) perfect matches"
                              .format(chunk_identifier, len(perfect_matches), percent(len(perfect_matches), all_phase_block_count)))
    job.fileStore.logToMaster("{}:merge_chunks:organize_reads_and_blocks: Found {} ({}%) inverted matches"
                              .format(chunk_identifier, len(inverted_matches), percent(len(inverted_matches), all_phase_block_count)))
    job.fileStore.logToMaster("{}:merge_chunks:organize_reads_and_blocks: Found {} ({}%) matched phase starts"
                              .format(chunk_identifier, len(shared_phase_blocks), percent(len(shared_phase_blocks), all_phase_block_count)))
    # return what we found
    return all_phase_blocks, perfect_matches, inverted_matches, shared_phase_blocks


def merge_chunks__recommend_merge_strategy(job, chunk_identifier, chunk_boundary, perfect_matches,
                                           inverted_matches, shared_phase_blocks):
    # helper function
    def pick_closest_elem_to_chunk_boundary(elems):
        elems.sort(key=lambda x: abs(chunk_boundary - sum(map(int, str(x).split("-"))) / len(str(x).split("-"))))
        return elems[0]
    # description of merge strategy
    split_position, phase_block, invert_right, decision_summary = None, None, None, None
    # case1: perfect match:
    #   READS: left of and spanning split_pos from left chunk, starting after split_pos from right chunk
    #   CALLS: left of split pos from left VCF, right of split pos from right VCF
    if len(perfect_matches) > 0:
        parts = list(map(int, pick_closest_elem_to_chunk_boundary(perfect_matches).split("-")))
        split_position = int(sum(parts) / len(parts))
        phase_block = parts[0]
        invert_right = False
        decision_summary = "PERFECT_MATCH"
        job.fileStore.logToMaster("{}:merge_chunks:recommend_merge_strategy: Found perfect match at "
                                  "pos {} in phase block {}".format(chunk_identifier, split_position, phase_block))
    # case2: perfect match but inverted haplotypes
    #   READS: left of and spanning split_pos from left chunk, starting after split_pos from right chunk
    #       reverse haplotype of included reads in phase_block from right chunk
    #   CALLS: left of split pos from left VCF, right of split pos from right VCF
    #       reverse phasing of calls in phase block from right chunk
    elif len(inverted_matches) > 0:
        parts = list(map(int, pick_closest_elem_to_chunk_boundary(inverted_matches).split("-")))
        split_position = int(sum(parts) / len(parts))
        phase_block = parts[0]
        invert_right = True
        decision_summary = "INVERT_MATCH"
        job.fileStore.logToMaster(
            "{}:merge_chunks:recommend_merge_strategy: Found inverted match at "
            "pos {} in phase block {}".format(chunk_identifier, split_position, phase_block))
    # case3: found a phase block starting at the same position in each chunk
    #   READS: finishing before split_pos from left chunk, starting after split pos from right chunk
    #       reads spanning split_pos get hap info from left before split_pos, and hap info from right after and including
    #   CALLS: left of split pos from left VCF, right of split pos from right VCF
    elif len(shared_phase_blocks) > 0:
        phase_block = pick_closest_elem_to_chunk_boundary(shared_phase_blocks)
        split_position = phase_block
        invert_right = False
        decision_summary = "PHASE_START_MATCH"
        job.fileStore.logToMaster("{}:merge_chunks:recommend_merge_strategy: Found phase block start match "
                                  "at {}".format(chunk_identifier, phase_block))
    # case4: no matching phase blocks
    #   READS: finishing before split_pos from left chunk, reads spanning split_pos get phasing finishing left of
    #       split_pos from left chunk, phasing in phase blocks spanning split_pos get split in two, phasing in phase
    #       blocks starting after split_pos from right chunk
    #   CALLS: starting left of split_pos from left VCF, starting after split_pos from right VCF, calls from right
    #       phase block spanning split_pos get new phase_block
    else:
        phase_block = None
        split_position = chunk_boundary
        invert_right = False
        decision_summary = "NO_MATCH"
        job.fileStore.logToMaster("{}:merge_chunks:recommend_merge_strategy: Found no match, creating "
                                  "new phase block at {}".format(chunk_identifier, split_position))
    # return data
    return split_position, phase_block, invert_right, decision_summary


def merge_chunks__specify_split_action(job, chunk_identifier, split_position, phase_block, invert_right,
                                       l_reads, l_phase_blocks, r_reads, r_phase_blocks):
    # describes read inclusion and modifications to haplotype string
    left_reads_writing = dict()
    right_reads_writing = dict()
    right_phase_block_conversion = dict()
    for read in l_reads.values():
        # this read belongs wholly to the right chunk
        if read[RD_ALN_START] > split_position:
            continue
        # this read belongs wholly to the left chunk
        elif read[RD_ALN_END] <= split_position:
            left_reads_writing[read[RD_ID]] = None
        # this read spans the split_position (and needs analysis)
        elif read[RD_ALN_START] <= split_position and read[RD_ALN_END] > split_position:
            # case4: new phase block created at split pos
            if phase_block is None:
                l_read = read
                r_read = r_reads[read[RD_ID]]
                new_hap_str, old_right_phase_block = merge_chunks__create_new_phase_block_at_position(
                    job, chunk_identifier, split_position, l_read, r_read)
                left_reads_writing[read[RD_ID]] = [[read[RD_HAPLOTYPE_TAG], new_hap_str]]
                if old_right_phase_block is not None:  # None indicates read was not haplotyped (ie, h0)
                    right_phase_block_conversion[old_right_phase_block] = split_position
                # sanity check (should never find two phase blocks from right)
                if len(right_phase_block_conversion) > 1:
                    raise UserError("SANITY_CHECK_FAILURE:{}: got inconsistent phase blocks ({}) spanning {} for read {}"
                                    .format(chunk_identifier, right_phase_block_conversion.keys(), split_position, read[RD_ID]))
            # case3: take hap info before split_pos from left, after from right.  phase block exists at split_pos
            elif phase_block == split_position:
                l_read = l_reads[read[RD_ID]]
                haps = list(filter(lambda x: x[RPB_BLOCK_ID] < split_position, l_read[RD_PHASE_BLOCKS]))
                if read[RD_ID] in r_reads:
                    r_read = r_reads[read[RD_ID]]
                    haps.extend(list(filter(lambda x: x[RPB_BLOCK_ID] >= split_position, r_read[RD_PHASE_BLOCKS])))
                haps.sort(key=lambda x: x[RPB_BLOCK_ID])
                new_hap_str = ";".join(map(merge_chunks__encode_phase_info, haps))
                left_reads_writing[read[RD_ID]] = [[read[RD_HAPLOTYPE_TAG], new_hap_str]]
            # case2, case1:
            else:
                left_reads_writing[read[RD_ID]] = None
    # get right reads we care about (reads in the spanning-the-split_pos phase chunk)
    # (everything before split_pos comes from left chunk, everything after is unchanged)
    analysis_read_ids = set()
    if phase_block is None:
        if len(right_phase_block_conversion) == 0:
            job.fileStore.logToMaster("{}:merge_chunks:specify_split_action: No reads spanning {} were found!"
                                      .format(chunk_identifier, split_position))
        else:
            analysis_phase_block_id = list(right_phase_block_conversion.keys())[0]
            analysis_phase_block = r_phase_blocks[analysis_phase_block_id]
            analysis_read_ids = analysis_phase_block[PB_HAP1_READS].union(analysis_phase_block[PB_HAP2_READS])
    else:
        analysis_phase_block = r_phase_blocks[phase_block]
        analysis_read_ids = analysis_phase_block[PB_HAP1_READS].union(analysis_phase_block[PB_HAP2_READS])
    for read_id in analysis_read_ids:
        read = r_reads[read_id]
        # this read belongs wholly to the left chunk
        if read[RD_ALN_END] <= split_position:
            continue
        # this was analyzed with the left reads
        elif read_id in left_reads_writing:
            continue
        # now we need to analyze - we know these reads start after split_pos
        # case4
        if phase_block is None:
            if len(right_phase_block_conversion) == 0:
                raise UserError("SANITY_CHECK_FAILURE:{}: new phase block determined, but no conversion for read {}"
                                .format(chunk_identifier, read_id))
            pb_from = list(right_phase_block_conversion.keys())[0]
            pb_to = right_phase_block_conversion[pb_from]
            new_hap_str = read[RD_HAPLOTYPE_TAG].replace("p{},".format(pb_from), "p{},".format(pb_to))
            right_reads_writing[read_id] = [[read[RD_HAPLOTYPE_TAG], new_hap_str]]
        # case2
        elif invert_right:
            h1_str = "h1,p{}".format(phase_block)
            h1_tmp = "h1,p<TMP>"
            h2_str = "h2,p{}".format(phase_block)
            new_hap_str = read[RD_HAPLOTYPE_TAG].replace(h1_str, h1_tmp).replace(h2_str, h1_str).replace(h1_tmp, h2_str)
            right_reads_writing[read_id] = [[read[RD_HAPLOTYPE_TAG], new_hap_str]]
        # case1, case3
        else:
            pass
    # summarize vcf
    vcf_split_position = split_position
    vcf_right_phase_conversion = dict()
    if phase_block is None:  # case 4
        if len(right_phase_block_conversion) != 0:
            # this also requires a new phase block when started
            vcf_right_phase_conversion = right_phase_block_conversion
        else:  # no reads span this, so no action
            pass
    elif invert_right:  # case 2
        vcf_right_phase_conversion = {phase_block: ACTION_INVERT}
    else:
        # case1 or case 3: no action
        pass
    # finish
    return left_reads_writing, right_reads_writing, vcf_split_position, vcf_right_phase_conversion


def merge_chunks__parse_phase_info(phase_info):
    parts = phase_info.split(",")
    if len(parts) != 4 or not (parts[0].startswith('h') and parts[1].startswith('p') and parts[2].startswith('r')
                               and parts[3].startswith('l')):
        return None, None, None, None
    parts = list(map(lambda x: int(x[1:]), parts))
    haplotype, phase_block, read_start, read_length = parts[0], parts[1], parts[2], parts[3]
    return haplotype, phase_block, read_start, read_length


def merge_chunks__encode_phase_info(read_data):
    haplotype = 0 if read_data[RPB_IS_HAP1] is None else (1 if read_data[RPB_IS_HAP1] else 2)
    phase_block = read_data[RPB_BLOCK_ID]
    read_start = read_data[RPB_READ_START]
    read_length = read_data[RPB_READ_LENGTH]
    return "h{},p{},r{},l{}".format(haplotype, phase_block, read_start, read_length)


def merge_chunks__save_read_info(all_reads, read):
    # get info
    read_id = read.query_name
    align_start = read.reference_start
    align_end = read.reference_end
    # save storage data
    read_data = dict()
    all_reads[read_id] = read_data
    read_data[RD_ID] = read_id
    read_data[RD_ALN_START] = align_start
    read_data[RD_ALN_END] = align_end
    read_data[RD_HAPLOTYPE_TAG] = read.get_tag(TAG_HAPLOTYPE)
    read_data[RD_PHASE_BLOCKS] = list()
    # return data
    return read_data


def merge_chunks__save_phase_block_info(job, chunk_identifier, phase_blocks, phase_info, read_id):
    # get info
    haplotype, phase_block, read_start, read_length = merge_chunks__parse_phase_info(phase_info)
    if haplotype is None:
        raise UserError("SANITY_CHECK_FAILURE:{}: malformed phase info for read {}: {}"
                        .format(chunk_identifier, read_id, phase_info))
    if haplotype == 0:
        return None
    # get storage data
    if phase_block in phase_blocks:
        phase_block_data = phase_blocks[phase_block]
    else:
        phase_block_data = dict()
        phase_blocks[phase_block] = phase_block_data
        phase_block_data[PB_BLOCK_ID] = phase_block
        phase_block_data[PB_HAP1_READS] = set()
        phase_block_data[PB_HAP2_READS] = set()
        phase_block_data[PB_END_POS] = None
        phase_blocks[phase_block] = phase_block_data
    # read pb info
    read_phase_block_info = dict()
    read_phase_block_info[RPB_BLOCK_ID] = phase_block
    read_phase_block_info[RPB_READ_START] = read_start
    read_phase_block_info[RPB_READ_LENGTH] = read_length
    read_phase_block_info[RPB_IS_HAP1] = None
    # save read
    if haplotype == 1:
        phase_block_data[PB_HAP1_READS].add(read_id)
        read_phase_block_info[RPB_IS_HAP1] = True
    elif haplotype == 2:
        phase_block_data[PB_HAP2_READS].add(read_id)
        read_phase_block_info[RPB_IS_HAP1] = False
    else:
        raise UserError("SANITY_CHECK_FAILURE:{}: unexpected haplotype in phase_info for read {}: {}"
                        .format(chunk_identifier, read_id, phase_info))
    # return read phase data
    return read_phase_block_info


def merge_chunks__read_chunk(job, chunk_identifier, chunk_location):
    # log
    job.fileStore.logToMaster("{}:merge_chunks:read_chunk: reading chunk {}"
                              .format(chunk_identifier, os.path.basename(chunk_location)))
    # data storage
    phase_blocks = dict()
    reads = dict()
    read_count = 0
    failed_reads = 0
    with closing(pysam.AlignmentFile(chunk_location, 'rb' if chunk_location.endswith("bam") else 'r')) as aln:
        start = time.time()
        for read in aln.fetch():
            # get read data
            read_id = read.query_name
            read_count += 1
            # find haplotype tag (skip the read if a required tag is missing)
            missing_tag = False
            for tag in [TAG_HAPLOTYPE, TAG_CHUNK_ID]:
                if not read.has_tag(tag):
                    job.fileStore.logToMaster("{}:merge_chunks:read_chunk: read {} had no {} tag"
                                              .format(chunk_identifier, read_id, tag))
                    failed_reads += 1
                    missing_tag = True
            if missing_tag:
                continue
            # save read data
            read_data = merge_chunks__save_read_info(reads, read)
            # save haplotype data
            haplotype_tags = read.get_tag(TAG_HAPLOTYPE).split(";")
            for pb_tag in haplotype_tags:
                rpb_info = merge_chunks__save_phase_block_info(job, chunk_identifier, phase_blocks, pb_tag, read_id)
                if rpb_info is not None: read_data[RD_PHASE_BLOCKS].append(rpb_info)
        job.fileStore.logToMaster("{}:merge_chunks:read_chunk: read {} reads ({}s)"
                                  .format(chunk_identifier, read_count, int(time.time() - start)))
    # finish phase block analysis
    phase_block_ids = list(phase_blocks.keys())
    phase_block_ids.sort()
    prev_pb = None
    for pb_id in phase_block_ids:
        curr_pb = phase_blocks[pb_id]
        if prev_pb is not None: prev_pb[PB_END_POS] = curr_pb[PB_START_POS]
        prev_pb = curr_pb
    # we aren't going to use this last one anyway
    if prev_pb is not None:
        prev_pb[PB_END_POS] = prev_pb[PB_START_POS]
    # return chunk data
    return reads, phase_blocks


def merge_chunks__create_new_phase_block_at_position(job, chunk_identifier, split_position, l_read, r_read):
    # get all documented haplotypes
    l_haps = l_read[RD_PHASE_BLOCKS]
    r_haps = r_read[RD_PHASE_BLOCKS]
    # data we want at the end
    haps = list()
    old_right_phase_block = None
    # get desired (and modified) haplotypes from l_read
    for hap in l_haps:
        if hap[RPB_BLOCK_ID] >= split_position:
            continue  # belongs to r_read
        elif hap[RPB_BLOCK_ID] + hap[RPB_READ_LENGTH] < split_position:
            haps.append(hap)  # before split (or h0 read)
        else:  # this read needs to be split
            new_hap = {
                RPB_IS_HAP1: hap[RPB_IS_HAP1],
                RPB_BLOCK_ID: hap[RPB_BLOCK_ID],
                RPB_READ_START: hap[RPB_READ_START],
                RPB_READ_LENGTH: split_position - hap[RPB_BLOCK_ID]
            }
            haps.append(new_hap)
    # get desired (and modified) haplotypes from r_read
    for hap in r_haps:
        if hap[RPB_BLOCK_ID] >= split_position:
            haps.append(hap)  # after split
        elif hap[RPB_BLOCK_ID] + hap[RPB_READ_LENGTH] < split_position:
            continue  # belongs to l_read
        else:  # this read needs to be split
            split_diff = split_position - hap[RPB_BLOCK_ID]
            new_hap = {
                RPB_IS_HAP1: hap[RPB_IS_HAP1],
                RPB_BLOCK_ID: split_position,
                RPB_READ_START: hap[RPB_READ_START] + split_diff,
                RPB_READ_LENGTH: hap[RPB_READ_LENGTH] - split_diff
            }
            haps.append(new_hap)
            # sanity check and save old haplotype
            if old_right_phase_block is not None:
                raise UserError("{}:SANITY_CHECK_FAILURE: found multiple phase_blocks ({}, {}) spanning split_position "
                                "{} for read {}:".format(chunk_identifier, old_right_phase_block, hap[RPB_BLOCK_ID],
                                                         split_position, l_read[RD_ID]))
            old_right_phase_block = hap[RPB_BLOCK_ID]
    # edge case for when l_hap is haplotyped after split pos, but r_haps is not haplotyped
    if len(haps) == 0:
        job.fileStore.logToMaster("{}:merge_chunks:create_new_phase_block: output no haplotypes for read {} ({}-{}) "
                                  "spanning split_position {}, with haplotypes (left) {} and (right) {}"
                                  .format(chunk_identifier, l_read[RD_ID], l_read[RD_ALN_START], l_read[RD_ALN_END],
                                          split_position, l_read[RD_HAPLOTYPE_TAG], r_read[RD_HAPLOTYPE_TAG]))
        haps.append({RPB_IS_HAP1: None, RPB_BLOCK_ID: 0, RPB_READ_START: 0, RPB_READ_LENGTH: 0})
    # save haplotypes
    haps.sort(key=lambda x: x[RPB_BLOCK_ID])
    new_hap_str = ";".join(map(merge_chunks__encode_phase_info, haps))
    return new_hap_str, old_right_phase_block


def merge_chunks__append_sam_reads(job, chunk_identifier, input_sam_file, output_sam_file, included_reads,
                                   excluded_reads):
    # data to track
    header_lines = 0
    written_read_ids = set()
    reads_not_written = 0
    explicitly_excluded = 0
    modified_writes = 0
    # include header?
    include_header = not os.path.isfile(output_sam_file)
    if include_header:
        job.fileStore.logToMaster("{}:merge_chunks:append_sam_reads: output sam file does not exist, writing header"
                                  .format(chunk_identifier))
    # read and write
    with open(output_sam_file, 'a') as output, open(input_sam_file, 'r') as input:
        for line in input:
            # header
            if line.startswith("@"):
                if include_header:
                    output.write(line)
                    header_lines += 1
                continue
            read_id = line.split("\t")[0]
            # already written in a previous chunk
            if read_id in excluded_reads:
                reads_not_written += 1
                explicitly_excluded += 1
                continue
            # should be written in this chunk
            elif read_id in included_reads:
                if included_reads[read_id] is not None:
                    for substitution in included_reads[read_id]:
                        # sanity check
                        if type(substitution) != list or len(substitution) != 2:
                            raise UserError("{}:merge_chunks:append_sam_reads: malformed substitution for {} in {}: {}"
                                            .format(chunk_identifier, read_id, included_reads[read_id], substitution))
                        # replace old hap str for new one
                        line = line.replace(substitution[0], substitution[1])
                    modified_writes += 1
                output.write(line)
                written_read_ids.add(read_id)
            # edge case for inclusion of all reads in last chunk
            elif None in included_reads:
                output.write(line)
                written_read_ids.add(read_id)
            # if not explicitly told to write, read is not written
            else:
                reads_not_written += 1
    if include_header:
        job.fileStore.logToMaster("{}:merge_chunks:append_sam_reads: wrote {} header lines from {} to {}"
                                  .format(chunk_identifier, header_lines,
                                          os.path.basename(input_sam_file), os.path.basename(output_sam_file)))
    job.fileStore.logToMaster("{}:merge_chunks:append_sam_reads: wrote {} reads ({} modified) with {} excluded "
                              "({} explicitly so) from {} to {}"
                              .format(chunk_identifier, len(written_read_ids), modified_writes,
                                      reads_not_written, explicitly_excluded,
                                      os.path.basename(input_sam_file), os.path.basename(output_sam_file)))
    return written_read_ids


def merge_chunks__append_vcf_calls(job, chunk_identifier, input_vcf_file, output_vcf_file, start_pos, end_pos,
                                   vcf_conversion, mp_identifier=None):
    # include header?
    include_header = not os.path.isfile(output_vcf_file)
    if include_header:
        job.fileStore.logToMaster("{}:merge_chunks:append_vcf_calls: output vcf file does not exist, writing header"
                                  .format(chunk_identifier))
    # data we track
    written_header_lines = 0
    written_lines = 0
    lines_outside_boundaries = 0
    # read and write
    with open(output_vcf_file, 'a') as output, open(input_vcf_file, 'r') as input:
        first_analyzed_line = True  # may need to manage the phase set (only for the first phase of a chunk)
        for line in input:
            if line.startswith("#"):  # header
                if include_header:
                    output.write(line)
                    written_header_lines += 1
                continue
            # break line into parts
            line = line.rstrip().split("\t")
            position = int(line[1])
            if position < start_pos or position >= end_pos:  # only include positions in given range
                lines_outside_boundaries += 1
                continue
            # get info and tags (and positions)
            info = line[-1].split(":")
            tags = line[-2].split(":")
            genotype_tag_idx = None
            phase_block_tag_idx = None
            idx = 0
            for tag in tags:
                if tag == TAG_GENOTYPE: genotype_tag_idx = idx
                if tag == TAG_PHASE_SET: phase_block_tag_idx = idx
                idx += 1
            if genotype_tag_idx is None:
                raise UserError("{}:SANITY_CHECK_FAILURE: malformed vcf {} phasing line (no {} tag): {}"
                                .format(chunk_identifier, os.path.basename(input_vcf_file), TAG_GENOTYPE, line))
            if phase_block_tag_idx is None:
                raise UserError("{}:SANITY_CHECK_FAILURE: malformed vcf {} phasing line (no {} tag): {}"
                                .format(chunk_identifier, os.path.basename(input_vcf_file), TAG_PHASE_SET, line))
            # phase block
            initial_phase_block = int(info[phase_block_tag_idx])
            reverse_phasing = False
            updated_phase_block = str(initial_phase_block)
            if initial_phase_block in vcf_conversion:
                if vcf_conversion[initial_phase_block] == ACTION_INVERT:
                    reverse_phasing = True
                else:
                    updated_phase_block = str(vcf_conversion[initial_phase_block])
            # set phase block (may be same as old phase block)
            info[phase_block_tag_idx] = updated_phase_block
            # get genotype
            genotype = info[genotype_tag_idx]
            continued_phase_block = "|" in genotype
            new_phase_block = "/" in genotype
            if not (continued_phase_block ^ new_phase_block):
                raise UserError("{}:SANITY_CHECK_FAILURE: Malformed vcf {} phasing line (unexpected genotype): {}"
                                .format(chunk_identifier, os.path.basename(input_vcf_file), line))
            genotype = genotype.split("|") if continued_phase_block else genotype.split("/")
            # make updates to genotype (potentially)
            if reverse_phasing:
                genotype.reverse()
            if first_analyzed_line and str(initial_phase_block) != updated_phase_block:
                new_phase_block = True
            # save genotype
            genotype = ("/" if new_phase_block else "|").join(genotype)
            info[genotype_tag_idx] = genotype
            # add identifier (if appropriate)
            if mp_identifier is not None:
                info.append(mp_identifier)
                tags.append(TAG_MARGIN_PHASE_IDENTIFIER)
                line[-2] = ":".join(tags)
            # cleanup
            line[-1] = ":".join(map(str, info))
            line = "\t".join(line) + "\n"
            output.write(line)
            written_lines += 1
            first_analyzed_line = False
    # log it
    job.fileStore.logToMaster(
        "{}:merge_chunks:append_vcf_calls: wrote {} calls ({} skipped from being outside boundaries {}-{}) from {} to {}"
        .format(chunk_identifier, written_lines, lines_outside_boundaries, start_pos, end_pos,
                os.path.basename(input_vcf_file), os.path.basename(output_vcf_file)))


def merge_chunks__extract_chunk_tarball(job, config, tar_work_dir, chunk):
    # prep
    os.mkdir(tar_work_dir)
    tar_file = os.path.join(tar_work_dir, "chunk.tar.gz")
    # get file
    job.fileStore.readGlobalFile(chunk[CI_OUTPUT_FILE_ID], tar_file, mutable=True)
    with tarfile.open(tar_file, 'r') as tar:
        tar.extractall(tar_work_dir)
    # find desired files
    sam_unified, vcf = None, None
    for name in os.listdir(tar_work_dir):
        if name.endswith(VCF_SUFFIX): vcf = name
        elif name.endswith(SAM_UNIFIED_SUFFIX): sam_unified = name
    sam_unified_file = None if sam_unified is None else os.path.join(tar_work_dir, sam_unified)
    vcf_file = None if vcf is None else os.path.join(tar_work_dir, vcf)
    # return file locations...
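The haplotype tag that drives the merging logic above is a semicolon-joined list of h{},p{},r{},l{} entries, converted back and forth by merge_chunks__parse_phase_info and merge_chunks__encode_phase_info. Below is a minimal standalone sketch of that round trip; the RPB_* key values and the tag string are stand-ins for illustration (the real constants live in marginphase_core):

# stand-in key names; the real constants come from marginphase_core
RPB_IS_HAP1, RPB_BLOCK_ID, RPB_READ_START, RPB_READ_LENGTH = "is_hap1", "block_id", "read_start", "read_length"

def parse_phase_info(phase_info):
    # "h1,p5000,r0,l3200" -> (haplotype=1, phase_block=5000, read_start=0, read_length=3200)
    parts = phase_info.split(",")
    if len(parts) != 4 or [p[:1] for p in parts] != ['h', 'p', 'r', 'l']:
        return None, None, None, None
    return tuple(int(p[1:]) for p in parts)

def encode_phase_info(read_data):
    # haplotype 0 encodes "not haplotyped" (RPB_IS_HAP1 is None)
    haplotype = 0 if read_data[RPB_IS_HAP1] is None else (1 if read_data[RPB_IS_HAP1] else 2)
    return "h{},p{},r{},l{}".format(haplotype, read_data[RPB_BLOCK_ID],
                                    read_data[RPB_READ_START], read_data[RPB_READ_LENGTH])

haplotype, phase_block, read_start, read_length = parse_phase_info("h1,p5000,r0,l3200")
rebuilt = encode_phase_info({RPB_IS_HAP1: haplotype == 1, RPB_BLOCK_ID: phase_block,
                             RPB_READ_START: read_start, RPB_READ_LENGTH: read_length})
assert rebuilt == "h1,p5000,r0,l3200"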
wrap_unwrap.py
Source:wrap_unwrap.py
import numpy as np
import copy


def wrap(phase, units='rad'):
    """
    Wrap an N-D phase profile into one period centred on zero, i.e. [-pi, pi) for radians.

    :param phase: input phase
    :type phase: array_like

    :param units: Units of phase (accepted: 'rad', 'waves', 'fringes').
    :type units: str

    :return: wrapped values (same units as input).
    """
    units_period = {'rad': 2 * np.pi, 'waves': 1, 'fringes': 1}
    assert units in units_period
    period = units_period[units]
    return (phase + period / 2) % period - (period / 2)


def unwrap(phase, centre=True):
    """
    Unwrap 1-D / 2-D phase profiles.

    In 2-D, a simple phase unwrapping algorithm sequentially unwraps columns and then rows (or vice versa). For noisy
    data, however, this method can perform poorly.

    This 'pseudo 2D' unwrapping algorithm relies on the strong vertical phase shear present in the CIS raw data.
    In an ideal system the optical axis is projected onto the detector centre. On axis, the Savart plate introduces
    zero net phase delay, so the (wrapped) phase measured at the centre of the detector array is the (wrapped) waveplate
    phase delay -- a handy quantity to know. Numpy's np.unwrap function sets the 0th array element to be wrapped; set
    'centre' to False to recover this numpy-like behaviour. 'centre' defaults to True, wrapping the phase at the array
    centre.

    :param phase: input phase [ rad ], 1-D or 2-D.
    :type phase: np.ndarray
    :param centre: Unwrap the phase such that the centre of the input array is wrapped.
    :type centre: bool
    :return: Unwrapped phase [ rad ].
    """
    assert isinstance(phase, np.ndarray)
    if phase.ndim == 1:
        # 1D phase unwrap:
        phase_uw = np.unwrap(phase)
        if centre:
            # wrap array column centre into [-pi, +pi] (assumed projection of optical axis onto detector)
            centre_idx = np.round((len(phase_uw) - 1) / 2).astype(int)
            phase_uw_centre = phase_uw[centre_idx]
            if phase_uw_centre > 0:
                while abs(phase_uw_centre) > np.pi:
                    phase_uw -= 2 * np.pi
                    phase_uw_centre = phase_uw[centre_idx]
            else:
                while abs(phase_uw_centre) > np.pi:
                    phase_uw += 2 * np.pi
                    phase_uw_centre = phase_uw[centre_idx]
    elif phase.ndim == 2:
        # pseudo 2-D phase unwrap:
        y_pix, x_pix = np.shape(phase)
        phase_contour = -np.unwrap(phase[int(np.round(y_pix / 2)), :])
        phase_uw_col = np.unwrap(phase, axis=0)
        phase_contour = phase_contour + phase_uw_col[int(np.round(y_pix / 2)), :]
        phase_0 = np.tile(phase_contour, [y_pix, 1])
        phase_uw = phase_uw_col - phase_0
        if centre:
            # wrap image centre into [-pi, +pi] (assumed projection of optical axis onto detector)
            y_centre_idx = np.round((np.size(phase_uw, 0) - 1) / 2).astype(int)
            x_centre_idx = np.round((np.size(phase_uw, 1) - 1) / 2).astype(int)
            phase_uw_centre = phase_uw[y_centre_idx, x_centre_idx]
            if phase_uw_centre > 0:
                while abs(phase_uw_centre) > np.pi:
                    phase_uw -= 2 * np.pi
                    phase_uw_centre = phase_uw[y_centre_idx, x_centre_idx]
            else:
                while abs(phase_uw_centre) > np.pi:
                    phase_uw += 2 * np.pi
                    phase_uw_centre = phase_uw[y_centre_idx, x_centre_idx]
    else:
        raise Exception('# ERROR #   Phase input must be 1D or 2D array_like.')
    return phase_uw


# def wrap_centre(phase):
#     """
#     Wrap centre of a phase image into (- pi, pi] radian interval.
#
#     :param phase: [ rad ]
#
#     :return: phase_wc [ rad ]
#     """
#
#     assert isinstance(phase, np.ndarray)
#     assert phase.ndim == 2
#
#     phase_wc = copy.deepcopy(phase)
#
#     # wrap image centre into [-pi, +pi] (assumed projection of optical axis onto detector)
#     y_centre_idx = np.round((np.size(phase, 0) - 1) / 2).astype(np.int)
#     x_centre_idx = np.round((np.size(phase, 1) - 1) / 2).astype(np.int)
#     phase_centre = phase_wc[y_centre_idx, x_centre_idx]
#
#     if phase_centre > 0:
#         while abs(phase_centre) > np.pi:
#             phase_wc -= 2 * np.pi
#             phase_centre = phase_wc[y_centre_idx, x_centre_idx]
#     else:
#         while abs(phase_centre) > np.pi:
#             phase_wc += 2 * np.pi
#             phase_centre = phase_wc[y_centre_idx, x_centre_idx]
#...
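A quick usage sketch for the two functions above, assuming wrap and unwrap are imported from this module (the ramp values are arbitrary test data): a smooth phase ramp survives the wrap/unwrap round trip as long as consecutive samples differ by less than pi.

import numpy as np

phase = np.linspace(0, 6 * np.pi, 50)      # smooth phase ramp, step ~0.38 rad < pi
wrapped = wrap(phase)                      # folded into the [-pi, pi) interval
recovered = unwrap(wrapped, centre=False)  # numpy-like unwrap: 0th element stays wrapped
print(np.allclose(recovered, phase))       # True -- the original ramp is recovered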
installer_checkpoint.py
Source:installer_checkpoint.py
1"""Ansible callback plugin to print a summary completion status of installation2phases.3"""4from datetime import datetime5from ansible.plugins.callback import CallbackBase6from ansible import constants as C7class CallbackModule(CallbackBase):8    """This callback summarizes installation phase status."""9    CALLBACK_VERSION = 2.010    CALLBACK_TYPE = 'aggregate'11    CALLBACK_NAME = 'installer_checkpoint'12    CALLBACK_NEEDS_WHITELIST = False13    def __init__(self):14        super(CallbackModule, self).__init__()15    def v2_playbook_on_stats(self, stats):16        # Return if there are no custom stats to process17        if stats.custom == {}:18            return19        phases = stats.custom['_run']20        # Find the longest phase title21        max_column = 022        for phase in phases:23            max_column = max(max_column, len(phases[phase].get('title', '')))24        # Sort the phases by start time25        ordered_phases = sorted(phases, key=lambda x: (phases[x].get('start', 0)))26        self._display.banner('INSTALLER STATUS')27        # Display status information for each phase28        for phase in ordered_phases:29            phase_title = phases[phase].get('title', '')30            padding = max_column - len(phase_title) + 231            phase_status = phases[phase]['status']32            phase_time = phase_time_delta(phases[phase])33            if phase_title:34                self._display.display(35                    '{}{}: {} ({})'.format(phase_title, ' ' * padding, phase_status, phase_time),36                    color=self.phase_color(phase_status))37            # If the phase is not complete, tell the user what playbook to rerun38            if phase_status == 'In Progress' and phase != 'installer_phase_initialize':39                self._display.display(40                    '\tThis phase can be restarted by running: {}'.format(41                        phases[phase]['playbook']))42            # Display any extra messages stored during the phase43            if 'message' in phases[phase]:44                self._display.display(45                    '\t{}'.format(46                        phases[phase]['message']))47    def phase_color(self, status):48        """ Return color code for installer phase"""49        valid_status = [50            'In Progress',51            'Complete',52        ]53        if status not in valid_status:54            self._display.warning('Invalid phase status defined: {}'.format(status))55        if status == 'Complete':56            phase_color = C.COLOR_OK57        elif status == 'In Progress':58            phase_color = C.COLOR_ERROR59        else:60            phase_color = C.COLOR_WARN61        return phase_color62def phase_time_delta(phase):63    """ Calculate the difference between phase start and end times """64    if not phase.get('start'):65        return ''66    time_format = '%Y%m%d%H%M%SZ'67    phase_start = datetime.strptime(phase['start'], time_format)68    if 'end' not in phase:69        # The phase failed so set the end time to now70        phase_end = datetime.now()71    else:72        phase_end = datetime.strptime(phase['end'], time_format)73    delta = str(phase_end - phase_start).split(".")[0]  # Trim microseconds...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 