How to use regular_cleanup method in Slash

Best Python code snippet using slash

executor.py

Source:executor.py Github

copy

Full Screen

...287 data_id : unidist.core.backends.mpi.core.common.MasterDataID288 An ID to data289 """290 self._cleanup_list.append(data_id)291 def regular_cleanup(self):292 """293 Cleanup all garbage collected IDs from local and all workers object storages.294 Cleanup triggers based on internal threshold settings.295 """296 e_logger.debug("Cleanup list len - {}".format(len(self._cleanup_list)))297 e_logger.debug(298 "Cleanup counter {}, threshold reached - {}".format(299 self._cleanup_counter,300 (self._cleanup_counter % self._cleanup_threshold) == 0,301 )302 )303 if len(self._cleanup_list) > self._cleanup_list_threshold:304 if self._cleanup_counter % self._cleanup_threshold == 0:305 timestamp_snapshot = time.perf_counter()306 if (timestamp_snapshot - self._timestamp) > self._time_threshold:307 e_logger.debug("Cleanup counter {}".format(self._cleanup_counter))308 # Compare submitted and executed tasks309 communication.mpi_send_object(310 comm,311 common.Operation.GET_TASK_COUNT,312 communication.MPIRank.MONITOR,313 )314 executed_task_counter = communication.recv_simple_operation(315 comm, communication.MPIRank.MONITOR316 )317 e_logger.debug(318 "Submitted task count {} vs executed task count {}".format(319 self._task_counter, executed_task_counter320 )321 )322 if executed_task_counter == self._task_counter:323 self._send_cleanup_request(self._cleanup_list)324 # Clear the remaining references325 self._object_store.clear(self._cleanup_list)326 self._cleanup_list.clear()327 self._cleanup_counter += 1328 self._timestamp = time.perf_counter()329 else:330 self._cleanup_counter += 1331object_store = ObjectStore()332garbage_collector = GarbageCollector(object_store)333e_logger = common.get_logger("executor", "executor.log")334# Internal functions335# -----------------------------------------------------------------------------336# Current rank for schedule (exclude MPIRank.ROOT and monitor ranks)337round_robin = itertools.cycle(range(2, world_size))338def schedule_rank():339 """340 
Find the next rank for task/actor-task execution.341 Returns342 -------343 int344 A rank number.345 """346 global round_robin347 return next(round_robin)348def request_worker_data(data_id):349 """350 Get an object(s) associated with `data_id` from the object storage.351 Parameters352 ----------353 data_id : unidist.core.backends.common.data_id.DataID354 An ID(s) to object(s) to get data from.355 Returns356 -------357 object358 A Python object.359 """360 owner_rank = object_store.get_data_owner(data_id)361 e_logger.debug("GET {} id from {} rank".format(data_id._id, owner_rank))362 # Worker request363 operation_type = common.Operation.GET364 operation_data = {"source": rank, "id": data_id.base_data_id()}365 communication.send_simple_operation(366 comm, operation_type, operation_data, owner_rank367 )368 # Blocking get369 data = communication.recv_complex_data(comm, owner_rank)370 # Caching the result, check the protocol correctness here371 object_store.put(data_id, data)372 return data373def push_local_data(dest_rank, data_id):374 """375 Send local data associated with passed ID to target rank.376 Parameters377 ----------378 dest_rank : int379 Target rank.380 data_id : unidist.core.backends.mpi.core.common.MasterDataID381 An ID to data.382 """383 # Check if data was already pushed384 if not object_store.is_already_sent(data_id, dest_rank):385 e_logger.debug("PUT LOCAL {} id to {} rank".format(data_id._id, dest_rank))386 # Push the local master data to the target worker directly387 operation_type = common.Operation.PUT_DATA388 if object_store.is_already_serialized(data_id):389 serialized_data = object_store.get_serialized_data(data_id)390 communication.send_operation(391 comm, operation_type, serialized_data, dest_rank, is_serialized=True392 )393 else:394 operation_data = {"id": data_id, "data": object_store.get(data_id)}395 serialized_data = communication.send_operation(396 comm, operation_type, operation_data, dest_rank, is_serialized=False397 )398 
object_store.cache_serialized_data(data_id, serialized_data)399 # Remember pushed id400 object_store.cache_send_info(data_id, dest_rank)401def push_data_owner(dest_rank, data_id):402 """403 Send data location associated with data ID to target rank.404 Parameters405 ----------406 dest_rank : int407 Target rank.408 value : unidist.core.backends.mpi.core.common.MasterDataID409 An ID to data.410 """411 operation_type = common.Operation.PUT_OWNER412 operation_data = {413 "id": data_id,414 "owner": object_store.get_data_owner(data_id),415 }416 communication.send_simple_operation(comm, operation_type, operation_data, dest_rank)417def push_data(dest_rank, value):418 """419 Parse and send all values to destination rank.420 Process all arguments recursivelly and send all ID associated data or it's location421 to the target rank.422 Parameters423 ----------424 dest_rank : int425 Rank where task data is needed.426 value : iterable or dict or object427 Arguments to be sent.428 """429 if isinstance(value, (list, tuple)):430 for v in value:431 push_data(dest_rank, v)432 elif isinstance(value, (dict)):433 for v in value.values():434 push_data(dest_rank, v)435 elif is_data_id(value):436 if object_store.contains(value):437 push_local_data(dest_rank, value)438 elif object_store.contains_data_owner(value):439 push_data_owner(dest_rank, value)440 else:441 raise ValueError("Unknown DataID!")442# Control API443# -----------------------------------------------------------------------------444def init():445 """446 Initialize MPI processes.447 Notes448 -----449 Only collects the MPI cluster topology.450 """451 global topology452 if not topology:453 topology = communication.get_topology()454# TODO: cleanup before shutdown?455def shutdown():456 """457 Shutdown all MPI processes.458 Notes459 -----460 Sends cancelation operation to all workers and monitor processes.461 """462 # Send shutdown commands to all ranks463 for rank_id in range(1, world_size):464 communication.mpi_send_object(comm, 
common.Operation.CANCEL, rank_id)465 e_logger.debug("Shutdown rank {}".format(rank_id))466def cluster_resources():467 """468 Get resources of MPI cluster.469 Returns470 -------471 dict472 Dictionary with cluster nodes info in the form473 `{"node_ip0": {"CPU": x0}, "node_ip1": {"CPU": x1}, ...}`.474 """475 cluster_resources = defaultdict(dict)476 for host, ranks_list in topology.items():477 cluster_resources[host]["CPU"] = len(ranks_list)478 return dict(cluster_resources)479# Data API480# -----------------------------------------------------------------------------481def put(data):482 """483 Put the data into object storage.484 Parameters485 ----------486 data : object487 Data to be put.488 Returns489 -------490 unidist.core.backends.mpi.core.common.MasterDataID491 An ID of an object in object storage.492 """493 data_id = object_store.generate_data_id(garbage_collector)494 object_store.put(data_id, data)495 e_logger.debug("PUT {} id".format(data_id._id))496 return data_id497def get(data_ids):498 """499 Get an object(s) associated with `data_ids` from the object storage.500 Parameters501 ----------502 data_ids : unidist.core.backends.common.data_id.DataID or list503 An ID(s) to object(s) to get data from.504 Returns505 -------506 object507 A Python object.508 """509 def get_impl(data_id):510 if object_store.contains(data_id):511 value = object_store.get(data_id)512 else:513 value = request_worker_data(data_id)514 if isinstance(value, Exception):515 raise value516 return value517 e_logger.debug("GET {} ids".format(common.unwrapped_data_ids_list(data_ids)))518 is_list = isinstance(data_ids, list)519 if not is_list:520 data_ids = [data_ids]521 values = [get_impl(data_id) for data_id in data_ids]522 # Initiate reference count based cleaup523 # if all the tasks were completed524 garbage_collector.regular_cleanup()525 return values if is_list else values[0]526def wait(data_ids, num_returns=1):527 """528 Wait until `data_ids` are finished.529 This method returns two lists. 
The first list consists of530 ``DataID``-s that correspond to objects that completed computations.531 The second list corresponds to the rest of the ``DataID``-s (which may or may not be ready).532 Parameters533 ----------534 data_ids : unidist.core.backends.mpi.core.common.MasterDataID or list535 ``DataID`` or list of ``DataID``-s to be waited.536 num_returns : int, default: 1537 The number of ``DataID``-s that should be returned as ready.538 Returns539 -------540 tuple541 List of data IDs that are ready and list of the remaining data IDs.542 """543 def wait_impl(data_id):544 if object_store.contains(data_id):545 return546 owner_rank = object_store.get_data_owner(data_id)547 operation_type = common.Operation.WAIT548 operation_data = {"id": data_id.base_data_id()}549 communication.send_simple_operation(550 comm, operation_type, operation_data, owner_rank551 )552 e_logger.debug("WAIT {} id from {} rank".format(data_id._id, owner_rank))553 communication.mpi_busy_wait_recv(comm, owner_rank)554 e_logger.debug("WAIT {} ids".format(common.unwrapped_data_ids_list(data_ids)))555 if not isinstance(data_ids, list):556 data_ids = [data_ids]557 ready = []558 not_ready = data_ids559 while len(ready) != num_returns:560 first = not_ready.pop(0)561 wait_impl(first)562 ready.append(first)563 # Initiate reference count based cleaup564 # if all the tasks were completed565 garbage_collector.regular_cleanup()566 return ready, not_ready567# Task API568# -----------------------------------------------------------------------------569def remote(task, *args, num_returns=1, **kwargs):570 """571 Execute function on a worker process.572 Parameters573 ----------574 task : callable575 Function to be executed in the worker.576 *args : iterable577 Positional arguments to be passed in the `task`.578 num_returns : int, default: 1579 Number of results to be returned from `task`.580 **kwargs : dict581 Keyword arguments to be passed in the `task`.582 Returns583 -------584 
unidist.core.backends.mpi.core.common.MasterDataID or list or None585 Type of returns depends on `num_returns` value:586 * if `num_returns == 1`, ``DataID`` will be returned.587 * if `num_returns > 1`, list of ``DataID``-s will be returned.588 * if `num_returns == 0`, ``None`` will be returned.589 """590 # Initiate reference count based cleanup591 # if all the tasks were completed592 garbage_collector.regular_cleanup()593 dest_rank = schedule_rank()594 output_ids = object_store.generate_output_data_id(595 dest_rank, garbage_collector, num_returns596 )597 e_logger.debug("REMOTE OPERATION")598 e_logger.debug(599 "REMOTE args to {} rank: {}".format(600 dest_rank, common.unwrapped_data_ids_list(args)601 )602 )603 e_logger.debug(604 "REMOTE outputs to {} rank: {}".format(605 dest_rank, common.unwrapped_data_ids_list(output_ids)606 )...

Full Screen

Full Screen

test_sync_music.py

Source:test_sync_music.py Github

copy

Full Screen

1# music_sync - Sync music library to external device2# Copyright (C) 2013-2018 Christian Fetzer3#4# This program is free software; you can redistribute it and/or modify5# it under the terms of the GNU General Public License as published by6# the Free Software Foundation; either version 2 of the License, or7# (at your option) any later version.8#9# This program is distributed in the hope that it will be useful,10# but WITHOUT ANY WARRANTY; without even the implied warranty of11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12# GNU General Public License for more details.13#14# You should have received a copy of the GNU General Public License along15# with this program; if not, write to the Free Software Foundation, Inc.,16# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.17"""Tests sync_music."""18import pathlib19import shutil20from unittest import mock21import pytest22from sync_music.sync_music import SyncMusic23from sync_music.sync_music import load_settings24from sync_music.util import list_all_files25class TestSyncMusicSettings:26 """Tests sync_music load_settings function."""27 @staticmethod28 def test_noparams():29 """Tests loading of settings without parameters."""30 with pytest.raises(SystemExit):31 load_settings("")32 @staticmethod33 def test_nonexistent():34 """Tests loading of settings with incorrect paths."""35 argv = [36 "--audio-src",37 "/proc/nonexistingpath",38 "--audio-dest",39 "/proc/nonexistingpath",40 ]41 with pytest.raises(SystemExit):42 load_settings(argv)43 @staticmethod44 def test_forcecopy_with_hacks(tmp_path):45 """Tests loading of settings with force copy and hacks."""46 argv = [47 "--audio-src",48 str(tmp_path),49 "--audio-dest",50 str(tmp_path),51 "--mode=copy",52 "--albumartist-artist-hack",53 ]54 with pytest.raises(SystemExit):55 load_settings(argv)56 @staticmethod57 def test_configfile(tmp_path):58 """Tests loading of settings within config file."""59 configpath = tmp_path / "sync_music.cfg"60 argv = ["-c", 
str(configpath)]61 with configpath.open("w") as configfile:62 configfile.write("[Defaults]\n")63 configfile.write(f"audio_src={str(tmp_path)}\n")64 configfile.write(f"audio_dest={str(tmp_path)}\n")65 load_settings(argv)66class TestSyncMusicFiles:67 """Tests sync_music audio conversion."""68 input_path = "tests/reference_data/regular"69 output_path = None70 output_files = [71 "stripped_flac.mp3",72 "stripped_mp3.mp3",73 "stripped_ogg.mp3",74 "stripped_m4a.mp3",75 "withtags_flac.mp3",76 "withtags_mp3.mp3",77 "withtags_ogg.mp3",78 "withtags_m4a.mp3",79 "sync_music.db",80 "folder.jpg",81 "dir/folder.jpg",82 ]83 @pytest.fixture(autouse=True)84 def init_output_path(self, tmp_path):85 """Setup temporary output directory."""86 self.output_path = tmp_path87 def _execute_sync_music(88 self, input_path=input_path, output_files=None, arguments=None, jobs=None89 ):90 """Helper method to run sync_music tests."""91 if output_files is None:92 output_files = self.output_files93 argv = [] if arguments is None else arguments94 argv += ["--audio-src", str(input_path), "--audio-dest", str(self.output_path)]95 args = load_settings(argv)96 args.jobs = 1 if jobs is None else jobs97 sync_music = SyncMusic(args)98 sync_music.sync_audio()99 assert set(list_all_files(self.output_path)) == {100 pathlib.Path(file) for file in output_files101 }102 def test_emptyfolder(self):103 """Test empty input folder."""104 with pytest.raises(FileNotFoundError):105 self._execute_sync_music(self.output_path, [])106 def test_filenames_utf8(self):107 """Test UTF-8 input file names."""108 input_path = "tests/reference_data/filenames_utf8"109 output_files = ["test_äöüß.mp3", "sync_music.db"]110 self._execute_sync_music(input_path, output_files)111 def test_filenames_fat32(self):112 """Test input file names incompatible with FAT32."""113 input_path = "tests/reference_data/filenames_fat32"114 output_files = [115 "A___A.mp3",116 "B___B.mp3",117 "C___C.mp3",118 "D___D.mp3",119 "E___E.mp3",120 "F___F.mp3",121 
"G___G.mp3",122 "H___H.mp3",123 "sync_music.db",124 ]125 self._execute_sync_music(input_path, output_files)126 def test_reference_default(self):127 """Test reference folder with default arguements."""128 self._execute_sync_music()129 def test_reference_withdatabase(self):130 """Test reference folder with default arguements (2 runs)."""131 self._execute_sync_music()132 self._execute_sync_music()133 def test_reference_cleanup(self):134 """Test reference folder with cleanup (file removed from input)."""135 input_path = "tests/reference_data/regular_cleanup"136 output_files = self.output_files137 # Run a regular sync138 self._execute_sync_music()139 # First run: don't delete files140 def query_no(_):141 """Replacement for sync_music.util.query_yes_no."""142 return False143 with mock.patch("sync_music.util.query_yes_no", side_effect=query_no):144 self._execute_sync_music(input_path, output_files)145 # Delete a file also in output directory (to check double deletion)146 (self.output_path / "withtags_ogg.mp3").unlink()147 # Simulate OSError by creating a folder (that can't be removed)148 (self.output_path / "withtags_mp3.mp3").unlink()149 (self.output_path / "withtags_mp3.mp3").mkdir()150 # Second run: delete files151 def query_yes(_):152 """Replacement for sync_music.util.query_yes_no."""153 return True154 output_files = ["stripped_mp3.mp3", "sync_music.db"]155 with mock.patch("sync_music.util.query_yes_no", side_effect=query_yes):156 self._execute_sync_music(input_path, output_files)157 def test_reference_multiprocessing(self):158 """Test reference folder with parallel jobs."""159 self._execute_sync_music(jobs=4)160 def test_reference_forcecopy(self):161 """Test reference folder with force copy."""162 output_files = [163 "stripped_flac.flac",164 "stripped_mp3.mp3",165 "stripped_ogg.ogg",166 "stripped_m4a.m4a",167 "withtags_flac.flac",168 "withtags_mp3.mp3",169 "withtags_ogg.ogg",170 "withtags_m4a.m4a",171 "sync_music.db",172 "folder.jpg",173 "dir/folder.jpg",174 ]175 
self._execute_sync_music(output_files=output_files, arguments=["--mode=copy"])176 def test_reference_transcodeonly(self):177 """Test reference folder with tag processing only."""178 self._execute_sync_music()179 self._execute_sync_music(arguments=["-f", "--disable-file-processing"])180 def test_reference_tagsonly(self):181 """Test reference folder with file processing only."""182 self._execute_sync_music(arguments=["--disable-tag-processing"])183 def test_reference_hacks(self):184 """Test reference folder with hacks."""185 self._execute_sync_music(186 arguments=[187 "--albumartist-artist-hack",188 "--albumartist-composer-hack",189 "--artist-albumartist-hack",190 "--discnumber-hack",191 "--tracknumber-hack",192 ]193 )194 def test_reference_ioerror(self, mocker):195 """Test reference folder with mocked IOError."""196 mocker.patch(197 "sync_music.transcode.Transcode.execute",198 side_effect=IOError("Mocked exception"),199 )200 mocker.patch(201 "sync_music.copy.Copy.execute", side_effect=IOError("Mocked exception")202 )203 self._execute_sync_music(output_files=["sync_music.db"])204 def test_reference_exception(self, mocker):205 """Test reference folder with mocked random exception."""206 mocker.patch(207 "sync_music.transcode.Transcode.execute",208 side_effect=Exception("Mocked exception"),209 )210 mocker.patch(211 "sync_music.copy.Copy.execute", side_effect=Exception("Mocked exception")212 )213 self._execute_sync_music(output_files=["sync_music.db"])214class TestSyncMusicPlaylists:215 """Tests sync_music playlist conversion."""216 input_path = "tests/reference_data/regular"217 output_path = None218 playlist_path = "tests/reference_data/playlists"219 @pytest.fixture(autouse=True)220 def init_output_path(self, tmp_path):221 """Setup temporary output directory."""222 self.output_path = tmp_path223 def _execute_sync_music(self, playlist_path=playlist_path):224 """Helper method to run sync_music tests."""225 argv = [226 "--audio-src",227 str(self.input_path),228 
"--audio-dest",229 str(self.output_path),230 "--playlist-src",231 str(playlist_path),232 ]233 args = load_settings(argv)234 sync_music = SyncMusic(args)235 sync_music.sync_audio()236 sync_music.sync_playlists()237 def test_playlists(self):238 """Tests playlist generation."""239 self._execute_sync_music()240 def test_playlists_exists(self):241 """Tests playlist generation when playlist exists."""242 shutil.copy(243 "tests/reference_data/playlists/normal.m3u",244 self.output_path / "normal.m3u",245 )246 self._execute_sync_music()247 def test_playlists_ioerror(self):248 """Tests playlist generation with playlist that can't be opened."""249 (self.output_path / "null.m3u").symlink_to("/dev/null")...

Full Screen

Full Screen

test_cleanups.py

Source:test_cleanups.py Github

copy

Full Screen

...162 def module_cleanup():163 assert slash.is_in_cleanup()164 assert slash.get_current_cleanup_phase() == 'module'165 slash.add_cleanup(module_cleanup, scope='module')166 def regular_cleanup():167 assert slash.is_in_cleanup()168 assert slash.get_current_cleanup_phase() == 'test'169 slash.add_cleanup(regular_cleanup)170 suite_builder.build().run().assert_success(1)171def test_add_functools_partial_cleanup(suite_builder):172 @suite_builder.first_file.add_code173 def __code__(): # pylint: disable=unused-variable174 import slash # pylint: disable=redefined-outer-name,reimported175 import functools176 def cleanup_with_argument(x):177 pass178 def test_partial_cleanup():179 slash.add_cleanup(functools.partial(cleanup_with_argument, 5))180 suite_builder.build().run().assert_success(1)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful