Best Python code snippet using avocado_python
phenotype_simulator.py
Source:phenotype_simulator.py  
#
# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES.  All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto.  Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
import tables


class PhenotypeSimulator():
    """Simulate phenotypes on top of an existing genotype data set.

    Inputs are read from ``data_path`` (used as a plain string prefix, so it
    must end with a path separator):

    * ``genotype_<id>.h5``: PyTables file whose ``/data`` node holds one row
      per patient and one column per SNP.
    * ``snplist_<id>.csv``: space separated, with the columns ``Risk Allele``
      and ``Feature ID`` (a JSON encoded gene id or list of gene ids).
    * ``groups_<id>.csv`` / ``group_coefficients_<id>.csv``: only read when
      ``stratify`` is set.

    Results are pickled next to the inputs as
    ``<name>_<id>_<experiment>.pkl`` and histograms are saved as PNG files.
    All sampling uses the global ``np.random`` state.
    """

    def __init__(self, args):
        """Copy the simulation settings from ``args`` (an attribute namespace).

        ``n_causal_snps`` is only read in "random" mode; ``causal_gene_cut``
        and ``max_gene_risk`` are only read in "gene" mode.
        """
        self.data_path = args.data_path
        self.data_identifier = args.data_identifier
        self.phenotype_experiment_name = args.phenotype_experiment_name
        self.interactive_cut = args.interactive_cut
        self.mask_rate = args.mask_rate
        self.dominance_frac = args.dominance_frac
        self.recessive_frac = args.recessive_frac
        self.max_interaction_coeff = args.max_interaction_coeff
        self.causal_snp_mode = args.causal_snp_mode
        self.heritability = args.heritability
        self.case_frac = args.case_frac
        self.stratify = args.stratify
        if self.causal_snp_mode == "random":
            self.n_causal_snps = args.n_causal_snps
        if self.causal_snp_mode == "gene":
            self.causal_gene_cut = args.causal_gene_cut
            self.max_gene_risk = args.max_gene_risk

    def get_number_patients(self):
        """Return (and cache in ``self.num_patients``) the genotype row count."""
        with tables.open_file(self.data_path + "genotype_{}.h5".format(self.data_identifier), "r") as f:
            self.num_patients = f.root.data.shape[0]
        return self.num_patients

    def read_genotype_data(self):
        """
        Reads in the annotated genotype csv file.

        Populates ``self.risk_alleles``, ``self.feature_id`` and
        ``self.num_snps`` and returns ``(risk_alleles, feature_id)``.
        """
        df = pd.read_csv(self.data_path + "snplist_{}.csv".format(self.data_identifier), sep=" ",
                         usecols=['Risk Allele', 'Feature ID'], dtype={'Feature ID': object})
        # Feature IDs are stored as JSON text; non-string cells are kept as-is.
        df['Feature ID'] = df['Feature ID'].apply(lambda x: json.loads(x) if isinstance(x, str) else x)
        self.risk_alleles = df['Risk Allele'].to_list()
        self.feature_id = df['Feature ID'].to_list()
        self.num_snps = len(self.risk_alleles)
        print("SNPs: {} People: {}".format(len(self.risk_alleles), self.get_number_patients()))
        return self.risk_alleles, self.feature_id

    def read_stratification_files(self):
        """
        Reads in the files for groups and group coefficients.

        ``self.groups`` becomes the first column of the groups file (one label
        per row) and ``self.group_coefficients`` maps the first column of the
        coefficients file to its second column.
        """
        groups = pd.read_csv(self.data_path + "groups_{}.csv".format(self.data_identifier), header=None)
        group_coefficients = pd.read_csv(self.data_path + "group_coefficients_{}.csv".format(self.data_identifier), header=None)
        self.groups = groups[0].to_list()
        self.group_coefficients = dict(zip(group_coefficients[0], group_coefficients[1]))

    def save_file(self, fname, data):
        """Pickle ``data`` to ``<data_path><fname>_<id>_<experiment>.pkl``."""
        with open(self.data_path + "{}_{}_{}.pkl".format(fname, self.data_identifier, self.phenotype_experiment_name), 'wb') as f:
            pickle.dump(data, f)

    def simulate_phenotype(self):
        """
        Run the whole simulation and return the resulting phenotypes.

        Steps: pick causal SNPs and effect sizes (according to
        ``causal_snp_mode``), score every patient, mix in noise according to
        ``heritability``, optionally add per-group offsets (``stratify``) and,
        when ``case_frac > 0``, threshold the scores into 0/1 labels.

        Results
        -------
        ``(phenotype, causal_snps_idx, effect_size, interactive_snps)`` where
        ``phenotype`` is a list with one entry per patient. The phenotype list
        is also pickled via ``save_file``.

        Raises
        ------
        ValueError
            If ``causal_snp_mode`` is neither "random" nor "gene".
        """
        if self.causal_snp_mode == "random":
            effect_size, causal_snps_idx = self.simulate_causal_snps_random()
        elif self.causal_snp_mode == "gene":
            effect_size, causal_snps_idx = self.simulate_causal_snps_gene()
        else:
            # Previously this fell through to an UnboundLocalError below.
            raise ValueError(
                "Unknown causal_snp_mode {!r}; expected 'random' or 'gene'".format(self.causal_snp_mode))
        phenotype_scores, interactive_snps = self.generate_phenotypes_scores(effect_size, causal_snps_idx)
        phenotype_scores = self.heritability_injection(phenotype_scores)
        if self.stratify:
            self.read_stratification_files()
            phenotype_scores = self.patient_level_score_injection(phenotype_scores)
        if self.case_frac > 0:
            # NOTE(review): the cutoff is the case_frac-th percentile and cases
            # are the scores at or above it, so roughly (1 - case_frac) of the
            # patients become cases. Confirm whether that is intended.
            phenotype_cutoff = np.percentile(phenotype_scores, self.case_frac*100)
            phenotype = [1 if x >= phenotype_cutoff else 0 for x in phenotype_scores]
        else:
            phenotype = list(phenotype_scores)
        self.save_file("phenotype", phenotype)
        print("Success!")
        return phenotype, causal_snps_idx, effect_size, interactive_snps

    def heritability_injection(self, scores):
        """
        Given phenotype scores normalize and apply heritability function.
        Creates a score distribution histogram plot as well as return the updated scores.
        """
        # Normalize scores to zero mean / unit variance.
        scores = (scores - np.mean(scores))/np.std(scores)
        # Apply g' = h*g + sqrt(1-h^2)*N(0,1)
        phenotype_scores = self.heritability * scores + np.random.randn(len(scores)) * np.sqrt(1 - self.heritability * self.heritability)
        self.get_distribution(phenotype_scores, title = "Phenotype Scores with Heredity {} {}".format(self.heritability, self.phenotype_experiment_name), ylabel="Number of People", xlabel="Genetic Risk Score")
        return phenotype_scores

    def patient_level_score_injection(self, scores):
        """Add each patient's group coefficient to that patient's score.

        ``self.group_coefficients`` maps a group label to its offset and
        ``self.groups`` holds one label per patient (see
        ``read_stratification_files``). Returns a NumPy array.
        """
        patient_level_bias = np.asarray([self.group_coefficients[group] for group in self.groups])
        # Convert explicitly: list + list would concatenate instead of adding.
        return np.asarray(scores) + patient_level_bias

    def patient_level_func_injection(self, scores, patients, coefficients):
        """Add a per-patient, score dependent offset to every score.

        ``coefficients`` maps a patient label to a function of that patient's
        genetic risk score; ``patients`` holds one label per score. Returns a
        NumPy array.
        """
        patient_level_bias = np.asarray([coefficients[patient](scores[i]) for i, patient in enumerate(patients)])
        # The original added to the undefined name `score` (NameError).
        return np.asarray(scores) + patient_level_bias

    def gen_snp_interaction(self, causal_snps_idx):
        """
        Generates SNP-SNP interaction coefficients for phenotype score calculation.

        Parameters
        ----------
        causal_snps_idx: sequence of causal SNP indices to draw interacting SNPs from

        Uses ``interactive_cut`` (fraction of causal SNPs that get a partner,
        at least one), ``mask_rate`` (probability that the interaction
        coefficient is 0, i.e. masking) and ``max_interaction_coeff`` (upper
        bound of the uniform distribution the coefficient is drawn from).

        Results
        -------
        interactive_snps: Dictionary that maps causal snp indexes to a length 3
        list [Interactive SNP Index Pair, Interaction Coefficient, Partner Risk Allele].
        Empty when ``interactive_cut`` is 0 or there are no causal SNPs; in
        that case nothing is saved or plotted.
        """
        interactive_snps = {}
        if self.interactive_cut == 0 or len(causal_snps_idx) == 0: #No epistasis
            return interactive_snps
        interactive_snps_idx = np.random.choice(causal_snps_idx, size = max(1,int(self.interactive_cut*len(causal_snps_idx))), replace=False)
        coeff = []
        for idx in interactive_snps_idx:
            # Any SNP other than the causal SNP itself may act as its partner.
            possible_snps = [i for i in range(len(self.risk_alleles)) if i != idx]
            partner = np.random.choice(possible_snps, size = 1, replace=False)[0]
            if np.random.random() > self.mask_rate:
                interaction_coeff = np.random.uniform(0, self.max_interaction_coeff)
            else:
                interaction_coeff = 0
            coeff.append(interaction_coeff)
            interactive_snps[idx] = [partner, interaction_coeff, self.risk_alleles[partner]]
        self.save_file("interactive_snps", interactive_snps)
        self.get_distribution(coeff, title = "{} Interactive Coefficients".format(self.phenotype_experiment_name), ylabel="Number of SNPs", xlabel="Interaction Coefficients")
        return interactive_snps

    def get_score(self, person, effect_size, interactive_snps):
        """
        Calculates the Phenotype Score of a input person.

        Parameters
        ----------
        person: Person row of Genotype file (indexable by SNP index, values in {0, 1, 2})
        effect_size: Dictionary that maps causal snp indexes to length 3 list that has the effect size per genotype value {0, 1, 2} accessed via index
        interactive_snps: Dictionary that maps causal snp indexes to a length 3 list [Interactive SNP Index Pair, Interaction Coefficient, Partner Risk Allele]

        Results
        -------
        Returns the sum of effective sizes as the phenotype score for the given person.
        """
        score = 0
        for idx in effect_size:
            interaction_coeff = 1
            if idx in interactive_snps:
                partner_idx, partner_coeff, other_risk = interactive_snps[idx]
                other_gen = person[partner_idx]
                # If the partner risk allele is present in 1 or 2 copies the
                # interaction coefficient scales this SNP's effect.
                if (other_risk == 0 and other_gen != 2) or (other_risk == 1 and other_gen != 0):
                    interaction_coeff = partner_coeff
            score += effect_size[idx][person[idx]]*interaction_coeff
        return score

    def generate_phenotypes_scores(self, effect_size, causal_snps_idx):
        """
        Score every patient in the genotype file.

        Parameters
        ----------
        effect_size: Dictionary that maps causal snp indexes to length 3 list that has the effect size per genotype value {0, 1, 2} accessed via index
        causal_snps_idx: sequence of SNP indices that are chosen as the causal SNPs

        Results
        -------
        Generates the interactive relations, then returns
        ``(pheno_scores, interactive_snps)`` where ``pheno_scores`` is a list
        with one score per patient.
        """
        interactive_snps =  self.gen_snp_interaction(causal_snps_idx)
        with tables.open_file(self.data_path + "genotype_{}.h5".format(self.data_identifier), "r") as f:
            patients = f.root.data
            pheno_scores = [self.get_score(patients[idx,:], effect_size, interactive_snps) for idx in range(patients.shape[0])]
        self.get_distribution(pheno_scores, title = "{} Phenotype Scores".format(self.phenotype_experiment_name), ylabel="Number of People", xlabel="Genetic Risk Score")
        print(len(pheno_scores), "Phenotype Scores Closed")
        return pheno_scores, interactive_snps

    def simulate_causal_snps_random(self):
        """
        Simulates causal SNP selection and generates effect sizes.

        ``n_causal_snps`` SNPs are drawn uniformly without replacement from
        all SNPs; each gets a gene risk coefficient of 1.

        Results
        -------
        Returns ``(effect_size, causal_snps_id)``: the effect_size dictionary
        and the array of causal snp indices. The causal genes and causal SNP
        dictionary are pickled via ``save_file``.
        """
        self.read_genotype_data()
        possible_snps = list(range(len(self.risk_alleles)))
        causal_snps_id = np.random.choice(possible_snps, size = self.n_causal_snps, replace=False)
        causal_snps_idx = {idx: 1 for idx in causal_snps_id}
        effect_size = self.simulate_effect_sizes(causal_snps_idx)
        causal_genes = self.get_causal_genes(causal_snps_id)
        self.save_file("causal_genes", causal_genes)
        self.save_file("causal_snp_idx", causal_snps_idx)
        return effect_size, causal_snps_id

    def get_causal_genes(self, snp_ids):
        """
        Grabs genes associated for random causal snp selection.

        Returns a dictionary mapping every gene id annotated on any of
        ``snp_ids`` to 1.
        """
        causal_genes = {}
        for snp in snp_ids:
            genes = self.feature_id[snp]
            if isinstance(genes, list):
                for g in genes:
                    causal_genes[g] = 1
            else:
                causal_genes[genes] = 1
        return causal_genes

    def get_unique_genes(self):
        """
        Return the list of unique gene ids present in ``self.feature_id``.

        An entry is either a single integer gene id (negative values are
        skipped) or an iterable of gene ids.
        """
        gene_set = set()
        for g in self.feature_id:
            if isinstance(g, (int, np.integer)):
                if g < 0:
                    continue
                gene_set.add(g)
            else:
                gene_set.update(g)
        return list(gene_set)

    def get_snps_from_gene(self, gene_id):
        """
        Given gene_id returns list of snp indices that map to that gene.
        """
        gsnps = []
        for snp, gene in enumerate(self.feature_id):
            if isinstance(gene, (int, np.integer)):
                if gene_id == gene:
                    gsnps.append(snp)
            elif gene_id in gene:
                gsnps.append(snp)
        return gsnps

    def get_distribution(self, values, title=None, xlabel = None, ylabel = None, binsize = 0.1):
        """Save a histogram of ``values`` as ``<data_path><title>.png``.

        Spaces in ``title`` become underscores in the file name; when no title
        is given the file is named ``distribution.png``. Bins are ``binsize``
        wide and span one bin beyond the smallest and largest value.
        """
        try:
            plt.hist(values, bins=np.arange(np.min(values)-binsize, np.max(values)+binsize, binsize))
            if title is not None:
                plt.title(title)
            if ylabel is not None:
                plt.ylabel(ylabel)
            if xlabel is not None:
                plt.xlabel(xlabel)
            # title=None (the default) used to raise AttributeError here.
            stem = "distribution" if title is None else title.replace(' ', '_')
            plt.savefig(self.data_path +'{}.png'.format(stem))
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close()

    def simulate_causal_snps_gene(self):
        """
        Simulate causal genes and snps and return effect_size, list of causal snp indices.

        A fraction ``causal_gene_cut`` of the unique genes (at least one) is
        made causal. Each causal gene gets a risk drawn uniformly from
        ``[0, max_gene_risk)`` and a random fraction (at least one) of its
        SNPs becomes causal with that gene's risk as coefficient.
        """
        self.read_genotype_data()
        possible_genes = self.get_unique_genes()
        causal_genes_idx = np.random.choice(possible_genes, size = max(1,int(len(possible_genes)*self.causal_gene_cut)), replace=False)
        causal_snps_idx = {}
        causal_genes = {}
        for cgene in causal_genes_idx:
            gene_risk = np.random.uniform(0, self.max_gene_risk)
            causal_genes[cgene] = gene_risk
            causal_fraction = np.random.random()
            possible_snps = self.get_snps_from_gene(cgene)
            causal_snps = np.random.choice(possible_snps, size = max(1,int(len(possible_snps)*causal_fraction)), replace=False)
            for cs in causal_snps:
                # A SNP shared by several causal genes keeps the last risk drawn.
                causal_snps_idx[cs] = gene_risk
        effect_size = self.simulate_effect_sizes(causal_snps_idx)
        self.save_file("causal_snp_idx", causal_snps_idx)
        self.save_file("causal_genes", causal_genes)
        return effect_size, list(causal_snps_idx.keys())

    def _effects_by_genotype(self, idx, het, hom):
        """Order per-genotype effects for SNP ``idx`` by its risk allele.

        ``het``/``hom`` are the effects of carrying one/two risk alleles. When
        the risk allele is coded 1 the genotype value counts risk alleles
        directly; otherwise the list is reversed.
        """
        if self.risk_alleles[idx] == 1:
            return [0, het, hom]
        return [hom, het, 0]

    def simulate_effect_sizes(self, causal_snps_idx):
        """
        Simulates causal snps effect size where we apply a sampled fraction to the gene risk.

        Parameters
        ----------
        causal_snps_idx: Dictionary mapping causal SNP index to its gene risk coefficient

        ``dominance_frac`` of the SNPs are dominant, ``recessive_frac`` are
        recessive and the rest are dosage dependent (two risk alleles have
        twice the effect of one). Dosage effects are drawn from U[0, 1),
        dominant and recessive effects from N(0, 1); all are scaled by the
        SNP's gene risk.

        Results
        -------
        Dictionary mapping SNP index to ``[effect for genotype 0, 1, 2]``;
        it is also pickled via ``save_file``.
        """
        effect_size = {}
        n_causal = len(causal_snps_idx)
        self.dosage_frac = 1 - self.dominance_frac - self.recessive_frac
        dominant = np.random.choice(list(causal_snps_idx), size = int(self.dominance_frac*n_causal), replace=False)
        leftover = list(set(causal_snps_idx.keys()).difference(set(dominant)))
        recessive = np.random.choice(leftover, size = int(self.recessive_frac*n_causal), replace=False)
        dosage = list(set(leftover).difference(set(recessive)))

        for idx in dosage:
            a = np.random.random()
            risk = causal_snps_idx[idx]
            effect_size[idx] = self._effects_by_genotype(idx, a*risk, 2*a*risk)
        for idx in dominant:
            a = np.random.normal()
            risk = causal_snps_idx[idx]
            effect_size[idx] = self._effects_by_genotype(idx, a*risk, a*risk)
        for idx in recessive:
            a = np.random.normal()
            risk = causal_snps_idx[idx]
            effect_size[idx] = self._effects_by_genotype(idx, 0, a*risk)
        self.save_file("effect_size", effect_size)
        return effect_size
Source:do_why.py  
""" Module containing the main model class for the dowhy package.

"""
import logging

from sympy import init_printing

import dowhy.causal_estimators as causal_estimators
import dowhy.causal_refuters as causal_refuters
import dowhy.utils.cli_helpers as cli
from dowhy.causal_estimator import CausalEstimate
from dowhy.causal_graph import CausalGraph
from dowhy.causal_identifier import CausalIdentifier
from dowhy.utils.api import parse_state

init_printing()  # To display symbolic math symbols


class CausalModel:
    """Main class for storing the causal model state.

    """

    def __init__(self, data, treatment, outcome, graph=None,
                 common_causes=None, instruments=None, estimand_type="ate",
                 proceed_when_unidentifiable=False,
                 **kwargs):
        """Initialize data and create a causal graph instance.

        Assigns treatment and outcome variables.
        Also checks and finds the common causes and instruments for treatment
        and outcome.

        At least one of graph, common_causes or instruments must be provided.

        :param data: a pandas dataframe containing treatment, outcome and other
        variables.
        :param treatment: name of the treatment variable
        :param outcome: name of the outcome variable
        :param graph: path to DOT file containing a DAG or a string containing
        a DAG specification in DOT format
        :param common_causes: names of common causes of treatment and _outcome
        :param instruments: names of instrumental variables for the effect of
        treatment on outcome
        :param kwargs: stored as-is; the optional key ``logging_level`` is
        passed to ``logging.basicConfig`` (default ``logging.INFO``)
        :returns: an instance of CausalModel class

        """
        self._data = data
        self._treatment = parse_state(treatment)
        self._outcome = parse_state(outcome)
        self._estimand_type = estimand_type
        self._proceed_when_unidentifiable = proceed_when_unidentifiable

        logging.basicConfig(level=kwargs.get('logging_level', logging.INFO))
        # TODO: move the logging level argument to a json file. Tue 20 Feb 2018 06:56:27 PM DST
        self.logger = logging.getLogger(__name__)

        if graph is None:
            self.logger.warning("Causal Graph not provided. DoWhy will construct a graph based on data inputs.")
            self._common_causes = parse_state(common_causes)
            self._instruments = parse_state(instruments)
            # Only pass the name lists the caller actually supplied.
            graph_kwargs = {}
            if common_causes is not None:
                graph_kwargs["common_cause_names"] = self._common_causes
            if instruments is not None:
                graph_kwargs["instrument_names"] = self._instruments
            if graph_kwargs:
                self._graph = CausalGraph(
                    self._treatment,
                    self._outcome,
                    observed_node_names=self._data.columns.tolist(),
                    **graph_kwargs
                )
            else:
                # NOTE(review): no graph is built on this path, so self._graph
                # stays unset and later methods that read it will fail.
                cli.query_yes_no(
                    "WARN: Are you sure that there are no common causes of treatment and outcome?",
                    default=None
                )
        else:
            self._graph = CausalGraph(
                self._treatment,
                self._outcome,
                graph,
                observed_node_names=self._data.columns.tolist()
            )
            self._common_causes = self._graph.get_common_causes(self._treatment, self._outcome)
            self._instruments = self._graph.get_instruments(self._treatment,
                                                            self._outcome)

        self._other_variables = kwargs
        self.summary()

    def identify_effect(self):
        """Identify the causal effect to be estimated, using properties of the causal graph.

        :returns: a probability expression for the causal effect if identified, else NULL

        """
        self.identifier = CausalIdentifier(self._graph,
                                           self._estimand_type,
                                           proceed_when_unidentifiable=self._proceed_when_unidentifiable)
        identified_estimand = self.identifier.identify_effect()
        return identified_estimand

    def _resolve_estimator(self, identified_estimand, method_name):
        """Split ``"<identifier>.<estimator>"`` and look up the estimator class.

        Also records the identifier on ``identified_estimand``.

        :returns: tuple ``(identifier_name, causal_estimator_class)``
        :raises ValueError: if ``method_name`` is None or has no ``.`` part
        """
        if method_name is None:
            # Automatic selection is not implemented; the original code fell
            # through to an UnboundLocalError on `identifier_name`.
            raise ValueError(
                "method_name is required, in the form '<identifier>.<estimator>'; "
                "automatic method selection is not implemented."
            )
        str_arr = method_name.split(".")
        if len(str_arr) < 2:
            raise ValueError(
                "method_name must have the form '<identifier>.<estimator>', "
                "got {!r}".format(method_name)
            )
        identifier_name = str_arr[0]
        estimator_name = str_arr[1]
        identified_estimand.set_identifier_method(identifier_name)
        causal_estimator_class = causal_estimators.get_class_object(estimator_name + "_estimator")
        return identifier_name, causal_estimator_class

    def estimate_effect(self, identified_estimand, method_name=None,
                        test_significance=None, method_params=None):
        """Estimate the identified causal effect.

        ``method_name`` must currently be provided, in the form
        ``"<identifier>.<estimator>"``.

        :param identified_estimand: a probability expression
            that represents the effect to be estimated. Output of
            CausalModel.identify_effect method
        :param method_name: name of the estimation method to be used.
        :param test_significance: forwarded to the estimator
        :param method_params: forwarded to the estimator as ``params``
        :returns: an instance of the CausalEstimate class, containing the causal effect estimate
            and other method-dependent information
        :raises ValueError: if ``method_name`` is missing or malformed

        """
        identifier_name, causal_estimator_class = self._resolve_estimator(
            identified_estimand, method_name)

        # Check if estimator's target estimand is identified
        if identified_estimand.estimands[identifier_name] is None:
            self.logger.warning("No valid identified estimand for using instrumental variables method")
            estimate = CausalEstimate(None, None, None)
        else:
            causal_estimator = causal_estimator_class(
                self._data,
                identified_estimand,
                self._treatment, self._outcome,
                test_significance=test_significance,
                params=method_params
            )
            estimate = causal_estimator.estimate_effect()
            estimate.add_params(
                estimand_type=identified_estimand.estimand_type,
                estimator_class=causal_estimator_class
            )
        return estimate

    def do(self, x, identified_estimand, method_name=None,  method_params=None):
        """Apply the estimator's ``do`` operation for the value ``x``.

        ``method_name`` must currently be provided, in the form
        ``"<identifier>.<estimator>"``.

        :param x: value passed to the estimator's ``do`` method
        :param identified_estimand: a probability expression
            that represents the effect to be estimated. Output of
            CausalModel.identify_effect method
        :param method_name: name of the estimation method to be used.
        :param method_params: forwarded to the estimator as ``params``
        :returns: the result of the estimator's ``do`` call, or an empty
            CausalEstimate when the estimand is not identified
        :raises ValueError: if ``method_name`` is missing or malformed
        :raises NotImplementedError: if the estimator does not support ``do``

        """
        identifier_name, causal_estimator_class = self._resolve_estimator(
            identified_estimand, method_name)

        # Check if estimator's target estimand is identified
        if identified_estimand.estimands[identifier_name] is None:
            self.logger.warning("No valid identified estimand for using instrumental variables method")
            estimate = CausalEstimate(None, None, None)
        else:
            causal_estimator = causal_estimator_class(
                self._data,
                identified_estimand,
                self._treatment, self._outcome,
                test_significance=False,
                params=method_params
            )
            try:
                estimate = causal_estimator.do(x)
            except NotImplementedError:
                self.logger.error('Do Operation not implemented or not supported for this estimator.')
                # Bare raise keeps the estimator's own message and traceback.
                raise
        return estimate

    def refute_estimate(self, estimand, estimate, method_name=None, **kwargs):
        """Refute an estimated causal effect.

        ``method_name`` must currently be provided.

        :param estimand: the identified estimand, passed to the refuter
        :param estimate: an instance of the CausalEstimate class.
        :param method_name: name of the refutation method to be used.
        :param kwargs: forwarded to the refuter's constructor
        :returns: an instance of the RefuteResult class
        :raises ValueError: if ``method_name`` is None

        """
        if method_name is None:
            # The original code fell through to an UnboundLocalError on
            # `refuter_class`.
            raise ValueError(
                "method_name is required; automatic refuter selection is not implemented."
            )
        refuter_class = causal_refuters.get_class_object(method_name)

        refuter = refuter_class(
            self._data,
            identified_estimand=estimand,
            estimate=estimate,
            **kwargs
        )
        res = refuter.refute_estimate()
        return res

    def view_model(self, layout="dot"):
        """View the causal DAG.

        :returns: a visualization of the graph

        """
        self._graph.view_graph(layout)

    def summary(self):
        """Print a text summary of the model.

        :returns: None

        """
        # NOTE(review): the source listing is truncated at this point ("...");
        # the remainder of this method is not visible and is left out.
Source:kvs.py  
1"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""2kvs.py3kvs.py accepts inputs on /kvs/keys/<key> endpoints to save, forward and distribute4"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""5from app import app6from flask import request7import json8import requests9from state import State10import logging11from static import Request, Http_Error, Entry12from constants import *13global state14@app.before_first_request15def build_state():16    global state17    state = State()18def get_causal_context(request):19    data = request.get_json()20    if data == None: data = {}21    causal_context = data.get('causal-context', {})22    if len(causal_context) == 0 or causal_context['view'] != state.view or causal_context['repl_factor'] != state.repl_factor: 23        causal_context = state.new_causal_context()24    return causal_context25def privlege(request):26    body = request.get_json()27    if body == None: return 028    access = body.get('access_token')29    if access in state.access_tokens['users']:30        return USER_PRIVLEGE31    elif access == state.access_tokens['servers']:32        return SERVER_PRIVLEGE33    return 0 # no privelege34"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""35key value store36"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""37@app.route('/kvs/keys/<key>', methods=['GET'])38def get(key):39    if state.address not in state.view: return json.dumps({"error":"Unable to satisfy request", "message":"Error in GET"}), 50340    if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list"}), 40141    address = state.maps_to(key)42    shard_id = state.shard_map[address]43    causal_context = get_causal_context(request)44    if shard_id == state.shard_id:45        response = 
Request.send_get(state.address, key, causal_context)46        payload = response.json()47        if 'address' in payload: del payload['address']48        return payload, response.status_code49    else:50        # Attempt to send to every address in a shard, first one that doesn't tine 51        shard_id -= 152        for i in range(state.repl_factor):53            address = state.view[shard_id*state.repl_factor + i]54            response = Request.send_get(address, key, causal_context)55            if response.status_code != 500:56                return response.json(), response.status_code57        #unreachable due to TA guarentee        58        app.logger.error(f'No requests were successfully forwarded to shard.{shard_id}')59        return json.dumps({"error":"Unable to satisfy request", "message":"Error in GET"}), 50360@app.route('/kvs/keys/<key>', methods=['PUT'])61def put(key):62    if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list", "replaced":False}), 40163    data = request.get_json()64    causal_context = get_causal_context(request)65    address = state.maps_to(key)66    shard_id = state.shard_map[address]67    if shard_id == state.shard_id:68        if 'value' not in data:69            return json.dumps({"error":"Value is missing","message":"Error in PUT","causal-context":causal_context}), 40070        if len(key) > 50:71            return json.dumps({"error":"Key is too long","message":"Error in PUT","causal-context":causal_context}), 40072        # if in storage update, else create73        entry = state.update_put_entry(data['value'], state.storage[key]) if key in state.storage else state.build_put_entry(data['value'])74        causal_context['queue'][key] = entry75        successful_broadcast = state.put_to_replicas(key, entry, causal_context)76        if not successful_broadcast:77            causal_context['queue'][key] = entry78        elif key in 
causal_context['queue']:79            del causal_context['queue'][key]80        # # save on local, return causal context to client81        response = Request.send_put_endpoint(state.address, key, entry, causal_context)82        payload = response.json()83        payload['causal-context'] = causal_context84        return payload, response.status_code85    else:86        # try sending to every node inside of expected shard, first successful quit87        return state.put_to_shard(shard_id, key, data['value'], causal_context)88@app.route('/kvs/keys/<key>', methods=['DELETE'])89def delete(key):90    if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list"}), 40191    address = state.maps_to(key)92    shard_id = state.shard_map[address]  93    # get causal context, if empty, initalize94    causal_context = get_causal_context(request)95    # if its in our shard, foward96    if shard_id == state.shard_id:97        entry = state.update_delete_entry(state.storage[key]) if key in state.storage else state.build_delete_entry()98        # forward to replicas99        successful_broadcast = state.delete_from_replicas(key, entry, causal_context)100        if not successful_broadcast: # if a node didn't recieve, save in client101            causal_context['queue'][key] = entry102        elif key in causal_context['queue']: # if every node recieved, delete previous context from client103            del causal_context['queue'][key]104        # send to self105        response = Request.send_delete_endpoint(state.address, key, entry, causal_context)106        payload = response.json()107        payload['causal-context'] = causal_context108        return payload, response.status_code109    else:110        # key belongs to different shard, foward deletion...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
