How to use the _list_subtract method in Testify

Best Python code snippet using Testify_python

anett_admm.py

Source: anett_admm.py (GitHub)


...
        # Shrinkage operator for minimization in xi
        self._enc_x = self._encoder(self._x_var)
        self._update_xi_variable = self._xi_update()

        self._constraint_error = self._list_norm(self._list_subtract(self._enc_x, self._xi_var))

        # Dual ascent update
        self._update_dual_variable = self._dual_update()

        # Optimizer with momentum for minimization in x
        self._optimizer_momentum = tf.train.MomentumOptimizer(learning_rate=self._lr,
                                                              momentum=self._mom,
                                                              use_nesterov=True)
        self._minimize_momentum = self._optimizer_momentum.minimize(self._loss_x, var_list=[self._x_var])

        # Variable initializer for all variables!
        self._var_init = tf.variables_initializer([self._x_var] + self._xi_var + self._dual_var +
                                                   self._optimizer_momentum.variables())
        print("ANETT initialization successful!", flush=True)
        pass

    def _xi_update(self):
        if isinstance(self._enc_x, list):
            ret = [z.assign(self._shrinkage(e + u, (self._alpha/self._rho)*w)) for e, z, u, w in
                   zip(self._enc_x, self._xi_var, self._dual_var, self._weights)]
        else:
            ret = self._xi_var[0].assign(self._shrinkage(self._enc_x + self._dual_var[0], (self._alpha/self._rho)*self._weights[0]))
        return ret

    def _dual_update(self):
        if isinstance(self._enc_x, list):
            ret = [u.assign(u+e-xi) for u, e, xi in zip(self._dual_var, self._enc_x, self._xi_var)]
        else:
            ret = self._dual_var[0].assign(self._dual_var[0] + self._enc_x - self._xi_var[0])
        return ret

    def _variable_initialization(self, x0):
        temp = np.asarray(x0).reshape((1, self._img_height, self._img_width, 1))
        xi_inp = self._sess.run(self._xi_init, feed_dict={self._x: temp})
        fd = {self._x: temp}
        if isinstance(xi_inp, list):
            for i in range(len(xi_inp)):
                fd[self._xi[i].name] = xi_inp[i]  # initialize xi[i] as E(x)[i]
                fd[self._dual[i].name] = np.zeros(self._input_shape[i])  # initialize u[i] as zero
        else:
            fd[self._xi[0].name] = xi_inp
            fd[self._dual[0].name] = np.zeros(self._input_shape[0])

        self._sess.run(self._var_init, feed_dict=fd)
        del xi_inp
        del fd
        del temp
        pass

    def _update_x_variable(self, feed_dict, niter=100, tol=10**(-5)):
        err = [self._sess.run(self._loss_x, feed_dict=feed_dict)]
        improv = 1
        while (improv > tol) and (len(err) <= niter):
            self._sess.run(self._minimize_momentum, feed_dict=feed_dict)  # make gradient step with momentum
            err.append(self._sess.run(self._loss_x, feed_dict=feed_dict))
            improv = err[-2] - err[-1]  # calculates improvement of loss function

        return improv

    def reconstruct(self, x0, data, niter=10, lr=10**(-3), alpha=10**(-3), beta=10**(-3), rho=10**(-3),
                    niterx=100, mom=0.8, tol=10**(-3)):
        self._variable_initialization(x0=x0)
        fd = {self._data: data,
              self._alpha: alpha,
              self._beta: beta,
              self._rho: rho,
              self._lr: 0,
              self._mom: mom}
        err = [self._sess.run(self._loss_total, feed_dict=fd)]
        tolerances = []
        for it in range(niter):
            fd[self._lr] = lr(it) if callable(lr) else lr

            impro = self._update_x_variable(feed_dict=fd, niter=niterx, tol=tol)  # Step 1: argmin_x
            tolerances.append(impro)
            self._sess.run(self._update_xi_variable, feed_dict=fd)  # Step 2: argmin_xi
            self._sess.run(self._update_dual_variable, feed_dict=fd)  # Step 3: Dual ascent

            err.append(self._sess.run(self._loss_total, feed_dict=fd))  # Calculate loss after iteration

        xout = self._sess.run(self._x_var, feed_dict=fd)
        xout = xout.reshape((self._img_height, self._img_width))
        del fd

        xdec = self._sess.run(self._x_decoded).reshape((self._img_height, self._img_width))
        return xout, xdec, err, tolerances

    def _regularizer_lq(self, w, q=1):
        def reg(x=None, xi=None):
            assert (x is not None) or (xi is not None)
            if xi is None:
                xi = self._encoder(x)
            return K.sum([tf.norm(xi[i]*w[i], ord=q)**q for i in range(len(w))])
        return reg

    def _regularizer_reconstructable(self, p=2):
        def reg(x=None, xi=None):
            if xi is None:
                xi = self._encoder(x)
            xrec = self._decoder(xi)
            return (1./p) * self._norm(x - xrec, p=p)
        return reg

    def add_regularizer(self, reg):
        self._loss_x += reg(self._x_var)
        self._loss_total += reg(self._x_var)
        print("Added additional regularizer!", flush=True)
        pass

    def _data_discrepancy(self, x, data, p=2, loss='gauss', mu=0.02, photons=1e4):
        data = tf.reshape(tf.convert_to_tensor(data), (1,) + self._shape + (1,))
        if loss == 'gauss':
            ret = (1./p)*self._norm(self._operator(x) - data, p=p)
        elif loss == 'poisson':
            k_value = (tf.exp(-mu*data)*photons - 1)
            lambda_x = tf.exp(-mu*self._operator(x))*photons
            pois = lambda_x - k_value*tf.log(lambda_x)
            ret = tf.reduce_sum(pois)
        elif loss == 'poisson_approx':
            k_value = (tf.exp(-mu * data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(x)) * photons
            ret = tf.log(lambda_x) + (1./lambda_x)*tf.squared_difference(lambda_x, k_value)
            ret = 0.5*tf.reduce_sum(ret)
        elif loss == 'poisson_l2':
            k_value = (tf.exp(-mu * data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(x)) * photons
            ret = tf.squared_difference(lambda_x, k_value)/lambda_x
            ret = 0.5*tf.reduce_sum(ret)
        elif loss == 'kl':
            k_value = (tf.exp(-mu * data) * photons - 1)
            k_value = k_value/tf.reduce_sum(k_value)
            lambda_x = tf.exp(-mu * self._operator(x)) * photons
            lambda_x = lambda_x/tf.reduce_sum(lambda_x)

            ret = tf.reduce_sum(lambda_x*tf.log(lambda_x/k_value))
        elif loss == 'mixture':
            # Poisson
            k_value = (tf.exp(-mu * data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(x)) * photons
            ret = tf.squared_difference(lambda_x, k_value) / lambda_x
            ret = 0.5 * tf.reduce_sum(ret)

            # l2
            ret += (1. / p) * self._norm(self._operator(x) - data, p=p)

        else:
            ret = tf.zeros(1)
            print("WARNING: No data-discrepancy chosen!", flush=True)
        return ret

    def _augmented_lagrangian(self):
        v = self._list_subtract(self._xi_var, self._dual_var)

        def loss(x):
            xi = self._encoder(x)
            ret = self._list_norm(self._list_subtract(xi, v)) if isinstance(xi, list) else self._norm(xi - v)
            return (1./2)*ret
        return loss

    def _decaying_weights(self):
        w = []
        for s in self._decoder.inputs:
            t = s.shape[1:]
            scale = 2 ** (1 + np.log2(s.shape[1].value) - np.log2(self._img_height))
            w.append(np.ones([1, ] + [z.value for z in t]) * scale)
        return w

    def _constant_weights(self):
        return [np.ones([1, ] + [z.value for z in s.shape[1:]]) for s in self._decoder.inputs]

    @staticmethod
    def _shrinkage(xi, gamma):
        return tf.maximum(tf.abs(xi) - gamma, 0) * tf.sign(xi)

    @staticmethod
    def _norm(x, p=2):
        """
        Implementation of p-norm to the power of p. This is used in optimization since tf.norm is numerically
        unstable for x = 0.
        """
        return K.sum(K.pow(K.abs(x), p))

    @staticmethod
    def _list_subtract(a, b):
        if isinstance(a, list):
            ret = [i - j for i, j in zip(a, b)]
        else:
            ret = a - b
        return ret

    def _list_norm(self, a, p=2):
        if isinstance(a, list):
            ret = K.sum([self._norm(i, p=p) for i in a])
        else:
            ret = self._norm(a, p=p)
        return ret

    def predict(self, x): ...
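The _list_subtract used for the constraint error above is not a Testify API; it is the small static helper defined near the bottom of the class, which subtracts entry by entry when the encoder returns a list of feature maps and falls back to plain subtraction otherwise. Outside the TensorFlow graph its behaviour, together with _list_norm, can be reproduced in a few lines. The sketch below is a minimal NumPy illustration; the array shapes and values are invented for the example, and NumPy stands in for the tf/Keras backend operations:

import numpy as np

def list_subtract(a, b):
    # Same branching as the _list_subtract static method: subtract matching
    # list entries elementwise, otherwise subtract the arguments directly.
    if isinstance(a, list):
        return [i - j for i, j in zip(a, b)]
    return a - b

def list_norm(a, p=2):
    # Same idea as _list_norm / _norm: sum of |x|**p over all entries,
    # i.e. the p-norm raised to the power p.
    if isinstance(a, list):
        return sum(np.sum(np.abs(x) ** p) for x in a)
    return np.sum(np.abs(a) ** p)

# Hypothetical multi-scale latent codes, as an encoder with two outputs might return them
enc_x = [np.ones((1, 4, 4, 2)), np.zeros((1, 2, 2, 8))]
xi = [np.full((1, 4, 4, 2), 0.5), np.zeros((1, 2, 2, 8))]

constraint_error = list_norm(list_subtract(enc_x, xi))
print(constraint_error)  # 32 entries differing by 0.5 -> 32 * 0.25 = 8.0

This list-aware handling is what lets the same ADMM update code run whether the encoder produces a single output tensor or one output per scale.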


anett_admm_batch.py

Source: anett_admm_batch.py (GitHub)


...
        # Shrinkage operator for minimization in xi

        self._update_xi_variable = self._xi_update()
        self._constraint_error = self._list_norm(self._list_subtract(self._enc_x, self._xi_var))

        # Dual ascent update
        self._update_dual_variable = self._dual_update()

        # Optimizer with momentum for minimization in x
        self._optimizer_momentum = tf.train.MomentumOptimizer(learning_rate=self._lr,
                                                              momentum=self._mom,
                                                              use_nesterov=True)
        self._minimize_momentum = self._optimizer_momentum.minimize(self._loss_x, var_list=[self._x_var])

        # Variable initializer for all variables!
        self._var_init = tf.variables_initializer([self._x_var] + self._xi_var + self._dual_var +
                                                   self._optimizer_momentum.variables())
        print("ANETT initialization successful!", flush=True)
        pass

    def _xi_update(self):
        if isinstance(self._enc_x, list):
            ret = [z.assign(self._shrinkage(e + u, (self._alpha/self._rho)*w)) for e, z, u, w in
                   zip(self._enc_x, self._xi_var, self._dual_var, self._weights)]
        else:
            ret = self._xi_var[0].assign(self._shrinkage(self._enc_x + self._dual_var[0], (self._alpha/self._rho)*self._weights[0]))
        return ret

    def _dual_update(self):
        if isinstance(self._enc_x, list):
            ret = [u.assign(u+e-xi) for u, e, xi in zip(self._dual_var, self._enc_x, self._xi_var)]
        else:
            ret = self._dual_var[0].assign(self._dual_var[0] + self._enc_x - self._xi_var[0])
        return ret

    def _variable_initialization(self, x0):
        fd = {self._x: x0}
        xi_inp = self._sess.run(self._xi, feed_dict=fd)

        if isinstance(xi_inp, list):
            for i in range(len(xi_inp)):
                fd[self._xi[i].name] = xi_inp[i]  # initialize xi[i] as E(x)[i]
                fd[self._dual[i].name] = np.zeros(self._input_shape[i])  # initialize u[i] as zero
        else:
            fd[self._xi[0].name] = xi_inp
            fd[self._dual[0].name] = np.zeros(self._input_shape[0])

        self._sess.run(self._var_init, feed_dict=fd)
        del xi_inp
        del fd
        pass

    def _update_x_variable(self, feed_dict, niter=100, tol=10**(-5)):
        err = [self._sess.run(self._loss_x, feed_dict=feed_dict)]
        # Improv has to be remade to avoid weird stuff happening because of batches
        for i in range(niter):
            self._sess.run(self._minimize_momentum, feed_dict=feed_dict)  # make gradient step with momentum
            err.append(self._sess.run(self._loss_x, feed_dict=feed_dict))
        pass

    def reconstruct(self, x0, data, niter=10, lr=10**(-3), alpha=10**(-3), beta=10**(-3), rho=10**(-3),
                    niterx=100, mom=0.8, tol=10**(-3)):
        self._variable_initialization(x0=x0)
        fd = {self._data: data[..., None],
              self._alpha: alpha,
              self._beta: beta,
              self._rho: rho,
              self._lr: 0,
              self._mom: mom}
        err = [self._sess.run(self._loss_total, feed_dict=fd)]

        for it in range(niter):
            fd[self._lr] = lr(it) if callable(lr) else lr

            self._update_x_variable(feed_dict=fd, niter=niterx, tol=tol)  # Step 1: argmin_x
            self._sess.run(self._update_xi_variable, feed_dict=fd)  # Step 2: argmin_xi
            self._sess.run(self._update_dual_variable, feed_dict=fd)  # Step 3: Dual ascent

            err.append(self._sess.run(self._loss_total, feed_dict=fd))  # Calculate loss after iteration

        xout = self._sess.run(self._x_var, feed_dict=fd)
        xdec = self._sess.run(self._x_decoded)
        return xout, xdec, err

    def _regularizer_lq(self, q=1):
        return K.sum([tf.norm(self._enc_x[i], ord=q)**q for i in range(len(self._enc_x))])

    def _regularizer_reconstructable(self, p=2):
        return (1./p) * self._norm(self._x_var - self._x_decoded, p=p)

    def add_regularizer(self, reg):
        self._loss_x += reg(self._x_var)
        self._loss_total += reg(self._x_var)
        print("Added additional regularizer!", flush=True)
        pass

    def _data_discrepancy(self, p=2, loss='gauss', mu=0.02, photons=1e4):
        if loss == 'gauss':
            ret = (1./p)*self._norm(self._operator(self._x_var) - self._data, p=p)
        elif loss == 'poisson':
            k_value = (tf.exp(-mu*self._data)*photons - 1)
            lambda_x = tf.exp(-mu*self._operator(self._x_var))*photons
            pois = lambda_x - k_value*tf.log(lambda_x)
            ret = tf.reduce_sum(pois)
        elif loss == 'poisson_approx':
            k_value = (tf.exp(-mu * self._data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(self._x_var)) * photons
            ret = tf.log(lambda_x) + (1./lambda_x)*tf.squared_difference(lambda_x, k_value)
            ret = 0.5*tf.reduce_sum(ret)
        elif loss == 'poisson_l2':
            k_value = (tf.exp(-mu * self._data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(self._x_var)) * photons
            ret = tf.squared_difference(lambda_x, k_value)/lambda_x
            ret = 0.5*tf.reduce_sum(ret)
        elif loss == 'kl':
            k_value = (tf.exp(-mu * self._data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(self._x_var)) * photons

            ret = tf.reduce_sum(lambda_x*tf.log(lambda_x/k_value) - lambda_x)
        elif loss == 'mixture':
            # Poisson
            k_value = (tf.exp(-mu * self._data) * photons - 1)
            lambda_x = tf.exp(-mu * self._operator(self._x_var)) * photons
            ret = tf.squared_difference(lambda_x, k_value) / lambda_x
            ret = 0.5 * tf.reduce_sum(ret)

            # l2
            ret += (1. / p) * self._norm(self._operator(self._x_var) - self._data, p=p)

        else:
            ret = tf.zeros(1)
            print("WARNING: No data-discrepancy chosen!", flush=True)
        return ret

    def _augmented_lagrangian(self):
        v = self._list_subtract(self._xi_var, self._dual_var)
        ret = self._list_norm(self._list_subtract(self._enc_x, v))
        return 0.5*ret

    def _decaying_weights(self):
        w = []
        for s in self._decoder.inputs:
            t = s.shape[1:]
            scale = 2 ** (1 + np.log2(s.shape[1].value) - np.log2(self._size))
            w.append(np.ones([1, ] + [z.value for z in t]) * scale)
        return w

    def _constant_weights(self):
        return [np.ones([1, ] + [z.value for z in s.shape[1:]]) for s in self._decoder.inputs]

    @staticmethod
    def _shrinkage(xi, gamma):
        return tf.maximum(tf.abs(xi) - gamma, 0) * tf.sign(xi)

    @staticmethod
    def _norm(x, p=2):
        """
        Implementation of p-norm to the power of p. This is used in optimization since tf.norm is numerically
        unstable for x = 0.
        """
        return K.sum(K.pow(K.abs(x), p))

    @staticmethod
    def _list_subtract(a, b):
        if isinstance(a, list):
            ret = [i - j for i, j in zip(a, b)]
        else:
            ret = a - b
        return ret

    def _list_norm(self, a, p=2):
        if isinstance(a, list):
            ret = K.sum([self._norm(i, p=p) for i in a])
        else:
            ret = self._norm(a, p=p)
        return ret

    def predict(self, x): ...
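Both variants share the same proximal step: _xi_update applies the _shrinkage static method (soft-thresholding) to E(x) + u with threshold (alpha/rho)*w, and _dual_update then performs the dual ascent u <- u + E(x) - xi. As a standalone sketch of what the shrinkage operator does, here is the same computation in NumPy; the threshold and input values are invented for illustration:

import numpy as np

def shrinkage(xi, gamma):
    # Soft-thresholding as in the _shrinkage static method above:
    # entries with |xi| <= gamma become 0, the rest move towards 0 by gamma.
    return np.maximum(np.abs(xi) - gamma, 0) * np.sign(xi)

alpha, rho, w = 1e-3, 1e-3, 1.0          # illustrative values only, giving gamma = 1.0
xi_plus_u = np.array([-2.5, -0.5, 0.0, 0.5, 2.5])
print(shrinkage(xi_plus_u, (alpha / rho) * w))
# large entries shrink by 1.0, entries inside [-1, 1] are set to 0

In the graph itself this is wired up through assign operations on the xi variables, so a single session run of _update_xi_variable performs the whole update.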


