Best Python code snippet using autotest_python
input_reader.py
Source:input_reader.py  
1# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.2#3# Redistribution and use in source and binary forms, with or without4# modification, are permitted provided that the following conditions5# are met:6#  * Redistributions of source code must retain the above copyright7#    notice, this list of conditions and the following disclaimer.8#  * Redistributions in binary form must reproduce the above copyright9#    notice, this list of conditions and the following disclaimer in the10#    documentation and/or other materials provided with the distribution.11#  * Neither the name of NVIDIA CORPORATION nor the names of its12#    contributors may be used to endorse or promote products derived13#    from this software without specific prior written permission.14#15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY16# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE17# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR18# PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR19# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,20# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,21# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR22# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY23# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE25# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.26import os27import numpy as np28from PIL import Image29import scipy.io30import random31import scipy.sparse32import scipy.ndimage.morphology as morp33import cv234from skimage.measure import regionprops35from skimage import measure36from collections import OrderedDict37class InputReaderBase:38    def __init__(self, in_path, file_list, n_classes):39        """40        :param path:41        :param file_list:42        :param n_classes: tuple (start,end)43        """44        self._in_path = in_path45        self._file_list = file_list46        if file_list is not None:47            self._read_list = open(file_list, 'r').read().splitlines()48        else:49            self._read_list = None50        self.n_classes = n_classes51        self._seed = 12352        self._classes_to_keep = []53        self._remapping = OrderedDict()54    def set_classes_to_keep(self, classes_to_keep):55        self._classes_to_keep = classes_to_keep56        self._remapping = OrderedDict()57    def set_external_list(self, ext_list):58        self._read_list = ext_list59    def randompick(self, max_number, seed=123):60        self._seed = seed61        random.seed(seed)62        random.shuffle(self._read_list)63        self._file_list = 'external'64        self._read_list = self._read_list[:max_number]65    def _ignore_classes(self, masks):66        if len(self._classes_to_keep) == 0:67            return masks68        idx = 069        result = []70        for i in range(len(masks)):71            if i in self._classes_to_keep:72                mask = masks[i]73                result.append(mask)74                self._remapping[idx] = i75                idx = idx + 176        return np.array(result)77    def __getitem__(self, item):78        raise NotImplementedError()79    def __len__(self):80        return len(self._read_list)81class InputReader(InputReaderBase):82    def __getitem__(self, item):83        stack = []84        filename = self._read_list[item]85        for idx_cls in range(self.n_classes[0], self.n_classes[1] + self.n_classes[0]):86            img_path = os.path.join(self._in_path, 'class_' + str(idx_cls), filename + '.png')87            img = np.array(Image.open(img_path)) / 255.088            stack.append(img)89        assert len(stack) == self.n_classes[1], len(stack)90        stack = self._ignore_classes(stack)91        return filename, stack92class InputReaderBaseName(InputReaderBase):93    def __getitem__(self, item):94        stack = []95        filename = self._read_list[item]96        filename = os.path.basename(filename).split('.png')[0]97        for idx_cls in range(self.n_classes[0], self.n_classes[1] + self.n_classes[0]):98            img_path = os.path.join(self._in_path, 'class_' + str(idx_cls), filename + '.png')99            img = np.array(Image.open(img_path)) / 255.0100            stack.append(img)101        assert len(stack) == self.n_classes[1], len(stack)102        stack = self._ignore_classes(stack)103        return filename, stack104class InputReaderDummy(InputReaderBase):105    def __getitem__(self, item):106        filename = self._read_list[item]107        return filename, [None for _ in range(self.n_classes[1])]108class InputReaderRGBImage(InputReaderBase):109    def __getitem__(self, item):110        filename = self._read_list[item]111        img_path = os.path.join(self._in_path, 'img', filename + '.png')112        if not os.path.isfile(img_path):113            img_path = os.path.join(self._in_path, 'img', filename + '.jpg')114        img = Image.open(img_path)115        return filename, img116class InputReaderSemMat(InputReaderBase):117    @staticmethod118    def map2Kchannels(maps, klasses):119        masks = [maps == (i + 1) for i in range(klasses)]120        return np.stack(masks, axis=0).astype(np.uint8)121    def __getitem__(self, item):122        filename = self._read_list[item]123        mat_path = os.path.join(self._in_path, filename + '.mat')124        matlab = scipy.io.loadmat(mat_path)125        segmap = matlab['GTcls']['Segmentation'][0][0]126        masks = self.map2Kchannels(segmap, self.n_classes[1])127        assert len(masks) == self.n_classes[1], len(masks)128        masks = self._ignore_classes(masks)129        return filename, masks130class InputReaderSemMat2(InputReaderSemMat):131    def __getitem__(self, item):132        filename = self._read_list[item]133        mat_path = os.path.join(self._in_path, 'cls', filename + '.mat')134        matlab = scipy.io.loadmat(mat_path)135        segmap = matlab['GTcls']['Segmentation'][0][0]136        masks = self.map2Kchannels(segmap, self.n_classes[1])137        assert len(masks) == self.n_classes[1], len(masks)138        masks = self._ignore_classes(masks)139        return filename, masks140class InputReaderSemMatDemo(InputReaderSemMat):141    def __getitem__(self, item):142        mat_path = self._read_list[item]143        matlab = scipy.io.loadmat(mat_path)144        segmap = matlab['GTcls']['Segmentation'][0][0]145        masks = self.map2Kchannels(segmap, self.n_classes[1])146        assert len(masks) == self.n_classes[1], len(masks)147        masks = self._ignore_classes(masks)148        return os.path.basename(mat_path), masks149class InputReaderSemMat2BaseName(InputReaderSemMat):150    def __getitem__(self, item):151        filename = self._read_list[item]152        filename = os.path.basename(filename).split('.png')[0]153        mat_path = os.path.join(self._in_path, 'cls', filename + '.mat')154        matlab = scipy.io.loadmat(mat_path)155        segmap = matlab['GTcls']['Segmentation'][0][0]156        masks = self.map2Kchannels(segmap, self.n_classes[1])157        assert len(masks) == self.n_classes[1], len(masks)158        masks = self._ignore_classes(masks)159        return filename, masks160class InputReaderSemMatCoarse(InputReaderBase):161    def __init__(self, in_path, file_list, n_classes, delta):162        super(InputReaderSemMatCoarse, self).__init__(in_path, file_list, n_classes)163        self._delta = delta164    def map2Kchannels(self, maps, klasses):165        masks = []166        idx = 0167        for i in range(klasses):168            if (len(self._classes_to_keep) == 0) or (i in self._classes_to_keep):169                mask = (maps == (i + 1))170                coarse_mask = self._simulate_coarse_label(mask, self._delta)171                masks.append(coarse_mask)172                self._remapping[idx] = i173                idx = idx + 1174        return np.stack(masks, axis=0).astype(np.uint8)175    def _simulate_coarse_label(self, mask, delta, return_polys=False):176        all_zeros = not np.any(mask)177        if all_zeros:178            return mask179        erosion_iter = delta // 2180        eroded_mask = morp.binary_erosion(mask, iterations=erosion_iter)181        if not np.any(eroded_mask):182            eroded_mask = eroded_mask.astype(np.uint8)183            # if the objects are too small and the delta (erosion) is to big, the objects may disapear184            # if all disappear185            # so let's draw a circle186            properties = regionprops(mask.astype(np.int32))187            c_y, c_x = properties[0].centroid188            c_y = int(c_y)189            c_x = int(c_x)190            # --191            cv2.circle(eroded_mask, (c_x, c_y), 3, 1, -1)192            return eroded_mask193        polys = measure.find_contours(eroded_mask, 0)194        if len(polys) == 0:195            print('error getting poly..returning empty mask')196            return np.zeros_like(mask)197        final_mask = np.zeros_like(mask).astype(np.uint8).T198        final_mask = np.ascontiguousarray(final_mask)199        for poly in polys:200            result = cv2.approxPolyDP(poly.astype(np.int32), erosion_iter, True)[:, 0, :]201            cv2.fillPoly(final_mask, [result], 1)202        final_mask = final_mask.T203        if not np.any(final_mask):204            print('this mask ended up being empty')205        return final_mask206    def __getitem__(self, item):207        filename = self._read_list[item]208        mat_path = os.path.join(self._in_path, 'cls', filename + '.mat')209        matlab = scipy.io.loadmat(mat_path)210        segmap = matlab['GTcls']['Segmentation'][0][0]211        # this function inside does ignore classes... doing this way for speed212        masks = self.map2Kchannels(segmap, self.n_classes[1])213        return filename, masks214class InputReaderSemMatCoarsePerComponent(InputReaderSemMatCoarse):215    def getSim_fn(self):216        return None217    def _simulate_coarse_label(self, mask, delta):218        all_zeros = not np.any(mask)219        if all_zeros:220            return mask221        blobs_labels, n_blobs = measure.label(mask, background=0, return_num=True)222        simp_output = np.zeros_like(mask)223        fn_callback = self.getSim_fn()224        for blob_id in range(1, n_blobs + 1):225            mask_blob = (blobs_labels == blob_id)226            mask_blob = np.ascontiguousarray(mask_blob, np.uint8)227            if fn_callback is None:228                sim_blob = super(InputReaderSemMatCoarsePerComponent, self)._simulate_coarse_label(mask_blob, delta)229            else:230                sim_blob = fn_callback(mask_blob, delta)231            simp_output = simp_output + sim_blob232        #233        return simp_output234class InputReaderSemMatClickSim(InputReaderSemMatCoarsePerComponent):235    def _simulate_click(self, mask, delta):236        all_zeros = not np.any(mask)237        if all_zeros:238            return mask239        radius = 6240        sim_mask = np.zeros_like(mask)241        properties = regionprops(mask.astype(np.int32))242        c_y, c_x = properties[0].centroid243        c_y = c_y + delta * np.random.normal()244        c_x = c_x + delta * np.random.normal()245        c_y = max(c_y, 0)246        c_x = max(c_x, 0)247        c_y = int(c_y)248        c_x = int(c_x)249        # --250        cv2.circle(sim_mask, (c_x, c_y), delta, 1, -1)251        return sim_mask252    def getSim_fn(self):253        return self._simulate_click254    def _simulate_coarse_label(self, mask, delta):255        return super(InputReaderSemMatClickSim, self)._simulate_coarse_label(mask, delta)256class InputReaderBdryMat(InputReaderBase):257    def __getitem__(self, item):258        filename = self._read_list[item]259        mat_path = os.path.join(self._in_path, filename + '.mat')260        matlab = scipy.io.loadmat(mat_path)261        boundaries = matlab['GTcls']['Boundaries']262        masks = [scipy.sparse.csr_matrix.todense(boundaries[0][0][c_id][0]) for c_id in range(0, 20)]263        assert len(masks) == self.n_classes[1], len(masks)...io.py
Source:io.py  
...7__all__ = ["DarkriftReader", "DarkriftWriter"]8class DarkriftReader(binio.BinaryReader):9    def __init__(self, b: Union[io.BytesIO, bytes]) -> None:10        super().__init__(b, binio.ByteOrder.BIG)11    def _read_list(12        self, read: Callable[[], types.DarkriftType]13    ) -> list[types.DarkriftType]:14        """15        Read a list of values.16        :param read: the reader method to read a single value17        """18        c = self.read_int32()19        v = []20        for i in range(c):21            v.append(read())22        return v23    def read_int8s(self) -> list[types.int8]:24        return self._read_list(self.read_int8)  # type: ignore25    def read_uint8s(self) -> list[types.uint8]:26        return self._read_list(self.read_uint8)  # type: ignore27    def read_int16s(self) -> list[types.int16]:28        return self._read_list(self.read_int16)  # type: ignore29    def read_uint16s(self) -> list[types.uint16]:30        return self._read_list(self.read_uint16)  # type: ignore31    def read_int32s(self) -> list[types.int32]:32        return self._read_list(self.read_int32)  # type: ignore33    def read_uint32s(self) -> list[types.uint32]:34        return self._read_list(self.read_uint32)  # type: ignore35    def read_int64s(self) -> list[types.int64]:36        return self._read_list(self.read_int64)  # type: ignore37    def read_uint64s(self) -> list[types.uint64]:38        return self._read_list(self.read_uint64)  # type: ignore39    def read_singles(self) -> list[float]:40        return self._read_list(self.read_single)  # type: ignore41    def read_doubles(self) -> list[types.double]:42        return self._read_list(self.read_double)  # type: ignore43    def read_byte(self) -> bytes:44        return self.read(1)45    def read_bytes(self) -> bytes:46        return self.read(self.read_int32())47    def read_string(self) -> str:48        return self.read_str(self.read_int32())49    def read_strings(self) -> list[str]:50        return self._read_list(self.read_string)  # type: ignore51    def read_chars(self) -> str:52        return self.read_string()53class DarkriftWriter(binio.BinaryWriter):54    def __init__(self) -> None:55        super().__init__(binio.ByteOrder.BIG)56    @classmethod57    def from_stream(58        cls,59        stream: io.BytesIO,60        byte_order: Optional[binio.ByteOrder] = binio.ByteOrder.BIG,61    ) -> DarkriftWriter:62        writer = cls()63        writer._stream = stream64        return writer...db.py
Source:db.py  
...34    return sha256(gemurl.capsule_prefix(url).encode("us-ascii")).digest(), url35def read_orbit(orbit_dir):36    path = Path(orbit_dir) / "members.json"37    with fasteners.InterProcessReaderWriterLock(f"{path}.lock").read_lock():38        return _read_list(path)39def update_url_membership(orbit_dir, url, is_member):40    path = Path(orbit_dir) / "members.json"41    with fasteners.InterProcessReaderWriterLock(f"{path}.lock").write_lock():42        orbit = _read_list(path)43        was_member = url in orbit44        if is_member and not was_member:45            orbit = extend_orbit(orbit, url)46        elif not is_member and was_member:47            orbit.remove(url)48        _write_list(path, orbit)49        return was_member50def append_submission(orbit_dir, url):51    path = Path(orbit_dir) / "submissions.json"52    with fasteners.InterProcessLock(f"{path}.lock"):53        try:54            submissions = _read_list(path)55        except OSError:56            submissions = []57        if url not in submissions:58            submissions.append(url)59            _write_list(path, submissions)60def pop_submission(orbit_dir):61    path = Path(orbit_dir) / "submissions.json"62    with fasteners.InterProcessLock(f"{path}.lock"):63        submissions = _read_list(path)64        if not submissions:65            return None66        submission = submissions.pop(0)67        _write_list(path, submissions)68        return submission69def _read_list(path):70    if path.exists():71        with path.open("r", encoding="utf-8") as f:72            return json.load(f)73    else:74        return []75def _write_list(path, list_):76    if list_:77        with path.open("w", encoding="utf-8") as f:78            json.dump(list_, f, indent=4, sort_keys=True)79    else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
