Best Python code snippet using autotest_python
server.py
Source:server.py  
from ast import parse
from asyncio.proactor_events import _ProactorBaseWritePipeTransport
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import request
from urllib.parse import parse_qs
import json
from foods_db import FoodsDB
from passlib.hash import bcrypt
from http import cookies
from session_store import SessionStore
import sys

# Process-wide session storage shared by every request handler instance.
SESSION_STORE = SessionStore()


class HttpHandler(BaseHTTPRequestHandler):
    """HTTP handler for the foods API.

    Routes:
        GET    /foods, /foods/<id>
        POST   /foods, /users, /sessions, /days
        PUT    /foods/<id>, /days/<id>
        DELETE /foods/<id>

    Request bodies are urlencoded forms. A ``sessionId`` cookie ties each
    request to an entry in ``SESSION_STORE``; a session that contains a
    ``userId`` key is treated as logged in.
    """

    # ------------------------------------------------------------------
    # Response plumbing
    # ------------------------------------------------------------------

    # runs once per response
    def end_headers(self):
        """Append the cookie and CORS headers, then finish the header block."""
        self.send_cookie()
        self.send_header("Access-Control-Allow-Credentials", "true")
        # The base class can send an error response before the request
        # headers were parsed, so ``self.headers`` may not exist yet.
        headers = getattr(self, "headers", None)
        origin = headers.get("Origin") if headers is not None else None
        if origin:
            # Echo the caller's origin; skip the header entirely when the
            # request had none instead of emitting the string "None".
            self.send_header("Access-Control-Allow-Origin", origin)
        super().end_headers()

    def load_cookie(self):
        """Parse the request's Cookie header into ``self.cookie``."""
        if "Cookie" in self.headers:
            self.cookie = cookies.SimpleCookie(self.headers["Cookie"])
        else:
            self.cookie = cookies.SimpleCookie()

    def send_cookie(self):
        """Emit one Set-Cookie header per morsel in ``self.cookie``."""
        # Responses generated by the base class (e.g. send_error for an
        # unsupported method) happen before load_cookie() has run.
        jar = getattr(self, "cookie", None)
        if jar is None:
            return
        for morsel in jar.values():
            # Marked SameSite=None + Secure so the cookie is usable from a
            # cross-site frontend (the original comment mentions Chrome).
            morsel["samesite"] = "None"
            morsel["secure"] = True
            self.send_header("Set-Cookie", morsel.OutputString())

    def load_session_data(self):
        """Load the session named by the cookie, creating one if needed."""
        self.load_cookie()
        self.sessionData = None
        if "sessionId" in self.cookie:
            session_id = self.cookie["sessionId"].value
            self.sessionData = SESSION_STORE.loadSessionData(session_id)
        if self.sessionData is None:
            # No cookie, or the cookie points at an unknown session.
            session_id = SESSION_STORE.createSession()
            self.sessionData = SESSION_STORE.loadSessionData(session_id)
            self.cookie["sessionId"] = session_id
        print("MY SESSION DATA: ", self.sessionData)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _require_login(self):
        """Return True when logged in; otherwise send a 401 and return False."""
        if "userId" in self.sessionData:
            return True
        self.handle401()
        return False

    def _send_bad_request(self, message):
        """Send a 400 response with a plain-text explanation."""
        self.send_response(400)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(bytes(message, "utf-8"))

    def _read_form(self, *required):
        """Read the urlencoded body and return the first value of each field.

        Returns a dict mapping every name in ``required`` to its first
        value. If the Content-Length is invalid, the body is not UTF-8, or
        a required field is absent, a 400 response is sent and None is
        returned. The body is deliberately not printed because it can
        contain passwords.
        """
        try:
            # all header values are strings so need to make int
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            length = -1
        if length < 0:
            self._send_bad_request("Invalid Content-Length.")
            return None
        try:
            raw_body = self.rfile.read(length).decode("utf-8")
        except UnicodeDecodeError:
            self._send_bad_request("Request body must be UTF-8.")
            return None
        # parse_qs maps each key to a list of values
        parsed = parse_qs(raw_body)
        missing = [name for name in required if name not in parsed]
        if missing:
            self._send_bad_request("Missing field(s): " + ", ".join(missing))
            return None
        return {name: parsed[name][0] for name in required}

    def _split_path(self):
        """Split ``self.path`` into ``(collection_name, member_id)``.

        ``member_id`` is None when the path has no second segment.
        """
        print("request path: ", self.path)
        path_parts = self.path.split("/")
        print(path_parts)
        collection_name = path_parts[1] if len(path_parts) > 1 else ""
        member_id = path_parts[2] if len(path_parts) > 2 else None
        return collection_name, member_id

    # ------------------------------------------------------------------
    # Resource handlers
    # ------------------------------------------------------------------

    # if path = /foods
    def handleListItems(self):
        """Respond with every food record as JSON (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        allRecords = db.getAllFoods()
        # send status code, 200 means all good
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(bytes(json.dumps(allRecords), "utf-8"))

    def handleGetOneFood(self, id):
        """Respond with one food record as JSON, or 404 (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        food = db.getOneFood(id)
        print(id)

        if food:
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(bytes(json.dumps(food), "utf-8"))
        else:
            self.handleNotFound()

    def handleCreateItem(self):
        """Create a food from the form body and respond 201 (login required)."""
        if not self._require_login():
            return
        fields = self._read_form(
            "name", "servingSize", "calories", "protein", "fat", "carbs"
        )
        if fields is None:
            return
        db = FoodsDB()
        db.createFood(
            fields["name"],
            fields["servingSize"],
            fields["calories"],
            fields["protein"],
            fields["fat"],
            fields["carbs"],
        )
        self.send_response(201)
        self.end_headers()

    def handleCreateUser(self):
        """Register a user; respond 201, or 422 when the email is taken."""
        fields = self._read_form("first_name", "last_name", "email", "password")
        if fields is None:
            return
        db = FoodsDB()
        if not db.findUserByEmail(fields["email"]):
            # Only the bcrypt hash is stored, never the plaintext password.
            encrypted_password = bcrypt.hash(fields["password"])
            db.createUser(
                fields["first_name"],
                fields["last_name"],
                fields["email"],
                encrypted_password,
            )
            self.send_response(201)
        else:
            # use 422 code when the email is not unique
            self.send_response(422)
        self.end_headers()

    def handleCreateAuthSession(self):  # let them login
        """Log a user in: 201 and ``userId`` stored in the session, else 401."""
        fields = self._read_form("email", "password")
        if fields is None:
            return
        db = FoodsDB()
        user = db.findUserByEmail(fields["email"])
        # Unknown email and wrong password are answered identically.
        if user and bcrypt.verify(fields["password"], user["password"]):
            self.sessionData["userId"] = user["id"]
            self.send_response(201)
        else:
            self.send_response(401)
        self.end_headers()

    # if path not found
    def handleNotFound(self):
        """Send a 404 response."""
        self.send_response(404)
        self.end_headers()
        self.wfile.write(bytes('Path not found.', "utf-8"))

    def handle401(self):
        """Send a 401 response."""
        self.send_response(401)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        # NOTE(review): the body text reads like a 404 message; it is kept
        # unchanged in case a client matches on it.
        self.wfile.write(bytes('Resource not found.', "utf-8"))

    def handleDeleteItem(self, id):
        """Delete one food; respond 200, or 404 if absent (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        food = db.getOneFood(id)
        print(id)

        if food:
            db.deleteFood(id)
            self.send_response(200)
            self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
            self.send_header("Access-Control-Allow-Headers", "Content-Type")
            self.end_headers()
        else:
            self.handleNotFound()

    def handleCreateDay(self):
        """Create a day record from the form body and respond 201."""
        # NOTE(review): unlike the food handlers this performs no login
        # check -- confirm that is intended.
        fields = self._read_form("calories", "protein", "carbs", "fats")
        if fields is None:
            return
        db = FoodsDB()
        db.createDay(
            fields["calories"], fields["protein"], fields["carbs"], fields["fats"]
        )
        self.send_response(201)
        self.end_headers()

    def handleUpdateDay(self, id=None):
        """Placeholder for updating a day record; responds 501.

        ``id`` is accepted because do_PUT passes the member id. The original
        stub took no argument (so the PUT route raised TypeError) and sent
        no response at all.
        """
        self.send_response(501)
        self.end_headers()

    def handleUpdateItem(self, id):
        """Update one food from the form body; 200, or 404 (login required)."""
        if not self._require_login():
            return
        fields = self._read_form(
            "name", "servingSize", "calories", "protein", "fat", "carbs"
        )
        if fields is None:
            return
        db = FoodsDB()
        food = db.getOneFood(id)

        if food:
            db.updateFood(
                fields["name"],
                fields["servingSize"],
                fields["calories"],
                fields["protein"],
                fields["fat"],
                fields["carbs"],
                id,
            )
            self.send_response(200)
            self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
            self.send_header("Access-Control-Allow-Headers", "Content-Type")
            self.end_headers()
        else:
            self.handleNotFound()

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.load_session_data()
        self.send_response(200)
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    # get request
    def do_GET(self):
        """Route GET /foods and GET /foods/<id>."""
        self.load_session_data()
        collection_name, member_id = self._split_path()
        if collection_name == "foods":
            if member_id is None:
                self.handleListItems()
            else:
                self.handleGetOneFood(member_id)
        else:
            self.handleNotFound()

    def do_DELETE(self):
        """Route DELETE /foods/<id>."""
        self.load_session_data()
        collection_name, member_id = self._split_path()
        if collection_name == "foods" and member_id:
            self.handleDeleteItem(member_id)
        else:
            self.handleNotFound()

    def do_PUT(self):
        """Route PUT /foods/<id> and PUT /days/<id>."""
        self.load_session_data()
        collection_name, member_id = self._split_path()
        if collection_name == "foods" and member_id:
            self.handleUpdateItem(member_id)
        elif collection_name == "days" and member_id:
            self.handleUpdateDay(member_id)
        else:
            self.handleNotFound()

    def do_POST(self):
        """Route POST /foods, /users, /sessions and /days."""
        self.load_session_data()
        if self.path == '/foods':
            self.handleCreateItem()
        elif self.path == "/users":
            self.handleCreateUser()
        elif self.path == "/sessions":
            self.handleCreateAuthSession()
        elif self.path == '/days':
            self.handleCreateDay()
        else:
            self.handleNotFound()


def main():
    """Create the tables, then serve forever on the port in argv[1] (default 8080)."""
    db = FoodsDB()
    db.createFoodsTable()
    db.createUsersTable()
    db = None  # disconnect db
    port = 8080
    if len(sys.argv) > 1:
        port = int(sys.argv[1])
    # tuple has the server address and the port number
    server = HTTPServer(("0.0.0.0", port), HttpHandler)
    print('Server running')
    server.serve_forever()


if __name__ == "__main__":
    # The excerpt elides the guard body ("..."); calling main() is the
    # conventional entry point -- confirm against the full file.
    main()
Source:ReportResultConsumer.py  
import json
import logging.config
import os
from pika.exchange_type import ExchangeType
from src.config.RabbitMQConfig import RabbitMQConfig
from src.processing.normalization.dataset_normalizer_factory import DatasetNormalizerFactory
from src.processing.report.ReportPredictionResultsExcelMaker import ReportPredictionResultsExcelMaker
from src.rabbitmq.consumer.Consumer import Consumer
from src.samba.SambaWorker import SambaWorker

logging.config.fileConfig('../resources/logging.conf')
LOGGER = logging.getLogger('exampleApp')


class ReportResultConsumer(Consumer):
    """Consumer that builds an Excel report for a finished experiment.

    For each message it downloads the source and prediction-result files
    through the Samba worker, asks ``ReportPredictionResultsExcelMaker`` to
    build the report, uploads the report, and publishes a status message
    (``DONE`` or ``REPORT_SERVICE_ERROR``) to the output queue.
    """

    def __init__(self, rabbit_mq_config: RabbitMQConfig, queue: str, routing_key: str,
                 samba_worker: SambaWorker,
                 exchange: str = "", exchange_type: ExchangeType = ExchangeType.direct):
        self._samba_worker = samba_worker
        self._dataset_normalizer_factory = DatasetNormalizerFactory()
        self._reportMaker = ReportPredictionResultsExcelMaker()
        super().__init__(rabbit_mq_config, queue, routing_key, exchange, exchange_type)

    @staticmethod
    def _remove_path(path):
        """Delete ``path``; a failure is logged rather than raised."""
        try:
            os.remove(path)
        except OSError:
            LOGGER.warning('reportConsumer: could not remove %s', path, exc_info=True)

    @classmethod
    def _discard_download(cls, downloaded_file):
        """Close a downloaded temp file and delete it from disk."""
        downloaded_file.close()
        cls._remove_path(downloaded_file.name)

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ. The
        channel is passed for your convenience. The basic_deliver object that
        is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        Whatever happens while building the report, a status message is
        published afterwards and the temporary files that were actually
        created are removed.

        :param pika.channel.Channel _unused_channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param bytes body: The message body
        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        decoded_body: dict = json.loads(body)
        experiment_result_id = decoded_body.get("experimentResultId")
        train_errors = decoded_body.get("trainErrors")
        test_errors = decoded_body.get("testErrors")
        prediction_error = decoded_body.get("predictionError")
        prediction_result_file_path = decoded_body.get("predictionResultFile")
        window_train_statistic = decoded_body.get("windowTrainStatistic")
        experiment_id = decoded_body.get("experimentId")
        project_id = decoded_body.get("projectId")
        project_folder = decoded_body.get("projectFolder")
        source_file_path: str = decoded_body.get("verifiedFile")
        columns = decoded_body.get("columns")
        legend = decoded_body.get("legend")
        neat_settings = decoded_body.get("neatSettings")
        normalization_method = decoded_body.get("normalizationMethod")
        enable_log_transform = decoded_body.get("enableLogTransform")
        prediction_period = decoded_body.get("predictionPeriod")
        prediction_window_size = decoded_body.get("predictionWindowSize")
        train_end_index = decoded_body.get("trainEndIndex")
        test_end_index = decoded_body.get("testEndIndex")
        normalization_statistic = decoded_body.get("normalizationStatistic")

        # Start from the failure response; it is upgraded to DONE only after
        # the report has been uploaded.
        report_response = {
            'experimentId': experiment_id,
            'experimentResultId': experiment_result_id,
            'projectId': project_id,
            'status': 'REPORT_SERVICE_ERROR',
            'predictionReportFile': None,
            'predictionResult': None,
        }

        # Pre-bind everything the cleanup touches so that a failure in an
        # early step cannot turn into an UnboundLocalError inside ``finally``
        # (which would also prevent the error response from being sent).
        source_file = None
        result_file = None
        report_filename = None
        try:
            source_file = self._samba_worker.download(
                source_file_path, f'source-{project_id}-{experiment_id}.csv')
            result_file = self._samba_worker.download(
                prediction_result_file_path, f'results-{project_id}-{experiment_id}.csv')
            report_filename, prediction_result = self._reportMaker.makeExcelReport(
                source_file.name,
                result_file.name,
                train_errors,
                test_errors,
                prediction_error,
                window_train_statistic,
                columns,
                legend,
                neat_settings,
                normalization_method,
                enable_log_transform,
                prediction_period,
                prediction_window_size,
                train_end_index,
                test_end_index,
                normalization_statistic,
            )
            path_to_save = f'{project_folder}/report-{experiment_id}.xlsx'
            self._samba_worker.upload(path_to_save=path_to_save, file=report_filename)
            report_response.update(
                status='DONE',
                predictionReportFile=path_to_save,
                predictionResult=prediction_result,
            )
        except Exception:
            # Exception, not BaseException: KeyboardInterrupt/SystemExit must
            # still be able to stop the consumer.
            LOGGER.exception('reportConsumer: on_message - %s', body)
        finally:
            for downloaded_file in (result_file, source_file):
                if downloaded_file is not None:
                    self._discard_download(downloaded_file)
            if report_filename is not None:
                self._remove_path(report_filename)

        encoded_body = json.dumps(report_response)
        queue_config = self._rabbit_mq_config.OUTPUT_REPORT_RESULT_CONFIG
        self._rabbit_mq_writer.writeMessage(
            exchange=queue_config.get("exchange"),
            routing_key=queue_config.get("routingKey"),
            message=encoded_body,
        )
Source:MailHandlers.py  
from google.appengine.ext.webapp.mail_handlers import InboundMailHandler
from source.api.ConvMessages import create_message
from source.models.Users import Users
from source.config.authentication import FIREBASE_SECRET
import logging
import re

# Incoming email assumptions:
#   - subject should be the conversation name ... but this is not necessary
#   - TO address should be conversation_<conv_id>@hailing-frequencies-2017.appspotmail.com


class GeneralMailHandler(InboundMailHandler):
    """Web handler for incoming emails; it only logs what it receives."""

    @staticmethod
    def _log_bodies(email, mime_type, label):
        """Log how many bodies of ``mime_type`` the email has, then each one."""
        # Materialise first so the count can be logged before the contents.
        bodies = list(email.bodies(mime_type))
        logging.info("I see {} {} bodies".format(len(bodies), label))
        for content_type, body in bodies:
            logging.info('content_type: ' + content_type)
            decoded_body = body.decode()
            logging.info('decoded_body: ' + decoded_body)

    def receive(self, email):
        """Log the headers and the html/plaintext bodies of ``email``."""
        logging.info("Received a message from: " + email.sender)
        logging.info("subject: " + email.subject)
        logging.info("to: " + email.to)
        if hasattr(email, 'cc'):
            logging.info("cc: " + email.cc)
        logging.info("date: " + email.date)
        self._log_bodies(email, 'text/html', 'html')
        self._log_bodies(email, 'text/plain', 'plaintext')


class ConversationMailHandler(InboundMailHandler):
    """Web handler that posts an incoming email into a conversation.

    The conversation id is taken from the TO address
    (``..._<conv_id>@...``) and the author is looked up from the sender's
    address.
    """

    def __init__(self, arg1, arg2):
        # Name this class (not its parent) so InboundMailHandler's own
        # initialisation is not skipped in the MRO. The explicit two-argument
        # form is kept because the file targets the legacy App Engine webapp
        # runtime (presumably Python 2 -- confirm).
        super(ConversationMailHandler, self).__init__(arg1, arg2)
        self.convid_regex = re.compile(r'.*_(\d+)@.*')
        self.email_regex = re.compile('.*<(.*)>')

    def receive(self, email):
        """Append the plaintext bodies of ``email`` to its conversation.

        Sets status 404 and returns early when the TO address carries no
        conversation id (after sending the sender a notice) or when the
        sender is not a known user.
        """
        logging.info("CONV: Received a message from: " + email.sender)
        logging.info("CONV: subject: " + email.subject)
        logging.info("CONV: to: " + email.to)
        # parse sender email
        match = self.email_regex.match(email.sender)
        # A sender given as a bare address has no "<...>" part; use it as is
        # instead of failing on a None match.
        sender = match.group(1) if match else email.sender.strip()
        # parse conversation ID
        match = self.convid_regex.match(email.to)
        if not match:
            ConversationMailHandler.send_wtf_email(sender)
            self.response.set_status(404)
            return
        convID = match.group(1)
        logging.debug('found convid: {}'.format(convID))
        # make sure email is authorized
        user = Users.get_a_user(sender)
        if not user:
            logging.info('could not find user for email {}'.format(sender))
            # send error email
            self.response.set_status(404)
            return
        logging.debug('found user: {}'.format(user.get_id()))
        # put message in conversation
        message = []
        for content_type, body in email.bodies('text/plain'):
            logging.info('content_type: ' + content_type)
            # NOTE(review): str() on the decoded body may fail for non-ASCII
            # text under Python 2 -- confirm what create_message expects.
            decoded_body = str(body.decode())
            logging.info('decoded_body: ' + decoded_body)
            message.append(decoded_body)
        # create message, which includes message broadcast to conversation
        response = create_message(user, convID, " ".join(message), "", FIREBASE_SECRET)
        logging.debug("Create message response: {}".format(response))

    @classmethod
    def send_wtf_email(cls, to_address):
        # The body of this method is elided ("...") in this excerpt.
        ...

# Page text that followed the snippet in the original listing:
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
