Best Python code snippet using fMBT_python
test_db.py
Source:test_db.py  
...53            self.db.add_record(msg_id, rec)54        return msg_ids55    56    def test_add_record(self):57        before = self.db.get_history()58        self.load_records(5)59        after = self.db.get_history()60        self.assertEqual(len(after), len(before)+5)61        self.assertEqual(after[:-5],before)62        63    def test_drop_record(self):64        msg_id = self.load_records()[-1]65        rec = self.db.get_record(msg_id)66        self.db.drop_record(msg_id)67        self.assertRaises(KeyError,self.db.get_record, msg_id)68    69    def _round_to_millisecond(self, dt):70        """necessary because mongodb rounds microseconds"""71        micro = dt.microsecond72        extra = int(str(micro)[-3:])73        return dt - timedelta(microseconds=extra)74    75    def test_update_record(self):76        now = self._round_to_millisecond(datetime.now())77        # 78        msg_id = self.db.get_history()[-1]79        rec1 = self.db.get_record(msg_id)80        data = {'stdout': 'hello there', 'completed' : now}81        self.db.update_record(msg_id, data)82        rec2 = self.db.get_record(msg_id)83        self.assertEqual(rec2['stdout'], 'hello there')84        self.assertEqual(rec2['completed'], now)85        rec1.update(data)86        self.assertEqual(rec1, rec2)87    88    # def test_update_record_bad(self):89    #     """test updating nonexistant records"""90    #     msg_id = str(uuid.uuid4())91    #     data = {'stdout': 'hello there'}92    #     self.assertRaises(KeyError, self.db.update_record, msg_id, data)93    def test_find_records_dt(self):94        """test finding records by date"""95        hist = self.db.get_history()96        middle = self.db.get_record(hist[len(hist)//2])97        tic = middle['submitted']98        before = self.db.find_records({'submitted' : {'$lt' : tic}})99        after = self.db.find_records({'submitted' : {'$gte' : tic}})100        self.assertEqual(len(before)+len(after),len(hist))101        for b in before:102        
    self.assertTrue(b['submitted'] < tic)103        for a in after:104            self.assertTrue(a['submitted'] >= tic)105        same = self.db.find_records({'submitted' : tic})106        for s in same:107            self.assertTrue(s['submitted'] == tic)108    109    def test_find_records_keys(self):110        """test extracting subset of record keys"""111        found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])112        for rec in found:113            self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))114    115    def test_find_records_msg_id(self):116        """ensure msg_id is always in found records"""117        found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])118        for rec in found:119            self.assertTrue('msg_id' in rec.keys())120        found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])121        for rec in found:122            self.assertTrue('msg_id' in rec.keys())123        found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])124        for rec in found:125            self.assertTrue('msg_id' in rec.keys())126    127    def test_find_records_in(self):128        """test finding records with '$in','$nin' operators"""129        hist = self.db.get_history()130        even = hist[::2]131        odd = hist[1::2]132        recs = self.db.find_records({ 'msg_id' : {'$in' : even}})133        found = [ r['msg_id'] for r in recs ]134        self.assertEqual(set(even), set(found))135        recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})136        found = [ r['msg_id'] for r in recs ]137        self.assertEqual(set(odd), set(found))138    139    def test_get_history(self):140        msg_ids = self.db.get_history()141        latest = datetime(1984,1,1)142        for msg_id in msg_ids:143            rec = self.db.get_record(msg_id)144            newt = rec['submitted']145            self.assertTrue(newt 
>= latest)146            latest = newt147        msg_id = self.load_records(1)[-1]148        self.assertEqual(self.db.get_history()[-1],msg_id)149    150    def test_datetime(self):151        """get/set timestamps with datetime objects"""152        msg_id = self.db.get_history()[-1]153        rec = self.db.get_record(msg_id)154        self.assertTrue(isinstance(rec['submitted'], datetime))155        self.db.update_record(msg_id, dict(completed=datetime.now()))156        rec = self.db.get_record(msg_id)157        self.assertTrue(isinstance(rec['completed'], datetime))158    def test_drop_matching(self):159        msg_ids = self.load_records(10)160        query = {'msg_id' : {'$in':msg_ids}}161        self.db.drop_matching_records(query)162        recs = self.db.find_records(query)163        self.assertEqual(len(recs), 0)164    165    def test_null(self):166        """test None comparison queries"""167        msg_ids = self.load_records(10)168        query = {'msg_id' : None}169        recs = self.db.find_records(query)170        self.assertEqual(len(recs), 0)171        query = {'msg_id' : {'$ne' : None}}172        recs = self.db.find_records(query)173        self.assertTrue(len(recs) >= 10)174    175    def test_pop_safe_get(self):176        """editing query results shouldn't affect record [get]"""177        msg_id = self.db.get_history()[-1]178        rec = self.db.get_record(msg_id)179        rec.pop('buffers')180        rec['garbage'] = 'hello'181        rec['header']['msg_id'] = 'fubar'182        rec2 = self.db.get_record(msg_id)183        self.assertTrue('buffers' in rec2)184        self.assertFalse('garbage' in rec2)185        self.assertEqual(rec2['header']['msg_id'], msg_id)186    187    def test_pop_safe_find(self):188        """editing query results shouldn't affect record [find]"""189        msg_id = self.db.get_history()[-1]190        rec = self.db.find_records({'msg_id' : msg_id})[0]191        rec.pop('buffers')192        rec['garbage'] = 'hello'193     
   rec['header']['msg_id'] = 'fubar'194        rec2 = self.db.find_records({'msg_id' : msg_id})[0]195        self.assertTrue('buffers' in rec2)196        self.assertFalse('garbage' in rec2)197        self.assertEqual(rec2['header']['msg_id'], msg_id)198    def test_pop_safe_find_keys(self):199        """editing query results shouldn't affect record [find+keys]"""200        msg_id = self.db.get_history()[-1]201        rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]202        rec.pop('buffers')203        rec['garbage'] = 'hello'204        rec['header']['msg_id'] = 'fubar'205        rec2 = self.db.find_records({'msg_id' : msg_id})[0]206        self.assertTrue('buffers' in rec2)207        self.assertFalse('garbage' in rec2)208        self.assertEqual(rec2['header']['msg_id'], msg_id)209class TestDictBackend(TaskDBTest, TestCase):210    211    def create_db(self):212        return DictDB()213    214    def test_cull_count(self):215        self.db = self.create_db() # skip the load-records init from setUp216        self.db.record_limit = 20217        self.db.cull_fraction = 0.2218        self.load_records(20)219        self.assertEqual(len(self.db.get_history()), 20)220        self.load_records(1)221        # 0.2 * 20 = 4, 21 - 4 = 17222        self.assertEqual(len(self.db.get_history()), 17)223        self.load_records(3)224        self.assertEqual(len(self.db.get_history()), 20)225        self.load_records(1)226        self.assertEqual(len(self.db.get_history()), 17)227        228        for i in range(100):229            self.load_records(1)230            self.assertTrue(len(self.db.get_history()) >= 17)231            self.assertTrue(len(self.db.get_history()) <= 20)232    def test_cull_size(self):233        self.db = self.create_db() # skip the load-records init from setUp234        self.db.size_limit = 1000235        self.db.cull_fraction = 0.2236        self.load_records(100, buffer_size=10)237        
self.assertEqual(len(self.db.get_history()), 100)238        self.load_records(1, buffer_size=0)239        self.assertEqual(len(self.db.get_history()), 101)240        self.load_records(1, buffer_size=1)241        # 0.2 * 100 = 20, 101 - 20 = 81242        self.assertEqual(len(self.db.get_history()), 81)243    244    def test_cull_size_drop(self):245        """dropping records updates tracked buffer size"""246        self.db = self.create_db() # skip the load-records init from setUp247        self.db.size_limit = 1000248        self.db.cull_fraction = 0.2249        self.load_records(100, buffer_size=10)250        self.assertEqual(len(self.db.get_history()), 100)251        self.db.drop_record(self.db.get_history()[-1])252        self.assertEqual(len(self.db.get_history()), 99)253        self.load_records(1, buffer_size=5)254        self.assertEqual(len(self.db.get_history()), 100)255        self.load_records(1, buffer_size=5)256        self.assertEqual(len(self.db.get_history()), 101)257        self.load_records(1, buffer_size=1)258        self.assertEqual(len(self.db.get_history()), 81)259    def test_cull_size_update(self):260        """updating records updates tracked buffer size"""261        self.db = self.create_db() # skip the load-records init from setUp262        self.db.size_limit = 1000263        self.db.cull_fraction = 0.2264        self.load_records(100, buffer_size=10)265        self.assertEqual(len(self.db.get_history()), 100)266        msg_id = self.db.get_history()[-1]267        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))268        self.assertEqual(len(self.db.get_history()), 100)269        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))270        self.assertEqual(len(self.db.get_history()), 79)271class TestSQLiteBackend(TaskDBTest, TestCase):272    @dec.skip_without('sqlite3')273    def create_db(self):274        location, fname = os.path.split(temp_db)275        log = 
logging.getLogger('test')276        log.setLevel(logging.CRITICAL)277        return SQLiteDB(location=location, fname=fname, log=log)278    279    def tearDown(self):280        self.db._db.close()281def teardown():282    """cleanup task db file after all tests have run"""283    try:284        os.remove(temp_db)...indexhistoryfunc.py
Source:indexhistoryfunc.py  
from datetime import date

import pandas as pd
from nsepy import get_history

# One snapshot date per year, 2009 through 2020.
# NOTE(review): these look like the last trading day of March each year
# (financial year end) -- confirm against the exchange calendar.
_SNAPSHOT_DATES = (
    date(2009, 3, 31),
    date(2010, 3, 31),
    date(2011, 3, 31),
    date(2012, 3, 30),
    date(2013, 3, 28),
    date(2014, 3, 31),
    date(2015, 3, 31),
    date(2016, 3, 31),
    date(2017, 3, 31),
    date(2018, 3, 28),
    date(2019, 3, 29),
    date(2020, 3, 31),
)


def indexhistory(indexsymbol):
    """Fetch one-day index history for each snapshot date and combine it.

    Calls ``get_history`` once per entry in ``_SNAPSHOT_DATES`` with
    ``start == end`` and ``index=True``, concatenates the results in date
    order, prints the combined frame, and returns it.

    Args:
        indexsymbol: Index symbol passed through to ``get_history`` as
            ``symbol``.

    Returns:
        The concatenation of the per-date results, in the same order as
        ``_SNAPSHOT_DATES``.
    """
    yearly_frames = [
        get_history(symbol=indexsymbol, start=day, end=day, index=True)
        for day in _SNAPSHOT_DATES
    ]
    # DataFrame.append was removed in pandas 2.0; concat is the supported
    # replacement and also avoids re-copying the frame on every step.
    nifty_index = pd.concat(yearly_frames)
    print(nifty_index)
    return nifty_index
Source:nsehistory.py  
1from nsepy import get_history2from datetime import date3#data = get_history(symbol="SBIN", start=date(2015,1,1), end=date(2015,1,31))4nifty_pe = get_history(symbol="NIFTY", start=date(2009,3,31), end=date(2009,3,31), index=True)5print(nifty_pe)6nifty_pe = get_history(symbol="NIFTY", start=date(2010,3,31), end=date(2010,3,31), index=True)7print(nifty_pe)8nifty_pe = get_history(symbol="NIFTY", start=date(2011,3,31), end=date(2011,3,31), index=True)9print(nifty_pe)10nifty_pe = get_history(symbol="NIFTY", start=date(2012,3,30), end=date(2012,3,30), index=True)11print(nifty_pe)12nifty_pe = get_history(symbol="NIFTY", start=date(2013,3,28), end=date(2013,3,28), index=True)13print(nifty_pe)14nifty_pe = get_history(symbol="NIFTY", start=date(2014,3,31), end=date(2014,3,31), index=True)15print(nifty_pe)16nifty_pe = get_history(symbol="NIFTY", start=date(2015,3,31), end=date(2015,3,31), index=True)17print(nifty_pe)18nifty_pe = get_history(symbol="NIFTY", start=date(2016,3,31), end=date(2016,3,31), index=True)19print(nifty_pe)20nifty_pe = get_history(symbol="NIFTY", start=date(2017,3,31), end=date(2017,3,31), index=True)21print(nifty_pe)22nifty_pe = get_history(symbol="NIFTY", start=date(2018,3,28), end=date(2018,3,28), index=True)23print(nifty_pe)24nifty_pe = get_history(symbol="NIFTY", start=date(2019,3,29), end=date(2019,3,29), index=True)25print(nifty_pe)26nifty_pe = get_history(symbol="NIFTY", start=date(2020,3,31), end=date(2020,3,31), index=True)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
