Best Python code snippet using slash
Source:executor.py
...
        data_id : unidist.core.backends.mpi.core.common.MasterDataID
            An ID to data.
        """
        self._cleanup_list.append(data_id)

    def regular_cleanup(self):
        """
        Cleanup all garbage collected IDs from local and all workers object storages.

        Cleanup triggers based on internal threshold settings.
        """
        e_logger.debug("Cleanup list len - {}".format(len(self._cleanup_list)))
        e_logger.debug(
            "Cleanup counter {}, threshold reached - {}".format(
                self._cleanup_counter,
                (self._cleanup_counter % self._cleanup_threshold) == 0,
            )
        )
        if len(self._cleanup_list) > self._cleanup_list_threshold:
            if self._cleanup_counter % self._cleanup_threshold == 0:
                timestamp_snapshot = time.perf_counter()
                if (timestamp_snapshot - self._timestamp) > self._time_threshold:
                    e_logger.debug("Cleanup counter {}".format(self._cleanup_counter))
                    # Compare submitted and executed tasks
                    communication.mpi_send_object(
                        comm,
                        common.Operation.GET_TASK_COUNT,
                        communication.MPIRank.MONITOR,
                    )
                    executed_task_counter = communication.recv_simple_operation(
                        comm, communication.MPIRank.MONITOR
                    )
                    e_logger.debug(
                        "Submitted task count {} vs executed task count {}".format(
                            self._task_counter, executed_task_counter
                        )
                    )
                    if executed_task_counter == self._task_counter:
                        self._send_cleanup_request(self._cleanup_list)
                        # Clear the remaining references
                        self._object_store.clear(self._cleanup_list)
                        self._cleanup_list.clear()
                        self._cleanup_counter += 1
                        self._timestamp = time.perf_counter()
            else:
                self._cleanup_counter += 1


object_store = ObjectStore()
garbage_collector = GarbageCollector(object_store)

e_logger = common.get_logger("executor", "executor.log")

# Internal functions
# -----------------------------------------------------------------------------

# Current rank for schedule (exclude MPIRank.ROOT and monitor ranks)
round_robin = itertools.cycle(range(2, world_size))


def schedule_rank():
    """
    Find the next rank for task/actor-task execution.

    Returns
    -------
    int
        A rank number.
    """
    global round_robin
    return next(round_robin)


def request_worker_data(data_id):
    """
    Get an object(s) associated with `data_id` from the object storage.

    Parameters
    ----------
    data_id : unidist.core.backends.common.data_id.DataID
        An ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """
    owner_rank = object_store.get_data_owner(data_id)
    e_logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank))
    # Worker request
    operation_type = common.Operation.GET
    operation_data = {"source": rank, "id": data_id.base_data_id()}
    communication.send_simple_operation(
        comm, operation_type, operation_data, owner_rank
    )
    # Blocking get
    data = communication.recv_complex_data(comm, owner_rank)
    # Caching the result, check the protocol correctness here
    object_store.put(data_id, data)
    return data


def push_local_data(dest_rank, data_id):
    """
    Send local data associated with passed ID to target rank.

    Parameters
    ----------
    dest_rank : int
        Target rank.
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    """
    # Check if data was already pushed
    if not object_store.is_already_sent(data_id, dest_rank):
        e_logger.debug("PUT LOCAL {} id to {} rank".format(data_id._id, dest_rank))
        # Push the local master data to the target worker directly
        operation_type = common.Operation.PUT_DATA
        if object_store.is_already_serialized(data_id):
            serialized_data = object_store.get_serialized_data(data_id)
            communication.send_operation(
                comm, operation_type, serialized_data, dest_rank, is_serialized=True
            )
        else:
            operation_data = {"id": data_id, "data": object_store.get(data_id)}
            serialized_data = communication.send_operation(
                comm, operation_type, operation_data, dest_rank, is_serialized=False
            )
            object_store.cache_serialized_data(data_id, serialized_data)
        # Remember pushed id
        object_store.cache_send_info(data_id, dest_rank)


def push_data_owner(dest_rank, data_id):
    """
    Send data location associated with data ID to target rank.

    Parameters
    ----------
    dest_rank : int
        Target rank.
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    """
    operation_type = common.Operation.PUT_OWNER
    operation_data = {
        "id": data_id,
        "owner": object_store.get_data_owner(data_id),
    }
    communication.send_simple_operation(comm, operation_type, operation_data, dest_rank)


def push_data(dest_rank, value):
    """
    Parse and send all values to destination rank.

    Process all arguments recursively and send all ID associated data or its location
    to the target rank.

    Parameters
    ----------
    dest_rank : int
        Rank where task data is needed.
    value : iterable or dict or object
        Arguments to be sent.
    """
    if isinstance(value, (list, tuple)):
        for v in value:
            push_data(dest_rank, v)
    elif isinstance(value, dict):
        for v in value.values():
            push_data(dest_rank, v)
    elif is_data_id(value):
        if object_store.contains(value):
            push_local_data(dest_rank, value)
        elif object_store.contains_data_owner(value):
            push_data_owner(dest_rank, value)
        else:
            raise ValueError("Unknown DataID!")


# Control API
# -----------------------------------------------------------------------------


def init():
    """
    Initialize MPI processes.

    Notes
    -----
    Only collects the MPI cluster topology.
    """
    global topology
    if not topology:
        topology = communication.get_topology()


# TODO: cleanup before shutdown?
def shutdown():
    """
    Shutdown all MPI processes.

    Notes
    -----
    Sends cancelation operation to all workers and monitor processes.
    """
    # Send shutdown commands to all ranks
    for rank_id in range(1, world_size):
        communication.mpi_send_object(comm, common.Operation.CANCEL, rank_id)
        e_logger.debug("Shutdown rank {}".format(rank_id))


def cluster_resources():
    """
    Get resources of MPI cluster.

    Returns
    -------
    dict
        Dictionary with cluster nodes info in the form
        `{"node_ip0": {"CPU": x0}, "node_ip1": {"CPU": x1}, ...}`.
    """
    cluster_resources = defaultdict(dict)
    for host, ranks_list in topology.items():
        cluster_resources[host]["CPU"] = len(ranks_list)
    return dict(cluster_resources)


# Data API
# -----------------------------------------------------------------------------


def put(data):
    """
    Put the data into object storage.

    Parameters
    ----------
    data : object
        Data to be put.

    Returns
    -------
    unidist.core.backends.mpi.core.common.MasterDataID
        An ID of an object in object storage.
    """
    data_id = object_store.generate_data_id(garbage_collector)
    object_store.put(data_id, data)
    e_logger.debug("PUT {} id".format(data_id._id))
    return data_id


def get(data_ids):
    """
    Get an object(s) associated with `data_ids` from the object storage.

    Parameters
    ----------
    data_ids : unidist.core.backends.common.data_id.DataID or list
        An ID(s) to object(s) to get data from.

    Returns
    -------
    object
        A Python object.
    """

    def get_impl(data_id):
        if object_store.contains(data_id):
            value = object_store.get(data_id)
        else:
            value = request_worker_data(data_id)
        if isinstance(value, Exception):
            raise value
        return value

    e_logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids)))
    is_list = isinstance(data_ids, list)
    if not is_list:
        data_ids = [data_ids]
    values = [get_impl(data_id) for data_id in data_ids]
    # Initiate reference count based cleanup
    # if all the tasks were completed
    garbage_collector.regular_cleanup()
    return values if is_list else values[0]


def wait(data_ids, num_returns=1):
    """
    Wait until `data_ids` are finished.

    This method returns two lists. The first list consists of
    ``DataID``-s that correspond to objects that completed computations.
    The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).

    Parameters
    ----------
    data_ids : unidist.core.backends.mpi.core.common.MasterDataID or list
        ``DataID`` or list of ``DataID``-s to be waited.
    num_returns : int, default: 1
        The number of ``DataID``-s that should be returned as ready.

    Returns
    -------
    tuple
        List of data IDs that are ready and list of the remaining data IDs.
    """

    def wait_impl(data_id):
        if object_store.contains(data_id):
            return
        owner_rank = object_store.get_data_owner(data_id)
        operation_type = common.Operation.WAIT
        operation_data = {"id": data_id.base_data_id()}
        communication.send_simple_operation(
            comm, operation_type, operation_data, owner_rank
        )
        e_logger.debug("WAIT {} id from {} rank".format(data_id._id, owner_rank))
        communication.mpi_busy_wait_recv(comm, owner_rank)

    e_logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids)))
    if not isinstance(data_ids, list):
        data_ids = [data_ids]
    ready = []
    not_ready = data_ids
    while len(ready) != num_returns:
        first = not_ready.pop(0)
        wait_impl(first)
        ready.append(first)
    # Initiate reference count based cleanup
    # if all the tasks were completed
    garbage_collector.regular_cleanup()
    return ready, not_ready


# Task API
# -----------------------------------------------------------------------------


def remote(task, *args, num_returns=1, **kwargs):
    """
    Execute function on a worker process.

    Parameters
    ----------
    task : callable
        Function to be executed in the worker.
    *args : iterable
        Positional arguments to be passed in the `task`.
    num_returns : int, default: 1
        Number of results to be returned from `task`.
    **kwargs : dict
        Keyword arguments to be passed in the `task`.

    Returns
    -------
    unidist.core.backends.mpi.core.common.MasterDataID or list or None
        Type of returns depends on `num_returns` value:

        * if `num_returns == 1`, ``DataID`` will be returned.
        * if `num_returns > 1`, list of ``DataID``-s will be returned.
        * if `num_returns == 0`, ``None`` will be returned.
    """
    # Initiate reference count based cleanup
    # if all the tasks were completed
    garbage_collector.regular_cleanup()
    dest_rank = schedule_rank()
    output_ids = object_store.generate_output_data_id(
        dest_rank, garbage_collector, num_returns
    )
    e_logger.debug("REMOTE OPERATION")
    e_logger.debug(
        "REMOTE args to {} rank: {}".format(
            dest_rank, common.unwrapped_data_ids_list(args)
        )
    )
    e_logger.debug(
        "REMOTE outputs to {} rank: {}".format(
            dest_rank, common.unwrapped_data_ids_list(output_ids)
        )
    ...
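The executor.py excerpt above is the master-process side of unidist's MPI backend: a garbage collector that batches cleanup of no-longer-referenced data IDs, plus module-level Control (init, shutdown, cluster_resources), Data (put, get, wait) and Task (remote) functions. As a rough orientation only, the sketch below shows how those calls could compose if driven directly; the import path is inferred from the module references in the docstrings, and in practice unidist's public API wraps these internals, so treat this as an illustrative sketch rather than the supported entry point.

# Illustrative sketch only; assumes an MPI launch with enough ranks and that the
# import path below (inferred from the docstrings) is correct.
import unidist.core.backends.mpi.core.executor as executor

executor.init()                                # collect the MPI cluster topology
data_id = executor.put([1, 2, 3])              # store data, receive a MasterDataID
result_id = executor.remote(sum, data_id)      # schedule `sum` on the next worker rank
ready, not_ready = executor.wait([result_id])  # block until the task has completed
print(executor.get(result_id))                 # fetch the computed value back
executor.shutdown()                            # send CANCEL to all non-root ranks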
Source:test_sync_music.py  
# music_sync - Sync music library to external device
# Copyright (C) 2013-2018 Christian Fetzer
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""Tests sync_music."""

import pathlib
import shutil

from unittest import mock

import pytest

from sync_music.sync_music import SyncMusic
from sync_music.sync_music import load_settings
from sync_music.util import list_all_files


class TestSyncMusicSettings:
    """Tests sync_music load_settings function."""

    @staticmethod
    def test_noparams():
        """Tests loading of settings without parameters."""
        with pytest.raises(SystemExit):
            load_settings("")

    @staticmethod
    def test_nonexistent():
        """Tests loading of settings with incorrect paths."""
        argv = [
            "--audio-src",
            "/proc/nonexistingpath",
            "--audio-dest",
            "/proc/nonexistingpath",
        ]
        with pytest.raises(SystemExit):
            load_settings(argv)

    @staticmethod
    def test_forcecopy_with_hacks(tmp_path):
        """Tests loading of settings with force copy and hacks."""
        argv = [
            "--audio-src",
            str(tmp_path),
            "--audio-dest",
            str(tmp_path),
            "--mode=copy",
            "--albumartist-artist-hack",
        ]
        with pytest.raises(SystemExit):
            load_settings(argv)

    @staticmethod
    def test_configfile(tmp_path):
        """Tests loading of settings within config file."""
        configpath = tmp_path / "sync_music.cfg"
        argv = ["-c", str(configpath)]
        with configpath.open("w") as configfile:
            configfile.write("[Defaults]\n")
            configfile.write(f"audio_src={str(tmp_path)}\n")
            configfile.write(f"audio_dest={str(tmp_path)}\n")
        load_settings(argv)


class TestSyncMusicFiles:
    """Tests sync_music audio conversion."""

    input_path = "tests/reference_data/regular"
    output_path = None
    output_files = [
        "stripped_flac.mp3",
        "stripped_mp3.mp3",
        "stripped_ogg.mp3",
        "stripped_m4a.mp3",
        "withtags_flac.mp3",
        "withtags_mp3.mp3",
        "withtags_ogg.mp3",
        "withtags_m4a.mp3",
        "sync_music.db",
        "folder.jpg",
        "dir/folder.jpg",
    ]

    @pytest.fixture(autouse=True)
    def init_output_path(self, tmp_path):
        """Setup temporary output directory."""
        self.output_path = tmp_path

    def _execute_sync_music(
        self, input_path=input_path, output_files=None, arguments=None, jobs=None
    ):
        """Helper method to run sync_music tests."""
        if output_files is None:
            output_files = self.output_files
        argv = [] if arguments is None else arguments
        argv += ["--audio-src", str(input_path), "--audio-dest", str(self.output_path)]
        args = load_settings(argv)
        args.jobs = 1 if jobs is None else jobs
        sync_music = SyncMusic(args)
        sync_music.sync_audio()
        assert set(list_all_files(self.output_path)) == {
            pathlib.Path(file) for file in output_files
        }

    def test_emptyfolder(self):
        """Test empty input folder."""
        with pytest.raises(FileNotFoundError):
            self._execute_sync_music(self.output_path, [])

    def test_filenames_utf8(self):
        """Test UTF-8 input file names."""
        input_path = "tests/reference_data/filenames_utf8"
        output_files = ["test_äöüß.mp3", "sync_music.db"]
        self._execute_sync_music(input_path, output_files)

    def test_filenames_fat32(self):
        """Test input file names incompatible with FAT32."""
        input_path = "tests/reference_data/filenames_fat32"
        output_files = [
            "A___A.mp3",
            "B___B.mp3",
            "C___C.mp3",
            "D___D.mp3",
            "E___E.mp3",
            "F___F.mp3",
            "G___G.mp3",
            "H___H.mp3",
            "sync_music.db",
        ]
        self._execute_sync_music(input_path, output_files)

    def test_reference_default(self):
        """Test reference folder with default arguments."""
        self._execute_sync_music()

    def test_reference_withdatabase(self):
        """Test reference folder with default arguments (2 runs)."""
        self._execute_sync_music()
        self._execute_sync_music()

    def test_reference_cleanup(self):
        """Test reference folder with cleanup (file removed from input)."""
        input_path = "tests/reference_data/regular_cleanup"
        output_files = self.output_files
        # Run a regular sync
        self._execute_sync_music()

        # First run: don't delete files
        def query_no(_):
            """Replacement for sync_music.util.query_yes_no."""
            return False

        with mock.patch("sync_music.util.query_yes_no", side_effect=query_no):
            self._execute_sync_music(input_path, output_files)
        # Delete a file also in output directory (to check double deletion)
        (self.output_path / "withtags_ogg.mp3").unlink()
        # Simulate OSError by creating a folder (that can't be removed)
        (self.output_path / "withtags_mp3.mp3").unlink()
        (self.output_path / "withtags_mp3.mp3").mkdir()

        # Second run: delete files
        def query_yes(_):
            """Replacement for sync_music.util.query_yes_no."""
            return True

        output_files = ["stripped_mp3.mp3", "sync_music.db"]
        with mock.patch("sync_music.util.query_yes_no", side_effect=query_yes):
            self._execute_sync_music(input_path, output_files)

    def test_reference_multiprocessing(self):
        """Test reference folder with parallel jobs."""
        self._execute_sync_music(jobs=4)

    def test_reference_forcecopy(self):
        """Test reference folder with force copy."""
        output_files = [
            "stripped_flac.flac",
            "stripped_mp3.mp3",
            "stripped_ogg.ogg",
            "stripped_m4a.m4a",
            "withtags_flac.flac",
            "withtags_mp3.mp3",
            "withtags_ogg.ogg",
            "withtags_m4a.m4a",
            "sync_music.db",
            "folder.jpg",
            "dir/folder.jpg",
        ]
        self._execute_sync_music(output_files=output_files, arguments=["--mode=copy"])

    def test_reference_transcodeonly(self):
        """Test reference folder with tag processing only."""
        self._execute_sync_music()
        self._execute_sync_music(arguments=["-f", "--disable-file-processing"])

    def test_reference_tagsonly(self):
        """Test reference folder with file processing only."""
        self._execute_sync_music(arguments=["--disable-tag-processing"])

    def test_reference_hacks(self):
        """Test reference folder with hacks."""
        self._execute_sync_music(
            arguments=[
                "--albumartist-artist-hack",
                "--albumartist-composer-hack",
                "--artist-albumartist-hack",
                "--discnumber-hack",
                "--tracknumber-hack",
            ]
        )

    def test_reference_ioerror(self, mocker):
        """Test reference folder with mocked IOError."""
        mocker.patch(
            "sync_music.transcode.Transcode.execute",
            side_effect=IOError("Mocked exception"),
        )
        mocker.patch(
            "sync_music.copy.Copy.execute", side_effect=IOError("Mocked exception")
        )
        self._execute_sync_music(output_files=["sync_music.db"])

    def test_reference_exception(self, mocker):
        """Test reference folder with mocked random exception."""
        mocker.patch(
            "sync_music.transcode.Transcode.execute",
            side_effect=Exception("Mocked exception"),
        )
        mocker.patch(
            "sync_music.copy.Copy.execute", side_effect=Exception("Mocked exception")
        )
        self._execute_sync_music(output_files=["sync_music.db"])


class TestSyncMusicPlaylists:
    """Tests sync_music playlist conversion."""

    input_path = "tests/reference_data/regular"
    output_path = None
    playlist_path = "tests/reference_data/playlists"

    @pytest.fixture(autouse=True)
    def init_output_path(self, tmp_path):
        """Setup temporary output directory."""
        self.output_path = tmp_path

    def _execute_sync_music(self, playlist_path=playlist_path):
        """Helper method to run sync_music tests."""
        argv = [
            "--audio-src",
            str(self.input_path),
            "--audio-dest",
            str(self.output_path),
            "--playlist-src",
            str(playlist_path),
        ]
        args = load_settings(argv)
        sync_music = SyncMusic(args)
        sync_music.sync_audio()
        sync_music.sync_playlists()

    def test_playlists(self):
        """Tests playlist generation."""
        self._execute_sync_music()

    def test_playlists_exists(self):
        """Tests playlist generation when playlist exists."""
        shutil.copy(
            "tests/reference_data/playlists/normal.m3u",
            self.output_path / "normal.m3u",
        )
        self._execute_sync_music()

    def test_playlists_ioerror(self):
        """Tests playlist generation with playlist that can't be opened."""
        (self.output_path / "null.m3u").symlink_to("/dev/null")
...
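Every test above funnels through an _execute_sync_music helper, which builds an argv list, parses it with load_settings, and runs SyncMusic. For context, the same flow outside pytest might look like the minimal sketch below; the paths are placeholders rather than real reference data.

# Minimal sketch mirroring the _execute_sync_music helpers; paths are placeholders.
from sync_music.sync_music import SyncMusic, load_settings

args = load_settings(
    ["--audio-src", "/path/to/library", "--audio-dest", "/path/to/device"]
)
args.jobs = 1  # single process, as the helper defaults to
SyncMusic(args).sync_audio()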
Source:test_cleanups.py  
...
            def module_cleanup():
                assert slash.is_in_cleanup()
                assert slash.get_current_cleanup_phase() == 'module'

            slash.add_cleanup(module_cleanup, scope='module')

            def regular_cleanup():
                assert slash.is_in_cleanup()
                assert slash.get_current_cleanup_phase() == 'test'

            slash.add_cleanup(regular_cleanup)

    suite_builder.build().run().assert_success(1)


def test_add_functools_partial_cleanup(suite_builder):
    @suite_builder.first_file.add_code
    def __code__():  # pylint: disable=unused-variable
        import slash  # pylint: disable=redefined-outer-name,reimported
        import functools

        def cleanup_with_argument(x):
            pass

        def test_partial_cleanup():
            slash.add_cleanup(functools.partial(cleanup_with_argument, 5))

    suite_builder.build().run().assert_success(1)
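The test_cleanups.py excerpt exercises slash's cleanup API: callables registered with slash.add_cleanup run once the matching scope tears down, and slash.get_current_cleanup_phase() reports which scope ('test', 'module', ...) is being cleaned up. A minimal sketch of the same API in an ordinary test module, with placeholder teardown functions, might look like this:

# Sketch of slash.add_cleanup usage; the teardown functions are placeholders.
import slash

def _release_resource():
    """Placeholder for per-test teardown logic."""

def _release_shared_fixture():
    """Placeholder for module-wide teardown logic."""

def test_uses_scoped_cleanups():
    assert not slash.is_in_cleanup()      # still in the test body here
    slash.add_cleanup(_release_resource)  # runs in this test's 'test' cleanup phase
    slash.add_cleanup(_release_shared_fixture, scope='module')  # deferred to the 'module' phase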
