Best Python code snippet using httmock_python
OrderedList_test.py
Source:OrderedList_test.py  
...9        ordered_list = OrderedList(asc=True)10        ordered_list.add(val_1)11        ordered_list.add(val_2)12        ordered_list.add(val_3)13        self.assertEqual(val_1, ordered_list.get_all()[0].value)14        self.assertEqual(val_2, ordered_list.get_all()[1].value)15        self.assertEqual(val_3, ordered_list.get_all()[2].value)16        self.assertEqual(val_1, ordered_list.find(val_1).value)17        self.assertEqual(val_2, ordered_list.find(val_2).value)18        self.assertEqual(val_3, ordered_list.find(val_3).value)19        self.assertEqual(val_1, ordered_list.head.value)20        self.assertEqual(val_3, ordered_list.tail.value)21        self.assertEqual(3, ordered_list.len())22    def test_list_sorted_desc(self):23        val_1 = 1224        val_2 = 5525        val_3 = 10026        ordered_list = OrderedList(asc=False)27        ordered_list.add(val_1)28        ordered_list.add(val_2)29        ordered_list.add(val_3)30        self.assertEqual(val_1, ordered_list.get_all()[2].value)31        self.assertEqual(val_2, ordered_list.get_all()[1].value)32        self.assertEqual(val_3, ordered_list.get_all()[0].value)33        self.assertEqual(val_1, ordered_list.find(val_1).value)34        self.assertEqual(val_2, ordered_list.find(val_2).value)35        self.assertEqual(val_3, ordered_list.find(val_3).value)36        self.assertEqual(val_3, ordered_list.head.value)37        self.assertEqual(val_1, ordered_list.tail.value)38        self.assertEqual(3, ordered_list.len())39    def test_list_text_sorted_asc(self):40        val_1 = ' a'41        val_2 = ' Ñ '42        val_3 = 'b '43        ordered_list = OrderedStringList(asc=True)44        ordered_list.add(val_1)45        ordered_list.add(val_2)46        ordered_list.add(val_3)47        self.assertEqual(val_1, ordered_list.get_all()[0].value)48        self.assertEqual(val_3, ordered_list.get_all()[1].value)49        self.assertEqual(val_2, ordered_list.get_all()[2].value)50        self.assertEqual(val_1, 
ordered_list.find(val_1).value)51        self.assertEqual(val_2, ordered_list.find(val_2).value)52        self.assertEqual(val_3, ordered_list.find(val_3).value)53        self.assertEqual(val_1, ordered_list.head.value)54        self.assertEqual(val_2, ordered_list.tail.value)55        self.assertEqual(3, ordered_list.len())56    def test_list_text_sorted_desc(self):57        val_1 = ' aaa'58        val_2 = ' aac '59        val_3 = 'aab '60        ordered_list = OrderedStringList(asc=False)61        ordered_list.add(val_1)62        ordered_list.add(val_2)63        ordered_list.add(val_3)64        self.assertEqual(val_2, ordered_list.get_all()[0].value)65        self.assertEqual(val_3, ordered_list.get_all()[1].value)66        self.assertEqual(val_1, ordered_list.get_all()[2].value)67        self.assertEqual(val_1, ordered_list.find(val_1).value)68        self.assertEqual(val_2, ordered_list.find(val_2).value)69        self.assertEqual(val_3, ordered_list.find(val_3).value)70        self.assertEqual(val_2, ordered_list.head.value)71        self.assertEqual(val_1, ordered_list.tail.value)72        self.assertEqual(3, ordered_list.len())73    # comparing testing block74    def test_compare(self):75        node_1 = Node(1)76        node_2 = Node(2)77        node_3 = Node(2)78        ordered_list = OrderedList(asc=True)79        self.assertEqual(1, ordered_list.compare(node_2, node_1))80        self.assertEqual(-1, ordered_list.compare(node_1, node_2))81        self.assertEqual(0, ordered_list.compare(node_3, node_2))82    def test_compare_string(self):83        node_1 = Node(' a')84        node_2 = Node('b')85        node_3 = Node(' ')86        node_4 = Node('a ')87        node_5 = Node('b')88        ordered_list = OrderedStringList(asc=True)89        self.assertEqual(1, ordered_list.compare(node_2, node_1))90        self.assertEqual(-1, ordered_list.compare(node_1, node_2))91        self.assertEqual(0, ordered_list.compare(node_5, node_2))92        self.assertEqual(1, 
ordered_list.compare(node_4, node_1))93        self.assertEqual(1, ordered_list.compare(node_1, node_3))94    # comparing testing block95    def test_find(self):96        val_1 = 1297        val_2 = 10098        val_3 = 5599        val_4 = -1100        val_5 = 55101        ordered_list = OrderedList(asc=True)102        ordered_list.add(val_1)103        ordered_list.add(val_2)104        ordered_list.add(val_3)105        ordered_list.add(val_4)106        ordered_list.add(val_5)107        self.assertEqual(val_4, ordered_list.get_all()[0].value)108        self.assertEqual(val_1, ordered_list.get_all()[1].value)109        self.assertEqual(val_3, ordered_list.get_all()[2].value)110        self.assertEqual(val_5, ordered_list.get_all()[3].value)111        self.assertEqual(val_2, ordered_list.get_all()[4].value)112        self.assertEqual(val_1, ordered_list.find(val_1).value)113        self.assertEqual(val_2, ordered_list.find(val_2).value)114        self.assertEqual(val_3, ordered_list.find(val_3).value)115        self.assertEqual(val_4, ordered_list.find(val_4).value)116        self.assertEqual(val_5, ordered_list.find(val_5).value)117        self.assertEqual(None, ordered_list.find(15))118    def test_find_string(self):119        val_1 = ' '120        val_2 = ' a'121        val_3 = ' a '122        val_4 = '-14629'123        val_5 = '1'124        val_6 = '2'125        val_7 = 'b'126        ordered_list = OrderedStringList(asc=False)127        ordered_list.add(val_1)128        ordered_list.add(val_2)129        ordered_list.add(val_3)130        ordered_list.add(val_4)131        ordered_list.add(val_5)132        ordered_list.add(val_6)133        ordered_list.add(val_7)134        self.assertEqual(val_7, ordered_list.get_all()[0].value)135        self.assertEqual(val_3, ordered_list.get_all()[1].value)136        self.assertEqual(val_2, ordered_list.get_all()[2].value)137        self.assertEqual(val_6, ordered_list.get_all()[3].value)138        self.assertEqual(val_5, 
ordered_list.get_all()[4].value)139        self.assertEqual(val_4, ordered_list.get_all()[5].value)140        self.assertEqual(val_1, ordered_list.get_all()[6].value)141        self.assertEqual(val_1, ordered_list.find(val_1).value)142        self.assertEqual(val_2, ordered_list.find(val_2).value)143        self.assertEqual(val_3, ordered_list.find(val_3).value)144        self.assertEqual(val_4, ordered_list.find(val_4).value)145        self.assertEqual(val_5, ordered_list.find(val_5).value)146        self.assertEqual(val_6, ordered_list.find(val_6).value)147        self.assertEqual(val_7, ordered_list.find(val_7).value)148        self.assertEqual(None, ordered_list.find('15'))149    # deleting block150    def test_delete_from_empty(self):151        ordered_list = OrderedList(asc=True)152        self.assertEqual(ordered_list.head, None)153        self.assertEqual(ordered_list.tail, None)154        self.assertEqual(ordered_list.get_all(), [])155        ordered_list.delete(15)156        self.assertEqual(ordered_list.head, None)157        self.assertEqual(ordered_list.tail, None)158        self.assertEqual(ordered_list.get_all(), [])159    def test_delete_none(self):160        val_1 = 12161        val_2 = 55162        val_3 = 100163        ordered_list = OrderedList(asc=True)164        self.assertEqual(ordered_list.head, None)165        self.assertEqual(ordered_list.tail, None)166        self.assertEqual(ordered_list.get_all(), [])167        ordered_list.delete(None)168        self.assertEqual(ordered_list.head, None)169        self.assertEqual(ordered_list.tail, None)170        self.assertEqual(ordered_list.get_all(), [])171        ordered_list.delete(1000)172        self.assertEqual(ordered_list.head, None)173        self.assertEqual(ordered_list.tail, None)174        self.assertEqual(ordered_list.get_all(), [])175    def test_delete_head(self):176        val_1 = 12177        val_2 = 55178        val_3 = 100179        ordered_list = OrderedList(asc=True)180        
ordered_list.add(val_1)181        ordered_list.add(val_2)182        ordered_list.add(val_3)183        ordered_list.delete(12)184        self.assertEqual(ordered_list.head.value, val_2)185        self.assertEqual(ordered_list.tail.value, val_3)186        self.assertEqual(ordered_list.get_all()[0].value, val_2)187        self.assertEqual(ordered_list.get_all()[1].value, val_3)188        self.assertEqual(ordered_list.len(), 2)189        ordered_list = OrderedList(asc=False)190        ordered_list.add(val_1)191        ordered_list.add(val_2)192        ordered_list.add(val_3)193        ordered_list.delete(12)194        self.assertEqual(ordered_list.head.value, val_3)195        self.assertEqual(ordered_list.tail.value, val_2)196        self.assertEqual(ordered_list.get_all()[0].value, val_3)197        self.assertEqual(ordered_list.get_all()[1].value, val_2)198        self.assertEqual(ordered_list.len(), 2)199        val_1 = ' a'200        val_2 = ' Ñ '201        val_3 = 'b '202        ordered_list = OrderedStringList(asc=True)203        ordered_list.add(val_1)204        ordered_list.add(val_2)205        ordered_list.add(val_3)206        ordered_list.delete(' a')207        self.assertEqual(ordered_list.head.value, val_3)208        self.assertEqual(ordered_list.tail.value, val_2)209        self.assertEqual(ordered_list.get_all()[0].value, val_3)210        self.assertEqual(ordered_list.get_all()[1].value, val_2)211        self.assertEqual(ordered_list.len(), 2)212        ordered_list = OrderedStringList(asc=False)213        ordered_list.add(val_1)214        ordered_list.add(val_2)215        ordered_list.add(val_3)216        ordered_list.delete(' a')217        self.assertEqual(ordered_list.head.value, val_2)218        self.assertEqual(ordered_list.tail.value, val_3)219        self.assertEqual(ordered_list.get_all()[0].value, val_2)220        self.assertEqual(ordered_list.get_all()[1].value, val_3)221        self.assertEqual(ordered_list.len(), 2)222    def 
test_delete_tail(self):223        val_1 = 12224        val_2 = 55225        val_3 = 100226        ordered_list = OrderedList(asc=True)227        ordered_list.add(val_1)228        ordered_list.add(val_2)229        ordered_list.add(val_3)230        ordered_list.delete(100)231        self.assertEqual(ordered_list.head.value, val_1)232        self.assertEqual(ordered_list.tail.value, val_2)233        self.assertEqual(ordered_list.get_all()[0].value, val_1)234        self.assertEqual(ordered_list.get_all()[1].value, val_2)235        self.assertEqual(ordered_list.len(), 2)236        ordered_list = OrderedList(asc=False)237        ordered_list.add(val_1)238        ordered_list.add(val_2)239        ordered_list.add(val_3)240        ordered_list.delete(100)241        self.assertEqual(ordered_list.head.value, val_2)242        self.assertEqual(ordered_list.tail.value, val_1)243        self.assertEqual(ordered_list.get_all()[0].value, val_2)244        self.assertEqual(ordered_list.get_all()[1].value, val_1)245        self.assertEqual(ordered_list.len(), 2)246        val_1 = ' a'247        val_2 = ' Ñ '248        val_3 = 'b '249        ordered_list = OrderedStringList(asc=True)250        ordered_list.add(val_1)251        ordered_list.add(val_2)252        ordered_list.add(val_3)253        ordered_list.delete(val_2)254        self.assertEqual(ordered_list.head.value, val_1)255        self.assertEqual(ordered_list.tail.value, val_3)256        self.assertEqual(ordered_list.get_all()[0].value, val_1)257        self.assertEqual(ordered_list.get_all()[1].value, val_3)258        self.assertEqual(ordered_list.len(), 2)259        ordered_list = OrderedStringList(asc=False)260        ordered_list.add(val_1)261        ordered_list.add(val_2)262        ordered_list.add(val_3)263        ordered_list.delete(val_2)264        self.assertEqual(ordered_list.head.value, val_3)265        self.assertEqual(ordered_list.tail.value, val_1)266        self.assertEqual(ordered_list.get_all()[0].value, 
val_3)267        self.assertEqual(ordered_list.get_all()[1].value, val_1)268        self.assertEqual(ordered_list.len(), 2)269    def test_delete_middle(self):270        val_1 = 12271        val_2 = 55272        val_3 = 100273        ordered_list = OrderedList(asc=True)274        ordered_list.add(val_1)275        ordered_list.add(val_2)276        ordered_list.add(val_3)277        ordered_list.delete(val_2)278        self.assertEqual(ordered_list.head.value, val_1)279        self.assertEqual(ordered_list.tail.value, val_3)280        self.assertEqual(ordered_list.get_all()[0].value, val_1)281        self.assertEqual(ordered_list.get_all()[1].value, val_3)282        self.assertEqual(ordered_list.find(val_2), None)283        self.assertEqual(ordered_list.len(), 2)284        ordered_list = OrderedList(asc=False)285        ordered_list.add(val_1)286        ordered_list.add(val_2)287        ordered_list.add(val_3)288        ordered_list.delete(val_2)289        self.assertEqual(ordered_list.head.value, val_3)290        self.assertEqual(ordered_list.tail.value, val_1)291        self.assertEqual(ordered_list.get_all()[0].value, val_3)292        self.assertEqual(ordered_list.get_all()[1].value, val_1)293        self.assertEqual(ordered_list.find(val_2), None)294        self.assertEqual(ordered_list.len(), 2)295        val_1 = ' a'296        val_2 = ' Ñ '297        val_3 = 'b '298        ordered_list = OrderedStringList(asc=True)299        ordered_list.add(val_1)300        ordered_list.add(val_2)301        ordered_list.add(val_3)302        ordered_list.delete(val_3)303        self.assertEqual(ordered_list.head.value, val_1)304        self.assertEqual(ordered_list.tail.value, val_2)305        self.assertEqual(ordered_list.get_all()[0].value, val_1)306        self.assertEqual(ordered_list.get_all()[1].value, val_2)307        self.assertEqual(ordered_list.find(val_3), None)308        self.assertEqual(ordered_list.len(), 2)309        ordered_list = OrderedStringList(asc=False)310  
      ordered_list.add(val_1)311        ordered_list.add(val_2)312        ordered_list.add(val_3)313        ordered_list.delete(val_3)314        self.assertEqual(ordered_list.head.value, val_2)315        self.assertEqual(ordered_list.tail.value, val_1)316        self.assertEqual(ordered_list.get_all()[0].value, val_2)317        self.assertEqual(ordered_list.get_all()[1].value, val_1)318        self.assertEqual(ordered_list.find(val_3), None)319        self.assertEqual(ordered_list.len(), 2)320    def test_del_equal(self):321        val_1 = 0322        val_2 = 2323        val_3 = 1324        val_4 = 1325        val_5 = 2326        val_6 = 1327        val_7 = 0328        ordered_list = OrderedList(asc=True)329        ordered_list.add(val_1)330        ordered_list.add(val_2)331        ordered_list.add(val_3)332        ordered_list.add(val_4)333        ordered_list.add(val_5)334        ordered_list.add(val_6)335        ordered_list.add(val_7)336        ordered_list.delete(1)337        self.assertEqual(ordered_list.head.value, val_1)338        self.assertEqual(ordered_list.tail.value, val_5)339        self.assertEqual(ordered_list.get_all()[0].value, val_1)340        self.assertEqual(ordered_list.get_all()[3].value, val_6)341        self.assertEqual(ordered_list.get_all()[-1].value, val_5)342        self.assertEqual(ordered_list.len(), 6)343        ordered_list.delete(2)344        self.assertEqual(ordered_list.head.value, val_1)345        self.assertEqual(ordered_list.tail.value, val_5)346        self.assertEqual(ordered_list.get_all()[0].value, val_1)347        self.assertEqual(ordered_list.get_all()[3].value, val_6)348        self.assertEqual(ordered_list.get_all()[-1].value, val_5)349        self.assertEqual(ordered_list.len(), 5)350        ordered_list.delete(2)351        self.assertEqual(ordered_list.head.value, val_1)352        self.assertEqual(ordered_list.tail.value, val_6)353        self.assertEqual(ordered_list.get_all()[0].value, val_1)354        
self.assertEqual(ordered_list.get_all()[3].value, val_6)355        self.assertEqual(ordered_list.get_all()[-1].value, val_6)356        self.assertEqual(ordered_list.len(), 4)357        ordered_list.delete(1)358        self.assertEqual(ordered_list.head.value, val_1)359        self.assertEqual(ordered_list.tail.value, val_6)360        self.assertEqual(ordered_list.get_all()[0].value, val_1)361        self.assertEqual(ordered_list.get_all()[1].value, val_7)362        self.assertEqual(ordered_list.get_all()[-1].value, val_6)363        self.assertEqual(ordered_list.len(), 3)364        ordered_list.delete(0)365        self.assertEqual(ordered_list.head.value, val_7)366        self.assertEqual(ordered_list.tail.value, val_6)367        self.assertEqual(ordered_list.get_all()[0].value, val_7)368        self.assertEqual(ordered_list.get_all()[-1].value, val_6)369        self.assertEqual(ordered_list.len(), 2)370        ordered_list.delete(1)371        self.assertEqual(ordered_list.head.value, val_7)372        self.assertEqual(ordered_list.tail.value, val_7)373        self.assertEqual(ordered_list.get_all()[0].value, val_7)374        self.assertEqual(ordered_list.len(), 1)375    def del_one(self):376        val_1 = 12377        ordered_list = OrderedList(asc=True)378        ordered_list.add(val_1)379        ordered_list.delete(12)380        self.assertEqual(ordered_list.head, None)381        self.assertEqual(ordered_list.tail, None)382        self.assertEqual(ordered_list.get_all(), [])383        self.assertEqual(ordered_list.len(), 0)384    # cleaning block385    def test_clean(self):386        val_1 = 12387        val_2 = 55388        val_3 = 100389        ordered_list = OrderedList(asc=True)390        ordered_list.add(val_1)391        ordered_list.add(val_2)392        ordered_list.add(val_3)393        ordered_list.clean(asc=True)394        self.assertEqual(ordered_list.len(), 0)395        self.assertEqual(ordered_list.head, None)396        
self.assertEqual(ordered_list.tail, None)...adult_fair_classification.py
Source:adult_fair_classification.py  
import os
from itertools import product

import numpy as np
import pandas as pd
from cvxopt import matrix, solvers
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC

from adult_detect import detect_after_remove
from model.dataset import DataSet
from model.loadxml import load_xml_to_cbn
from model.variable import Event
from model.variable import Variable

src_path = os.path.dirname(os.path.realpath(__file__))
yt_pos = 1
spos = 1
sneg = 0
tau = 0.050 - 0.0010


def _fit_predict_score(train_df, test_df, features):
    """Fit the LR and SVM classifiers on ``features`` and score them.

    Adds an 'LR' and an 'SVM' prediction column to both data frames (in place).

    :param train_df: training frame; must contain ``features`` and 'income'
    :param test_df: test frame with the same columns
    :param features: list of column names used as classifier input
    :return: [LR train acc, LR test acc, SVM train acc, SVM test acc]
    """
    x = train_df[features].values
    y = train_df['income'].values
    classifiers = [
        LogisticRegression(penalty='l2', solver='liblinear'),
        SVC(kernel='poly', gamma='auto'),
    ]
    acc = []
    for name, clf in zip(['LR', 'SVM'], classifiers):
        clf.fit(x, y)
        train_df[name] = clf.predict(train_df[features].values)
        test_df[name] = clf.predict(test_df[features].values)
        acc.append(accuracy_score(train_df['income'], train_df[name]))
        acc.append(accuracy_score(test_df['income'], test_df[name]))
    return acc


def method0(acc_matrix):
    """
    Baseline: use all attributes, regardless of fairness.

    Writes the accuracies to column 0 of ``acc_matrix`` and saves the train
    and test frames (with predictions) under ``temp/`` relative to the
    current working directory.
    :return:
    """
    train_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_train.csv'))
    test_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_test.csv'))
    features = ['age', 'sex', 'workclass', 'education', 'marital-status', 'hours']
    acc_matrix.iloc[:, 0] = _fit_predict_score(train_df, test_df, features)
    train_df.to_csv('temp/adult_binary_train_prediction0.csv', index=False)
    test_df.to_csv('temp/adult_binary_test_prediction0.csv', index=False)


def method1(acc_matrix):
    """
    Only use the non-descendants of S ('age' and 'education').

    Writes the accuracies to column 1 of ``acc_matrix`` and saves the test
    frame (with predictions) under ``temp/``.
    :return:
    """
    train_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_train.csv'))
    test_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_test.csv'))
    acc_matrix.iloc[:, 1] = _fit_predict_score(train_df, test_df, ['age', 'education'])
    test_df.to_csv('temp/adult_binary_test_prediction1.csv', index=False)


def method2(acc_matrix):
    """
    Use 'age', 'education' and the regression residuals of the descendants.

    For each of 'marital-status', 'workclass' and 'hours' a linear regression
    on its parents in the causal graph is fitted on the training data; the
    residual is stored in an ``<att>-error`` column and used as a feature.
    Writes the accuracies to column 2 of ``acc_matrix`` and saves the test
    frame (predictions included, residual columns removed) under ``temp/``.
    :return:
    """
    train_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_train.csv'))
    test_df = pd.read_csv(os.path.join(src_path, '../data/adult/adult_binary_test.csv'))
    cbn = load_xml_to_cbn(os.path.join(src_path, '../data/adult/adult.xml'))

    descendants = ['marital-status', 'workclass', 'hours']
    error_columns = [att + '-error' for att in descendants]

    # estimate residual error for the descendants
    for att in descendants:
        att_index = cbn.v.name_dict[att].index
        parents_index = cbn.index_graph.pred[att_index].keys()
        parents = [cbn.v[i].name for i in parents_index]
        regression = LinearRegression()
        regression.fit(train_df[parents], train_df[att])
        train_df[att + '-error'] = train_df[att] - regression.predict(train_df[parents])
        test_df[att + '-error'] = test_df[att] - regression.predict(test_df[parents])

    # build classifiers using residual errors
    acc_matrix.iloc[:, 2] = _fit_predict_score(train_df, test_df, ['age', 'education'] + error_columns)

    # DataFrame.drop returns a new frame; the result has to be kept, otherwise
    # the residual columns are still written to the CSV.
    test_df = test_df.drop(error_columns, axis=1)
    test_df.to_csv('temp/adult_binary_test_prediction2.csv', index=False)


def method3(acc_matrix):
    """
    Post-process the method0 predictions with a linear program.

    Reads the prediction0 CSVs from ``temp/``, and for each classifier column
    ('LR', 'SVM') builds and solves an LP whose solution is used as the
    probability passed to ``random_assign`` when re-labelling predictions.
    Writes the accuracies to column 3 of ``acc_matrix`` and saves the
    re-labelled train/test frames as prediction3 CSVs.

    Layout of the LP variable vector (assumes every variable is binary):
    ``[0, 2**8)``              entries addressed by ``get_loc`` of a full event
    ``[2**8, 2**8 + 32)``      the "max" bound variables
    ``[2**8 + 32, 2**8 + 64)`` the "min" bound variables

    :raises RuntimeError: if the LP solver returns no solution
    :return:
    """
    df_train = pd.read_csv('temp/adult_binary_train_prediction0.csv')
    train = DataSet(df_train)
    df_test = pd.read_csv('temp/adult_binary_test_prediction0.csv')
    df_test = pd.concat([df_test] * 3, ignore_index=True)
    test = DataSet(df_test)

    n_map = 2 ** 8                # mapping variables
    n_bound = 2 ** 8 // 8         # size of each bound block (max, then min)
    n_var = n_map + 2 * n_bound   # total number of LP variables
    n_fair = 2 ** 4               # fairness rows per direction

    acc = []
    for name in ['LR', 'SVM']:
        probabilistic_cbn = load_xml_to_cbn(os.path.join(src_path, '../data/adult/adult.xml'))

        A1 = probabilistic_cbn.v['age']
        A2 = probabilistic_cbn.v['education']
        S = probabilistic_cbn.v['sex']
        M1 = probabilistic_cbn.v['workclass']
        M2 = probabilistic_cbn.v['marital-status']
        N = probabilistic_cbn.v['hours']
        Y = probabilistic_cbn.v['income']

        YH = Variable(name=name, index=Y.index + 1, domains=Y.domains)
        probabilistic_cbn.v[(YH.index, YH.name)] = YH
        YT = Variable(name=name + "M", index=Y.index + 2, domains=Y.domains)
        probabilistic_cbn.v[(YT.index, YT.name)] = YT

        # Small closures that keep the event key order used throughout.
        def find_condition_prob(e, t):
            return probabilistic_cbn.find_prob(e, t)

        def get_loc(e):
            return probabilistic_cbn.get_loc(e)

        def values(*variables):
            return product(*[v.domains.get_all() for v in variables])

        def x_event(a1, a2, n, m1, m2, s):
            return Event({A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s})

        def map_loc(a1, a2, n, m1, m2, s, yh, yt):
            return get_loc(Event({A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s, YH: yh, YT: yt}))

        def bound_loc(a1, a2, n, s, yt):
            return get_loc(Event({A1: a1, A2: a2, N: n, S: s, YT: yt}))

        def p_yh(yh, a1, a2, n, m1, m2, s):
            return train.get_conditional_prob(Event({YH: yh}), x_event(a1, a2, n, m1, m2, s))

        def p_n(n, a1, a2, m1, m2, s):
            return find_condition_prob(Event({N: n}), Event({A1: a1, A2: a2, M1: m1, M2: m2, S: s}))

        # build linear loss function
        C_vector = np.zeros((n_var, 1))
        for a1, a2, n, m1, m2, s in values(A1, A2, N, M1, M2, S):
            p_x_s = train.get_marginal_prob(x_event(a1, a2, n, m1, m2, s))

            p_yh_1_y = p_x_s * train.count(Event({Y: 0, YH: 0}), x_event(a1, a2, n, m1, m2, s), 'notequal')
            loc = map_loc(a1, a2, n, m1, m2, s, 0, 0)
            C_vector[loc] = p_yh_1_y * p_yh(0, a1, a2, n, m1, m2, s)
            loc = map_loc(a1, a2, n, m1, m2, s, 1, 1)
            C_vector[loc] = p_yh_1_y * p_yh(1, a1, a2, n, m1, m2, s)

            p_yh__y = p_x_s * train.count(Event({Y: 0, YH: 0}),
                                          Event({A1: a1, A2: a2, M1: m1, M2: m2, N: n, S: s}), 'equal')
            loc = map_loc(a1, a2, n, m1, m2, s, 0, 1)
            C_vector[loc] = p_yh__y * p_yh(0, a1, a2, n, m1, m2, s)
            loc = map_loc(a1, a2, n, m1, m2, s, 1, 0)
            C_vector[loc] = p_yh__y * p_yh(1, a1, a2, n, m1, m2, s)

        # the inequality of max and min
        G_matrix_1 = np.zeros((n_map, n_var))
        h_1 = np.zeros(n_map)
        # max
        i = 0
        for a1, a2, n, s, yt in values(A1, A2, N, S, YT):
            for m1, m2 in values(M1, M2):
                for yh in YH.domains.get_all():
                    loc = map_loc(a1, a2, n, m1, m2, s, yh, yt)
                    G_matrix_1[i, loc] = p_yh(yh, a1, a2, n, m1, m2, s)
                loc = bound_loc(a1, a2, n, s, yt)
                G_matrix_1[i, n_map + loc] = -1
                i += 1
        # min
        assert i == n_map // 2
        for a1, a2, n, s, yt in values(A1, A2, N, S, YT):
            for m1, m2 in values(M1, M2):
                for yh in YH.domains.get_all():
                    loc = map_loc(a1, a2, n, m1, m2, s, yh, yt)
                    G_matrix_1[i, loc] = -p_yh(yh, a1, a2, n, m1, m2, s)
                loc = bound_loc(a1, a2, n, s, yt)
                G_matrix_1[i, n_map + n_bound + loc] = 1
                i += 1

        # build counterfactual fairness constraints
        G_matrix_2 = np.zeros((n_fair * 2, n_var))
        h_2 = np.ones(n_fair * 2) * tau
        i = 0
        for a1, a2, m1, m2 in values(A1, A2, M1, M2):
            for n in N.domains.get_all():
                loc = bound_loc(a1, a2, n, spos, yt_pos)
                G_matrix_2[i, n_map + loc] = p_n(n, a1, a2, m1, m2, spos)
                for yh in YH.domains.get_all():
                    loc = map_loc(a1, a2, n, m1, m2, sneg, yh, yt_pos)
                    G_matrix_2[i, loc] = -p_n(n, a1, a2, m1, m2, sneg) \
                                         * p_yh(yh, a1, a2, n, m1, m2, sneg)
            i += 1
        assert i == n_fair
        for a1, a2, m1, m2 in values(A1, A2, M1, M2):
            for n in N.domains.get_all():
                loc = bound_loc(a1, a2, n, spos, yt_pos)
                G_matrix_2[i, n_map + n_bound + loc] = -p_n(n, a1, a2, m1, m2, spos)
                for yh in YH.domains.get_all():
                    loc = map_loc(a1, a2, n, m1, m2, sneg, yh, yt_pos)
                    G_matrix_2[i, loc] = p_n(n, a1, a2, m1, m2, sneg) \
                                         * p_yh(yh, a1, a2, n, m1, m2, sneg)
            i += 1

        # mapping in [0, 1]
        G_matrix_3 = np.zeros((2 * n_var, n_var))
        h_3 = np.zeros(2 * n_var)
        for i in range(n_var):
            G_matrix_3[i, i] = 1
            h_3[i] = 1
            G_matrix_3[n_var + i, i] = -1
            h_3[n_var + i] = 0

        # sum = 1
        A_matrix = np.zeros((n_map // 2, n_var))
        b = np.ones(n_map // 2)
        i = 0
        for a1, a2, n, m1, m2, s, yh in values(A1, A2, N, M1, M2, S, YH):
            for yt in YT.domains.get_all():
                A_matrix[i, map_loc(a1, a2, n, m1, m2, s, yh, yt)] = 1
            i += 1
        assert i == n_map // 2

        # combine the inequality constraints
        G_matrix = np.vstack([G_matrix_1, G_matrix_2, G_matrix_3])
        h = np.hstack([h_1, h_2, h_3])

        # solver: the default cvxopt LP solver is used (the original passed the
        # `solvers` module as the `solver` argument, which is not a solver name).
        solvers.options['show_progress'] = False
        sol = solvers.lp(c=matrix(C_vector),
                         G=matrix(G_matrix),
                         h=matrix(h),
                         A=matrix(A_matrix),
                         b=matrix(b))
        if sol['x'] is None:
            raise RuntimeError('LP for %s returned no solution (status: %s)' % (name, sol['status']))
        mapping = np.array(sol['x'])

        # build the post-processing result in training and testing
        train.df.loc[:, name + 'M'] = train.df[name]
        test.df[name + 'M'] = test.df[name]
        for a1, a2, n, m1, m2, s, yh, yt in values(A1, A2, N, M1, M2, S, YH, YT):
            if yh.name != yt.name:
                p = mapping[map_loc(a1, a2, n, m1, m2, s, yh, yt), 0]
                train.random_assign(Event({YH: yh, A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s}), Event({YT: yt}), p)
                test.random_assign(Event({YH: yh, A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s}), Event({YT: yt}), p)

        # Copy the re-labelled column back and remove the helper column.
        # inplace=True is required: a plain drop() call returns a new frame
        # and leaves the helper column in place.
        train.df[name] = train.df[name + 'M']
        train.df.drop([name + 'M'], axis=1, inplace=True)
        test.df[name] = test.df[name + 'M']
        test.df.drop([name + 'M'], axis=1, inplace=True)

        acc.append(accuracy_score(train.df[name], train.df[Y.name]))
        acc.append(accuracy_score(test.df[name], test.df[Y.name]))

    acc_matrix.iloc[:, 3] = acc
    train.df.to_csv('temp/adult_binary_train_prediction3.csv', index=False)
    test.df.to_csv('temp/adult_binary_test_prediction3.csv', index=False)


def detect_classifier(ce_matrix):
    """
    Fill ``ce_matrix`` with the detection results for every saved prediction file.

    For each of the four ``temp/adult_binary_test_prediction<i>.csv`` files and
    each classifier column ('LR', 'SVM'), the conditional probability table of
    'income' in the network is replaced by the classifier's empirical
    conditional distribution, the joint table is rebuilt, and
    ``detect_after_remove`` is evaluated for all 16 (a1', a2', m1', m2')
    combinations in both directions (spos -> sneg, then sneg -> spos).

    Row ``j * 32 + k`` (first direction) or ``j * 32 + k + 16`` (second
    direction) and columns ``2 * i`` / ``2 * i + 1`` receive
    ``[p_u - p, p_l - p]``.
    :return:
    """
    cbn = load_xml_to_cbn(os.path.join(src_path, '../data/adult/adult.xml'))
    A1 = cbn.v['age']
    A2 = cbn.v['education']
    S = cbn.v['sex']
    M1 = cbn.v['workclass']
    M2 = cbn.v['marital-status']
    N = cbn.v['hours']
    Y = cbn.v['income']

    for i in [0, 1, 2, 3]:  # four datasets generated by method0..method3
        test = DataSet(pd.read_csv('temp/adult_binary_test_prediction%d.csv' % i))
        for j, label in enumerate(['LR', 'SVM']):  # two classifiers
            # modify cpt of label before detect
            for a1, a2, n, m1, m2, s, y in product(A1.domains.get_all(), A2.domains.get_all(),
                                                   N.domains.get_all(), M1.domains.get_all(),
                                                   M2.domains.get_all(), S.domains.get_all(),
                                                   Y.domains.get_all()):
                cbn.set_conditional_prob(
                    Event({Y: y}),
                    Event({A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s}),
                    test.get_conditional_prob(Event({label: y}),
                                              Event({A1: a1, A2: a2, N: n, M1: m1, M2: m2, S: s})))
            cbn.build_joint_table()

            # (row offset, s, sprime) for the two directions
            for offset, s_from, s_to in ((0, spos, sneg), (16, sneg, spos)):
                for k, (a1prime, a2prime, m1prime, m2prime) in enumerate(product([0, 1], repeat=4)):
                    p_u, p_l = detect_after_remove(cbn=cbn, s=s_from, sprime=s_to, y=1,
                                                   a1prime=a1prime, a2prime=a2prime,
                                                   m1prime=m1prime, m2prime=m2prime)
                    p = detect_after_remove(cbn=cbn, s=s_to, sprime=s_to, y=1,
                                            a1prime=a1prime, a2prime=a2prime,
                                            m1prime=m1prime, m2prime=m2prime)
                    ce_matrix.iloc[j * 32 + k + offset, 2 * i:2 * i + 2] = [p_u - p, p_l - p]


if __name__ == '__main__':
    # preprocessing ()
    acc_matrix = pd.DataFrame(np.zeros((4, 4)), columns=['BL', 'A1', 'A3', 'CF'])
    method0(acc_matrix)
    method1(acc_matrix)
    method2(acc_matrix)
    method3(acc_matrix)
    print(acc_matrix.round(5))

    ce_matrix = pd.DataFrame(data=np.zeros((4 * 16, 2 * 4)), columns=['|- ub', 'lb -|'] * 4)
    detect_classifier(ce_matrix)
    pd.set_option('display.max_columns', 8)
    pd.set_option('display.max_rows', 64)
    ce_matrix = ce_matrix.drop(range(16, 32)).drop(range(48, 64))
    ce_matrix.to_csv('output/adult_fair_classification_result.csv', index=False)
Source:preprocessing.py  
...129    W = complete_cbn.v['W']130    A = complete_cbn.v['A']131    Y_hat = complete_cbn.v['Y']132    print('Generate using conditional cpts')133    for s in S.domains.get_all():134        p = 0.0135        for us in US.domains.get_all():136            p = p + complete_cbn.get_prob(Event({S: s}), Event({US: us})) * complete_cbn.get_prob(Event({US: us}), Event({}))137        print(p)138    print()139    for s, w in product(S.domains.get_all(), W.domains.get_all()):140        p = 0.0141        for uw in UW.domains.get_all():142            p = p + complete_cbn.get_prob(Event({W: w}), Event({UW: uw, S: s})) * complete_cbn.get_prob(Event({UW: uw}), Event({}))143        print(p)144    print()145    for w, a in product(W.domains.get_all(), A.domains.get_all()):146        p = 0.0147        for ua in UA.domains.get_all():148            p = p + complete_cbn.get_prob(Event({A: a}), Event({UA: ua, W: w})) * complete_cbn.get_prob(Event({UA: ua}), Event({}))149        print(p)150    print()151    for s, w, a, y in product(S.domains.get_all(), W.domains.get_all(), A.domains.get_all(), Y_hat.domains.get_all()):152        p = 0.0153        for uy in UY.domains.get_all():154            p = p + complete_cbn.get_prob(Event({Y_hat: y}), Event({UY: uy, S: s, W: w, A: a})) * complete_cbn.get_prob(Event({UY: uy}), Event({}))155        print(p)156    print()157    print()158    print('Generate using joint cpt')159    complete_cbn.build_joint_table()160    for s in S.domains.get_all():161        print(complete_cbn.get_prob(Event({S: s})))162    print()163    for s, w in product(S.domains.get_all(), W.domains.get_all()):164        print(complete_cbn.get_prob(Event({W: w}), Event({S: s})))165    print()166    for w, a in product(W.domains.get_all(), A.domains.get_all()):167        print(complete_cbn.get_prob(Event({A: a}), Event({W: w})))168    print()169    for s, w, a, y in product(S.domains.get_all(), W.domains.get_all(), A.domains.get_all(), Y_hat.domains.get_all()):170        
print(complete_cbn.get_prob(Event({Y_hat: y}), Event({S: s, W: w, A: a})))171    print()172    print()173    partial_cbn = load_xml_to_cbn(partial_model)174    partial_cbn.build_joint_table()175    S = partial_cbn.v['S']176    W = partial_cbn.v['W']177    A = partial_cbn.v['A']178    Y_hat = partial_cbn.v['Y']179    print('Generate using partial SCM')180    for s in S.domains.get_all():181        print(partial_cbn.get_prob(Event({S: s}), Event({})))182    print()183    for s, w in product(S.domains.get_all(), W.domains.get_all()):184        print(partial_cbn.get_prob(Event({W: w}), Event({S: s})))185    print()186    for w, a in product(W.domains.get_all(), A.domains.get_all()):187        print(partial_cbn.get_prob(Event({A: a}), Event({W: w})))188    print()189    for s, w, a, y in product(S.domains.get_all(), W.domains.get_all(), A.domains.get_all(), Y_hat.domains.get_all()):190        print(partial_cbn.get_prob(Event({Y_hat: y}), Event({S: s, W: w, A: a})))191    print()192    print()193# the following code is designed for testing and may not work in final version194if __name__ == '__main__':195    data_dir = '../data/demo/'196    temp_dir = '../temp/demo/'197    preprocess(data_dir, temp_dir, seed=0)198    complete_model = data_dir + 'complete_model.xml'199    partial_model = data_dir + 'observed_model.xml'...test_global_vendor_point_record_repository.py
Source:test_global_vendor_point_record_repository.py  
...18        johnson_site_id = Mock()19        johnson_fqr = Mock()20        table_mock = self.uow.tables.global_vendor_point_records21        self.repo.get_all_for_johnson_point(johnson_site_id, johnson_fqr)22        # table.get_all()23        table_mock.get_all.assert_called_with([johnson_site_id, johnson_fqr], index='johnson')24        return_value = table_mock.get_all.return_value25        self.uow.run_list.assert_called_with(return_value)26    def test_get_all_for_fieldserver(self):27        fieldserver_site_id = Mock()28        fieldserver_offset = Mock()29        table_mock = self.uow.tables.global_vendor_point_records30        self.repo.get_all_for_fieldserver_point(fieldserver_site_id, fieldserver_offset)31        # table.get_all()32        table_mock.get_all.assert_called_with([fieldserver_site_id, fieldserver_offset], index='fieldserver')33        return_value = table_mock.get_all.return_value34        self.uow.run_list.assert_called_with(return_value)35    def test_get_all_for_invensys_point(self):36        invensys_site_name = Mock()37        invensys_equipment_name = Mock()38        invensys_meter_name = Mock()39        table_mock = self.uow.tables.global_vendor_point_records40        self.repo.get_all_for_invensys_point(invensys_site_name, invensys_equipment_name, invensys_meter_name)41        # table,get_all42        table_mock.get_all.assert_called_with([invensys_site_name, invensys_equipment_name, invensys_meter_name],43                                              index='invensys')44        return_value = table_mock.get_all.return_value45        self.uow.run_list.assert_called_with(return_value)46    def test_get_all_for_siemens_point(self):47        siemens_meter_name = Mock()48        table_mock = self.uow.tables.global_vendor_point_records49        self.repo.get_all_for_siemens_point(siemens_meter_name)50        # table.get_all()51        table_mock.get_all.assert_called_with([siemens_meter_name], index='siemens')52        return_value = 
table_mock.get_all.return_value53        self.uow.run_list.assert_called_with(return_value)54    def test_date_map_func(self):55        record = {'date': '1', 'Test': 1}56        rv = self.repo.date_map_func(record)57        self.assertEqual('1', rv)58    @patch("db.repositories.global_vendor_point_record_repository.GlobalVendorPointRecordRepository.date_map_func")59    def test_get_existing_dates_for_johnson(self, date_map_func):60        site_id = Mock()61        fqr = Mock()62        table_mock = self.uow.tables.global_vendor_point_records63        rv = self.repo.get_existing_dates_for_johnson(site_id, fqr)64        # table.get_all()65        table_mock.get_all.assert_called_with([site_id, fqr], index='johnson')66        return_value = table_mock.get_all.return_value67        # table.get_all().map()68        return_value.map.assert_called_with(date_map_func)69        return_value = return_value.map.return_value70        self.uow.run_list.assert_called_with(return_value)71        return_value = self.uow.run_list.return_value72        self.assertEqual(rv, return_value)73    @patch("db.repositories.global_vendor_point_record_repository.GlobalVendorPointRecordRepository.date_map_func")74    def test_get_existing_dates_for_fieldserver(self, date_map_func):75        site_id = Mock()76        offset = Mock()77        table_mock = self.uow.tables.global_vendor_point_records78        rv = self.repo.get_existing_dates_for_fieldserver(site_id, offset)79        # table.get_all()80        table_mock.get_all.assert_called_with([site_id, offset], index='fieldserver')81        return_value = table_mock.get_all.return_value82        # table.get_all().map()83        return_value.map.assert_called_with(date_map_func)84        return_value = return_value.map.return_value85        self.uow.run_list.assert_called_with(return_value)86        return_value = self.uow.run_list.return_value87        self.assertEqual(rv, return_value)88    
@patch("db.repositories.global_vendor_point_record_repository.GlobalVendorPointRecordRepository.date_map_func")89    def test_get_existing_dates_for_invensys(self, date_map_func):90        site_name = Mock()91        equipment_name = Mock()92        point_name = Mock()93        table_mock = self.uow.tables.global_vendor_point_records94        rv = self.repo.get_existing_dates_for_invensys(site_name, equipment_name, point_name)95        # table.get_all()96        table_mock.get_all.assert_called_with([site_name, equipment_name, point_name], index='invensys')97        return_value = table_mock.get_all.return_value98        # table.get_all().map()99        return_value.map.assert_called_with(date_map_func)100        return_value = return_value.map.return_value101        self.uow.run_list.assert_called_with(return_value)102        return_value = self.uow.run_list.return_value103        self.assertEqual(return_value, rv)104    @patch("db.repositories.global_vendor_point_record_repository.GlobalVendorPointRecordRepository.date_map_func")105    def test_get_existing_dates_for_siemens(self, date_map_func):106        siemens_meter_name = Mock()107        table_mock = self.uow.tables.global_vendor_point_records108        rv = self.repo.get_existing_dates_for_siemens(siemens_meter_name)109        # table.get_all()110        table_mock.get_all.assert_called_with([siemens_meter_name], index='siemens')111        return_value = table_mock.get_all.return_value112        # table.get_all().map()113        return_value.map.assert_called_with(date_map_func)114        return_value = return_value.map.return_value115        self.uow.run_list.assert_called_with(return_value)116        return_value = self.uow.run_list.return_value117        self.assertEqual(return_value, rv)118    def test_delete_all_for_johnson_point(self):119        site_id = Mock()120        fqr = Mock()121        table_mock = self.uow.tables.global_vendor_point_records122        
self.repo.delete_all_for_johnson_point(site_id, fqr)123        # table.get_all()124        table_mock.get_all.assert_called_with([site_id, fqr], index='johnson')125        return_value = table_mock.get_all.return_value126        # table.get_all().delete()127        return_value.delete.assert_called_with()128        return_value = return_value.delete.return_value129        self.uow.run.assert_called_with(return_value)130    def test_delete_all_for_fieldserver_point(self):131        site_id = Mock()132        offset = Mock()133        table_mock = self.uow.tables.global_vendor_point_records134        self.repo.delete_all_for_fieldserver_point(site_id, offset)135        # table.get_all()136        table_mock.get_all.assert_called_with([site_id, offset], index='fieldserver')137        return_value = table_mock.get_all.return_value138        # table.get_all().delete()139        return_value.delete.assert_called_with()140        return_value = return_value.delete.return_value141        self.uow.run.assert_called_with(return_value)142    def test_delete_all_for_invensys_point(self):143        site_name = Mock()144        equip_name = Mock()145        point_name = Mock()146        table_mock = self.uow.tables.global_vendor_point_records147        self.repo.delete_all_for_invensys_point(site_name, equip_name, point_name)148        # table.get_all()149        table_mock.get_all.assert_called_with([site_name, equip_name, point_name], index='invensys')150        return_value = table_mock.get_all.return_value151        # table.get_all().delete()152        return_value.delete.assert_called_with()153        return_value = return_value.delete.return_value154        self.uow.run.assert_called_with(return_value)155    def test_delete_all_for_siemens_point(self):156        meter_name = Mock()157        table_mock = self.uow.tables.global_vendor_point_records158        self.repo.delete_all_for_siemens_point(meter_name)159        # table.get_all()160        
table_mock.get_all.assert_called_with([meter_name], index='siemens')161        return_value = table_mock.get_all.return_value162        # table.get_all().delete()163        return_value.delete.assert_called_with()164        return_value = return_value.delete.return_value...Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
