How to use causal method in avocado

Best Python code snippet using avocado_python

phenotype_simulator.py

Source:phenotype_simulator.py Github

copy

Full Screen

#
# Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
import tables


class PhenotypeSimulator():
    """Simulate phenotypes on top of an annotated genotype data set.

    The simulator reads ``genotype_<id>.h5`` and ``snplist_<id>.csv`` from
    ``data_path``, samples causal SNPs (either at random or gene by gene),
    draws effect sizes and SNP-SNP interactions, scores every patient and
    finally writes the intermediate artefacts next to the input data as
    ``<name>_<data_identifier>_<phenotype_experiment_name>.pkl`` files and
    histogram ``.png`` files.

    ``data_path`` is used as a plain string prefix, so it has to end with a
    path separator.
    """

    def __init__(self, args):
        """Copy the simulation settings from ``args`` onto the instance.

        ``args`` is any object exposing the attributes read below.
        ``n_causal_snps`` is only read when ``causal_snp_mode == "random"``;
        ``causal_gene_cut`` and ``max_gene_risk`` only when
        ``causal_snp_mode == "gene"``.
        """
        self.data_path = args.data_path
        self.data_identifier = args.data_identifier
        self.phenotype_experiment_name = args.phenotype_experiment_name
        self.interactive_cut = args.interactive_cut
        self.mask_rate = args.mask_rate
        self.dominance_frac = args.dominance_frac
        self.recessive_frac = args.recessive_frac
        self.max_interaction_coeff = args.max_interaction_coeff
        self.causal_snp_mode = args.causal_snp_mode
        self.heritability = args.heritability
        self.case_frac = args.case_frac
        self.stratify = args.stratify
        if self.causal_snp_mode == "random":
            self.n_causal_snps = args.n_causal_snps
        if self.causal_snp_mode == "gene":
            self.causal_gene_cut = args.causal_gene_cut
            self.max_gene_risk = args.max_gene_risk

    def get_number_patients(self):
        """Return (and cache on ``self.num_patients``) the number of rows of the genotype matrix."""
        with tables.open_file(self.data_path + "genotype_{}.h5".format(self.data_identifier), "r") as f:
            self.num_patients = f.root.data.shape[0]
        return self.num_patients

    def read_genotype_data(self):
        """
        Reads in the annotated genotype csv file.

        Populates ``self.risk_alleles``, ``self.feature_id`` and
        ``self.num_snps`` and returns ``(risk_alleles, feature_id)``.
        Every 'Feature ID' cell that was read as a string is decoded with
        ``json.loads``; other cells (e.g. missing values) are kept as read.
        """
        # NOTE(review): the separator is a single space in this listing; it may
        # have been a tab before the page was scraped - confirm against the data.
        df = pd.read_csv(
            self.data_path + "snplist_{}.csv".format(self.data_identifier),
            sep=" ",
            usecols=['Risk Allele', 'Feature ID'],
            dtype={'Feature ID': object},
        )
        df['Feature ID'] = df['Feature ID'].apply(lambda x: json.loads(x) if isinstance(x, str) else x)
        self.risk_alleles = df['Risk Allele'].to_list()
        self.feature_id = df['Feature ID'].to_list()
        self.num_snps = len(self.risk_alleles)
        print("SNPs: {} People: {}".format(len(self.risk_alleles), self.get_number_patients()))
        return self.risk_alleles, self.feature_id

    def read_stratification_files(self):
        """
        Reads in the files for groups and group coefficients.

        ``self.groups`` becomes the first column of ``groups_<id>.csv`` and
        ``self.group_coefficients`` maps the first column of
        ``group_coefficients_<id>.csv`` to its second column.
        """
        groups = pd.read_csv(self.data_path + "groups_{}.csv".format(self.data_identifier), header=None)
        group_coefficients = pd.read_csv(self.data_path + "group_coefficients_{}.csv".format(self.data_identifier), header=None)
        self.groups = groups[0].to_list()
        self.group_coefficients = dict(zip(group_coefficients[0], group_coefficients[1]))

    def save_file(self, fname, data):
        """Pickle ``data`` to ``<data_path><fname>_<data_identifier>_<phenotype_experiment_name>.pkl``."""
        with open(self.data_path + "{}_{}_{}.pkl".format(fname, self.data_identifier, self.phenotype_experiment_name), 'wb') as f:
            pickle.dump(data, f)

    def simulate_phenotype(self):
        """
        Runs the whole simulation: causal SNP sampling, effect sizes,
        interactions, scoring, heritability noise and optional stratification.

        Results
        -------
        Returns ``(phenotype, causal_snps_idx, effect_size, interactive_snps)``.
        ``phenotype`` is a list of 0/1 labels when ``case_frac > 0`` and the
        list of continuous scores otherwise. The phenotype list is also
        pickled through ``save_file``.

        Raises
        ------
        ValueError if ``causal_snp_mode`` is neither "random" nor "gene".
        """
        if self.causal_snp_mode == "random":
            effect_size, causal_snps_idx = self.simulate_causal_snps_random()
        elif self.causal_snp_mode == "gene":
            effect_size, causal_snps_idx = self.simulate_causal_snps_gene()
        else:
            # Without this branch the code below failed with an opaque
            # UnboundLocalError on effect_size.
            raise ValueError(
                "Unsupported causal_snp_mode {!r}; expected 'random' or 'gene'".format(self.causal_snp_mode))
        phenotype_scores, interactive_snps = self.generate_phenotypes_scores(effect_size, causal_snps_idx)
        phenotype_scores = self.heritability_injection(phenotype_scores)
        if self.stratify:
            self.read_stratification_files()
            phenotype_scores = self.patient_level_score_injection(phenotype_scores)
        if self.case_frac > 0:
            # NOTE(review): with this cutoff roughly (1 - case_frac) of the
            # patients are labelled 1. If case_frac is meant to be the fraction
            # of cases, the percentile should be (1 - case_frac) * 100 - confirm.
            phenotype_cutoff = np.percentile(phenotype_scores, self.case_frac*100)
            phenotype = [1 if x >= phenotype_cutoff else 0 for x in phenotype_scores]
        else:
            phenotype = list(phenotype_scores)
        self.save_file("phenotype", phenotype)
        print("Success!")
        return phenotype, causal_snps_idx, effect_size, interactive_snps

    def heritability_injection(self, scores):
        """
        Given phenotype scores normalize and apply heritability function.
        Creates a score distribution histogram plot as well as return the updated scores.

        The scores are standardised (zero mean, unit standard deviation) and
        mixed with standard normal noise: g' = h*g + sqrt(1-h^2)*N(0,1).
        """
        # Normalize scores
        scores = np.asarray(scores)
        scores = (scores - np.mean(scores))/np.std(scores)
        # Apply g' = h*g + sqrt(1-h^2)*N(0,1)
        phenotype_scores = self.heritability * scores + np.random.randn(len(scores)) * np.sqrt(1 - self.heritability * self.heritability)
        self.get_distribution(phenotype_scores, title = "Phenotype Scores with Heredity {} {}".format(self.heritability, self.phenotype_experiment_name), ylabel="Number of People", xlabel="Genetic Risk Score")
        return phenotype_scores

    def patient_level_score_injection(self, scores):
        """Add each patient's group coefficient to that patient's score.

        ``self.groups`` holds one group label per patient and
        ``self.group_coefficients`` maps a label to its additive bias; both
        are filled by ``read_stratification_files``.
        """
        patient_level_bias = [self.group_coefficients[group] for group in self.groups]
        # Coerce to arrays so that a plain list of scores is added element-wise
        # instead of being concatenated with the bias list.
        new_scores = np.asarray(scores) + np.asarray(patient_level_bias)
        return new_scores

    def patient_level_func_injection(self, scores, patients, coefficients):
        """Add a per-patient bias computed from the patient's own score.

        ``coefficients`` is a dictionary of labels to functions of the genetic
        risk score; ``patients`` holds one label per entry of ``scores``.
        """
        patient_level_bias = [coefficients[patient](scores[i]) for i, patient in enumerate(patients)]
        # The original referenced the undefined name ``score`` here, which
        # raised NameError on every call.
        new_score = np.asarray(scores) + np.asarray(patient_level_bias)
        return new_score

    def gen_snp_interaction(self, causal_snps_idx):
        """
        Generates SNP-SNP interaction coefficients for phenotype score calculation.

        Parameters
        ----------
        causal_snps_idx: Indices of the causal SNPs.

        The instance settings ``interactive_cut`` (fraction of causal SNPs that
        get an interacting partner), ``mask_rate`` (probability that an
        interaction is masking, i.e. has coefficient 0) and
        ``max_interaction_coeff`` (upper bound of the uniform coefficient
        distribution) drive the sampling.

        Results
        -------
        Returns the interactive_snps dictionary for pairing and interaction coefficients
        interactive_snps: Dictionary that maps causal snp indexes to a length 3 list [Interactive SNP Index Pair, Interaction Coefficient, Partner Risk Allele]
        The dictionary is pickled and a histogram of the coefficients is saved,
        unless ``interactive_cut`` is 0, in which case an empty dictionary is
        returned straight away.
        """
        interactive_snps = {}
        if self.interactive_cut == 0:  # No epistasis
            return interactive_snps
        interactive_snps_idx = np.random.choice(causal_snps_idx, size = max(1, int(self.interactive_cut*len(causal_snps_idx))), replace=False)
        # Built once; masking it per SNP yields the same candidates, in the same
        # order, as rebuilding a Python list on every iteration.
        all_snps = np.arange(len(self.risk_alleles))
        coeff = []
        for idx in interactive_snps_idx:
            possible_snps = all_snps[all_snps != idx]
            partner = np.random.choice(possible_snps, size = 1, replace=False)[0]
            if np.random.random() > self.mask_rate:
                interaction_coeff = np.random.uniform(0, self.max_interaction_coeff)
            else:
                interaction_coeff = 0  # masking interaction
            coeff.append(interaction_coeff)
            interactive_snps[idx] = [partner, interaction_coeff, self.risk_alleles[partner]]
        self.save_file("interactive_snps", interactive_snps)
        self.get_distribution(coeff, title = "{} Interactive Coefficients".format(self.phenotype_experiment_name), ylabel="Number of SNPs", xlabel="Interaction Coefficients")
        return interactive_snps

    def get_score(self, person, effect_size, interactive_snps):
        """
        Calculates the Phenotype Score of a input person.

        Parameters
        ----------
        person: Person row of Genotype file
        effect_size: Dictionary that maps causal snp indexes to length 3 list that has the effect size per genotype value {0, 1, 2} accessed via index
        interactive_snps: Dictionary that maps causal snp indexes to a length 3 list [Interactive SNP Index Pair, Interaction Coefficient, Partner Risk Allele]

        Results
        -------
        Returns the sum of effective sizes as the phenotype score for the given person.
        """
        score = 0
        for idx, sizes in effect_size.items():
            interaction_coeff = 1
            if idx in interactive_snps:
                partner_idx, partner_coeff, other_risk = interactive_snps[idx]
                other_gen = person[partner_idx]
                # if the partner risk allele is present in 1 or 2 copies activate the effect
                if (other_risk == 0 and other_gen != 2) or (other_risk == 1 and other_gen != 0):
                    interaction_coeff = partner_coeff
            score += sizes[person[idx]]*interaction_coeff
        return score

    def generate_phenotypes_scores(self, effect_size, causal_snps_idx):
        """
        Parameters
        ----------
        effect_size: Dictionary that maps causal snp indexes to length 3 list that has the effect size per genotype value {0, 1, 2} accessed via index
        causal_snps_idx: Indices of the SNPs that were chosen as causal SNPs.

        Results
        -------
        Generates the interactive relations, scores every row of the genotype
        file and saves a histogram of the scores.
        Returns ``(pheno_scores, interactive_snps)``.
        """
        interactive_snps = self.gen_snp_interaction(causal_snps_idx)
        with tables.open_file(self.data_path + "genotype_{}.h5".format(self.data_identifier), "r") as f:
            patients = f.root.data
            pheno_scores = [self.get_score(patients[idx, :], effect_size, interactive_snps) for idx in range(patients.shape[0])]
        self.get_distribution(pheno_scores, title = "{} Phenotype Scores".format(self.phenotype_experiment_name), ylabel="Number of People", xlabel="Genetic Risk Score")
        print(len(pheno_scores), "Phenotype Scores Closed")
        return pheno_scores, interactive_snps

    def simulate_causal_snps_random(self):
        """
        Simulates causal SNP selection and generates effect sizes

        ``self.n_causal_snps`` distinct SNP indices are drawn uniformly from
        all SNPs of the genotype file; each gets a gene risk of 1.

        Results
        -------
        Pickles the causal genes and the causal SNP dictionary.
        Returns the effect_size dictionary and the array of causal snp indices.
        """
        self.read_genotype_data()
        possible_snps = list(range(len(self.risk_alleles)))
        causal_snps_id = np.random.choice(possible_snps, size = self.n_causal_snps, replace=False)
        causal_snps_idx = {idx: 1 for idx in causal_snps_id}
        effect_size = self.simulate_effect_sizes(causal_snps_idx)
        causal_genes = self.get_causal_genes(causal_snps_id)
        self.save_file("causal_genes", causal_genes)
        self.save_file("causal_snp_idx", causal_snps_idx)
        return effect_size, causal_snps_id

    def get_causal_genes(self, snp_ids):
        """
        Grabs genes associated for random causal snp selection.

        Returns a dictionary whose keys are the feature ids attached to the
        given SNPs (list-valued feature ids are flattened); every value is 1.
        """
        causal_genes = {}
        for snp in snp_ids:
            genes = self.feature_id[snp]
            if isinstance(genes, list):
                for g in genes:
                    causal_genes[g] = 1
            else:
                causal_genes[genes] = 1
        return causal_genes

    def get_unique_genes(self):
        """
        Return the list of unique gene ids present in ``self.feature_id``.

        Integer entries below zero are skipped; non-integer entries are treated
        as iterables of gene ids.
        """
        gene_set = set()
        for g in self.feature_id:
            if isinstance(g, int):
                if g < 0:
                    continue
                gene_set.add(g)
            else:
                gene_set.update(g)
        return list(gene_set)

    def get_snps_from_gene(self, gene_id):
        """
        Given a gene_id returns the list of snp indices that map to that gene
        """
        gsnps = []
        for snp, gene in enumerate(self.feature_id[:len(self.risk_alleles)]):
            if isinstance(gene, int):
                if gene_id == gene:
                    gsnps.append(snp)
            elif gene_id in gene:
                gsnps.append(snp)
        return gsnps

    def get_distribution(self, values, title=None, xlabel = None, ylabel = None, binsize = 0.1):
        """Save a histogram of ``values`` as ``<data_path><title with '_' for spaces>.png``.

        Bins are ``binsize`` wide and span one bin below the minimum to one bin
        above the maximum. ``title`` is used to build the file name, so despite
        its default it has to be a string.
        """
        plt.hist(values, bins=np.arange(np.min(values)-binsize, np.max(values)+binsize, binsize))
        plt.title(title)
        plt.ylabel(ylabel)
        plt.xlabel(xlabel)
        plt.savefig(self.data_path + '{}.png'.format(title.replace(' ', '_')))
        plt.close()

    def simulate_causal_snps_gene(self):
        """
        Simulate causal genes and snps and return effect_size, list of causal snp indices

        A ``causal_gene_cut`` fraction of the unique genes (at least one) is
        drawn; each causal gene gets a risk from U(0, max_gene_risk) and a
        random fraction (at least one) of its SNPs becomes causal with that risk.
        """
        self.read_genotype_data()
        possible_genes = self.get_unique_genes()
        causal_genes_idx = np.random.choice(possible_genes, size = max(1, int(len(possible_genes)*self.causal_gene_cut)), replace=False)
        causal_snps_idx = {}
        causal_genes = {}
        for cgene in causal_genes_idx:
            gene_risk = np.random.uniform(0, self.max_gene_risk)
            causal_genes[cgene] = gene_risk
            causal_fraction = np.random.random()
            possible_snps = self.get_snps_from_gene(cgene)
            causal_snps = np.random.choice(possible_snps, size = max(1, int(len(possible_snps)*causal_fraction)), replace=False)
            for cs in causal_snps:
                causal_snps_idx[cs] = gene_risk
        effect_size = self.simulate_effect_sizes(causal_snps_idx)
        self.save_file("causal_snp_idx", causal_snps_idx)
        self.save_file("causal_genes", causal_genes)
        return effect_size, list(causal_snps_idx.keys())

    def simulate_effect_sizes(self, causal_snps_idx):
        """
        Simulates causal snps effect size where we apply a sampled fraction to the gene risk

        ``causal_snps_idx`` maps a causal SNP index to its gene risk. The SNPs
        are split into dominant, recessive and dosage-dependent groups
        according to ``dominance_frac`` and ``recessive_frac``. The result maps
        each SNP index to ``[effect at genotype 0, 1, 2]``; the list is mirrored
        when the SNP's risk allele is not 1. The dictionary is also pickled.
        """
        effect_size = {}
        self.dosage_frac = 1 - self.dominance_frac - self.recessive_frac
        n_causal = len(causal_snps_idx)
        dominant = np.random.choice(list(causal_snps_idx), size = int(self.dominance_frac*n_causal), replace=False)
        leftover = list(set(causal_snps_idx.keys()).difference(set(dominant)))
        recessive = np.random.choice(leftover, size = int(self.recessive_frac*n_causal), replace=False)
        dosage = list(set(leftover).difference(set(recessive)))

        for idx in dosage:
            # Dosage dependent: two risk alleles weigh twice as much as one.
            a = np.random.random()
            b = 2*a
            risk = causal_snps_idx[idx]
            if self.risk_alleles[idx] == 1:
                effect_size[idx] = [0, a*risk, b*risk]
            else:
                effect_size[idx] = [b*risk, a*risk, 0]
        for idx in dominant:
            # Dominant: one risk allele already carries the full effect.
            a = np.random.normal()
            risk = causal_snps_idx[idx]
            if self.risk_alleles[idx] == 1:
                effect_size[idx] = [0, a*risk, a*risk]
            else:
                effect_size[idx] = [a*risk, a*risk, 0]
        for idx in recessive:
            # Recessive: only two risk alleles carry an effect.
            a = np.random.normal()
            risk = causal_snps_idx[idx]
            if self.risk_alleles[idx] == 1:
                effect_size[idx] = [0, 0, a*risk]
            else:
                effect_size[idx] = [a*risk, 0, 0]
        self.save_file("effect_size", effect_size)
        return effect_size

# ... (the listing truncates the source file at this point)

Full Screen

Full Screen

do_why.py

Source:do_why.py Github

copy

Full Screen

1""" Module containing the main model class for the dowhy package.2"""3import logging4from sympy import init_printing5import dowhy.causal_estimators as causal_estimators6import dowhy.causal_refuters as causal_refuters7import dowhy.utils.cli_helpers as cli8from dowhy.causal_estimator import CausalEstimate9from dowhy.causal_graph import CausalGraph10from dowhy.causal_identifier import CausalIdentifier11from dowhy.utils.api import parse_state12init_printing() # To display symbolic math symbols13class CausalModel:14 """Main class for storing the causal model state.15 """16 def __init__(self, data, treatment, outcome, graph=None,17 common_causes=None, instruments=None, estimand_type="ate",18 proceed_when_unidentifiable=False,19 **kwargs):20 """Initialize data and create a causal graph instance.21 Assigns treatment and outcome variables.22 Also checks and finds the common causes and instruments for treatment23 and outcome.24 At least one of graph, common_causes or instruments must be provided.25 :param data: a pandas dataframe containing treatment, outcome and other26 variables.27 :param treatment: name of the treatment variable28 :param outcome: name of the outcome variable29 :param graph: path to DOT file containing a DAG or a string containing30 a DAG specification in DOT format31 :param common_causes: names of common causes of treatment and _outcome32 :param instruments: names of instrumental variables for the effect of33 treatment on outcome34 :returns: an instance of CausalModel class35 """36 self._data = data37 self._treatment = parse_state(treatment)38 self._outcome = parse_state(outcome)39 self._estimand_type = estimand_type40 self._proceed_when_unidentifiable = proceed_when_unidentifiable41 if 'logging_level' in kwargs:42 logging.basicConfig(level=kwargs['logging_level'])43 else:44 logging.basicConfig(level=logging.INFO)45 # TODO: move the logging level argument to a json file. 
Tue 20 Feb 2018 06:56:27 PM DST46 self.logger = logging.getLogger(__name__)47 if graph is None:48 self.logger.warning("Causal Graph not provided. DoWhy will construct a graph based on data inputs.")49 self._common_causes = parse_state(common_causes)50 self._instruments = parse_state(instruments)51 if common_causes is not None and instruments is not None:52 self._graph = CausalGraph(53 self._treatment,54 self._outcome,55 common_cause_names=self._common_causes,56 instrument_names=self._instruments,57 observed_node_names=self._data.columns.tolist()58 )59 elif common_causes is not None:60 self._graph = CausalGraph(61 self._treatment,62 self._outcome,63 common_cause_names=self._common_causes,64 observed_node_names=self._data.columns.tolist()65 )66 elif instruments is not None:67 self._graph = CausalGraph(68 self._treatment,69 self._outcome,70 instrument_names=self._instruments,71 observed_node_names=self._data.columns.tolist()72 )73 else:74 cli.query_yes_no(75 "WARN: Are you sure that there are no common causes of treatment and outcome?",76 default=None77 )78 else:79 self._graph = CausalGraph(80 self._treatment,81 self._outcome,82 graph,83 observed_node_names=self._data.columns.tolist()84 )85 self._common_causes = self._graph.get_common_causes(self._treatment, self._outcome)86 self._instruments = self._graph.get_instruments(self._treatment,87 self._outcome)88 self._other_variables = kwargs89 self.summary()90 def identify_effect(self):91 """Identify the causal effect to be estimated, using properties of the causal graph.92 :returns: a probability expression for the causal effect if identified, else NULL93 """94 self.identifier = CausalIdentifier(self._graph,95 self._estimand_type,96 proceed_when_unidentifiable=self._proceed_when_unidentifiable)97 identified_estimand = self.identifier.identify_effect()98 return identified_estimand99 def estimate_effect(self, identified_estimand, method_name=None,100 test_significance=None, method_params=None):101 """Estimate the 
identified causal effect.102 If method_name is provided, uses the provided method. Else, finds a103 suitable method to be used.104 :param identified_estimand: a probability expression105 that represents the effect to be estimated. Output of106 CausalModel.identify_effect method107 :param method_name: (optional) name of the estimation method to be used.108 :returns: an instance of the CausalEstimate class, containing the causal effect estimate109 and other method-dependent information110 """111 if method_name is None:112 pass113 else:114 str_arr = method_name.split(".")115 identifier_name = str_arr[0]116 estimator_name = str_arr[1]117 identified_estimand.set_identifier_method(identifier_name)118 causal_estimator_class = causal_estimators.get_class_object(estimator_name + "_estimator")119 # Check if estimator's target estimand is identified120 if identified_estimand.estimands[identifier_name] is None:121 self.logger.warning("No valid identified estimand for using instrumental variables method")122 estimate = CausalEstimate(None, None, None)123 else:124 causal_estimator = causal_estimator_class(125 self._data,126 identified_estimand,127 self._treatment, self._outcome,128 test_significance=test_significance,129 params=method_params130 )131 estimate = causal_estimator.estimate_effect()132 estimate.add_params(133 estimand_type=identified_estimand.estimand_type,134 estimator_class=causal_estimator_class135 )136 return estimate137 def do(self, x, identified_estimand, method_name=None, method_params=None):138 """Estimate the identified causal effect.139 If method_name is provided, uses the provided method. Else, finds a140 suitable method to be used.141 :param identified_estimand: a probability expression142 that represents the effect to be estimated. 
Output of143 CausalModel.identify_effect method144 :param method_name: (optional) name of the estimation method to be used.145 :returns: an instance of the CausalEstimate class, containing the causal effect estimate146 and other method-dependent information147 """148 if method_name is None:149 pass150 else:151 str_arr = method_name.split(".")152 identifier_name = str_arr[0]153 estimator_name = str_arr[1]154 identified_estimand.set_identifier_method(identifier_name)155 causal_estimator_class = causal_estimators.get_class_object(estimator_name + "_estimator")156 # Check if estimator's target estimand is identified157 if identified_estimand.estimands[identifier_name] is None:158 self.logger.warning("No valid identified estimand for using instrumental variables method")159 estimate = CausalEstimate(None, None, None)160 else:161 causal_estimator = causal_estimator_class(162 self._data,163 identified_estimand,164 self._treatment, self._outcome,165 test_significance=False,166 params=method_params167 )168 try:169 estimate = causal_estimator.do(x)170 except NotImplementedError:171 self.logger.error('Do Operation not implemented or not supported for this estimator.')172 raise NotImplementedError173 return estimate174 def refute_estimate(self, estimand, estimate, method_name=None, **kwargs):175 """Refute an estimated causal effect.176 If method_name is provided, uses the provided method. 
Else, finds a177 suitable method to use.178 :param estimate: an instance of the CausalEstimate class.179 :returns: an instance of the RefuteResult class180 """181 if method_name is None:182 pass183 else:184 refuter_class = causal_refuters.get_class_object(method_name)185 refuter = refuter_class(186 self._data,187 identified_estimand=estimand,188 estimate=estimate,189 **kwargs190 )191 res = refuter.refute_estimate()192 return res193 def view_model(self, layout="dot"):194 """View the causal DAG.195 :returns: a visualization of the graph196 """197 self._graph.view_graph(layout)198 def summary(self):199 """Print a text summary of the model.200 :returns: None201 """...

Full Screen

Full Screen

kvs.py

Source:kvs.py Github

copy

Full Screen

1"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""2kvs.py3kvs.py accepts inputs on /kvs/keys/<key> endpoints to save, forward and distribute4"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""5from app import app6from flask import request7import json8import requests9from state import State10import logging11from static import Request, Http_Error, Entry12from constants import *13global state14@app.before_first_request15def build_state():16 global state17 state = State()18def get_causal_context(request):19 data = request.get_json()20 if data == None: data = {}21 causal_context = data.get('causal-context', {})22 if len(causal_context) == 0 or causal_context['view'] != state.view or causal_context['repl_factor'] != state.repl_factor: 23 causal_context = state.new_causal_context()24 return causal_context25def privlege(request):26 body = request.get_json()27 if body == None: return 028 access = body.get('access_token')29 if access in state.access_tokens['users']:30 return USER_PRIVLEGE31 elif access == state.access_tokens['servers']:32 return SERVER_PRIVLEGE33 return 0 # no privelege34"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""35key value store36"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""37@app.route('/kvs/keys/<key>', methods=['GET'])38def get(key):39 if state.address not in state.view: return json.dumps({"error":"Unable to satisfy request", "message":"Error in GET"}), 50340 if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list"}), 40141 address = state.maps_to(key)42 shard_id = state.shard_map[address]43 causal_context = get_causal_context(request)44 if shard_id == state.shard_id:45 response = Request.send_get(state.address, key, causal_context)46 payload = response.json()47 if 
'address' in payload: del payload['address']48 return payload, response.status_code49 else:50 # Attempt to send to every address in a shard, first one that doesn't tine 51 shard_id -= 152 for i in range(state.repl_factor):53 address = state.view[shard_id*state.repl_factor + i]54 response = Request.send_get(address, key, causal_context)55 if response.status_code != 500:56 return response.json(), response.status_code57 #unreachable due to TA guarentee 58 app.logger.error(f'No requests were successfully forwarded to shard.{shard_id}')59 return json.dumps({"error":"Unable to satisfy request", "message":"Error in GET"}), 50360@app.route('/kvs/keys/<key>', methods=['PUT'])61def put(key):62 if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list", "replaced":False}), 40163 data = request.get_json()64 causal_context = get_causal_context(request)65 address = state.maps_to(key)66 shard_id = state.shard_map[address]67 if shard_id == state.shard_id:68 if 'value' not in data:69 return json.dumps({"error":"Value is missing","message":"Error in PUT","causal-context":causal_context}), 40070 if len(key) > 50:71 return json.dumps({"error":"Key is too long","message":"Error in PUT","causal-context":causal_context}), 40072 # if in storage update, else create73 entry = state.update_put_entry(data['value'], state.storage[key]) if key in state.storage else state.build_put_entry(data['value'])74 causal_context['queue'][key] = entry75 successful_broadcast = state.put_to_replicas(key, entry, causal_context)76 if not successful_broadcast:77 causal_context['queue'][key] = entry78 elif key in causal_context['queue']:79 del causal_context['queue'][key]80 # # save on local, return causal context to client81 response = Request.send_put_endpoint(state.address, key, entry, causal_context)82 payload = response.json()83 payload['causal-context'] = causal_context84 return payload, response.status_code85 else:86 # try 
sending to every node inside of expected shard, first successful quit87 return state.put_to_shard(shard_id, key, data['value'], causal_context)88@app.route('/kvs/keys/<key>', methods=['DELETE'])89def delete(key):90 if privlege(request) < USER_PRIVLEGE: return json.dumps({"error":"Unauthorized access", "message":"Not on authenticated user or server list"}), 40191 address = state.maps_to(key)92 shard_id = state.shard_map[address] 93 # get causal context, if empty, initalize94 causal_context = get_causal_context(request)95 # if its in our shard, foward96 if shard_id == state.shard_id:97 entry = state.update_delete_entry(state.storage[key]) if key in state.storage else state.build_delete_entry()98 # forward to replicas99 successful_broadcast = state.delete_from_replicas(key, entry, causal_context)100 if not successful_broadcast: # if a node didn't recieve, save in client101 causal_context['queue'][key] = entry102 elif key in causal_context['queue']: # if every node recieved, delete previous context from client103 del causal_context['queue'][key]104 # send to self105 response = Request.send_delete_endpoint(state.address, key, entry, causal_context)106 payload = response.json()107 payload['causal-context'] = causal_context108 return payload, response.status_code109 else:110 # key belongs to different shard, foward deletion...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful