How to use decoded_body method in autotest

Best Python code snippet using autotest_python

server.py

Source:server.py Github

copy

Full Screen

# NOTE(review): `parse`, `_ProactorBaseWritePipeTransport` and `request` are not
# used anywhere in this listing (they look like editor auto-imports); they are
# kept because the listing is truncated and the rest of the file is not visible.
from ast import parse
from asyncio.proactor_events import _ProactorBaseWritePipeTransport
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import request
from urllib.parse import parse_qs
import json
from foods_db import FoodsDB
from passlib.hash import bcrypt
from http import cookies
from session_store import SessionStore
import sys

# One session store shared by every request handled by this process.
SESSION_STORE = SessionStore()


class HttpHandler(BaseHTTPRequestHandler):
    """HTTP handler for the foods/users/sessions/days REST-style API.

    Every ``do_*`` entry point first calls :meth:`load_session_data`, which
    populates ``self.cookie`` and ``self.sessionData``. Food endpoints
    require ``"userId"`` to be present in the session data.
    """

    # ------------------------------------------------------------------
    # Response plumbing
    # ------------------------------------------------------------------
    def end_headers(self):
        """Append cookie and CORS headers, then finish the header block.

        Runs once per response, including error responses generated by the
        base class (``send_error``) before any ``do_*`` method has run, so
        neither ``self.cookie`` nor ``self.headers`` may be assumed to exist.
        """
        self.send_cookie()
        self.send_header("Access-Control-Allow-Credentials", "true")
        request_headers = getattr(self, "headers", None)
        origin = request_headers.get("Origin") if request_headers is not None else None
        # Only echo the Origin back when the client actually sent one;
        # otherwise the header value would be the literal string "None".
        if origin is not None:
            self.send_header("Access-Control-Allow-Origin", origin)
        super().end_headers()

    def load_cookie(self):
        """Parse the request's ``Cookie`` header into ``self.cookie``."""
        if "Cookie" in self.headers:
            self.cookie = cookies.SimpleCookie(self.headers["Cookie"])
        else:
            self.cookie = cookies.SimpleCookie()

    def send_cookie(self):
        """Emit a ``Set-Cookie`` header for every morsel in ``self.cookie``.

        Does nothing when no cookie jar has been loaded yet (e.g. the base
        class is sending an error for a method this handler does not define).
        """
        jar = getattr(self, "cookie", None)
        if jar is None:
            return
        for morsel in jar.values():
            # SameSite=None + Secure: needed when the client is Chrome and
            # the request is cross-site.
            morsel["samesite"] = "None"
            morsel["secure"] = True
            self.send_header("Set-Cookie", morsel.OutputString())

    def load_session_data(self):
        """Load the session for this request, creating one when necessary.

        A new session is created when the request carries no ``sessionId``
        cookie or when the store does not know the id that was sent.
        """
        self.load_cookie()
        self.sessionData = None
        if "sessionId" in self.cookie:
            self.sessionData = SESSION_STORE.loadSessionData(self.cookie["sessionId"].value)
        if self.sessionData is None:
            session_id = SESSION_STORE.createSession()
            self.sessionData = SESSION_STORE.loadSessionData(session_id)
            self.cookie["sessionId"] = session_id
        print("MY SESSION DATA: ", self.sessionData)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _require_login(self):
        """Return True when the session has a user; otherwise send 401."""
        if "userId" in self.sessionData:
            return True
        self.handle401()
        return False

    def _send_json(self, status, payload):
        """Send ``payload`` serialized as JSON with the given status code."""
        self.send_response(status)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write(bytes(json.dumps(payload), "utf-8"))

    def _send_cors_ok(self):
        """Send an empty 200 response advertising the allowed methods/headers."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def _read_form_body(self):
        """Read and parse the url-encoded request body into a dict of lists.

        A missing, malformed or negative ``Content-Length`` is treated as an
        empty body instead of raising or blocking on the socket.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            length = 0
        raw_body = self.rfile.read(max(length, 0)).decode("utf-8")
        return parse_qs(raw_body)

    def _read_form_fields(self, *names):
        """Return the first value of each named form field, in order.

        Sends a 400 response and returns ``None`` when any field is absent
        (``parse_qs`` also drops fields whose value is blank).
        """
        form = self._read_form_body()
        missing = [name for name in names if name not in form]
        if missing:
            self.send_response(400)
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            self.wfile.write(bytes("Missing form field(s): " + ", ".join(missing), "utf-8"))
            return None
        # parse_qs maps each key to a list of values; only the first is used.
        return [form[name][0] for name in names]

    def _parse_path(self):
        """Split ``self.path`` into ``(collection_name, member_id)``.

        ``member_id`` is ``None`` when the path has no second segment.
        ``collection_name`` is ``""`` for a path without a leading slash.
        """
        print("request path: ", self.path)
        path_parts = self.path.split("/")
        print(path_parts)
        collection_name = path_parts[1] if len(path_parts) > 1 else ""
        member_id = path_parts[2] if len(path_parts) > 2 else None
        return collection_name, member_id

    # ------------------------------------------------------------------
    # Foods
    # ------------------------------------------------------------------
    def handleListItems(self):
        """GET /foods - return every food record as JSON (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        self._send_json(200, db.getAllFoods())

    def handleGetOneFood(self, id):
        """GET /foods/<id> - return one food as JSON, or 404 (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        food = db.getOneFood(id)
        print(id)
        if food:
            self._send_json(200, food)
        else:
            self.handleNotFound()

    def handleCreateItem(self):
        """POST /foods - create a food from the form body (login required)."""
        if not self._require_login():
            return
        fields = self._read_form_fields(
            "name", "servingSize", "calories", "protein", "fat", "carbs"
        )
        if fields is None:
            return
        db = FoodsDB()
        db.createFood(*fields)
        self.send_response(201)
        self.end_headers()

    def handleUpdateItem(self, id):
        """PUT /foods/<id> - replace a food's fields, or 404 (login required)."""
        if not self._require_login():
            return
        fields = self._read_form_fields(
            "name", "servingSize", "calories", "protein", "fat", "carbs"
        )
        if fields is None:
            return
        db = FoodsDB()
        if db.getOneFood(id):
            name, servingSize, calories, protein, fat, carbs = fields
            db.updateFood(name, servingSize, calories, protein, fat, carbs, id)
            self._send_cors_ok()
        else:
            self.handleNotFound()

    def handleDeleteItem(self, id):
        """DELETE /foods/<id> - delete a food, or 404 (login required)."""
        if not self._require_login():
            return
        db = FoodsDB()
        food = db.getOneFood(id)
        print(id)
        if food:
            db.deleteFood(id)
            self._send_cors_ok()
        else:
            self.handleNotFound()

    # ------------------------------------------------------------------
    # Users and sessions
    # ------------------------------------------------------------------
    def handleCreateUser(self):
        """POST /users - register a user; 422 when the email is already taken.

        The request body is deliberately not printed: it contains the
        plaintext password.
        """
        fields = self._read_form_fields("first_name", "last_name", "email", "password")
        if fields is None:
            return
        first_name, last_name, email, password = fields
        db = FoodsDB()
        if not db.findUserByEmail(email):
            encrypted_password = bcrypt.hash(password)
            db.createUser(first_name, last_name, email, encrypted_password)
            self.send_response(201)
        else:
            # 422: the email must be unique.
            self.send_response(422)
        self.end_headers()

    def handleCreateAuthSession(self):
        """POST /sessions - log in; 201 on success, 401 on bad credentials."""
        fields = self._read_form_fields("email", "password")
        if fields is None:
            return
        email, password = fields
        db = FoodsDB()
        user = db.findUserByEmail(email)
        # Same 401 for unknown email and wrong password.
        if user and bcrypt.verify(password, user["password"]):
            self.sessionData["userId"] = user["id"]
            self.send_response(201)
        else:
            self.send_response(401)
        self.end_headers()

    # ------------------------------------------------------------------
    # Days
    # ------------------------------------------------------------------
    def handleCreateDay(self):
        """POST /days - create a day record from the form body.

        NOTE(review): unlike the food endpoints this does not require a
        logged-in session - confirm whether that is intentional.
        """
        fields = self._read_form_fields("calories", "protein", "carbs", "fats")
        if fields is None:
            return
        db = FoodsDB()
        db.createDay(*fields)
        self.send_response(201)
        self.end_headers()

    def handleUpdateDay(self, id=None):
        """PUT /days/<id> - not implemented yet; answers 501.

        ``id`` is accepted because ``do_PUT`` passes the member id; the
        default keeps argument-less calls working.
        """
        self.send_response(501)
        self.end_headers()

    # ------------------------------------------------------------------
    # Error responses
    # ------------------------------------------------------------------
    def handleNotFound(self):
        """Send a 404 response."""
        self.send_response(404)
        self.end_headers()
        self.wfile.write(bytes('Path not found.', "utf-8"))

    def handle401(self):
        """Send a 401 response for requests without a logged-in session."""
        self.send_response(401)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(bytes('Not authenticated.', "utf-8"))

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------
    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.load_session_data()
        self._send_cors_ok()

    def do_GET(self):
        """Route GET /foods and GET /foods/<id>."""
        self.load_session_data()
        collection_name, member_id = self._parse_path()
        if collection_name != "foods":
            self.handleNotFound()
        elif member_id is None:
            self.handleListItems()
        else:
            self.handleGetOneFood(member_id)

    def do_DELETE(self):
        """Route DELETE /foods/<id>."""
        self.load_session_data()
        collection_name, member_id = self._parse_path()
        if collection_name == "foods" and member_id:
            self.handleDeleteItem(member_id)
        else:
            self.handleNotFound()

    def do_PUT(self):
        """Route PUT /foods/<id> and PUT /days/<id>."""
        self.load_session_data()
        collection_name, member_id = self._parse_path()
        if collection_name == "foods" and member_id:
            self.handleUpdateItem(member_id)
        elif collection_name == "days" and member_id:
            self.handleUpdateDay(member_id)
        else:
            self.handleNotFound()

    def do_POST(self):
        """Route POST /foods, /users, /sessions and /days."""
        self.load_session_data()
        if self.path == '/foods':
            self.handleCreateItem()
        elif self.path == "/users":
            self.handleCreateUser()
        elif self.path == "/sessions":
            self.handleCreateAuthSession()
        elif self.path == '/days':
            self.handleCreateDay()
        else:
            self.handleNotFound()


def main():
    """Create the tables, then serve on port 8080 or ``sys.argv[1]``."""
    db = FoodsDB()
    db.createFoodsTable()
    db.createUsersTable()
    db = None  # disconnect db
    port = 8080
    if len(sys.argv) > 1:
        port = int(sys.argv[1])
    # The tuple holds the bind address and the port number.
    server = HTTPServer(("0.0.0.0", port), HttpHandler)
    print('Server running')
    server.serve_forever()


if __name__ == "__main__":
    ...  # listing is truncated here in the source page; presumably calls main()

Full Screen

Full Screen

ReportResultConsumer.py

Source:ReportResultConsumer.py Github

copy

Full Screen

1import json2import logging.config3import os4from pika.exchange_type import ExchangeType5from src.config.RabbitMQConfig import RabbitMQConfig6from src.processing.normalization.dataset_normalizer_factory import DatasetNormalizerFactory7from src.processing.report.ReportPredictionResultsExcelMaker import ReportPredictionResultsExcelMaker8from src.rabbitmq.consumer.Consumer import Consumer9from src.samba.SambaWorker import SambaWorker10logging.config.fileConfig('../resources/logging.conf')11LOGGER = logging.getLogger('exampleApp')12class ReportResultConsumer(Consumer):13 def __init__(self, rabbit_mq_config: RabbitMQConfig, queue: str, routing_key: str,14 samba_worker: SambaWorker,15 exchange: str = "", exchange_type: ExchangeType = ExchangeType.direct):16 self._samba_worker = samba_worker17 self._dataset_normalizer_factory = DatasetNormalizerFactory()18 self._reportMaker = ReportPredictionResultsExcelMaker()19 super().__init__(rabbit_mq_config, queue, routing_key, exchange, exchange_type)20 def on_message(self, _unused_channel, basic_deliver, properties, body):21 """Invoked by pika when a message is delivered from RabbitMQ. The22 channel is passed for your convenience. The basic_deliver object that23 is passed in carries the exchange, routing key, delivery tag and24 a redelivered flag for the message. 
The properties passed in is an25 instance of BasicProperties with the message properties and the body26 is the message that was sent.27 :param pika.channel.Channel _unused_channel: The channel object28 :param pika.Spec.Basic.Deliver: basic_deliver method29 :param pika.Spec.BasicProperties: properties30 :param bytes body: The message body31 """32 LOGGER.info('Received message # %s from %s: %s',33 basic_deliver.delivery_tag, properties.app_id, body)34 decoded_body: dict = json.loads(body)35 experiment_result_id = decoded_body.get("experimentResultId")36 model = decoded_body.get("model")37 train_errors = decoded_body.get("trainErrors")38 test_errors = decoded_body.get("testErrors")39 prediction_error = decoded_body.get("predictionError")40 prediction_result_file_path = decoded_body.get("predictionResultFile")41 window_train_statistic = decoded_body.get("windowTrainStatistic")42 experiment_id = decoded_body.get("experimentId")43 project_id = decoded_body.get("projectId")44 project_folder = decoded_body.get("projectFolder")45 source_file_path: str = decoded_body.get("verifiedFile")46 columns = decoded_body.get("columns")47 legend = decoded_body.get("legend")48 neat_settings = decoded_body.get("neatSettings")49 normalization_method = decoded_body.get("normalizationMethod")50 enable_log_transform = decoded_body.get("enableLogTransform")51 prediction_period = decoded_body.get("predictionPeriod")52 prediction_window_size = decoded_body.get("predictionWindowSize")53 train_end_index = decoded_body.get("trainEndIndex")54 test_end_index = decoded_body.get("testEndIndex")55 normalization_statistic = decoded_body.get("normalizationStatistic")56 report_response = {57 'experimentId': experiment_id,58 'experimentResultId': experiment_result_id,59 'projectId': project_id,60 'status': 'REPORT_SERVICE_ERROR',61 'predictionReportFile': None,62 'predictionResult': None,63 }64 try:65 temp = f'source-{project_id}-{experiment_id}.csv'66 source_file = 
self._samba_worker.download(source_file_path, temp)67 temp = f'results-{project_id}-{experiment_id}.csv'68 result_file = self._samba_worker.download(prediction_result_file_path, temp)69 report_filename, prediction_result = self._reportMaker.makeExcelReport(source_file.name,70 result_file.name,71 train_errors,72 test_errors,73 prediction_error,74 window_train_statistic,75 columns,76 legend,77 neat_settings,78 normalization_method,79 enable_log_transform,80 prediction_period,81 prediction_window_size,82 train_end_index,83 test_end_index,84 normalization_statistic85 )86 path_to_save = f'{project_folder}/report-{experiment_id}.xlsx'87 self._samba_worker.upload(path_to_save=path_to_save, file=report_filename)88 report_response = {89 'experimentId': experiment_id,90 'experimentResultId': experiment_result_id,91 'projectId': project_id,92 'status': 'DONE',93 'predictionReportFile': path_to_save,94 'predictionResult': prediction_result,95 }96 except BaseException as ex:97 LOGGER.error('reportConsumer: on_message - {0}'.format(body))98 LOGGER.exception(ex)99 finally:100 result_file.close()101 source_file.close()102 os.remove(result_file.name)103 os.remove(source_file.name)104 os.remove(report_filename)105 encoded_body = json.dumps(report_response)106 queue_config = self._rabbit_mq_config.OUTPUT_REPORT_RESULT_CONFIG107 self._rabbit_mq_writer.writeMessage(exchange=queue_config.get("exchange"), routing_key=queue_config.get("routingKey"), message=encoded_body)...

Full Screen

Full Screen

MailHandlers.py

Source:MailHandlers.py Github

copy

Full Screen

from google.appengine.ext.webapp.mail_handlers import InboundMailHandler
from source.api.ConvMessages import create_message
from source.models.Users import Users
from source.config.authentication import FIREBASE_SECRET
import logging
import re

# Incoming email assumptions:
# - subject should be the conversation name ... but this is not necessary
# - TO address should be conversation_<conv_id>@hailing-frequencies-2017.appspotmail.com


class GeneralMailHandler(InboundMailHandler):
    """Web handler for incoming emails; only logs what it receives."""

    def receive(self, email):
        """Log the headers and every HTML and plain-text body of ``email``."""
        logging.info("Received a message from: " + email.sender)
        # Optional headers are guarded with hasattr, following the existing
        # 'cc' check; the subject is optional per the assumptions above.
        # NOTE(review): presumably unset fields are absent as attributes on
        # the App Engine message object - confirm against the SDK.
        if hasattr(email, 'subject'):
            logging.info("subject: " + email.subject)
        logging.info("to: " + email.to)
        if hasattr(email, 'cc'):
            logging.info("cc: " + email.cc)
        if hasattr(email, 'date'):
            logging.info("date: " + email.date)

        # Materialize the bodies so they can be counted before being logged.
        html_bodies = list(email.bodies('text/html'))
        logging.info("I see {} html bodies".format(len(html_bodies)))
        for content_type, body in html_bodies:
            logging.info('content_type: ' + content_type)
            decoded_body = body.decode()
            logging.info('decoded_body: ' + decoded_body)

        plaintext_bodies = list(email.bodies('text/plain'))
        logging.info("I see {} plaintext bodies".format(len(plaintext_bodies)))
        for content_type, body in plaintext_bodies:
            logging.info('content_type: ' + content_type)
            decoded_body = body.decode()
            logging.info('decoded_body: ' + decoded_body)


class ConversationMailHandler(InboundMailHandler):
    """Web handler that posts an incoming email into a conversation.

    The conversation id is taken from the TO address
    (``..._<conv_id>@...``) and the author from the sender address.
    """

    def __init__(self, arg1, arg2):
        # Name this class in super() so no initializer in the MRO is skipped.
        super(ConversationMailHandler, self).__init__(arg1, arg2)
        self.convid_regex = re.compile(r'.*_(\d+)@.*')
        self.email_regex = re.compile(r'.*<(.*)>')

    def receive(self, email):
        """Append the plain-text content of ``email`` to its conversation.

        Responds with 404 when the TO address carries no conversation id or
        when the sender is not a known user.
        """
        logging.info("CONV: Received a message from: " + email.sender)
        if hasattr(email, 'subject'):
            logging.info("CONV: subject: " + email.subject)
        logging.info("CONV: to: " + email.to)

        # Parse the sender address: either "Name <addr>" or a bare "addr".
        match = self.email_regex.match(email.sender)
        sender = match.group(1) if match else email.sender.strip()

        # Parse the conversation ID out of the TO address.
        match = self.convid_regex.match(email.to)
        if not match:
            ConversationMailHandler.send_wtf_email(sender)
            self.response.set_status(404)
            return
        convID = match.group(1)
        logging.debug('found convid: {}'.format(convID))

        # Make sure the sender is authorized.
        user = Users.get_a_user(sender)
        if not user:
            logging.info('could not find user for email {}'.format(sender))
            # send error email
            self.response.set_status(404)
            return
        logging.debug('found user: {}'.format(user.get_id()))

        # Collect every plain-text part; they are joined with spaces below.
        message = []
        for content_type, body in email.bodies('text/plain'):
            logging.info('content_type: ' + content_type)
            decoded_body = str(body.decode())
            logging.info('decoded_body: ' + decoded_body)
            message.append(decoded_body)

        # Create the message, which includes the broadcast to the conversation.
        response = create_message(user, convID, " ".join(message), "", FIREBASE_SECRET)
        logging.debug("Create message response: {}".format(response))

    @classmethod
    def send_wtf_email(cls, to_address):
        ...  # body is truncated in the source page

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful