Best Python code snippet using assertpy_python
data_io.py
Source:data_io.py  
1import os2import shutil3import numpy as np456class PatranMesh:7    """8    Class for Patran mesh compatible with ``meshio``910    Args:11        pat (str): Patran file12        read_celltypes (list): List of cell types to be read: ``line``, ``triangle``, ``quad``, ``tetra`` and ``hexahedron``13    """14    def __init__(self, patfile, read_celltypes=["triangle", "tetra"]):15        self.read(patfile, read_celltypes=read_celltypes)16        self.init_mesh()1718    def read(self, patfile, read_celltypes):19        """20        Read points and cells of a Patran mesh21        """22        import re2324        def get_points(line):25            line = line.strip()26            points = re.findall(r"[-\d\.]+E...", line)27            points = [float(point) for point in points]28            return points2930        def get_cell(line, num_points, pointsID):31            line = line.strip()32            cell = re.findall(r"[\d]+", line)[:num_points]33            cell = [pointsID[int(point)] for point in cell]34            return cell3536        meshio_to_patran_type = {37            "line": 2,38            "triangle": 3,39            "quad": 4,40            "tetra": 5,41            "hexahedron": 8,42        }4344        patran_to_meshio_type = {}45        assert len(read_celltypes) > 046        for celltype in read_celltypes:47            patran_to_meshio_type[meshio_to_patran_type[celltype]] = celltype4849        # Read patran file50        f = open(patfile, "r")51        lines = f.read()52        lines = lines.replace(" ", ",")53        for _ in range(15):54            lines = lines.replace(",,", ",")5556        # Read points57        self.pointsID = {}58        self.points = []59        pointlines = re.findall(r"\n(,1,[\d,]+\n[,\d.EG\+-]+\n1[G,\d]+)", lines)60        for i, n in enumerate(pointlines):61            self.pointsID[int(n.split("\n")[0].split(",")[2])] = i62            self.points.append(get_points(n.split("\n")[1]))63        self.points = np.array(self.points)6465        # Read cells66        self.cellsID = {}67        self.cells = {}68        celllines = re.findall(r"\n,2,([\d,E]+\n[\d\.\+,E]+\n[\d,E]+)", lines)69        for e in celllines:70            celltype = int(e.split(",")[1])71            num_points = int(e.split("\n")[1].split(",")[1])72            if celltype not in patran_to_meshio_type:73                continue74            meshio_type = patran_to_meshio_type[celltype]75            cellID = int(e.split(",")[0])76            cell = get_cell(e.split("\n")[2], num_points, self.pointsID)77            if meshio_type in self.cellsID:78                self.cellsID[meshio_type].append(cellID)79                self.cells[meshio_type].append(cell)80            else:81                self.cellsID[meshio_type] = [cellID]82                self.cells[meshio_type] = [cell]8384        for key in self.cells:85            self.cells[key] = np.array(self.cells[key], dtype=int)86            self.cellsID[key] = np.array(self.cellsID[key], dtype=int)8788        self.point_data = {}89        self.cell_data = {}90        self.field_data = {}9192    def init_mesh(self):93        self.ncells_cumsum = np.cumsum([len(cells) for cells in self.cells.values()])94        self.ncells = self.ncells_cumsum[-1]95        self.remove_free_points()96        self.npoints = len(self.points)97        self.ncells_per_point_ = None9899    def scale(self, factor=1e3):100        """101        Scale the coordinates by a factor102        """103        self.points *= factor104105    def ncells_per_point(self):106        """107        Number of cells that every point is connected to108        """109        if self.ncells_per_point_ is not None:110            return self.ncells_per_point_111        else:112            self.ncells_per_point_ = np.zeros(len(self.points), dtype=int)113            for celltype in self.cells:114                for cell in self.cells[celltype]:115                    self.ncells_per_point_[cell] += 1116            return self.ncells_per_point_117118    def remove_free_points(self):119        """120        Remove free points from coordinates and cell connectivities121        """122        # Find which points are not mentioned in the cells123        all_cells_flat = np.concatenate(124            [vals for vals in self.cells.values()]125        ).flatten()126        free_points = np.setdiff1d(np.arange(len(self.points)), all_cells_flat)127        if len(free_points) == 0:128            return129130        # Remove free points131        self.points = np.delete(self.points, free_points, axis=0)132        for key in self.point_data:133            self.point_data[key] = np.delete(self.point_data[key], free_points, axis=0)134135        # Adjust cell connectivities136        diff = np.zeros(len(all_cells_flat), dtype=int)137        for free_point in free_points:138            diff[np.argwhere(all_cells_flat > free_point)] += 1139        all_cells_flat -= diff140        k = 0141        for key in self.cells:142            s = self.cells[key].shape143            n = np.prod(s)144            self.cells[key] = all_cells_flat[k:k + n].reshape(s)145            k += n146147        # Adjust pointsID148        pointsID_keys = np.fromiter(self.pointsID.keys(), int)149        pointsID_keys = np.delete(pointsID_keys, free_points)150        pointsID_values = np.arange(len(pointsID_keys))151        self.pointsID = dict(zip(pointsID_keys, pointsID_values))152153    def average_cell_to_point(self, tetra_data):154        """155        Average data defined at tetra cells to nodes156        """157        if tetra_data.ndim == 1:158            point_data = np.zeros(self.npoints)159        else:160            assert tetra_data.ndim == 2161            point_data = np.zeros((self.npoints, tetra_data.shape[1]))162163        ncells_per_point = np.zeros(self.npoints, dtype=int)164        for i, cell in enumerate(self.cells["tetra"]):165            ncells_per_point[cell] += 1166            point_data[cell] += tetra_data[i]167168        point_data /= ncells_per_point[:, None]169        return point_data170171    def average_point_to_cell(self, point_data):172        """173        Average data defined at points to tetra cells174        """175        if point_data.ndim == 1:176            tetra_data = np.zeros(self.ncells)177        else:178            assert point_data.ndim == 2179            tetra_data = np.zeros((self.ncells, point_data.shape[1]))180        for i, cell in enumerate(self.cells["tetra"]):181            tetra_data[i] = np.mean(point_data[cell], axis=0)182        return tetra_data183184185def read_moldflow_xml(fxml, only_last_step=False):186    """187    Read a Moldflow XML result file188189    data["name"]: result name190    data["type"]: NDDT(Node data), ELDT(Element data) or NMDT(Non-mesh data)191    data["unit"]: physical unit192    data["time"]: list of times if exist193    data["val"]: identifiers and corresponding values194    """195    from lxml import etree as ET196197    parser = ET.XMLParser(encoding="windows-1252", remove_comments=True, huge_tree=True)198    try:199        tree = ET.parse(fxml, parser)200    except ET.ParseError:201        return False, None202    root = tree.getroot()[1]203204    # Basic information205    data = {}206    data["name"] = root.attrib["Name"].strip()207    data["type"] = root.find("DataType").text.strip()208    data["unit"] = root.find("DeptVar").attrib["Unit"]209    data["dim"] = int(root.find("NumberOfComponents").text)210211    if data["type"] != "NMDT(Non-mesh data)":212        datasets = root.xpath("Blocks/Block/Data")213        if only_last_step:214            datasets = [datasets[-1]]215    else:216        datasets = root.xpath("Blocks/Block/DeptValues")217    nsteps = len(datasets)218219    data["time"] = np.zeros(nsteps)220    if data["type"] != "NMDT(Non-mesh data)":221        if nsteps <= 1 or only_last_step:222            data["time"] = None223224    if data["type"] != "NMDT(Non-mesh data)":225        data["val"] = {}226    else:227        if data["dim"] == 1:228            data["val"] = np.zeros(nsteps)229        else:230            data["val"] = np.zeros((nsteps, data["dim"]))231232    for i, dataset in enumerate(datasets):233234        if data["time"] is not None:235            try:236                data["time"][i] = float(dataset.getprevious().getprevious().attrib["Value"])237            except(AttributeError, KeyError):238                data["time"][i] = np.nan239240        # For mesh data, loop241        if data["type"] != "NMDT(Non-mesh data)":242            id_val = {}243            for val in dataset:244                identifier = int(val.attrib["ID"])245                array = np.fromstring(246                    val.find("DeptValues").text, sep=" ", count=data["dim"]247                )248                array[array > 1e29] = np.nan249                if len(array) == 1:250                    id_val[identifier] = array[0]251                else:252                    id_val[identifier] = array253254            if data["time"] is not None:255                data["val"][i] = id_val256            else:257                data["val"] = id_val258259        # For non-mesh data, just read260        else:261            array = np.fromstring(dataset.text, sep=" ", count=data["dim"])262            if len(array) == 1:263                data["val"][i] = array[0]264            else:265                data["val"][i] = array266267    return True, data268269270def convert_to_time_series_xdmf(fxdmf, backup=False):271    """272    Convert a plain XDMF file to273    a time-series one readable by ParaView274    """275    # Backup if needed276    assert os.path.splitext(fxdmf)[1] == ".xdmf"277    if backup is not False:278        assert type(backup) == str279        shutil.copyfile(fxdmf, backup)280281    # Parse XDMF282    from copy import deepcopy283    from lxml import etree as ET284285    parser = ET.XMLParser(remove_blank_text=True)286    tree = ET.parse(fxdmf, parser)287    root = tree.getroot()288289    # Retrieve mesh information290    geometry = root.xpath("//Geometry")[0]291    topology = root.xpath("//Topology")[0]292293    # Read all functions inside HDF5294    # Those that are a function of time is of format [name__time]295    # See self._process_time_series_result296    func_all = root.xpath("//Attribute/@Name")297    func_ele = root.xpath("//Attribute")298299    # Unique functions300    func_set = []301    for f in func_all:302        name = f.split("__")[0]303        if name not in func_set:304            func_set.append(name)305306    # Concatenate all time steps307    timestep = [f.split("__")[1] for f in func_all if len(f.split("__")) > 1]308    timestep = sorted(list(set(timestep)), key=float)309310    # Time steps for a particular function311    def _f_timestep(func):312        f_ts = []313        for f in func_all:314            if f.split("__")[0] == func and len(f.split("__")) > 1:315                f_ts.append(f.split("__")[1])316        return f_ts317318    # Given a time-value and a function, return the corresponding XML element319    def _func_at(func, t):320        f_ts = _f_timestep(func)321        # Single time-step functions322        if len(f_ts) == 0:323            ind = func_all.index(func)324            element = deepcopy(func_ele[ind])325326        # For functions containing several time-steps327        # Use the maximum time-step that is <= t328        # If the above set is empty, use the smallest available time-step329        else:330            if t in f_ts:331                t = t332            else:333                f_t_ll_t = [f_t for f_t in f_ts if float(f_t) < float(t)]334                if len(f_t_ll_t) > 0:335                    t = max(f_t_ll_t, key=float)336                else:337                    t = f_ts[0]338            ind = func_all.index(func + f"__{t}")339            element = deepcopy(func_ele[ind])340            element.attrib["Name"] = func341        return element342343    # Complete grid at a time344    def _time_series_grid(t):345        grid = ET.Element("Grid", Name="Moldflow results", GridType="Uniform")346347        # Append current time value348        grid.append(ET.Element("Time", Value=t))349350        # Append mesh information351        grid.append(deepcopy(geometry))352        grid.append(deepcopy(topology))353354        # Loop on all unique functions355        for func in func_set:356            grid.append(_func_at(func, t))357358        return grid359360    # Loop on all time steps if time-steps exis361    if len(timestep) > 0:362        domain = root.xpath("//Domain")[0]363        timeseries = ET.Element(364            "Grid", Name="TimeSeries", GridType="Collection", CollectionType="Temporal"365        )366        for t in timestep:367            timeseries.append(_time_series_grid(t))368        domain.append(timeseries)369        domain.remove(domain[0])370371    # Output to replace the current file
...dvars.py
Source:dvars.py  
1import nibabel as nib2import numpy as np3from scipy import stats4def load(func_file, mask_file, check4d=True):5    func_img    = nib.load(func_file)6    mask_img    = nib.load(mask_file)7    mask        = mask_img.get_data()8    func        = func_img.get_data().astype(np.float)9    if check4d and len(func.shape) != 4:10        raise Exception("Input functional %s should be 4-dimensional" % func_file)11    func        = func[mask.nonzero()].T # will have ntpts x nvoxs12    13    #import code14    #code.interact(local=locals())15    16    return(func)17def robust_stdev(func, interp="fraction"):18    """19    Compute robust estimation of standard deviation20    """21    lower_qs    = np.percentile(func, 25, axis=0)22    upper_qs    = np.percentile(func, 75, axis=0)23    # note: won't work on roxy with scipy == 0.924    #lower_qs    = stats.scoreatpercentile(func, 25, interpolation_method=interp, axis=0)25    #upper_qs    = stats.scoreatpercentile(func, 75, interpolation_method=interp, axis=0)26    stdev       = (upper_qs - lower_qs)/1.34927    return stdev28def ar_nitime(x, order=1, center=False):29    """30    Borrowed from nipy.algorithms.AR_est_YW.31    aka from nitime import algorithms as alg.32    33    We could speed this up by having the autocorr only compute lag1.34    """35    from nitime.lazy import scipy_linalg as linalg36    import nitime.utils as utils37    if center:38        x = x.copy()39        x = x - x.mean()40    r_m = utils.autocorr(x)[:order + 1]41    Tm  = linalg.toeplitz(r_m[:order])42    y   = r_m[1:]43    ak  = linalg.solve(Tm, y)44    return ak[0]45def ar_statsmodels(x, order=(1,0), trend='nc'):46    import statsmodels.api as sm47    arma_mod = sm.tsa.ARMA(x)48    arma_res = arma_mod.fit(order=order, trend=trend, disp=False)49    return arma_res.arparams[0]50def ar1(func, method=ar_nitime):51    func_centered = func - func.mean(0)52    #import code53    #code.interact(local=locals())54    ar_vals = np.apply_along_axis(method, 0, func_centered)55    return ar_vals56def calc_dvars(func, output_all=False, interp="fraction"):57    # Robust standard deviation58    func_sd     = robust_stdev(func, interp)59    60    # AR161    func_ar1    = ar1(func)62    # Predicted standard deviation of temporal derivative63    func_sd_pd  = np.sqrt(2 * (1 - func_ar1)) * func_sd64    diff_sd_mean= func_sd_pd.mean()65    # Compute temporal difference time series 66    func_deriv  = np.diff(func, axis=0)67    # DVARS68    ## (no standardization)69    dvars_plain = func_deriv.std(1, ddof=1) # TODO: Why are we not ^2 this & getting the sqrt?70    ## standardization71    dvars_stdz  = dvars_plain/diff_sd_mean72    ## voxelwise standardization73    diff_vx_stdz= func_deriv/func_sd_pd74    dvars_vx_stdz = diff_vx_stdz.std(1, ddof=1)75    76    if output_all:77        out = np.vstack((dvars_stdz, dvars_plain, dvars_vx_stdz))78    else:79        out = dvars_stdz.reshape(len(dvars_stdz), 1)80    81    return out82def calc_mean_dvars(dvars):83    mean_dvars = dvars.mean(0)84    return mean_dvars85def mean_dvars_wrapper(func_file, mask_file, dvars_out_file=None):86    func    = load(func_file, mask_file)87    dvars   = calc_dvars(func)88    if dvars_out_file:89        np.savetxt(dvars_out_file, dvars, fmt='%.12f')90    mean_d  = calc_mean_dvars(dvars)91    return mean_d[0]92def test():93    func    = load("sample_func.nii.gz", "sample_func_mask.nii.gz")94    dvars   = calc_dvars(func)95    mean_d  = calc_mean_dvars(dvars)96    ref_dvars = np.loadtxt("sample_dvars.txt")97    ref_dvars = ref_dvars[:,[1,0,2]]98    99def specific_tests():100    from numpy.testing import *101    102    ffile   = "sample_func.nii.gz"103    mfile   = "sample_func_mask.nii.gz"104    func    = load(ffile, mfile)105    106    # Robust Standard Deviation107    ## Differences in the two approaches exist because python will handle108    ## ties via averaging109    func_sd     = robust_stdev(func)110    ref_sd      = load("ref_dvars/DVARS-1033--SD.nii.gz", mfile, False)111    print np.abs(func_sd - ref_sd).mean()112    ## so we can fix this issue by changing the interpolation/ties approach113    ## note however that normally we will use interp="fraction"114    func_sd     = robust_stdev(func, interp="higher")115    ## test116    assert_allclose(func_sd, ref_sd)117    print np.abs(func_sd - ref_sd).mean()118    119    # AR1120    func_ar1    = ar1(func)121    ref_ar1     = load("ref_dvars/DVARS-1033--AR1.nii.gz", mfile, False)122    ## test123    assert_allclose(func_ar1, ref_ar1, 1e-6, 1e-6)124    print np.abs(func_ar1 - ref_ar1).mean()125    126    # Predicted standard deviation of temporal derivative127    func_sd_pd  = np.sqrt(2 * (1 - func_ar1)) * func_sd128    diff_sd_mean= func_sd_pd.mean()129    ref_sd_pd   = load("ref_dvars/DVARS-1033--DiffSDhat.nii.gz", mfile, False)130    ## test131    assert_allclose(func_sd_pd, ref_sd_pd, 1e-5, 1e-5)132    print np.abs(func_sd_pd - ref_sd_pd).mean()133    134    # Compute temporal difference time series135    func_deriv  = np.diff(func, axis=0)136    ref_deriv   = load("ref_dvars/DVARS-1033--Diff.nii.gz", mfile)137    ## test (these should be flipped in sign)138    assert_equal(func_deriv, -1*ref_deriv)139    print np.abs(func_deriv - (-1*ref_deriv)).mean()140    141    # DVARS (no standardization)142    dvars_plain = func_deriv.std(1, ddof=1)143    ref_plain   = np.loadtxt("ref_dvars/DVARS-1033--DiffSD.dat")144    print np.abs(dvars_plain - ref_plain).mean()145    print np.vstack((dvars_plain[:10], ref_plain[:10])).T146    print np.vstack((func_deriv.std(1, ddof=1)[:10], ref_plain[:10])).T147    ## it seems like the differences above stem from an incorrect averaging by my modication of DVARS148    func_img    = nib.load("ref_dvars/DVARS-1033--Diff.nii.gz")149    func_all    = func_img.get_data().astype(np.float)150    func_all    = func_all.reshape(np.prod(func_all.shape[:3]), func_all.shape[-1])151    func_all    = func_all.T152    print func_all[0,func_all[0,:].nonzero()].std(ddof=1) - ref_plain[0]153    154    # DVARS155    ## (no standardization)156    dvars_plain = func_deriv.std(1, ddof=1) # TODO: Why are we not ^2 this & getting the sqrt?157    ## standardization158    dvars_stdz  = dvars_plain/diff_sd_mean159    ## voxelwise standardization160    diff_vx_stdz= func_deriv/func_sd_pd161    dvars_vx_stdz = diff_vx_stdz.std(1, ddof=1)162    ## reference163    ref_dvars = np.loadtxt("sample_dvars.txt")164    ref_dvars = ref_dvars[:,[1,0,2]]165    ## test166    print np.abs(dvars_plain - ref_dvars[:,0]).mean()167    168## Below tests show that ar_nitime is much faster169## so we will use that170# %timeit ar_nitime(func_centered[:,0])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
