How to use parse_request_data method in localstack

Best Python code snippet using localstack_python

app.py

Source:app.py Github

copy

Full Screen

# ... (excerpt begins at line 29 of app.py; the imports and setup above are not shown)

# Lifetime of the "cg_data" session cookie, in seconds (one year).
_COOKIE_MAX_AGE = 60 * 60 * 24 * 365

# Target number of panels handed to each player before the CREATING round.
_PANELS_PER_PLAYER = 6

imagestore = get_image_store(settings.IMAGE_STORE_TYPE)


def _set_session_cookie(res, request_data):
    """Attach the signed ``cg_data`` cookie carrying *request_data* to *res*.

    Returns *res* so the call can be used in a ``return`` statement.
    """
    token = jwt.encode(request_data, settings.JWT_KEY, algorithm='HS256')
    res.set_cookie("cg_data", token, _COOKIE_MAX_AGE)
    return res


def _json_error(message, status_code):
    """Build a JSON ``{"error": message}`` response with the given status."""
    res = make_response(jsonify({"error": message}))
    res.status_code = status_code
    return res


@app.route('/', methods=['GET'])
def index():
    """Serve the landing page and (re)issue the session cookie."""
    res = make_response(render_template('index.html'))
    return _set_session_cookie(res, parse_request_data())


@app.route('/game', methods=['GET'])
def game():
    """Serve the game page and (re)issue the session cookie."""
    res = make_response(render_template('game.html'))
    return _set_session_cookie(res, parse_request_data())


@app.route('/image/<image_id>', methods=['GET'])
def get_image(image_id):
    """Return the stored PNG for *image_id*.

    Responds 404 when the database does not know the image and 500 when the
    database knows it but the image store returns no data.
    """
    if not db.image_exists(image_id):
        return _json_error("Image not found", 404)

    data = imagestore.get_image(image_id)
    if data is None:
        # The original built the response from ``data`` before checking it;
        # Flask rejects a None body, so the 500 status was never reached.
        return _json_error("Image could not be loaded", 500)

    res = make_response(data)
    res.headers.set("content-type", "image/png")
    return res


# Get the panels assigned to this user
@app.route('/api/game/<game_id>/assignments', methods=["GET"])
def get_assignments(game_id):
    """Return the panels assigned to the requesting user, or 404 if none."""
    request_data = parse_request_data()
    assignments = db.get_assignments_for(game_id, request_data["user_id"])
    if assignments is None:
        return _json_error("No assignments found", 404)
    return make_response(jsonify({"assignments": assignments}))


def make_comic(comic):
    """Paste up to three panel images onto the template and return base64 PNG.

    *comic* is a sequence of panel image ids. Panels whose image cannot be
    loaded are skipped, leaving the template visible in that slot; a comic
    with fewer than three panels no longer raises IndexError.
    """
    combined = Image.open('template.png')
    for slot, panel_id in enumerate(comic[:3]):
        data = imagestore.get_image(panel_id)
        if data is None:
            continue
        im = Image.open(io.BytesIO(data))
        combined.paste(im, ((slot * 1150) + 60, 60))
    # PNG data is binary: io.BytesIO works on Python 2 and 3, whereas the
    # cStringIO module used previously exists only on Python 2.
    buffer = io.BytesIO()
    combined.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue())


# Submit a comic built from the panels assigned to this user
@app.route('/api/game/<game_id>/comics', methods=["POST"])
def post_comic(game_id):
    """Store a comic submitted as JSON ``{"comic": [panel ids]}``."""
    if request.json is None or "comic" not in request.json:
        return _json_error(
            "Invalid parameters: Parameter must be a valid JSON object", 400)
    request_data = parse_request_data()
    # Find all three images for this comic and combine them server-side
    comic = request.json["comic"]
    data = make_comic(comic)
    iid = db.add_comic_to_game(game_id, request_data["user_id"], comic)
    imagestore.store_image(iid, data)
    return jsonify({"success": True, "image_id": iid})


@app.route('/api/game', methods=["POST"])
def create_game():
    """Create a game hosted by the requesting user and return its id."""
    # TODO:
    # If this user is already hosting a game, cancel it and notify players
    request_data = parse_request_data()
    # Add a new game record setting this user as the host
    game_id = db.create_game(request_data["user_id"])
    res = make_response(jsonify({"game_id": game_id}))
    return _set_session_cookie(res, request_data)


# Get game info, initial large data dump when loading a page.
# This is updated incrementally using web sockets
@app.route("/api/game/<game_id>", methods=["GET"])
def get_game_info(game_id):
    """Return the game record as seen by the requesting user, or 404."""
    game_id = game_id.lower()
    request_data = parse_request_data()
    record = db.query_game_for_user(game_id, request_data["user_id"])
    if record is not None:
        res = make_response(jsonify(record))
    else:
        res = _json_error("Game not found", 404)
    return _set_session_cookie(res, request_data)


@app.route("/api/vote/<vote_id>", methods=["GET"])
def get_vote_info(vote_id):
    """Return the vote record for *vote_id*, or 404 if it does not exist."""
    record = db.get_vote_info(vote_id)
    if record is not None:
        return make_response(jsonify(record))
    # NOTE(review): message says "Game" although a vote was looked up; kept
    # unchanged in case clients match on the text.
    return _json_error("Game not found", 404)


@app.route("/api/vote/<vote_id>", methods=["POST"])
def post_vote_info(vote_id):
    """Record the requesting user's vote given as JSON ``{"vote": ...}``.

    When ``db.add_vote`` returns a truthy result, schedules the next step
    after GRACE_PERIOD (another vote if comics remain, else the scoreboard)
    and broadcasts ``vote_complete`` to the game's room.
    """
    if request.json is None or "vote" not in request.json:
        return _json_error(
            "Invalid parameters: Parameter must be a valid JSON object", 400)
    request_data = parse_request_data()
    result = db.add_vote(vote_id, request_data["user_id"], request.json["vote"])
    if result:
        # make a new vote
        vote_info = db.get_vote_info(vote_id)
        comics = db.get_comics_in_game(vote_info["game_id"])
        if vote_info["index"] + 1 < len(comics):
            next_step = create_new_vote
        else:
            next_step = go_to_scoreboard
        Timer(GRACE_PERIOD, next_step, [vote_info["game_id"]]).start()
        emit('vote_complete', vote_info, room=vote_info["game_id"], namespace='/')
    return jsonify({"success": True})


def go_to_scoreboard(game_id):
    """Timer callback: move the game to SCOREBOARD and notify the room."""
    with app.app_context():
        db.set_game_state(game_id, GameState.SCOREBOARD, 0)
        emit('update', {'state': GameState.SCOREBOARD}, room=game_id, namespace='/')


@app.route('/api/game/<game_id>/start', methods=["POST"])
def start_game(game_id):
    """Start the drawing round; only the host of the game may do this.

    Responds 400 with ``{"success": False}`` when the game is unknown to the
    user or the user is not the host.
    """
    # TODO:
    # If the user is not hosting a game
    # If the game is in the STARTING state move onto DRAWING and broadcast notify
    request_data = parse_request_data()
    record = db.query_game_for_user(game_id, request_data["user_id"])
    if record is None or not record["is_host"]:
        res = make_response(jsonify({"success": False}))
        res.status_code = 400
    else:
        end_time = int(time.time() + DRAW_TIME)
        if record["state"] == GameState.STARTING:
            record["state"] = GameState.DRAWING
            db.set_game_state(game_id, record["state"], end_time)
            Timer(DRAW_TIME, draw_time_compete, [game_id]).start()
        res = make_response(jsonify({
            "success": True,
            "game_id": game_id,
            "state": record["state"],
            "end_time": end_time,
        }))
        emit('update', {'state': record["state"], 'round_end': end_time},
             room=game_id, namespace='/')
    return _set_session_cookie(res, request_data)


def draw_time_compete(game_id):
    """Timer callback: drawing is over; enter DISTRIBUTING for GRACE_PERIOD."""
    with app.app_context():
        db.set_game_state(game_id, GameState.DISTRIBUTING, 0)
        emit('update', {'state': GameState.DISTRIBUTING}, room=game_id, namespace='/')
        Timer(GRACE_PERIOD, start_create_state, [game_id]).start()


def _assign_panels(players, panels):
    """Map each player id to the panel ids that player receives.

    The current recipient only advances (wrapping around) when the next panel
    was created by that recipient, so a player is not handed their own panel
    unless they are the only player.
    """
    assignments = {player["id"]: [] for player in players}
    num_players = len(players)
    player_idx = 0
    for panel in panels:
        # Don't assign a panel to the player which created it
        if panel["created_by"] == players[player_idx]["id"]:
            player_idx = (player_idx + 1) % num_players
        assignments[players[player_idx]["id"]].append(panel["id"])
    return assignments


def _top_up_assignments(assignments, players, backup_panels, amount_needed,
                        amount_per_player):
    """Fill players up to *amount_per_player* using *backup_panels* (cycled).

    Stops early once every player is full, which can happen when the initial
    distribution gave some players more than their share.
    """
    player_idx = 0
    for i in range(amount_needed):
        while (player_idx < len(players)
               and len(assignments[players[player_idx]["id"]]) >= amount_per_player):
            player_idx += 1
        if player_idx >= len(players):
            break
        # NOTE(review): the backup entry is stored as returned by the db,
        # while player panels are stored by "id" - confirm both are ids.
        assignments[players[player_idx]["id"]].append(
            backup_panels[i % len(backup_panels)])


def start_create_state(game_id):
    """Timer callback: distribute panels to players and start CREATING."""
    with app.app_context():
        # Distribute images to the players
        record = db.query_game_for_user(game_id, "")
        panels = db.get_panels_in_game(game_id)
        players = record["players"]
        assignments = _assign_panels(players, panels)

        amount_per_player = _PANELS_PER_PLAYER
        amount_needed = (len(players) * amount_per_player) - len(panels)
        if amount_needed > 0:
            # we are aiming for 6 panels per player, if there is not enough
            # go grab some pre-made ones
            backup_panels = db.get_panels_by_user(GAME_USER, amount_needed)
            if len(backup_panels) > 0:
                # amount_needed is an int; the original called len() on it,
                # which raised TypeError whenever a top-up was required.
                _top_up_assignments(assignments, players, backup_panels,
                                    amount_needed, amount_per_player)

        db.store_assignments(game_id, assignments)
        end_time = int(time.time() + CREATE_TIME)
        db.set_game_state(game_id, GameState.CREATING, end_time)
        emit('update', {'state': GameState.CREATING, "round_end": end_time},
             room=game_id, namespace='/')
        Timer(CREATE_TIME, create_time_compete, [game_id]).start()


def create_time_compete(game_id):
    """Timer callback: creating is over; enter GATHERING for GRACE_PERIOD."""
    with app.app_context():
        db.set_game_state(game_id, GameState.GATHERING, 0)
        emit('update', {'state': GameState.GATHERING}, room=game_id, namespace='/')
        Timer(GRACE_PERIOD, gather_time_compete, [game_id]).start()


def auto_create_comics(game_id):
    """Build a comic from the first three assigned panels for each player
    who has not submitted one."""
    # TODO: Find all unused assigned panels, and randomly build comics if required
    record = db.query_game_for_user(game_id, "")
    comics = db.get_comics_in_game(game_id)
    authors = [comic["by"] for comic in comics]
    # The original tested comic["by"] against the list of player dicts, which
    # cannot match if "by" holds a user id. Accept either representation.
    # NOTE(review): presumably comic["by"] is the submitting user's id - confirm.
    pending = [player for player in record["players"]
               if player not in authors and player["id"] not in authors]
    for player in pending:
        assignments = db.get_assignments_for(game_id, player["id"])
        if not assignments:
            # Nothing was assigned to this player, so there is nothing to build.
            continue
        comic = assignments[:3]
        data = make_comic(comic)
        iid = db.add_comic_to_game(game_id, player["id"], comic)
        imagestore.store_image(iid, data)


def create_new_vote(game_id):
    """Create the next head-to-head vote and announce it to the room.

    The first vote pits the first two comics against each other; afterwards
    the side with more votes stays in place and the next comic takes the
    other side.
    """
    with app.app_context():
        # Votes on the next comic available
        record = db.query_game_for_user(game_id, "")
        comics = db.get_comics_in_game(game_id)
        # NOTE(review): assumes at least two comics exist (and that ``index``
        # stays in range); fewer raises IndexError inside the timer thread.
        if record["vote"] == "":
            comicA = comics[0]["id"]
            comicB = comics[1]["id"]
            index = 1
        else:
            vote_info = db.get_vote_info(record["vote"])
            index = vote_info["index"] + 1
            if vote_info["forA"] > vote_info["forB"]:
                comicA = vote_info["comicA"]
                comicB = comics[index]["id"]
            else:
                comicA = comics[index]["id"]
                comicB = vote_info["comicB"]
        vid = db.create_vote(game_id, comicA, comicB, index, len(record["players"]))
        db.set_cur_vote(game_id, vid)
        emit('new_vote', {'vote_id': vid}, room=game_id, namespace='/')


def gather_time_compete(game_id):
    """Timer callback: fill in missing comics, enter RATING, open first vote."""
    with app.app_context():
        auto_create_comics(game_id)
        db.set_game_state(game_id, GameState.RATING, 0)
        emit('update', {'state': GameState.RATING}, room=game_id, namespace='/')
        create_new_vote(game_id)


@app.route('/api/game/<game_id>/join', methods=["POST"])
def join_game(game_id):
    """Add the requesting user to the game under the form field ``name``."""
    game_id = game_id.lower()
    request_data = parse_request_data()
    if "name" not in request.form:
        return _json_error("Player name is required", 400)
    name = request.form["name"]
    if db.add_user_to_game(game_id, request_data["user_id"], name):
        res = make_response(jsonify({"success": True, "game_id": game_id}))
        emit('new_player',
             {'id': request_data["user_id"], 'name': name, 'score': 0},
             room=game_id, namespace='/')
    else:
        res = _json_error("Game not found", 404)
    return _set_session_cookie(res, request_data)


@app.route('/api/game/<game_id>/panels', methods=["POST"])
def add_panel(game_id):
    """Store a drawn panel posted as the raw request body.

    Everything up to and including the first comma in the body is dropped
    and the remainder is handed to the image store.
    """
    game_id = game_id.lower()
    request_data = parse_request_data()
    image_id = db.add_panel_to_game(game_id, request_data["user_id"])
    if image_id is None:
        return _json_error("Game not found", 404)
    # request.data is bytes on Python 3, so split on a bytes separator
    # (identical to the previous str separator on Python 2).
    b64String = request.data.split(b',', 1)[-1]
    imagestore.store_image(image_id, b64String)
    return make_response(jsonify(
        {"success": True, "game_id": game_id, "image_id": image_id}))


def parse_request_data():
    """Return the session data stored in the ``cg_data`` cookie.

    A missing, malformed or badly signed cookie yields fresh data. A new
    user is created (keyed by the remote address) whenever the data has no
    ``user_id``.
    """
    data = {}
    if "cg_data" in request.cookies:
        try:
            # PyJWT expects a list of accepted algorithms, not a bare string.
            data = jwt.decode(request.cookies["cg_data"], settings.JWT_KEY,
                              algorithms=['HS256'])
        except Exception:
            # Deliberate best effort: treat any undecodable cookie as absent,
            # without also swallowing KeyboardInterrupt/SystemExit.
            data = {}
    if "user_id" not in data:
        # user ID
        data["user_id"] = db.create_user(request.remote_addr)
    return data


socketio = SocketIO(app)


@socketio.on('connect', namespace='/')
def test_connect():
    """Log that a Socket.IO client connected."""
    print('Client connected')

# ... (excerpt ends here; the remainder of app.py is not shown)

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

...26 401: OpenApiResponse(description='E-mail or password incorrect'),27 429: OpenApiResponse(description='Too many requests'),28 })29@api_view(['post'])30@parse_request_data(LoginRequestSerializer)31def login(request, data):32 user = User.objects.filter(all_emails__email=data['email']).first()33 if not user:34 raise AuthenticationFailed()35 LoginCode.check_throttled(user)36 if not user.check_password(data['password']):37 raise AuthenticationFailed()38 return login_and_return_token(request, user)39@extend_schema(request=SendVerificationLinkRequestSerializer,40 responses={41 HTTP_204_NO_CONTENT: None,42 404: OpenApiResponse(description='User with email not found'),43 429: OpenApiResponse(description='Too many requests'),44 })45@api_view(['post'])46@parse_request_data(SendVerificationLinkRequestSerializer)47def send_verification_link(request, data):48 user = User.objects.filter(all_emails__email=data['email']).first()49 if not user: raise NotFound()50 login_code = LoginCode.make(user)51 email_text(user.email, 'Link pro (pře)nastavení hesla, platný jednu hodinu',52 f'{settings.FULL_HOSTNAME}/reset_password'53 f'?email={user.email}'54 f'&code={login_code.code}'55 f'&password_exists={user.has_usable_password()}')56 return Response(status=HTTP_204_NO_CONTENT)57@extend_schema(request=ResetPasswordRequestSerializer,58 responses={59 200: TokenResponse,60 404: OpenApiResponse(description='User with email not found'),61 429: OpenApiResponse(description='Too many requests'),62 })63@api_view(['post'])64@parse_request_data(ResetPasswordRequestSerializer)65def reset_password(request, data):66 user = User.objects.filter(all_emails__email=data['email']).first()67 if not user: raise NotFound()68 LoginCode.is_valid(user, data['code'])69 user.set_password(data['password'])70 user.save()...

Full Screen

Full Screen

parameters.py

Source:parameters.py Github

copy

Full Screen

...6 """ Validate value or throw ValidationError """7 ...8 def get_json_view(self) -> JSONParameterView:9 ...10 def parse_request_data(self, formdata: MultiDict[str]):11 """ Parses data coming from the http request """12 ...13class NumberParameter(Parameter):14 min: float15 max: float16 default: float17 required: bool18 def __init__(self, name, min, max, required, default):19 ...20 def validate(self, value: Any):21 ...22 def parse_request_data(self, formdata):23 return int(formdata.get(self.name))24class StringParameter(Parameter):25 minlength: int26 maxlength: int27 default: str28 required: bool29 def __init__(self, name, minlength, maxlength, required, default):30 ...31 def validate(self, value: Any):32 ...33 def parse_request_data(self, formdata):34 return formdata.get(self.name)35class ChoiceParameter(Parameter):36 choices: list[str]37 default: str38 required: bool39 def __init__(self, name, choices, required, default):40 ...41 def validate(self, value: Any):42 ...43 def parse_request_data(self, formdata):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful