How to use the ok method in tappy

Best Python code snippet using tappy_python

ldb_test.py

Source:ldb_test.py Github

copy

Full Screen

1import os2import os.path3import shutil4import subprocess5import time6import unittest7import tempfile8def my_check_output(*popenargs, **kwargs):9 """10 If we had python 2.7, we should simply use subprocess.check_output.11 This is a stop-gap solution for python 2.612 """13 if 'stdout' in kwargs:14 raise ValueError('stdout argument not allowed, it will be overridden.')15 process = subprocess.Popen(stderr=subprocess.PIPE, stdout=subprocess.PIPE,16 *popenargs, **kwargs)17 output, unused_err = process.communicate()18 retcode = process.poll()19 if retcode:20 cmd = kwargs.get("args")21 if cmd is None:22 cmd = popenargs[0]23 raise Exception("Exit code is not 0. It is %d. Command: %s" %24 (retcode, cmd))25 return output26def run_err_null(cmd):27 return os.system(cmd + " 2>/dev/null ")28class LDBTestCase(unittest.TestCase):29 def setUp(self):30 self.TMP_DIR = tempfile.mkdtemp(prefix="ldb_test_")31 self.DB_NAME = "testdb"32 def tearDown(self):33 assert(self.TMP_DIR.strip() != "/"34 and self.TMP_DIR.strip() != "/tmp"35 and self.TMP_DIR.strip() != "/tmp/") #Just some paranoia36 shutil.rmtree(self.TMP_DIR)37 def dbParam(self, dbName):38 return "--db=%s" % os.path.join(self.TMP_DIR, dbName)39 def assertRunOKFull(self, params, expectedOutput, unexpected=False):40 """41 All command-line params must be specified.42 Allows full flexibility in testing; for example: missing db param.43 """44 output = my_check_output("./ldb %s |grep -v \"Created bg thread\"" %45 params, shell=True)46 if not unexpected:47 self.assertEqual(output.strip(), expectedOutput.strip())48 else:49 self.assertNotEqual(output.strip(), expectedOutput.strip())50 def assertRunFAILFull(self, params):51 """52 All command-line params must be specified.53 Allows full flexibility in testing; for example: missing db param.54 """55 try:56 my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \57 thread\"" % params, shell=True)58 except Exception, e:59 return60 self.fail(61 "Exception should have been raised for 
command with params: %s" %62 params)63 def assertRunOK(self, params, expectedOutput, unexpected=False):64 """65 Uses the default test db.66 """67 self.assertRunOKFull("%s %s" % (self.dbParam(self.DB_NAME), params),68 expectedOutput, unexpected)69 def assertRunFAIL(self, params):70 """71 Uses the default test db.72 """73 self.assertRunFAILFull("%s %s" % (self.dbParam(self.DB_NAME), params))74 def testSimpleStringPutGet(self):75 print "Running testSimpleStringPutGet..."76 self.assertRunFAIL("put x1 y1")77 self.assertRunOK("put --create_if_missing x1 y1", "OK")78 self.assertRunOK("get x1", "y1")79 self.assertRunFAIL("get x2")80 self.assertRunOK("put x2 y2", "OK")81 self.assertRunOK("get x1", "y1")82 self.assertRunOK("get x2", "y2")83 self.assertRunFAIL("get x3")84 self.assertRunOK("scan --from=x1 --to=z", "x1 : y1\nx2 : y2")85 self.assertRunOK("put x3 y3", "OK")86 self.assertRunOK("scan --from=x1 --to=z", "x1 : y1\nx2 : y2\nx3 : y3")87 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3")88 self.assertRunOK("scan --from=x", "x1 : y1\nx2 : y2\nx3 : y3")89 self.assertRunOK("scan --to=x2", "x1 : y1")90 self.assertRunOK("scan --from=x1 --to=z --max_keys=1", "x1 : y1")91 self.assertRunOK("scan --from=x1 --to=z --max_keys=2",92 "x1 : y1\nx2 : y2")93 self.assertRunOK("scan --from=x1 --to=z --max_keys=3",94 "x1 : y1\nx2 : y2\nx3 : y3")95 self.assertRunOK("scan --from=x1 --to=z --max_keys=4",96 "x1 : y1\nx2 : y2\nx3 : y3")97 self.assertRunOK("scan --from=x1 --to=x2", "x1 : y1")98 self.assertRunOK("scan --from=x2 --to=x4", "x2 : y2\nx3 : y3")99 self.assertRunFAIL("scan --from=x4 --to=z") # No results => FAIL100 self.assertRunFAIL("scan --from=x1 --to=z --max_keys=foo")101 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3")102 self.assertRunOK("delete x1", "OK")103 self.assertRunOK("scan", "x2 : y2\nx3 : y3")104 self.assertRunOK("delete NonExistentKey", "OK")105 # It is weird that GET and SCAN raise exception for106 # non-existent key, while delete does not107 
self.assertRunOK("checkconsistency", "OK")108 def dumpDb(self, params, dumpFile):109 return 0 == run_err_null("./ldb dump %s > %s" % (params, dumpFile))110 def loadDb(self, params, dumpFile):111 return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params))112 def testStringBatchPut(self):113 print "Running testStringBatchPut..."114 self.assertRunOK("batchput x1 y1 --create_if_missing", "OK")115 self.assertRunOK("scan", "x1 : y1")116 self.assertRunOK("batchput x2 y2 x3 y3 \"x4 abc\" \"y4 xyz\"", "OK")117 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 abc : y4 xyz")118 self.assertRunFAIL("batchput")119 self.assertRunFAIL("batchput k1")120 self.assertRunFAIL("batchput k1 v1 k2")121 def testCountDelimDump(self):122 print "Running testCountDelimDump..."123 self.assertRunOK("batchput x.1 x1 --create_if_missing", "OK")124 self.assertRunOK("batchput y.abc abc y.2 2 z.13c pqr", "OK")125 self.assertRunOK("dump --count_delim", "x => count:1\tsize:5\ny => count:2\tsize:12\nz => count:1\tsize:8")126 self.assertRunOK("dump --count_delim=\".\"", "x => count:1\tsize:5\ny => count:2\tsize:12\nz => count:1\tsize:8")127 self.assertRunOK("batchput x,2 x2 x,abc xabc", "OK")128 self.assertRunOK("dump --count_delim=\",\"", "x => count:2\tsize:14\nx.1 => count:1\tsize:5\ny.2 => count:1\tsize:4\ny.abc => count:1\tsize:8\nz.13c => count:1\tsize:8")129 def testCountDelimIDump(self):130 print "Running testCountDelimIDump..."131 self.assertRunOK("batchput x.1 x1 --create_if_missing", "OK")132 self.assertRunOK("batchput y.abc abc y.2 2 z.13c pqr", "OK")133 self.assertRunOK("dump --count_delim", "x => count:1\tsize:5\ny => count:2\tsize:12\nz => count:1\tsize:8")134 self.assertRunOK("dump --count_delim=\".\"", "x => count:1\tsize:5\ny => count:2\tsize:12\nz => count:1\tsize:8")135 self.assertRunOK("batchput x,2 x2 x,abc xabc", "OK")136 self.assertRunOK("dump --count_delim=\",\"", "x => count:2\tsize:14\nx.1 => count:1\tsize:5\ny.2 => count:1\tsize:4\ny.abc => 
count:1\tsize:8\nz.13c => count:1\tsize:8")137 def testInvalidCmdLines(self):138 print "Running testInvalidCmdLines..."139 # db not specified140 self.assertRunFAILFull("put 0x6133 0x6233 --hex --create_if_missing")141 # No param called he142 self.assertRunFAIL("put 0x6133 0x6233 --he --create_if_missing")143 # max_keys is not applicable for put144 self.assertRunFAIL("put 0x6133 0x6233 --max_keys=1 --create_if_missing")145 # hex has invalid boolean value146 def testHexPutGet(self):147 print "Running testHexPutGet..."148 self.assertRunOK("put a1 b1 --create_if_missing", "OK")149 self.assertRunOK("scan", "a1 : b1")150 self.assertRunOK("scan --hex", "0x6131 : 0x6231")151 self.assertRunFAIL("put --hex 6132 6232")152 self.assertRunOK("put --hex 0x6132 0x6232", "OK")153 self.assertRunOK("scan --hex", "0x6131 : 0x6231\n0x6132 : 0x6232")154 self.assertRunOK("scan", "a1 : b1\na2 : b2")155 self.assertRunOK("get a1", "b1")156 self.assertRunOK("get --hex 0x6131", "0x6231")157 self.assertRunOK("get a2", "b2")158 self.assertRunOK("get --hex 0x6132", "0x6232")159 self.assertRunOK("get --key_hex 0x6132", "b2")160 self.assertRunOK("get --key_hex --value_hex 0x6132", "0x6232")161 self.assertRunOK("get --value_hex a2", "0x6232")162 self.assertRunOK("scan --key_hex --value_hex",163 "0x6131 : 0x6231\n0x6132 : 0x6232")164 self.assertRunOK("scan --hex --from=0x6131 --to=0x6133",165 "0x6131 : 0x6231\n0x6132 : 0x6232")166 self.assertRunOK("scan --hex --from=0x6131 --to=0x6132",167 "0x6131 : 0x6231")168 self.assertRunOK("scan --key_hex", "0x6131 : b1\n0x6132 : b2")169 self.assertRunOK("scan --value_hex", "a1 : 0x6231\na2 : 0x6232")170 self.assertRunOK("batchput --hex 0x6133 0x6233 0x6134 0x6234", "OK")171 self.assertRunOK("scan", "a1 : b1\na2 : b2\na3 : b3\na4 : b4")172 self.assertRunOK("delete --hex 0x6133", "OK")173 self.assertRunOK("scan", "a1 : b1\na2 : b2\na4 : b4")174 self.assertRunOK("checkconsistency", "OK")175 def testTtlPutGet(self):176 print "Running testTtlPutGet..."177 
self.assertRunOK("put a1 b1 --ttl --create_if_missing", "OK")178 self.assertRunOK("scan --hex", "0x6131 : 0x6231", True)179 self.assertRunOK("dump --ttl ", "a1 ==> b1", True)180 self.assertRunOK("dump --hex --ttl ",181 "0x6131 ==> 0x6231\nKeys in range: 1")182 self.assertRunOK("scan --hex --ttl", "0x6131 : 0x6231")183 self.assertRunOK("get --value_hex a1", "0x6231", True)184 self.assertRunOK("get --ttl a1", "b1")185 self.assertRunOK("put a3 b3 --create_if_missing", "OK")186 # fails because timstamp's length is greater than value's187 self.assertRunFAIL("get --ttl a3")188 self.assertRunOK("checkconsistency", "OK")189 def testInvalidCmdLines(self):190 print "Running testInvalidCmdLines..."191 # db not specified192 self.assertRunFAILFull("put 0x6133 0x6233 --hex --create_if_missing")193 # No param called he194 self.assertRunFAIL("put 0x6133 0x6233 --he --create_if_missing")195 # max_keys is not applicable for put196 self.assertRunFAIL("put 0x6133 0x6233 --max_keys=1 --create_if_missing")197 # hex has invalid boolean value198 self.assertRunFAIL("put 0x6133 0x6233 --hex=Boo --create_if_missing")199 def testDumpLoad(self):200 print "Running testDumpLoad..."201 self.assertRunOK("batchput --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4",202 "OK")203 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")204 origDbPath = os.path.join(self.TMP_DIR, self.DB_NAME)205 # Dump and load without any additional params specified206 dumpFilePath = os.path.join(self.TMP_DIR, "dump1")207 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump1")208 self.assertTrue(self.dumpDb("--db=%s" % origDbPath, dumpFilePath))209 self.assertTrue(self.loadDb(210 "--db=%s --create_if_missing" % loadedDbPath, dumpFilePath))211 self.assertRunOKFull("scan --db=%s" % loadedDbPath,212 "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")213 # Dump and load in hex214 dumpFilePath = os.path.join(self.TMP_DIR, "dump2")215 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump2")216 
self.assertTrue(self.dumpDb("--db=%s --hex" % origDbPath, dumpFilePath))217 self.assertTrue(self.loadDb(218 "--db=%s --hex --create_if_missing" % loadedDbPath, dumpFilePath))219 self.assertRunOKFull("scan --db=%s" % loadedDbPath,220 "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")221 # Dump only a portion of the key range222 dumpFilePath = os.path.join(self.TMP_DIR, "dump3")223 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump3")224 self.assertTrue(self.dumpDb(225 "--db=%s --from=x1 --to=x3" % origDbPath, dumpFilePath))226 self.assertTrue(self.loadDb(227 "--db=%s --create_if_missing" % loadedDbPath, dumpFilePath))228 self.assertRunOKFull("scan --db=%s" % loadedDbPath, "x1 : y1\nx2 : y2")229 # Dump upto max_keys rows230 dumpFilePath = os.path.join(self.TMP_DIR, "dump4")231 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump4")232 self.assertTrue(self.dumpDb(233 "--db=%s --max_keys=3" % origDbPath, dumpFilePath))234 self.assertTrue(self.loadDb(235 "--db=%s --create_if_missing" % loadedDbPath, dumpFilePath))236 self.assertRunOKFull("scan --db=%s" % loadedDbPath,237 "x1 : y1\nx2 : y2\nx3 : y3")238 # Load into an existing db, create_if_missing is not specified239 self.assertTrue(self.dumpDb("--db=%s" % origDbPath, dumpFilePath))240 self.assertTrue(self.loadDb("--db=%s" % loadedDbPath, dumpFilePath))241 self.assertRunOKFull("scan --db=%s" % loadedDbPath,242 "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")243 # Dump and load with WAL disabled244 dumpFilePath = os.path.join(self.TMP_DIR, "dump5")245 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump5")246 self.assertTrue(self.dumpDb("--db=%s" % origDbPath, dumpFilePath))247 self.assertTrue(self.loadDb(248 "--db=%s --disable_wal --create_if_missing" % loadedDbPath,249 dumpFilePath))250 self.assertRunOKFull("scan --db=%s" % loadedDbPath,251 "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")252 # Dump and load with lots of extra params specified253 extraParams = " ".join(["--bloom_bits=14", "--compression_type=bzip2",254 
"--block_size=1024", "--auto_compaction=true",255 "--write_buffer_size=4194304",256 "--file_size=2097152"])257 dumpFilePath = os.path.join(self.TMP_DIR, "dump6")258 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump6")259 self.assertTrue(self.dumpDb(260 "--db=%s %s" % (origDbPath, extraParams), dumpFilePath))261 self.assertTrue(self.loadDb(262 "--db=%s %s --create_if_missing" % (loadedDbPath, extraParams),263 dumpFilePath))264 self.assertRunOKFull("scan --db=%s" % loadedDbPath,265 "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")266 # Dump with count_only267 dumpFilePath = os.path.join(self.TMP_DIR, "dump7")268 loadedDbPath = os.path.join(self.TMP_DIR, "loaded_from_dump7")269 self.assertTrue(self.dumpDb(270 "--db=%s --count_only" % origDbPath, dumpFilePath))271 self.assertTrue(self.loadDb(272 "--db=%s --create_if_missing" % loadedDbPath, dumpFilePath))273 # DB should have atleast one value for scan to work274 self.assertRunOKFull("put --db=%s k1 v1" % loadedDbPath, "OK")275 self.assertRunOKFull("scan --db=%s" % loadedDbPath, "k1 : v1")276 # Dump command fails because of typo in params277 dumpFilePath = os.path.join(self.TMP_DIR, "dump8")278 self.assertFalse(self.dumpDb(279 "--db=%s --create_if_missing" % origDbPath, dumpFilePath))280 def testMiscAdminTask(self):281 print "Running testMiscAdminTask..."282 # These tests need to be improved; for example with asserts about283 # whether compaction or level reduction actually took place.284 self.assertRunOK("batchput --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4",285 "OK")286 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")287 origDbPath = os.path.join(self.TMP_DIR, self.DB_NAME)288 self.assertTrue(0 == run_err_null(289 "./ldb compact --db=%s" % origDbPath))290 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")291 self.assertTrue(0 == run_err_null(292 "./ldb reduce_levels --db=%s --new_levels=2" % origDbPath))293 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")294 self.assertTrue(0 == 
run_err_null(295 "./ldb reduce_levels --db=%s --new_levels=3" % origDbPath))296 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")297 self.assertTrue(0 == run_err_null(298 "./ldb compact --db=%s --from=x1 --to=x3" % origDbPath))299 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")300 self.assertTrue(0 == run_err_null(301 "./ldb compact --db=%s --hex --from=0x6131 --to=0x6134"302 % origDbPath))303 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")304 #TODO(dilip): Not sure what should be passed to WAL.Currently corrupted.305 self.assertTrue(0 == run_err_null(306 "./ldb dump_wal --db=%s --walfile=%s --header" % (307 origDbPath, os.path.join(origDbPath, "LOG"))))308 self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")309 def testCheckConsistency(self):310 print "Running testCheckConsistency..."311 dbPath = os.path.join(self.TMP_DIR, self.DB_NAME)312 self.assertRunOK("put x1 y1 --create_if_missing", "OK")313 self.assertRunOK("put x2 y2", "OK")314 self.assertRunOK("get x1", "y1")315 self.assertRunOK("checkconsistency", "OK")316 sstFilePath = my_check_output("ls %s" % os.path.join(dbPath, "*.sst"),317 shell=True)318 # Modify the file319 my_check_output("echo 'evil' > %s" % sstFilePath, shell=True)320 self.assertRunFAIL("checkconsistency")321 # Delete the file322 my_check_output("rm -f %s" % sstFilePath, shell=True)323 self.assertRunFAIL("checkconsistency")324if __name__ == "__main__":...

Full Screen

Full Screen

test_query.py

Source:test_query.py Github

copy

Full Screen

...33 def destroy(self):34 self.destroyed = True35 def test_entry_ok_blank(self):36 dialog = self.Dummy_Query(' ')37 self.assertEqual(dialog.entry_ok(), None)38 self.assertEqual((dialog.result, dialog.destroyed), (None, False))39 self.assertIn('blank line', dialog.entry_error['text'])40 def test_entry_ok_good(self):41 dialog = self.Dummy_Query(' good ')42 Equal = self.assertEqual43 Equal(dialog.entry_ok(), 'good')44 Equal((dialog.result, dialog.destroyed), (None, False))45 Equal(dialog.entry_error['text'], '')46 def test_ok_blank(self):47 dialog = self.Dummy_Query('')48 dialog.entry.focus_set = mock.Mock()49 self.assertEqual(dialog.ok(), None)50 self.assertTrue(dialog.entry.focus_set.called)51 del dialog.entry.focus_set52 self.assertEqual((dialog.result, dialog.destroyed), (None, False))53 def test_ok_good(self):54 dialog = self.Dummy_Query('good')55 self.assertEqual(dialog.ok(), None)56 self.assertEqual((dialog.result, dialog.destroyed), ('good', True))57 def test_cancel(self):58 dialog = self.Dummy_Query('does not matter')59 self.assertEqual(dialog.cancel(), None)60 self.assertEqual((dialog.result, dialog.destroyed), (None, True))61class SectionNameTest(unittest.TestCase):62 "Test SectionName subclass of Query."63 class Dummy_SectionName:64 entry_ok = query.SectionName.entry_ok # Function being tested.65 used_names = ['used']66 def __init__(self, dummy_entry):67 self.entry = Var(value=dummy_entry)68 self.entry_error = {'text': ''}69 def showerror(self, message):70 self.entry_error['text'] = message71 def test_blank_section_name(self):72 dialog = self.Dummy_SectionName(' ')73 self.assertEqual(dialog.entry_ok(), None)74 self.assertIn('no name', dialog.entry_error['text'])75 def test_used_section_name(self):76 dialog = self.Dummy_SectionName('used')77 self.assertEqual(dialog.entry_ok(), None)78 self.assertIn('use', dialog.entry_error['text'])79 def test_long_section_name(self):80 dialog = self.Dummy_SectionName('good'*8)81 self.assertEqual(dialog.entry_ok(), None)82 
self.assertIn('longer than 30', dialog.entry_error['text'])83 def test_good_section_name(self):84 dialog = self.Dummy_SectionName(' good ')85 self.assertEqual(dialog.entry_ok(), 'good')86 self.assertEqual(dialog.entry_error['text'], '')87class ModuleNameTest(unittest.TestCase):88 "Test ModuleName subclass of Query."89 class Dummy_ModuleName:90 entry_ok = query.ModuleName.entry_ok # Function being tested.91 text0 = ''92 def __init__(self, dummy_entry):93 self.entry = Var(value=dummy_entry)94 self.entry_error = {'text': ''}95 def showerror(self, message):96 self.entry_error['text'] = message97 def test_blank_module_name(self):98 dialog = self.Dummy_ModuleName(' ')99 self.assertEqual(dialog.entry_ok(), None)100 self.assertIn('no name', dialog.entry_error['text'])101 def test_bogus_module_name(self):102 dialog = self.Dummy_ModuleName('__name_xyz123_should_not_exist__')103 self.assertEqual(dialog.entry_ok(), None)104 self.assertIn('not found', dialog.entry_error['text'])105 def test_c_source_name(self):106 dialog = self.Dummy_ModuleName('itertools')107 self.assertEqual(dialog.entry_ok(), None)108 self.assertIn('source-based', dialog.entry_error['text'])109 def test_good_module_name(self):110 dialog = self.Dummy_ModuleName('idlelib')111 self.assertTrue(dialog.entry_ok().endswith('__init__.py'))112 self.assertEqual(dialog.entry_error['text'], '')113class GotoTest(unittest.TestCase):114 "Test Goto subclass of Query."115 class Dummy_ModuleName:116 entry_ok = query.Goto.entry_ok # Function being tested.117 def __init__(self, dummy_entry):118 self.entry = Var(value=dummy_entry)119 self.entry_error = {'text': ''}120 def showerror(self, message):121 self.entry_error['text'] = message122 def test_bogus_goto(self):123 dialog = self.Dummy_ModuleName('a')124 self.assertEqual(dialog.entry_ok(), None)125 self.assertIn('not a base 10 integer', dialog.entry_error['text'])126 def test_bad_goto(self):127 dialog = self.Dummy_ModuleName('0')128 self.assertEqual(dialog.entry_ok(), None)129 
self.assertIn('not a positive integer', dialog.entry_error['text'])130 def test_good_goto(self):131 dialog = self.Dummy_ModuleName('1')132 self.assertEqual(dialog.entry_ok(), 1)133 self.assertEqual(dialog.entry_error['text'], '')134# 3 HelpSource test classes each test one method.135class HelpsourceBrowsefileTest(unittest.TestCase):136 "Test browse_file method of ModuleName subclass of Query."137 class Dummy_HelpSource:138 browse_file = query.HelpSource.browse_file139 pathvar = Var()140 def test_file_replaces_path(self):141 dialog = self.Dummy_HelpSource()142 # Path is widget entry, either '' or something.143 # Func return is file dialog return, either '' or something.144 # Func return should override widget entry.145 # We need all 4 combinations to test all (most) code paths.146 for path, func, result in (147 ('', lambda a,b,c:'', ''),148 ('', lambda a,b,c: __file__, __file__),149 ('htest', lambda a,b,c:'', 'htest'),150 ('htest', lambda a,b,c: __file__, __file__)):151 with self.subTest():152 dialog.pathvar.set(path)153 dialog.askfilename = func154 dialog.browse_file()155 self.assertEqual(dialog.pathvar.get(), result)156class HelpsourcePathokTest(unittest.TestCase):157 "Test path_ok method of HelpSource subclass of Query."158 class Dummy_HelpSource:159 path_ok = query.HelpSource.path_ok160 def __init__(self, dummy_path):161 self.path = Var(value=dummy_path)162 self.path_error = {'text': ''}163 def showerror(self, message, widget=None):164 self.path_error['text'] = message165 orig_platform = query.platform # Set in test_path_ok_file.166 @classmethod167 def tearDownClass(cls):168 query.platform = cls.orig_platform169 def test_path_ok_blank(self):170 dialog = self.Dummy_HelpSource(' ')171 self.assertEqual(dialog.path_ok(), None)172 self.assertIn('no help file', dialog.path_error['text'])173 def test_path_ok_bad(self):174 dialog = self.Dummy_HelpSource(__file__ + 'bad-bad-bad')175 self.assertEqual(dialog.path_ok(), None)176 self.assertIn('not exist', 
dialog.path_error['text'])177 def test_path_ok_web(self):178 dialog = self.Dummy_HelpSource('')179 Equal = self.assertEqual180 for url in 'www.py.org', 'http://py.org':181 with self.subTest():182 dialog.path.set(url)183 self.assertEqual(dialog.path_ok(), url)184 self.assertEqual(dialog.path_error['text'], '')185 def test_path_ok_file(self):186 dialog = self.Dummy_HelpSource('')187 for platform, prefix in ('darwin', 'file://'), ('other', ''):188 with self.subTest():189 query.platform = platform190 dialog.path.set(__file__)191 self.assertEqual(dialog.path_ok(), prefix + __file__)192 self.assertEqual(dialog.path_error['text'], '')193class HelpsourceEntryokTest(unittest.TestCase):194 "Test entry_ok method of HelpSource subclass of Query."195 class Dummy_HelpSource:196 entry_ok = query.HelpSource.entry_ok197 entry_error = {}198 path_error = {}199 def item_ok(self):200 return self.name201 def path_ok(self):202 return self.path203 def test_entry_ok_helpsource(self):204 dialog = self.Dummy_HelpSource()205 for name, path, result in ((None, None, None),206 (None, 'doc.txt', None),207 ('doc', None, None),208 ('doc', 'doc.txt', ('doc', 'doc.txt'))):209 with self.subTest():210 dialog.name, dialog.path = name, path211 self.assertEqual(dialog.entry_ok(), result)212# 2 CustomRun test classes each test one method.213class CustomRunCLIargsokTest(unittest.TestCase):214 "Test cli_ok method of the CustomRun subclass of Query."215 class Dummy_CustomRun:216 cli_args_ok = query.CustomRun.cli_args_ok217 def __init__(self, dummy_entry):218 self.entry = Var(value=dummy_entry)219 self.entry_error = {'text': ''}220 def showerror(self, message):221 self.entry_error['text'] = message222 def test_blank_args(self):223 dialog = self.Dummy_CustomRun(' ')224 self.assertEqual(dialog.cli_args_ok(), [])225 def test_invalid_args(self):226 dialog = self.Dummy_CustomRun("'no-closing-quote")227 self.assertEqual(dialog.cli_args_ok(), None)228 self.assertIn('No closing', dialog.entry_error['text'])229 def 
test_good_args(self):230 args = ['-n', '10', '--verbose', '-p', '/path', '--name']231 dialog = self.Dummy_CustomRun(' '.join(args) + ' "my name"')232 self.assertEqual(dialog.cli_args_ok(), args + ["my name"])233 self.assertEqual(dialog.entry_error['text'], '')234class CustomRunEntryokTest(unittest.TestCase):235 "Test entry_ok method of the CustomRun subclass of Query."236 class Dummy_CustomRun:237 entry_ok = query.CustomRun.entry_ok238 entry_error = {}239 restartvar = Var()240 def cli_args_ok(self):241 return self.cli_args242 def test_entry_ok_customrun(self):243 dialog = self.Dummy_CustomRun()244 for restart in {True, False}:245 dialog.restartvar.set(restart)246 for cli_args, result in ((None, None),247 (['my arg'], (['my arg'], restart))):248 with self.subTest(restart=restart, cli_args=cli_args):249 dialog.cli_args = cli_args250 self.assertEqual(dialog.entry_ok(), result)251# GUI TESTS252class QueryGuiTest(unittest.TestCase):253 @classmethod254 def setUpClass(cls):255 requires('gui')256 cls.root = root = Tk()257 cls.root.withdraw()258 cls.dialog = query.Query(root, 'TEST', 'test', _utest=True)259 cls.dialog.destroy = mock.Mock()260 @classmethod261 def tearDownClass(cls):262 del cls.dialog.destroy263 del cls.dialog264 cls.root.destroy()265 del cls.root266 def setUp(self):267 self.dialog.entry.delete(0, 'end')268 self.dialog.result = None269 self.dialog.destroy.reset_mock()270 def test_click_ok(self):271 dialog = self.dialog272 dialog.entry.insert(0, 'abc')273 dialog.button_ok.invoke()274 self.assertEqual(dialog.result, 'abc')275 self.assertTrue(dialog.destroy.called)276 def test_click_blank(self):277 dialog = self.dialog278 dialog.button_ok.invoke()279 self.assertEqual(dialog.result, None)280 self.assertFalse(dialog.destroy.called)281 def test_click_cancel(self):282 dialog = self.dialog283 dialog.entry.insert(0, 'abc')284 dialog.button_cancel.invoke()...

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

from django.shortcuts import render
from django.http import HttpResponse
from django.http import JsonResponse
from datetime import datetime
from django.utils.dateparse import parse_datetime
import json
import requests
from kafka import SimpleProducer, KafkaClient
from elasticsearch import Elasticsearch

# Create your views here.

# Clients are created once, when this module is imported, and shared by
# every view below.
kafka = KafkaClient('kafka:9092')
producer = SimpleProducer(kafka)
es = Elasticsearch(['es'])

# Base URL of the models-layer API that the views delegate to.
_MODELS_API = 'http://models-api:8000/models/'

# Display format used by home_detail for leave/arrive times.
# NOTE(review): "%-I" (hour without zero padding) is a platform-specific
# strftime extension - confirm the deployment platform supports it.
_RIDE_TIME_FORMAT = "%B %d %-I:%M:%S %p"


def _reject_unless(request, method):
    """Return the 'wrong request type' response unless request uses method."""
    if request.method != method:
        return JsonResponse({'ok': False,
                             'error': 'Wrong request type, should be %s' % method})
    return None


def _models_get(path):
    """GET path from the models API and return the decoded JSON body."""
    return json.loads(requests.get(_MODELS_API + path).text)


def _models_post(path, data):
    """POST data to path on the models API and return the decoded JSON body."""
    return json.loads(requests.post(_MODELS_API + path, data=data).text)


def ride_detail(request, ride):
    """
    Return one ride together with its driver name and vehicle make/model.

    ``ride`` is the ride identifier appended to the get_ride URL.  A failed
    driver or vehicle lookup is reported with placeholder strings rather
    than failing the whole request.
    """
    rejection = _reject_unless(request, 'GET')
    if rejection is not None:
        return rejection

    ride_resp = _models_get('get_ride/' + ride)
    if ride_resp['ok'] != True:  # strict comparison kept from the original
        return JsonResponse({'ok': False})
    # The serialized ride arrives under the 'car' key as a string; the first
    # and last characters are dropped before decoding (presumably the
    # brackets of a one-element JSON list - verify against the models API).
    details = json.loads(ride_resp['car'][1:-1])['fields']
    driver_pk = details['driver']
    vehicle_pk = details['car']

    driver_resp = _models_get('get_user/' + str(driver_pk))
    if driver_resp['ok'] != True:
        driver = "Driver Not Found"
    else:
        driver_details = json.loads(driver_resp['user'])['fields']
        driver = driver_details['first'] + " " + driver_details['last']

    car_resp = _models_get('get_car/' + str(vehicle_pk))
    if car_resp['ok'] != True:
        vMake = "Not Found"
        vModel = "Not Found"
    else:
        car_details = json.loads(car_resp['car'])['fields']
        vMake = car_details['make']
        vModel = car_details['model']

    return JsonResponse({'ok': True, 'driver': driver, 'vMake': vMake,
                         'vModel': vModel, 'leave': details['leave_time'],
                         'start': details['start'],
                         'arrive': details['arrive_time'],
                         'Destination': details['destination']})


def home_detail(request):
    """Return every ride with driver first name, vehicle and formatted times."""
    rejection = _reject_unless(request, 'GET')
    if rejection is not None:
        return rejection

    rides_resp = _models_get('all_rides')
    if rides_resp['ok'] != True:  # strict comparison kept from the original
        return JsonResponse({'ok': False})

    result_set = []
    for serialized_ride in rides_resp['ride']:
        # Decode each ride once and read both 'fields' and 'pk' from it.
        ride_obj = json.loads(serialized_ride)
        details = ride_obj['fields']
        # NOTE(review): unlike ride_detail, the 'ok' flag of these two
        # lookups is not checked; a failed lookup will raise here.
        resp_driver = json.loads(
            _models_get('get_user/' + str(details['driver']))['user'])['fields']
        resp_vehicle = json.loads(
            _models_get('get_car/' + str(details['car']))['car'])['fields']
        leavetime = parse_datetime(details['leave_time'])
        arrivetime = parse_datetime(details['arrive_time'])
        result_set.append({
            'ID': ride_obj['pk'],
            'driver': resp_driver['first'],
            'vMake': resp_vehicle['make'],
            'vModel': resp_vehicle['model'],
            'leave': leavetime.strftime(_RIDE_TIME_FORMAT),
            'start': details['start'],
            'arrive': arrivetime.strftime(_RIDE_TIME_FORMAT),
            'Destination': details['destination'],
        })
    return JsonResponse({'ok': True, 'result_set': result_set})


def create_user(request):
    """Create a user from the POST data and return a fresh authenticator."""
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection

    if not _models_post('add_user', request.POST)['ok']:
        return JsonResponse({'ok': False,
                             'error': 'Encountered error while creating user'})

    # Log the new user in straight away with the credentials just submitted.
    userpass = {'username': request.POST['username'],
                'password': request.POST['password']}
    auth_resp = _models_post('get_auth', userpass)
    if not auth_resp['ok']:
        return JsonResponse({'ok': False,
                             'error': 'Encountered error while authenticating'})
    return JsonResponse({'ok': True, 'auth': auth_resp['auth'],
                         'log': 'User Created'})


def login(request):
    """Exchange the posted credentials for an authenticator."""
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection

    auth_resp = _models_post('get_auth', request.POST)
    if auth_resp['ok']:
        return JsonResponse({'ok': True, 'auth': auth_resp['auth']})
    return JsonResponse({'ok': False,
                         'error': 'Encountered error while authenticating'})


def logout(request):
    """Revoke the posted authenticator."""
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection

    if _models_post('revoke_auth', request.POST)['ok']:
        return JsonResponse({'ok': True, 'log': 'Authenticator revoked'})
    return JsonResponse({'ok': False,
                         'error': 'Encountered error while removing auth'})


def add_new_ride(request):
    """
    Create a ride for the authenticated user and publish it to Kafka.

    The posted 'auth' value is validated first; the ride is then created
    under the username the models API returns for it and, on success, the
    posted fields plus the new ride id are sent to 'new-listings-topic'.
    """
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection

    invalid_auth = {'ok': False, 'error': 'Invalid authentication to make ride'}
    auth = request.POST.get('auth')
    if auth is None:
        # A request without an authenticator cannot be valid; answer with
        # the normal error payload instead of raising on the missing key.
        return JsonResponse(invalid_auth)
    auth_resp = _models_post('is_auth', {'auth': auth})
    if not auth_resp['ok']:
        return JsonResponse(invalid_auth)

    post_values = request.POST.copy()
    post_values['username'] = auth_resp['username']
    ride_resp = _models_post('add_ride', post_values)
    if not ride_resp['ok']:
        return JsonResponse({'ok': False, 'error': 'Failed to create ride'})

    # The ride id is read only after 'ok' has been checked, so a failure
    # response that carries no 'id' yields the error above, not a KeyError.
    # NOTE(review): the published message is built from the raw POST data
    # and therefore still includes the 'auth' value - confirm that
    # consumers of the topic are meant to see it.
    post_copy = request.POST.copy()
    post_copy['id'] = ride_resp['id']
    message = json.dumps(post_copy).encode('utf-8')
    try:
        producer.send_messages(b'new-listings-topic', message)
    except Exception:
        # This is ugly, but if the topic doesn't exist we just try again.
        # More than once is a problem though so we let that happen
        producer.send_messages(b'new-listings-topic', message)
    return JsonResponse({'ok': True, 'log': 'Created Ride'})


def add_new_vehicle(request):
    """Register a vehicle for the user identified by the posted 'auth'."""
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection

    invalid_auth = {'ok': False,
                    'error': 'Invalid authentication to make vehicle'}
    auth = request.POST.get('auth')
    if auth is None:
        # Same handling as add_new_ride: no authenticator, no vehicle.
        return JsonResponse(invalid_auth)
    auth_resp = _models_post('is_auth', {'auth': auth})
    if not auth_resp['ok']:
        return JsonResponse(invalid_auth)

    post_values = request.POST.copy()
    post_values['username'] = auth_resp['username']
    if _models_post('add_vehicle', post_values)['ok']:
        return JsonResponse({'ok': True, 'log': 'Added vehicle'})
    return JsonResponse({'ok': False, 'error': 'Failed to create vehicle'})


def search_result(request):
    """Run the posted 'query' against the listing index (at most 10 hits)."""
    rejection = _reject_unless(request, 'POST')
    if rejection is not None:
        return rejection
    if 'query' not in request.POST:
        return JsonResponse({'ok': False, 'error': 'No query field'})
    results = es.search(
        index='listing_index',
        body={'query': {'query_string': {'query': request.POST['query']}},
              'size': 10})
    # ... (the source excerpt is truncated here; the remainder of this view
    # is not visible and has deliberately not been reconstructed)

Full Screen

Full Screen

cdif2rsf.py

Source:cdif2rsf.py Github

copy

Full Screen

#!/usr/bin/python
import errno
import os
from os import EX_OK, EX_USAGE, EX_UNAVAILABLE, EX_IOERR
import subprocess
import sys

##
# $Rev: 1155 $: Revision of last commit
# $Author: bdubois $: Author of last commit
# $Date: 2007-05-03 07:51:55 +0200 (Thu, 03 May 2007) $: Date of last commit
##

# (progress label, helper script) pairs, run in this order against the CDIF
# input file. The TypeDefs step (./writeTypedefs.py) is disabled, as it was
# in the original script; the stray "[failed]" / return lines that the
# original left uncommented below that disabled step are dropped.
_CONVERSION_STEPS = (
    ("\tFiles...", "./writeFiles.py"),
    ("\tIncludes...", "./writeIncludes.py"),
    ("\tConditional Compilation...", "./writeCondComp.py"),
    ("\tClasses...", "./writeClasses.py"),
    ("\tInheritance...", "./writeInheritance.py"),
    ("\tMethods...", "./writeMethods.py"),
    ("\tAttributes...", "./writeAttributes.py"),
    ("\tFunctions...", "./writeFunctions.py"),
    ("\tGlobalVariables...", "./writeGlobalVar.py"),
    ("\tAccesses...", "./writeAccesses.py"),
    ("\tInvocations...", "./writeInvocations.py"),
    ("\tAnnotations...", "./writeAnnotations.py"),
    ("\tSize and Complexity Metrics...", "./writeMetrics.py"),
    ("\tChange Frequency Metrics...", "./writeCFMetrics.py"),
)

# Files deleted from the working directory once the RSF output has been
# merged (presumably the helper scripts' intermediate output -- confirm
# against those scripts).
_INTERMEDIATE_FILES = (
    "accessesWithIDs.txt", "attributeBelongsToClass.txt",
    "attributeHasClassAsType.txt", "attributesWithIDs.txt",
    "classBelongsToFile.txt", "classesWithIDs.txt",
    "conditionalCompilationBlocks.txt", "typedefsWithIDs.txt",
    "fileBelongsToModule.txt", "filesWithIDs.txt",
    "invokableEntityBelongsToFile.txt", "functionsWithIDs.txt",
    "globalVarsWithIDs.txt", "globalVarHasClassAsType.txt",
    "accessibleEntityBelongsToFile.txt",
    "inheritanceWithIDs.txt", "invocationsWithIDs.txt",
    "methodBelongsToClass.txt", "methodsWithIDs.txt",
    "modulesWithIDs.txt", "metricsWithIDs.txt",
    "moduleBelongsToModule.txt", "includeBelongsToFile.txt",
    "methodHasClassAsReturnType.txt", "functionHasClassAsReturnType.txt",
    "entityBelongsToBlock.txt", "methodVisibility.txt",
    "methodSignature.txt", "attributeSignature.txt",
    "attributeVisibility.txt",
    "accessesLocations.txt", "invocationLocations.txt",
    "defsWithAssociation.txt", "cfMetricsWithIDs.txt",
    "leftValueAccesses.txt", "annotations.txt",
    "annotationBelongsToEntity.txt",
)


def _run_step(label, script, argument):
    """Run `python <script> <argument>`, print the outcome, return True on success."""
    sys.stdout.write(label + " ")
    # Flush so the label appears before anything the child process prints.
    sys.stdout.flush()
    try:
        # An argument list (no shell) keeps file names containing spaces or
        # shell metacharacters intact.
        return_code = subprocess.call(["python", script, argument])
    except OSError:
        # The interpreter could not be launched at all; report it as a
        # failed step instead of crashing.
        return_code = EX_UNAVAILABLE
    succeeded = return_code == EX_OK
    sys.stdout.write("[ok]\n" if succeeded else "[failed]\n")
    return succeeded


def _remove_intermediate_files():
    """Delete every file in _INTERMEDIATE_FILES; return True if none failed.

    A file that does not exist is not an error (same as `rm -f`).
    """
    all_removed = True
    for file_name in _INTERMEDIATE_FILES:
        try:
            os.remove(file_name)
        except OSError as error:
            if error.errno != errno.ENOENT:
                all_removed = False
    return all_removed


def convertToRsf(cdif_input_file_name, rsf_output_file_name):
    """Convert a CDIF file to RSF by running the helper scripts in sequence.

    Each script in _CONVERSION_STEPS is run with the CDIF file name as its
    argument, then ./makeRsf.py is run with the RSF output file name, and
    finally the files in _INTERMEDIATE_FILES are removed. Progress is
    printed to stdout. The scripts are addressed relative to the current
    working directory.

    Returns:
        EX_OK on success, EX_UNAVAILABLE as soon as any script exits with a
        non-zero status, or EX_IOERR if the cleanup fails.
    """
    sys.stdout.write("Reading from file " + cdif_input_file_name + "...\n")

    for label, script in _CONVERSION_STEPS:
        if not _run_step(label, script, cdif_input_file_name):
            return EX_UNAVAILABLE

    if not _run_step("Merging output...", "./makeRsf.py",
                     rsf_output_file_name):
        return EX_UNAVAILABLE

    if not _remove_intermediate_files():
        sys.stdout.write("Error cleaning up.\n")
        return EX_IOERR

    sys.stdout.write("Output written to " + rsf_output_file_name + "\n")

    return EX_OK


if __name__ == '__main__':
    # Both file names are required, so argv must hold three entries; the
    # original `< 2` check let a single argument through and then failed
    # with an IndexError on sys.argv[2].
    if len(sys.argv) < 3:
        sys.stdout.write("Usage: " + sys.argv[0] +
                         " cdif-input-file-name rsf-output-file-name\n")
        sys.exit(EX_USAGE)

    input_file = sys.argv[1]

    cdif_input_file_name = sys.argv[1]
    rsf_output_file_name = sys.argv[2]

    exitCode = convertToRsf(cdif_input_file_name, rsf_output_file_name)
    # ... (the listing is truncated here; the rest of the script is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tappy automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful