How to use json_response method in localstack

Best Python code snippet using localstack_python

pyTelegramApi.py

Source:pyTelegramApi.py Github

copy

Full Screen

1#!/usr/bin/env python32# -*- coding: utf-8 -*-3import urllib34import certifi5import importlib6import json7import sys8import requests9import _thread10import time11import uuid12import os13from pathlib import Path14class InlineKeyBoard:15 ver=216 Redirect=False #Редирект17 def UXBtn(data = {'txt':'нажми'}): #UXBtn18 txt=data['txt']19 try:20 url=data['url']21 vrm = '"text":"{0}", "url":"{1}"'.format(txt,url)22 except:23 try:24 func=data['func']25 vrm = '"text":"{0}", "callback_data": "{1}"'.format(txt,func)26 except:27 vrm = '"text":"{0}"'.format(txt)28 return "{"+vrm+"}"29 def UXItem(item=''): #UXItem30 vrm = '"inline_keyboard": [['+item+']]'31 return vrm32 def keyboard(desc='Описание', items='', cfg=False): #Возвращаем inline клавиаутуру33 if cfg == False:34 cfg=pyTelegramApi.getCfgThread()35 bot=pyTelegramApi.getBot(cfg.bot.name)36 room = str(cfg.room.id)37 headers={38 'Content-Type' : 'application/json',39 }40 data = '{"chat_id":'+room+', "text":"'+desc+'", "reply_markup": {'+items+'}}"'41 data = data.encode('utf-8')42 try:43 data = json.loads(requests.post(bot.token+'sendMessage', headers=headers, data=data).text)44 if data['ok'] == False:45 return False46 else:47 pyTelegramApi.sniffer(cfg, data)48 return data49 except:50 pass51 def UpdateRedirect(selected): #Обновить пути52 InlineKeyBoard.Redirect=selected53 def getRedirect(name):54 #Кнопки55 B=InlineKeyBoard.UXBtn({'txt': '⬅️','func':InlineKeyBoard.Redirect+'@'+name}) #Назад56 H=InlineKeyBoard.UXBtn({'txt':'🏠','func':'main@'+name}) #Домой57 K=InlineKeyBoard.UXBtn({'txt':'❌','func':'kill@'+name}) #Покончить58 #Ячейки59 return InlineKeyBoard.UXItem(B+','+H+','+K)60 def getCfgThread(name, json_response):61 try:62 user=json_response['result'][0]['message']['from']['id']63 except:64 user=json_response['result'][0]['callback_query']['message']['from']['id']65 for thread in os.scandir(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads'):66 config=os.path.splitext(os.path.basename(thread.name))[0]67 if 
os.path.isfile(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + config + '.py'):68 cfg=pyTelegramApi.getModuleCfgThread(name, config)69 print(cfg.user.id)70 if user == cfg.user.id:71 return cfg72 return False73class msg:74 def sendMessage(txt = 'Привет', cfg=False): #отправить сообщение без chatid75 if not cfg:76 cfg=pyTelegramApi.getCfgBot()77 bot=pyTelegramApi.getBot()78 pyTelegramApi.request(bot.token, 'sendMessage', {'chat_id' : cfg.room.id, 'text' : txt})79 def sendMessageById(txt = 'Привет', room = '', cfg=False): #отправить сообщение с chatid80 if room == '':81 if cfg == False:82 cfg=pyTelegramApi.getCfgThread()83 room = cfg.room.id84 bot=pyTelegramApi.getBot()85 pyTelegramApi.request(bot.token, 'sendMessage', {'chat_id' : room, 'text' : txt})86 def sendMessage_array(arr = ['Привет1', 'Привет2'], cfg=False):87 if not cfg:88 cfg=pyTelegramApi.getCfgBot()89 bot=pyTelegramApi.getBot()90 txt = ''91 for str in arr:92 txt += str + "\n"93 pyTelegramApi.request(bot.token, 'sendMessage', {'chat_id' : cfg.room.id, 'text' : txt})94class user:95 def getById(id):96 bot=pyTelegramApi.getBot()97 return pyTelegramApi.request(bot.token, 'getChat', {'chat_id' : id})98class env:99 pass100class pyTelegramApi:101 bots={}102 def InitThread(name):103 if os.path.isdir(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads') == False:104 os.mkdir(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads')105 pyTelegramApi.clearCache(name)106 def setToken(name): #Установка токена107 token='https://api.telegram.org/bot{0}/'.format(open(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'token', 'r').read().strip())108 if pyTelegramApi.checkToken(name, token) == False:109 print('[THREAD] [CONNECT] [@{0}] => FAILED...'.format(name))110 _thread.exit()111 else:112 bot=pyTelegramApi.getBot(name)113 bot.token=token114 print('[THREAD] [CONNECT] [@{0}] => OK...'.format(name))115 def getToken(name): #Получение токена116 
bot=pyTelegramApi.getBot(name)117 return bot.token118 def getBot(name=False):119 if not name:120 name=pyTelegramApi.bots[_thread.get_ident()]121 return importlib.import_module('bots.' + name + '.bot')122 def setPrefix(name, prefix='$'):123 bot=pyTelegramApi.getBot(name)124 bot.prefix=prefix125 def checkToken(name, token): #Проверка токена126 if pyTelegramApi.request(token, 'getMe'):127 return True128 else:129 return False130 def request(token, method, fields = {}): #Запрос к серверу и возвращение131 try:132 https = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())133 r = https.request('GET', token + method, fields)134 data=json.loads(r.data.decode('utf-8'))135 if data['ok'] == False:136 return False137 else:138 return data139 except:140 return pyTelegramApi.request(token, method, fields)141 def addcommand(bot, name, module): #Добавить команду142 bot=importlib.import_module('bots.' + bot + '.bot')143 bot.listcommand.append(bot.prefix+name + '=' + module)144 print("[THREAD] [Успешно :)] [module] [Load] => {0}".format(bot.prefix+name + '=' + module))145 def sendPhotoId_ByUrl(self, PhotoUrl, chatid): #Отправка фото по url адресу146 pyTelegramApi.request(pyTelegramApi.getToken(self), 'sendPhoto', {'chat_id' : chatid, 'photo' : PhotoUrl})147 def getChatId(json_response): #Возвращаем chatid от сообщение148 try:149 return json_response['result'][0]['message']['chat']['id']150 except:151 return json_response['result'][0]['callback_query']['message']['chat']['id']152 def getText(self): #Возвращаем текст153 json_response = pyTelegramApi.request(pyTelegramApi.getToken(self), 'getUpdates', {'offset' : -1})154 if json_response['result'][0]['message']['text']:155 return json_response['result'][0]['message']['text']156 else:157 json_response['result'][0]['message']['chat']['text']158 def getlist(): #Получить список команд159 bot=pyTelegramApi.getBot()160 return bot.listcommand161 def DeleteMessageId(name, id): # del Msg162 bot=pyTelegramApi.getBot(name)163 
cfg=getCfgThread(name)164 pyTelegramApi.request(bot.token, 'deleteMessage', {'chat_id' : chatid, 'message_id' : id})165 def sniffer(cfg, json_response):166 try:167 json_response['result'][0]['message']['message_id']168 #msg169 cfg.msg.id=json_response['result'][0]['message']['message_id']170 #txt171 try:172 json_response['result'][0]['message']['text']173 cfg.msg.txt=json_response['result'][0]['message']['text']174 except:175 cfg.msg.txt=json_response['result'][0]['message']['chat']['text']176 cfg.user.id=json_response['result'][0]['message']['from']['id']177 #room178 cfg.room.id=json_response['result'][0]['message']['chat']['id']179 cfg.room.type=json_response['result'][0]['message']['chat']['type']180 try:181 cfg.sticker.id=json_response['result'][0]['message']['sticker']['file_id']182 except:183 pass184 except:185 try:186 json_response['result'][0]['callback_query']['message']['message_id']187 #msg188 cfg.msg.id=json_response['result'][0]['callback_query']['message']['message_id']189 #txt190 try:191 json_response['result'][0]['callback_query']['message']['text']192 cfg.msg.txt=json_response['result'][0]['callback_query']['message']['text']193 except:194 cfg.msg.txt=json_response['result'][0]['callback_query']['message']['chat']['text']195 cfg.user.id=json_response['result'][0]['callback_query']['message']['from']['id']196 #room197 cfg.room.id=json_response['result'][0]['callback_query']['message']['chat']['id']198 cfg.room.type=json_response['result'][0]['callback_query']['message']['chat']['type']199 try:200 cfg.sticker.id=json_response['result'][0]['callback_query']['message']['sticker']['file_id']201 except:202 pass203 except:204 pass205 return cfg206 def newCfgThread(name):207 try:208 config='a' + uuid.uuid4().hex209 cfg=os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + config + '.py'210 Path(cfg).touch()211 cfg = open(cfg, 'r+')212 cfg.write(open(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'cfg.py', 
'r').read().strip())213 cfg.close()214 return config215 except:216 pass217 return False218 def thread(name):219 try:220 thread=_thread.start_new_thread(pyTelegramApi.getUpdates, (name , ))221 pyTelegramApi.bots[thread]=name222 except:223 pass224 def getCfgBot(name=False):225 bot=pyTelegramApi.getBot(name)226 for user in bot.cfg:227 if bot.cfg[user].THREAD == _thread.get_ident():228 return bot.cfg[user]229 return False230 def isUsesUser(json_response):231 user=pyTelegramApi.getUserId(json_response)232 try:233 bot=pyTelegramApi.getBot()234 bot.cfg[user]235 return True236 except:237 return False238 def deleteCfg(name, config):239 try:240 os.remove(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + config + '.py')241 os.remove(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + '__pycache__' + os.sep + config + '.cpython-38.pyc')242 except:243 pass244 def clearCache(name):245 for thread in os.scandir(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads'):246 thread=thread.name247 if thread == '__pycache__':248 for py in os.scandir(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + thread):249 try:250 os.remove(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + '__pycache__' + os.sep + py.name)251 print("[DELETE] [__PYCACHE__] [@{0}] -> {1}".format(name, py.name))252 except:253 pass254 print("[CLEARCACHE] [__PYCACHE__] [@{0}] -> SUCCESS! :)".format(name))255 else:256 try:257 os.remove(os.getcwd() + os.sep + 'bots' + os.sep + name + os.sep + 'threads' + os.sep + thread)258 print("[DELETE] [THREAD] [@{0}] -> {1}".format(name, thread))259 except:260 pass261 print("[CLEARCACHE] [THREAD] [@{0}] -> SUCCESS! 
:)".format(name))262 def isModule(json_response):263 try:264 json_response['result'][0]['callback_query']['data']265 func=json_response['result'][0]['callback_query']['data']266 module=func.split('@')[0]267 func=func.split('@')[1]268 return module269 except:270 try:271 txt = json_response['result'][0]['message']['text']272 for str in pyTelegramApi.getlist():273 module = str.split('=')[1]274 command = str.split('=')[0]275 txt_gen = txt.split('@')276 if command == txt_gen[0]:277 return module278 elif command == txt.split(' ')[0]:279 return module280 except:281 pass282 return False283 def isIgnoreMsg(json_response):284 bot=pyTelegramApi.getBot()285 for id in bot.message_ids:286 try:287 json_response['result'][0]['message']['message_id']288 message_id=json_response['result'][0]['message']['message_id']289 if id == message_id:290 return True291 except:292 try:293 json_response['result'][0]['callback_query']['message']['message_id']294 message_id=json_response['result'][0]['callback_query']['message']['message_id']295 if id == message_id:296 return True297 except:298 pass299 return False300 def getUserId(json_response):301 try:302 json_response['result'][0]['callback_query']['message']['from']['id']303 return json_response['result'][0]['callback_query']['message']['from']['id']304 except:305 json_response['result'][0]['message']['from']['id']306 return json_response['result'][0]['message']['from']['id']307 return False308 def getMessageId(json_response):309 try:310 json_response['result'][0]['callback_query']['message']['message_id']311 return json_response['result'][0]['callback_query']['message']['message_id']312 except:313 json_response['result'][0]['message']['message_id']314 return json_response['result'][0]['message']['message_id']315 return False316 def getRoomId(json_response):317 try:318 json_response['result'][0]['callback_query']['message']['chat']['id']319 return json_response['result'][0]['callback_query']['message']['chat']['id']320 except:321 
json_response['result'][0]['message']['chat']['id']322 return json_response['result'][0]['message']['chat']['id']323 return False324 def pool(bot, cfg):325 name=cfg.name326 try:327 print("[THREAD] [{0}] [@{1}] [POOL] [PROCCESS] => CLONE...".format(cfg.THREAD, name))328 importlib.import_module('bots.' + name + '.behavior').pool(cfg)329 bot.message_ids.append(cfg.msg.id)330 print("[THREAD] [{0}] [@{1}] [POOL] [PROCCESS] => SUCCESS!...".format(cfg.THREAD, name))331 except:332 pass333 def getUpdates(name):#Бесконечный цикл334 token=pyTelegramApi.getToken(name)335 try:336 json_response = pyTelegramApi.request(token, 'getUpdates', {'offset' : -1})337 pyTelegramApi.thread(name) # new start thread...338 except:339 pass340 bot=pyTelegramApi.getBot(name)341 user=pyTelegramApi.getUserId(json_response)342 try:343 #bot.cfg[user].THREAD=_thread.get_ident()344 cfg=bot.cfg[user]345 #данные сообщение346 #cfg=pyTelegramApi.sniffer(cfg, json_response)347 #cfg.THREAD=_thread.get_ident()348 #bot349 #cfg.name=name350 #bot cfg351 #cfg.opt.name=config352 #bot.cfg[user]=cfg353 #pyTelegramApi.pool(bot, cfg)354 except:355 pass356 try:357 json_response['result'][0]['callback_query']['data']358 #cfgThread=InlineKeyBoard.getCfgThread(name, json_response)359 if cfgThread:360 print(cfgThread.InlineKeyBoard.active)361 print('SUCCESS')362 #func=json_response['result'][0]['callback_query']['data']363 #module=func.split('@')[0]364 #func=func.split('@')[1]365 #func=getattr(importlib.import_module(func), module)366 #pyTelegramApi.sniffer(cfg, json_response)367 #pyTelegramApi.request(token, 'deleteMessage', {'chat_id' : chatid, 'message_id' : message_id})368 #bot.message_ids.append(json_response['result'][0]['callback_query']['message']['message_id'])369 #func()370 else:371 msg.sendMessageById('Ошибка использование меню пожалуйста перезапустите меню...', pyTelegramApi.getRoomId(json_response))372 except:373 if pyTelegramApi.isIgnoreMsg(json_response):374 pyTelegramApi.bots.pop(_thread.get_ident())375 
return _thread.exit()376 if pyTelegramApi.isUsesUser(json_response):377 #pyTelegramApi.bots.pop(_thread.get_ident())378 if pyTelegramApi.isModule(json_response):379 bot.message_ids.append(pyTelegramApi.getMessageId(json_response))380 msg.sendMessageById('Пожалуйста ожидайте завершение прошлого сеанса', pyTelegramApi.getRoomId(json_response))381 return _thread.exit()382 else:383 #cfg384 config=pyTelegramApi.newCfgThread(name)385 cfg=importlib.import_module('bots.' + name + '.threads.' + config)386 #данные сообщение387 cfg=pyTelegramApi.sniffer(cfg, json_response)388 cfg.THREAD=_thread.get_ident()389 #bot390 cfg.name=name391 #bot cfg392 cfg.env.name=config393 bot.cfg[cfg.user.id]=cfg394 pyTelegramApi.deleteCfg(name, config)395 #Наш общий396 pyTelegramApi.pool(bot, cfg)397 #pool модульный398 for str in pyTelegramApi.getlist():399 module = str.split('=')[1]400 try:401 bot.message_ids.append(pyTelegramApi.getMessageId(json_response))402 print("[THREAD] [{0}] [@{1}] [{2}] [POOL] [PROCCESS] => CLONE...".format(cfg.THREAD, name, module))403 importlib.import_module('bots.' + name + '.modules.' + module).pool(cfg)404 print("[THREAD] [{0}] [@{1}] [{2}] [POOL] [PROCCESS] => SUCCESS!...".format(cfg.THREAD, name, module))405 except:406 pass407 for txt in json_response['result'][0]['message']:408 try:409 json_response['result'][0]['message']['text']410 if txt == 'text':411 txt = json_response['result'][0]['message']['text']412 for str in pyTelegramApi.getlist():413 module = str.split('=')[1]414 cmd = str.split('=')[0]415 cmdm=cmd + '=' + module416 txt_gen = txt.split('@')417 if cmd == txt_gen[0]:418 try:419 bot.message_ids.append(pyTelegramApi.getMessageId(json_response))420 print("[THREAD] [{0}] [@{1}] [{2}] [MAIN] [PROCCESS] => CLONE...".format(cfg.THREAD, name, cmdm))421 pyTelegramApi.request(token, 'deleteMessage', {'chat_id' : cfg.room.id, 'message_id' : cfg.msg.id})422 importlib.import_module('bots.' + name + '.modules.' 
+ module).main(cfg)423 print("[THREAD] [{0}] [@{1}] [{2}] [MAIN] [PROCCESS] => SUCCESS!...".format(cfg.THREAD, name, cmdm))424 except:425 pass426 elif cmd == txt.split(' ')[0]:427 try:428 bot.message_ids.append(pyTelegramApi.getMessageId(json_response))429 print("[THREAD] [{0}] [@{1}] [{2}] [MAIN] [PROCCESS] => CLONE...".format(cfg.THREAD, name, cmdm))430 pyTelegramApi.request(token, 'deleteMessage', {'chat_id' : cfg.room.id, 'message_id' : cfg.msg.id})431 importlib.import_module('bots.' + name + '.modules.' + module).main(cfg)432 print("[THREAD] [{0}] [@{1}] [{2}] [MAIN] [PROCCESS] => SUCCESS!...".format(cfg.THREAD, name, cmdm))433 except:434 pass435 except:436 pass437 if cfg.InlineKeyBoard.active == False:438 bot.cfg.pop(user)439 pyTelegramApi.bots.pop(_thread.get_ident())...

Full Screen

Full Screen

pokedex_maker.py

Source:pokedex_maker.py Github

copy

Full Screen

"""
Module contains a facade to create PokeObjects from a request.
"""
# Import the submodule explicitly: a bare ``import concurrent`` does not load
# ``concurrent.futures``, so the attribute lookup below could fail.
import concurrent.futures

from pokeretriever.pokeretriever import *


class InvalidPokeObject(Exception):
    """Exception for Invalid PokeObjects."""

    def __init__(self, name: str):
        """Initialize the exception with the name that did not work."""
        super().__init__()
        self.name = name

    def __str__(self):
        """Display the name that caused the error."""
        return f'Invalid PokeObject: "{self.name}"'


class PokedexMaker:
    """
    Facade to create PokeObjects from a request.

    The HTTP session is stored on the class (not the instance), so every
    classmethod below uses the session passed to the most recently
    constructed PokedexMaker.
    """
    BASE_URL = 'https://pokeapi.co/api/v2'
    session = None

    def __init__(self, session):
        """
        Initialize a PokedexMaker.
        :param session: a requests.Session()
        """
        PokedexMaker.session = session

    @classmethod
    def execute_request(cls, pokedex_request: PokedexRequest) -> PokedexObject:
        """
        Creates a PokeObject from a request.
        :param pokedex_request: PokedexRequest
        :return: PokedexObject, or None when the request mode is not one of
                 'pokemon', 'stat', 'ability' or 'move'
        """
        mode = pokedex_request.mode
        if mode == 'pokemon':
            return cls._get_pokemon(pokedex_request.name_or_id,
                                    pokedex_request.expanded,
                                    pokedex_request.num_threads)
        if mode == 'stat':
            return cls._get_stats(pokedex_request.name_or_id)
        if mode == 'ability':
            return cls._get_abilities(pokedex_request.name_or_id)
        if mode == 'move':
            return cls._get_move(pokedex_request.name_or_id)
        return None

    @classmethod
    def _fetch_json(cls, resource: str, name) -> dict:
        """
        Helper method to download one API resource and decode its JSON.
        :param resource: API collection, e.g. 'pokemon', 'stat', 'ability'
        :param name: name or id of the entry inside that collection
        :return: the decoded JSON body
        :raises InvalidPokeObject: when the API answers "Not Found"
        """
        url = f'{cls.BASE_URL}/{resource}/{name}'
        with cls.session.get(url) as response:
            # Compare the raw bytes directly instead of their str() repr; the
            # status code is checked as well so a 404 with a different body
            # is still reported as an invalid name.
            if response.status_code == 404 or response.content == b'Not Found':
                raise InvalidPokeObject(name)
            return response.json()

    @classmethod
    def _get_pokemon(cls, name: str, expanded=False, num_threads=1):
        """
        Helper method to get a Pokemon.
        :param name: name or id of the pokemon
        :param expanded: bool, when True the stats, abilities and moves are
                         fetched as full objects instead of plain names
        :param num_threads: int max number of threads for request
        :return: Pokemon
        :raises InvalidPokeObject: when the pokemon (or, in expanded mode,
                                   one of its stats/abilities/moves) is not
                                   found
        """
        json_response = cls._fetch_json('pokemon', name)

        if expanded:
            stat_names = [stat['stat']['name']
                          for stat in json_response['stats']]
            ability_names = [ability['ability']['name']
                             for ability in json_response['abilities']]
            move_names = [move['move']['name']
                          for move in json_response['moves']]
            with concurrent.futures.ThreadPoolExecutor(
                    max_workers=num_threads) as executor:
                stats = list(executor.map(cls._get_stats, stat_names))
                abilities = list(executor.map(cls._get_abilities,
                                              ability_names))
                moves = list(executor.map(cls._get_move, move_names))
        else:
            stats = [(stat['stat']['name'], stat['base_stat'])
                     for stat in json_response['stats']]
            abilities = [ability['ability']['name']
                         for ability in json_response['abilities']]
            moves = [(move['move']['name'],
                      move['version_group_details'][0]['level_learned_at'])
                     for move in json_response['moves']]

        return Pokemon(
            name=json_response['name'],
            id_=json_response['id'],
            height=json_response['height'],
            weight=json_response['weight'],
            stats=stats,
            types=[a_type['type']['name']
                   for a_type in json_response['types']],
            abilities=abilities,
            move=moves,
            expanded=expanded
        )

    @classmethod
    def _get_stats(cls, name: str):
        """
        Helper method to get Stats.
        :param name: the name of the stat
        :return: Stat
        :raises InvalidPokeObject: when the stat is not found
        """
        json_response = cls._fetch_json('stat', name)
        return Stat(
            name=json_response['name'],
            id_=json_response['id'],
            is_battle_only=json_response['is_battle_only']
        )

    @classmethod
    def _get_abilities(cls, name: str):
        """
        Helper method to get Ability.
        :param name: the name of the Ability
        :return: Ability
        :raises InvalidPokeObject: when the ability is not found
        """
        json_response = cls._fetch_json('ability', name)
        first_effect = json_response['effect_entries'][0]
        return Ability(
            name=json_response['name'],
            id_=json_response['id'],
            generation=json_response['generation']['name'],
            effect=first_effect['effect'],
            effect_short=first_effect['short_effect'],
            pokemon=[pokemon['pokemon']['name']
                     for pokemon in json_response['pokemon']]
        )

    @classmethod
    def _get_move(cls, name: str):
        """
        Helper method to get Move.
        :param name: the name of the Move
        :return: Move
        :raises InvalidPokeObject: when the move is not found
        """
        json_response = cls._fetch_json('move', name)
        return Move(
            name=json_response['name'],
            id_=json_response['id'],
            generation=json_response['generation']['name'],
            accuracy=json_response['accuracy'],
            pp=json_response['pp'],
            power=json_response['power'],
            type_=json_response['type']['name'],
            damage_class=json_response['damage_class']['name'],
            effect_short=json_response['effect_entries'][0]['short_effect']
            # NOTE(review): the source listing is truncated after this
            # argument; confirm no further arguments follow.
        )

Full Screen

Full Screen

movies.py

Source:movies.py Github

copy

Full Screen

import json
import os
from operator import itemgetter

import requests
from flask import Blueprint
from flask import request
from flask.json import jsonify

movies = Blueprint('movies', __name__)  # pylint: disable=C0103
API_KEY = os.environ['API_KEY']
IMG_URL_PATH = "http://image.tmdb.org/t/p/w500"


def _tmdb_params():
    """Return the incoming query string as a flat dict with the API key added."""
    # to_dict() keeps one value per key, so every parameter is a plain
    # string regardless of how the installed Werkzeug treats dict(args).
    params = request.args.to_dict()
    params['api_key'] = API_KEY
    return params


@movies.route('search', methods=['GET'])
def get_movies():
    """Search movies and return the results with genre names and full image URLs.

    All query parameters are forwarded to the upstream API. Two of them are
    also interpreted locally:

    * ``sort_by``: comma-separated result fields to sort on (optional;
      without it the upstream order is kept).
    * ``reverse``: ``true`` (case-insensitive) sorts descending (optional).

    Returns the upstream search payload as a dict. In each result,
    ``genre_ids`` is replaced by ``genre_names`` and ``backdrop_path`` /
    ``poster_path`` are prefixed with ``IMG_URL_PATH`` when present.
    """
    query_parameters = _tmdb_params()

    genres_payload = requests.get(
        'https://api.themoviedb.org/3/genre/movie/list',
        params=query_parameters,
    ).json()
    genre = {entry['id']: entry['name']
             for entry in genres_payload.get("genres") or []}

    # Missing sort_by/reverse used to raise KeyError; treat them as optional.
    sort_keys = [key for key in query_parameters.get('sort_by', '').split(",")
                 if key]
    isreverse = query_parameters.get('reverse', '').lower() == 'true'

    json_response = requests.get(
        'https://api.themoviedb.org/3/search/movie',
        params=query_parameters,
    ).json()
    results = json_response.get("results")
    if results is None:
        # No result list in the upstream answer: hand it back unchanged.
        return json_response

    for item in results:
        genre_ids = item.pop("genre_ids", None) or []
        item['genre_names'] = [genre.get(genre_id) for genre_id in genre_ids]
        for field in ("backdrop_path", "poster_path"):
            if item.get(field):
                item[field] = IMG_URL_PATH + item[field]

    if sort_keys:
        # itemgetter yields a scalar for one key and a tuple for several, so
        # every requested key takes part in the ordering (not just two).
        results = sorted(results, key=itemgetter(*sort_keys),
                         reverse=isreverse)
    json_response['results'] = results
    return json_response


@movies.route('<string:movie_id>/details', methods=['GET'])
def get_movie(movie_id):
    """Return the details of one movie as a JSON response.

    The movie's ``poster_path`` is prefixed with ``IMG_URL_PATH``. When the
    movie belongs to a collection, the collection's ``poster_path`` and
    ``backdrop_path`` are set to the prefixed paths of the movie itself.
    Null values (no collection, no image) are left untouched instead of
    raising.
    """
    json_response = requests.get(
        'https://api.themoviedb.org/3/movie/{}'.format(movie_id),
        params=_tmdb_params(),
    ).json()

    collection = json_response.get('belongs_to_collection')
    if collection:
        # NOTE(review): the collection receives the movie's own images, as in
        # the original code - confirm the collection's own paths were not
        # intended. Must run before the movie's poster_path is prefixed below.
        for field in ('poster_path', 'backdrop_path'):
            if json_response.get(field):
                collection[field] = IMG_URL_PATH + json_response[field]

    if json_response.get("poster_path"):
        json_response["poster_path"] = IMG_URL_PATH + json_response["poster_path"]
    return jsonify(json_response)


@movies.route('<string:movie_id>/ratings/', methods=['POST'])
def rate_movie(movie_id):
    """Post a rating for a movie, creating a guest session when none is given.

    The request body must be JSON. An empty or undecodable body yields a
    ``Failure`` dict instead of contacting the upstream API.
    """
    query_parameters = _tmdb_params()
    if not request.data:
        return {"Failure": {"message": "Rating is not available! Please provide Rating."},"Status": 400}
    try:
        body = json.loads(request.data.decode('utf-8'))
    except ValueError as ex:
        # Both UnicodeDecodeError and json.JSONDecodeError derive from
        # ValueError, so this covers every failure of the line above.
        return {'Failure': {"message":str(ex)}, "status": 400}
    if not query_parameters.get("guest_session_id"):
        response = requests.get('https://api.themoviedb.org/3/authentication/guest_session/new', params=query_parameters)
        json_response = response.json()
        query_parameters["guest_session_id"] = json_response.get("guest_session_id")
    result = requests.post(f'https://api.themoviedb.org/3/movie/{movie_id}/rating', params=query_parameters, data=body)
    ...  # the source listing is truncated here

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful