Best Python code snippet using slash
parallel.py
Source:parallel.py  
"""Tools to aid in parallelizing a function call.

Default method is MPI, if available. Fallback is concurrent.futures. If all
else fails, final fallback is serial.

Author: Seth Axen
Email: seth.axen@gmail.com
"""
import os
import sys
import logging
from copy import copy
import multiprocessing

try:
    from itertools import izip as zip
except ImportError:  # python 3
    pass

# upon import, figure out if MPI is available, and decide parallel_mode
MPI_MODE = "mpi"
FUTURES_THREADS_MODE = "threads"
FUTURES_PROCESSES_MODE = "processes"
SERIAL_MODE = "serial"
ALL_PARALLEL_MODES = (MPI_MODE,
                      FUTURES_PROCESSES_MODE, FUTURES_THREADS_MODE,
                      SERIAL_MODE)

# Modes importable in this interpreter, in default order of preference.
available_parallel_modes = []
try:
    from mpi4py import MPI
    available_parallel_modes.append(MPI_MODE)
except ImportError:
    pass
try:
    import concurrent.futures
    available_parallel_modes.append(FUTURES_PROCESSES_MODE)
    available_parallel_modes.append(FUTURES_THREADS_MODE)
except ImportError:
    pass
available_parallel_modes.append(SERIAL_MODE)


def make_data_iterator(data_entries, *iterables):
    """Make an iterator from an iterable of data entries and constant values.

    Every extra argument that is an iterator, list, or tuple is cycled so it
    lines up with `data_entries`. Any other value is repeated for the entire
    length of `data_entries`. Output stops when `data_entries` is exhausted,
    or immediately if a cycled argument is empty.

    Parameters
    ----------
    data_entries : iterable
        Iterable of data entries.
    *iterables
        One or more iterables or constant values to serve as additional
        data entries. These are zipped into an iterator with `data_entries`.

    Returns
    -------
    iterator : Iterator of tuples, each with an item in `data_entries`
               followed by corresponding items in `iterables`.
    """
    from itertools import repeat, cycle
    try:
        from collections.abc import Iterator
    except ImportError:  # python 2
        from collections import Iterator

    new_iterables = [iter(data_entries)]
    for iterable in iterables:
        if isinstance(iterable, (Iterator, list, tuple)):
            new_iterables.append(cycle(iterable))
        else:
            new_iterables.append(repeat(iterable))
    return zip(*new_iterables)


def read_bash_var(var, default=None):
    """Read an environment variable holding a number of processes/threads.

    Parameters
    ----------
    var : str or None
        Name of the environment variable. If None, `default` is returned.
    default : any, optional (default None)
        Value returned when the variable is unset or not an integer.

    Returns
    -------
    int or `default`
    """
    if var is None:
        return default
    try:
        raw = os.environ[var]
    except KeyError:
        logging.debug("Variable %s not set" % (var))
        return default
    try:
        val = int(raw)
    except ValueError:
        # Report the raw string; `val` was never bound when int() failed.
        logging.debug("Variable %s set to non-integer %s" % (var, str(raw)))
        return default
    logging.debug("Variable %s indicates %d processors" % (var, val))
    return val


def enum(*sequential, **named):
    """Fake an enumerated type.

    Reference:
    ----------
    - http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python

    Parameters
    ----------
    *sequential
        Names assigned the values 0, 1, 2, ... in order.
    **named
        Names with explicitly given values.

    Returns
    -------
    type : A new class whose attributes are the given names.
    """
    enums = dict(zip(sequential, range(len(sequential))), **named)
    return type('Enum', (), enums)


class Parallelizer(object):
    """A class to aid in parallelizing a function call.

    Ideal use case is when function calls are expected to have different
    runtimes and each call is completely independent of all others."""

    def __init__(self, parallel_mode=None, num_proc=None,
                 num_proc_bash_var=None, fail_value=False):
        """Choose mode and/or number of processors or use defaults.

        Parameters
        ----------
        parallel_mode : str, optional (default None)
            Mode to use for parallelization. Available modes are
            ('mpi', 'processes', 'threads', 'serial'). If the requested
            mode is not importable, another available mode is selected.
        num_proc : int, optional (default None)
            Maximum number of processors to use. Ignored in MPI mode.
        num_proc_bash_var : str, optional (default None)
            Number of available processors will be read from bash variable.
            Ignored if `num_proc` is specified.
        fail_value : any, optional (default False)
            Result to be yielded if specific function evaluation failed.

        Raises
        ------
        KeyError
            If `parallel_mode` is not one of the known modes.
        """
        preferred_parallel_modes = copy(available_parallel_modes)
        logging.debug("Parallel modes %s are available." %
                      repr(preferred_parallel_modes))
        self.fail_value = fail_value
        self.rank = 0
        self.num_proc = None

        if parallel_mode is not None:
            if parallel_mode not in ALL_PARALLEL_MODES:
                raise KeyError("Parallel mode must be in %s." %
                               (repr(ALL_PARALLEL_MODES)))
            if parallel_mode not in preferred_parallel_modes:
                if self.is_master():
                    logging.warning(
                        "Parallel mode %s not available. Will auto-select a replacement." % (repr(parallel_mode)))
            else:
                # Move the requested mode to the front of the search order.
                preferred_parallel_modes.remove(parallel_mode)
                preferred_parallel_modes.insert(0, parallel_mode)
                logging.debug("Available parallel modes reorganized to %s." % (
                    repr(preferred_parallel_modes)))

        if num_proc is None:
            num_proc = read_bash_var(num_proc_bash_var)

        # Take the first mode that has enough processors; serial always
        # qualifies, so the loop always breaks.
        for parallel_mode in preferred_parallel_modes:
            logging.debug("Checking if mode %s is valid." %
                          (repr(parallel_mode)))
            mode_num_proc = num_proc
            self.rank = 0
            if parallel_mode == MPI_MODE:
                comm = MPI.COMM_WORLD
                self.rank = comm.Get_rank()
                mode_num_proc = comm.Get_size()

            if (mode_num_proc is not None and mode_num_proc < 2
                    and parallel_mode != SERIAL_MODE):
                if self.is_master():
                    logging.warning("Only %d processes available. %s mode not available." % (
                        mode_num_proc, repr(parallel_mode)))
                # Skip the mode on every rank, not only where we log.
                continue
            elif (mode_num_proc is None
                  and parallel_mode in (FUTURES_PROCESSES_MODE,
                                        FUTURES_THREADS_MODE)):
                mode_num_proc = multiprocessing.cpu_count()
                logging.info("num_proc is not specified. %s mode will use all %d processes" % (
                    repr(parallel_mode), mode_num_proc))
            elif parallel_mode == SERIAL_MODE:
                mode_num_proc = 1

            self.parallel_mode = parallel_mode
            self.num_proc = mode_num_proc
            break

        if self.is_master():
            logging.info(
                "Parallelizer initialized with mode %s and %d processors." % (
                    repr(self.parallel_mode), self.num_proc))

    def is_mpi(self):
        return self.parallel_mode == MPI_MODE

    def is_concurrent(self):
        return self.is_threads() or self.is_processes()

    def is_threads(self):
        return self.parallel_mode == FUTURES_THREADS_MODE

    def is_processes(self):
        return self.parallel_mode == FUTURES_PROCESSES_MODE

    def is_serial(self):
        return self.parallel_mode == SERIAL_MODE

    def is_master(self):
        return self.rank == 0

    def run(self, *args, **kwargs):
        r"""Execute a function in parallel. Return list of results.

        Takes exactly the same arguments as `run_gen` (`func`,
        `data_iterator`, `kwargs`, `out_file`, `out_str`, `out_format`,
        `logging_str`, `logging_format`) and collects its output.

        Returns
        -------
        list : List of ``(result, data)`` tuples as yielded by `run_gen`.
        """
        return list(self.run_gen(*args, **kwargs))

    def run_gen(self, func, data_iterator, kwargs=None, out_file=None,
                out_str="%s\n", out_format=str, logging_str=None,
                logging_format=str):
        r"""Execute a function in parallel. Return result iterator.

        Parameters
        ----------
        func : function
            Function to execute. Called as ``func(*data, **kwargs)`` for each
            entry `data` of `data_iterator`.
        data_iterator : iterable
            Iterable whose entries are argument tuples for `func`. These data
            are communicated between processors so should be as small as
            possible.
        kwargs : dict, optional (default None, meaning no named arguments)
            Named arguments for `func`.
        out_file : str, optional (default None)
            File to write results of function to. If None, results are yielded
            instead of being written to file.
        out_str : str, optional (default "%s\n")
            Format string to be written to output file for each result.
        out_format : function, optional (default str)
            Function to apply to output of `func` to format results to match
            `out_str`.
        logging_str : str, optional (default None)
            Format string to be logged using `logging` for each successful
            result. If None, only errors are logged.
        logging_format : function, optional (default str)
            Function to apply to `data` entries of `data_iterator` to format
            results to match `logging_str`.

        Returns
        -------
        iterator : Iterator of ``(result, data)`` tuples. `result` is the
            return value of `func`, True if it was written to `out_file`,
            or `fail_value` if the call failed.
        """
        if self.is_mpi():
            runner = self.mpi_run
        elif self.is_concurrent():
            runner = self.concurrent_run
        else:
            runner = self.serial_run
        return runner(
            func, data_iterator, kwargs=kwargs, out_file=out_file,
            out_str=out_str, out_format=out_format,
            logging_str=logging_str, logging_format=logging_format)

    def serial_run(self, func, data_iterator, kwargs=None, out_file=None,
                   out_str="%s\n", out_format=str, logging_str=None,
                   logging_format=str):
        """Run in serial on a single processor.

        See `run_gen` for parameters and the yielded ``(result, data)``
        tuples. The output file, if any, is closed when the generator is
        exhausted or closed.
        """
        if kwargs is None:
            kwargs = {}
        fh = open(out_file, "w") if out_file is not None else None
        try:
            for data in data_iterator:
                try:
                    result = func(*data, **kwargs)
                    if fh is None:
                        outcome = result
                    else:
                        fh.write(out_str % out_format(result))
                        outcome = True
                    if result != self.fail_value and logging_str is not None:
                        logging.info(logging_str % logging_format(data))
                except Exception:
                    logging.error("Error running: %s" % str(data),
                                  exc_info=True)
                    outcome = self.fail_value
                # Yield outside the try so GeneratorExit from an early
                # close() is not swallowed as a function failure.
                yield (outcome, data)
        finally:
            if fh is not None:
                fh.close()

    def concurrent_run(self, func, data_iterator, kwargs=None, out_file=None,
                       out_str="%s\n", out_format=str, logging_str=None,
                       logging_format=str):
        """Run in parallel with concurrent.futures.

        All entries are submitted up front; results are yielded in
        completion order. See `run_gen` for parameters and yielded values.
        """
        if kwargs is None:
            kwargs = {}
        if self.is_threads():
            executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=self.num_proc)
        else:
            executor = concurrent.futures.ProcessPoolExecutor(
                max_workers=self.num_proc)
        jobs_dict = {}  # future -> the data entry it was submitted with
        fh = None
        try:
            for data in data_iterator:
                job = executor.submit(func, *data, **kwargs)
                jobs_dict[job] = data
            if out_file is not None:
                fh = open(out_file, "w")
            for job in concurrent.futures.as_completed(jobs_dict):
                data = jobs_dict[job]
                try:
                    result = job.result()
                    if fh is None:
                        outcome = result
                    else:
                        fh.write(out_str % out_format(result))
                        outcome = True
                    if result != self.fail_value and logging_str is not None:
                        logging.info(logging_str % logging_format(data))
                except KeyboardInterrupt:
                    logging.error("Error running: %s" % str(data),
                                  exc_info=True)
                    executor.shutdown()
                    outcome = self.fail_value
                except Exception:
                    logging.error("Error running: %s" % str(data),
                                  exc_info=True)
                    outcome = self.fail_value
                yield (outcome, data)
        finally:
            # Cancel jobs that have not started (a no-op for finished ones)
            # so closing the generator early does not wait for the queue.
            for job in jobs_dict:
                job.cancel()
            executor.shutdown()
            if fh is not None:
                fh.close()

    def mpi_run(self, func, data_iterator, kwargs=None, out_file=None,
                out_str="%s\n", out_format=str, logging_str=None,
                logging_format=str):
        """Run in parallel with MPI using a master/worker task pull.

        Rank 0 hands entries of `data_iterator` to whichever worker reports
        READY and yields results as DONE messages arrive. Every other rank
        executes `func` until it receives EXIT and yields nothing. All ranks
        must run this generator to completion because it makes collective
        MPI calls (``Barrier`` and the shared output file).

        Reference:
        ----------
        - https://github.com/jbornschein/mpi4py-examples/blob/master/09-task-pull.py
        """
        if kwargs is None:
            kwargs = {}
        comm = MPI.COMM_WORLD
        status = MPI.Status()   # get MPI status object
        tags = enum('READY', 'DONE', 'EXIT', 'START')
        msg = "Proc:%d|" % self.rank
        comm.Barrier()
        mode = MPI.MODE_WRONLY | MPI.MODE_CREATE
        if out_file is not None:
            fh = MPI.File.Open(comm, out_file, mode)
        if self.is_master():
            data_iterator = iter(data_iterator)  # also accept plain lists
            task_index = 0
            num_workers = comm.Get_size() - 1
            closed_workers = 0
            logging.debug("%sMaster starting with %d workers" % (msg,
                                                                 num_workers))
            try:
                while closed_workers < num_workers:
                    received = comm.recv(source=MPI.ANY_SOURCE,
                                         tag=MPI.ANY_TAG,
                                         status=status)
                    source = status.Get_source()
                    tag = status.Get_tag()
                    if tag == tags.READY:
                        try:
                            data = next(data_iterator)
                        except StopIteration:
                            logging.debug(
                                "%sEnd of data iterator. Closing proc %d" % (
                                    msg, source))
                            comm.send(
                                None, dest=source, tag=tags.EXIT)
                            # Do not also send a START with stale data.
                            continue
                        except Exception:
                            # A broken iterator ends work for this worker
                            # instead of resending stale (or unbound) data.
                            logging.error("%sCould not get data" % msg,
                                          exc_info=True)
                            comm.send(
                                None, dest=source, tag=tags.EXIT)
                            continue
                        logging.debug(
                            "%sSending task %d to proc %d" % (msg,
                                                              task_index,
                                                              source))
                        comm.send(data, dest=source, tag=tags.START)
                        task_index += 1
                    elif tag == tags.DONE:
                        if received is not None:
                            result, data = received
                            logging.debug(
                                "%sReceived result %d from proc %d" % (
                                    msg, task_index, source))
                            if (result != self.fail_value and
                                    logging_str is not None):
                                logging.info(
                                    logging_str % logging_format(data))
                            if out_file is None or result == self.fail_value:
                                yield (result, data)
                            else:
                                yield (True, data)
                    elif tag == tags.EXIT:
                        logging.debug("%sExiting proc %d" % (msg, source))
                        closed_workers += 1
            except (KeyboardInterrupt, SystemExit):
                logging.exception("%sError while processing" % msg,
                                  exc_info=True)
                sys.exit()
        else:
            # Worker processes execute code below
            while True:
                comm.send(None, dest=0, tag=tags.READY)
                data = comm.recv(
                    source=0, tag=MPI.ANY_TAG, status=status)
                tag = status.Get_tag()
                if tag == tags.START:
                    try:
                        result = func(*data, **kwargs)
                        if out_file is None:
                            comm.send(
                                (result, data), dest=0, tag=tags.DONE)
                        else:
                            fh.Write_shared(
                                (out_str % out_format(result)).encode("utf-8"))
                            comm.send(
                                (True, data), dest=0, tag=tags.DONE)
                    except Exception:
                        logging.error(
                            "%sError running: %s" % (msg, str(data)),
                            exc_info=True)
                        comm.send(
                            (self.fail_value, data), dest=0, tag=tags.DONE)
                elif tag == tags.EXIT:
                    break
            comm.send(None, dest=0, tag=tags.EXIT)
        if out_file is not None:
            fh.Sync()
            fh.Close()
        comm.Barrier()


if __name__ == "__main__":
    def test_func(num, *args):
        return num * 100

    logging.basicConfig(level=logging.INFO, format="%(message)s")
    data_list = range(100)
    data_iterator = make_data_iterator(data_list, "test")
    parallelizer = Parallelizer(parallel_mode=FUTURES_PROCESSES_MODE)
    run_kwargs = {"out_file": "test_out.txt", "out_str": "%d\n",
                  "out_format": lambda x: x,
                  "logging_str": "Logged %s %d",
                  "logging_format": lambda x: (x[1], x[0])}
    for result in parallelizer.run(test_func, data_iterator, **run_kwargs):
        ...  # remainder of this example is truncated in the source listing
Source:universal.py  
# ... (lines 1-2 of universal.py are truncated in the source listing)
# import itertools
import util.logging
from .constants import MODES

CURRENT_MODE = MODES['scoop']


def max_parallel_mode():
    """Return the most capable parallel mode usable from this process.

    Chooses ``MODES['scoop']`` when ``util.parallel.is_running.scoop_module()``
    is truthy and ``MODES['multiprocessing']`` otherwise. The choice is
    logged at debug level.
    """
    # NOTE(review): `util.parallel` is not imported in the visible part of
    # this module; presumably it is loaded elsewhere before this runs.
    scoop_is_running = util.parallel.is_running.scoop_module()
    chosen_mode = MODES['scoop'] if scoop_is_running else MODES['multiprocessing']
    util.logging.debug('Maximal parallel mode is {}.'.format(chosen_mode))
    return chosen_mode


# map functions

# def map_serial_with_args(function, values, *args):
#     util.logging.debug('Mapping function with {} args of types {} to values in serial.'.format(len(args), tuple(map(type, args))))
#
#     values = args_generator_with_indices(values, args)
#     results = itertools.starmap(function, values)
#     results = tuple(results)
#
# ... (remainder of universal.py is truncated in the source listing)
Source:sce.py  
import bisect
import subprocess
import threading
import random
import time
import numpy as np
import multiprocessing as mp
from util_settings import *

# NOTE(review): `compute_centroid`, `get_time_string`,
# `parallel_compute_helper` and the PARALLEL_MODE / N_* / MAX_ITERS /
# LOG_FILE defaults are not defined in the visible source; presumably they
# come from `util_settings` via the star import - confirm.


def _random_point_in(bounds):
    """Draw a uniform random point inside per-dimension [low, high] rows."""
    return np.array([random.uniform(low, high) for low, high in bounds])


class Complex:
    """A group of evaluated points that is evolved as a unit."""

    def __init__(self, compute_handles, controller):
        self.compute_handles = compute_handles
        self.controller = controller

    def update_compute_handles(self, compute_handles):
        """Replace this complex's points. Returns 0."""
        self.compute_handles = compute_handles
        return 0

    def evolve(self):
        """Evolve the complex in place ``controller.n_evolutions`` times.

        Each evolution draws ``controller.n_evolution_sample`` distinct
        point indices via ``controller.generate_random``. Then, for
        ``controller.n_gen_offspring`` generations, it tries to replace the
        worst sampled point with:

        1. its reflection through the centroid of the other sampled points,
           resampled inside the complex's bounding box if it leaves
           ``controller.sample_space``;
        2. failing that, the midpoint of the centroid and the worst point;
        3. failing that, a random point inside the bounding box.

        Returns 0.

        Raises
        ------
        ValueError
            If ``n_evolution_sample`` exceeds ``n_points``, which would make
            drawing distinct indices impossible.
        """
        controller = self.controller
        n_sample = controller.n_evolution_sample
        if n_sample > controller.n_points:
            raise ValueError(
                "n_evolution_sample (%d) exceeds n_points (%d)"
                % (n_sample, controller.n_points))
        lower = controller.sample_space[:, 0]
        upper = controller.sample_space[:, 1]
        for _ in range(controller.n_evolutions):
            all_args = np.array([ch.args for ch in self.compute_handles])
            # Per-dimension [min, max] of the complex's current points.
            h = np.column_stack((all_args.min(axis=0), all_args.max(axis=0)))

            # Track chosen indices: the old check compared a handle against
            # [handle, index] pairs and therefore never rejected duplicates.
            evolution_sample = []
            chosen = set()
            while len(evolution_sample) < n_sample:
                i = controller.generate_random()
                if i not in chosen:
                    chosen.add(i)
                    evolution_sample.append([self.compute_handles[i], i])

            for _ in range(controller.n_gen_offspring):
                evolution_sample.sort(key=lambda pair: pair[0].value)
                worst = evolution_sample[-1][0]
                centroid = compute_centroid(
                    [pair[0] for pair in evolution_sample[:-1]])

                # Reflection; out-of-bounds points are resampled in the box.
                r = 2 * centroid - worst.args
                if np.any(r < lower) or np.any(r > upper):
                    r = _random_point_in(h)
                ch = ComputeHandle(r, controller)
                ch.compute()
                if ch.value < worst.value:
                    evolution_sample[-1][0] = ch
                    continue

                # Contraction towards the centroid.
                ch = ComputeHandle((centroid + worst.args) / 2, controller)
                ch.compute()
                if ch.value < worst.value:
                    evolution_sample[-1][0] = ch
                    continue

                # Random replacement, accepted unconditionally.
                ch = ComputeHandle(_random_point_in(h), controller)
                ch.compute()
                evolution_sample[-1][0] = ch

            for ch, i in evolution_sample:
                self.compute_handles[i] = ch
        return 0


class ComputeHandle:
    """A candidate point ``args`` and its lazily computed objective value."""

    def __init__(self, args, controller):
        self.args = np.array(args)
        self.controller = controller
        # False means "not computed yet". compute() tests it by identity so
        # a genuine objective value of 0 is not recomputed.
        self.value = False

    def compute(self):
        """Evaluate the objective once, cache it in ``self.value``, return it.

        In 'generic' mode ``controller.function`` is run as an executable
        with the arguments as strings, and its stdout is parsed as a float.
        In 'python' mode it is called with ``self.args``. Any other mode
        leaves the value untouched.
        """
        if self.value is False:
            if self.controller.objf_mode == 'generic':
                str_args = [str(arg) for arg in self.args]
                self.value = float(subprocess.check_output(
                    [self.controller.function] + str_args))
            elif self.controller.objf_mode == 'python':
                self.value = self.controller.function(self.args)
        return self.value


class SCEController:
    """Run a shuffled-complex-evolution minimisation of ``function``.

    Construction runs the whole optimisation because ``__init__`` calls
    ``main_loop``. Afterwards ``best_value`` / ``best_args`` hold the best
    leading point seen after each shuffle.

    ``sample_space`` is indexed as ``[:, 0]`` (lower) and ``[:, 1]``
    (upper) bounds per dimension. ``parallel_mode`` is 'serial',
    'threaded', or otherwise passed to ``multiprocessing.Pool``
    (presumably a process count - confirm).
    """

    def __init__(
        self,
        objf_mode,
        function,
        sample_space,
        parallel_mode=PARALLEL_MODE,
        n_complex=N_COMPLEX,
        n_points=N_POINTS,
        n_evolution_sample=N_EVOLUTION_SAMPLE,
        n_gen_offspring=N_GEN_OFFSPRING,
        n_evolutions=N_EVOLUTIONS,
        max_iters=MAX_ITERS,
        log_file=LOG_FILE
    ):
        # loading in input variables
        self.parallel_mode = parallel_mode
        self.objf_mode = objf_mode
        self.function = function
        self.sample_space = sample_space
        self.n_complex = n_complex
        self.n_points = n_points
        self.n_evolution_sample = n_evolution_sample
        self.n_gen_offspring = n_gen_offspring
        self.n_evolutions = n_evolutions
        self.max_iters = max_iters

        # configuring other variables
        self.best_value = float('inf')
        self.best_args = None
        # Cumulative selection table for generate_random: index i has
        # weight 2 * (n - i) / (n * (n + 1)), favouring low indices.
        self.prob = [2.0 * (self.n_points - i) / (self.n_points * (self.n_points + 1))
                     for i in range(self.n_points)]
        for i in range(1, self.n_points):
            self.prob[i] += self.prob[i - 1]
        self.iters = 0
        self.log_file = open(log_file, 'w')
        self.log = []
        self.add_log('SCE controller initialized')

        # starting SCE algorithm
        self.main_loop()

    def main_loop(self):
        """Initialise, then evolve and shuffle until ``max_iters``. Returns 0."""
        self.init_complexes()
        while self.iters < self.max_iters:
            self.evolve_complexes()
            self.shuffle_complexes()
            for c in self.complexes:
                if c.compute_handles[0].value < self.best_value:
                    self.best_value = c.compute_handles[0].value
                    self.best_args = c.compute_handles[0].args
        return 0

    def add_log(self, log_message):
        """Write a timestamped message to the log file and keep it in ``self.log``."""
        formatted_message = get_time_string() + 'CONTROLLER: ' + log_message
        self.log_file.write(formatted_message + '\n')
        self.log.append(formatted_message)
        return 0

    def generate_random(self):
        """Return a point index in ``range(n_points)``, favouring low indices."""
        r = random.random()
        # First i with r < prob[i]. Clamp because float rounding can leave
        # prob[-1] just below 1.0, which used to make this return None.
        return min(bisect.bisect_right(self.prob, r), self.n_points - 1)

    def init_complexes(self):
        """Sample and evaluate all points, then deal them into complexes."""
        self.add_log('first initialization of complexes (iteration {0})...'.format(self.iters))
        compute_handles = [
            ComputeHandle(np.array([random.uniform(r[0], r[1]) for r in self.sample_space]), self)
            for _ in range(self.n_complex * self.n_points)
        ]
        self.eval_compute_handles(compute_handles)
        compute_handles.sort(key=lambda x: x.value)
        # Complex i receives the sorted points i, i + n_complex, i + 2*n_complex, ...
        self.complexes = [
            Complex([compute_handles[i + j * self.n_complex] for j in range(self.n_points)], self)
            for i in range(self.n_complex)
        ]
        self.add_log('complexes initialized (iteration {0})'.format(self.iters))
        self.iters += 1
        return 0

    def shuffle_complexes(self):
        """Pool every point, sort by value and deal them back into complexes."""
        compute_handles = [ch for c in self.complexes for ch in c.compute_handles]
        compute_handles.sort(key=lambda x: x.value)
        for i in range(self.n_complex):
            self.complexes[i].update_compute_handles(
                [compute_handles[i + j * self.n_complex] for j in range(self.n_points)])
        self.add_log('complexes shuffled (iteration {0})'.format(self.iters))
        self.iters += 1
        return 0

    @staticmethod
    def _run_in_threads(targets):
        """Start one thread per callable, then wait for all of them."""
        threads = [threading.Thread(target=target) for target in targets]
        # Start everything before joining; joining inside the start loop
        # (as before) ran the threads one after another.
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def evolve_complexes(self):
        """Evolve every complex, concurrently in 'threaded' mode. Returns 0."""
        self.add_log('evolving complexes (iteration {0})...'.format(self.iters))
        if self.parallel_mode == 'threaded':
            self._run_in_threads([c.evolve for c in self.complexes])
        else:
            # 'serial' and every other mode evolve the complexes in turn.
            for c in self.complexes:
                c.evolve()
        self.add_log('complexes evolved (iteration {0})'.format(self.iters))
        return 0

    def eval_compute_handles(self, compute_handles):
        """Compute the objective value of every handle in ``compute_handles``."""
        if self.parallel_mode == 'serial':
            for ch in compute_handles:
                ch.compute()
        elif self.parallel_mode == 'threaded':
            self._run_in_threads([ch.compute for ch in compute_handles])
        else:
            # NOTE(review): the pool is not closed in the visible code; the
            # rest of this method is truncated in the source listing.
            pool = mp.Pool(self.parallel_mode)
            val = pool.map(parallel_compute_helper, compute_handles)
            for chi in range(len(compute_handles)):
                compute_handles[chi].value = val[chi]
        # ... (remainder of sce.py is truncated in the source listing)
Source:test_build_parallel.py  
...41        assert [ ] == mock_atpbar.disable.call_args_list42    ## workingarea43    assert parallel.workingarea is None44##__________________________________________________________________||45def test_build_logging_unknown_parallel_mode(caplog):46    with caplog.at_level(logging.WARNING):47        parallel = build_parallel(parallel_mode='unknown_mode')48    assert len(caplog.records) == 149    assert caplog.records[0].levelname == 'WARNING'50    assert 'parallel.build' in caplog.records[0].name51    assert 'unknown parallel_mode' in caplog.records[0].msg52    assert 'MultiprocessingDropbox' ==  parallel.communicationChannel.dropbox.__class__.__name__53##__________________________________________________________________||54@pytest.mark.parametrize('dispatcher_options', [dict()])55@pytest.mark.parametrize('user_modules', [[], ['scribblers']])56def test_build_parallel_subprocess(user_modules, dispatcher_options):57    parallel_mode = 'subprocess'58    parallel = build_parallel(59        parallel_mode=parallel_mode,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
