Best Python code snippet using slash
server.py
Source:server.py  
...72                            test.__slash__.variation.dump_variation_dict()]73                           for test in self.tests]74        self._sorted_collection = sorted(self.collection)75        self._tests_distrubuter = TestsDistributer(len(self._sorted_collection))76    def has_connected_clients(self):77        return len(self.connected_clients) > 078    def get_connected_clients(self):79        return self.connected_clients.copy()80    def has_more_tests(self):81        return len(self.finished_tests) < len(self.tests)82    def report_client_failure(self, client_id):83        self.connected_clients.remove(client_id)84        test_index = self.executing_tests.get(client_id, None)85        if test_index is not None:86            _logger.error("Worker {} interrupted while executing test {}", client_id,87                          self.tests[test_index].__slash__.address, extra={'capture': False})88            with _get_test_context(self.tests[test_index], logging=False) as (result, _):89                result.mark_interrupted()90                self.finished_tests.append(test_index)91        self.state = ServerStates.STOP_TESTS_SERVING92        self._mark_unrun_tests()93        self.worker_error_reported = True94    def get_unstarted_tests(self):95        return self._tests_distrubuter.get_unstarted_tests()96    def _mark_unrun_tests(self):97        unstarted_tests_indexes = self._tests_distrubuter.get_unstarted_tests()98        for test_index in unstarted_tests_indexes:99            with _get_test_context(self.tests[test_index], logging=False):100                pass101            self.finished_tests.append(test_index)102        self._tests_distrubuter.clear_unstarted_tests()103    def _get_worker_session_id(self, client_id):104        return "worker_{}".format(client_id)105    def connect(self, client_id, client_pid):106        _logger.notice("Client_id {} connected", client_id)107        self.connected_clients.add(client_id)108        client_session_id = 
'{}_{}'.format(context.session.id.split('_')[0], client_id)109        context.session.logging.create_worker_symlink(self._get_worker_session_id(client_id), client_session_id)110        hooks.worker_connected(session_id=client_session_id)  # pylint: disable=no-member111        self.worker_session_ids.append(client_session_id)112        self.worker_to_pid[client_id] = client_pid113        self.executing_tests[client_id] = None114        if len(self.connected_clients) >= config.root.parallel.num_workers:115            _logger.notice("All workers connected to server")116            self.state = ServerStates.WAIT_FOR_COLLECTION_VALIDATION117    def validate_collection(self, client_id, sorted_client_collection):118        if not self._sorted_collection == sorted_client_collection:119            _logger.error("Client_id {} sent wrong collection", client_id, extra={'capture': False})120            return False121        self.num_collections_validated += 1122        _logger.debug("Worker {} validated tests successfully", client_id)123        if self.num_collections_validated >= config.root.parallel.num_workers and self.state == ServerStates.WAIT_FOR_COLLECTION_VALIDATION:124            _logger.notice("All workers collected tests successfully, start serving tests")125            self.state = ServerStates.SERVE_TESTS126        return True127    def disconnect(self, client_id, has_failure=False):128        _logger.notice("Client {} sent disconnect", client_id)129        self.connected_clients.remove(client_id)130        if has_failure:131            self.state = ServerStates.STOP_TESTS_SERVING132    def get_test(self, client_id):133        if not self.executing_tests[client_id] is None:134            _logger.error("Client_id {} requested new test without sending former result", client_id,135                          extra={'capture': False})136            return PROTOCOL_ERROR137        if self.state == ServerStates.STOP_TESTS_SERVING:138            return NO_MORE_TESTS139     
   elif self.state in [ServerStates.WAIT_FOR_CLIENTS, ServerStates.WAIT_FOR_COLLECTION_VALIDATION]:140            return WAITING_FOR_CLIENTS141        elif self.state == ServerStates.SERVE_TESTS and self._tests_distrubuter.has_unstarted_tests():142            test_index = self._tests_distrubuter.get_next_test_for_client(client_id)143            if test_index is None: #we have omre tests but current worker cannot execute them144                return NO_MORE_TESTS145            test = self.tests[test_index]146            self.executing_tests[client_id] = test_index147            hooks.test_distributed(test_logical_id=test.__slash__.id, worker_session_id=self._get_worker_session_id(client_id)) # pylint: disable=no-member148            _logger.notice("#{}: {}, Client_id: {}", test_index + 1, test.__slash__.address, client_id,149                           extra={'highlight': True, 'filter_bypass': True})150            return (self.collection[test_index], test_index)151        else:152            _logger.debug("No unstarted tests, sending end to client_id {}", client_id)153            self.state = ServerStates.STOP_TESTS_SERVING154            return NO_MORE_TESTS155    def finished_test(self, client_id, result_dict):156        _logger.debug("Client_id {} finished_test", client_id)157        test_index = self.executing_tests.get(client_id, None)158        if test_index is not None:159            self.finished_tests.append(test_index)160            self.executing_tests[client_id] = None161            with _get_test_context(self.tests[test_index], logging=False) as (result, _):162                result.deserialize(result_dict)163                context.session.reporter.report_test_end(self.tests[test_index], result)164                if result.has_fatal_exception() or (not result.is_success(allow_skips=True) and config.root.run.stop_on_error):165                    _logger.debug("Server stops serving tests, run.stop_on_error: {}, result.has_fatal_exception: {}",166         
                         config.root.run.stop_on_error, result.has_fatal_exception())167                    self.state = ServerStates.STOP_TESTS_SERVING168                    self._mark_unrun_tests()169        else:170            _logger.error("finished_test request from client_id {} with index {}, but no test is mapped to this worker",171                          client_id, test_index, extra={'capture': False})172            return PROTOCOL_ERROR173    def stop_serve(self):174        self.state = ServerStates.STOP_SERVE175    def session_interrupted(self):176        context.session.results.global_result.mark_interrupted()177        self.interrupted = True178        if self.state != ServerStates.STOP_SERVE:179            self.state = ServerStates.STOP_TESTS_SERVING180    def report_warning(self, client_id, pickled_warning):181        _logger.notice("Client_id {} sent warning", client_id)182        try:183            warning = unpickle(pickled_warning)184            context.session.warnings.add(warning)185        except TypeError:186            _logger.error('Error when deserializing warning, not adding it', extra={'capture': False})187    def report_session_error(self, message):188        self.worker_error_reported = True189        _logger.error(message, extra={'capture': False})190    def should_wait_for_request(self):191        return self.has_connected_clients() or self.has_more_tests()192    def serve(self):193        server = xmlrpc_server.SimpleXMLRPCServer((config.root.parallel.server_addr, config.root.parallel.server_port),\194                                                  allow_none=True, logRequests=False)195        try:196            self.port = server.server_address[1]197            self.state = ServerStates.WAIT_FOR_CLIENTS198            server.register_instance(self)199            _logger.debug("Starting server loop")200            while self.state != ServerStates.STOP_SERVE:201                server.handle_request()202            if not 
self.interrupted:203                context.session.mark_complete()204            _logger.trace('Session finished. is_success={0} has_skips={1}',205                          context.session.results.is_success(allow_skips=True), bool(context.session.results.get_num_skipped()))...parallel_manager.py
Source:parallel_manager.py  
...88    def check_no_requests_timeout(self):89        if time.time() - self.keepalive_server.last_request_time > config.root.parallel.no_request_timeout:90            _logger.error("No request sent to server for {} seconds, terminating",91                          config.root.parallel.no_request_timeout, extra={'capture': False})92            if self.server.has_connected_clients():93                _logger.error("Clients that are still connected to server: {}",94                              self.server.connected_clients, extra={'capture': False})95            if self.server.has_more_tests():96                _logger.error("Number of unstarted tests: {}", len(self.server.get_unstarted_tests()),97                              extra={'capture': False})98            if self.server.executing_tests:99                _logger.error("Currently executed tests indexes: {}", self.server.executing_tests.values(),100                              extra={'capture': False})101            self.handle_error("No request sent to server for {} seconds, terminating".format(config.root.parallel.no_request_timeout))102    def start(self):103        self.try_connect()104        try:105            for worker in list(self.workers.values()):106                worker.start()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
