Best Python code snippet using sure_python
shared_memory.py
Source:shared_memory.py  
1"""Provides shared memory for direct access across processes.2The API of this package is currently provisional. Refer to the3documentation for details.4"""5__all__ = [ 'SharedMemory', 'ShareableList' ]6from functools import partial7import mmap8import os9import errno10import struct11import secrets12import types13import _posixshmem14_USE_POSIX = True15_O_CREX = os.O_CREAT | os.O_EXCL16# FreeBSD (and perhaps other BSDs) limit names to 14 characters.17_SHM_SAFE_NAME_LENGTH = 1418# Shared memory block name prefix19_SHM_NAME_PREFIX = '/psm_'20class SharedMemory2:21    """Creates a new shared memory block or attaches to an existing22    shared memory block.23    Every shared memory block is assigned a unique name.  This enables24    one process to create a shared memory block with a particular name25    so that a different process can attach to that same shared memory26    block using that same name.27    As a resource for sharing data across processes, shared memory blocks28    may outlive the original process that created them.  When one process29    no longer needs access to a shared memory block that might still be30    needed by other processes, the close() method should be called.31    When a shared memory block is no longer needed by any process, the32    unlink() method should be called to ensure proper cleanup."""33    # Defaults; enables close() and unlink() to run without errors.34    _name = None35    _fd = -136    _mmap = None37    _buf = None38    _flags = os.O_RDWR39    _mode = 0o60040    _prepend_leading_slash = True41    def __init__(self, name='None', create=False, size=0):42        if not size >= 0:43            raise ValueError("'size' must be a positive integer")44        if create:45            self._flags = _O_CREX | os.O_RDWR46            if size == 0:47                raise ValueError("'size' must be a positive number different from zero")48        if name is None and not self._flags & os.O_EXCL:49            raise ValueError("'name' can only be None if create=True")50        # POSIX Shared Memory51        name = "/" + name if self._prepend_leading_slash else name52        self._fd = _posixshmem.shm_open(53            name,54            self._flags,55            mode=self._mode56        )57        self._name = name58        try:59            if create and size:60                os.ftruncate(self._fd, size)61            stats = os.fstat(self._fd)62            size = stats.st_size63            self._mmap = mmap.mmap(self._fd, size)64        except OSError:65            self.unlink()66            raise67        #from .resource_tracker import register68        #register(self._name, "shared_memory")69        self._size = size70        self._buf = memoryview(self._mmap)71    def __del__(self):72        try:73            self.close()74        except OSError:75            pass76    def __reduce__(self):77        return (78            self.__class__,79            (80                self.name,81                False,82                self.size,83            ),84        )85    def __repr__(self):86        return f'{self.__class__.__name__}({self.name!r}, size={self.size})'87    @property88    def buf(self):89        "A memoryview of contents of the shared memory block."90        return self._buf91    @property92    def name(self):93        "Unique name that identifies the shared memory block."94        reported_name = self._name95        if self._prepend_leading_slash:96            if self._name.startswith("/"):97                reported_name = self._name[1:]98        return reported_name99    @property100    def size(self):101        "Size in bytes."102        return self._size103    def close(self):104        """Closes access to the shared memory from this instance but does105        not destroy the shared memory block."""106        if self._buf is not None:107            self._buf.release()108            self._buf = None109        if self._mmap is not None:110            self._mmap.close()111            self._mmap = None112        if self._fd >= 0:113            os.close(self._fd)114            self._fd = -1115    def unlink(self):116        """Requests that the underlying shared memory block be destroyed.117        In order to ensure proper cleanup of resources, unlink should be118        called once (and only once) across all processes which have access119        to the shared memory block."""120        #if self._name:121        #    from .resource_tracker import unregister122        #    _posixshmem.shm_unlink(self._name)123        #    unregister(self._name, "shared_memory")124        pass125    def get_filled_empty_shm(self):126        shm_volume = int(self._size / 8)127        are_filled, are_empty = 0, 0128        for i in range(shm_volume):129            shft = i * 8130            cur_id = int.from_bytes(self._buf[shft:shft+8], 'big')131            if cur_id > 0:132                are_filled += 1133            else:134                are_empty += 1135        return are_filled, are_empty136    def puch_value_in_shm(self, value, values_need_to_be_empty=0):137        shm_volume = int(self._size / 8)138        are_empty = 0139        shft_to_push = -1140        for i in range(shm_volume):141            shft = i * 8142            cur_id = int.from_bytes(self._buf[shft:shft+8], 'big')143            if cur_id <= 0:144                are_empty += 1145                if shft_to_push < 0:146                    shft_to_push = i * 8147        if are_empty > values_need_to_be_empty:148            self._buf[shft_to_push:shft_to_push+8] = value.to_bytes(8, 'big')149            return True150        return False151    def pull_n_values_from_shm(self, n=1):152        shm_volume = int(self._size / 8)153        n = min(n, shm_volume)154        to_ret = []155        value = 0156        for i in range(shm_volume):157            shft = i * 8158            cur_id = int.from_bytes(self._buf[shft:shft+8], 'big')159            if cur_id <= 0:160                continue161            to_ret.append((i, cur_id))162        #print(n)163        #print(to_ret)164        to_ret.sort(key=lambda x: x[1])165        n = min(n, len(to_ret))166        #print(to_ret)167        ret = [to_ret[i][1] for i in range(n)]168        for t_r in to_ret[:n]:169            shft = t_r[0] * 8170            self._buf[shft:shft+8] = value.to_bytes(8, 'big')171        return ret172    def remove_old_values_from_shm(self, max_dif_between_old_n_new_product=4):173        shm_volume = int(self._size / 8)174        max_dif_between_old_n_new = shm_volume * max_dif_between_old_n_new_product175        to_ret = []176        for i in range(shm_volume):177            shft = i * 8178            cur_id = int.from_bytes(self._buf[shft:shft+8], 'big')179            if cur_id <= 0:180                continue181            to_ret.append((i, cur_id))182        if len(to_ret) == 0:183            return 0184        max_id = max([t_r[1] for t_r in to_ret])185        max_id_to_remove = max_id - max_dif_between_old_n_new186        value = 0187        removed_cnt = 0188        for t_r in to_ret:189            if t_r[1] <= max_id_to_remove:190                shft = t_r[0] * 8191                self._buf[shft:shft+8] = value.to_bytes(8, 'big')192                removed_cnt += 1193        return removed_cnt194    def pull_id_from_shm(self, req_id):195        shm_volume = int(self._size / 8)196        value = 0197        for i in range(shm_volume):198            shft = i * 8199            cur_id = int.from_bytes(self._buf[shft:shft+8], 'big')200            if cur_id == req_id:201                self._buf[shft:shft+8] = value.to_bytes(8, 'big')202                return True203        return False204_encoding = "utf8"205class ShareableList:206    """Pattern for a mutable list-like object shareable via a shared207    memory block.  It differs from the built-in list type in that these208    lists can not change their overall length (i.e. no append, insert,209    etc.)210    Because values are packed into a memoryview as bytes, the struct211    packing format for any storable value must require no more than 8212    characters to describe its format."""213    # The shared memory area is organized as follows:214    # - 8 bytes: number of items (N) as a 64-bit integer215    # - (N + 1) * 8 bytes: offsets of each element from the start of the216    #                      data area217    # - K bytes: the data area storing item values (with encoding and size218    #            depending on their respective types)219    # - N * 8 bytes: `struct` format string for each element220    # - N bytes: index into _back_transforms_mapping for each element221    #            (for reconstructing the corresponding Python value)222    _types_mapping = {223        int: "q",224        float: "d",225        bool: "xxxxxxx?",226        str: "%ds",227        bytes: "%ds",228        None.__class__: "xxxxxx?x",229    }230    _alignment = 8231    _back_transforms_mapping = {232        0: lambda value: value,                   # int, float, bool233        1: lambda value: value.rstrip(b'\x00').decode(_encoding),  # str234        2: lambda value: value.rstrip(b'\x00'),   # bytes235        3: lambda _value: None,                   # None236    }237    @staticmethod238    def _extract_recreation_code(value):239        """Used in concert with _back_transforms_mapping to convert values240        into the appropriate Python objects when retrieving them from241        the list as well as when storing them."""242        if not isinstance(value, (str, bytes, None.__class__)):243            return 0244        elif isinstance(value, str):245            return 1246        elif isinstance(value, bytes):247            return 2248        else:249            return 3  # NoneType250    def __init__(self, sequence=None, *, name=None):251        if name is None or sequence is not None:252            sequence = sequence or ()253            _formats = [254                self._types_mapping[type(item)]255                    if not isinstance(item, (str, bytes))256                    else self._types_mapping[type(item)] % (257                        self._alignment * (len(item) // self._alignment + 1),258                    )259                for item in sequence260            ]261            self._list_len = len(_formats)262            assert sum(len(fmt) <= 8 for fmt in _formats) == self._list_len263            offset = 0264            # The offsets of each list element into the shared memory's265            # data area (0 meaning the start of the data area, not the start266            # of the shared memory area).267            self._allocated_offsets = [0]268            for fmt in _formats:269                offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])270                self._allocated_offsets.append(offset)271            _recreation_codes = [272                self._extract_recreation_code(item) for item in sequence273            ]274            requested_size = struct.calcsize(275                "q" + self._format_size_metainfo +276                "".join(_formats) +277                self._format_packing_metainfo +278                self._format_back_transform_codes279            )280            self.shm = SharedMemory(name, create=True, size=requested_size)281        else:282            self.shm = SharedMemory(name)283        if sequence is not None:284            _enc = _encoding285            struct.pack_into(286                "q" + self._format_size_metainfo,287                self.shm.buf,288                0,289                self._list_len,290                *(self._allocated_offsets)291            )292            struct.pack_into(293                "".join(_formats),294                self.shm.buf,295                self._offset_data_start,296                *(v.encode(_enc) if isinstance(v, str) else v for v in sequence)297            )298            struct.pack_into(299                self._format_packing_metainfo,300                self.shm.buf,301                self._offset_packing_formats,302                *(v.encode(_enc) for v in _formats)303            )304            struct.pack_into(305                self._format_back_transform_codes,306                self.shm.buf,307                self._offset_back_transform_codes,308                *(_recreation_codes)309            )310        else:311            self._list_len = len(self)  # Obtains size from offset 0 in buffer.312            self._allocated_offsets = list(313                struct.unpack_from(314                    self._format_size_metainfo,315                    self.shm.buf,316                    1 * 8317                )318            )319    def _get_packing_format(self, position):320        "Gets the packing format for a single value stored in the list."321        position = position if position >= 0 else position + self._list_len322        if (position >= self._list_len) or (self._list_len < 0):323            raise IndexError("Requested position out of range.")324        v = struct.unpack_from(325            "8s",326            self.shm.buf,327            self._offset_packing_formats + position * 8328        )[0]329        fmt = v.rstrip(b'\x00')330        fmt_as_str = fmt.decode(_encoding)331        return fmt_as_str332    def _get_back_transform(self, position):333        "Gets the back transformation function for a single value."334        if (position >= self._list_len) or (self._list_len < 0):335            raise IndexError("Requested position out of range.")336        transform_code = struct.unpack_from(337            "b",338            self.shm.buf,339            self._offset_back_transform_codes + position340        )[0]341        transform_function = self._back_transforms_mapping[transform_code]342        return transform_function343    def _set_packing_format_and_transform(self, position, fmt_as_str, value):344        """Sets the packing format and back transformation code for a345        single value in the list at the specified position."""346        if (position >= self._list_len) or (self._list_len < 0):347            raise IndexError("Requested position out of range.")348        struct.pack_into(349            "8s",350            self.shm.buf,351            self._offset_packing_formats + position * 8,352            fmt_as_str.encode(_encoding)353        )354        transform_code = self._extract_recreation_code(value)355        struct.pack_into(356            "b",357            self.shm.buf,358            self._offset_back_transform_codes + position,359            transform_code360        )361    def __getitem__(self, position):362        position = position if position >= 0 else position + self._list_len363        try:364            offset = self._offset_data_start + self._allocated_offsets[position]365            (v,) = struct.unpack_from(366                self._get_packing_format(position),367                self.shm.buf,368                offset369            )370        except IndexError:371            raise IndexError("index out of range")372        back_transform = self._get_back_transform(position)373        v = back_transform(v)374        return v375    def __setitem__(self, position, value):376        position = position if position >= 0 else position + self._list_len377        try:378            item_offset = self._allocated_offsets[position]379            offset = self._offset_data_start + item_offset380            current_format = self._get_packing_format(position)381        except IndexError:382            raise IndexError("assignment index out of range")383        if not isinstance(value, (str, bytes)):384            new_format = self._types_mapping[type(value)]385            encoded_value = value386        else:387            allocated_length = self._allocated_offsets[position + 1] - item_offset388            encoded_value = (value.encode(_encoding)389                             if isinstance(value, str) else value)390            if len(encoded_value) > allocated_length:391                raise ValueError("bytes/str item exceeds available storage")392            if current_format[-1] == "s":393                new_format = current_format394            else:395                new_format = self._types_mapping[str] % (396                    allocated_length,397                )398        self._set_packing_format_and_transform(399            position,400            new_format,401            value402        )403        struct.pack_into(new_format, self.shm.buf, offset, encoded_value)404    def __reduce__(self):405        return partial(self.__class__, name=self.shm.name), ()406    def __len__(self):407        return struct.unpack_from("q", self.shm.buf, 0)[0]408    def __repr__(self):409        return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'410    @property411    def format(self):412        "The struct packing format used by all currently stored items."413        return "".join(414            self._get_packing_format(i) for i in range(self._list_len)415        )416    @property417    def _format_size_metainfo(self):418        "The struct packing format used for the items' storage offsets."419        return "q" * (self._list_len + 1)420    @property421    def _format_packing_metainfo(self):422        "The struct packing format used for the items' packing formats."423        return "8s" * self._list_len424    @property425    def _format_back_transform_codes(self):426        "The struct packing format used for the items' back transforms."427        return "b" * self._list_len428    @property429    def _offset_data_start(self):430        # - 8 bytes for the list length431        # - (N + 1) * 8 bytes for the element offsets432        return (self._list_len + 2) * 8433    @property434    def _offset_packing_formats(self):435        return self._offset_data_start + self._allocated_offsets[-1]436    @property437    def _offset_back_transform_codes(self):438        return self._offset_packing_formats + self._list_len * 8439    def count(self, value):440        "L.count(value) -> integer -- return number of occurrences of value."441        return sum(value == entry for entry in self)442    def index(self, value):443        """L.index(value) -> integer -- return first index of value.444        Raises ValueError if the value is not present."""445        for position, entry in enumerate(self):446            if value == entry:447                return position448        else:449            raise ValueError(f"{value!r} not in this container")...command.py
Source:command.py  
1# (c) Nelen & Schuurmans, see LICENSE.rst.2from qgis.core import QgsProject3from ThreeDiToolbox.tool_commands.custom_command_base import CustomCommandBase4from ThreeDiToolbox.utils import constants5from ThreeDiToolbox.utils.predictions import Predictor6from ThreeDiToolbox.utils.user_messages import messagebar_message7from ThreeDiToolbox.utils.user_messages import pop_up_question8from ThreeDiToolbox.views.modify_schematisation_dialogs import (9    PredictCalcPointsDialogWidget,10)11import logging12logger = logging.getLogger(__name__)13class CustomCommand(CustomCommandBase):14    """15    command to predict the threedicore calculation points based on16    calculation type, geometry and the attribute dist_calc_points17    The results will be written to the database table v2_calculation_point.18    When running the command, the table must be empty!19    """20    def __init__(self, *args, **kwargs):21        self.args = args22        self.kwargs = kwargs23        self.iface = kwargs.get("iface")24        self.ts_datasources = kwargs.get("ts_datasources")25        self.tool_dialog_widget = None26    def run(self):27        self.show_gui()28    def show_gui(self):29        self.tool_dialog_widget = PredictCalcPointsDialogWidget(command=self)30        self.tool_dialog_widget.exec_()  # block execution31    def run_it(self, db_set, db_type):32        """33        :param db_set: dict of database settings. Expected keywords:34                'host': '',35                'port': '',36                'name': '',37                'username': '',38                'password': '',39                'schema': '',40                'database': '',41                'db_path': ,42        :param db_type: 'spatialite' or 'postgres'43        """44        predictor = Predictor(db_type)45        uri = predictor.get_uri(**db_set)46        calc_pnts_lyr = predictor.get_layer_from_uri(47            uri, constants.TABLE_NAME_CALC_PNT, "the_geom"48        )49        self.connected_pnts_lyr = predictor.get_layer_from_uri(50            uri, constants.TABLE_NAME_CONN_PNT, "the_geom"51        )52        predictor.start_sqalchemy_engine(db_set)53        if not self.fresh_start(predictor):54            return55        default_epsg_code = 2899256        epsg_code = predictor.get_epsg_code() or default_epsg_code57        logger.info(58            "[*] Using epsg code {} to build the calc_type_dict".format(epsg_code)59        )60        predictor.build_calc_type_dict(epsg_code=epsg_code)61        transform = None62        # spatialites are in WGS84 so we need a transformation63        if db_type == "spatialite":64            transform = "{epsg_code}:4326".format(epsg_code=epsg_code)65        succces, features = predictor.predict_points(66            output_layer=calc_pnts_lyr, transform=transform67        )68        if succces:69            msg = "Predicted {} calculation points".format(len(features))70            level = 371            QgsProject.instance().addMapLayer(calc_pnts_lyr)72        else:73            msg = (74                "Predicted calculation points failed! "75                'Are you sure the table "v2_calculation_point" '76                "is empty?".format()77            )78            level = 179        messagebar_message("Finished", msg, level=level, duration=12)80        cp_succces, cp_features = predictor.fill_connected_pnts_table(81            calc_pnts_lyr=calc_pnts_lyr, connected_pnts_lyr=self.connected_pnts_lyr82        )83        if cp_succces:84            cp_msg = "Created {} connected points template".format(len(cp_features))85            cp_level = 386            QgsProject.instance().addMapLayer(self.connected_pnts_lyr)87        else:88            cp_msg = "Creating connected points failed!"89            cp_level = 190        messagebar_message("Finished", cp_msg, level=cp_level, duration=12)91        logger.info("Done predicting calcualtion points.\n" + msg)92    def fresh_start(self, predictor):93        """94        Check whether we start off fresh or not. That is, if the95        calculation and connected points have been calculated before96        the stale data will be removed from the database after97        the user has confirmed to do so98        :param predictor: utils.predictions.Predictor instance99        :returns True if we start fresh. In this case all database100            tables are empty. False otherwise101        """102        fresh = True103        are_empty = []104        table_names = [constants.TABLE_NAME_CALC_PNT, constants.TABLE_NAME_CONN_PNT]105        for tn in table_names:106            are_empty.append(predictor.threedi_db.table_is_empty(tn))107        if not all(are_empty):108            fresh = False109            question = (110                "Calculation point and connected point tables are not "111                "empty! Do you want to delete all their contents?"112            )113            if pop_up_question(question, "Warning"):114                predictor.threedi_db.delete_from(constants.TABLE_NAME_CONN_PNT)115                predictor.threedi_db.delete_from(constants.TABLE_NAME_CALC_PNT)116                fresh = True...__init__.py
Source:__init__.py  
...33    return series.apply(test).any()34# def lists_columns(df):35#     """Return a list of column names whose values are lists."""36#     return list(df.columns[df.apply(column_is_list)])37def are_empty(series):38    """Return True if all items in series evaluate to empty."""39    def test(value):40        """Return True if value is considered empty."""41        empty_values = [list(), dict(), tuple(), str(), np.nan, pd.NaT, None]42        if value in empty_values:43            return True44        else:45            return False46    return series.apply(test).all()47# def empty_columns(df):48#     """Return a list of column names that are completely empty."""49#     return list(df.columns[df.apply(column_is_empty)])50def split_table_by_column_value(df, column_name, drop_empty_columns=False):51    """Return a dict of DataFrames grouped by the values in column_name."""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
