Best Python code snippet using Airtest
server_app.py
Source:server_app.py  
...67        self.old_aggregate = old_aggregate68        self.old_data = old_data69        self.memory = memory70    def initialize(self):71        self.ready_func(False, agg=self.old_aggregate, data=self.old_data, memory=self.memory)72    def aggregate_univariate(self, client_data):73        if self.project.internal_state == "init":74            logger.info(f'[{self.project.id}] Univariate aggregation')75            results, aggregation, n_samples = univariate_analysis(client_data, int(self.project.privacy_level))76            prepare_univarate_results(results, aggregation, self.project)77            sleep(1)78            self.project.internal_state = "finished"79            self.ready_func(True, {}, client_data, self.memory)80            self.project.sample_number = n_samples81            self.project.state = "finished"82            cleanup(self.project)83            self.project.run_end = timezone.now()84    def smpc_aggregate_univariate(self, client_data):85        if self.project.internal_state == 'init':86            send_to_local = client_data87            self.project.internal_state = "local_calc"88            self.ready_func(False, send_to_local, client_data, self.memory)89        elif self.project.internal_state == 'local_calc':90            data_to_broadcast = distribute_smpc(client_data)91            send_to_local = data_to_broadcast92            self.project.internal_state = "smpc_agg"93            self.ready_func(False, send_to_local, client_data, self.memory)94        elif self.project.internal_state == 'smpc_agg':95            data = list(client_data.values())96            global_agg = aggregate_smpc(data, exp=0)97            dfs = {}98            global_agg = global_agg["local_results"]99            for key in global_agg:100                df = pd.DataFrame.from_dict(global_agg[key])101                df.index = df.index102                dfs[key] = df103            results, aggregation_dfs = univariate_analysis(dfs, 
privacy_level=self.project.privacy_level, smpc=True)104            n_samples = 0105            for key in aggregation_dfs:106                n_samples = n_samples + aggregation_dfs[key].iloc[0, 1]107            prepare_univarate_results(results, aggregation_dfs, self.project)108            sleep(1)109            self.project.internal_state = "finished"110            self.ready_func(True, {"state": "finished"}, None)111            self.project.sample_number = n_samples112            self.project.state = "finished"113            cleanup(self.project)114            self.project.run_end = timezone.now()115    def aggregate_cox(self, client_data):116        if self.project.internal_state == "init":117            logger.info(f'[{self.project.id}] Compute global mean')118            norm_mean = coxph.compute_global_mean(client_data)119            self.project.internal_state = "norm_std"120            send_to_local = {"norm_mean": norm_mean}121            self.memory = send_to_local.copy()122            self.ready_func(False, send_to_local, client_data, self.memory)123        elif self.project.internal_state == "norm_std":124            logger.info(f'[{self.project.id}] Compute global std')125            norm_std = coxph.compute_global_std(client_data)126            self.project.internal_state = "local_init"127            send_to_local = {"norm_std": norm_std}128            self.memory.update(send_to_local)129            self.ready_func(False, send_to_local, client_data, self.memory)130        elif self.project.internal_state == "local_init":131            logger.info(f'[{self.project.id}] Initialization')132            D, zr, count_d, n_samples = coxph.global_initialization(client_data)133            covariates = zr.axes[0]134            beta = np.zeros((len(covariates),))135            step_sizer = StepSizer(None)136            step_size = step_sizer.next()137            delta = np.zeros_like(beta)138            iteration = 0139            converging = True140            
self.memory.update({"beta": beta, "iteration": iteration, "D": D, "zr": zr,141                                "count_d": count_d, "n_samples": n_samples, "step_sizer": step_sizer,142                                "step_size": step_size, "delta": delta, "converging": converging})143            send_to_local = {"beta": beta, "iteration": iteration}144            self.project.internal_state = "iteration_update"145            self.ready_func(False, send_to_local, client_data, self.memory)146        elif self.project.internal_state == "iteration_update":147            iteration = self.memory["iteration"]148            logger.info(f'[{self.project.id}] Iteration update {str(iteration)}')149            beta = self.memory["beta"]150            zr = self.memory["zr"]151            step_sizer = self.memory["step_sizer"]152            step_size = self.memory["step_size"]153            iteration = self.memory["iteration"]154            n_samples = self.memory["n_samples"]155            count_d = self.memory["count_d"]156            D = self.memory["D"]157            converging = self.memory["converging"]158            new_beta, converging, hessian, step_size, iteration, delta = coxph.iteration_update(159                client_data, beta, zr, converging, step_sizer, step_size, iteration,160                n_samples, count_d, D, penalization=self.project.penalizer,161                l1_ratio=self.project.l1_ratio, max_steps=self.project.max_iters)162            if converging:163                send_to_local = {"beta": new_beta, "iteration": iteration}164                self.memory.update(send_to_local)165                self.project.internal_state = "iteration_update"166                self.ready_func(False, send_to_local, client_data, self.memory)167            else:168                logger.info(f'[{self.project.id}] Stopping criterion fulfilled')169                norm_std = self.memory["norm_std"]170                summary_df, params, standard_errors = 
coxph.create_summary(norm_std, new_beta, zr, hessian)171                send_to_local = {"summary_df": summary_df, "params": params, "standard_errors": standard_errors,172                                 "beta": new_beta}173                self.memory.update(send_to_local)174                self.project.internal_state = "c-index"175                self.ready_func(False, send_to_local, client_data, self.memory)176        elif self.project.internal_state == "c-index":177            logger.info(f'[{self.project.id}] Finishing...')178            global_c, sample_number = coxph.calculate_concordance_index(client_data)179            prepare_cox_results(self.memory["summary_df"], self.memory["params"],180                                self.memory["standard_errors"], global_c, sample_number, self.project)181            sleep(1)182            self.project.internal_state = "finished"183            self.ready_func(True, {"state": "finished"}, None, self.memory)184            self.project.state = "finished"185            logger.info(f'[{self.project.id}] Computation finished')186            cleanup(self.project)187            self.project.run_end = timezone.now()188    def smpc_aggregate_cox(self, client_data):189        exp = 10190        if self.project.internal_state == 'init':191            send_to_local = client_data192            self.project.internal_state = "norm_mean"193            self.ready_func(False, send_to_local, client_data, self.memory)194        elif self.project.internal_state == "norm_mean":195            data_to_broadcast = distribute_smpc(client_data)196            send_to_local = data_to_broadcast197            self.project.internal_state = "smpc_agg_norm_mean"198            self.ready_func(False, send_to_local, client_data, self.memory)199        elif self.project.internal_state == "smpc_agg_norm_mean":200            data = list(client_data.values())201            global_agg = aggregate_smpc(data, exp)202            n_samples = global_agg["n_samples"] 
* 10**exp203            self.memory = {"n_samples": n_samples}204            global_mean = pd.Series(global_agg["mean"]) / n_samples205            send_to_local = global_mean206            self.project.internal_state = "norm_std"207            self.ready_func(False, send_to_local, client_data, self.memory)208        elif self.project.internal_state == "norm_std":209            data_to_broadcast = distribute_smpc(client_data)210            send_to_local = data_to_broadcast211            self.project.internal_state = "smpc_agg_norm_std"212            self.ready_func(False, send_to_local, client_data, self.memory)213        elif self.project.internal_state == "smpc_agg_norm_std":214            data = list(client_data.values())215            global_agg = aggregate_smpc(data, exp)216            n_samples = self.memory["n_samples"]217            norm_std = np.sqrt(pd.Series(global_agg["std"]) / (n_samples - 1))218            self.memory.update({"norm_std": norm_std})219            send_to_local = norm_std220            self.project.internal_state = "local_init"221            self.ready_func(False, send_to_local, client_data, self.memory)222        elif self.project.internal_state == "local_init":223            data_to_broadcast = distribute_smpc(client_data)224            send_to_local = data_to_broadcast225            self.project.internal_state = "smpc_agg_local_init"226            self.ready_func(False, send_to_local, client_data, self.memory)227        elif self.project.internal_state == "smpc_agg_local_init":228            data = list(client_data.values())229            global_agg = aggregate_smpc(data, exp)230            global_zlr = pd.Series(global_agg["zlr"])231            covariates = pd.Series(global_zlr).index.tolist()232            beta = np.zeros((len(covariates),))233            step_sizer = StepSizer(None)234            step_size = step_sizer.next()235            delta = np.zeros_like(beta)236            beta += step_size * delta237            
global_count_d = pd.Series(global_agg["numb_d_set"])238            global_count_d.index = global_count_d.index.astype(float)239            global_count_d.index = global_count_d.index.astype(str)240            iteration = 0241            converging = True242            global_distinct_times = np.arange(self.project.from_time, self.project.to_time,243                                              self.project.step_size).tolist()244            global_distinct_times.reverse()245            self.memory.update(246                {"global_zlr": global_zlr, "covariates": covariates, "beta": beta, "step_sizer": step_sizer,247                 "step_size": step_size, "global_count_d": global_count_d, "iteration": iteration,248                 "converging": converging, "global_distinct_times": global_distinct_times})249            send_to_local = [beta, iteration]250            self.project.internal_state = "iteration_update"251            self.ready_func(False, send_to_local, data, self.memory)252        elif self.project.internal_state == "iteration_update":253            data_to_broadcast = distribute_smpc(client_data)254            send_to_local = data_to_broadcast255            self.project.internal_state = "smpc_agg_update"256            self.ready_func(False, send_to_local, client_data, self.memory)257        elif self.project.internal_state == "smpc_agg_update":258            data = list(client_data.values())259            global_agg = aggregate_smpc(data, exp)260            iteration = self.memory["iteration"]261            logger.info(f'[{self.project.id}] Iteration update {str(iteration)}')262            beta = self.memory["beta"]263            zr = self.memory["global_zlr"]264            step_sizer = self.memory["step_sizer"]265            step_size = self.memory["step_size"]266            iteration = self.memory["iteration"]267            n_samples = self.memory["n_samples"]268            count_d = self.memory["global_count_d"]269            D = 
self.memory["global_distinct_times"]270            converging = self.memory["converging"]271            new_beta, converging, hessian, step_size, iteration, delta = coxph.iteration_update(272                global_agg, beta, zr, converging, step_sizer, step_size, iteration,273                n_samples, count_d, D, penalization=self.project.penalizer,274                l1_ratio=self.project.l1_ratio, max_steps=self.project.max_iters, smpc=self.project.smpc)275            if converging:276                send_to_local = [new_beta, iteration]277                self.memory.update({"beta": new_beta, "iteration": iteration})278                self.project.internal_state = "iteration_update"279                self.ready_func(False, send_to_local, client_data, self.memory)280            else:281                logger.info(f'[{self.project.id}] Stopping criterion fulfilled')282                norm_std = self.memory["norm_std"]283                summary_df, params, standard_errors = coxph.create_summary(norm_std, new_beta, zr, hessian)284                send_to_local = params285                self.memory.update({"summary_df": summary_df, "params": params, "standard_errors": standard_errors})286                self.project.internal_state = "c_index"287                self.ready_func(False, send_to_local, client_data, self.memory)288        elif self.project.internal_state == "c_index":289            data_to_broadcast = distribute_smpc(client_data)290            send_to_local = data_to_broadcast291            self.project.internal_state = "smpc_agg_c_index"292            self.ready_func(False, send_to_local, client_data, self.memory)293        elif self.project.internal_state == "smpc_agg_c_index":294            data = list(client_data.values())295            global_agg = aggregate_smpc(data, 0)296            n_samples = self.memory["n_samples"]297            global_c = (global_agg["c-index"] / 10) / n_samples298            summary_df = self.memory["summary_df"]299            
params = self.memory["params"]300            standard_errors = self.memory["standard_errors"]301            prepare_cox_results(summary_df, params, standard_errors, global_c, n_samples, self.project)302            self.project.internal_state = "finished"303            self.ready_func(True, {"finished": True}, client_data, self.memory)304            logger.info(f'[{self.project.id}] Computation finished')305            self.project.run_end = timezone.now()306            delete_blob(f'p{self.project.id}_memory')307            sleep(2)308            cleanup(self.project)309            self.project.state = "finished"310    def aggregate(self, client_data):311        try:312            self.check_error(client_data)313            logger.info(f'[{self.project.id}] Aggregate')314            logger.info(f'[{self.project.id}] {self.project.internal_state}')315            if self.project.method == "univariate":316                if not self.project.smpc:317                    self.aggregate_univariate(client_data)318                else:319                    self.smpc_aggregate_univariate(client_data)320            elif self.project.method == "cox":321                if not self.project.smpc:322                    self.aggregate_cox(client_data)323                else:324                    self.smpc_aggregate_cox(client_data)325            self.project.save()326        except Exception as e:327            self.project.state = "error"328            self.project.error_message = f'Error: {e}'329            self.project.run_end = timezone.now()330            self.project.save()331            logger.warning(f'[{self.project.id}] Computation finished with an error: {e}')332            send_to_local = {"error": self.project.error_message}333            self.ready_func(True, send_to_local, client_data, self.memory)334            delete_blob(f'p{self.project.id}_memory')335            cleanup(self.project)336            raise337    def check_error(self, client_data):338        for 
client_id in client_data.keys():339            client = client_data[client_id]340            try:341                if "error" in client.keys():342                    self.project.state = "error"343                    self.project.error_message = client["error"]344                    self.project.run_end = timezone.now()345                    self.project.save()346                    logger.warning(f'[{self.project.id}] Computation finished with a client side error.')347                    self.ready_func(True, {"error": self.project.error_message}, client_data, self.memory)348                    delete_blob(f'p{self.project.id}_memory')349                    cleanup(self.project)350                    return351            except AttributeError:...main.py
Source:main.py  
...153                for item in iter(self.readq.get, None):154                    if self.state.machine.interrupted.is_set():155                        logger.info(f"{self.name} interrupted")156                        break157                    result = self.state.ready_func(item)158                    self.coordinator.add_task(Machine.Task(item, self.state.id, result))159                else:...server_funcs.py
Source:server_funcs.py  
...5from fed_algo.storage import delete_blob, load_blob, dump_blob, cleanup6from fed_algo.transformations import deserialize_bytes, serialize_bytes7logger = logging.getLogger(__name__)8def initialize(project):9    def ready_func(ready, agg=None, data=None, memory=None):10        dump_blob(f'p{project.id}_aggregate', serialize_bytes(agg))11        dump_blob(f'p{project.id}_data', serialize_bytes(data))12        dump_blob(f'p{project.id}_memory', serialize_bytes(memory))13        project.state = 'waiting' if not ready else 'finished'14        project.step += 115        project.save()16    logger.info(f'[{project.id}]Initialize server for aggregation')17    ServerApp(project=project, ready_func=ready_func, old_aggregate=None, old_data=None, memory=None).initialize()18def aggregate(proj: Project, lock: FileLock):19    with lock:20        logger.info(f'[{proj.id}] Aggregate local client data')21        client_data = {}22        for mem in proj.members.all():23            logger.debug(f'[{proj.id}] Add local client data to aggregation: {str(mem.id)}')24            client_bytes = load_blob(f'c{mem.id}_data')25            deserialized = deserialize_bytes(client_bytes)26            client_data[mem.id] = deserialized27            delete_blob(f'c{mem.id}_data')28        def ready_func(ready, agg=None, data=None, memory=None):29            dump_blob(f'p{proj.id}_aggregate', serialize_bytes(agg))30            if "smpc" in proj.internal_state:31                for mem in proj.members.all():32                    dump_blob(f'p{proj.id}_{mem.id}', serialize_bytes(agg[mem.id]))33            dump_blob(f'p{proj.id}_data', serialize_bytes(data))34            dump_blob(f'p{proj.id}_memory', serialize_bytes(memory))35            if not ready:36                proj.state = 'waiting'37            else:38                cleanup(proj)39            proj.step += 140            proj.save()41        proj.state = 'running'42        proj.save()...Learn to execute automation testing from scratch 
with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
