Best Python code snippet using grail_python
trace_event_unittest.py
Source: trace_event_unittest.py
#!/usr/bin/env python
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import contextlib
import json
import logging
import math
import multiprocessing
import os
import tempfile
import time
import unittest

from py_trace_event import trace_event
from py_trace_event import trace_time
from py_trace_event.trace_event_impl import log


class TraceEventTests(unittest.TestCase):
  """Tests for the trace_event API, verified by reading the log file it writes.

  Each test enables tracing into a fresh temporary file, emits events through
  trace_event, and then parses that file to check what was recorded.
  """

  def setUp(self):
    # Only the path is needed: trace_event opens the file itself, so the
    # handle is closed right away and the file is removed in tearDown.
    tf = tempfile.NamedTemporaryFile(delete=False)
    self._log_path = tf.name
    tf.close()

  def tearDown(self):
    if os.path.exists(self._log_path):
      os.remove(self._log_path)

  @contextlib.contextmanager
  def _test_trace(self, disable=True):
    """Enables tracing to the per-test log file for the duration of the block.

    Args:
      disable: If True, tracing is disabled again when the block exits (even
          on error). Pass False when the test disables tracing itself.
    """
    try:
      trace_event.trace_enable(self._log_path)
      yield
    finally:
      if disable:
        trace_event.trace_disable()

  @staticmethod
  def _load_entries(log_file):
    """Parses the remaining content of |log_file| into a list of entries.

    A closing ']' is appended before parsing because the tests read the log
    as a JSON array that has not been terminated yet.
    """
    return json.loads(log_file.read() + ']')

  def _assert_metadata(self, entry):
    """Asserts that |entry| is the process_argv metadata ('M') entry."""
    self.assertEqual(entry['category'], 'process_argv')
    self.assertEqual(entry['name'], 'process_argv')
    self.assertTrue(entry['args']['argv'])
    self.assertEqual(entry['ph'], 'M')

  def _assert_python_event(self, entry, name, phase):
    """Asserts that |entry| is a 'python' event with this name and phase."""
    self.assertEqual(entry['category'], 'python')
    self.assertEqual(entry['name'], name)
    self.assertEqual(entry['ph'], phase)

  def testNoImpl(self):
    orig_impl = trace_event.trace_event_impl
    try:
      trace_event.trace_event_impl = None
      self.assertFalse(trace_event.trace_can_enable())
    finally:
      # Always restore the implementation so later tests are unaffected.
      trace_event.trace_event_impl = orig_impl

  def testImpl(self):
    self.assertTrue(trace_event.trace_can_enable())

  def testIsEnabledFalse(self):
    self.assertFalse(trace_event.trace_is_enabled())

  def testIsEnabledTrue(self):
    with self._test_trace():
      self.assertTrue(trace_event.trace_is_enabled())

  def testEnable(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 1)
        self.assertTrue(trace_event.trace_is_enabled())
        self._assert_metadata(log_output.pop())

  def testDoubleEnable(self):
    # assertRaises (rather than a bare `assert False`) keeps the check alive
    # when Python runs with -O, which strips assert statements.
    with self.assertRaises(log.TraceException):
      with self._test_trace():
        with self._test_trace():
          pass

  def testDisable(self):
    with self._test_trace(disable=False):
      with open(self._log_path, 'r') as f:
        self.assertTrue(trace_event.trace_is_enabled())
        trace_event.trace_disable()
        self.assertEqual(len(self._load_entries(f)), 1)
        self.assertFalse(trace_event.trace_is_enabled())

  def testDoubleDisable(self):
    with self._test_trace():
      pass
    # Tracing is already disabled here; a second disable must not raise.
    trace_event.trace_disable()

  def testFlushChanges(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.clock_sync('1')
        # Not flushed yet: only the metadata entry is on disk.
        self.assertEqual(len(self._load_entries(f)), 1)
        f.seek(0)
        trace_event.trace_flush()
        self.assertEqual(len(self._load_entries(f)), 2)

  def testFlushNoChanges(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        self.assertEqual(len(self._load_entries(f)), 1)
        f.seek(0)
        trace_event.trace_flush()
        self.assertEqual(len(self._load_entries(f)), 1)

  def testDoubleFlush(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.clock_sync('1')
        self.assertEqual(len(self._load_entries(f)), 1)
        f.seek(0)
        trace_event.trace_flush()
        trace_event.trace_flush()
        # The second flush must not write the event again.
        self.assertEqual(len(self._load_entries(f)), 2)

  def testTraceBegin(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.trace_begin('test_event', this='that')
        trace_event.trace_flush()
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 2)
        self._assert_metadata(log_output[0])
        begin = log_output[1]
        self._assert_python_event(begin, 'test_event', 'B')
        self.assertEqual(begin['args']['this'], '\'that\'')

  def testTraceEnd(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.trace_end('test_event')
        trace_event.trace_flush()
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 2)
        self._assert_metadata(log_output[0])
        end = log_output[1]
        self._assert_python_event(end, 'test_event', 'E')
        self.assertEqual(end['args'], {})

  def testTrace(self):
    with self._test_trace():
      with trace_event.trace('test_event', this='that'):
        pass
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 3)
        self._assert_metadata(log_output[0])
        begin = log_output[1]
        self._assert_python_event(begin, 'test_event', 'B')
        self.assertEqual(begin['args']['this'], '\'that\'')
        end = log_output[2]
        self._assert_python_event(end, 'test_event', 'E')
        self.assertEqual(end['args'], {})

  def testTracedDecorator(self):
    @trace_event.traced("this")
    def test_decorator(this="that"):
      pass

    with self._test_trace():
      test_decorator()
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 3)
        self._assert_metadata(log_output[0])
        # The expected name assumes this file is executed as __main__.
        begin = log_output[1]
        self._assert_python_event(begin, '__main__.test_decorator', 'B')
        self.assertEqual(begin['args']['this'], '\'that\'')
        end = log_output[2]
        self._assert_python_event(end, '__main__.test_decorator', 'E')
        self.assertEqual(end['args'], {})

  def testClockSyncWithTs(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.clock_sync('id', issue_ts=trace_time.Now())
        trace_event.trace_flush()
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 2)
        self._assert_metadata(log_output[0])
        sync = log_output[1]
        self._assert_python_event(sync, 'clock_sync', 'c')
        self.assertTrue(sync['args']['issue_ts'])

  def testClockSyncWithoutTs(self):
    with self._test_trace():
      with open(self._log_path, 'r') as f:
        trace_event.clock_sync('id')
        trace_event.trace_flush()
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 2)
        self._assert_metadata(log_output[0])
        sync = log_output[1]
        self._assert_python_event(sync, 'clock_sync', 'c')
        self.assertFalse(sync['args'].get('issue_ts'))

  def testTime(self):
    with self._test_trace():
      start_ts = time.time()
      trace_event.trace_begin('test')
      end_ts = time.time()
      trace_event.trace_end('test')
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 3)
        meta_data, open_data, close_data = log_output
        self._assert_metadata(meta_data)
        self._assert_python_event(open_data, 'test', 'B')
        self._assert_python_event(close_data, 'test', 'E')
        event_time_diff = close_data['ts'] - open_data['ts']
        # Wall-clock seconds are scaled by 1e6 to compare against 'ts'
        # (presumably microseconds -- confirm against trace_time).
        recorded_time_diff = (end_ts - start_ts) * 1000000
        self.assertLess(math.fabs(event_time_diff - recorded_time_diff), 1000)

  def testNestedCalls(self):
    with self._test_trace():
      trace_event.trace_begin('one')
      trace_event.trace_begin('two')
      trace_event.trace_end('two')
      trace_event.trace_end('one')
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 5)
        meta_data, one_open, two_open, two_close, one_close = log_output
        self._assert_metadata(meta_data)
        self._assert_python_event(one_open, 'one', 'B')
        self._assert_python_event(one_close, 'one', 'E')
        self._assert_python_event(two_open, 'two', 'B')
        self._assert_python_event(two_close, 'two', 'E')
        # 'two' must be fully contained within 'one'.
        self.assertLessEqual(one_open['ts'], two_open['ts'])
        self.assertGreaterEqual(one_close['ts'], two_close['ts'])

  def testInterleavedCalls(self):
    with self._test_trace():
      trace_event.trace_begin('one')
      trace_event.trace_begin('two')
      trace_event.trace_end('one')
      trace_event.trace_end('two')
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 5)
        # Entries are in emission order, so 'one' closes before 'two'.
        meta_data, one_open, two_open, one_close, two_close = log_output
        self._assert_metadata(meta_data)
        self._assert_python_event(one_open, 'one', 'B')
        self._assert_python_event(one_close, 'one', 'E')
        self._assert_python_event(two_open, 'two', 'B')
        self._assert_python_event(two_close, 'two', 'E')
        self.assertLessEqual(one_open['ts'], two_open['ts'])
        self.assertLessEqual(one_close['ts'], two_close['ts'])

  def testMultiprocess(self):
    def child_function():
      with trace_event.trace('child_event'):
        pass

    with self._test_trace():
      trace_event.trace_begin('parent_event')
      trace_event.trace_flush()
      p = multiprocessing.Process(target=child_function)
      p.start()
      # The started process object is expected to carry trace_event's marker.
      self.assertTrue(hasattr(p, "_shimmed_by_trace_event"))
      p.join()
      trace_event.trace_end('parent_event')
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 5)
        meta_data, parent_open, child_open, child_close, parent_close = (
            log_output)
        self._assert_metadata(meta_data)
        self._assert_python_event(parent_open, 'parent_event', 'B')
        self._assert_python_event(child_open, 'child_event', 'B')
        self._assert_python_event(child_close, 'child_event', 'E')
        self._assert_python_event(parent_close, 'parent_event', 'E')

  def testMultiprocessExceptionInChild(self):
    def bad_child():
      trace_event.trace_disable()

    with self._test_trace():
      p = multiprocessing.Pool(1)
      trace_event.trace_begin('parent')
      self.assertRaises(Exception, lambda: p.apply(bad_child, ()))
      p.close()
      p.terminate()
      p.join()
      trace_event.trace_end('parent')
      trace_event.trace_flush()
      with open(self._log_path, 'r') as f:
        log_output = self._load_entries(f)
        self.assertEqual(len(log_output), 3)
        # Index 0 is the metadata entry; only the parent's events are checked.
        self._assert_python_event(log_output[1], 'parent', 'B')
        self._assert_python_event(log_output[2], 'parent', 'E')


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  # NOTE(review): the listing this file was taken from is truncated ("...")
  # after the line above; the remainder of the entry point is not visible
  # here and should be restored from the upstream file.

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
