Best Python code snippet using autotest_python
ts_2019_ncov_dataprep_us.py
Source:ts_2019_ncov_dataprep_us.py  
...11loader = Loader(client=client)12extractor = Extractor(client=client)13geo='us'14#### Functions ####15def print_job_result(job, client, max_results=20):16    job.result()17    bq_table = client.get_table(job.destination)18    df = client.list_rows(bq_table, max_results=max_results).to_dataframe()19    print(bq_table.full_table_id)20    print(df.head(max_results))21#### Load Data ####22confirmed = pd.read_csv('https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_US.csv')23deaths = pd.read_csv('https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_US.csv')24dt_cols = [datetime.datetime.strptime(dt, '%m/%d/%y').strftime('dt_%Y%m%d') for dt in list(confirmed.columns)[11:]]25# confirmed26confirmed_cols = ['UID', 'iso2', 'iso3', 'code3', 'FIPS', 'county', 'province_state', 'country_region', 'latitude', 'longitude', 'combined_key'] + dt_cols27confirmed.columns = confirmed_cols28confirmed_schema = [29    ('UID', 'INT64'),30    ('iso2', 'STRING'),31    ('iso3', 'STRING'),32    ('code3', 'INT64'),33    ('FIPS', 'INT64'),34    ('county', 'STRING'),35    ('province_state', 'STRING'),36    ('country_region', 'STRING'),37    ('latitude', 'FLOAT64'),38    ('longitude', 'FLOAT64'),39    ('combined_key', 'STRING'),40] + [(dt_col, 'INT64') for dt_col in dt_cols]41loader.load_df(confirmed, 'stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_confirmed_raw'.format(geo=geo), schema=confirmed_schema)42# deaths43deaths_cols = confirmed_cols44deaths_cols.insert(11, 'population')45deaths.columns = deaths_cols46deaths_schema = confirmed_schema47deaths_schema.insert(11, ('population', 'INT64'))48loader.load_df(deaths, 'stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_deaths_raw'.format(geo=geo), schema=deaths_schema)49for job in loader.job_history:50    print_job_result(job, client)51#### Dataprep ####52# restructure raw data53for metric in ['confirmed', 'deaths']:54    array_query = ''55    for dt_col in dt_cols:56        array_query = \57        """{array_query}58        STRUCT(PARSE_DATE('dt_%Y%m%d', '{dt_col}') AS dt, {dt_col} AS total_{metric}),59        """.format(array_query=array_query, dt_col=dt_col, metric=metric)60    array_query = re.sub('[,\n ]*$', '\n', array_query)61    62    query = \63    """64    SELECT dt, a.* EXCEPT(arr), total_{metric}65    FROM (66        SELECT67            county, province_state, country_region, combined_key, latitude, longitude,68            [{array_query}] AS arr69        FROM `stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_{metric}_raw`70    ) a, UNNEST(arr)71    """.format(metric=metric, array_query=array_query, geo=geo)72    73    query_job = qr.run_query(query, destination_table='stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_{metric}'.format(geo=geo, metric=metric), time_partitioning=True, partition_field='dt')74for job in qr.job_history:75    print_job_result(job, client)76# agg data77ts_2019_ncov_query = \78"""79SELECT80    dt, county, province_state, country_region, combined_key, latitude, longitude,81    population,82    total_confirmed, total_deaths, daily_new_confirmed, daily_new_deaths,83    ROUND(AVG(daily_new_confirmed) OVER(PARTITION BY combined_key ORDER BY dt ROWS BETWEEN 6 PRECEDING AND CURRENT ROW), 1) AS daily_new_confirmed_7d_ma,84    ROUND(AVG(daily_new_deaths) OVER(PARTITION BY combined_key ORDER BY dt ROWS BETWEEN 6 PRECEDING AND CURRENT ROW), 1) AS daily_new_deaths_7d_ma,85    ROUND(AVG(daily_new_confirmed) OVER(PARTITION BY combined_key ORDER BY dt ROWS BETWEEN 27 PRECEDING AND CURRENT ROW), 1) AS daily_new_confirmed_28d_ma,86    ROUND(AVG(daily_new_deaths) OVER(PARTITION BY combined_key ORDER BY dt ROWS BETWEEN 27 PRECEDING AND CURRENT ROW), 1) AS daily_new_deaths_28d_ma,87    IF(population = 0, NULL, ROUND(total_confirmed/population, 4)) AS incident_rate,88    IF(total_confirmed = 0, NULL, ROUND(total_deaths/total_confirmed, 4)) AS case_fatality_rate,89    MAX(dt) OVER() AS last_updated_dt,90    TIMESTAMP(REGEXP_REPLACE(STRING(CURRENT_TIMESTAMP, "America/Los_Angeles"), r'[\+-][0-9]{{2}}$', '')) AS last_updated_ts -- need the double bracket to avoid error with str.format91FROM (92    SELECT93        * EXCEPT(daily_new_confirmed, daily_new_deaths),94        IFNULL(daily_new_confirmed, total_confirmed) AS daily_new_confirmed,95        IFNULL(daily_new_deaths, total_deaths) AS daily_new_deaths96    FROM (97        SELECT98            *,99            total_confirmed - LAG(total_confirmed) OVER(PARTITION BY combined_key ORDER BY dt) AS daily_new_confirmed,100            total_deaths - LAG(total_deaths) OVER(PARTITION BY combined_key ORDER BY dt) AS daily_new_deaths101        FROM (102            SELECT103                a.*, b.population104            FROM (105                SELECT106                    COALESCE(a.dt, b.dt) AS dt,107                    COALESCE(a.county, b.county) AS county,108                    COALESCE(a.province_state, b.province_state) AS province_state,109                    COALESCE(a.country_region, b.country_region) AS country_region,110                    COALESCE(a.combined_key, b.combined_key) AS combined_key,111                    COALESCE(a.latitude, b.latitude) AS latitude,112                    COALESCE(a.longitude, b.longitude) AS longitude,113                    IFNULL(a.total_confirmed, 0) AS total_confirmed,114                    IFNULL(b.total_deaths, 0) AS total_deaths115                FROM `stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_confirmed` a116                FULL JOIN `stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_deaths` b117                ON a.dt = b.dt AND a.combined_key = b.combined_key118            ) a119            LEFT JOIN `stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_deaths_raw` b120            ON a.combined_key = b.combined_key121        )122    )123)124""".format(geo=geo)125query_job = qr.run_query(ts_2019_ncov_query, destination_table='stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}'.format(geo=geo), time_partitioning=True, partition_field='dt')126print_job_result(query_job, client)127# US current table128us_cur_query = \129"""130SELECT *131FROM `stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}`132WHERE dt = last_updated_dt133""".format(geo=geo)134query_job = qr.run_query(us_cur_query, destination_table='stanleysfang.surveillance_2019_ncov.ts_2019_ncov_{geo}_cur'.format(geo=geo))135print_job_result(query_job, client)136#### Extract Table ####137extract_job = extractor.extract(query_job.destination, 'gs://surveillance_2019_ncov/ts_2019_ncov_{geo}_cur.csv'.format(geo=geo))...master.py
Source:master.py  
...46    print(f'\nWorker {worker}: {data}')47    client.sendall(bytes("EXECUTE_JOB", "utf-8"))48    client.sendall(bytes(job, "utf-8"))49    result = client.recv(1024).decode('utf-8')50    print_job_result(result, job)51    job_execution.pop(job)52    workers[worker] = 'ACTIVE'53def check_job_execution():54    print('---------------RESULT---------------')55    if not job_execution:56        print()57        print('There is no job can be execute')58    else:59        print('\n')60        for key in job_execution:61            print('Job '+key+' got executed in Worker Node : '+job_execution[key])62    print('------------------------------------')63def check_worker_availability():64    for worker, status in workers.items():65        try:66            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)67            client.connect((worker, PORT))68            # Print Worker 'IP': Connection accepted69            data = client.recv(1024).decode('utf-8')70            print(f'\nWorker {worker}: {data}')71            # Print Worker 'IP': 'Status'72            client.send(bytes('CHECK_AVAILABILITY','utf-8'))73            data = client.recv(1024).decode('utf-8')74            workers[worker] = data75            print(f'Worker {worker}: {data}')76        except(OSError):77            print(f"Worker {worker}: DEAD")78            workers[worker] = 'DEAD'79            continue80def check_job():81    for worker, status_job in workers.items():82        try:83            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)84            client.connect((worker, PORT))85            # Print Worker 'IP': Connection accepted86            data = client.recv(1024).decode('utf-8')87            print(f'\nWorker {worker}: {data}')88            # Print Worker 'IP': 'Status'89            client.send(bytes('CHECK_STATUS_JOB','utf-8'))90            data = client.recv(1024).decode('utf-8')91            workers[worker] = data92            print(f'Worker {worker}, Status Job= : {data}')93        except(OSError):94            print(f"Worker {worker}: DEAD")95            workers[worker] = 'DEAD'96            continue97def print_list_of_actions():98    states = "Action"99    action = input('\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}'.format(100        '-------------------- WAITING FOR NEW ACTIONS --------------------',101        '*** List of actions: ***',102        "[1] Execute jobs on server",103        "[2] Check worker node availability",104        "[3] Check worker status job",105        "[4] Check execution place",106        "[0] Exit",107        '************************',108        "Input Action(just number): "))109    return int(action)110def print_list_of_jobs():111    states = "Job"112    # print('\n-------------------- Waiting for new jobs --------------------')113    job = input('\n{}\n{}\n{}\n{}\n{}\n{}'.format(114        "*** List of jobs: ***",115        "[1] List sorting. \nHOW TO USE: sort [list]",116        "[2] Book search. \nHOW TO USE: book_search \"Harry_potter\". \nNote: book title in quotes and separated with underscores (_)",117        "[3] Generate n-th fibonnaci number. \nHOW TO USE: fibonacci n",118        '*********************',119        "Input Job(pay attention to 'HOW TO USE'): "120    ))121    return job122def print_job_result(result, job):123    print('\n\n{}\n{}\n{}\n{}\n{}\n\n'.format(124        '-------------------------------------------- RESULT --------------------------------------------',125        f'RESULT FROM PREVIOUS "{job}" JOB',126        f'{result}',127        '***  To continue using the program, follow the instructions before this result section  ***',128        '------------------------------------------ END RESULT ------------------------------------------'))129    print(states)130if __name__ == '__main__':...example_job.py
Source:example_job.py  
...38        yield {'index': message.get('index', 0) * 10 + i,39               'job_arg1': job_arg1,40               'job_arg2': job_arg2}41@pipe_processor42def print_job_result(message, logger, job_arg1=None, job_arg2=None):43    logger.info('Job with parameters (%s, %s) result: %s',44                job_arg1, job_arg2, message.index)45job1 = Job('job1', job_arg1=context, job_arg2=context)46job2 = Job('job2', job_arg1=context)47job1_pipeline = job1(Message.log('Job 1 start message') >>48                     multiplicate >> multiplicate >> multiplicate >> multiplicate49                     ) >> print_job_result50job2_pipeline = job2(Message.log('Job 2 start message') >>51                     multiplicate >> multiplicate >> multiplicate >> multiplicate52                     ) >> print_job_result53program = Program(name='celery_example',54                  pipelines={55                      'start': Event.on_start >> Message.log('Program started'),56                      'job1': job1_pipeline,...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
