Best Python code snippet using tempest_python
main_window.py
Source:main_window.py  
...181        self.file.change_directory(self.directory)182        if self.validate.requirement_check() != True:183            self.button_state(False)184            self.action_state(False)185            self.create_log(self.validate.requirement_check())186        else:187            self.button_state(True)188            self.action_state(True)189            self.create_log(190                "<span style='color:green;'>All the requirements are met.</span>"191            )192            self.port = self.file.find_ports(self.env_file_name)193    def create_log(self, message: str) -> None:194        """195        create_log creates the log.196        :param message: The message to be displayed.197        :type message: str198        """199        self.current_time = time.localtime()200        self.current_time = time.strftime("%H:%M:%S", self.current_time)201        self.op_log.append(202            "<html><body>"203            + "<b style='color:blue;'>"204            + f"[{self.current_time}]"205            + "</b>"206            + " >>> "207            + message208            + "<br/></body></html>"209        )210    def button_state(self, state: bool) -> None:211        """212        button_state changes the state of the buttons.213        :param state: The state of the buttons.214        :type state: bool215        """216        self.start_stop_btn.setEnabled(state)217        self.lhost_btn.setEnabled(state)218        self.pma_btn.setEnabled(state)219        self.flocation_btn.setEnabled(state)220    def action_state(self, state: bool) -> None:221        """222        action_state changes the state of the actions.223        :param state: The state of the actions.224        :type state: bool225        """226        self.actionEdit_Ports.setEnabled(state)227        self.actionRemove_Services.setEnabled(state)228    def service_state(self) -> None:229        """230        service_state changes the state of the services.231        """232        ready_msg_1 = 
"Starting..."233        ready_msg_2 = "Stopping..."234        success_msg_1 = "Service started."235        success_msg_2 = "Service stopped."236        error_msg_1 = "Service failed to start."237        error_msg_2 = "Service failed to stop."238        btn_state = self.start_stop_btn.isChecked()239        if btn_state:240            self.create_log(ready_msg_1)241            self.start_stop_btn.setText("Stop")242            if self.docker.start():243                self.create_log(success_msg_1)244            else:245                self.create_log(error_msg_1)246                self.error.show(error_msg_1)247        else:248            self.create_log(ready_msg_2)249            self.start_stop_btn.setText("Start")250            if self.docker.stop():251                self.create_log(success_msg_2)252            else:253                self.create_log(error_msg_2)254                self.error.show(error_msg_2)255    def open_localhost(self) -> None:256        """257        open_localhost opens the localhost.258        """259        ready_msg = "Opening localhost..."260        success_msg = "Opened localhost."261        warning_msg = "Please start the service first."262        error_msg = "Failed to open localhost."263        url = f"http://localhost:{self.port['WEB_PORT']}"264        if self.start_stop_btn.isChecked():265            self.create_log(ready_msg)266            try:267                self.file.open_this(url)268                self.create_log(success_msg)269            except:270                self.create_log(error_msg)271                self.error.show(error_msg)272        else:273            self.create_log(warning_msg)274            self.error.show(warning_msg)275    def open_pma(self) -> None:276        """277        open_pma opens the phpmyadmin.278        """279        ready_msg = "Opening phpmyadmin..."280        success_msg = "Opened phpmyadmin."281        warning_msg = "Please start the service first."282        error_msg = "Failed to open 
phpmyadmin."283        url = f"http://localhost:{self.port['PMA_PORT']}"284        if self.start_stop_btn.isChecked():285            self.create_log(ready_msg)286            try:287                self.file.open_this(url)288                self.create_log(success_msg)289            except:290                self.create_log(error_msg)291                self.error.show(error_msg)292        else:293            self.create_log(warning_msg)294            self.error.show(warning_msg)295    def open_project(self) -> None:296        """297        open_project opens the project.298        """299        success_msg = "Opened project."300        error_msg = "Failed to open project folder."301        url = f"{self.directory}/{self.public_directory}"302        self.create_log("Opening project...")303        try:304            self.file.open_this(url)305            self.create_log(success_msg)306        except:307            self.create_log(error_msg)308            self.error.show(error_msg)309    def create_project(self) -> None:310        """311        create_project creates the project.312        """313        ready_msg = "Adding new project..."314        success_msg = "Project created."315        error_msg = "Failed to create project."316        self.create_log(ready_msg)317        if self.new_project.show():318            self.create_log(success_msg)319        else:320            self.create_log(error_msg)321            self.error.show(error_msg)322        self.load_projects()323    def exit_app(self) -> None:324        """325        exit_app will exit the application.326        """327        ready_msg = "Stopping services..."328        confirm_msg = "Are you sure you want to quit?"329        success_msg = "Exited."330        cancel_msg = "Exiting canceled."331        if self.confirm.show(confirm_msg):332            self.create_log(ready_msg)333            self.docker.stop()334            self.create_log(success_msg)335            exit()336        else:337            
self.create_log(cancel_msg)338    def edit_ports(self) -> None:339        """340        edit_ports will edit the ports.341        """342        ready_msg = "Editing ports..."343        success_msg = "Ports edited."344        warning_msg = "Please start the service first."345        error_msg = "Failed to edit ports."346        self.create_log(ready_msg)347        if not self.start_stop_btn.isChecked():348            result = self.edit_port_dialog.show()349            if result != False:350                self.create_log(success_msg)351            else:352                self.create_log(error_msg)353                self.error.show(error_msg)354        else:355            self.create_log(warning_msg)356            self.error.show(warning_msg)357        self.port = self.file.find_ports(self.env_file_name)358    def remove_services(self) -> None:359        """360        remove_services will remove the services.361        """362        ready_msg = "Removing services..."363        confirm_msg = "Are you sure you want to remove the services?"364        success_msg = "Services removed."365        warning_msg = "Please start the service first."366        error_msg = "Failed to remove services."367        cancel_msg = "Services removal canceled."368        if not self.start_stop_btn.isChecked():369            self.create_log(ready_msg)370            if self.confirm.show(confirm_msg):371                if self.docker.remove():372                    self.create_log(success_msg)373                else:374                    self.create_log(error_msg)375                    self.error.show(error_msg)376            else:377                self.create_log(cancel_msg)378        else:379            self.create_log(warning_msg)...tests.py
Source:tests.py  
...173	# scopes174	def test_get_unique_scopes(self):175		scope1 = 'test1'176		scope2 = 'test2'177		log1 = self.access_log.create_log(scope=scope1)178		log2 = self.access_log.create_log(scope=scope1)179		log3 = self.access_log.create_log(scope=scope2)180		unique_scopes = self.access_log.get_unique_scopes()181		self.assertEqual(2, len(unique_scopes))182		self.assertTrue(scope1 in unique_scopes)183		self.assertTrue(scope2 in unique_scopes)184	# logs185	def test_create_log_with_preset_remote_origin(self):186		preset_remote_origin = '1.1.1.1'187		self.access_log.remote_origin = '1.1.1.1'188		log = self.access_log.create_log()189		self.assertEqual(preset_remote_origin, log.remote_origin.exploded)190	def test_prune_logs_all(self):191		log1 = self.access_log.create_log(creation_time=1)192		log2 = self.access_log.create_log(creation_time=2)193		log3 = self.access_log.create_log(creation_time=3)194		self.assertIsNotNone(self.access_log.get_log(log1.id))195		self.assertIsNotNone(self.access_log.get_log(log2.id))196		self.assertIsNotNone(self.access_log.get_log(log3.id))197		self.access_log.prune_logs()198		self.assertIsNone(self.access_log.get_log(log1.id))199		self.assertIsNone(self.access_log.get_log(log2.id))200		self.assertIsNone(self.access_log.get_log(log3.id))201	def test_prune_logs_created_before(self):202		log1 = self.access_log.create_log(creation_time=1)203		log2 = self.access_log.create_log(creation_time=2)204		log3 = self.access_log.create_log(creation_time=3)205		self.assertIsNotNone(self.access_log.get_log(log1.id))206		self.assertIsNotNone(self.access_log.get_log(log2.id))207		self.assertIsNotNone(self.access_log.get_log(log3.id))208		self.access_log.prune_logs(created_before=3)209		self.assertIsNone(self.access_log.get_log(log1.id))210		self.assertIsNone(self.access_log.get_log(log2.id))211		self.assertIsNotNone(self.access_log.get_log(log3.id))212	def test_cooldown_remote_origin(self):213		remote_origin = '1.1.1.1'214		amount = 1215		period = 60216		scope = 
'test'217		self.assertFalse(218			self.access_log.cooldown(219				scope,220				amount,221				period,222				remote_origin=remote_origin,223			)224		)225		self.access_log.create_log(scope=scope, remote_origin=remote_origin)226		# explicit remote origin227		self.assertTrue(228			self.access_log.cooldown(229				scope,230				amount,231				period,232				remote_origin=remote_origin,233			)234		)235		# preset remote origin is used if not specified236		self.access_log.remote_origin = remote_origin237		self.assertTrue(self.access_log.cooldown(scope, amount, period))238		# remote origin matches even logs with non-matching subject id239		self.assertTrue(240			self.access_log.cooldown(241				scope,242				amount,243				period,244				remote_origin=remote_origin,245				subject_id=uuid.uuid4().bytes,246			)247		)248	def test_cooldown_subject_id(self):249		subject_id = uuid.uuid4().bytes250		amount = 1251		period = 60252		scope = 'test'253		self.assertFalse(254			self.access_log.cooldown(255				scope,256				amount,257				period,258				subject_id=subject_id,259			)260		)261		self.access_log.create_log(scope=scope, subject_id=subject_id)262		self.assertTrue(263			self.access_log.cooldown(264				scope,265				amount,266				period,267				subject_id=subject_id,268			)269		)270		# subject id matches even logs with non-matching remote_origin271		self.assertTrue(272			self.access_log.cooldown(273				scope,274				amount,275				period,276				remote_origin='1.1.1.1',277				subject_id=subject_id,278			)279		)280	def test_cooldown_range_and_amount_per_period(self):281		remote_origin = '1.1.1.1'282		self.access_log.remote_origin = remote_origin283		amount = 1284		period = 60285		scope = 'test'286		the_past = time.time() - period - 1287		# logs outside of the cooldown period aren't counted towards the amount288		self.access_log.create_log(scope=scope, creation_time=the_past)289		# multiple logs up to a cap can be set for a given period290		self.access_log.create_log(scope=scope)291		amount = 2292		
self.assertFalse(self.access_log.cooldown(scope, amount, period))293		self.access_log.create_log(scope=scope)294		self.assertTrue(self.access_log.cooldown(scope, amount, period))295	# anonymization296	def test_anonymize_id(self):297		id = uuid.uuid4().bytes298		log_subject = self.access_log.create_log(subject_id=id)299		log_object = self.access_log.create_log(object_id=id)300		count_methods_filter_fields = [301			(self.access_log.count_logs, 'subject_ids'),302			(self.access_log.count_logs, 'object_ids'),303		]304		for count, filter_field in count_methods_filter_fields:305			self.assertEqual(1, count(filter={filter_field: id}))306		new_id_bytes = self.access_log.anonymize_id(id)307		for count, filter_field in count_methods_filter_fields:308			self.assertEqual(0, count(filter={filter_field: id}))309		# assert logs still exist, but with the new id as subject/object310		for count, filter_field in count_methods_filter_fields:311			self.assertEqual(1, count(filter={filter_field: new_id_bytes}))312	#TODO passing in an id to use for anonymization is allowed, so test it313	def test_anonymize_id_with_new_id(self):314		pass315	def test_anonymize_log_origins(self):316		origin1 = '1.2.3.4'317		expected_anonymized_origin1 = '1.2.0.0'318		log1 = self.access_log.create_log(remote_origin=origin1)319		origin2 = '2001:0db8:85a3:0000:0000:8a2e:0370:7334'320		expected_anonymized_origin2 = '2001:0db8:85a3:0000:0000:0000:0000:0000'321		log2 = self.access_log.create_log(remote_origin=origin2)322		logs = self.access_log.search_logs()323		self.access_log.anonymize_log_origins(logs)324		anonymized_log1 = self.access_log.get_log(log1.id)325		anonymized_log2 = self.access_log.get_log(log2.id)326		self.assertEqual(327			expected_anonymized_origin1,328			anonymized_log1.remote_origin.exploded,329		)330		self.assertEqual(331			expected_anonymized_origin2,332			anonymized_log2.remote_origin.exploded,333		)334if __name__ == '__main__':335	if '--db' in sys.argv:...integrityChecker.py
Source:integrityChecker.py  
...19from checkLogPath import checkLogPath20from writeIntegrityPanic import writeIntegrityPanic21from createProject import createProject22class integrityChecker(opdb_setup,getElementType,getProjects,createElement,getElementList,getVersionList,checkLogPath,writeIntegrityPanic,createProject):23   def create_log(self,Path,logdescription):24	logfile=""25	if self.getElementType_main(Path=Path)=="projectRoot":26		logfile=self.AdminROOT+"/common/integrity_logs/"+str(self.timerec)+".log"27	elif self.getElementType_main(Path=Path)!="":28		pr=Path.split(":")[1]29		logfile=self.AdminROOT+"/"+pr+"/integrity_logs/"+str(self.timerec)+".log"30	else:31		pass32		33	if logfile!="":34		if os.path.isfile(logfile):35			fl=open(logfile,"r")36			rea=fl.read()37			fl.close()38			39			tolog=rea.strip().lstrip()+"\n"+str(Path)+"   "+logdescription+"\n"40			41			fl=open(logfile,"w")42			fl.write(tolog)43			fl.close()44		else:45			tolog=str(Path)+"   "+logdescription+"\n"46			47			fl=open(logfile,"w")48			fl.write(tolog)49			fl.close()			50   def checker_loop(self,Path):51	print Path52	path_type=self.getElementType_main(Path=Path)53	if path_type=="projectRoot":54		pre=self.getProjects_main()55		p_c=os.listdir(self.ProjectROOT+Path.replace(":","/"))56		p_d=[]57		for p_ in p_c:58			if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+str(p_)):59				p_d.append(p_)60		61		filestodelete=[]62		for pp in p_c:63			if os.path.isfile(self.ProjectROOT+"/"+str(pp)):64				if pp=="elements.atr" or pp=="versions.atr":65					filestodelete.append(pp)66					67		if len(filestodelete) > 0:68			for flss in filestodelete:69				os.remove(self.ProjectROOT+"/"+flss)70				self.create_log(Path,"Project root was cleaned from failing .atr files.")71		72		duplchk={}73		for i in pre:74			duplchk[i]=pre.count(i)75		ccc=076		for i in duplchk.keys():77			if int(duplchk[i]) > 1:78				ccc=179			else:80				pass81		if ccc==1:82			ret_solve=""83			for elem in duplchk.keys():84				ret_solve+=elem+"\n"85			
#-------------------------here we solve the duplicated entries problem86			fi=open(self.ProjectROOT+"/projects.atr","w")87			fi.write(ret_solve)88			fi.close()89			pre=self.getProjects_main()90			self.create_log(Path,"There was duplicated entries in the elementdescription file, so I recreated them.")91				92		93		for prr in pre:94			#------------- solving the case of the registrad folder without a type definition file95			if os.path.isdir(self.ProjectROOT+"/"+prr):96				if self.getElementType_main(Path=Path+prr)=="":97					ff=open(self.ProjectROOT+"/"+prr+"/elements.atr","w")98					ff.write("")99					ff.close()100					self.create_log(Path,"This was a registred empty folder: "+str(":"+prr)+" now it is a container.")101				else: 102					pass103		104		if set(pre) == set(p_d):105			pass106				107		else:108			p_mr=list(set(pre).difference(set(p_d))) #tobb a bejegyzes mint kene109			p_md=list(set(p_d).difference(set(pre))) #tobb a konyvtar mint kene110			if len(p_md) > 0:111				for di in p_md:112					if self.getElementType_main(Path=Path+di)=="":113						114						if len(os.listdir(self.ProjectROOT+"/"+di)) < 1:115							#-----------------------we solved the empty folder case..........116							os.rmdir(self.ProjectROOT+"/"+di)117							self.create_log(Path,"Empty folder was deleted: "+str(":"+di))118						else:	119							#-------------------------------------------here we solve the problem when we found a directory without any element entry and we do not even know its type120							self.writeIntegrityPanic_main(Path=Path,Problem=di,Description="I can not categorize this folder: "+di+" at the end of the path, please take care about it.")121							self.create_log(Path,"User action is needed for: "+str(":"+di))122						123					elif self.getElementType_main(Path=Path+di)=="item" or self.getElementType_main(Path=Path+di)=="container":124						125						#---------------------------------herer we solve the case with the folders which are for delete126						if 
int(self.checkLogPath_main(logSystemPath=self.AdminROOT+"/common/project_delete_list.atr",Path=Path+str(di)))==int(1):127							pass128						else:129							fl=open(self.AdminROOT+"/common/project_delete_list.atr","r")130							rea=fl.read()131							fl.close()132							133							tolog=rea.strip().lstrip()+"\n"+str(Path+di)+"   integrityCheck   "+str(self.timerec)+"   integrityCheck automatically placed this item to the delelte list.\n"134					135							fl=open(self.AdminROOT+"/common/project_delete_list.atr","w")136							fl.write(tolog)137							fl.close()	138							self.create_log(Path,"Delete request was placed for: "+str(":"+di))139			140			if len(p_mr) > 0:141				for el in p_mr:142					#--------------------here we solve the problem when there is more items in a project.art file, and we create containers for it.143					#os.makedirs(self.ProjectROOT+Path.replace(":","/")+str(el))144					#pf=open(self.ProjectROOT+str(Path.replace(":","/"))+"/"+str(el)+"/elements.atr","w")145					#pf.write("")146					#pf.close()147					self.createProject_main(str(el))148					self.create_log(Path,"Project: "+str(el)+" created.")149		150	151		for em in pre:152			self.checker_loop(":"+em)153		154		#ve=self.getVersionList_main(Path)155		#if len(ve)>0:156		#	for v in self.getVersionList_main(Path):157		#		self.checker_loop(Path+"@"+v[0])158	159	elif path_type=="container" or path_type=="item":160		pre=self.getElementList_main(Path=Path)161		p_c=os.listdir(self.ProjectROOT+Path.replace(":","/"))162		p_d=[]163		for p_ in p_c:164			if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+"/"+str(p_)):165				p_d.append(p_)166		duplchk={}167		for i in pre:168			duplchk[i]=pre.count(i)169		ccc=0170		171		for i in duplchk.keys():172			if int(duplchk[i]) > 1:173				ccc=1174			else:175				pass176		if ccc==1:177			ret_solve=""178			for elem in duplchk.keys():179				ret_solve+=elem+"\n"180			#-------------------------here we solve the duplicated entries problem181			
fi=open(self.ProjectROOT+Path.replace(":","/")+"/elements.atr","w")182			fi.write(ret_solve)183			fi.close()184			pre=self.getProjects_main()185			self.create_log(Path,"There was duplicated entries in the elementdescription file, so I recreated them.")186			187		for prr in pre:188			#------------- solving the case of the registred folder without a type definition file189			if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+"/"+prr):190				if self.getElementType_main(Path=Path+":"+prr)=="":191					ff=open(self.ProjectROOT+Path.replace(":","/")+"/"+prr+"/elements.atr","w")192					ff.write("")193					ff.close()194					self.create_log(Path,"This was a registred empty folder: "+str(Path+":"+prr)+" now it is a container.")195				else: 196					pass197		198		if set(pre) == set(p_d):199			pass200		201		else:202			p_mr=list(set(pre).difference(set(p_d))) #tobb a bejegyzes mint kene203			p_md=list(set(p_d).difference(set(pre))) #tobb a konyvtar mint kene204			if len(p_md) > 0:205				for di in p_md:206					207					if self.getElementType_main(Path=Path+":"+di)=="":208						if len(os.listdir(self.ProjectROOT+Path.replace(":","/")+"/"+di)) < 1:209							#-----------------------we solved the empty folder case..........210							os.rmdir(self.ProjectROOT+Path.replace(":","/")+"/"+di)211							self.create_log(Path,"Empty folder was deleted: "+str(Path+":"+di))212						else:	213							#-------------------------------------------here we solve the problem when we found a directory without any element entry and we do not even know its type214							self.writeIntegrityPanic_main(Path=Path,Problem=di,Description=("I can not categorize this folder: "+di+" at the end of the path, please take care about it."))215							self.create_log(Path,"User action is needed for: "+str(Path+":"+di))216						217					elif self.getElementType_main(Path=Path+":"+di)=="item" or self.getElementType_main(Path=Path+":"+di)=="container":218						219						#---------------------------------herer we solve the 
case with the folders which are for delete220						if int(self.checkLogPath_main(logSystemPath=self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr",Path=Path+":"+str(di)))==int(1):221							pass222						else:223							fl=open(self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr","r")224							rea=fl.read()225							fl.close()226							227							tolog=rea.strip().lstrip()+"\n"+str(Path+":"+di)+"   integrityCheck   "+str(self.timerec)+"   integrityCheck automatically placed this item to the delelte list.\n"228					229							fl=open(self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr","w")230							fl.write(tolog)231							fl.close()	232							self.create_log(Path,"Delete request was placed for: "+str(Path+":"+di))233			234			if len(p_mr) > 0:235				for el in p_mr:236					#--------------------here we solve the problem when there is more items in a element.art file, and we create containers for it.237					os.makedirs(self.ProjectROOT+Path.replace(":","/")+"/"+str(el))238					pf=open(self.ProjectROOT+str(Path.replace(":","/"))+"/"+str(el)+"/elements.atr","w")239					pf.write("")240					pf.close()241					self.create_log(Path,"Container: "+str(el)+" created.")242		243	244		if path_type=="item":245			vers=self.getVersionList_main(Path=Path)246			if len(vers) > 0:247				for ww in vers:248					vw=ww[0]249					if os.path.isdir(self.VaultROOT+"/"+Path.replace(":","/")+"/"+vw):250						pass251					else:252						os.makedirs(self.VaultROOT+"/"+Path.replace(":","/")+"/"+vw)253						self.create_log(Path,"Version folder was created in the vaults: @"+str(vw))254			else:255				if os.path.isdir(self.VaultROOT+"/"+Path.replace(":","/")):256					pass257				else:258					os.makedirs(self.VaultROOT+"/"+Path.replace(":","/"))259					self.create_log(Path,"Item folder was created in the Vault.")260		for em in pre:261			self.checker_loop(Path+":"+em)262		263		#ve=self.getVersionList_main(Path)264		#if len(ve)>0:265		#	for v in self.getVersionList_main(Path):266		#		
self.checker_loop(Path+"@"+v[0])267			268	elif path_type=="version":269		pass270		271		272	elif path_type=="attribute":273		pass...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
