Best Python code snippet using avocado_python
load.py
Source:load.py  
...115def process_dataset(cfg, dbc, cache_dir, resume=True):116    logger.bind(dataset=cfg.id)117    logger.info('checking for socrata, processing', s=cfg.socrata)118    if cfg.socrata:119        ksoc = get_datafile_path(DatasetPart.SOCRATA.value, cfg.id, cache_dir)120        if os.path.isfile(ksoc) and resume:121            logger.warn('found socrata artifact, moving on', resume=resume)122        else:123            logger.info('generating socrata data', resume=resume, overwriting=os.path.isfile(ksoc))124            dsoc = load_socrata_data(cfg.socrata, cfg.facets)125            logger.info('saving socrata data to feather', f=ksoc)126            dsoc.to_feather(ksoc)127            logger.info('saved socrata data to feather', f=ksoc)128            # short process, can be run regardless129            (qns, facs) = get_metadata_socrata(cfg.socrata, dsoc, cfg.facets)130            logger.info('created schema for socrata')131            qns.to_feather(get_datafile_path(DatasetPart.SCHEMA.value, cfg.id, cache_dir))132            facs.to_feather(get_datafile_path(DatasetPart.FACETS.value, cfg.id, cache_dir))133    logger.info('checking for surveys, processing', s=cfg.surveys)134    if cfg.surveys:135        svyf = get_datafile_path(DatasetPart.SURVEYS.value, cfg.id, cache_dir)136        svy_descf = svyf+'.yaml' 137        if os.path.isfile(svyf) and resume:138            logger.warn('found surveys artifact, moving on', resume=resume)139        else:140            logger.info('generating surveys data', resume=resume, overwriting=os.path.isfile(svyf))141            (svydf, svymeta) = load_survey_data(cfg)142            logger.info('saving survey data to feather', svyf)143            svydf.to_feather(svyf)144            logger.info('saving survey desc data to feather', svy_descf)145            with open(svy_descf, 'w') as fh:146                yaml.dump(svymeta, fh)147            logger.info('saved survey data to feather', name=svy_descf)148    #if dbc is not None:149    #    setup_tables(cfg, dbc.uri)...test_utils.py
Source:test_utils.py  
...48        )49        destination.cleanup()50    def test_get_datafile_path_special(self):51        with self.assertRaises(ValueError):52            get_datafile_path(None, 'anything')53        with self.assertRaises(ValueError):54            get_datafile_path('something', 'anything', zip_path='anything')55    def test_get_datafile_path_with_url(self):56        destination = tempfile.TemporaryDirectory()57        zip_path = os.path.join(destination.name, 'foo.zip')58        self.make_test_zip(zip_path)59        path_to_first_file = get_datafile_path(60            'http://example.com/foo.zip', destination.name61        )62        self.assertEqual(63            os.path.join(destination.name, 'file1.txt'),64            path_to_first_file65        )66        destination.cleanup()67    def test_get_datafile_path_with_zip_path(self):68        destination = tempfile.TemporaryDirectory()69        zip_path = os.path.join(destination.name, 'foo.zip')70        self.make_test_zip(zip_path)71        path_to_first_file = get_datafile_path(None, destination.name, zip_path=zip_path)72        self.assertEqual(73            os.path.join(destination.name, 'file1.txt'),74            path_to_first_file75        )76        destination.cleanup()77    def test_get_zipfile_path(self):78        self.assertEqual(79            '/tmp/favicon.ico',80            get_zipfile_path('https://example.com/favicon.ico', '/tmp')81        )82    def test_line_count(self):83        expected_num_lines = 234584        with tempfile.NamedTemporaryFile(mode='w') as t:85            for i in range(expected_num_lines):...automsg.py
Source:automsg.py  
...12import os13import codecs14__author__ = 'Hellbeard/Dingo'15__version__ = 1.116def get_datafile_path():17    from x84.bbs import ini18    folder = os.path.join(ini.CFG.get('system', 'datapath'))19    return os.path.join(folder, 'automsg.txt')20def ask(msg):21    term = getterminal()22    echo(term.move(21, 0))23    echo(term.white(msg))24    echo(term.green(u'(') + term.cyan(u'yes/no') + term.green(u')'))25def banner():26    session, term = getsession(), getterminal()27    art_file = os.path.join(os.path.dirname(__file__), 'art', 'automsg.ans')28    echo(term.clear)29    for line in showart(art_file, 'topaz'):30        echo(line)31    if not os.path.exists(get_datafile_path()):32        with codecs.open(get_datafile_path(), 'w', 'utf-8') as fo:33            fo.write('big brother\n')34            fo.write('behave yourselves.\n')35            fo.write('\n\n')36    echo(term.move(12, 10) + term.blue(u'Public message from:'))37    with codecs.open(get_datafile_path(), 'r', 'utf-8') as fo:38        handle = fo.readline().strip()39        echo(term.move(12, 31) + term.bold_white(handle))40        for row in range(1, 4):41            echo(term.move(15 + row, 5) + term.white(fo.readline().strip()))42    ask(u'do you want to write a new public message? ')43def main():44    term = getterminal()45    session = getsession()46    banner()47    while True:48        session.activity = 'Viewing automsg'49        inp = term.inkey()50        if inp.lower() in (u'n', 'q') or inp.code == term.KEY_ENTER:51            return52        if inp.lower() == u'y':53            session.activity = 'Writing automsg'54            echo(term.move(12, 31))55            echo(term.bold_white(session.user.handle))56            echo((u' ' * 7))57            echo(term.move(21, 0) + term.clear_eol)58            for row in range(1, 4):59                echo(term.move(15 + row, 5))60                echo(u' ' * 57)61            msg = []62            for row in range(1, 4):63                echo(term.move(15 + row, 5))64                le = LineEditor(70)65                le.colors['highlight'] = term.white66                msg.append(le.read())67            ask(u'submit your text as a public message? ')68            while True:69                inp = term.inkey()70                if inp.lower() == u'n':71                    return72                if inp == 'y' or inp == 'Y':73                    echo(term.move(21, 0) + term.clear_eol)74                    with codecs.open(get_datafile_path(), 'w', 'utf-8') as fo:75                        fo.write('\n'.join([session.user.handle] + msg))76                        fo.close()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
