Best Python code snippet using lemoncheesecake
marc2post
Source:marc2post  
1#!/usr/bin/env python2import sys3import json4from os import path5import re6import yaml7from modules.config_parser import args8from datetime import datetime, timedelta9from time import gmtime, strftime10from os import listdir11from os.path import isdir, isfile, join12from requests_futures import sessions13# from multiprocessing.dummy import Pool as ThreadPool 14import xml.etree.ElementTree as ET15from pymarc import MARCReader16from pymarc import XMLWriter17from pymarc.marcxml import record_to_xml, record_to_xml_node18config = yaml.safe_load(open(args.config))19print()20print("Config:")21print(config)22print()23jobconfig = config["marc2post"]24print("Job config:")25print(jobconfig)26print()27today = datetime.now()28yesterday = today - timedelta(1) #Yesterday's date.29startdate = yesterday # Set the startdate to yesterday, unless overwritten by parameter.30if args.since != "":31    startdate = datetime.strptime(args.since, "%Y-%m-%d")32# Empty sync report33load_report = {}34# Establish job start time.35st = datetime.now()36starttime = st.isoformat()[:19]37load_report["job_start"] = starttime38load_report["loaded_records_success"] = 039load_report["loaded_records_fail"] = 040load_report["unsuppressed"] = 041load_report["suppressed"] = 042load_report["deletes"] = 043date_format = "%Y-%m-%d"44yesterday_formatted = datetime.strftime(yesterday, date_format)45startdate_formatted = datetime.strftime(startdate, date_format)46dates = [startdate_formatted]47if (yesterday_formatted != startdate_formatted):48    nextday = startdate49    nextday_formatted_sourcedate = datetime.strftime(startdate, date_format)50    while nextday_formatted_sourcedate != yesterday_formatted:51        nextday = nextday + timedelta(days=1)52        nextday_formatted_sourcedate = datetime.strftime(nextday, date_format)53        dates.append(nextday_formatted_sourcedate)54        55print(dates)56days = {}57for d in dates:58    files = []59    for s in jobconfig["sources"]:60        file_date = datetime.strftime(datetime.strptime(d, "%Y-%m-%d"), s["%SOURCEDATE%"])61        file = s["source_directory"] + s["file_pattern"].replace('%SOURCEDATE%', file_date)62        if path.exists(file):63            files.append(file)64    days[d] = {}65    days[d]["files"] = files66# Got here so let's set up some MARC processing info.67def process_ingest_response(f):68    global load_report69    try:70        r = { 71            "status_code": f.result().status_code, 72            "output": f.result().content73        }74        if "Location" not in f.result().headers:75            # Probably have an error of some kind.76            load_report["loaded_records_fail"] += 177            print("No 'location' header found in response.  Possible error: {}".format(f.result().content))78            print("Source record: {}".format(f.result().request.body))79        else:80            load_report["loaded_records_success"] += 181            r["uri"] = f.result().headers["Location"]82            print("Ingest status for {}: {}".format(r["uri"], r["status_code"]))83    except:84        print('Error processing requests-future result.')85        e_type = sys.exc_info()[0]86        e_value = str(sys.exc_info()[1])87        e_traceback = sys.exc_info()[2]88        print('Error processing requests-future result, e_type: {}'.format(e_type))89        print('Error processing requests-future result, e_value: {}'.format(e_value))90        print('Error processing requests-future result, e_traceback: {}'.format(e_traceback))91find_fromoldcatalog = re.compile(re.escape('[from old catalog]'), re.IGNORECASE)92auth=(jobconfig["target"]["username"], jobconfig["target"]["password"])93headers={"Content-type": "application/xml"}94session = sessions.FuturesSession(max_workers=jobconfig["threads"])95for d in days:96    print("Processing date: " + d)97    for f in days[d]["files"]:98        print("Processing file (" + d + "): " + f)99        marcxml_records = []100        with open(f, 'rb') as fh:101            reader = MARCReader(fh)102            for r in reader:103                try:104                    bytesxml = record_to_xml_node(r, False, True)105                    # This is necessary otherwise the default output is to encode106                    # special characters using Ӓ convention.107                    # https://stackoverflow.com/questions/15304229/convert-python-elementtree-to-string108                    bytesxml = ET.tostring(bytesxml, encoding="unicode")109                    bytesxml = find_fromoldcatalog.sub('', bytesxml)110                    marcxml_records.append(bytesxml)111                except:112                    print('Error generating MARC/XML.')113                    if r is not None:114                        print('Error record 001: ' + str(r['001']))115                    load_report["loaded_records_fail"] += 1116                    e_type = sys.exc_info()[0]117                    e_value = str(sys.exc_info()[1])118                    e_traceback = sys.exc_info()[2]119                    print('Error generating MARC/XML, e_type: {}'.format(e_type))120                    print('Error generating MARC/XML, e_value: {}'.format(e_value))121                    print('Error generating MARC/XML, e_traceback: {}'.format(e_traceback))122                123        if 'delete' in f:124            load_report["deletes"] += len(marcxml_records)125            suppress_status = "deleted" #Suppressing seems to make sense, though not quite right.126        elif 'unsuppressed' not in f:127            load_report["suppressed"] += len(marcxml_records)128            suppress_status = "suppressed"129        else:130            load_report["unsuppressed"] += len(marcxml_records)131            suppress_status = "unsuppressed"132        133        futures = [134            session.post(jobconfig["target"]["endpoint"] + "?suppress_status=" + suppress_status, auth=auth, headers=headers, data=r.encode('utf-8'))135            for r in marcxml_records136        ]137        results = [138            process_ingest_response(f)139            for f in futures140        ]141        142et = datetime.now()143endtime = et.isoformat()[:19]144timedelta = et - st145load_report["job_end"] = endtime146load_report["status"] = "success"147    148print('************')149print('************')150print('Job started at: {}'.format(starttime))151print('Job ended at: {}'.format(endtime))152print('Elapsed time: {}'.format(timedelta))153print('Start date of loaded records: {}'.format(startdate_formatted))154print('Number of days loaded: {}'.format(len(dates)))155print('Number of records loaded successfully: {}'.format(load_report["loaded_records_success"]))156print('Number of records load failures: {}'.format(load_report["loaded_records_fail"]))157print('Number of unsuppressed records: {}'.format(load_report["unsuppressed"]))158print('Number of suppressed records: {}'.format(load_report["suppressed"]))159print('Number of deleted records: {}'.format(load_report["deletes"]))160print('************')161print('************')162print()...views.py
Source:views.py  
...10from django.template.loader import render_to_string11from django.urls import reverse, reverse_lazy12from visorxml.pdf_utils import render_to_pdf13from .reports import XMLReport, BASE_XML_MINI, random_name14def load_report(session):15    """ Load current report from filesystem if it exists16    else: return None17    """18    file_name = session['new_xml_name']19    file_path = os.path.join(settings.MEDIA_ROOT, file_name)20    try:21        with open(file_path, 'rb') as xmlfile:22            report = XMLReport([(file_name, xmlfile.read())])23            return report24    except FileNotFoundError:25        session.pop("new_xml_name")26        return None27def clean_report(request):28    "Remove report data from session and file backup"29    if "new_xml_name" in request.session:30        file_name = request.session['new_xml_name']31        file_path = os.path.join(settings.MEDIA_ROOT, file_name)32        try:33            os.remove(file_path)34        except FileNotFoundError: # nothing to do35            pass36        request.session.pop("new_xml_name")37    return HttpResponseRedirect(reverse("generate-report"))38def new_report(session):39    """ Create new mini-xml40    """41    name = random_name()42    report = XMLReport([(name, BASE_XML_MINI),])43    report.save_to_file(name)44    session['new_xml_name'] = name45    return report46def validate(request):47    "validate report"48    report = load_report(request.session)49    if len(report.errors["validation_errors"]) == 0:50        return HttpResponse("Yes")51    return HttpResponse("No")52def generate_report(request):53    """ View for the mini xml form. If not exists, new mini-xml is created54    """55    if request.session.get('new_xml_name', False):56        load_report(request.session)57    else:58        new_report(request.session)59    return render(request, "generadorxml/generate_xml.html", locals())60@method_decorator(csrf_exempt)61def update_xml_mini(request):62    """ View for update elements of the XML.63        If value is a uppercase letter (Calificacion) returns new svg for replace it64    """65    element = request.POST['name']66    value = request.POST['value']67    report = load_report(request.session)68    report.update_element(element, value)69    return HttpResponse()70def download_xml(request):71    """ Download mini-XML72    """73    session = request.session74    file_name = session['new_xml_name']75    file_path = os.path.join(settings.MEDIA_ROOT, file_name)76    output_file_name = 'certificado-%s.xml' % datetime.now().strftime('%Y%m%d%H%M')77    with open(file_path, 'rb') as xmlfile:78        response = HttpResponse(xmlfile.read(), content_type='application/xml')79        response['Content-Disposition'] = 'attachment;filename=%s' % output_file_name80        return response81def download_pdf(request):82    """ Download the current energy certificate as PDF83    """84    if not request.session.get('new_xml_name', False):85        return HttpResponseRedirect(reverse_lazy("generate-report"))86    session = request.session87    filename = 'certificado-%s.pdf' % datetime.now().strftime('%Y%m%d%H%M')88    report = load_report(session)89    pdf = True90    html = render_to_string('generadorxml/generate_xml.html', locals())91    env = os.environ.copy()92    env.update({93        'generation_date': report.data.DatosDelCertificador.Fecha,94        'reference': report.data.IdentificacionEdificio.ReferenciaCatastral95    })96    xml_name = session['new_xml_name']97    xml_path = os.path.join(settings.MEDIA_ROOT, xml_name)...common.py
Source:common.py  
...9### ----------------------------- SETUP ---------------------------------- ###10client = auth_gspread()11# load hours report12hours_report = pd.concat([13    load_report(client, 'hours-entries', '2021-table'),14    load_report(client, 'hours-entries', '2020-table')15    # load_report(client, 'hours-entries', '2019-table')16])  #TODO Exceeds memory of 550Mb17hours_report['DT'] = pd.to_datetime(hours_report['DT'])18# load usernames19with open('components/usernames.json') as f:20    usernames = json.load(f)21# load hours entries22hours_entries = pd.concat([23    load_report(client, 'hours-entries', '2021-hours'),24    load_report(client, 'hours-entries', '2020-hours')25    # load_report(client, 'hours-entries', '2019-hours')26])27hours_entries['Hours Date'] = pd.to_datetime(hours_entries['Hours Date'])28LOGO = app.get_asset_url('ei-logo-white.png')29### ----------------------------- LAYOUT ----------------------------------###30### NAVBAR ###31nav_items = dbc.Container([32    dbc.NavItem(dbc.NavLink('My Utilization', href='/')),33    dbc.NavItem(dbc.NavLink('My Projects', href='/my_projects')),34    dbc.NavItem(dbc.NavLink('My Team', href='/my_team')),35    # dbc.NavItem(dbc.NavLink('Allocation', href='/allocation'))  # turn on for dev36]37)38navbar = dbc.Navbar(39    dbc.Container(...urls.py
Source:urls.py  
1from django.urls import re_path, path2from file_upload import views3from base.views import load_report, log_detail, log_list, scan_viscidus_poison_tick, viscidus_poison_tick_info,\4    scan_boss_nature_protection, boss_nature_protection_info, gold_run_detail5# namespace6app_name = "base"7urlpatterns = [8    # Upload File Without Using Model Form9    re_path(r'^upload1/$', views.file_upload, name='file_upload'),10    # Upload Files Using Model Form11    re_path(r'^upload2/$', views.model_form_upload, name='model_form_upload'),12    # Upload Files Using Ajax Form13    re_path(r'^upload3/$', views.ajax_form_upload, name='ajax_form_upload'),14    # Handling Ajax requests15    re_path(r'^ajax_upload/$', views.ajax_upload, name='ajax_upload'),16    re_path(r'^parse/(?P<pk>\d+)', views.parse, name='parse'),17    re_path(r'^poison/(?P<pk>\d+)/', views.poison_data, name='poison'),18    # View Log List19    path('', view=log_list, name='log_list'),20    # load report21    re_path(r'^loadreport/$', view=load_report, name='load_report'),22    # re_path(r'submitreport/(?P<code>\d+)$', load_report, name='submit'),23    # log detail24    re_path(r'^log_detail/(?P<id>\d+)', view=log_detail, name='log_detail'),25    # gold run detail26    re_path(r'^gold_run_detail/(?P<id>\d+)', view=gold_run_detail, name='gold_run_detail'),27    # scan task28    re_path(r'^scan_viscidus_poison_tick/(?P<log_id>\d+)$', view=scan_viscidus_poison_tick, name='scan_viscidus_poison_tick'),29    re_path(r'^viscidus_poison_tick_info/(?P<log_id>\d+)$', view=viscidus_poison_tick_info, name='viscidus_poison_tick_info'),30    re_path(r'^scan_boss_nature_protection/(?P<log_id>\d+)$', view=scan_boss_nature_protection, name='scan_boss_nature_protection'),31    re_path(r'^boss_nature_protection_info/(?P<log_id>\d+)$', view=boss_nature_protection_info, name='boss_nature_protection_info'),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
