Best Python code snippet using localstack_python
api.py
Source:api.py  
"""                                     ..
django-helpdesk - A Django powered ticket tracker for small enterprise.

(c) Copyright 2008 Jutda. All Rights Reserved. See LICENSE for details.

api.py - Wrapper around API calls, and core functions to provide complete
         API to third party applications.

The API documentation can be accessed by visiting http://helpdesk/api/help/
(obviously, substitute helpdesk for your django-helpdesk URI), or by reading
through templates/helpdesk/help_api.html.
"""
from django import forms
from django.contrib.auth import authenticate
try:
    from django.contrib.auth import get_user_model
    User = get_user_model()
except ImportError:
    from django.contrib.auth.models import User
from django.http import HttpResponse
from django.shortcuts import render_to_response
from django.template import loader, Context
import simplejson
from django.views.decorators.csrf import csrf_exempt
try:
    from django.utils import timezone
except ImportError:
    from datetime import datetime as timezone

from tendenci.apps.helpdesk.forms import TicketForm
from tendenci.apps.helpdesk.lib import send_templated_mail, safe_template_context
from tendenci.apps.helpdesk.models import Ticket, Queue, FollowUp

STATUS_OK = 200
STATUS_ERROR = 400
STATUS_ERROR_NOT_FOUND = 404
STATUS_ERROR_PERMISSIONS = 403
STATUS_ERROR_BADMETHOD = 405

# Response body used by api_return() when the caller supplies no text.
_DEFAULT_STATUS_TEXT = {
    STATUS_ERROR: 'Error',
    STATUS_ERROR_NOT_FOUND: 'Resource Not Found',
    STATUS_ERROR_PERMISSIONS: 'Invalid username or password',
    STATUS_ERROR_BADMETHOD: 'Invalid request method',
    STATUS_OK: 'OK',
}


@csrf_exempt
def api(request, method):
    """
    Dispatch an API request to the matching ``API.api_public_<method>``.

    Regardless of any other parameters, we provide a help screen
    to the user if they requested one.

    If the user isn't looking for help, then we enforce a few conditions:
        * The request must be sent via HTTP POST
        * The request must contain a 'user' and 'password' which
          must be valid users
        * The method must match one of the public methods of the API class.

    Returns an HttpResponse: 405 for a non-POST request, 403 when
    authentication fails, 400 when ``method`` is unknown, otherwise whatever
    the selected API method returns.
    """
    if method == 'help':
        return render_to_response('helpdesk/help_api.html')

    if request.method != 'POST':
        return api_return(STATUS_ERROR_BADMETHOD)

    # TODO: Move away from having the username & password in every request.
    request.user = authenticate(
        username=request.POST.get('user', False),
        password=request.POST.get('password'),
    )
    if request.user is None:
        return api_return(STATUS_ERROR_PERMISSIONS)

    handler = getattr(API(request), 'api_public_%s' % method, None)
    if handler is not None:
        return handler()

    return api_return(STATUS_ERROR)


def api_return(status, text='', json=False):
    """
    Build the plain-text (or JSON) HttpResponse for an API call.

    ``status`` is used as the HTTP status code.  When ``text`` is empty or
    None, a default message for that status is used as the body.  The
    content type is 'text/json' only for a successful response with
    ``json=True``; otherwise it is 'text/plain'.
    """
    content_type = 'text/plain'
    if status == STATUS_OK and json:
        content_type = 'text/json'

    # The default argument is '' (not None), so test for "no text supplied"
    # rather than ``is None``; otherwise the defaults are never applied.
    if not text:
        text = _DEFAULT_STATUS_TEXT.get(status, '')

    response = HttpResponse(status=status, content=text, content_type=content_type)

    if status == STATUS_ERROR_BADMETHOD:
        # Headers are set by item assignment; a plain attribute is never sent.
        response['Allow'] = 'POST'

    return response


class API:
    """Implementation of the public API methods, bound to one request."""

    def __init__(self, request):
        self.request = request

    def _get_ticket(self):
        """Return the Ticket whose ID was POSTed as 'ticket', or None."""
        try:
            return Ticket.objects.get(id=self.request.POST.get('ticket', False))
        except (Ticket.DoesNotExist, ValueError):
            # ValueError: the supplied ID was not a number.
            return None

    def _notify(self, template_name, context, ticket, recipient, already_sent):
        """Mail ``recipient`` once; record the address in ``already_sent``."""
        if not recipient or recipient in already_sent:
            return
        send_templated_mail(
            template_name,
            context,
            recipients=recipient,
            sender=ticket.queue.from_address,
            fail_silently=True,
        )
        already_sent.append(recipient)

    def _owner_wants_mail(self, ticket):
        """True if the ticket has an owner, other than the requesting user,
        whose settings enable 'email_on_ticket_apichange'."""
        owner = ticket.assigned_to
        return bool(
            owner
            and self.request.user != owner
            and owner.usersettings.settings.get('email_on_ticket_apichange', False)
        )

    def api_public_create_ticket(self):
        """Create a ticket from the POSTed form data; respond with its ID."""
        form = TicketForm(self.request.POST)
        form.fields['queue'].choices = [[q.id, q.title] for q in Queue.objects.all()]
        form.fields['assigned_to'].choices = [
            [u.id, u.get_username()] for u in User.objects.filter(is_active=True)
        ]

        if form.is_valid():
            ticket = form.save(user=self.request.user)
            return api_return(STATUS_OK, "%s" % ticket.id)

        return api_return(STATUS_ERROR, text=form.errors.as_text())

    def api_public_list_queues(self):
        """Respond with a JSON list of ``{"id", "title"}`` for every queue."""
        queues = [
            {"id": "%s" % q.id, "title": "%s" % q.title}
            for q in Queue.objects.all()
        ]
        return api_return(STATUS_OK, simplejson.dumps(queues), json=True)

    def api_public_find_user(self):
        """Respond with the ID of the user whose username was POSTed."""
        username = self.request.POST.get('username', False)
        try:
            u = User.objects.get(username=username)
        except User.DoesNotExist:
            return api_return(STATUS_ERROR, "Invalid username provided")
        return api_return(STATUS_OK, "%s" % u.id)

    def api_public_delete_ticket(self):
        """Delete the POSTed ticket; requires a truthy 'confirm' field."""
        if not self.request.POST.get('confirm', False):
            return api_return(STATUS_ERROR, "No confirmation provided")

        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.delete()
        return api_return(STATUS_OK)

    def api_public_hold_ticket(self):
        """Set ``on_hold`` on the POSTed ticket."""
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.on_hold = True
        ticket.save()
        return api_return(STATUS_OK)

    def api_public_unhold_ticket(self):
        """Clear ``on_hold`` on the POSTed ticket."""
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        ticket.on_hold = False
        ticket.save()
        return api_return(STATUS_OK)

    def api_public_add_followup(self):
        """
        Add a comment follow-up to the POSTed ticket and send notifications.

        POST fields: 'ticket', 'message' (required, non-blank) and 'public'
        ('y' or 'n', default 'n').  The submitter and ticket CCs are only
        mailed for a public follow-up.
        """
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        message = self.request.POST.get('message', None)
        public = self.request.POST.get('public', 'n')

        if public not in ['y', 'n']:
            return api_return(STATUS_ERROR, "Invalid 'public' flag")

        if not message:
            return api_return(STATUS_ERROR, "Blank message")

        # 'public' is the string 'y' or 'n'; both are truthy, so compare
        # explicitly instead of testing the string itself.
        is_public = public == 'y'

        f = FollowUp(
            ticket=ticket,
            date=timezone.now(),
            comment=message,
            user=self.request.user,
            title='Comment Added',
        )
        if is_public:
            f.public = True
        f.save()

        context = safe_template_context(ticket)
        context['comment'] = f.comment

        messages_sent_to = []

        if is_public:
            self._notify('updated_submitter', context, ticket,
                         ticket.submitter_email, messages_sent_to)
            for cc in ticket.ticketcc_set.all():
                self._notify('updated_submitter', context, ticket,
                             cc.email_address, messages_sent_to)

        self._notify('updated_cc', context, ticket,
                     ticket.queue.updated_ticket_cc, messages_sent_to)

        if self._owner_wants_mail(ticket):
            self._notify('updated_owner', context, ticket,
                         ticket.assigned_to.email, messages_sent_to)

        ticket.save()
        return api_return(STATUS_OK)

    def api_public_resolve(self):
        """
        Record a public 'Resolved' follow-up on the POSTed ticket, send
        notifications and mark the ticket resolved.

        POST fields: 'ticket' and 'resolution' (required, non-blank).
        """
        ticket = self._get_ticket()
        if ticket is None:
            return api_return(STATUS_ERROR, "Invalid ticket ID")

        resolution = self.request.POST.get('resolution', None)
        if not resolution:
            return api_return(STATUS_ERROR, "Blank resolution")

        f = FollowUp(
            ticket=ticket,
            date=timezone.now(),
            comment=resolution,
            user=self.request.user,
            title='Resolved',
            public=True,
        )
        f.save()

        context = safe_template_context(ticket)
        context['resolution'] = f.comment

        messages_sent_to = []

        # CCs are only notified when the ticket has a submitter e-mail.
        if ticket.submitter_email:
            self._notify('resolved_submitter', context, ticket,
                         ticket.submitter_email, messages_sent_to)
            for cc in ticket.ticketcc_set.all():
                self._notify('resolved_submitter', context, ticket,
                             cc.email_address, messages_sent_to)

        self._notify('resolved_cc', context, ticket,
                     ticket.queue.updated_ticket_cc, messages_sent_to)

        if self._owner_wants_mail(ticket):
            # NOTE(review): template name follows the 'updated_owner' pattern
            # used for follow-ups (it was 'resolved_resolved') - confirm that
            # a 'resolved_owner' template exists.
            self._notify('resolved_owner', context, ticket,
                         ticket.assigned_to.email, messages_sent_to)

        ticket.resolution = f.comment
        ticket.status = Ticket.RESOLVED_STATUS
        ticket.save()
        # ... (the remainder of this method is truncated in the source page)

# ea_tasks.py
Source:ea_tasks.py  
import copy
import glob
import json
import os
import threading
import time

import easyaccess as ea

STATUS_OK = 'ok'
STATUS_ERROR = 'error'

# Template for every response. Never mutate it directly: take a fresh copy
# with _new_response() so one call's errors do not leak into the next.
DEFAULT_RESPONSE = {
    'status': STATUS_OK,
    'msg': '',
    'elapsed': 0.0,
    'data': {},
    'files': [],
    'sizes': []
}


def _new_response():
    """Return an independent copy of DEFAULT_RESPONSE for one call."""
    return copy.deepcopy(DEFAULT_RESPONSE)


def get_filesize(filename):
    """Return the size of ``filename`` as a string in KB, MB or GB."""
    size = os.path.getsize(filename)
    size = size * 1. / 1024.  # bytes -> KB
    if size > 1024. * 1024:
        size = '%.2f GB' % (1. * size / 1024. / 1024)
    elif size > 1024.:
        size = '%.2f MB' % (1. * size / 1024.)
    else:
        size = '%.2f KB' % (size)
    return size


def check_query(query, db, username, password):
    """
    Ask the database to parse ``query`` without running it.

    Returns a response dict whose 'status' is STATUS_ERROR, with the
    exception text in 'msg', if connecting or parsing fails.
    """
    response = _new_response()
    try:
        connection = ea.connect(db, user=username, passwd=password)
        cursor = connection.cursor()
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
        return response
    try:
        cursor.parse(query.encode())
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
    finally:
        cursor.close()
        connection.close()
    return response


def run_quick(query, db, username, password):
    """
    Run ``query`` interactively, cancelling it via a 25 second timer.

    For a SELECT, the first 1000 rows are returned as a JSON string in
    response['data']; any other statement is executed and committed.
    Failures are reported through 'status' and 'msg' rather than raised.
    """
    response = _new_response()
    try:
        connection = ea.connect(db, user=username, passwd=password)
        cursor = connection.cursor()
        # Start the clock
        # NOTE(review): the timer fires after 25 s while the messages below
        # say 30 seconds - confirm which value is intended.
        tt = threading.Timer(25, connection.con.cancel)
        tt.start()
        if query.lower().lstrip().startswith('select'):
            try:
                df = connection.query_to_pandas(query)
                # df.to_csv(os.path.join(user_folder, 'quickResults.csv'), index=False)
                df = df[0:1000]
                data = df.to_json(orient='records')
                response['data'] = data
            except Exception as e:
                err_out = str(e).strip()
                if 'ORA-01013' in err_out:
                    err_out = 'Time Exceeded (30 seconds). Please try submitting the job'
                response['status'] = STATUS_ERROR
                response['msg'] = err_out
        else:
            try:
                cursor.execute(query)
                connection.con.commit()
            except Exception as e:
                err_out = str(e).strip()
                if 'ORA-01013' in err_out:
                    err_out = 'Time Exceeded (30 seconds). Please try submitting this query as job'
                response['status'] = STATUS_ERROR
                response['msg'] = err_out
        # Stop the clock
        tt.cancel()
        # Close database connection
        cursor.close()
        connection.close()
    except Exception as e:
        response['status'] = STATUS_ERROR
        response['msg'] = str(e).strip()
        return response
    return response


def run_query(query, db, username, password, job_folder, filename, compression=False, timeout=None):
    """
    Run ``query`` as a job and write its metadata to <job_folder>/meta.json.

    A SELECT is saved to <job_folder>/<filename> (with compression enabled
    on the connection when ``compression`` is true); any other statement is
    executed and committed.  When ``timeout`` is given, a timer cancels the
    query after that many seconds.
    """
    response = _new_response()
    if not os.path.exists(job_folder):
        os.mkdir(job_folder)
    jsonfile = os.path.join(job_folder, 'meta.json')
    try:
        t1 = time.time()
        # Open database connection
        try:
            connection = ea.connect(db, user=username, passwd=password)
            cursor = connection.cursor()
        except Exception as e:
            response['status'] = STATUS_ERROR
            response['msg'] = str(e).strip()
            with open(jsonfile, 'w') as fp:
                json.dump(response, fp)
            return response
        if timeout is not None:
            tt = threading.Timer(timeout, connection.con.cancel)
            tt.start()
        if query.lower().lstrip().startswith('select'):
            try:
                outfile = os.path.join(job_folder, filename)
                if compression:
                    connection.compression = True
                connection.query_and_save(query, outfile)
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                files = glob.glob(job_folder + '/*')
                response['files'] = [os.path.basename(i) for i in files]
                response['sizes'] = [get_filesize(i) for i in files]
            except Exception as e:
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                response['status'] = STATUS_ERROR
                response['msg'] = str(e).strip()
                # Handled by the outer ``except`` below.
                raise
        else:
            try:
                cursor.execute(query)
                connection.con.commit()
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
            except Exception as e:
                if timeout is not None:
                    tt.cancel()
                t2 = time.time()
                response['status'] = STATUS_ERROR
                response['msg'] = str(e).strip()
        response['elapsed'] = t2 - t1
        with open(jsonfile, 'w') as fp:
            json.dump(response, fp, indent=2)
        cursor.close()
        connection.close()
        return response
    except Exception:
        ...  # the handler body is truncated in the source page

# utils.py
Source:utils.py  
from rest_framework.serializers import ValidationError

STATUS_ERROR = '02'
STATUS_SUCCESS = '00'


def check_user_data(email, first_name, last_name, phone_number) -> dict:
    """Check that every required account field is present.

    Returns a dict with 'errors' (field name -> message, one entry per
    missing field) and 'status' (STATUS_ERROR if anything is missing,
    otherwise STATUS_SUCCESS).
    """
    error_template = 'User account must have'
    # (field key, supplied value, label used in the message), in report order.
    required = (
        ('email', email, 'email'),
        ('first_name', first_name, 'first name'),
        ('last_name', last_name, 'last name'),
        ('phone_number', phone_number, 'phone number'),
    )
    errors = {
        key: f'{error_template} {label}.'
        for key, value, label in required
        if not value
    }
    return {
        'errors': errors,
        'status': STATUS_ERROR if errors else STATUS_SUCCESS,
    }


def check_passwords(password1, password2):
    """Check that both passwords are supplied and equal to each other.

    Builds a dict with 'errors', 'password' (set to the password only when
    both match) and 'status'.
    """
    data = {
        'errors': {},
        'password': None,
        'status': STATUS_SUCCESS,
    }
    if password1 is not None and password2 is not None:
        if password1 == password2:
            data['password'] = password1
        else:
            data['errors']['password2'] = 'Passwords are not equal.'
            data['status'] = STATUS_ERROR
    else:
        data['errors']['password'] = 'password1 or password2 have not been declared.'
        data['status'] = STATUS_ERROR
    ...  # the remainder of this function is truncated in the source page

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
