Best Python code snippet using slash
result.py
Source:result.py  
...87            session_result = context.session.results.global_result88            interrupted_test = self.is_interrupted()89            interrupted_session = session_result.is_interrupted()90            if not self.is_global_result():91                self.mark_interrupted()92                if not interrupted_test and not context.session.has_children():93                    with notify_if_slow_context(message="Cleaning up test due to interrupt. Please wait..."),\94                         handling_exceptions(swallow=True):95                        hooks.test_interrupt() # pylint: disable=no-member96            if not interrupted_session:97                session_result.mark_interrupted()98        elif not isinstance(exc_value, GeneratorExit):99            #skip keyboardinterrupt and system exit100            self.add_error(exc_info=exc_info)101        else:102            _logger.trace('Ignoring GeneratorExit exception')103    def has_errors_or_failures(self):104        return bool(self._failures or self._errors)105    def get_log_path(self):106        """Returns log path107        """108        return self._log_path109    def set_log_path(self, path):110        """Set log path111        """112        self._log_path = path113    def get_log_dir(self):114        """Returns log's directory.115        """116        if self._log_path is None:117            return None118        return os.path.dirname(self._log_path)119    def add_extra_log_path(self, path):120        """Add additional log path. 
This path will be added to the list returns by get_log_paths121        """122        self._extra_logs.append(path)123    def get_log_paths(self):124        """Returns a list of all log paths125        """126        logs = []127        if self._log_path:128            logs.append(self._log_path)129        return logs + list(self._extra_logs)130    def is_started(self):131        return self._started132    def is_not_run(self):133        return not self.is_started() and not self.has_errors_or_failures()134    def mark_started(self):135        self._start_time = datetime.now()136        self._started = True137    def is_error(self):138        return bool(self._errors)139    @property140    def test_id(self):141        return self.test_metadata.id142    def is_failure(self):143        return bool(self._failures)144    def is_just_failure(self):145        """Indicates this is a pure failure, without errors involved"""146        return self.is_failure() and not self.is_error()147    def is_skip(self):148        return bool(self._skips)149    def is_run_and_skip(self):150        return self.is_started() and self.is_skip()151    def is_success(self, allow_skips=False):152        if self._errors or self._failures or self._interrupted:153            return False154        if self._skips:155            return allow_skips156        return self.is_started()157    def is_success_finished(self):158        return self.is_success() and self.is_finished()159    def is_finished(self):160        return self._finished161    def mark_finished(self):162        self._end_time = datetime.now()163        self._finished = True164    def mark_interrupted(self):165        self._interrupted = True166    def is_interrupted(self):167        return self._interrupted168    def add_error(self, e=None, frame_correction=0, exc_info=None, append=True):169        """Adds a failure to the result170        """171        err = self._add_error(self._errors, e, frame_correction=frame_correction + 1, 
exc_info=exc_info, append=append)172        context.reporter.report_test_error_added(context.test, err)173        return err174    def add_failure(self, e=None, frame_correction=0, exc_info=None, append=True):175        """Adds a failure to the result176        """177        err = self._add_error(self._failures, e, frame_correction=frame_correction + 1, exc_info=exc_info, is_failure=True, append=append)178        context.reporter.report_test_failure_added(context.test, err)...work_scheduler.py
Source:work_scheduler.py  
...80    def __init__(self, is_soft_interruptible, work_args):81        self.future = CheapFuture()82        self.is_soft_interruptible = is_soft_interruptible83        self.work_args = work_args84    def mark_interrupted(self, exception_type):85        try:86            raise exception_type()87        except SchedulerInterrupted:88            self.future.set_exception(sys.exc_info())89class WorkerThread(Thread):90    """91    Manage worker lifecycle into a dedicated thread92    """93    def __init__(self, worker, scheduler):94        super(WorkerThread, self).__init__()95        self.worker = worker96        self.scheduler = scheduler97        self.state = WorkerState.PENDING98    def _start_worker(self):99        try:100            self.worker.start(self.scheduler.context)101        except Exception:102            logger.exception("Failed to start worker")103            return False104        with self.scheduler.lock:105            if self.state == WorkerState.PENDING:106                # Worker is now ready to receive a task107                # -> Notify threads blocked in scheduler.schedule_task()108                self.state = WorkerState.READY109                self.scheduler.schedule_work_condition.notify_all()110                return True111            else:112                # Worker has been stopped while is was starting113                # This is (only) caused by the scheduler being shut down before all workers are ready114                logger.info("Worker %s killed before it started" % self.worker.worker_id)115                return False116    def _stop_worker(self, stop_only_if_pending):117        with self.scheduler.lock:118            if stop_only_if_pending and self.state != WorkerState.PENDING:119                return  # Stop only if pending120            if self.state == WorkerState.DEAD:121                return  # Already stopped122            self.state = WorkerState.DEAD123            # We may need to replace this worker by a fresh one 
(eg. when it has failed)124            self.scheduler._start_worker_if_needed()125            # Make sure not all workers are dead126            self.scheduler._check_if_all_workers_dead()127        try:128            self.worker.stop()129        except Exception:130            logger.exception("Error while stopping worker %s..." % self.worker.worker_id)131    def stop_worker_if_pending(self):132        with contextualized_thread_name(self.worker.worker_id):133            self._stop_worker(True)134    def _take_next_task(self):135        """136        Wait until a task can be taken from the queue137        Returns None if there is no task anymore (when scheduler is shutting down)138        """139        with self.scheduler.lock:140            while not self.scheduler.hard_interrupted and self.state == WorkerState.READY:141                if len(self.scheduler.queue) > 0:142                    self.scheduler.schedule_work_condition.notify_all()143                    self.state = WorkerState.BUSY144                    return self.scheduler.queue.pop(0)145                self.scheduler.wait_for_task_condition.wait()146            return None147    def _notify_task_done(self):148        """149        Task has been processed, move back to READY state150        """151        with self.scheduler.lock:152            if self.state == WorkerState.BUSY:153                self.state = WorkerState.READY154            self.scheduler.schedule_work_condition.notify_all()155    def run(self):156        """157        Worker thread main loop158        """159        with contextualized_thread_name(self.worker.worker_id):160            try:161                if self._start_worker():162                    while True:163                        task = self._take_next_task()164                        if task is None:165                            break166                        exception = None167                        result = None168                        try:169                        
    logger.info("Running task...")170                            result = self.worker.execute_work(*task.work_args)171                        except Exception as e:172                            exception = sys.exc_info()173                            if isinstance(e, WorkerFailure):174                                # Current implementation is a bit simplistic: if a worker fails, we stop everything175                                # In the future, we might want to continue with less workers or try to replace them176                                logger.exception("Unexpected worker-level failure, shutdown the scheduler")177                                self.scheduler.interrupt_hard_async()178                        finally:179                            logger.info("Task done")180                            self._notify_task_done()181                        # Resolve the future *after* the worker becomes ready again (via _notify_task_done())182                        # Ensure that a new task scheduled *immediately* after this one completes doesn't start a new183                        # worker (this guarantee is mostly required by some unit tests in order to make deterministic184                        # assertions)185                        if exception:186                            task.future.set_exception(exception)187                        else:188                            task.future.set_result(result)189            finally:190                self._stop_worker(False)191class WorkScheduler(object):192    """193    Schedule work among a collection of workers (local, remote or a mix of both)194    """195    def __init__(self, workers, context):196        # List of (not yet started) workers197        self.workers = workers198        # Context object on which execute_work() is called199        self.context = context200        # Flag used to reject only "interruptible" tasks201        # Must be True whenever 'hard_interrupted' is True202        
self.soft_interrupted = False203        # Flag used to completely shut down the scheduler204        self.hard_interrupted = False205        # Internals206        self.worker_threads = []207        self.queue = []208        self.nb_blocked_tasks = 0209        self.lock = threading.Lock()210        self.wait_for_task_condition = threading.Condition(self.lock)211        self.schedule_work_condition = threading.Condition(self.lock)212        self.interrupt_callbacks = []213    def __enter__(self):214        return self215    def __exit__(self, exc_type, exc_val, exc_tb):216        self.interrupt_hard()217    def interrupt_hard_async(self):218        """219        Hard interrupt without waiting220        """221        Thread(target=self.interrupt_hard).start()222    def interrupt_hard(self):223        """224        Hard interrupt the scheduler:225        - All not-yet-running tasks (interruptible AND non-interruptible) are interrupted and new ones are rejected226        - Wait for running tasks to complete227        """228        with self.lock:229            if self.hard_interrupted:230                return231            logger.info("Scheduler has been hard interrupted (shutdown)")232            self.hard_interrupted = True233            self.soft_interrupted = True234            self.schedule_work_condition.notify_all()235            self.wait_for_task_condition.notify_all()236            for task in self.queue:237                task.mark_interrupted(SchedulerHardInterrupted)238            self.queue = []239        for callback in self.interrupt_callbacks:240            callback()241        for worker_thread in self.worker_threads:242            # Force stop workers that might be blocked in start()243            worker_thread.stop_worker_if_pending()244        for worker_thread in self.worker_threads:245            # Wait for all threads to complete246            worker_thread.join()247    def register_interrupt_callback(self, fn):248        
self.interrupt_callbacks.append(fn)249    def _start_worker_if_needed(self):250        assert self.lock.locked()251        if self.hard_interrupted:252            return253        desired_nb_of_workers = self._compute_ideal_nb_of_workers()254        current_nb_of_non_dead_workers = sum(255            1 for worker_thread in self.worker_threads if worker_thread.state != WorkerState.DEAD)256        is_new_worker_needed = desired_nb_of_workers > current_nb_of_non_dead_workers257        new_worker_index = len(self.worker_threads)258        can_start_new_worker = new_worker_index < len(self.workers)259        if is_new_worker_needed and can_start_new_worker:260            worker_thread = WorkerThread(self.workers[new_worker_index], self)261            worker_thread.start()262            self.worker_threads.append(worker_thread)263            self.schedule_work_condition.notify_all()264    def _check_if_all_workers_dead(self):265        """266        If all started workers died and there is no other worker remaining, we'll never be able to schedule267        anything anymore and we must hard interrupt the scheduler268        """269        assert self.lock.locked()270        can_start_new_worker = len(self.worker_threads) < len(self.workers)271        all_worker_threads_dead = all(worker_thread.state == WorkerState.DEAD for worker_thread in self.worker_threads)272        if not can_start_new_worker and all_worker_threads_dead:273            logger.error("All workers are dead, interrupt the scheduler")274            self.interrupt_hard_async()275    def _compute_max_queue_size(self):276        """277        Max queue size is used to determine whether schedule_work() should block or not278        It is currently set as the nb. of immediately available workers (READY) + nb. 
of workers that279        could be/are being started (PENDING)280        """281        assert self.lock.locked()282        current_nb_of_available_workers = \283            sum(1 for worker_thread in self.worker_threads284                if worker_thread.state in (WorkerState.READY, WorkerState.PENDING))285        return current_nb_of_available_workers286    def _compute_ideal_nb_of_workers(self):287        """288        Compute the ideal nb. of workers which should be started.289        Good value is the nb. of "currently waiting tasks":290        - Waiting in queue291        - Waiting for being enqueued292        - Being executed293        Note that starting more workers than "waiting tasks" would be a waste of resources294        """295        assert self.lock.locked()296        return self.nb_blocked_tasks + len(self.queue) + sum(297            1 for worker_thread in self.worker_threads if worker_thread.state == WorkerState.BUSY)298    def schedule_work(self, is_soft_interruptible, *work_args):299        """300        Schedule call to "context.execute_work(*work_args)" on a worker and returns a future representing the result301        The task is interruptible by interrupt_soft() only if 'is_soft_interruptible' is True302        """303        with self.lock:304            self.nb_blocked_tasks += 1305            try:306                while True:307                    # Reject the task if the scheduler is interrupted308                    if self.hard_interrupted:309                        return CheapFuture.from_exception(SchedulerHardInterrupted)310                    if is_soft_interruptible and self.soft_interrupted:311                        return CheapFuture.from_exception(SchedulerSoftInterrupted)312                    # We may need to start a new worker to execute this task313                    self._start_worker_if_needed()314                    # Make sure not all workers are dead315                    self._check_if_all_workers_dead()316            
        # Enqueue task if queue isn't full317                    if len(self.queue) < self._compute_max_queue_size():318                        task = Task(is_soft_interruptible, work_args)319                        self.queue.append(task)320                        self.wait_for_task_condition.notify_all()321                        return task.future322                    # Wait until a worker becomes available323                    self.schedule_work_condition.wait()324            finally:325                self.nb_blocked_tasks -= 1326    def interrupt_soft(self):327        """328        Partially interrupt the scheduler:329        - Not-yet-running interruptible tasks are interrupted and new ones are rejected330        - Non-interruptible tasks are processed as usual331        - Wait for running tasks to complete332        """333        with self.lock:334            if self.soft_interrupted or self.hard_interrupted:335                return336            logger.info("Scheduler has been soft interrupted")337            self.soft_interrupted = True338            self.schedule_work_condition.notify_all()339            new_queue = []340            for task in self.queue:341                if task.is_soft_interruptible:342                    task.mark_interrupted(SchedulerSoftInterrupted)343                else:344                    new_queue.append(task)345            self.queue = new_queue346    def get_workers_count(self):347        with self.lock:...server.py
Source:server.py  
...85        if test_index is not None:86            _logger.error("Worker {} interrupted while executing test {}", client_id,87                          self.tests[test_index].__slash__.address, extra={'capture': False})88            with _get_test_context(self.tests[test_index], logging=False) as (result, _):89                result.mark_interrupted()90                self.finished_tests.append(test_index)91        self.state = ServerStates.STOP_TESTS_SERVING92        self._mark_unrun_tests()93        self.worker_error_reported = True94    def get_unstarted_tests(self):95        return self._tests_distrubuter.get_unstarted_tests()96    def _mark_unrun_tests(self):97        unstarted_tests_indexes = self._tests_distrubuter.get_unstarted_tests()98        for test_index in unstarted_tests_indexes:99            with _get_test_context(self.tests[test_index], logging=False):100                pass101            self.finished_tests.append(test_index)102        self._tests_distrubuter.clear_unstarted_tests()103    def _get_worker_session_id(self, client_id):104        return "worker_{}".format(client_id)105    def connect(self, client_id, client_pid):106        _logger.notice("Client_id {} connected", client_id)107        self.connected_clients.add(client_id)108        client_session_id = '{}_{}'.format(context.session.id.split('_')[0], client_id)109        context.session.logging.create_worker_symlink(self._get_worker_session_id(client_id), client_session_id)110        hooks.worker_connected(session_id=client_session_id)  # pylint: disable=no-member111        self.worker_session_ids.append(client_session_id)112        self.worker_to_pid[client_id] = client_pid113        self.executing_tests[client_id] = None114        if len(self.connected_clients) >= config.root.parallel.num_workers:115            _logger.notice("All workers connected to server")116            self.state = ServerStates.WAIT_FOR_COLLECTION_VALIDATION117    def validate_collection(self, client_id, 
sorted_client_collection):118        if not self._sorted_collection == sorted_client_collection:119            _logger.error("Client_id {} sent wrong collection", client_id, extra={'capture': False})120            return False121        self.num_collections_validated += 1122        _logger.debug("Worker {} validated tests successfully", client_id)123        if self.num_collections_validated >= config.root.parallel.num_workers and self.state == ServerStates.WAIT_FOR_COLLECTION_VALIDATION:124            _logger.notice("All workers collected tests successfully, start serving tests")125            self.state = ServerStates.SERVE_TESTS126        return True127    def disconnect(self, client_id, has_failure=False):128        _logger.notice("Client {} sent disconnect", client_id)129        self.connected_clients.remove(client_id)130        if has_failure:131            self.state = ServerStates.STOP_TESTS_SERVING132    def get_test(self, client_id):133        if not self.executing_tests[client_id] is None:134            _logger.error("Client_id {} requested new test without sending former result", client_id,135                          extra={'capture': False})136            return PROTOCOL_ERROR137        if self.state == ServerStates.STOP_TESTS_SERVING:138            return NO_MORE_TESTS139        elif self.state in [ServerStates.WAIT_FOR_CLIENTS, ServerStates.WAIT_FOR_COLLECTION_VALIDATION]:140            return WAITING_FOR_CLIENTS141        elif self.state == ServerStates.SERVE_TESTS and self._tests_distrubuter.has_unstarted_tests():142            test_index = self._tests_distrubuter.get_next_test_for_client(client_id)143            if test_index is None: #we have omre tests but current worker cannot execute them144                return NO_MORE_TESTS145            test = self.tests[test_index]146            self.executing_tests[client_id] = test_index147            hooks.test_distributed(test_logical_id=test.__slash__.id, 
worker_session_id=self._get_worker_session_id(client_id)) # pylint: disable=no-member148            _logger.notice("#{}: {}, Client_id: {}", test_index + 1, test.__slash__.address, client_id,149                           extra={'highlight': True, 'filter_bypass': True})150            return (self.collection[test_index], test_index)151        else:152            _logger.debug("No unstarted tests, sending end to client_id {}", client_id)153            self.state = ServerStates.STOP_TESTS_SERVING154            return NO_MORE_TESTS155    def finished_test(self, client_id, result_dict):156        _logger.debug("Client_id {} finished_test", client_id)157        test_index = self.executing_tests.get(client_id, None)158        if test_index is not None:159            self.finished_tests.append(test_index)160            self.executing_tests[client_id] = None161            with _get_test_context(self.tests[test_index], logging=False) as (result, _):162                result.deserialize(result_dict)163                context.session.reporter.report_test_end(self.tests[test_index], result)164                if result.has_fatal_exception() or (not result.is_success(allow_skips=True) and config.root.run.stop_on_error):165                    _logger.debug("Server stops serving tests, run.stop_on_error: {}, result.has_fatal_exception: {}",166                                  config.root.run.stop_on_error, result.has_fatal_exception())167                    self.state = ServerStates.STOP_TESTS_SERVING168                    self._mark_unrun_tests()169        else:170            _logger.error("finished_test request from client_id {} with index {}, but no test is mapped to this worker",171                          client_id, test_index, extra={'capture': False})172            return PROTOCOL_ERROR173    def stop_serve(self):174        self.state = ServerStates.STOP_SERVE175    def session_interrupted(self):176        context.session.results.global_result.mark_interrupted()177        
self.interrupted = True178        if self.state != ServerStates.STOP_SERVE:179            self.state = ServerStates.STOP_TESTS_SERVING180    def report_warning(self, client_id, pickled_warning):181        _logger.notice("Client_id {} sent warning", client_id)182        try:183            warning = unpickle(pickled_warning)184            context.session.warnings.add(warning)185        except TypeError:186            _logger.error('Error when deserializing warning, not adding it', extra={'capture': False})187    def report_session_error(self, message):188        self.worker_error_reported = True189        _logger.error(message, extra={'capture': False})190    def should_wait_for_request(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
