How to use invalidate_cache method in lisa

Best Python code snippet using lisa_python

test_performance.py

Source:test_performance.py Github

copy

Full Screen

...89 def test_write_base_one2many(self):90 """ Write on one2many field. """91 rec1 = self.env['test_performance.base'].create({'name': 'X'})92 # create N lines on rec1: O(N) queries93 rec1.invalidate_cache()94 with self.assertQueryCount(2):95 rec1.write({'line_ids': [(0, 0, {'value': 0})]})96 self.assertEqual(len(rec1.line_ids), 1)97 rec1.invalidate_cache()98 with self.assertQueryCount(15):99 rec1.write({'line_ids': [(0, 0, {'value': val}) for val in range(1, 12)]})100 self.assertEqual(len(rec1.line_ids), 12)101 lines = rec1.line_ids102 # update N lines: O(N) queries103 rec1.invalidate_cache()104 with self.assertQueryCount(6):105 rec1.write({'line_ids': [(1, line.id, {'value': 42}) for line in lines[0]]})106 self.assertEqual(rec1.line_ids, lines)107 rec1.invalidate_cache()108 with self.assertQueryCount(26):109 rec1.write({'line_ids': [(1, line.id, {'value': 42 + line.id}) for line in lines[1:]]})110 self.assertEqual(rec1.line_ids, lines)111 # delete N lines: O(1) queries112 rec1.invalidate_cache()113 with self.assertQueryCount(__system__=14, demo=14):114 rec1.write({'line_ids': [(2, line.id) for line in lines[0]]})115 self.assertEqual(rec1.line_ids, lines[1:])116 rec1.invalidate_cache()117 with self.assertQueryCount(__system__=12, demo=12):118 rec1.write({'line_ids': [(2, line.id) for line in lines[1:]]})119 self.assertFalse(rec1.line_ids)120 self.assertFalse(lines.exists())121 rec1.write({'line_ids': [(0, 0, {'value': val}) for val in range(12)]})122 lines = rec1.line_ids123 # unlink N lines: O(1) queries124 rec1.invalidate_cache()125 with self.assertQueryCount(__system__=11, demo=11):126 rec1.write({'line_ids': [(3, line.id) for line in lines[0]]})127 self.assertEqual(rec1.line_ids, lines[1:])128 rec1.invalidate_cache()129 with self.assertQueryCount(__system__=12, demo=12):130 rec1.write({'line_ids': [(3, line.id) for line in lines[1:]]})131 self.assertFalse(rec1.line_ids)132 self.assertFalse(lines.exists())133 rec1.write({'line_ids': [(0, 0, {'value': val}) for val in range(12)]})134 lines = rec1.line_ids135 rec2 = self.env['test_performance.base'].create({'name': 'X'})136 # link N lines from rec1 to rec2: O(1) queries137 rec1.invalidate_cache()138 with self.assertQueryCount(2):139 rec2.write({'line_ids': [(4, line.id) for line in lines[0]]})140 self.assertEqual(rec1.line_ids, lines[1:])141 self.assertEqual(rec2.line_ids, lines[0])142 rec1.invalidate_cache()143 with self.assertQueryCount(7):144 rec2.write({'line_ids': [(4, line.id) for line in lines[1:]]})145 self.assertFalse(rec1.line_ids)146 self.assertEqual(rec2.line_ids, lines)147 rec1.invalidate_cache()148 with self.assertQueryCount(5):149 rec2.write({'line_ids': [(4, line.id) for line in lines[0]]})150 self.assertEqual(rec2.line_ids, lines)151 rec1.invalidate_cache()152 with self.assertQueryCount(5):153 rec2.write({'line_ids': [(4, line.id) for line in lines[1:]]})154 self.assertEqual(rec2.line_ids, lines)155 # empty N lines in rec2: O(1) queries156 rec1.invalidate_cache()157 with self.assertQueryCount(__system__=13, demo=13):158 rec2.write({'line_ids': [(5,)]})159 self.assertFalse(rec2.line_ids)160 rec1.invalidate_cache()161 with self.assertQueryCount(3):162 rec2.write({'line_ids': [(5,)]})163 self.assertFalse(rec2.line_ids)164 rec1.write({'line_ids': [(0, 0, {'value': val}) for val in range(12)]})165 lines = rec1.line_ids166 # set N lines in rec2: O(1) queries167 rec1.invalidate_cache()168 with self.assertQueryCount(4):169 rec2.write({'line_ids': [(6, 0, lines[0].ids)]})170 self.assertEqual(rec1.line_ids, lines[1:])171 self.assertEqual(rec2.line_ids, lines[0])172 with self.assertQueryCount(5):173 rec2.write({'line_ids': [(6, 0, lines.ids)]})174 self.assertFalse(rec1.line_ids)175 self.assertEqual(rec2.line_ids, lines)176 with self.assertQueryCount(3):177 rec2.write({'line_ids': [(6, 0, lines.ids)]})178 self.assertEqual(rec2.line_ids, lines)179 @mute_logger('odoo.models.unlink')180 def test_write_base_one2many_with_constraint(self):181 """ Write on one2many field with lines being deleted and created. """182 rec = self.env['test_performance.base'].create({'name': 'Y'})183 rec.write({'line_ids': [(0, 0, {'value': val}) for val in range(12)]})184 # This write() will raise because of the unique index if the unlink() is185 # not performed before the create()186 rec.write({'line_ids': [(5,)] + [(0, 0, {'value': val}) for val in range(6)]})187 self.assertEqual(len(rec.line_ids), 6)188 @mute_logger('odoo.models.unlink')189 @users('__system__', 'demo')190 @warmup191 def test_write_base_many2many(self):192 """ Write on many2many field. """193 rec1 = self.env['test_performance.base'].create({'name': 'X'})194 # create N tags on rec1: O(N) queries195 rec1.invalidate_cache()196 with self.assertQueryCount(4):197 rec1.write({'tag_ids': [(0, 0, {'name': 0})]})198 self.assertEqual(len(rec1.tag_ids), 1)199 rec1.invalidate_cache()200 with self.assertQueryCount(14):201 rec1.write({'tag_ids': [(0, 0, {'name': val}) for val in range(1, 12)]})202 self.assertEqual(len(rec1.tag_ids), 12)203 tags = rec1.tag_ids204 # update N tags: O(N) queries205 rec1.invalidate_cache()206 with self.assertQueryCount(3):207 rec1.write({'tag_ids': [(1, tag.id, {'name': 'X'}) for tag in tags[0]]})208 self.assertEqual(rec1.tag_ids, tags)209 rec1.invalidate_cache()210 with self.assertQueryCount(3):211 rec1.write({'tag_ids': [(1, tag.id, {'name': 'X'}) for tag in tags[1:]]})212 self.assertEqual(rec1.tag_ids, tags)213 # delete N tags: O(1) queries214 rec1.invalidate_cache()215 with self.assertQueryCount(__system__=8, demo=8):216 rec1.write({'tag_ids': [(2, tag.id) for tag in tags[0]]})217 self.assertEqual(rec1.tag_ids, tags[1:])218 rec1.invalidate_cache()219 with self.assertQueryCount(__system__=8, demo=8):220 rec1.write({'tag_ids': [(2, tag.id) for tag in tags[1:]]})221 self.assertFalse(rec1.tag_ids)222 self.assertFalse(tags.exists())223 rec1.write({'tag_ids': [(0, 0, {'name': val}) for val in range(12)]})224 tags = rec1.tag_ids225 # unlink N tags: O(1) queries226 rec1.invalidate_cache()227 with self.assertQueryCount(3):228 rec1.write({'tag_ids': [(3, tag.id) for tag in tags[0]]})229 self.assertEqual(rec1.tag_ids, tags[1:])230 rec1.invalidate_cache()231 with self.assertQueryCount(3):232 rec1.write({'tag_ids': [(3, tag.id) for tag in tags[1:]]})233 self.assertFalse(rec1.tag_ids)234 self.assertTrue(tags.exists())235 rec2 = self.env['test_performance.base'].create({'name': 'X'})236 # link N tags from rec1 to rec2: O(1) queries237 rec1.invalidate_cache()238 with self.assertQueryCount(3):239 rec2.write({'tag_ids': [(4, tag.id) for tag in tags[0]]})240 self.assertEqual(rec2.tag_ids, tags[0])241 rec1.invalidate_cache()242 with self.assertQueryCount(3):243 rec2.write({'tag_ids': [(4, tag.id) for tag in tags[1:]]})244 self.assertEqual(rec2.tag_ids, tags)245 rec1.invalidate_cache()246 with self.assertQueryCount(2):247 rec2.write({'tag_ids': [(4, tag.id) for tag in tags[1:]]})248 self.assertEqual(rec2.tag_ids, tags)249 # empty N tags in rec2: O(1) queries250 rec1.invalidate_cache()251 with self.assertQueryCount(3):252 rec2.write({'tag_ids': [(5,)]})253 self.assertFalse(rec2.tag_ids)254 self.assertTrue(tags.exists())255 rec1.invalidate_cache()256 with self.assertQueryCount(2):257 rec2.write({'tag_ids': [(5,)]})258 self.assertFalse(rec2.tag_ids)259 # set N tags in rec2: O(1) queries260 rec1.invalidate_cache()261 with self.assertQueryCount(3):262 rec2.write({'tag_ids': [(6, 0, tags.ids)]})263 self.assertEqual(rec2.tag_ids, tags)264 rec1.invalidate_cache()265 with self.assertQueryCount(3):266 rec2.write({'tag_ids': [(6, 0, tags[:8].ids)]})267 self.assertEqual(rec2.tag_ids, tags[:8])268 rec1.invalidate_cache()269 with self.assertQueryCount(4):270 rec2.write({'tag_ids': [(6, 0, tags[4:].ids)]})271 self.assertEqual(rec2.tag_ids, tags[4:])272 rec1.invalidate_cache()273 with self.assertQueryCount(3):274 rec2.write({'tag_ids': [(6, 0, tags.ids)]})275 self.assertEqual(rec2.tag_ids, tags)276 rec1.invalidate_cache()277 with self.assertQueryCount(2):278 rec2.write({'tag_ids': [(6, 0, tags.ids)]})279 self.assertEqual(rec2.tag_ids, tags)280 @users('__system__', 'demo')281 @warmup282 def test_create_base(self):283 """ Create records. """284 with self.assertQueryCount(__system__=2, demo=2):285 self.env['test_performance.base'].create({'name': 'X'})286 @users('__system__', 'demo')287 @warmup288 def test_create_base_with_lines(self):289 """ Create records with one2many lines. """290 with self.assertQueryCount(__system__=12, demo=12):291 self.env['test_performance.base'].create({292 'name': 'X',293 'line_ids': [(0, 0, {'value': val}) for val in range(10)],294 })295 @users('__system__', 'demo')296 @warmup297 def test_create_base_with_tags(self):298 """ Create records with many2many tags. """299 with self.assertQueryCount(2):300 self.env['test_performance.base'].create({'name': 'X'})301 # create N tags: add O(N) queries302 with self.assertQueryCount(13):303 self.env['test_performance.base'].create({304 'name': 'X',305 'tag_ids': [(0, 0, {'name': val}) for val in range(10)],306 })307 # link N tags: add O(1) queries308 tags = self.env['test_performance.tag'].create([{'name': val} for val in range(10)])309 with self.assertQueryCount(3):310 self.env['test_performance.base'].create({311 'name': 'X',312 'tag_ids': [(4, tag.id) for tag in tags],313 })314 with self.assertQueryCount(2):315 self.env['test_performance.base'].create({316 'name': 'X',317 'tag_ids': [(6, 0, [])],318 })319 with self.assertQueryCount(3):320 self.env['test_performance.base'].create({321 'name': 'X',322 'tag_ids': [(6, 0, tags.ids)],323 })324 @users('__system__', 'demo')325 @warmup326 def test_several_prefetch(self):327 initial_records = self.env['test_performance.base'].search([])328 self.assertEqual(len(initial_records), 5)329 for _i in range(8):330 self.env.cr.execute(331 'insert into test_performance_base(value) select value from test_performance_base'332 )333 records = self.env['test_performance.base'].search([])334 self.assertEqual(len(records), 1280)335 # should only cause 2 queries thanks to prefetching336 with self.assertQueryCount(__system__=2, demo=2):337 records.mapped('value')338 records.invalidate_cache(['value'])339 with self.assertQueryCount(__system__=2, demo=2):340 records.mapped('value')341 records.invalidate_cache(['value'])342 with self.assertQueryCount(__system__=2, demo=2):343 new_recs = records.browse(records.new(origin=record).id for record in records)344 new_recs.mapped('value')345 # clean up after each pass346 self.env.cr.execute(347 'delete from test_performance_base where id not in %s',348 (tuple(initial_records.ids),)349 )350 def expected_read_group(self):351 groups = defaultdict(list)352 all_records = self.env['test_performance.base'].search([])353 for record in all_records:354 groups[record.partner_id.id].append(record.value)355 partners = self.env['res.partner'].search([('id', 'in', all_records.mapped('partner_id').ids)])356 return [{357 '__domain': [('partner_id', '=', partner.id)],358 'partner_id': (partner.id, partner.display_name),359 'partner_id_count': len(groups[partner.id]),360 'value': sum(groups[partner.id]),361 } for partner in partners]362 @users('__system__', 'demo')363 def test_read_group_with_name_get(self):364 model = self.env['test_performance.base']365 expected = self.expected_read_group()366 # use read_group and check the expected result367 with self.assertQueryCount(__system__=2, demo=2):368 model.invalidate_cache()369 result = model.read_group([], ['partner_id', 'value'], ['partner_id'])370 self.assertEqual(result, expected)371 @users('__system__', 'demo')372 def test_read_group_without_name_get(self):373 model = self.env['test_performance.base']374 expected = self.expected_read_group()375 # use read_group and check the expected result376 with self.assertQueryCount(__system__=1, demo=1):377 model.invalidate_cache()378 result = model.read_group([], ['partner_id', 'value'], ['partner_id'])379 self.assertEqual(len(result), len(expected))380 for res, exp in zip(result, expected):381 self.assertEqual(res['__domain'], exp['__domain'])382 self.assertEqual(res['partner_id'][0], exp['partner_id'][0])383 self.assertEqual(res['partner_id_count'], exp['partner_id_count'])384 self.assertEqual(res['value'], exp['value'])385 # now serialize to json, which should force evaluation386 with self.assertQueryCount(__system__=1, demo=1):387 json.dumps(result, default=json_default)388@tagged('bacon_and_eggs')389class TestIrPropertyOptimizations(TransactionCase):390 def setUp(self):391 super().setUp()392 self.Bacon = self.env['test_performance.bacon']393 self.Eggs = self.env['test_performance.eggs']394 def test_with_falsy_default(self):395 self.assertFalse(self.env['ir.property']._get('property_eggs', 'test_performance.bacon'))396 # warmup397 eggs = self.Eggs.create({})398 self.Bacon.create({})399 self.Bacon.create({'property_eggs': eggs.id})400 # create with default value401 with self.assertQueryCount(1):402 self.Bacon.create({})403 with self.assertQueryCount(1):404 self.Bacon.with_context(default_property_eggs=False).create({})405 with self.assertQueryCount(1):406 self.Bacon.create({'property_eggs': False})407 # create with another value408 with self.assertQueryCount(3):409 self.Bacon.with_context(default_property_eggs=eggs.id).create({})410 with self.assertQueryCount(3):411 self.Bacon.create({'property_eggs': eggs.id})412 def test_with_truthy_default(self):413 eggs = self.Eggs.create({})414 self.env['ir.property']._set_default("property_eggs", "test_performance.bacon", eggs)415 self.assertEqual(eggs, self.env['ir.property']._get('property_eggs', 'test_performance.bacon'))416 # warmup417 self.Bacon.create({})418 # create with default value419 with self.assertQueryCount(1):420 self.Bacon.create({})421 with self.assertQueryCount(1):422 self.Bacon.with_context(default_property_eggs=eggs.id).create({})423 with self.assertQueryCount(1):424 self.Bacon.create({'property_eggs': eggs.id})425 # create with another value426 eggs = self.Eggs.create({})427 self.Bacon.create({'property_eggs': eggs.id})428 with self.assertQueryCount(3):429 self.Bacon.with_context(default_property_eggs=eggs.id).create({})430 with self.assertQueryCount(3):431 self.Bacon.create({'property_eggs': eggs.id})432 with self.assertQueryCount(3):433 self.Bacon.with_context(default_property_eggs=False).create({})434 with self.assertQueryCount(3):435 self.Bacon.create({'property_eggs': False})436@tagged('mapped_perf')437class TestMapped(TransactionCase):438 def test_relational_mapped(self):439 # create 1000 records with one line each440 recs = self.env['test_performance.base'].create([441 {'name': 'foo%d' % index, 'line_ids': [(0, 0, {'value': index})]}442 for index in range(1000)443 ])444 recs.flush()445 recs.invalidate_cache()446 # expected same performance as recs.line_ids.mapped('value')447 with self.assertQueryCount(3):448 for rec in recs:...

Full Screen

Full Screen

route.py

Source:route.py Github

copy

Full Screen

...14 def __init__(self):15 self.resync()16 self.s=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)17 self.cache = {}18 def invalidate_cache(self):19 self.cache = {}20 def resync(self):21 self.invalidate_cache()22 self.routes = read_routes()23 def __repr__(self):24 rt = "Network Netmask Gateway Iface Output IP\n"25 for net,msk,gw,iface,addr in self.routes:26 rt += "%-15s %-15s %-15s %-15s %-15s\n" % (ltoa(net),27 ltoa(msk),28 gw,29 iface,30 addr)31 return rt32 def make_route(self, host=None, net=None, gw=None, dev=None):33 if host is not None:34 thenet,msk = host,3235 elif net is not None:36 thenet,msk = net.split("/")37 msk = int(msk)38 else:39 raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")40 if gw is None:41 gw="0.0.0.0"42 if dev is None:43 if gw:44 nhop = gw45 else:46 nhop = thenet47 dev,ifaddr,x = self.route(nhop)48 else:49 ifaddr = get_if_addr(dev)50 return (atol(thenet), itom(msk), gw, dev, ifaddr)51 def add(self, *args, **kargs):52 """Ex:53 add(net="192.168.1.0/24",gw="1.2.3.4")54 """55 self.invalidate_cache()56 self.routes.append(self.make_route(*args,**kargs))57 58 def delt(self, *args, **kargs):59 """delt(host|net, gw|dev)"""60 self.invalidate_cache()61 route = self.make_route(*args,**kargs)62 try:63 i=self.routes.index(route)64 del(self.routes[i])65 except ValueError:66 warning("no matching route found")67 68 def ifchange(self, iff, addr):69 self.invalidate_cache()70 the_addr,the_msk = (addr.split("/")+["32"])[:2]71 the_msk = itom(int(the_msk))72 the_rawaddr = atol(the_addr)73 the_net = the_rawaddr & the_msk74 75 76 for i in range(len(self.routes)):77 net,msk,gw,iface,addr = self.routes[i]78 if iface != iff:79 continue80 if gw == '0.0.0.0':81 self.routes[i] = (the_net,the_msk,gw,iface,the_addr)82 else:83 self.routes[i] = (net,msk,gw,iface,the_addr)84 conf.netcache.flush()85 86 87 def ifdel(self, iff):88 self.invalidate_cache()89 new_routes=[]90 for rt in self.routes:91 if rt[3] != iff:92 new_routes.append(rt)93 self.routes=new_routes94 95 def ifadd(self, iff, addr):96 self.invalidate_cache()97 the_addr,the_msk = (addr.split("/")+["32"])[:2]98 the_msk = itom(int(the_msk))99 the_rawaddr = atol(the_addr)100 the_net = the_rawaddr & the_msk101 self.routes.append((the_net,the_msk,'0.0.0.0',iff,the_addr))102 def route(self,dest,verbose=None):103 if type(dest) is list and dest:104 dest = dest[0]105 if dest in self.cache:106 return self.cache[dest]107 if verbose is None:108 verbose=conf.verb109 # Transform "192.168.*.1-5" to one IP of the set110 dst = dest.split("/")[0]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful