How to use load_report method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

marc2post

Source:marc2post Github

copy

Full Screen

#!/usr/bin/env python
"""Load daily MARC files into a target endpoint as MARC/XML.

For each day from the start date (``args.since`` when given, otherwise
yesterday) through yesterday, the script collects the source files named in
the ``marc2post`` section of the YAML config, converts every MARC record to
MARC/XML, POSTs it to the configured target endpoint, and finally prints a
summary of the counters kept in ``load_report``.
"""
import sys
import json
from os import path
import re
import traceback
import yaml
from modules.config_parser import args
from datetime import datetime, timedelta
from time import gmtime, strftime
from os import listdir
from os.path import isdir, isfile, join
from requests_futures import sessions
# from multiprocessing.dummy import Pool as ThreadPool
import xml.etree.ElementTree as ET
from pymarc import MARCReader
from pymarc import XMLWriter
from pymarc.marcxml import record_to_xml, record_to_xml_node


def _report_exception(context):
    """Print the type, value and formatted traceback of the active exception.

    Must be called from inside an ``except`` block. ``context`` is the
    message prefix used on each printed line.
    """
    e_type, e_value, _ = sys.exc_info()
    print('{}, e_type: {}'.format(context, e_type))
    print('{}, e_value: {}'.format(context, e_value))
    # Format the traceback; printing the traceback object itself only shows
    # its repr, which is useless for diagnosis.
    print('{}, e_traceback: {}'.format(context, traceback.format_exc()))


def process_ingest_response(f):
    """Record the outcome of one ingest request in ``load_report``.

    ``f`` is a future returned by ``session.post``. A response carrying a
    ``Location`` header counts as a success; a response without one, or a
    future that raises when resolved, counts as a failure.
    """
    try:
        response = f.result()
    except Exception:
        # The request itself failed (e.g. connection error): count it so the
        # final report does not silently lose the record.
        load_report["loaded_records_fail"] += 1
        print('Error processing requests-future result.')
        _report_exception('Error processing requests-future result')
        return

    if "Location" not in response.headers:
        # Probably have an error of some kind.
        load_report["loaded_records_fail"] += 1
        print("No 'location' header found in response. Possible error: {}".format(response.content))
        print("Source record: {}".format(response.request.body))
    else:
        load_report["loaded_records_success"] += 1
        print("Ingest status for {}: {}".format(response.headers["Location"], response.status_code))


# Close the config file deterministically instead of leaking the handle.
with open(args.config) as config_file:
    config = yaml.safe_load(config_file)
print()
print("Config:")
print(config)
print()

jobconfig = config["marc2post"]
print("Job config:")
print(jobconfig)
print()

today = datetime.now()
yesterday = today - timedelta(1)  # Yesterday's date.
startdate = yesterday  # Set the startdate to yesterday, unless overwritten by parameter.
if args.since != "":
    startdate = datetime.strptime(args.since, "%Y-%m-%d")

# Empty sync report
load_report = {}

# Establish job start time.
st = datetime.now()
starttime = st.isoformat()[:19]
load_report["job_start"] = starttime
load_report["loaded_records_success"] = 0
load_report["loaded_records_fail"] = 0
load_report["unsuppressed"] = 0
load_report["suppressed"] = 0
load_report["deletes"] = 0

date_format = "%Y-%m-%d"
yesterday_formatted = datetime.strftime(yesterday, date_format)
startdate_formatted = datetime.strftime(startdate, date_format)

# Every day from the start date through yesterday, inclusive. Comparing
# calendar dates with "<" (rather than formatted strings with "!=") keeps the
# loop from running forever when the start date is later than yesterday.
dates = [startdate_formatted]
nextday = startdate
while nextday.date() < yesterday.date():
    nextday = nextday + timedelta(days=1)
    dates.append(datetime.strftime(nextday, date_format))

print(dates)

# Map each day to the source files that actually exist on disk for it.
days = {}
for d in dates:
    files = []
    for s in jobconfig["sources"]:
        # The source's "%SOURCEDATE%" entry is used as the strftime format
        # for the date portion of the file name.
        file_date = datetime.strftime(datetime.strptime(d, "%Y-%m-%d"), s["%SOURCEDATE%"])
        source_file = s["source_directory"] + s["file_pattern"].replace('%SOURCEDATE%', file_date)
        if path.exists(source_file):
            files.append(source_file)
    days[d] = {"files": files}

# Got here so let's set up some MARC processing info.
# Matches the literal text "[from old catalog]", ignoring case.
find_fromoldcatalog = re.compile(re.escape('[from old catalog]'), re.IGNORECASE)
auth = (jobconfig["target"]["username"], jobconfig["target"]["password"])
headers = {"Content-type": "application/xml"}
session = sessions.FuturesSession(max_workers=jobconfig["threads"])

for d in days:
    print("Processing date: " + d)
    for marc_file in days[d]["files"]:
        print("Processing file (" + d + "): " + marc_file)
        marcxml_records = []
        with open(marc_file, 'rb') as fh:
            reader = MARCReader(fh)
            for record in reader:
                try:
                    xml_node = record_to_xml_node(record, False, True)
                    # This is necessary otherwise the default output is to encode
                    # special characters using &#1234 convention.
                    # https://stackoverflow.com/questions/15304229/convert-python-elementtree-to-string
                    record_xml = ET.tostring(xml_node, encoding="unicode")
                    record_xml = find_fromoldcatalog.sub('', record_xml)
                    marcxml_records.append(record_xml)
                except Exception:
                    load_report["loaded_records_fail"] += 1
                    print('Error generating MARC/XML.')
                    _report_exception('Error generating MARC/XML')
                    if record is not None:
                        print('Error record 001: ' + str(record['001']))

        # The file name decides how the records are flagged at the target.
        if 'delete' in marc_file:
            load_report["deletes"] += len(marcxml_records)
            suppress_status = "deleted"  # Suppressing seems to make sense, though not quite right.
        elif 'unsuppressed' not in marc_file:
            load_report["suppressed"] += len(marcxml_records)
            suppress_status = "suppressed"
        else:
            load_report["unsuppressed"] += len(marcxml_records)
            suppress_status = "unsuppressed"

        # Queue every POST first so the session's workers can run them
        # concurrently, then resolve the futures one by one.
        ingest_url = jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status
        futures = [
            session.post(ingest_url, auth=auth, headers=headers, data=record_xml.encode('utf-8'))
            for record_xml in marcxml_records
        ]
        for future in futures:
            process_ingest_response(future)

et = datetime.now()
endtime = et.isoformat()[:19]
# Named "elapsed" so the imported timedelta class is not rebound.
elapsed = et - st
load_report["job_end"] = endtime
load_report["status"] = "success"

print('************')
print('************')
print('Job started at: {}'.format(starttime))
print('Job ended at: {}'.format(endtime))
print('Elapsed time: {}'.format(elapsed))
print('Start date of loaded records: {}'.format(startdate_formatted))
print('Number of days loaded: {}'.format(len(dates)))
print('Number of records loaded successfully: {}'.format(load_report["loaded_records_success"]))
print('Number of records load failures: {}'.format(load_report["loaded_records_fail"]))
print('Number of unsuppressed records: {}'.format(load_report["unsuppressed"]))
print('Number of suppressed records: {}'.format(load_report["suppressed"]))
print('Number of deleted records: {}'.format(load_report["deletes"]))
print('************')
print('************')
print()
# NOTE(review): the original listing ends with an ellipsis here; any lines
# that followed are not shown.

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

...10from django.template.loader import render_to_string11from django.urls import reverse, reverse_lazy12from visorxml.pdf_utils import render_to_pdf13from .reports import XMLReport, BASE_XML_MINI, random_name14def load_report(session):15 """ Load current report from filesystem if it exists16 else: return None17 """18 file_name = session['new_xml_name']19 file_path = os.path.join(settings.MEDIA_ROOT, file_name)20 try:21 with open(file_path, 'rb') as xmlfile:22 report = XMLReport([(file_name, xmlfile.read())])23 return report24 except FileNotFoundError:25 session.pop("new_xml_name")26 return None27def clean_report(request):28 "Remove report data from session and file backup"29 if "new_xml_name" in request.session:30 file_name = request.session['new_xml_name']31 file_path = os.path.join(settings.MEDIA_ROOT, file_name)32 try:33 os.remove(file_path)34 except FileNotFoundError: # nothing to do35 pass36 request.session.pop("new_xml_name")37 return HttpResponseRedirect(reverse("generate-report"))38def new_report(session):39 """ Create new mini-xml40 """41 name = random_name()42 report = XMLReport([(name, BASE_XML_MINI),])43 report.save_to_file(name)44 session['new_xml_name'] = name45 return report46def validate(request):47 "validate report"48 report = load_report(request.session)49 if len(report.errors["validation_errors"]) == 0:50 return HttpResponse("Yes")51 return HttpResponse("No")52def generate_report(request):53 """ View for the mini xml form. 
If not exists, new mini-xml is created54 """55 if request.session.get('new_xml_name', False):56 load_report(request.session)57 else:58 new_report(request.session)59 return render(request, "generadorxml/generate_xml.html", locals())60@method_decorator(csrf_exempt)61def update_xml_mini(request):62 """ View for update elements of the XML.63 If value is a uppercase letter (Calificacion) returns new svg for replace it64 """65 element = request.POST['name']66 value = request.POST['value']67 report = load_report(request.session)68 report.update_element(element, value)69 return HttpResponse()70def download_xml(request):71 """ Download mini-XML72 """73 session = request.session74 file_name = session['new_xml_name']75 file_path = os.path.join(settings.MEDIA_ROOT, file_name)76 output_file_name = 'certificado-%s.xml' % datetime.now().strftime('%Y%m%d%H%M')77 with open(file_path, 'rb') as xmlfile:78 response = HttpResponse(xmlfile.read(), content_type='application/xml')79 response['Content-Disposition'] = 'attachment;filename=%s' % output_file_name80 return response81def download_pdf(request):82 """ Download the current energy certificate as PDF83 """84 if not request.session.get('new_xml_name', False):85 return HttpResponseRedirect(reverse_lazy("generate-report"))86 session = request.session87 filename = 'certificado-%s.pdf' % datetime.now().strftime('%Y%m%d%H%M')88 report = load_report(session)89 pdf = True90 html = render_to_string('generadorxml/generate_xml.html', locals())91 env = os.environ.copy()92 env.update({93 'generation_date': report.data.DatosDelCertificador.Fecha,94 'reference': report.data.IdentificacionEdificio.ReferenciaCatastral95 })96 xml_name = session['new_xml_name']97 xml_path = os.path.join(settings.MEDIA_ROOT, xml_name)...

Full Screen

Full Screen

common.py

Source:common.py Github

copy

Full Screen

...9### ----------------------------- SETUP ---------------------------------- ###10client = auth_gspread()11# load hours report12hours_report = pd.concat([13 load_report(client, 'hours-entries', '2021-table'),14 load_report(client, 'hours-entries', '2020-table')15 # load_report(client, 'hours-entries', '2019-table')16]) #TODO Exceeds memory of 550Mb17hours_report['DT'] = pd.to_datetime(hours_report['DT'])18# load usernames19with open('components/usernames.json') as f:20 usernames = json.load(f)21# load hours entries22hours_entries = pd.concat([23 load_report(client, 'hours-entries', '2021-hours'),24 load_report(client, 'hours-entries', '2020-hours')25 # load_report(client, 'hours-entries', '2019-hours')26])27hours_entries['Hours Date'] = pd.to_datetime(hours_entries['Hours Date'])28LOGO = app.get_asset_url('ei-logo-white.png')29### ----------------------------- LAYOUT ----------------------------------###30### NAVBAR ###31nav_items = dbc.Container([32 dbc.NavItem(dbc.NavLink('My Utilization', href='/')),33 dbc.NavItem(dbc.NavLink('My Projects', href='/my_projects')),34 dbc.NavItem(dbc.NavLink('My Team', href='/my_team')),35 # dbc.NavItem(dbc.NavLink('Allocation', href='/allocation')) # turn on for dev36]37)38navbar = dbc.Navbar(39 dbc.Container(...

Full Screen

Full Screen

urls.py

Source:urls.py Github

copy

Full Screen

1from django.urls import re_path, path2from file_upload import views3from base.views import load_report, log_detail, log_list, scan_viscidus_poison_tick, viscidus_poison_tick_info,\4 scan_boss_nature_protection, boss_nature_protection_info, gold_run_detail5# namespace6app_name = "base"7urlpatterns = [8 # Upload File Without Using Model Form9 re_path(r'^upload1/$', views.file_upload, name='file_upload'),10 # Upload Files Using Model Form11 re_path(r'^upload2/$', views.model_form_upload, name='model_form_upload'),12 # Upload Files Using Ajax Form13 re_path(r'^upload3/$', views.ajax_form_upload, name='ajax_form_upload'),14 # Handling Ajax requests15 re_path(r'^ajax_upload/$', views.ajax_upload, name='ajax_upload'),16 re_path(r'^parse/(?P<pk>\d+)', views.parse, name='parse'),17 re_path(r'^poison/(?P<pk>\d+)/', views.poison_data, name='poison'),18 # View Log List19 path('', view=log_list, name='log_list'),20 # load report21 re_path(r'^loadreport/$', view=load_report, name='load_report'),22 # re_path(r'submitreport/(?P<code>\d+)$', load_report, name='submit'),23 # log detail24 re_path(r'^log_detail/(?P<id>\d+)', view=log_detail, name='log_detail'),25 # gold run detail26 re_path(r'^gold_run_detail/(?P<id>\d+)', view=gold_run_detail, name='gold_run_detail'),27 # scan task28 re_path(r'^scan_viscidus_poison_tick/(?P<log_id>\d+)$', view=scan_viscidus_poison_tick, name='scan_viscidus_poison_tick'),29 re_path(r'^viscidus_poison_tick_info/(?P<log_id>\d+)$', view=viscidus_poison_tick_info, name='viscidus_poison_tick_info'),30 re_path(r'^scan_boss_nature_protection/(?P<log_id>\d+)$', view=scan_boss_nature_protection, name='scan_boss_nature_protection'),31 re_path(r'^boss_nature_protection_info/(?P<log_id>\d+)$', view=boss_nature_protection_info, name='boss_nature_protection_info'),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful