Best Python code snippet using pypom_form_python
dumping_wrapper_test.py
Source:dumping_wrapper_test.py  
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Unit Tests for classes in dumping_wrapper.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import glob
import os
import tempfile
import threading

from tensorflow.python.client import session
from tensorflow.python.debug.lib import debug_data
from tensorflow.python.debug.wrappers import dumping_wrapper
from tensorflow.python.debug.wrappers import framework
from tensorflow.python.debug.wrappers import hooks
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.lib.io import file_io
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import gfile
from tensorflow.python.platform import googletest
from tensorflow.python.training import monitored_session


@test_util.run_v1_only("b/120545219")
class DumpingDebugWrapperSessionTest(test_util.TensorFlowTestCase):
  """Tests for DumpingDebugWrapperSession and DumpingDebugHook.

  Every test works on a small graph built in setUp(): a float variable `v`
  initialized to 10.0 plus three assign_add ops that change it by +1.0
  (`inc_v`), by -1.4 (`dec_v`) or by a fed placeholder value (`inc_w_ph`).
  """

  def setUp(self):
    """Creates the dump root directory, the test graph and a session."""
    self.session_root = tempfile.mkdtemp()

    self.v = variables.VariableV1(10.0, dtype=dtypes.float32, name="v")
    self.delta = constant_op.constant(1.0, dtype=dtypes.float32, name="delta")
    self.eta = constant_op.constant(-1.4, dtype=dtypes.float32, name="eta")
    self.inc_v = state_ops.assign_add(self.v, self.delta, name="inc_v")
    self.dec_v = state_ops.assign_add(self.v, self.eta, name="dec_v")

    self.ph = array_ops.placeholder(dtypes.float32, shape=(), name="ph")
    self.inc_w_ph = state_ops.assign_add(self.v, self.ph, name="inc_w_ph")

    self.sess = session.Session()
    self.sess.run(self.v.initializer)

  def tearDown(self):
    """Resets the default graph and removes the dump root directory."""
    ops.reset_default_graph()
    if os.path.isdir(self.session_root):
      file_io.delete_recursively(self.session_root)

  def _assert_correct_run_subdir_naming(self, run_subdir):
    """Asserts that `run_subdir` is named "run_<positive int>_<suffix>".

    Args:
      run_subdir: Base name (not the full path) of a per-run dump directory.
    """
    self.assertStartsWith(run_subdir, "run_")
    self.assertEqual(2, run_subdir.count("_"))
    self.assertGreater(int(run_subdir.split("_")[1]), 0)

  def testConstructWrapperWithExistingNonEmptyRootDirRaisesException(self):
    dir_path = os.path.join(self.session_root, "foo")
    os.mkdir(dir_path)
    self.assertTrue(os.path.isdir(dir_path))

    with self.assertRaisesRegexp(
        ValueError, "session_root path points to a non-empty directory"):
      dumping_wrapper.DumpingDebugWrapperSession(
          session.Session(), session_root=self.session_root, log_usage=False)

  def testConstructWrapperWithExistingFileDumpRootRaisesException(self):
    file_path = os.path.join(self.session_root, "foo")
    open(file_path, "a").close()  # Create the file
    self.assertTrue(gfile.Exists(file_path))
    self.assertFalse(gfile.IsDirectory(file_path))

    with self.assertRaisesRegexp(ValueError,
                                 "session_root path points to a file"):
      dumping_wrapper.DumpingDebugWrapperSession(
          session.Session(), session_root=file_path, log_usage=False)

  def testConstructWrapperWithNonexistentSessionRootCreatesDirectory(self):
    parent_dir = tempfile.mkdtemp()
    new_dir_path = os.path.join(parent_dir, "new_dir")
    try:
      dumping_wrapper.DumpingDebugWrapperSession(
          session.Session(), session_root=new_dir_path, log_usage=False)
      self.assertTrue(gfile.IsDirectory(new_dir_path))
    finally:
      # Cleanup. Delete the mkdtemp() parent, not only "new_dir", so that no
      # temporary directory is left behind, and do it even when the assertion
      # above fails.
      gfile.DeleteRecursively(parent_dir)

  def testDumpingOnASingleRunWorks(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False)
    sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))

    self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))
    dump = debug_data.DebugDumpDir(dump_dirs[0])
    self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))

    self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
    self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testDumpingOnASingleRunWorksWithRelativePathForDebugDumpDir(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False)
    sess.run(self.inc_v)
    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))

    cwd = os.getcwd()
    try:
      # Load the dump through a path relative to the current directory.
      os.chdir(self.session_root)
      dump = debug_data.DebugDumpDir(
          os.path.relpath(dump_dirs[0], self.session_root))
      self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))
    finally:
      # Always restore the working directory for the remaining tests.
      os.chdir(cwd)

  def testDumpingOnASingleRunWithFeedDictWorks(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False)
    feed_dict = {self.ph: 3.2}
    sess.run(self.inc_w_ph, feed_dict=feed_dict)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))

    self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))
    dump = debug_data.DebugDumpDir(dump_dirs[0])
    self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))

    self.assertEqual(repr(self.inc_w_ph), dump.run_fetches_info)
    self.assertEqual(repr(feed_dict.keys()), dump.run_feed_keys_info)

  def testDumpingOnMultipleRunsWorks(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False)
    for _ in range(3):
      sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    # Order the run directories by the integer field of their names.
    dump_dirs = sorted(
        dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))
    self.assertEqual(3, len(dump_dirs))
    for i, dump_dir in enumerate(dump_dirs):
      self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))
      dump = debug_data.DebugDumpDir(dump_dir)
      # Each earlier run added 1.0 to v.
      self.assertAllClose([10.0 + 1.0 * i],
                          dump.get_tensors("v", 0, "DebugIdentity"))
      self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
      self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testUsingNonCallableAsWatchFnRaisesTypeError(self):
    bad_watch_fn = "bad_watch_fn"
    with self.assertRaisesRegexp(TypeError, "watch_fn is not callable"):
      dumping_wrapper.DumpingDebugWrapperSession(
          self.sess,
          session_root=self.session_root,
          watch_fn=bad_watch_fn,
          log_usage=False)

  def testDumpingWithLegacyWatchFnOnFetchesWorks(self):
    """Use a watch_fn that returns different whitelists for different runs."""

    def watch_fn(fetches, feeds):
      del feeds
      # A watch_fn that picks fetch name.
      if fetches.name == "inc_v:0":
        # If inc_v, watch everything.
        return "DebugIdentity", r".*", r".*"
      else:
        # If dec_v, watch nothing.
        return "DebugIdentity", r"$^", r"$^"

    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess,
        session_root=self.session_root,
        watch_fn=watch_fn,
        log_usage=False)

    for _ in range(3):
      sess.run(self.inc_v)
      sess.run(self.dec_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    dump_dirs = sorted(
        dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))
    self.assertEqual(6, len(dump_dirs))

    for i, dump_dir in enumerate(dump_dirs):
      self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))
      dump = debug_data.DebugDumpDir(dump_dir)
      if i % 2 == 0:
        # inc_v run: something was dumped. Every completed inc_v/dec_v pair
        # changes v by 1.0 - 1.4 = -0.4.
        self.assertGreater(dump.size, 0)
        self.assertAllClose([10.0 - 0.4 * (i / 2)],
                            dump.get_tensors("v", 0, "DebugIdentity"))
        self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
        self.assertEqual(repr(None), dump.run_feed_keys_info)
      else:
        # dec_v run: nothing was dumped.
        self.assertEqual(0, dump.size)
        self.assertEqual(repr(self.dec_v), dump.run_fetches_info)
        self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testDumpingWithLegacyWatchFnWithNonDefaultDebugOpsWorks(self):
    """Use a watch_fn that specifies non-default debug ops."""

    def watch_fn(fetches, feeds):
      del fetches, feeds
      return ["DebugIdentity", "DebugNumericSummary"], r".*", r".*"

    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess,
        session_root=self.session_root,
        watch_fn=watch_fn,
        log_usage=False)

    sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))
    dump = debug_data.DebugDumpDir(dump_dirs[0])

    self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))
    self.assertEqual(14,
                     len(dump.get_tensors("v", 0, "DebugNumericSummary")[0]))

  def testDumpingWithWatchFnWithNonDefaultDebugOpsWorks(self):
    """Use a watch_fn that specifies non-default debug ops."""

    def watch_fn(fetches, feeds):
      del fetches, feeds
      return framework.WatchOptions(
          debug_ops=["DebugIdentity", "DebugNumericSummary"],
          node_name_regex_whitelist=r"^v.*",
          op_type_regex_whitelist=r".*",
          tensor_dtype_regex_whitelist=".*_ref")

    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess,
        session_root=self.session_root,
        watch_fn=watch_fn,
        log_usage=False)

    sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))
    dump = debug_data.DebugDumpDir(dump_dirs[0])

    self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))
    self.assertEqual(14,
                     len(dump.get_tensors("v", 0, "DebugNumericSummary")[0]))

    # The loop variable is named `datum` so that it does not shadow (or, on
    # Python 2, rebind) the `dump` object being inspected.
    dumped_nodes = [datum.node_name for datum in dump.dumped_tensor_data]
    self.assertNotIn("inc_v", dumped_nodes)
    self.assertNotIn("delta", dumped_nodes)

  def testDumpingDebugHookWithoutWatchFnWorks(self):
    dumping_hook = hooks.DumpingDebugHook(self.session_root, log_usage=False)
    mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])
    mon_sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))

    self._assert_correct_run_subdir_naming(os.path.basename(dump_dirs[0]))
    dump = debug_data.DebugDumpDir(dump_dirs[0])
    self.assertAllClose([10.0], dump.get_tensors("v", 0, "DebugIdentity"))

    self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
    self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testDumpingDebugHookWithStatefulWatchFnWorks(self):
    watch_fn_state = {"run_counter": 0}

    def counting_watch_fn(fetches, feed_dict):
      del fetches, feed_dict
      watch_fn_state["run_counter"] += 1
      if watch_fn_state["run_counter"] % 2 == 1:
        # If odd-index run (1-based), watch every ref-type tensor.
        return framework.WatchOptions(
            debug_ops="DebugIdentity",
            tensor_dtype_regex_whitelist=".*_ref")
      else:
        # If even-index run, watch nothing.
        return framework.WatchOptions(
            debug_ops="DebugIdentity",
            node_name_regex_whitelist=r"^$",
            op_type_regex_whitelist=r"^$")

    dumping_hook = hooks.DumpingDebugHook(
        self.session_root, watch_fn=counting_watch_fn, log_usage=False)
    mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])
    for _ in range(4):
      mon_sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    dump_dirs = sorted(
        dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))
    self.assertEqual(4, len(dump_dirs))

    for i, dump_dir in enumerate(dump_dirs):
      self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))
      dump = debug_data.DebugDumpDir(dump_dir)
      if i % 2 == 0:
        # 0-based even index == 1-based odd run: tensors were watched.
        self.assertAllClose([10.0 + 1.0 * i],
                            dump.get_tensors("v", 0, "DebugIdentity"))
        self.assertNotIn("delta",
                         [datum.node_name for datum in dump.dumped_tensor_data])
      else:
        self.assertEqual(0, dump.size)

      self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
      self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testDumpingDebugHookWithStatefulLegacyWatchFnWorks(self):
    watch_fn_state = {"run_counter": 0}

    def counting_watch_fn(fetches, feed_dict):
      del fetches, feed_dict
      watch_fn_state["run_counter"] += 1
      if watch_fn_state["run_counter"] % 2 == 1:
        # If odd-index run (1-based), watch everything.
        return "DebugIdentity", r".*", r".*"
      else:
        # If even-index run, watch nothing.
        return "DebugIdentity", r"$^", r"$^"

    dumping_hook = hooks.DumpingDebugHook(
        self.session_root, watch_fn=counting_watch_fn, log_usage=False)
    mon_sess = monitored_session._HookedSession(self.sess, [dumping_hook])
    for _ in range(4):
      mon_sess.run(self.inc_v)

    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    dump_dirs = sorted(
        dump_dirs, key=lambda x: int(os.path.basename(x).split("_")[1]))
    self.assertEqual(4, len(dump_dirs))

    for i, dump_dir in enumerate(dump_dirs):
      self._assert_correct_run_subdir_naming(os.path.basename(dump_dir))
      dump = debug_data.DebugDumpDir(dump_dir)
      if i % 2 == 0:
        self.assertAllClose([10.0 + 1.0 * i],
                            dump.get_tensors("v", 0, "DebugIdentity"))
      else:
        self.assertEqual(0, dump.size)

      self.assertEqual(repr(self.inc_v), dump.run_fetches_info)
      self.assertEqual(repr(None), dump.run_feed_keys_info)

  def testDumpingFromMultipleThreadsObeysThreadNameFilter(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False,
        thread_name_filter=r"MainThread$")

    self.assertAllClose(1.0, sess.run(self.delta))
    child_thread_result = []

    def child_thread_job():
      child_thread_result.append(sess.run(self.eta))

    thread = threading.Thread(name="ChildThread", target=child_thread_job)
    thread.start()
    thread.join()
    self.assertAllClose([-1.4], child_thread_result)

    # Only the run issued from the main thread is expected to leave a dump.
    dump_dirs = glob.glob(os.path.join(self.session_root, "run_*"))
    self.assertEqual(1, len(dump_dirs))
    dump = debug_data.DebugDumpDir(dump_dirs[0])
    self.assertEqual(1, dump.size)
    self.assertEqual("delta", dump.dumped_tensor_data[0].node_name)

  def testDumpingWrapperWithEmptyFetchWorks(self):
    sess = dumping_wrapper.DumpingDebugWrapperSession(
        self.sess, session_root=self.session_root, log_usage=False)
    sess.run([])


if __name__ == "__main__":
  # NOTE(review): the body of this guard is elided ("...") in the scraped
  # source; googletest.main() is presumed from the googletest import - confirm.
  googletest.main()
Source:session_debug_test.py  
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for debugger functionalities in tf.Session."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import glob
import os
import shutil
import tempfile

import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.core.protobuf import config_pb2
from tensorflow.core.util import event_pb2
from tensorflow.python.client import session
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import tensor_util
from tensorflow.python.framework import test_util
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import googletest


class SessionDebugTest(test_util.TensorFlowTestCase):
  """Tests that Session.run() writes debug tensor dumps to file:// URLs."""

  def setUp(self):
    """Creates a fresh temporary directory to receive the dumps."""
    self.dump_root_ = tempfile.mkdtemp()

  def tearDown(self):
    # Tear down temporary dump directory.
    shutil.rmtree(self.dump_root_)

  def _addDebugTensorWatch(self,
                           run_opts,
                           node_name,
                           output_slot,
                           debug_op="DebugIdentity",
                           debug_urls=None):
    """Helper method: Append one debug tensor watch to a RunOptions proto.

    Args:
      run_opts: The RunOptions proto whose debug_tensor_watch_opts is extended.
      node_name: Name of the node to watch.
      output_slot: Output slot index of the watched node.
      debug_op: Name of the debug Op, e.g., DebugIdentity.
      debug_urls: Optional list of debug URLs, e.g., ["file:///tmp/dump"].
    """
    watch = run_opts.debug_tensor_watch_opts.add()
    watch.node_name = node_name
    # Honor the caller-supplied slot; this used to be hard-coded to 0, which
    # silently ignored the `output_slot` argument.
    watch.output_slot = output_slot
    watch.debug_ops.append(debug_op)
    if debug_urls:
      watch.debug_urls.extend(debug_urls)

  def _verifyTensorDumpFile(self, dump_file, expected_tensor_name, debug_op,
                            wall_time_lower_bound, expected_tensor_val):
    """Helper method: Verify a Tensor debug dump file and its content.

    Args:
      dump_file: Path to the dump file.
      expected_tensor_name: Expected name of the tensor, e.g., node_a:0.
      debug_op: Name of the debug Op, e.g., DebugIdentity.
      wall_time_lower_bound: Lower bound of the wall time.
      expected_tensor_val: Expected tensor value, as a numpy array.
    """
    self.assertTrue(os.path.isfile(dump_file))

    event = event_pb2.Event()
    # Use a context manager so the file handle is closed even if parsing fails.
    with open(dump_file, "rb") as f:
      event.ParseFromString(f.read())

    wall_time = event.wall_time
    debug_node_name = event.summary.value[0].node_name
    tensor_value = tensor_util.MakeNdarray(event.summary.value[0].tensor)

    self.assertGreater(wall_time, wall_time_lower_bound)
    self.assertEqual("%s:%s" % (expected_tensor_name, debug_op),
                     debug_node_name)

    # np.bytes_ is the same type as the former np.string_ alias, which NumPy
    # 2.0 removed.
    if expected_tensor_val.dtype.type is np.bytes_:
      self.assertEqual(str(expected_tensor_val), str(tensor_value))
    else:
      self.assertAllClose(expected_tensor_val, tensor_value)

  def _verifySingleReadDump(self, var_name, expected_tensor_val):
    """Helper method: Verify the only dump of the "<var_name>/read:0" tensor.

    Asserts that the directory <dump_root>/<var_name> holds exactly one file,
    that its name starts with "read_0_", and that its content matches.

    Args:
      var_name: Name of the variable whose "read" node was watched.
      expected_tensor_val: Expected tensor value, as a numpy array.
    """
    dump_dir = os.path.join(self.dump_root_, var_name)
    dump_files = os.listdir(dump_dir)
    self.assertEqual(1, len(dump_files))
    self.assertTrue(dump_files[0].startswith("read_0_"))

    self._verifyTensorDumpFile(
        os.path.join(dump_dir, dump_files[0]), "%s/read:0" % var_name,
        "DebugIdentity", 0, expected_tensor_val)

  # NOTE(review): "Overlaoppinp" looks like a typo for "Overlapping"; the name
  # is kept so that existing test filters keep matching.
  def testDumpToFileOverlaoppinpParentDir(self):
    with session.Session() as sess:
      u_init_val = np.array([[5.0, 3.0], [-1.0, 0.0]])
      v_init_val = np.array([[2.0], [-1.0]])

      # Use node names with overlapping namespace (i.e., parent directory) to
      # test concurrent, non-racing directory creation.
      u_name = "testDumpToFile/u"
      v_name = "testDumpToFile/v"

      u_init = constant_op.constant(u_init_val, shape=[2, 2])
      u = variables.Variable(u_init, name=u_name)
      v_init = constant_op.constant(v_init_val, shape=[2, 1])
      v = variables.Variable(v_init, name=v_name)

      w = math_ops.matmul(u, v, name="testDumpToFile/matmul")

      u.initializer.run()
      v.initializer.run()

      run_options = config_pb2.RunOptions()
      debug_url = "file://%s" % self.dump_root_

      # Add debug tensor watch for u.
      self._addDebugTensorWatch(
          run_options, "%s/read" % u_name, 0, debug_urls=[debug_url])
      # Add debug tensor watch for v.
      self._addDebugTensorWatch(
          run_options, "%s/read" % v_name, 0, debug_urls=[debug_url])

      run_metadata = config_pb2.RunMetadata()

      # Invoke Session.run().
      sess.run(w, options=run_options, run_metadata=run_metadata)

      # Verify the dump file for u.
      self._verifySingleReadDump(u_name, u_init_val)
      # Verify the dump file for v.
      self._verifySingleReadDump(v_name, v_init_val)

  def testDumpStringTensorsToFileSystem(self):
    with session.Session() as sess:
      str1_init_val = np.array(b"abc")
      str2_init_val = np.array(b"def")
      str1_init = constant_op.constant(str1_init_val)
      str2_init = constant_op.constant(str2_init_val)

      str1_name = "str1"
      str2_name = "str2"
      str1 = variables.Variable(str1_init, name=str1_name)
      str2 = variables.Variable(str2_init, name=str2_name)
      # Concatenate str1 and str2
      str_concat = math_ops.add(str1, str2, name="str_concat")

      str1.initializer.run()
      str2.initializer.run()

      run_options = config_pb2.RunOptions()
      debug_url = "file://%s" % self.dump_root_

      # Add debug tensor watch for str1.
      self._addDebugTensorWatch(
          run_options, "%s/read" % str1_name, 0, debug_urls=[debug_url])
      # Add debug tensor watch for str2.
      self._addDebugTensorWatch(
          run_options, "%s/read" % str2_name, 0, debug_urls=[debug_url])

      run_metadata = config_pb2.RunMetadata()

      # Invoke Session.run().
      sess.run(str_concat, options=run_options, run_metadata=run_metadata)

      # Verify the dump file for str1.
      self._verifySingleReadDump(str1_name, str1_init_val)
      # Verify the dump file for str2.
      self._verifySingleReadDump(str2_name, str2_init_val)

  def testDumpToFileWhileLoop(self):
    with session.Session() as sess:
      num_iter = 10

      # "u" is the Variable being updated in the loop.
      u_name = "testDumpToFileWhileLoop/u"
      u_namespace = u_name.split("/")[0]

      u_init_val = np.array(11.0)
      u_init = constant_op.constant(u_init_val)
      u = variables.Variable(u_init, name=u_name)

      # "v" is the increment.
      v_name = "testDumpToFileWhileLoop/v"
      v_namespace = v_name.split("/")[0]

      v_init_val = np.array(2.0)
      v_init = constant_op.constant(v_init_val)
      v = variables.Variable(v_init, name=v_name)

      u.initializer.run()
      v.initializer.run()

      i = constant_op.constant(0, name="testDumpToFileWhileLoop/i")

      def cond(i):
        return math_ops.less(i, num_iter)

      def body(i):
        new_u = state_ops.assign_add(u, v)
        new_i = math_ops.add(i, 1)
        op = control_flow_ops.group(new_u)
        # Make the counter depend on the update of u.
        new_i = control_flow_ops.with_dependencies([op], new_i)
        return [new_i]

      loop = control_flow_ops.while_loop(cond, body, [i], parallel_iterations=1)

      # Create RunOptions for debug-watching tensors
      run_options = config_pb2.RunOptions()
      debug_url = "file://%s" % self.dump_root_

      # Add debug tensor watch for u.
      self._addDebugTensorWatch(run_options, u_name, 0, debug_urls=[debug_url])
      # Add debug tensor watch for v.
      self._addDebugTensorWatch(
          run_options, "%s/read" % v_name, 0, debug_urls=[debug_url])
      # Add debug tensor watch for while/Identity.
      self._addDebugTensorWatch(
          run_options, "while/Identity", 0, debug_urls=[debug_url])

      run_metadata = config_pb2.RunMetadata()

      r = sess.run(loop, options=run_options, run_metadata=run_metadata)

      self.assertEqual(num_iter, r)

      u_val_final = sess.run(u)
      self.assertAllClose(u_init_val + num_iter * v_init_val, u_val_final)

      # Verify dump files
      self.assertTrue(os.path.isdir(self.dump_root_))

      self.assertTrue(os.path.isdir(os.path.join(self.dump_root_, u_namespace)))
      self.assertTrue(
          os.path.isdir(os.path.join(self.dump_root_, v_namespace, "v")))

      # Verify the dump file for tensor "u".
      dump_files = glob.glob(
          os.path.join(self.dump_root_, u_namespace, "u_0_*"))
      self.assertEqual(1, len(dump_files))
      # glob.glob() already returns the full path, so no further joining with
      # the dump root is needed.
      dump_file = dump_files[0]
      self.assertTrue(os.path.isfile(dump_file))
      self._verifyTensorDumpFile(dump_file, "%s:0" % u_name, "DebugIdentity", 0,
                                 u_init_val)

      # Verify the dump file for tensor "v".
      self._verifySingleReadDump(v_name, v_init_val)

      # Verify the dump files for tensor while/Identity
      while_identity_dump_files = sorted(
          os.listdir(os.path.join(self.dump_root_, "while")))
      self.assertEqual(num_iter, len(while_identity_dump_files))

      # Verify the content of the individual dump files: the k-th file (in
      # sorted order) is expected to hold the value k.
      for k, dump_file_name in enumerate(while_identity_dump_files):
        dump_file_path = os.path.join(self.dump_root_, "while", dump_file_name)
        self._verifyTensorDumpFile(dump_file_path, "while/Identity:0",
                                   "DebugIdentity", 0, np.array(k))


if __name__ == "__main__":
  # NOTE(review): the body of this guard is elided ("...") in the scraped
  # source; googletest.main() is presumed from the googletest import - confirm.
  googletest.main()
Source:test_postgresql.py  
...18        self.connector.settings['HOST'] = 'hostname'19    def test_user_password_uses_special_characters(self, mock_dump_cmd):20        self.connector.settings['PASSWORD'] = '@!'21        self.connector.settings['USER'] = '@'22        self.connector.create_dump()23        self.assertIn('postgresql://%40:%40%21@hostname/dbname', mock_dump_cmd.call_args[0][0])24    def test_create_dump(self, mock_dump_cmd):25        dump = self.connector.create_dump()26        # Test dump27        dump_content = dump.read()28        self.assertTrue(dump_content)29        self.assertEqual(dump_content, b'foo')30        # Test cmd31        self.assertTrue(mock_dump_cmd.called)32    def test_create_dump_without_host_raises_error(self, mock_dump_cmd):33        self.connector.settings.pop('HOST', None)34        with self.assertRaises(DumpError):35            self.connector.create_dump()36    def test_password_but_no_user(self, mock_dump_cmd):37        self.connector.settings.pop('USER', None)38        self.connector.settings['PASSWORD'] = 'hello'39        self.connector.create_dump()40        self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])41    def test_create_dump_host(self, mock_dump_cmd):42        # With43        self.connector.settings['HOST'] = 'foo'44        self.connector.create_dump()45        self.assertIn('postgresql://foo/dbname', mock_dump_cmd.call_args[0][0])46    def test_create_dump_port(self, mock_dump_cmd):47        # Without48        self.connector.settings.pop('PORT', None)49        self.connector.create_dump()50        self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])51        # With52        self.connector.settings['PORT'] = 4253        self.connector.create_dump()54        self.assertIn('postgresql://hostname:42/dbname', mock_dump_cmd.call_args[0][0])55    def test_create_dump_user(self, mock_dump_cmd):56        # Without57        self.connector.settings.pop('USER', None)58        
self.connector.create_dump()59        self.assertIn('postgresql://hostname/dbname', mock_dump_cmd.call_args[0][0])60        # With61        self.connector.settings['USER'] = 'foo'62        self.connector.create_dump()63        self.assertIn('postgresql://foo@hostname/dbname', mock_dump_cmd.call_args[0][0])64    def test_create_dump_exclude(self, mock_dump_cmd):65        # Without66        self.connector.create_dump()67        self.assertNotIn(' --exclude-table-data=', mock_dump_cmd.call_args[0][0])68        # With69        self.connector.exclude = ('foo',)70        self.connector.create_dump()71        self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])72        # With serveral73        self.connector.exclude = ('foo', 'bar')74        self.connector.create_dump()75        self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])76        self.assertIn(' --exclude-table-data=bar', mock_dump_cmd.call_args[0][0])77    def test_create_dump_drop(self, mock_dump_cmd):78        # Without79        self.connector.drop = False80        self.connector.create_dump()81        self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])82        # With83        self.connector.drop = True84        self.connector.create_dump()85        self.assertIn(' --clean', mock_dump_cmd.call_args[0][0])86    @patch('dbbackup.db.postgresql.PgDumpConnector.run_command',87           return_value=(BytesIO(), BytesIO()))88    def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):89        dump = self.connector.create_dump()90        self.connector.restore_dump(dump)91        # Test cmd92        self.assertTrue(mock_restore_cmd.called)93    @patch('dbbackup.db.postgresql.PgDumpConnector.run_command',94           return_value=(BytesIO(), BytesIO()))95    def test_restore_dump_user(self, mock_dump_cmd, mock_restore_cmd):96        dump = self.connector.create_dump()97        # Without98        self.connector.settings.pop('USER', None)99        
self.connector.restore_dump(dump)100        self.assertIn(101            'postgresql://hostname/dbname',102            mock_restore_cmd.call_args[0][0]103        )104        self.assertNotIn(' --username=', mock_restore_cmd.call_args[0][0])105        # With106        self.connector.settings['USER'] = 'foo'107        self.connector.restore_dump(dump)108        self.assertIn(109            'postgresql://foo@hostname/dbname',110            mock_restore_cmd.call_args[0][0]111        )112@patch('dbbackup.db.postgresql.PgDumpBinaryConnector.run_command',113       return_value=(BytesIO(b'foo'), BytesIO()))114class PgDumpBinaryConnectorTest(TestCase):115    def setUp(self):116        self.connector = PgDumpBinaryConnector()117        self.connector.settings['HOST'] = 'hostname'118        self.connector.settings['ENGINE'] = 'django.db.backends.postgresql'119        self.connector.settings['NAME'] = 'dbname'120    def test_create_dump(self, mock_dump_cmd):121        dump = self.connector.create_dump()122        # Test dump123        dump_content = dump.read()124        self.assertTrue(dump_content)125        self.assertEqual(dump_content, b'foo')126        # Test cmd127        self.assertTrue(mock_dump_cmd.called)128        self.assertIn('--format=custom', mock_dump_cmd.call_args[0][0])129    def test_create_dump_exclude(self, mock_dump_cmd):130        # Without131        self.connector.create_dump()132        self.assertNotIn(' --exclude-table-data=', mock_dump_cmd.call_args[0][0])133        # With134        self.connector.exclude = ('foo',)135        self.connector.create_dump()136        self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])137        # With serveral138        self.connector.exclude = ('foo', 'bar')139        self.connector.create_dump()140        self.assertIn(' --exclude-table-data=foo', mock_dump_cmd.call_args[0][0])141        self.assertIn(' --exclude-table-data=bar', mock_dump_cmd.call_args[0][0])142    def 
test_create_dump_drop(self, mock_dump_cmd):143        # Without144        self.connector.drop = False145        self.connector.create_dump()146        self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])147        # Binary drop at restore level148        self.connector.drop = True149        self.connector.create_dump()150        self.assertNotIn(' --clean', mock_dump_cmd.call_args[0][0])151    @patch('dbbackup.db.postgresql.PgDumpBinaryConnector.run_command',152           return_value=(BytesIO(), BytesIO()))153    def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):154        dump = self.connector.create_dump()155        self.connector.restore_dump(dump)156        # Test cmd157        self.assertTrue(mock_restore_cmd.called)158@patch('dbbackup.db.postgresql.PgDumpGisConnector.run_command',159       return_value=(BytesIO(b'foo'), BytesIO()))160class PgDumpGisConnectorTest(TestCase):161    def setUp(self):162        self.connector = PgDumpGisConnector()163        self.connector.settings['HOST'] = 'hostname'164    @patch('dbbackup.db.postgresql.PgDumpGisConnector.run_command',165           return_value=(BytesIO(b'foo'), BytesIO()))166    def test_restore_dump(self, mock_dump_cmd, mock_restore_cmd):167        dump = self.connector.create_dump()168        # Without ADMINUSER169        self.connector.settings.pop('ADMIN_USER', None)170        self.connector.restore_dump(dump)171        self.assertTrue(mock_restore_cmd.called)172        # With173        self.connector.settings['ADMIN_USER'] = 'foo'174        self.connector.restore_dump(dump)175        self.assertTrue(mock_restore_cmd.called)176    def test_enable_postgis(self, mock_dump_cmd):177        self.connector.settings['ADMIN_USER'] = 'foo'178        self.connector._enable_postgis()179        self.assertIn('"CREATE EXTENSION IF NOT EXISTS postgis;"', mock_dump_cmd.call_args[0][0])180        self.assertIn('--username=foo', mock_dump_cmd.call_args[0][0])181    def test_enable_postgis_host(self, 
mock_dump_cmd):182        self.connector.settings['ADMIN_USER'] = 'foo'183        # Without184        self.connector.settings.pop('HOST', None)185        self.connector._enable_postgis()186        self.assertNotIn(' --host=', mock_dump_cmd.call_args[0][0])187        # With188        self.connector.settings['HOST'] = 'foo'189        self.connector._enable_postgis()190        self.assertIn(' --host=foo', mock_dump_cmd.call_args[0][0])191    def test_enable_postgis_port(self, mock_dump_cmd):192        self.connector.settings['ADMIN_USER'] = 'foo'193        # Without194        self.connector.settings.pop('PORT', None)195        self.connector._enable_postgis()196        self.assertNotIn(' --port=', mock_dump_cmd.call_args[0][0])197        # With198        self.connector.settings['PORT'] = 42199        self.connector._enable_postgis()200        self.assertIn(' --port=42', mock_dump_cmd.call_args[0][0])201@patch('dbbackup.db.base.Popen', **{202    'return_value.wait.return_value': True,203    'return_value.poll.return_value': False,204})205class PgDumpConnectorRunCommandTest(TestCase):206    def test_run_command(self, mock_popen):207        connector = PgDumpConnector()208        connector.settings['HOST'] = 'hostname'209        connector.create_dump()210        self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')211    def test_run_command_with_password(self, mock_popen):212        connector = PgDumpConnector()213        connector.settings['HOST'] = 'hostname'214        connector.settings['PASSWORD'] = 'foo'215        connector.create_dump()216        self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')217    def test_run_command_with_password_and_other(self, mock_popen):218        connector = PgDumpConnector(env={'foo': 'bar'})219        connector.settings['HOST'] = 'hostname'220        connector.settings['PASSWORD'] = 'foo'221        connector.create_dump()222        self.assertEqual(mock_popen.call_args[0][0][0], 'pg_dump')223        self.assertIn('foo', 
mock_popen.call_args[1]['env'])...elf-dump
Source:elf-dump  
...49        self.sh_link = f.read32()50        self.sh_info = f.read32()51        self.sh_addralign = f.readWord()52        self.sh_entsize = f.readWord()53    def dump(self, shstrtab, f, strtab, dumpdata):54        print "  (('sh_name', %s)" % common_dump.HexDump(self.sh_name), "# %r" % shstrtab[self.sh_name[0]]55        print "   ('sh_type', %s)" % common_dump.HexDump(self.sh_type)56        print "   ('sh_flags', %s)" % common_dump.HexDump(self.sh_flags)57        print "   ('sh_addr', %s)" % common_dump.HexDump(self.sh_addr)58        print "   ('sh_offset', %s)" % common_dump.HexDump(self.sh_offset)59        print "   ('sh_size', %s)" % common_dump.HexDump(self.sh_size)60        print "   ('sh_link', %s)" % common_dump.HexDump(self.sh_link)61        print "   ('sh_info', %s)" % common_dump.HexDump(self.sh_info)62        print "   ('sh_addralign', %s)" % common_dump.HexDump(self.sh_addralign)63        print "   ('sh_entsize', %s)" % common_dump.HexDump(self.sh_entsize)64        if self.sh_type[0] == 2: # SHT_SYMTAB65            print "   ('_symbols', ["66            dumpSymtab(f, self, strtab)67            print "   ])"68        elif self.sh_type[0] == 4 or self.sh_type[0] == 9: # SHT_RELA / SHT_REL69            print "   ('_relocations', ["70            dumpRel(f, self, self.sh_type[0] == 4)71            print "   ])"72        elif dumpdata:73            f.seek(self.sh_offset[0])74            if self.sh_type != 8: # != SHT_NOBITS75                data = f.read(self.sh_size[0])76                print "   ('_section_data', '%s')" % common_dump.dataToHex(data)77            else:78                print "   ('_section_data', '')" 79        print "  ),"80def dumpSymtab(f, section, strtab):81    entries = section.sh_size[0] // section.sh_entsize[0]82    for index in range(entries):83        f.seek(section.sh_offset[0] + index * section.sh_entsize[0])84        print "    # Symbol %s" % index85        name = f.read32()86        print "    (('st_name', %s)" % 
common_dump.HexDump(name), "# %r" % strtab[name[0]]87        if not f.is64Bit:88            print "     ('st_value', %s)" % common_dump.HexDump(f.read32())89            print "     ('st_size', %s)" % common_dump.HexDump(f.read32())90        st_info = f.read8()[0]91        st_bind = (st_info >> 4, 4)92        st_type = (st_info & 0xf, 4)93        print "     ('st_bind', %s)" % common_dump.HexDump(st_bind)94        print "     ('st_type', %s)" % common_dump.HexDump(st_type)95        print "     ('st_other', %s)" % common_dump.HexDump(f.read8())96        print "     ('st_shndx', %s)" % common_dump.HexDump(f.read16())97        if f.is64Bit:98            print "     ('st_value', %s)" % common_dump.HexDump(f.read64())99            print "     ('st_size', %s)" % common_dump.HexDump(f.read64())100        print "    ),"101def dumpRel(f, section, dumprela = False):102    entries = section.sh_size[0] // section.sh_entsize[0]103    for index in range(entries):104        f.seek(section.sh_offset[0] + index * section.sh_entsize[0])105        print "    # Relocation %s" % index106        print "    (('r_offset', %s)" % common_dump.HexDump(f.readWord())107        r_info = f.readWord()[0]108        if f.is64Bit:109            r_sym = (r_info >> 32, 32)110            r_type = (r_info & 0xffffffff, 32)111        else:112            r_sym = (r_info >> 8, 24)113            r_type = (r_info & 0xff, 8)114        print "     ('r_sym', %s)" % common_dump.HexDump(r_sym)115        print "     ('r_type', %s)" % common_dump.HexDump(r_type)116        if dumprela:117            print "     ('r_addend', %s)" % common_dump.HexDump(f.readWord())118        print "    ),"119def dumpELF(path, opts):120    f = Reader(path)121    magic = f.read(4)122    assert magic == '\x7FELF'123    fileclass = f.read8()124    if fileclass[0] == 1: # ELFCLASS32125        f.is64Bit = False126    elif fileclass[0] == 2: # ELFCLASS64127        f.is64Bit = True128    else:129        raise ValueError, "Unknown file class 
%s" % common_dump.HexDump(fileclass)130    print "('e_indent[EI_CLASS]', %s)" % common_dump.HexDump(fileclass)131    byteordering = f.read8()132    if byteordering[0] == 1: # ELFDATA2LSB133        f.isLSB = True134    elif byteordering[0] == 2: # ELFDATA2MSB135        f.isLSB = False136    else:137        raise ValueError, "Unknown byte ordering %s" % common_dump.HexDump(byteordering)138    print "('e_indent[EI_DATA]', %s)" % common_dump.HexDump(byteordering)139    print "('e_indent[EI_VERSION]', %s)" % common_dump.HexDump(f.read8())140    print "('e_indent[EI_OSABI]', %s)" % common_dump.HexDump(f.read8())141    print "('e_indent[EI_ABIVERSION]', %s)" % common_dump.HexDump(f.read8())142    f.seek(16) # Seek to end of e_ident.143    print "('e_type', %s)" % common_dump.HexDump(f.read16())144    print "('e_machine', %s)" % common_dump.HexDump(f.read16())145    print "('e_version', %s)" % common_dump.HexDump(f.read32())146    print "('e_entry', %s)" % common_dump.HexDump(f.readWord())147    print "('e_phoff', %s)" % common_dump.HexDump(f.readWord())148    e_shoff = f.readWord()149    print "('e_shoff', %s)" % common_dump.HexDump(e_shoff)150    print "('e_flags', %s)" % common_dump.HexDump(f.read32())151    print "('e_ehsize', %s)" % common_dump.HexDump(f.read16())152    print "('e_phentsize', %s)" % common_dump.HexDump(f.read16())153    print "('e_phnum', %s)" % common_dump.HexDump(f.read16())154    e_shentsize = f.read16()155    print "('e_shentsize', %s)" % common_dump.HexDump(e_shentsize)156    e_shnum = f.read16()157    print "('e_shnum', %s)" % common_dump.HexDump(e_shnum)158    e_shstrndx = f.read16()159    print "('e_shstrndx', %s)" % common_dump.HexDump(e_shstrndx)160    # Read all section headers161    sections = []162    for index in range(e_shnum[0]):163        f.seek(e_shoff[0] + index * e_shentsize[0])164        s = Section(f)165        sections.append(s)166    # Read .shstrtab so we can resolve section names167    
f.seek(sections[e_shstrndx[0]].sh_offset[0])168    shstrtab = StringTable(f.read(sections[e_shstrndx[0]].sh_size[0]))169    # Get the symbol string table170    strtab = None171    for section in sections:172        if shstrtab[section.sh_name[0]] == ".strtab":173            f.seek(section.sh_offset[0])174            strtab = StringTable(f.read(section.sh_size[0]))175            break176    print "('_sections', ["177    for index in range(e_shnum[0]):178        print "  # Section %s" % index179        sections[index].dump(shstrtab, f, strtab, opts.dumpSectionData)180    print "])"181if __name__ == "__main__":182    from optparse import OptionParser, OptionGroup183    parser = OptionParser("usage: %prog [options] {files}")184    parser.add_option("", "--dump-section-data", dest="dumpSectionData",185                      help="Dump the contents of sections",186                      action="store_true", default=False)187    (opts, args) = parser.parse_args()188    if not args:189        args.append('-')190    for arg in args:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
