How to use test_case_complete method in Testify

Best Python code snippet using Testify_python

test_shared.py

Source:test_shared.py Github

copy

Full Screen

1'''2Test module for cross-module logic for running3integration tests for the `yt_videos_list`4package - regardless of the driver5being tested or host operating system.6'''7import os8import sys9import time10import shutil11import hashlib12import datetime13import threading14from determine import determine_path_slash15from save_thread_result import ThreadWithResult16from yt_videos_list import ListCreator17NOW = datetime.datetime.now18ISOFORMAT = datetime.datetime.isoformat19def run_tests_for(browsers_list):20 '''21 Runs 2 threads simultaneously to test both22 `reverse_chronological` AND `chronological`23 test cases for every driver in the provided24 `browsers_list`.25 '''26 file_content_mismatch = 'At least one output file produced by this test does not match the expectation!'27 test_cases = create_test_cases(browsers_list)28 total = len(test_cases)29 current = 030 log_files = [31 'CoreySchafer_reverse_chronological_videos_list.log',32 'CoreySchafer_chronological_videos_list.log',33 'CoreySchafer_reverse_chronological_video_ids_list.log',34 'CoreySchafer_chronological_video_ids_list.log'35 ]36 log_test_info('*' * 140, *log_files)37 log_test_info('Running tests!', *log_files)38 while current < total:39 # each test_case is a ListCreator instance for40 # video_id_only set to True or False with41 # reverse_chronological set to True or False42 # for EACH driver in the browsers_list,43 # and within EACH test case,44 # there are 5 variations to test - see45 # the `variations` `list`46 # in verify_update() for more details47 if threading.active_count() - 1 == 0:48 # the main thread counts as a thread, so we49 # need to subtract 1 to determine the50 # number of threads we've created51 # manually52 thread_1_case = test_cases[current]53 is_id = '_id' if getattr(thread_1_case, 'video_id_only') is True else ''54 if getattr(thread_1_case, 'reverse_chronological') is True: log_1_name = f'CoreySchafer_reverse_chronological_video{is_id}s_list.log'55 else: log_1_name = f'CoreySchafer_chronological_video{is_id}s_list.log'56 test_case_thread_1 = ThreadWithResult(target=run_test_case, args=(thread_1_case, log_1_name))57 log_test_info('Starting thread for test case 1...', log_1_name)58 test_case_thread_1.start()59 log_test_info('Started thread for test case 1!', log_1_name)60 current += 161 # safaridriver does not allow multi-threading:62 # Could not create a session: The Safari instance is already paired with another WebDriver session.63 if 'safari' not in browsers_list:64 if current == 1:65 # wait 5 seconds to allow all just the firefox selenium webdriver dependency to download (necessary after test_cross_platforms module runs remove_dependencies())66 time.sleep(5)67 thread_2_case = test_cases[current]68 is_id = '_id' if getattr(thread_2_case, 'video_id_only') is True else ''69 if getattr(thread_2_case, 'reverse_chronological') is True: log_2_name = f'CoreySchafer_reverse_chronological_video{is_id}s_list.log'70 else: log_2_name = f'CoreySchafer_chronological_video{is_id}s_list.log'71 test_case_thread_2 = ThreadWithResult(target=run_test_case, args=(thread_2_case, log_2_name))72 log_test_info('Starting thread for test case 2...', log_2_name)73 test_case_thread_2.start()74 log_test_info('Started thread for test case 2!', log_2_name)75 current += 176 while threading.active_count() - 1 != 0 and current < total:77 # the threads are still running78 time.sleep(7)79 if 'test_case_thread_1' in locals(): test_case_thread_1.join()80 if 'test_case_thread_2' in locals(): test_case_thread_2.join()81 if 'thread_1_case' in locals(): log_test_info(f'Finished testing {[thread_1_case]}!', log_1_name)82 if 'thread_2_case' in locals(): log_test_info(f'Finished testing {[thread_2_case]}!', log_2_name)83 if 'test_case_thread_1' in locals() and (getattr(test_case_thread_1, 'result', None) in [None, 'Failed!']): raise ValueError(file_content_mismatch)84 if 'test_case_thread_2' in locals() and (getattr(test_case_thread_2, 'result', None) in [None, 'Failed!']): raise ValueError(file_content_mismatch)85 test_case_complete = 'Moving on to the next driver...\n' + '⏬ '*11 + '\n\n\n'86 if 'thread_1_case' in locals() and 'thread_2_case' in locals(): log_test_info(test_case_complete, log_1_name, log_2_name)87 elif 'thread_1_case' in locals(): log_test_info(test_case_complete, log_1_name)88 elif 'thread_2_case' in locals(): log_test_info(test_case_complete, log_2_name)89def log_test_info(message, *args):90 thread_name = f'[{threading.current_thread().name}]'91 current_time = datetime.datetime.now().isoformat()92 utc_offset = time.strftime('%z')93 message = f'{current_time}{utc_offset} {thread_name:>12} {message}\n'94 sys.stdout.writelines(message)95 for log_file in args:96 with open(log_file, mode='a', encoding='utf-8') as output_location:97 output_location.writelines(message)98def create_test_cases(browsers):99 '''100 Creates an instance of `ListCreator` for a101 video_id_only=True and video_id_only=False for a102 reverse chronological AND chronological test case103 for each driver in the provided `browsers` list.104 '''105 return [106 ListCreator(driver=browser, reverse_chronological=is_reverse_chronological, video_id_only=is_video_id_only)107 for browser in browsers108 for is_video_id_only in [True, False]109 for is_reverse_chronological in [True, False]110 ]111def run_test_case(list_creator, log_file):112 '''113 Calls `verify_update()`, which114 runs all variations (no pre-existing files,115 pre-existing txt, pre-existing csv, pre-existing md, and116 pre-existing txt + csv + md files) of the provided test case117 (a specific driver for the `video_id_only` attribute set to `True` or `False`118 with the `reverse_chronological` attribute119 set to either `True` or `False`) using the120 `list_creator.create_list_for()` method and verifies resulting121 output files match the content in the corresponding full122 reference files.123 '''124 path_slash = determine_path_slash()125 test_url = 'youtube.com/user/schafer5'126 is_reverse_chronological = getattr(list_creator, 'reverse_chronological')127 is_video_id_only = getattr(list_creator, 'video_id_only')128 is_id = '_id' if is_video_id_only else ''129 suffix = f'reverse_chronological_video{is_id}s_list' if is_reverse_chronological else f'chronological_video{is_id}s_list'130 partialfile_path = f'tests{path_slash}reference_files{path_slash}partial_CoreySchafer_{suffix}'131 fullfile_path = f'tests{path_slash}reference_files{path_slash}full_CoreySchafer_{suffix}'132 return verify_update(list_creator, test_url, partialfile_path, fullfile_path, log_file)133def verify_update(list_creator, test_url, test_file, full_file, log_file):134 '''135 Uses the `reverse_chronological` and `video_id_only` attributes of the `list_creator`136 argument to determine the suffix, then uses the reference137 `test_file` to create a partial test file. Runs the138 `create_list_for(test_url)` method on the provided139 `list_creator` instance. Calls140 `compare_test_files_to_reference_files(full_file, file_name)`141 to ensure content in the created output files match the142 content in the full reference files.143 '''144 variations = [145 ['csv', 'txt', 'md'],146 [],147 ['csv'],148 ['txt'],149 ['md']150 ]151 for variation in variations:152 is_video_id_only = vars (list_creator)["video_id_only"]153 is_reverse_chronological = vars (list_creator)['reverse_chronological']154 driver_name = getattr(list_creator, 'driver')155 log_test_info(f'TESTING list_creator.video_id_only={is_video_id_only}, list_creator.reverse_chronological={is_reverse_chronological} for {driver_name}driver', log_file)156 log_test_info(f'Full configuration: {repr(list_creator)}', log_file)157 is_id = '_id' if is_video_id_only else ''158 suffix = f'reverse_chronological_video{is_id}s_list' if is_reverse_chronological else f'chronological_video{is_id}s_list'159 use_partial_files(variation, test_file, suffix, log_file) # the file this function creates should be the SAME as the returned string to the file_name variable in the next line160 test_output_file = list_creator.create_list_for(test_url, log_silently=True)[1][1]161 # verify calling the create_list_for() method updates the partial file properly162 failed = compare_test_files_to_reference_files(full_file, test_output_file, log_file)163 if failed == 'Failed!':164 return 'Failed!'165 return 'Passed!'166def use_partial_files(types_of_partial_files, test_file, suffix, log_file):167 '''168 Removes all pre-existing files with the corresponding `suffix`169 for the file extensions in the `types_of_partial_files` tuple170 (`reverse_chronological_video[_id]s_list`171 or172 `chronological_video[_id]s_list`;173 the prefix in all cases is `CoreySchafer_`), then creates a174 partial file for the file extensions in the175 `types_of_partial_files` tuple176 using the `partial_CoreySchafer_{suffix}.{extension}`177 reference file(s).178 '''179 log_test_info(f'TESTING with pre-existing files containing the following extensions: {types_of_partial_files}....\n', log_file)180 delete_all_test_output_files_with_suffix(suffix)181 for extension in types_of_partial_files:182 create_partial_file(test_file, suffix, extension)183def delete_all_test_output_files_with_suffix(suffix):184 '''185 Deletes all files containing the corresponding suffix to ensure186 tests return valid results not skewed by any pre-existing files.187 The `delete_file(filepath, extension)` helper function deletes188 pre-existing files with the specified extension. The `suffix`189 used to create `filepath` includes190 `reverse_chronological_video[_id]s_list` and `chronological_video[_id]s_list`191 and the extensions include `txt`, `csv`, and `md`. The prefix in192 all cases is `CoreySchafer_`193 '''194 test_output_file = f'CoreySchafer_{suffix}'195 delete_file(test_output_file, 'txt')196 delete_file(test_output_file, 'csv')197 delete_file(test_output_file, 'md' )198def delete_file(filepath, extension):199 '''200 Removes the file {filepath}.{extension} if it exists.201 '''202 if os.path.exists(f'{filepath}.{extension}'):203 os.remove(f'{filepath}.{extension}')204def create_partial_file(test_file, suffix, extension):205 '''206 Creates a partial file using `{test_file}.{extension}`207 as the reference file and renames it to208 `CoreySchafer_{suffix}.{extension}`.209 '''210 shutil.copy(f'{test_file}.{extension}', f'CoreySchafer_{suffix}.{extension}')211def compare_test_files_to_reference_files(full_file, test_output_file, log_file):212 '''213 Ensures the resulting test output file `test_output_file`214 contains the exact same content as the reference `full_file` by215 comparing the sha256 hash of both files. If the hashes match,216 the tests continue, but if the hashes don't match, the217 program exits by raising the `ValueError` exception in the218 originating `run_tests_for(browsers_list)` function.219 NOTE: on a multi-threaded program, a `ValueError` exception220 (or any other exception) only terminates the thread on which221 the exception is raised, but any other threads will222 continue to execute until they finish. For this reason, the223 `ValueError` exception is raised in the224 `run_tests_for(browsers_list)` function instead of in225 this function, since this function is called from and run from226 a subthread, whereas `run_tests_for(browsers_list)` function227 is called from and run on the MainThread.228 Since the exception is raised in the MainThread, any other229 concurrently running threads will continue to execute until230 they finish, but no FURTHER work will be done after that.231 '''232 with open(f'{test_output_file}.txt', mode='r', encoding='utf-8') as test_txt, open(f'{test_output_file}.csv', mode='r', encoding='utf-8') as test_csv, open(f'{test_output_file}.md', mode='r', encoding='utf-8') as test_md:233 current_txt = hashlib.sha256(test_txt.read().encode('utf-8')).hexdigest()234 current_csv = hashlib.sha256(test_csv.read().encode('utf-8')).hexdigest()235 current_md = hashlib.sha256(test_md.read().encode ('utf-8')).hexdigest()236 with open(f'{full_file}.txt', mode='r', encoding='utf-8') as full_txt, open(f'{full_file}.csv', mode='r', encoding='utf-8') as full_csv, open(f'{full_file}.md', mode='r', encoding='utf-8') as full_md:237 expected_txt = hashlib.sha256(full_txt.read().encode('utf-8')).hexdigest()238 expected_csv = hashlib.sha256(full_csv.read().encode('utf-8')).hexdigest()239 expected_md = hashlib.sha256(full_md.read().encode ('utf-8')).hexdigest()240 failed = False241 if current_txt != expected_txt: log_test_info(f'❌ ERROR! The updated txt file does NOT match the {full_file}.txt file!', log_file); failed = True242 else: log_test_info(f'✅ The updated txt file matches the {full_file}.txt file :)', log_file)243 if current_csv != expected_csv: log_test_info(f'❌ ERROR! The updated csv file does NOT match the {full_file}.csv file!', log_file); failed = True244 else: log_test_info(f'✅ The updated csv file matches the {full_file}.csv file :)', log_file)245 if current_md != expected_md: log_test_info(f'❌ ERROR! The updated md file does NOT match the {full_file}.md file!\n\n\n', log_file); failed = True246 else: log_test_info(f'✅ The updated md file matches the {full_file}.md file :)\n\n\n', log_file)247 if failed:248 log_test_info(f'❗️❗️ FAILED at {ISOFORMAT(NOW())}! ❗️❗️', log_file)249 return 'Failed!'250 return 'Passed!'251def delete_all_test_output_files():252 suffixes = [253 'chronological_videos_list',254 'chronological_video_ids_list',255 'reverse_chronological_videos_list',256 'reverse_chronological_video_ids_list',257 ]258 for suffix in suffixes:...

Full Screen

Full Screen

run_test_case.py

Source:run_test_case.py Github

copy

Full Screen

1import os2import json3import parse4import requests5import psycopg26import delegator7MODES_WORTH_PULLING = ['pull']8# get test_case_id & pull_mode in event.body9def handler(context, event):10 # load data from given json,11 run_information = json.loads(event.body)12 pull_mode = run_information.get('pull_mode')13 test_case_id = run_information.get('test_case_id')14 # get a connection to the postgresSql database15 conn = connect_to_db()16 # cur is the cursor of current connection17 cur = conn.cursor()18 # pull tester image19 run_command(context, f'docker pull localhost:5000/tester:latest-amd64')20 if _pull_mode_requires_pulling(pull_mode):21 _pull_images(context, cur, test_case_id)22 # gather logs to a variable23 logs = _run_next_test_case(context, test_case_id, cur)24 # update test_cases values25 _update_test_cases_logs(logs, test_case_id, cur)26 # get run_result from logs27 run_result = get_run_result_from_logs(logs)28 context.logger.info_with('Sending data to test_case_complete', test_case=test_case_id, test_case_result=run_result)29 # call test_case_complete30 # call_function('test_case_complete', json.dumps(31 # {32 # 'test_case': test_case_id,33 # 'test_case_result': 'success' if run_result == 0 else 'failure',34 # }))35# gets current cursor and command, alerts if returned 0 values36def get_cursors_one_result(cur, cmd):37 returned_value = cur.fetchone()38 if returned_value is None:39 return Exception(cmd)40 return returned_value[0]41# get run result, ignore empty lines42def get_run_result_from_logs(logs):43 # separate to lines44 run_result = logs.split('/n')45 # filter all empty lines46 run_result = list(filter(lambda x: x, run_result))47 # return last line48 return run_result[-1]49# connect to db, return psycopg2 connection50def connect_to_db():51 # get postgres connection information from container's environment vars52 postgres_info = parse_env_var_info(os.environ.get('PGINFO'))53 # raise NameError if env var not found, or found in wrong format54 if postgres_info is None:55 raise ValueError('Local variable PGINFO in proper format (user:password@host:port or user:password@host)'56 ' could not be found')57 postgres_user, postgres_password, postgres_host, postgres_port = postgres_info58 # connect to postgres database, set autocommit to True to avoid committing59 conn = psycopg2.connect(host=postgres_host, user=postgres_user, password=postgres_password, port=postgres_port)60 conn.autocommit = True61 return conn62# calls given function with given arguments, returns body of response63def call_function(function_name, function_arguments=None):64 functions_ports = {65 'database_init': 36543,66 'github_status_updater': 36544,67 'slack_notifier': 36545,68 'build_and_push_artifacts': 36546,69 'run_test_case': 36547,70 'test_case_complete': 3654871 }72 # if given_host is specified post it instead of73 given_host = os.environ.get('DOCKER_HOST', '172.17.0.1')74 response = requests.post('http://{0}:{1}'.format(given_host, functions_ports[function_name]),75 data=function_arguments)76 return response.text77# gets string to process, return None if format is wrong, list of info if format is well writen -78# return-list : username, password, host, port79def parse_env_var_info(formatted_string):80 if formatted_string is not None:81 # check if default formatted string given82 if parse.parse('{}:{}@{}:{}', formatted_string) is not None:83 return list(parse.parse('{}:{}@{}:{}', formatted_string))84 # if not, try get same format without the port specification85 if parse.parse('{}:{}@{}', formatted_string) is not None:86 return list(parse.parse('{}:{}@{}', formatted_string)) + [5432]87 return None88# get env in map format {"key1":"value1"}89def run_command(context, cmd, cwd=None, env=None):90 context.logger.info_with('Running command', cmd=cmd, cwd=cwd, env=env)91 os_environ_copy = os.environ.copy()92 if env is not None:93 for key in env:94 del os_environ_copy[key]95 env.update(os_environ_copy)96 else:97 env = os_environ_copy98 if cwd is not None:99 cmd = f'cd {cwd} && {cmd}'100 proc = delegator.run(cmd, env=env)101 # if we got here, the process completed102 if proc.return_code != 0:103 raise ValueError(f'Command failed. cmd({cmd}) result({proc.return_code}), log({proc.out})')104 # log result105 context.logger.info_with('Command executed successfully', Command=cmd, Exit_code=proc.return_code, Stdout=proc.out)106 return proc.out107def get_artifact_test_from_test_case(cur, test_case_id):108 cur.execute('select artifact_test from test_cases where oid = %s', (test_case_id,))109 return get_cursors_one_result(cur, f'select artifact_test from test_cases where oid = {test_case_id}')110def _pull_mode_requires_pulling(pull_mode):111 return pull_mode in MODES_WORTH_PULLING112def _run_next_test_case(context, test_case_id, cur):113 artifact_test = get_artifact_test_from_test_case(cur, test_case_id)114 return(run_command(context, f'docker run --rm --volume /var/run/docker.sock:/var/run/docker.sock '115 f'--volume /tmp:/tmp --workdir /go/src/github.com/nuclio/nuclio --env '116 f'NUCLIO_TEST_HOST=172.17.0.1 localhost:5000/tester:latest-amd64'117 f' /bin/bash -c "make test-undockerized '118 f'NUCLIO_TEST_NAME=github.com/nuclio/nuclio/{artifact_test}" && echo $?'))119# pull images of given test case,120def _pull_images(context, cur, test_case_id):121 # get job's artifact-urls122 cur.execute('select job from test_cases where oid = %s', (test_case_id,))123 jobs_id = get_cursors_one_result(cur, f'select job from test_cases where oid = {test_case_id}')124 # get all artifact url's from job's id125 cur.execute('select artifact_urls from jobs where oid = %s', (jobs_id,))126 artifact_urls = json.loads(get_cursors_one_result(cur, f'select artifact_urls from jobs where oid = {jobs_id}'))127 for url in artifact_urls:128 run_command(context, f'docker pull {url}')129def _update_test_cases_logs(logs, test_case_id, cur):...

Full Screen

Full Screen

test_case_time_log.py

Source:test_case_time_log.py Github

copy

Full Screen

...26 root = logging.getLogger('')27 if self.log_hndl:28 # Remove it if we already have one29 root.removeHandler(self.log_hndl)30 def test_case_complete(self, result):31 self.log_file.write(json.dumps(result))32 self.log_file.write("\n")33 self._reset_logging()34 def report(self):35 self.log_file.write("RUN COMPLETE\n")36 self.log_file.close()37 return True38# Hooks for plugin system39def add_command_line_options(parser):40 parser.add_option(41 "--test-case-results",42 action="store",43 dest="test_case_json_results",44 type="string",...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Testify automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful