Best Python code snippet using localstack_python
r53_record_cli.py
Source:r53_record_cli.py  
...165                    sys.exit(1)166                else:167                    print(e)168                    sys.exit(1)169    def get_updated_records(self):170        updated_records = []171        for record in self.original_records:172            if record.updated_data:173                updated_records.append(record)174        return updated_records175    def dump_changeset(self, filename):176        updated_records = self.get_updated_records()177        updated_record_list = [x.get_updated_record() for x in updated_records]178        with open(f"changesets/{filename}.json", "w") as f:179            f.write(json.dumps(updated_record_list, indent=4))180        orig_record_list = [x.get_original_record() for x in updated_records]181        with open(f"changesets/{filename}_orig.json", "w") as f:182            f.write(json.dumps(orig_record_list, indent=4))183    def filter_records(self, field=None, filter_string=None):184        if field:185            self.filtered_records = []186            if field == 'All':187                self.filtered_records = self.original_records188            else:189                with display.status("Filtering {0} Records".format(field), spinner="dots"):190                    for record in self.original_records:191                        if record.__dict__.get(field, None):192                            self.filtered_records.append(record)193        if filter_string:194            if filter_string.startswith(":"):195                index_filter = int(filter_string.split(":")[1]) # select the index based on the special ':<int>' format196                try:197                    self.filtered_records = [self.filtered_records[index_filter]]198                except IndexError:199                    pass200            else:201                filtered_list = []202                for record in self.filtered_records:203                    if filter_string in record.Name:204                        filtered_list.append(record)205                if not filtered_list: # If the resultant list is empty, dont return it. Instead return the last populated list206                    pass207                else:208                    self.filtered_records = filtered_list209    def write_records(self):210            changes = []211            originals = []212            to_update = []213            updated_records = self.get_updated_records()214            for record in updated_records:215                change_record = {"Action":"UPSERT", "ResourceRecordSet": record.get_updated_record()}216                to_update.append(change_record)217                originals.append(record.get_original_record())218                changes.append(record.get_updated_record())219            try:220                resp = self.client.change_resource_record_sets(HostedZoneId=HOSTED_ZONE_ID, ChangeBatch = {'Comment':'MTA Update','Changes': to_update})221                status = resp['ResponseMetadata']['HTTPStatusCode']222                if status == 200:223                    logger.info("update: {0}\norig: {1}".format(changes, originals))224                    for record in updated_records:225                        record.reset()226                    return "\n[green] Records Updated!"227                    recordset.refresh_records()228                    input("Press 'Enter' to continue...")229                else:230                    return "\n[red] Error updating records! \n {0}".format(resp)231                    input("Press 'Enter' to continue...")232                    233            except Exception as e:234                return "\n[red]Error updating records: \n {0}".format(e)235                input("Press 'Enter' to continue...")236def load_changeset_from_file(recordset):237    changeset_file_list = os.listdir('changesets')238    table = Table(title="Changeset Files")239    table.add_column("Index")240    table.add_column("Filename")241    count = 0242    for filename in changeset_file_list:243        table.add_row(str(count), filename)244        count += 1245    display.update_screen(table) 246    file_choice = Prompt.ask("Choose a file. 'q' to quit to main menu")247    while True:248        if file_choice != 'q':249            try:250                if int(file_choice) not in range(0, len(changeset_file_list)):251                    display.end_screen()252                    display.update_screen(table)253                    rprint("[yellow]Choose a valid file index")254                    file_choice = Prompt.ask("Choose a file. 'q' to quit to main menu")255                else:256                    selected_file = changeset_file_list[int(file_choice)]257                    display.end_screen()258                    load_records(recordset, selected_file)259                    break260            except ValueError:261                    display.end_screen()262                    display.update_screen(table)263                    rprint("[yellow]Choose a valid file index")264                    file_choice = Prompt.ask("Choose a file. 'q' to quit to main menu")265        else:266            display.end_screen()267            break268    display.end_screen()269def match_original_record(recordset, change_record):270    for record in recordset.original_records:271        if change_record["Name"] == record.Name and change_record["ResourceRecords"] == record.ResourceRecords and change_record["Type"] == record.Type:272            record.updated_data = change_record273def load_records(recordset, filename):274    with open("changesets/{0}".format(filename), 'r') as f:275        data = json.loads(f.read())276    for change_record in data:277        match_original_record(recordset, change_record)278def edit_weight_records_by_filter(recordset):279    original_filtered_set = recordset.filtered_records # this is our fallback280    weighted_record_table = display.create_table(recordset.filtered_records, "weighted")281    display.update_screen(weighted_record_table)282    while True:283        search_filter = Prompt.ask("Filter ('..' to reset filter, 'Enter' to use current selection, ':<index>' for record #, ':q' to exit)")284        if search_filter == "..": # if user enters '..' return full list of weighted records again 285            recordset.filtered_records = original_filtered_set286            weighted_record_table = display.create_table(recordset.filtered_records, "weighted")287            display.update_screen(weighted_record_table)288        elif search_filter == ":q":289            display.end_screen()290            break291        elif search_filter == "": # The user has filtered choice to records shown on screen and presses enter with no other chars292            selection_table = display.create_table(recordset.filtered_records, "weighted", bgcolors=[136, 132])293            valid_weight = False # wait for valid weight value for records to be entered 294            error = 0295            while not valid_weight:296                display.update_screen(selection_table)297                if error:298                    rprint("[yellow]Enter a valid record weight between 0-255")299                weight_setting = IntPrompt.ask("Enter weight to set selected records to (0-255)")300                if weight_setting in range(0,256):301                    valid_weight = True302                else:303                    error = 1304            for record in recordset.filtered_records:305                record.update("Weight", weight_setting)306            update_table = display.create_table(recordset.filtered_records, 'weighted', updated_records=True, bgcolors=[66, 62]) # Create update table with new weights shown307            render_group = RenderGroup(selection_table, update_table) # group original selection table and new updated table so they can be displayed at the same time on screen308            display.update_screen(render_group)309            # Confirm that changes should be staged310            if Confirm.ask("Stage changes"):311                display.end_screen()312                break313            else:314                for record in recordset.filtered_records:315                    record.reset()316                rprint("Update Cancelled")317                time.sleep(2)318                display.end_screen()319                break320        else:321            recordset.filter_records(filter_string=search_filter)322            filtered_table = display.create_table(recordset.filtered_records, "weighted")323            display.update_screen(filtered_table)324    display.end_screen()325def get_staged_changes_view(recordset):326    updated_records = recordset.get_updated_records()327    if updated_records:328        orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])329        update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])330        display.split_display(orig_table, update_table)331        input("Press 'Enter' to continue...")332        display.end_screen()333    else:334        display.update_screen("\n[yellow]No Changes Staged")335        input("Press 'Enter' to continue...")336        display.end_screen()337def edit_staged_changes(recordset):338    updated_records = recordset.get_updated_records()339    if updated_records:340        orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])341        update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])342        display.split_display(orig_table, update_table)343        while True:344            if len(updated_records) == 0:345                input("Press 'Enter' to continue...")346                break347            else:348                delete_choice = Prompt.ask("Select an index to delete from staged changes. 'q' to exit")349            if delete_choice != "q":350                try:351                    delete_choice = int(delete_choice)352                    if delete_choice not in range(0, len(updated_records)):353                        display.end_screen()354                        display.split_display(orig_table, update_table)355                        rprint("[yellow]Enter a valid index or 'q' to exit")356                    else:357                        updated_records[delete_choice].reset()358                        updated_records = recordset.get_updated_records()359                        orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])360                        update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])361                        display.end_screen()362                        display.split_display(orig_table, update_table)363                except ValueError:364                    display.end_screen()365                    display.split_display(orig_table, update_table)366                    rprint("[yellow]Enter a valid index or 'q' to exit")367            else:368                display.end_screen()369                break370        display.end_screen()371    else:372        display.update_screen("\n[yellow]No Changes Staged")373        input("Press 'Enter' to continue...")374        display.end_screen()375# Update AWS records to match locally cached record changes376def update_records(recordset):377    updated_records = recordset.get_updated_records()378    if updated_records:379        orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])380        update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])381    382        display.split_display(orig_table, update_table)383        if Confirm.ask("[green]Are you sure you want to apply these changes?"):384            resp = recordset.write_records()385            display.update_screen(resp)386            input("Press 'Enter' to continue...")387            display.end_screen()388        else:389            display.update_screen("\n[yellow]Cancelling Apply")390            time.sleep(2)391            display.end_screen()392    else:393        display.update_screen("\n[yellow]No records staged for updating")394        input("Press 'Enter' to continue...")395        display.end_screen()396def dump_changesets(recordset):397    updated_records = recordset.get_updated_records()398    if updated_records:399        changeset_filename = Prompt.ask("\n[blue]Enter a filename to write changesets to (will create new and original records in two files)")400        recordset.dump_changeset(changeset_filename)401    else:402        display.update_screen("\n[yellow]No records staged. Stage some changes and then try again")403        input("Press 'Enter' to continue...")404        display.end_screen()405# Create main menu table406def get_menu_table():407    table = Table(title="Main Menu")408    table.add_column("Selection")409    table.add_column("Option Name")410    table.add_row("1",  "List All Records")411    table.add_row("2",  "List All Weighted Records")412    table.add_row("3",  "List All Latency Records")413    table.add_row("4",  "Update Weighted Records")414    table.add_row("5",  "Load Changeset From File")415    table.add_row("7",  "View Staged Changes")416    table.add_row("8",  "Edit Staged Changes")417    table.add_row("9",  "Commit Changes")418    table.add_row("10",  "Dump Changesets")419    table.add_row("99", "Refresh Record Cache")420    table.add_row("0",  "Quit")421    return table422def refresh_record_cache(recordset):423    recordset.refresh_records(init=False)424    display.clear()425    display.update_screen("[green]Records refreshed from AWS")426    time.sleep(2)427    display.end_screen()428def confirm_quit(recordset):429    updated_records = recordset.get_updated_records()430    if len(updated_records) > 0:431        if Confirm.ask("[yellow]There are staged changes pending... Are you sure you want to quit before applying changes?"):432            sys.exit(0)433    else:434        sys.exit(0)435def unattended_apply(recordset, filename):436    load_records(recordset, filename)437    recordset.write_records()438    sys.exit(0)439def main():440    recordset = RecordSet()441    quit = False442    while quit != True:443        main_menu = get_menu_table()...data-checkpoint.py
Source:data-checkpoint.py  
...188""" 189The Update method is a bit stretch :(190Maybe in the future we will only get and put data for the latest year and ignore past updates (which are definitely sketchy)191"""192def get_updated_records(label):193    if label == 'unfcc':194        #old_df = pd.read_csv('old_'+UNFCC_FILE)195        df = pd.read_csv(UNFCC_FILE)196    else:197        #old_df = pd.read_csv('old_'+EBAL_FILE)198        df = pd.read_csv(EBAL_FILE)199        df = decoding_codes(df)200        coco_dict = {}201        for i in df["REF_AREA"].unique():202            #     if i not in coco_dict:203            coco_dict[i] = coco.convert(i, to='iso3')204            coco_dict["France-Monaco"] = coco.convert("France", to='iso3')205            coco_dict["Italy-San Marino"] = coco.convert("Italy", to='iso3')206            coco_dict["Switzerland-Liechtenstein"] = coco.convert("Switzerland", to='iso3')207        df["REF_AREA"] = [coco_dict[i] for i in df["REF_AREA"]]208    #update_df = new_df[~new_df.apply(tuple,1).isin(old_df.apply(tuple,1))]209    #update_df = pd.concat([old_df,new_df]).drop_duplicates(keep=False)210    update_database(df,label)211t1 = Timeloop()212@t1.job(interval=timedelta(seconds=60))213def _worker():214    """215    Main work loop repeats every hour216    """217    try:218        logger.info('Looping...')219        temp_list = []220        for file in ['data_unfcc.csv','data_ebal.csv']:221            temp_list.append(os.path.isfile(file))222        if not all(temp_list):223            print('Starting from scratch...')224            download_source()225            create_database()226            create_index()227        time_mod = datetime.strptime(time.ctime(os.stat('data_ebal.csv').st_mtime),'%a %b %d %H:%M:%S %Y')228        time_now = datetime.now()229        if (time_now - time_mod).seconds > 3600:230            download_source()231            get_updated_records('unfcc')232            get_updated_records('ebal')233            create_index()234    except Exception as e:235        logger.warning('Main Loop error')236if __name__ == "__main__":237#    t1.start(block=True)238    create_database()239#    get_updated_records('unfcc')...scd_driver_code.py
Source:scd_driver_code.py  
...47	print("First time run end...")48	print("Data saved to Snapshot...")49else:50	print("Subsequent run started...")51	updated_rows_df = get_updated_records(history, current, key_list, rec_eff_dt, ignored_columns)52	deleted_rows_df = get_deleted_rows(history, current, key_list, rec_eff_dt)53	new_rows_df = get_new_records(history, current, key_list, rec_eff_dt)54	unchanged_rows_df = get_no_change_records(history, current, key_list, ignored_columns)55	history_df = updated_rows_df.unionByName(deleted_rows_df).unionByName(new_rows_df)56	history_df = history_df.select(history.columns)57	print("Writing to History...")58	history_df.write.mode("overwrite").insertInto("scd_hive_db.employee_history", overwrite=True)59	print("Data saved to History...")60	unchanged_rows_df = unchanged_rows_df.select(history.columns)61	snapshot_df = history_df.filter("row_opern != 'D'").unionByName(unchanged_rows_df)62	print("Writing to Snapshot...")63	snapshot_df.write.mode("overwrite").saveAsTable("scd_hive_db.cdc_employee_temp")64	new_rows_df_temp_table = spark.sql(''' select * from scd_hive_db.cdc_employee_temp ''')65	new_rows_df_temp_table.write.mode("overwrite").insertInto("scd_hive_db.employee", overwrite=True)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
