Best Python code snippet using fMBT_python
data.py
Source:data.py  
import sys
import math
import readVCF
import fileinput


def _stripChrPrefix(name):
    """Return name without a leading 'chr' prefix (only the prefix is removed)."""
    # str.strip('chr') would remove any of the characters c/h/r from BOTH ends,
    # which is not the same thing as dropping the prefix.
    if name.startswith('chr'):
        return name[3:]
    return name


#Reads a tab file with name string given by toRead.
#Constructs a list of chromosome items, one per chromosome, and inserts
#chromosome name, start bp, end bp, coverage per 1000 bp in these items.
def readTab(toRead):
    """Read a coverage TAB file and group its rows per chromosome.

    The first line must start with '#CHR'. Every following line must hold
    exactly four tab-separated fields: chrName, start, end, coverage.

    Returns:
        (chromosomes, coverageNorm, coverageNormLog, totalBP) where
        chromosomes is a list of Chromosome objects in file order,
        coverageNorm is the mean coverage over all data lines,
        coverageNormLog is the sum of log2(coverage) over positive coverage
        values divided by the number of data lines, and totalBP is the sum of
        the per-chromosome end positions.
        None if the header or the second line is malformed.
        -1 if a later line is malformed (historical return value, kept so
        existing callers keep working).
    """
    chromosomes = []
    coverageSum = 0
    coverageLogSum = 0
    totalReadLines = 0
    totalBP = 0
    chrom = None
    lastEnd = None
    with open(toRead, 'r') as tab:
        #Read the first line in the file, should start with #CHR.
        line = tab.readline()
        if not line.startswith("#CHR"):
            print('TAB file is not in correct format')
            return None
        print('TAB file seems ok, continuing read')
        #All following lines should be formatted as: chrName\tstart\tend\tcoverage
        for lineNumber, line in enumerate(tab, start=2):
            fields = line.split('\t')
            if len(fields) != 4:
                if lineNumber == 2:
                    print("TAB file not formatted correctly on line 2")
                    return None
                print("TAB file not formatted correctly")
                return -1
            name, end, coverage = fields[0], fields[2], float(fields[3])
            #If we come across a new chromosome, assign end on the current one
            #(taken from the previous line), then start a new Chromosome.
            if chrom is None or name != chrom.name:
                if chrom is not None:
                    chrom.setEnd(lastEnd)
                chrom = Chromosome(name)
                chromosomes.append(chrom)
            #Every line contains coverage data of interest, for current chromosome
            chrom.addCoverage(coverage)
            coverageSum += coverage
            if coverage > 0:
                coverageLogSum += math.log(coverage, 2)
            totalReadLines += 1
            lastEnd = end
    if chrom is None:
        #The file ended right after the header, so "line 2" is missing.
        print("TAB file not formatted correctly on line 2")
        return None
    chrom.setEnd(lastEnd)
    coverageNorm = coverageSum / totalReadLines
    coverageNormLog = coverageLogSum / totalReadLines
    #sum total read bp
    for chromo in chromosomes:
        totalBP += int(chromo.end)
    return (chromosomes, coverageNorm, coverageNormLog, totalBP)


def readCytoTab(toRead):
    """Read a cytoband TAB file.

    The first line must start with 'chr1'. Each row is expected to hold
    chromosome, startPos, endPos, cytoband and stain value, tab-separated.

    Returns:
        A list of [chromosome, startPos, endPos, cytoband, stain] string
        lists with the 'chr' prefix removed from the chromosome name, or None
        if the first line does not start with 'chr1'.
    """
    cytoTabInfo = []
    with open(toRead, 'r') as tab:
        line = tab.readline()
        if not line.startswith("chr1"):
            print('TAB file is not in correct format')
            return None
        print('CytoBandTAB file seems ok, continuing read')
        while line:
            #Blank lines (e.g. a trailing newline at end of file) are skipped
            if line.strip():
                fields = line.split('\t')
                cytoTabInfo.append([
                    _stripChrPrefix(fields[0]),
                    fields[1],
                    fields[2],
                    fields[3],
                    fields[4].strip('\n'),
                ])
            line = tab.readline()
    return cytoTabInfo


def readVCFFile(toRead, chromosomes):
    """Read a VCF file and attach every variant to its chromosome.

    Args:
        toRead: path of the VCF file.
        chromosomes: list of Chromosome objects; a variant is added to the
            first one whose name equals the variant's chrA.

    Returns:
        (chromosomes, vcfInfoLines) where vcfInfoLines holds the '##'
        meta-information lines followed by the '#' header line, or None if
        the file does not start with '##fileformat=' or the header line is
        missing.
    """
    vcfInfoLines = []
    with open(toRead, 'r') as vcf:
        #The first lines should be a number of meta-information lines, prepended by ##.
        #Should begin with fileformat. Store these. Check first line for correct format.
        line = vcf.readline()
        if not line.startswith("##fileformat="):
            print("VCF file is not in correct format")
            return None
        print("VCF file seems ok, continuing read")
        while line.startswith("##"):
            vcfInfoLines.append(line)
            line = vcf.readline()
        #A header line prepended by # should follow containing at least the 8
        #fixed tab-delimited fields CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO
        #(more columns follow when genotype data is present).
        fields = line.split('\t')
        if not (line.startswith('#') and len(fields) >= 8):
            print("Header columns missing in VCF file")
            return None
        vcfInfoLines.append(line)
        #All following lines are tab-delimited data lines.
        #Store variant data in chromosome item corresponding to CHROM field
        for line in vcf:
            #Feed every data line into readVCF, returning:
            #(chrA,posA,chrB,posB,event_type,description,format)
            (chrA, posA, chrB, posB, event_type, description, fmt) = readVCF.readVCFLine(line)
            #Iterate through chromosome list to find match to insert data into
            for chromo in chromosomes:
                if chromo.name == chrA:
                    chromo.addVariant(chrA, posA, chrB, posB, event_type, description, fmt)
                    break
        return (chromosomes, vcfInfoLines)


#Reads a general tab delimited file (such as a bed file)
def readGeneralTab(toRead):
    """Read a tab-delimited file, skipping its first line.

    Returns:
        A list with one list of string fields per line. Every occurrence of
        'chr', 'Chr' or 'CHR' is removed from the first field and the newline
        is stripped from the last field.
    """
    tabLines = []
    with open(toRead, 'r') as tab:
        #Skip first line
        next(tab, None)
        for line in tab:
            #The fields are as following: #chromosome, startPos, endPos, text
            fields = line.split('\t')
            fields[0] = fields[0].replace("chr", "").replace("Chr", "").replace("CHR", "")
            fields[-1] = fields[-1].strip('\n')
            tabLines.append(fields)
    return tabLines


def readConfig(toRead):
    """Read a sectioned key=value configuration file.

    Lines starting with '#' are comments. Lines starting with '[CIRCULAR]',
    '[COVERAGE]', '[KARYOGRAM]', '[HEATMAP]' or '[COLORS]' switch the active
    section (CIRCULAR is active at the start; an unrecognised '[...]' line
    leaves the active section unchanged). Every other line containing '=' is
    stored in the active section; the value is everything after the first
    '='. Lines without '=' (e.g. blank lines) are ignored.

    Returns:
        (circularConfig, coverageConfig, karyoConfig, heatmapConfig,
        colorConfig), each a dict of str -> str.
    """
    circularConfig = {}
    coverageConfig = {}
    karyoConfig = {}
    heatmapConfig = {}
    colorConfig = {}
    sections = {
        'CIRCULAR': circularConfig,
        'COVERAGE': coverageConfig,
        'KARYOGRAM': karyoConfig,
        'HEATMAP': heatmapConfig,
        'COLORS': colorConfig,
    }
    with open(toRead, 'r') as config:
        active = circularConfig
        for line in config:
            if line.startswith('#'):
                continue
            if line.startswith('['):
                for sectionName, target in sections.items():
                    if line.startswith('[' + sectionName + ']'):
                        active = target
                        break
                continue
            key, separator, value = line.partition('=')
            if not separator:
                #Blank or malformed line: nothing to store
                continue
            active[key] = value.strip('\n')
    return (circularConfig, coverageConfig, karyoConfig, heatmapConfig, colorConfig)


def saveConfig(fileName, circularConfig, coverageConfig, karyoConfig, heatmapConfig, colorConfig):
    """Rewrite the values of an existing configuration file in place.

    Comments, section headers and everything in the [COLORS] section are
    written back unchanged. In the other sections each 'key=value' line gets
    the value stored under that key in the matching dict; lines without '='
    and keys missing from the dict are written back unchanged.

    colorConfig is accepted for interface compatibility but is not written.
    """
    sections = {
        'CIRCULAR': circularConfig,
        'COVERAGE': coverageConfig,
        'KARYOGRAM': karyoConfig,
        'HEATMAP': heatmapConfig,
        'COLORS': None,  #colours are never rewritten
    }
    with open(fileName, 'r+') as config:
        newData = []
        active = circularConfig
        for line in config.readlines():
            line = line.strip('\n')
            if line.startswith('#'):
                newData.append(line)
                continue
            if line.startswith('['):
                for sectionName, target in sections.items():
                    if line.startswith('[' + sectionName + ']'):
                        active = target
                        break
            elif active is not None:
                key, separator, _oldValue = line.partition('=')
                #Rebuild the line instead of str.replace()-ing the old value,
                #which could also rewrite a key that contains the old value.
                if separator and key in active:
                    line = key + '=' + str(active[key])
            newData.append(line)
        config.seek(0)
        config.truncate()
        config.writelines(entry + '\n' for entry in newData)


class Chromosome():
    """Coverage values, variants and connections of one chromosome."""

    def __init__(self, name):
        self.name = name
        #Last end position read for this chromosome; assigned through setEnd()
        self.end = None
        self.coverage = []
        self.coverageLog = []
        self.display = False
        self.variants = []
        self.connections = []
        self.display_connections = False
        self.display_cytoBandNames = False

    def addCoverage(self, coverageValue):
        """Append a coverage value and its log2 (0 for non-positive values)."""
        self.coverage.append(coverageValue)
        if coverageValue > 0:
            self.coverageLog.append(math.log(coverageValue, 2))
        else:
            self.coverageLog.append(0)

    def setEnd(self, end):
        """Store the end position exactly as given (readTab passes a string)."""
        self.end = end

    def addVariant(self, chrA, posA, chrB, posB, event_type, description, format):
        """Append a variant record to self.variants.

        The record is the list
        [chrA, posA, chrB, posB, event_type, description, format, allGenes,
         cband, display_variant, rankScore, marked]
        where allGenes is a ', '-joined string of the unique gene names found
        in description["CSQ"] ("" without CSQ), cband / rankScore are
        description["CYTOBAND"] / description["RankScore"] or None, and
        display_variant / marked start as True / False.
        """
        #The variants are by default set to be shown
        display_variant = True
        marked = False
        #For every variant we would like the genes in CSQ, if this exists
        if "CSQ" in description:
            #The CSQ field has several sub-fields, each separated with ','
            #The gene name field is always the fourth element in the CSQ field separated with '|'
            geneNames = [entry.split('|')[3] for entry in description["CSQ"].split(',')]
            #dict.fromkeys removes duplicates while keeping first-seen order,
            #so the joined string is the same on every run.
            allGenes = ', '.join(dict.fromkeys(geneNames))
        else:
            allGenes = ""
        #We would also like the CYTOBAND field, if this exists
        cband = description["CYTOBAND"] if "CYTOBAND" in description else None
        rankScore = description["RankScore"] if "RankScore" in description else None
        #Add the variant data to this chromosome
        variant = [chrA, posA, chrB, posB, event_type, description, format,
                   allGenes, cband, display_variant, rankScore, marked]
        self.variants.append(variant)

    def createConnections(self):
        """Rebuild self.connections from the variants flagged for display."""
        #These corresponding values for the variant are added to the list: CHRA,CHRB,WINA,WINB,CYTOBAND
        self.connections = []
        for variant in self.variants:
            if not variant[9]:
                continue
            else:
                description = variant[5]
                if "CYTOBAND" in description:
                    cband = description["CYTOBAND"]
                else:
                    cband = None
                # NOTE(review): `is not` compares object identity, not string
                # equality; `!=` is probably intended. Left unchanged because
                # the remainder of this method is cut off in the listing.
                if variant[0] is not variant[2]:
                    connection = [variant[0],variant[2],description["WINA"],description["WINB"],cband]
                    self.connections.append(connection)
                else:
                    connection = [variant[0], variant[2], str(variant[1]) + "," + str(variant[1]), str(variant[3]) + "," + str(variant[3]), cband]
                    # NOTE(review): the source listing is truncated ("...")
                    # after this statement; whatever follows is not visible.

# (The listing above is truncated on the source page; the next snippet on the
# page is coverage_data.py.)
Source:coverage_data.py  
1from __future__ import annotations2from typing import List, Set3from dataclasses import dataclass, field, asdict4import enum5from pathlib import Path6import json7from ros_metrics_reporter.color import Color8import pandas as pd9from datetime import datetime10class CoverageKeys(enum.Enum):11    Lines = "Lines"12    Functions = "Functions"13    Branches = "Branches"14@dataclass15class CoverageValue:16    label: str = ""17    line: float = 0.018    function: float = 0.019    branch: float = 0.020    def get(self, coverage_key: CoverageKeys):21        if coverage_key == CoverageKeys.Lines:22            return self.line23        elif coverage_key == CoverageKeys.Functions:24            return self.function25        elif coverage_key == CoverageKeys.Branches:26            return self.branch27        else:28            raise Exception("Unknown coverage key")29@dataclass30class Coverage:31    package: str = ""32    value: List[CoverageValue] = field(default_factory=list)33    def write(self, file: Path):34        with open(file, "w") as f:35            json.dump(asdict(self), f, indent=2)36    def read(self, file: Path) -> Coverage:37        with open(file, "r") as f:38            data = json.load(f)39            self.package = data["package"]40            self.value = [CoverageValue(**value) for value in data["value"]]41        return self42    def get_label_value(self, label: str) -> CoverageValue:43        for value in self.value:44            if value.label == label:45                return value46        return CoverageValue(label=label)47    def get_labels(self) -> Set[str]:48        return {value.label for value in self.value}49@dataclass50class CoverageStamped(Coverage):51    date: datetime = field(default_factory=datetime.now)52@dataclass53class Threshold:54    high: float = 0.055    med: float = 0.056    def write(self, file: Path):57        with open(file, "w") as f:58            json.dump(asdict(self), f, indent=2)59@dataclass60class CoverageData:61    
coverage: List[Coverage] = field(default_factory=list)62    threshold: Threshold = field(default_factory=Threshold)63    def add_threshold(self, high, med):64        self.threshold.high = high65        self.threshold.med = med66    def add_coverage(self, coverage: Coverage):67        for i, item in enumerate(self.coverage):68            if item.package == coverage.package:69                self.coverage[i].value.extend(coverage.value)70                return71        self.coverage.append(coverage)72    def add_coverages(self, coverages: List[Coverage]):73        for coverage in coverages:74            self.add_coverage(coverage)75    def save_coverage(self, output_dir: Path):76        for item in self.coverage:77            output_json_dir = output_dir / item.package78            output_json_dir.mkdir(parents=True, exist_ok=True)79            item.write(output_json_dir / "coverage.json")80    def save_threshold_value(self, output_path: Path):81        self.threshold.write(output_path)82    def get_coverage(self, package: str) -> Coverage:83        for item in self.coverage:84            if item.package == package:85                return item86        return Coverage(package=package)87    def get_color(self, value: str, coverage_key: CoverageKeys) -> Color:88        if coverage_key == CoverageKeys.Lines:89            if value >= self.threshold.high:90                return Color.GREEN91            elif value >= self.threshold.med:92                return Color.YELLOW93            else:94                return Color.RED95        elif coverage_key == CoverageKeys.Functions:96            if value >= self.threshold.high:97                return Color.GREEN98            elif value >= self.threshold.med:99                return Color.YELLOW100            else:101                return Color.RED102        elif coverage_key == CoverageKeys.Branches:103            if value >= self.threshold.high:104                return Color.GREEN105            elif value >= 
self.threshold.med:106                return Color.YELLOW107            else:108                return Color.RED109        else:...makeCoverageDB.py
Source:makeCoverageDB.py  
# -*- coding: utf-8 -*-
"""
Created on Fri Oct 23 21:58:09 2015

@author: jiun
"""
import os
import re
import sys
from pymongo import MongoClient

# Server used when connecttomongo() is called without an explicit URI.
DEFAULT_MONGO_URI = "mongodb://146.118.98.44:27017"


# main function
# get list of txt.gz files, extract, parse and insert into the database
# input main directory
def main():
    """Insert every *.gz coverage file found directly in sys.argv[1]."""
    if len(sys.argv) < 2:
        sys.exit("usage: makeCoverageDB.py <directory>")
    dirName = sys.argv[1]
    db = connecttomongo()
    # next(os.walk(...))[2] is the list of plain files in dirName itself
    filenames = next(os.walk(dirName))[2]
    for iF in filenames:
        if not iF.endswith(".gz"):
            continue
        fName = os.path.join(dirName, iF)
        # Two splitext calls drop both extensions ("id.txt.gz" -> "id")
        uid = os.path.splitext(os.path.splitext(iF)[0])[0]
        # Close each gzip stream as soon as its lines are inserted
        with unzipCoverageFile(fName) as fObj:
            parselines_insert(db, fObj, uid)


# function to parse coverage text file
# takes in the file object and inserts one record per line
# each line has the following tab-separated format:
# chromosomeName    GeneName    Location    CoverageValue(s, comma separated)
def parselines_insert(db, fileObj, uid):
    """Parse every line of fileObj and insert it into db.

    Args:
        db: collection object handed on to insertrecord().
        fileObj: iterable of lines, either str or bytes (gzip.open in 'rb'
            mode yields bytes).
        uid: identifier stored under "UID" in every record.
    """
    for line in fileObj:
        if isinstance(line, bytes):
            # Binary streams yield bytes; the string handling below needs str
            line = line.decode("utf-8")
        if not line.strip():
            continue
        bits = line.split("\t")
        coverage = [int(i) for i in bits[3].replace("\n", "").split(",")]
        startPos = int(bits[2])
        endPos = startPos + len(coverage)

        content = {"UID": uid, "chr": bits[0], "genename": bits[1],
                   "startpos": startPos, "endpos": endPos,
                   "coverage": coverage}

        insertrecord(db, content) # insert it


# files are stored in gz format, so unzip and get unique id
# based on the filename
def unzipCoverageFile(fileName):
    """Open a gzip file for binary reading; the caller must close it."""
    import gzip
    return gzip.open(fileName, 'rb')


def insertrecord(db, r):
    """Insert the document r into the collection db."""
    # Collection.insert() was deprecated in PyMongo 3 and removed in 4.
    db.insert_one(r)


def connecttomongo(uri=DEFAULT_MONGO_URI):
    """Return the healthhack.testcoverage collection of the server at uri."""
    #connection = MongoClient("mongodb://localhost:27017")
    connection = MongoClient(uri)
    db_test = connection.healthhack.testcoverage
    return db_test


if __name__ == '__main__':
    # NOTE(review): the source listing is truncated ("...") right after the
    # colon; presumably main() is called here -- confirm against the full file.
    ...

# (Page text that followed the truncated listing on the same line:)
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
