Best Python code snippet using Testify_python
test_shared.py
Source:test_shared.py  
1'''2Test module for cross-module logic for running3integration tests for the `yt_videos_list`4package - regardless of the driver5being tested or host operating system.6'''7import os8import sys9import time10import shutil11import hashlib12import datetime13import threading14from determine import determine_path_slash15from save_thread_result import ThreadWithResult16from yt_videos_list import ListCreator17NOW       = datetime.datetime.now18ISOFORMAT = datetime.datetime.isoformat19def run_tests_for(browsers_list):20    '''21    Runs 2 threads simultaneously to test both22    `reverse_chronological` AND `chronological`23    test cases for every driver in the provided24    `browsers_list`.25    '''26    file_content_mismatch = 'At least one output file produced by this test does not match the expectation!'27    test_cases = create_test_cases(browsers_list)28    total      = len(test_cases)29    current    = 030    log_files  = [31        'CoreySchafer_reverse_chronological_videos_list.log',32        'CoreySchafer_chronological_videos_list.log',33        'CoreySchafer_reverse_chronological_video_ids_list.log',34        'CoreySchafer_chronological_video_ids_list.log'35    ]36    log_test_info('*' * 140,        *log_files)37    log_test_info('Running tests!', *log_files)38    while current < total:39        # each test_case is a ListCreator instance for40        # video_id_only set to True or False with41        # reverse_chronological set to True or False42        # for EACH driver in the browsers_list,43        # and within EACH test case,44        # there are 5 variations to test - see45        # the `variations` `list`46        # in verify_update() for more details47        if threading.active_count() - 1 == 0:48            # the main thread counts as a thread, so we49            # need to subtract 1 to determine the50            # number of threads we've created51            # manually52            thread_1_case = test_cases[current]53            is_id         = '_id' if getattr(thread_1_case, 'video_id_only') is True else ''54            if getattr(thread_1_case, 'reverse_chronological') is True: log_1_name = f'CoreySchafer_reverse_chronological_video{is_id}s_list.log'55            else:                                                       log_1_name = f'CoreySchafer_chronological_video{is_id}s_list.log'56            test_case_thread_1 = ThreadWithResult(target=run_test_case, args=(thread_1_case, log_1_name))57            log_test_info('Starting thread for test case 1...', log_1_name)58            test_case_thread_1.start()59            log_test_info('Started thread for test case 1!', log_1_name)60            current += 161            # safaridriver does not allow multi-threading:62            # Could not create a session: The Safari instance is already paired with another WebDriver session.63            if 'safari' not in browsers_list:64                if current == 1:65                    # wait 5 seconds to allow all just the firefox selenium webdriver dependency to download (necessary after test_cross_platforms module runs remove_dependencies())66                    time.sleep(5)67                thread_2_case = test_cases[current]68                is_id         = '_id' if getattr(thread_2_case, 'video_id_only') is True else ''69                if getattr(thread_2_case, 'reverse_chronological') is True: log_2_name = f'CoreySchafer_reverse_chronological_video{is_id}s_list.log'70                else:                                                       log_2_name = f'CoreySchafer_chronological_video{is_id}s_list.log'71                test_case_thread_2 = ThreadWithResult(target=run_test_case, args=(thread_2_case, log_2_name))72                log_test_info('Starting thread for test case 2...', log_2_name)73                test_case_thread_2.start()74                log_test_info('Started thread for test case 2!', log_2_name)75                current += 176        while threading.active_count() - 1 != 0 and current < total:77            # the threads are still running78            time.sleep(7)79        if 'test_case_thread_1' in locals(): test_case_thread_1.join()80        if 'test_case_thread_2' in locals(): test_case_thread_2.join()81        if 'thread_1_case' in locals(): log_test_info(f'Finished testing {[thread_1_case]}!', log_1_name)82        if 'thread_2_case' in locals(): log_test_info(f'Finished testing {[thread_2_case]}!', log_2_name)83        if 'test_case_thread_1' in locals() and (getattr(test_case_thread_1, 'result', None) in [None, 'Failed!']): raise ValueError(file_content_mismatch)84        if 'test_case_thread_2' in locals() and (getattr(test_case_thread_2, 'result', None) in [None, 'Failed!']): raise ValueError(file_content_mismatch)85        test_case_complete = 'Moving on to the next driver...\n' + '⬠'*11 + '\n\n\n'86        if   'thread_1_case' in locals() and 'thread_2_case' in locals(): log_test_info(test_case_complete, log_1_name, log_2_name)87        elif 'thread_1_case' in locals():                                 log_test_info(test_case_complete, log_1_name)88        elif 'thread_2_case' in locals():                                 log_test_info(test_case_complete, log_2_name)89def log_test_info(message, *args):90    thread_name  = f'[{threading.current_thread().name}]'91    current_time = datetime.datetime.now().isoformat()92    utc_offset   = time.strftime('%z')93    message      = f'{current_time}{utc_offset} {thread_name:>12} {message}\n'94    sys.stdout.writelines(message)95    for log_file in args:96        with open(log_file, mode='a', encoding='utf-8') as output_location:97            output_location.writelines(message)98def create_test_cases(browsers):99    '''100    Creates an instance of `ListCreator` for a101    video_id_only=True and video_id_only=False for a102    reverse chronological AND chronological test case103    for each driver in the provided `browsers` list.104    '''105    return [106        ListCreator(driver=browser, reverse_chronological=is_reverse_chronological, video_id_only=is_video_id_only)107        for browser in browsers108        for is_video_id_only         in [True, False]109        for is_reverse_chronological in [True, False]110    ]111def run_test_case(list_creator, log_file):112    '''113    Calls `verify_update()`, which114    runs all variations (no pre-existing files,115    pre-existing txt, pre-existing csv, pre-existing md, and116    pre-existing txt + csv + md files) of the provided test case117    (a specific driver for the `video_id_only` attribute set to `True` or `False`118    with the `reverse_chronological` attribute119    set to either `True` or `False`) using the120    `list_creator.create_list_for()` method and verifies resulting121    output files match the content in the corresponding full122    reference files.123    '''124    path_slash               = determine_path_slash()125    test_url                 = 'youtube.com/user/schafer5'126    is_reverse_chronological = getattr(list_creator, 'reverse_chronological')127    is_video_id_only         = getattr(list_creator, 'video_id_only')128    is_id                    = '_id'                                       if is_video_id_only         else  ''129    suffix                   = f'reverse_chronological_video{is_id}s_list' if is_reverse_chronological else f'chronological_video{is_id}s_list'130    partialfile_path         = f'tests{path_slash}reference_files{path_slash}partial_CoreySchafer_{suffix}'131    fullfile_path            = f'tests{path_slash}reference_files{path_slash}full_CoreySchafer_{suffix}'132    return verify_update(list_creator, test_url, partialfile_path, fullfile_path, log_file)133def verify_update(list_creator, test_url, test_file, full_file, log_file):134    '''135    Uses the `reverse_chronological` and `video_id_only` attributes of the `list_creator`136    argument to determine the suffix, then uses the reference137    `test_file` to create a partial test file. Runs the138    `create_list_for(test_url)` method on the provided139    `list_creator` instance. Calls140    `compare_test_files_to_reference_files(full_file, file_name)`141    to ensure content in the created output files match the142    content in the full reference files.143    '''144    variations = [145        ['csv', 'txt', 'md'],146        [],147        ['csv'],148        ['txt'],149        ['md']150    ]151    for variation in variations:152        is_video_id_only         = vars   (list_creator)["video_id_only"]153        is_reverse_chronological = vars   (list_creator)['reverse_chronological']154        driver_name              = getattr(list_creator, 'driver')155        log_test_info(f'TESTING list_creator.video_id_only={is_video_id_only}, list_creator.reverse_chronological={is_reverse_chronological} for {driver_name}driver', log_file)156        log_test_info(f'Full configuration: {repr(list_creator)}', log_file)157        is_id  = '_id'                                       if is_video_id_only         else  ''158        suffix = f'reverse_chronological_video{is_id}s_list' if is_reverse_chronological else f'chronological_video{is_id}s_list'159        use_partial_files(variation, test_file, suffix, log_file) # the file this function creates should be the SAME as the returned string to the file_name variable in the next line160        test_output_file = list_creator.create_list_for(test_url, log_silently=True)[1][1]161        # verify calling the create_list_for() method updates the partial file properly162        failed = compare_test_files_to_reference_files(full_file, test_output_file, log_file)163        if failed == 'Failed!':164            return 'Failed!'165    return 'Passed!'166def use_partial_files(types_of_partial_files, test_file, suffix, log_file):167    '''168    Removes all pre-existing files with the corresponding `suffix`169    for the file extensions in the `types_of_partial_files` tuple170    (`reverse_chronological_video[_id]s_list`171    or172    `chronological_video[_id]s_list`;173    the prefix in all cases is `CoreySchafer_`), then creates a174    partial file for the file extensions in the175    `types_of_partial_files` tuple176    using the `partial_CoreySchafer_{suffix}.{extension}`177    reference file(s).178    '''179    log_test_info(f'TESTING with pre-existing files containing the following extensions: {types_of_partial_files}....\n', log_file)180    delete_all_test_output_files_with_suffix(suffix)181    for extension in types_of_partial_files:182        create_partial_file(test_file, suffix, extension)183def delete_all_test_output_files_with_suffix(suffix):184    '''185    Deletes all files containing the corresponding suffix to ensure186    tests return valid results not skewed by any pre-existing files.187    The `delete_file(filepath, extension)` helper function deletes188    pre-existing files with the specified extension. The `suffix`189    used to create `filepath` includes190    `reverse_chronological_video[_id]s_list` and `chronological_video[_id]s_list`191    and the extensions include `txt`, `csv`, and `md`. The prefix in192    all cases is `CoreySchafer_`193    '''194    test_output_file = f'CoreySchafer_{suffix}'195    delete_file(test_output_file, 'txt')196    delete_file(test_output_file, 'csv')197    delete_file(test_output_file, 'md' )198def delete_file(filepath, extension):199    '''200    Removes the file {filepath}.{extension} if it exists.201    '''202    if os.path.exists(f'{filepath}.{extension}'):203        os.remove(f'{filepath}.{extension}')204def create_partial_file(test_file, suffix, extension):205    '''206    Creates a partial file using `{test_file}.{extension}`207    as the reference file and renames it to208    `CoreySchafer_{suffix}.{extension}`.209    '''210    shutil.copy(f'{test_file}.{extension}', f'CoreySchafer_{suffix}.{extension}')211def compare_test_files_to_reference_files(full_file, test_output_file, log_file):212    '''213    Ensures the resulting test output file `test_output_file`214    contains the exact same content as the reference `full_file` by215    comparing the sha256 hash of both files. If the hashes match,216    the tests continue, but if the hashes don't match, the217    program exits by raising the `ValueError` exception in the218    originating `run_tests_for(browsers_list)` function.219    NOTE: on a multi-threaded program, a `ValueError` exception220    (or any other exception) only terminates the thread on which221    the exception is raised, but any other threads will222    continue to execute until they finish. For this reason, the223    `ValueError` exception is raised in the224    `run_tests_for(browsers_list)` function instead of in225    this function, since this function is called from and run from226    a subthread, whereas `run_tests_for(browsers_list)` function227    is called from and run on the MainThread.228    Since the exception is raised in the MainThread, any other229    concurrently running threads will continue to execute until230    they finish, but no FURTHER work will be done after that.231    '''232    with open(f'{test_output_file}.txt', mode='r', encoding='utf-8') as test_txt, open(f'{test_output_file}.csv', mode='r', encoding='utf-8') as test_csv, open(f'{test_output_file}.md', mode='r', encoding='utf-8') as test_md:233        current_txt = hashlib.sha256(test_txt.read().encode('utf-8')).hexdigest()234        current_csv = hashlib.sha256(test_csv.read().encode('utf-8')).hexdigest()235        current_md  = hashlib.sha256(test_md.read().encode ('utf-8')).hexdigest()236    with open(f'{full_file}.txt', mode='r', encoding='utf-8') as full_txt, open(f'{full_file}.csv', mode='r', encoding='utf-8') as full_csv, open(f'{full_file}.md', mode='r', encoding='utf-8') as full_md:237        expected_txt = hashlib.sha256(full_txt.read().encode('utf-8')).hexdigest()238        expected_csv = hashlib.sha256(full_csv.read().encode('utf-8')).hexdigest()239        expected_md  = hashlib.sha256(full_md.read().encode ('utf-8')).hexdigest()240    failed = False241    if current_txt != expected_txt: log_test_info(f'â ERROR! The updated txt file does NOT match the {full_file}.txt file!',       log_file); failed = True242    else:                           log_test_info(f'â
 The updated txt file matches the {full_file}.txt file :)',                   log_file)243    if current_csv != expected_csv: log_test_info(f'â ERROR! The updated csv file does NOT match the {full_file}.csv file!',       log_file); failed = True244    else:                           log_test_info(f'â
 The updated csv file matches the {full_file}.csv file :)',                   log_file)245    if current_md  != expected_md:  log_test_info(f'â ERROR! The updated md  file does NOT match the {full_file}.md  file!\n\n\n', log_file); failed = True246    else:                           log_test_info(f'â
 The updated md  file matches the {full_file}.md  file :)\n\n\n',             log_file)247    if failed:248        log_test_info(f'âï¸âï¸ FAILED at {ISOFORMAT(NOW())}! âï¸âï¸', log_file)249        return 'Failed!'250    return 'Passed!'251def delete_all_test_output_files():252    suffixes = [253        'chronological_videos_list',254        'chronological_video_ids_list',255        'reverse_chronological_videos_list',256        'reverse_chronological_video_ids_list',257    ]258    for suffix in suffixes:...run_test_case.py
Source:run_test_case.py  
1import os2import json3import parse4import requests5import psycopg26import delegator7MODES_WORTH_PULLING = ['pull']8# get test_case_id & pull_mode in event.body9def handler(context, event):10    # load data from given json,11    run_information = json.loads(event.body)12    pull_mode = run_information.get('pull_mode')13    test_case_id = run_information.get('test_case_id')14    # get a connection to the postgresSql database15    conn = connect_to_db()16    # cur is the cursor of current connection17    cur = conn.cursor()18    # pull tester image19    run_command(context, f'docker pull localhost:5000/tester:latest-amd64')20    if _pull_mode_requires_pulling(pull_mode):21        _pull_images(context, cur, test_case_id)22    # gather logs to a variable23    logs = _run_next_test_case(context, test_case_id, cur)24    # update test_cases values25    _update_test_cases_logs(logs, test_case_id, cur)26    # get run_result from logs27    run_result = get_run_result_from_logs(logs)28    context.logger.info_with('Sending data to test_case_complete', test_case=test_case_id, test_case_result=run_result)29    # call test_case_complete30    # call_function('test_case_complete', json.dumps(31    # {32    #     'test_case': test_case_id,33    #     'test_case_result': 'success' if run_result == 0 else 'failure',34    # }))35# gets current cursor and command, alerts if returned 0 values36def get_cursors_one_result(cur, cmd):37    returned_value = cur.fetchone()38    if returned_value is None:39        return Exception(cmd)40    return returned_value[0]41# get run result, ignore empty lines42def get_run_result_from_logs(logs):43    # separate to lines44    run_result = logs.split('/n')45    # filter all empty lines46    run_result = list(filter(lambda x: x, run_result))47    # return last line48    return run_result[-1]49# connect to db, return psycopg2 connection50def connect_to_db():51    # get postgres connection information from container's environment vars52    postgres_info = parse_env_var_info(os.environ.get('PGINFO'))53    # raise NameError if env var not found, or found in wrong format54    if postgres_info is None:55        raise ValueError('Local variable PGINFO in proper format (user:password@host:port or user:password@host)'56                         ' could not be found')57    postgres_user, postgres_password, postgres_host, postgres_port = postgres_info58    # connect to postgres database, set autocommit to True to avoid committing59    conn = psycopg2.connect(host=postgres_host, user=postgres_user, password=postgres_password, port=postgres_port)60    conn.autocommit = True61    return conn62# calls given function with given arguments, returns body of response63def call_function(function_name, function_arguments=None):64    functions_ports = {65        'database_init': 36543,66        'github_status_updater': 36544,67        'slack_notifier': 36545,68        'build_and_push_artifacts': 36546,69        'run_test_case': 36547,70        'test_case_complete': 3654871    }72    # if given_host is specified post it instead of73    given_host = os.environ.get('DOCKER_HOST', '172.17.0.1')74    response = requests.post('http://{0}:{1}'.format(given_host, functions_ports[function_name]),75                             data=function_arguments)76    return response.text77# gets string to process, return None if format is wrong, list of info if format is well writen -78# return-list : username, password, host, port79def parse_env_var_info(formatted_string):80    if formatted_string is not None:81        # check if default formatted string given82        if parse.parse('{}:{}@{}:{}', formatted_string) is not None:83            return list(parse.parse('{}:{}@{}:{}', formatted_string))84        # if not, try get same format without the port specification85        if parse.parse('{}:{}@{}', formatted_string) is not None:86            return list(parse.parse('{}:{}@{}', formatted_string)) + [5432]87    return None88# get env in map format {"key1":"value1"}89def run_command(context, cmd, cwd=None, env=None):90    context.logger.info_with('Running command', cmd=cmd, cwd=cwd, env=env)91    os_environ_copy = os.environ.copy()92    if env is not None:93        for key in env:94            del os_environ_copy[key]95        env.update(os_environ_copy)96    else:97        env = os_environ_copy98    if cwd is not None:99        cmd = f'cd {cwd} && {cmd}'100    proc = delegator.run(cmd, env=env)101    # if we got here, the process completed102    if proc.return_code != 0:103        raise ValueError(f'Command failed. cmd({cmd}) result({proc.return_code}), log({proc.out})')104    # log result105    context.logger.info_with('Command executed successfully', Command=cmd, Exit_code=proc.return_code, Stdout=proc.out)106    return proc.out107def get_artifact_test_from_test_case(cur, test_case_id):108    cur.execute('select artifact_test from test_cases where oid = %s', (test_case_id,))109    return get_cursors_one_result(cur, f'select artifact_test from test_cases where oid = {test_case_id}')110def _pull_mode_requires_pulling(pull_mode):111    return pull_mode in MODES_WORTH_PULLING112def _run_next_test_case(context, test_case_id, cur):113    artifact_test = get_artifact_test_from_test_case(cur, test_case_id)114    return(run_command(context, f'docker run --rm --volume /var/run/docker.sock:/var/run/docker.sock '115                                f'--volume /tmp:/tmp --workdir /go/src/github.com/nuclio/nuclio --env '116                                f'NUCLIO_TEST_HOST=172.17.0.1 localhost:5000/tester:latest-amd64'117                                f' /bin/bash -c "make test-undockerized '118                                f'NUCLIO_TEST_NAME=github.com/nuclio/nuclio/{artifact_test}" && echo $?'))119# pull images of given test case,120def _pull_images(context, cur, test_case_id):121    # get job's artifact-urls122    cur.execute('select job from test_cases where oid = %s', (test_case_id,))123    jobs_id = get_cursors_one_result(cur, f'select job from test_cases where oid = {test_case_id}')124    # get all artifact url's from job's id125    cur.execute('select artifact_urls from jobs where oid = %s', (jobs_id,))126    artifact_urls = json.loads(get_cursors_one_result(cur, f'select artifact_urls from jobs where oid = {jobs_id}'))127    for url in artifact_urls:128        run_command(context, f'docker pull {url}')129def _update_test_cases_logs(logs, test_case_id, cur):...test_case_time_log.py
Source:test_case_time_log.py  
...26        root = logging.getLogger('')27        if self.log_hndl:28            # Remove it if we already have one29            root.removeHandler(self.log_hndl)30    def test_case_complete(self, result):31        self.log_file.write(json.dumps(result))32        self.log_file.write("\n")33        self._reset_logging()34    def report(self):35        self.log_file.write("RUN COMPLETE\n")36        self.log_file.close()37        return True38# Hooks for plugin system39def add_command_line_options(parser):40    parser.add_option(41        "--test-case-results",42        action="store",43        dest="test_case_json_results",44        type="string",...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
