Best Python code snippet using tempest_python
AssessorToDb.py
Source:AssessorToDb.py  
...12	def __init__(self):13		self.working_dir = './working'14		self.file_dict = {}15		self.schema = ''16	def create_working_dir(self):17		try:18			if not os.path.isdir(self.working_dir):19				os.mkdir(self.working_dir)20		except Exception as e:21			print('error in create_working_dir routine')22			raise23	def cleanup_working_dir(self):24		try: 25			if os.path.isdir(self.working_dir):26				shutil.rmtree(self.working_dir)27		except Exception as e:28			print('error in cleanup_working_dir routine')29			raise30	def download_and_unzip(self, url):31		try:32			self.create_working_dir()33			r = requests.get(url)34			z = zipfile.ZipFile(io.BytesIO(r.content))35			z.extractall(self.working_dir)36		except Exception as e:37			raise38	def get_filename_for_download(self, file_title):39		try: 40			file_name = self.working_dir + '/' + file_title + '.txt'41			return file_name42		except Exception as e:43			raise44	def download_file(self, file_title, file_url):45		try:46			r = requests.get(file_url)47			file_name = self.get_filename_for_download(file_title)48			with open(file_name, "wb") as f:49				f.write(r.content)50		except Exception as e:51			print('url given: {}'.format(file_url))52			raise53	def download_files(self):54		try:55			self.create_working_dir()56			fd = self.file_dict57			for f in fd.keys():58				fname = fd[f][0]59				fname = fname.replace(' ', '%20')60				fname = self.url_base + fname61				self.download_and_unzip(fname)62		except Exception as e:63			raise64	def file_name_to_table_name(self, file_name):65		try:66		    s = file_name67		    s = s[:-4]68		    return s69		except Exception as e:70			raise71	def files_to_df_dict(self):72	    try:73	        df_dict = {}74	        for f in os.scandir(working_dir):75	            f = f.name76	            if f[-4:] == '.txt':77	                key = self.file_name_to_table_name(f)78	                df = pd.read_csv(working_dir + '/' + f,79				                	sep='|',80				                	encoding = 'ISO-8859-1')81	                df_dict[key] = df82	        return df_dict83	    except Exception as e:84	        print('filename: {}'.format(f))85	        raise86	def create_df_dict(self):87	    try:88	        df_dict = {}89	        for k in self.file_dict:90	            f = self.file_dict[k][0]91	            f = f.replace('.zip', '.txt')92	            if f[-4:] == '.txt':93	                df = pd.read_csv(self.working_dir + "/" + f,94	                                 sep='|',95	                                 encoding="ISO-8859-1")96	            elif f[-4:] == '.csv':97	            	df = pd.read_csv(self.working_dir + '/' + f,98	            					encoding='ISO-8859-1')99	            df_dict[k] = df100	        return df_dict101	    except Exception as e:102	        raise103	def process_files(self):104	    try:105	        df_dict = self.create_df_dict()106	        # for each df in df_dict, send df to Sockeye107	        self.make_engine()108	        today = date.today()109	        for df_key in df_dict.keys():110	            df = df_dict[df_key]111	            str_date = today.strftime("_%Y%m%d")112	            table_name = df_key +  str_date113	            df.to_sql(name=table_name, schema=self.schema, con=self.engine)114	    except Exception as e:115	        print(df_key)116	        raise117	def make_engine(self):118	    """119	    Make connections to the database120	    """121	    try:122	        conn_string = "DRIVER={{ODBC Driver 17 for SQL Server}}; " \123	            "SERVER={}; DATABASE={}; trusted_connection=yes".format(124	                'AWS-PROD-SQL\\Sockeye',125	                'Assessor')126	        #self.sql_conn = pyodbc.connect(conn_string)127	        params = urllib.parse.quote_plus(conn_string)128	        engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)129	        self.engine = engine130	    except Exception as e:131	        print(e.args[0])132	        raise133	def import_data(self):134		try:135			print("downloading data for {}".format(self.schema))136			self.download_files()137			print("importing data into db...")138			self.process_files()139			self.cleanup_working_dir()140			print("completed import.")141		except Exception as e:142			print(e.args[0])143			raise144class PierceAssessorETL(AssessorETL):145	import PierceColNames as pcn146	def __init__(self):147		super().__init__()148		self.schema = 'Pierce'149		self.file_dict = {'appraisal_account': ['appraisal_account.zip', pcn.appraisal_account_cols],150	            'improvement': ['improvement.zip', pcn.improvement_cols],151	            'improvement_builtas': ['improvement_builtas.zip', pcn.improvement_builtas_cols],152	            'land_attribute': ['land_attribute.zip', pcn.land_attribute_cols],153	            'improvement_detail': ['improvement_detail.zip', pcn.improvement_detail_cols]}154		self.url_base = 'https://online.co.pierce.wa.us/datamart/'155	def create_df_dict(self):156	    try:157	        df_dict = {}158	        for k in self.file_dict:159	            f = self.file_dict[k][0]160	            f = f.replace('.zip', '.txt')161	            if f[-4:] == '.txt':162	                df = pd.read_csv(self.working_dir + "/" + f,163	                                 sep='|',164	                                 encoding="ISO-8859-1",165	                                 names=self.file_dict[k][1])166	                df_dict[k] = df167	        return df_dict168	    except Exception as e:169	        raise170class KingAssessorETL(AssessorETL):171	def __init__(self):172		super().__init__()173		self.working_dir = './working_king'174		self.schema = 'King'175		self.file_dict = {'parcel': ['Parcel.zip'],176		            'RealPropAcct': ['Real Property Account.zip'],177		            'CondoComplexAndUnits': ['Condo Complex and Units.zip'],178		            'ResBuilding': ['Residential Building.zip'],179		            'ApartmentComplex': ['Apartment Complex.zip'],180			        'CommercialBuilding': ['Commercial Building.zip']}181		self.url_base = 'https://aqua.kingcounty.gov/extranet/assessor/'182	def create_df_dict(self):183	    try:184	        df_dict = {}185	        for f in os.scandir(self.working_dir):186	            f = f.name187	            if f[-4:] == '.csv':188	                key = self.file_name_to_table_name(f)189	                df = pd.read_csv(self.working_dir + '/' + f,190				                	encoding = 'ISO-8859-1')191	                df_dict[key] = df192	        return df_dict193	    except Exception as e:194	        print('filename: {}'.format(f))195	        raise196class KitsapAssessorETL(AssessorETL):197	def __init__(self):198		super().__init__()199		self.working_dir = './working_kitsap'200		self.schema = 'Kitsap'201		self.url_base = 'https://www.kitsapgov.com/assessor/Documents/'202		self.file_dict = {'Parcels': ['Parcels.txt'],203	            'Dwellings': ['Dwellings.txt'],204	            'MobileHome': ['MH.txt'],205	            'Valuations': ['Valuations.txt'],206	            'Codes': ['Codes.txt'],207	            'Property_addresses': ['Property_addresses.txt'],208	            'Comml_Imps': ['Comml_Imps.txt']}209	def download_files(self):210		try:211			fd = self.file_dict212			self.create_working_dir()213			for f in fd.keys():214				fname = fd[f][0]215				fname = fname.replace(' ', '%20')216				furl = self.url_base + fname217				self.download_file(fname, furl)218		except Exception as e:219			raise220	def get_filename_for_download(self, file_title):221		try: 222			file_name = self.working_dir + '/' + file_title223			return file_name224		except Exception as e:225			raise226	def create_df_dict(self):227	    try:228	        df_dict = {}229	        for k in self.file_dict:230	            f = self.file_dict[k][0]231	            if f[-4:] == '.txt':232	            	if k == 'Codes':233	            		df = pd.read_csv(self.working_dir + "/" + f,234	            			sep='\t',235	            			encoding="utf-16",236	            			index_col=False)237	            	else:238	            		df = pd.read_csv(self.working_dir + "/" + f,239	            			sep='\t',240	            			encoding="ISO-8859-1")241	            df_dict[k] = df242	        return df_dict243	    except Exception as e:244	        raise245class SnohomishAssessorETL(AssessorETL):246	def __init__(self):247		super().__init__()248		self.working_dir = './working_snohomish'249		self.schema = 'Snohomish'250		self.url_base = 'ftp://ftp.snoco.org/assessor/property_characteristics/'251		self.file_dict = {'Improvement': ['SnohomishCo Improvement Records_2020AV.xlsx'],252						'Land': ['SnohomishCo Land Records_2020AV.xlsx'],253						'Master': ['SnohomishCo Master Records_2020AV.xlsx']}254	def get_filename_for_download(self, file_title):255		try: 256			file_name = self.working_dir + '/' + file_title257			return file_name258		except Exception as e:259			raise260	def download_file(self, file_title, file_url):261		try:262			file_name = self.get_filename_for_download(file_title)263			with closing(urllib.request.urlopen(file_url)) as r:264				with open(file_name, 'wb') as f:265					shutil.copyfileobj(r,f)266		except Exception as e:267			print('url given: {}'.format(file_url))268			print('file_name gotten: {}'.format(file_name))269			raise270	        271	def download_files(self):272		try:273			fd = self.file_dict274			self.create_working_dir()275			for f in fd.keys():276				f_name = fd[f][0]277				#fname = fname.replace(' ', '%20')278				f_url = self.url_base + f_name279				f_url = f_url.replace(' ', '%20')280				self.download_file(f_name, f_url)281		except Exception as e:282			raise283	def create_df_dict(self):284	    try:285	        df_dict = {}286	        for k in self.file_dict:287	            f = self.file_dict[k][0]288	            if f[-5:] == '.xlsx':...test_osutils.py
Source:test_osutils.py  
...36        monkeypatch.setattr(shutil, "rmtree", self._shutil_rmtree)37    ## No errors38    def test_no_working_dir(self, monkeypatch):39        self._monkeypatch_all(monkeypatch)40        with osutils.create_working_dir():41            pass42        assert self._os_getcwd_calls == [((), {})]43        assert self._os_makedirs_calls == []44        assert self._tempfile_mkdtemp_calls == [((), {})]45        assert self._os_chdir_calls == [46            (('/tempdir/', ), {}),47            (('/original-dir/', ), {})48        ]49        assert self._shutil_rmtree_calls == []50    def test_working_dir_specified(self, monkeypatch):51        self._monkeypatch_all(monkeypatch)52        with osutils.create_working_dir(working_dir='/foo/'):53            pass54        assert self._os_getcwd_calls == [((), {})]55        assert self._os_makedirs_calls == [56            (('/foo/', ), {'exist_ok': True})57        ]58        assert self._tempfile_mkdtemp_calls == []59        assert self._os_chdir_calls == [60            (('/foo/', ), {}),61            (('/original-dir/', ), {})62        ]63        assert self._shutil_rmtree_calls == []64    def test_no_working_dir_delete_if_no_error(self, monkeypatch):65        self._monkeypatch_all(monkeypatch)66        with osutils.create_working_dir(delete_if_no_error=True):67            pass68        assert self._os_getcwd_calls == [((), {})]69        assert self._os_makedirs_calls == []70        assert self._tempfile_mkdtemp_calls == [((), {})]71        assert self._os_chdir_calls == [72            (('/tempdir/', ), {}),73            (('/original-dir/', ), {})74        ]75        assert self._shutil_rmtree_calls == [76            (('/tempdir/', ), {})77        ]78    def test_working_dir_specified_delete_if_no_error(self, monkeypatch):79        self._monkeypatch_all(monkeypatch)80        with osutils.create_working_dir(working_dir='/foo/', delete_if_no_error=True):81            pass82        assert self._os_getcwd_calls == [((), {})]83        assert self._os_makedirs_calls == [84            (('/foo/', ), {'exist_ok': True})85        ]86        assert self._tempfile_mkdtemp_calls == []87        assert self._os_chdir_calls == [88            (('/foo/', ), {}),89            (('/original-dir/', ), {})90        ]91        assert self._shutil_rmtree_calls == [92            (('/foo/', ), {})93        ]94    ## With errors95    def test_error_no_working_dir(self, monkeypatch):96        self._monkeypatch_all(monkeypatch)97        try:98            with osutils.create_working_dir():99                123 / 0100        except:101            pass102        assert self._os_getcwd_calls == [((), {})]103        assert self._os_makedirs_calls == []104        assert self._tempfile_mkdtemp_calls == [((), {})]105        assert self._os_chdir_calls == [106            (('/tempdir/', ), {}),107            (('/original-dir/', ), {})108        ]109        assert self._shutil_rmtree_calls == []110    def test_error_working_dir_specified(self, monkeypatch):111        self._monkeypatch_all(monkeypatch)112        try:113            with osutils.create_working_dir(working_dir='/foo/'):114                123 / 0115        except:116            pass117        assert self._os_getcwd_calls == [((), {})]118        assert self._os_makedirs_calls == [119            (('/foo/', ), {'exist_ok': True})120        ]121        assert self._tempfile_mkdtemp_calls == []122        assert self._os_chdir_calls == [123            (('/foo/', ), {}),124            (('/original-dir/', ), {})125        ]126        assert self._shutil_rmtree_calls == []127    def test_error_no_working_dir_delete_if_no_error(self, monkeypatch):128        self._monkeypatch_all(monkeypatch)129        try:130            with osutils.create_working_dir(delete_if_no_error=True):131                123 / 0132        except:133            pass134        assert self._os_getcwd_calls == [((), {})]135        assert self._os_makedirs_calls == []136        assert self._tempfile_mkdtemp_calls == [((), {})]137        assert self._os_chdir_calls == [138            (('/tempdir/', ), {}),139            (('/original-dir/', ), {})140        ]141        assert self._shutil_rmtree_calls == []  # not deleted, dur to error142    def test_error_working_dir_specified_delete_if_no_error(self, monkeypatch):143        self._monkeypatch_all(monkeypatch)144        try:145            with osutils.create_working_dir(working_dir='/foo/', delete_if_no_error=True):146                123 / 0147        except:148            pass149        assert self._os_getcwd_calls == [((), {})]150        assert self._os_makedirs_calls == [151            (('/foo/', ), {'exist_ok': True})152        ]153        assert self._tempfile_mkdtemp_calls == []154        assert self._os_chdir_calls == [155            (('/foo/', ), {}),156            (('/original-dir/', ), {})157        ]...test_osutils_create_working_dir.py
Source:test_osutils_create_working_dir.py  
...9    ## With errors10    logging.basicConfig(level=logging.DEBUG)11    logging.info("-"*80)12    logging.info("No working dir, no cleanup")13    with osutils.create_working_dir() as wdir:14        os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))15    assert os.path.exists(wdir)16    shutil.rmtree(wdir)17    logging.info("-"*80)18    logging.info("Working dir, no cleanup")19    with osutils.create_working_dir(working_dir='./foo/') as wdir:20        os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))21    assert os.path.exists(wdir)22    shutil.rmtree(wdir)23    logging.info("-"*80)24    logging.info("No working dir, with cleanup")25    with osutils.create_working_dir(delete_if_no_error=True) as wdir:26        os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))27    assert not os.path.exists(wdir)28    logging.info("-"*80)29    logging.info("Working dir, with cleanup")30    with osutils.create_working_dir(working_dir='./bar/', delete_if_no_error=True) as wdir:31        os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))32    assert not os.path.exists(wdir)33    ## No errors34    logging.basicConfig(level=logging.DEBUG)35    logging.info("-"*80)36    logging.info("No working dir, no cleanup  ***(WITH ERROR)***")37    try:38        with osutils.create_working_dir() as wdir:39            os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))40            123 / 041    except:42        pass43    assert os.path.exists(wdir)44    shutil.rmtree(wdir)45    logging.info("-"*80)46    logging.info("Working dir, no cleanup  ***(WITH ERROR)***")47    try:48        with osutils.create_working_dir(working_dir='./foo/') as wdir:49            os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))50            123 / 051    except:52        pass53    assert os.path.exists(wdir)54    shutil.rmtree(wdir)55    logging.info("-"*80)56    logging.info("No working dir, with cleanup  ***(WITH ERROR)***")57    try:58        with osutils.create_working_dir(delete_if_no_error=True) as wdir:59            os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))60            123 / 061    except:62        pass63    assert os.path.exists(wdir)64    shutil.rmtree(wdir)65    logging.info("-"*80)66    logging.info("Working dir, with cleanup  ***(WITH ERROR)***")67    try:68        with osutils.create_working_dir(working_dir='./bar/', delete_if_no_error=True) as wdir:69            os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))70            123 / 071    except:72        pass73    assert os.path.exists(wdir)74    shutil.rmtree(wdir)75if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
