How to use the get_updated_records method in localstack

Best Python code snippet using localstack_python

r53_record_cli.py

Source:r53_record_cli.py Github

copy

Full Screen

...165 sys.exit(1)166 else:167 print(e)168 sys.exit(1)169 def get_updated_records(self):170 updated_records = []171 for record in self.original_records:172 if record.updated_data:173 updated_records.append(record)174 return updated_records175 def dump_changeset(self, filename):176 updated_records = self.get_updated_records()177 updated_record_list = [x.get_updated_record() for x in updated_records]178 with open(f"changesets/{filename}.json", "w") as f:179 f.write(json.dumps(updated_record_list, indent=4))180 orig_record_list = [x.get_original_record() for x in updated_records]181 with open(f"changesets/{filename}_orig.json", "w") as f:182 f.write(json.dumps(orig_record_list, indent=4))183 def filter_records(self, field=None, filter_string=None):184 if field:185 self.filtered_records = []186 if field == 'All':187 self.filtered_records = self.original_records188 else:189 with display.status("Filtering {0} Records".format(field), spinner="dots"):190 for record in self.original_records:191 if record.__dict__.get(field, None):192 self.filtered_records.append(record)193 if filter_string:194 if filter_string.startswith(":"):195 index_filter = int(filter_string.split(":")[1]) # select the index based on the special ':<int>' format196 try:197 self.filtered_records = [self.filtered_records[index_filter]]198 except IndexError:199 pass200 else:201 filtered_list = []202 for record in self.filtered_records:203 if filter_string in record.Name:204 filtered_list.append(record)205 if not filtered_list: # If the resultant list is empty, dont return it. 
Instead return the last populated list206 pass207 else:208 self.filtered_records = filtered_list209 def write_records(self):210 changes = []211 originals = []212 to_update = []213 updated_records = self.get_updated_records()214 for record in updated_records:215 change_record = {"Action":"UPSERT", "ResourceRecordSet": record.get_updated_record()}216 to_update.append(change_record)217 originals.append(record.get_original_record())218 changes.append(record.get_updated_record())219 try:220 resp = self.client.change_resource_record_sets(HostedZoneId=HOSTED_ZONE_ID, ChangeBatch = {'Comment':'MTA Update','Changes': to_update})221 status = resp['ResponseMetadata']['HTTPStatusCode']222 if status == 200:223 logger.info("update: {0}\norig: {1}".format(changes, originals))224 for record in updated_records:225 record.reset()226 return "\n[green] Records Updated!"227 recordset.refresh_records()228 input("Press 'Enter' to continue...")229 else:230 return "\n[red] Error updating records! \n {0}".format(resp)231 input("Press 'Enter' to continue...")232 233 except Exception as e:234 return "\n[red]Error updating records: \n {0}".format(e)235 input("Press 'Enter' to continue...")236def load_changeset_from_file(recordset):237 changeset_file_list = os.listdir('changesets')238 table = Table(title="Changeset Files")239 table.add_column("Index")240 table.add_column("Filename")241 count = 0242 for filename in changeset_file_list:243 table.add_row(str(count), filename)244 count += 1245 display.update_screen(table) 246 file_choice = Prompt.ask("Choose a file. 'q' to quit to main menu")247 while True:248 if file_choice != 'q':249 try:250 if int(file_choice) not in range(0, len(changeset_file_list)):251 display.end_screen()252 display.update_screen(table)253 rprint("[yellow]Choose a valid file index")254 file_choice = Prompt.ask("Choose a file. 
'q' to quit to main menu")255 else:256 selected_file = changeset_file_list[int(file_choice)]257 display.end_screen()258 load_records(recordset, selected_file)259 break260 except ValueError:261 display.end_screen()262 display.update_screen(table)263 rprint("[yellow]Choose a valid file index")264 file_choice = Prompt.ask("Choose a file. 'q' to quit to main menu")265 else:266 display.end_screen()267 break268 display.end_screen()269def match_original_record(recordset, change_record):270 for record in recordset.original_records:271 if change_record["Name"] == record.Name and change_record["ResourceRecords"] == record.ResourceRecords and change_record["Type"] == record.Type:272 record.updated_data = change_record273def load_records(recordset, filename):274 with open("changesets/{0}".format(filename), 'r') as f:275 data = json.loads(f.read())276 for change_record in data:277 match_original_record(recordset, change_record)278def edit_weight_records_by_filter(recordset):279 original_filtered_set = recordset.filtered_records # this is our fallback280 weighted_record_table = display.create_table(recordset.filtered_records, "weighted")281 display.update_screen(weighted_record_table)282 while True:283 search_filter = Prompt.ask("Filter ('..' to reset filter, 'Enter' to use current selection, ':<index>' for record #, ':q' to exit)")284 if search_filter == "..": # if user enters '..' 
return full list of weighted records again 285 recordset.filtered_records = original_filtered_set286 weighted_record_table = display.create_table(recordset.filtered_records, "weighted")287 display.update_screen(weighted_record_table)288 elif search_filter == ":q":289 display.end_screen()290 break291 elif search_filter == "": # The user has filtered choice to records shown on screen and presses enter with no other chars292 selection_table = display.create_table(recordset.filtered_records, "weighted", bgcolors=[136, 132])293 valid_weight = False # wait for valid weight value for records to be entered 294 error = 0295 while not valid_weight:296 display.update_screen(selection_table)297 if error:298 rprint("[yellow]Enter a valid record weight between 0-255")299 weight_setting = IntPrompt.ask("Enter weight to set selected records to (0-255)")300 if weight_setting in range(0,256):301 valid_weight = True302 else:303 error = 1304 for record in recordset.filtered_records:305 record.update("Weight", weight_setting)306 update_table = display.create_table(recordset.filtered_records, 'weighted', updated_records=True, bgcolors=[66, 62]) # Create update table with new weights shown307 render_group = RenderGroup(selection_table, update_table) # group original selection table and new updated table so they can be displayed at the same time on screen308 display.update_screen(render_group)309 # Confirm that changes should be staged310 if Confirm.ask("Stage changes"):311 display.end_screen()312 break313 else:314 for record in recordset.filtered_records:315 record.reset()316 rprint("Update Cancelled")317 time.sleep(2)318 display.end_screen()319 break320 else:321 recordset.filter_records(filter_string=search_filter)322 filtered_table = display.create_table(recordset.filtered_records, "weighted")323 display.update_screen(filtered_table)324 display.end_screen()325def get_staged_changes_view(recordset):326 updated_records = recordset.get_updated_records()327 if updated_records:328 
orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])329 update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])330 display.split_display(orig_table, update_table)331 input("Press 'Enter' to continue...")332 display.end_screen()333 else:334 display.update_screen("\n[yellow]No Changes Staged")335 input("Press 'Enter' to continue...")336 display.end_screen()337def edit_staged_changes(recordset):338 updated_records = recordset.get_updated_records()339 if updated_records:340 orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])341 update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])342 display.split_display(orig_table, update_table)343 while True:344 if len(updated_records) == 0:345 input("Press 'Enter' to continue...")346 break347 else:348 delete_choice = Prompt.ask("Select an index to delete from staged changes. 'q' to exit")349 if delete_choice != "q":350 try:351 delete_choice = int(delete_choice)352 if delete_choice not in range(0, len(updated_records)):353 display.end_screen()354 display.split_display(orig_table, update_table)355 rprint("[yellow]Enter a valid index or 'q' to exit")356 else:357 updated_records[delete_choice].reset()358 updated_records = recordset.get_updated_records()359 orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])360 update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])361 display.end_screen()362 display.split_display(orig_table, update_table)363 except ValueError:364 display.end_screen()365 display.split_display(orig_table, update_table)366 rprint("[yellow]Enter a valid index or 'q' to exit")367 else:368 display.end_screen()369 break370 display.end_screen()371 else:372 display.update_screen("\n[yellow]No Changes Staged")373 input("Press 'Enter' to continue...")374 display.end_screen()375# Update AWS 
records to match locally cached record changes376def update_records(recordset):377 updated_records = recordset.get_updated_records()378 if updated_records:379 orig_table = display.create_table(updated_records, "weighted", bgcolors=[160,160])380 update_table = display.create_table(updated_records, "weighted", updated_records=True, bgcolors=[70,70])381 382 display.split_display(orig_table, update_table)383 if Confirm.ask("[green]Are you sure you want to apply these changes?"):384 resp = recordset.write_records()385 display.update_screen(resp)386 input("Press 'Enter' to continue...")387 display.end_screen()388 else:389 display.update_screen("\n[yellow]Cancelling Apply")390 time.sleep(2)391 display.end_screen()392 else:393 display.update_screen("\n[yellow]No records staged for updating")394 input("Press 'Enter' to continue...")395 display.end_screen()396def dump_changesets(recordset):397 updated_records = recordset.get_updated_records()398 if updated_records:399 changeset_filename = Prompt.ask("\n[blue]Enter a filename to write changesets to (will create new and original records in two files)")400 recordset.dump_changeset(changeset_filename)401 else:402 display.update_screen("\n[yellow]No records staged. 
Stage some changes and then try again")403 input("Press 'Enter' to continue...")404 display.end_screen()405# Create main menu table406def get_menu_table():407 table = Table(title="Main Menu")408 table.add_column("Selection")409 table.add_column("Option Name")410 table.add_row("1", "List All Records")411 table.add_row("2", "List All Weighted Records")412 table.add_row("3", "List All Latency Records")413 table.add_row("4", "Update Weighted Records")414 table.add_row("5", "Load Changeset From File")415 table.add_row("7", "View Staged Changes")416 table.add_row("8", "Edit Staged Changes")417 table.add_row("9", "Commit Changes")418 table.add_row("10", "Dump Changesets")419 table.add_row("99", "Refresh Record Cache")420 table.add_row("0", "Quit")421 return table422def refresh_record_cache(recordset):423 recordset.refresh_records(init=False)424 display.clear()425 display.update_screen("[green]Records refreshed from AWS")426 time.sleep(2)427 display.end_screen()428def confirm_quit(recordset):429 updated_records = recordset.get_updated_records()430 if len(updated_records) > 0:431 if Confirm.ask("[yellow]There are staged changes pending... Are you sure you want to quit before applying changes?"):432 sys.exit(0)433 else:434 sys.exit(0)435def unattended_apply(recordset, filename):436 load_records(recordset, filename)437 recordset.write_records()438 sys.exit(0)439def main():440 recordset = RecordSet()441 quit = False442 while quit != True:443 main_menu = get_menu_table()...

Full Screen

Full Screen

data-checkpoint.py

Source:data-checkpoint.py Github

copy

Full Screen

# ... (excerpt: earlier lines of the file are not shown)
"""
The Update method is a bit stretch :(
Maybe in the future we will only get and put data for the latest year and ignore past updates (which are definitely sketchy)
"""
def get_updated_records(label):
    """Reload one source CSV, convert REF_AREA to ISO3 and pass it to update_database.

    Args:
        label: 'unfcc' reads UNFCC_FILE; any other value reads EBAL_FILE.
            The label is forwarded unchanged to update_database.
    """
    if label == 'unfcc':
        #old_df = pd.read_csv('old_'+UNFCC_FILE)
        df = pd.read_csv(UNFCC_FILE)
    else:
        #old_df = pd.read_csv('old_'+EBAL_FILE)
        df = pd.read_csv(EBAL_FILE)
    # NOTE(review): indentation was lost in this excerpt; decoding is assumed
    # to apply to both sources rather than only the EBAL branch -- confirm.
    df = decoding_codes(df)
    # Convert each distinct area once instead of once per row.
    coco_dict = {
        area: coco.convert(area, to='iso3')
        for area in df["REF_AREA"].unique()
    }
    # Combined reporting areas are mapped to the code of their larger member.
    coco_dict["France-Monaco"] = coco.convert("France", to='iso3')
    coco_dict["Italy-San Marino"] = coco.convert("Italy", to='iso3')
    coco_dict["Switzerland-Liechtenstein"] = coco.convert("Switzerland", to='iso3')
    df["REF_AREA"] = [coco_dict[area] for area in df["REF_AREA"]]
    #update_df = new_df[~new_df.apply(tuple,1).isin(old_df.apply(tuple,1))]
    #update_df = pd.concat([old_df,new_df]).drop_duplicates(keep=False)
    update_database(df, label)


t1 = Timeloop()


@t1.job(interval=timedelta(seconds=60))
def _worker():
    """
    Main work loop: runs every 60 seconds and refreshes the data once
    data_ebal.csv is more than an hour old.
    """
    try:
        logger.info('Looping...')
        source_files = ['data_unfcc.csv', 'data_ebal.csv']
        if not all(os.path.isfile(file) for file in source_files):
            print('Starting from scratch...')
            download_source()
            create_database()
            create_index()
        time_mod = datetime.fromtimestamp(os.stat('data_ebal.csv').st_mtime)
        time_now = datetime.now()
        # total_seconds() rather than .seconds: .seconds drops whole days, so
        # a file older than 24 hours could look fresh.
        if (time_now - time_mod).total_seconds() > 3600:
            download_source()
            get_updated_records('unfcc')
            get_updated_records('ebal')
            create_index()
    except Exception as e:
        # Keep the loop alive, but record what actually went wrong.
        logger.warning('Main Loop error: {0!r}'.format(e))


if __name__ == "__main__":
#    t1.start(block=True)
    create_database()
#    get_updated_records('unfcc')
# ... (excerpt ends here)

Full Screen

Full Screen

scd_driver_code.py

Source:scd_driver_code.py Github

copy

Full Screen

...47 print("First time run end...")48 print("Data saved to Snapshot...")49else:50 print("Subsequent run started...")51 updated_rows_df = get_updated_records(history, current, key_list, rec_eff_dt, ignored_columns)52 deleted_rows_df = get_deleted_rows(history, current, key_list, rec_eff_dt)53 new_rows_df = get_new_records(history, current, key_list, rec_eff_dt)54 unchanged_rows_df = get_no_change_records(history, current, key_list, ignored_columns)55 history_df = updated_rows_df.unionByName(deleted_rows_df).unionByName(new_rows_df)56 history_df = history_df.select(history.columns)57 print("Writing to History...")58 history_df.write.mode("overwrite").insertInto("scd_hive_db.employee_history", overwrite=True)59 print("Data saved to History...")60 unchanged_rows_df = unchanged_rows_df.select(history.columns)61 snapshot_df = history_df.filter("row_opern != 'D'").unionByName(unchanged_rows_df)62 print("Writing to Snapshot...")63 snapshot_df.write.mode("overwrite").saveAsTable("scd_hive_db.cdc_employee_temp")64 new_rows_df_temp_table = spark.sql(''' select * from scd_hive_db.cdc_employee_temp ''')65 new_rows_df_temp_table.write.mode("overwrite").insertInto("scd_hive_db.employee", overwrite=True)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful