How to use get_response method in localstack

Best Python code snippet using localstack_python

sqlite.py

Source:sqlite.py Github

copy

Full Screen

1# - *- coding: utf- 8 - *-2import datetime3import logging4import random5import sqlite36import time7from data.config import bot_description8from data.config import username9# Путь к БД10path_to_db = "data/botBD.sqlite"11def logger(statement):12 logging.basicConfig(13 level=logging.INFO,14 filename="logs.log",15 format=f"[Executing] [%(asctime)s] | [%(filename)s LINE:%(lineno)d] | {statement}",16 datefmt="%d-%b-%y %H:%M:%S"17 )18 logging.info(statement)19def handle_silently(function):20 def wrapped(*args, **kwargs):21 result = None22 try:23 result = function(*args, **kwargs)24 except Exception as e:25 logger("{}({}, {}) failed with exception {}".format(26 function.__name__, repr(args[1]), repr(kwargs), repr(e)))27 return result28 return wrapped29####################################################################################################30###################################### ФОРМАТИРОВАНИЕ ЗАПРОСА ######################################31# Форматирование запроса с аргументами32def update_format_with_args(sql, parameters: dict):33 values = ", ".join([34 f"{item} = ?" for item in parameters35 ])36 sql = sql.replace("XXX", values)37 return sql, tuple(parameters.values())38# Форматирование запроса без аргументов39def get_format_args(sql, parameters: dict):40 sql += " AND ".join([41 f"{item} = ?" for item in parameters42 ])43 return sql, tuple(parameters.values())44####################################################################################################45########################################### ЗАПРОСЫ К БД ###########################################46# Добавление пользователя47def add_userx(user_id, user_login, user_name, balance, all_refill, reg_date):48 with sqlite3.connect(path_to_db) as db:49 db.execute("INSERT INTO storage_users "50 "(user_id, user_login, user_name, balance, all_refill, reg_date) "51 "VALUES (?, ?, ?, ?, ?, ?)",52 [user_id, user_login, user_name, 10, all_refill, reg_date])53 db.commit()54# Изменение пользователя55def update_userx(user_id, **kwargs):56 with sqlite3.connect(path_to_db) as db:57 sql = f"UPDATE storage_users SET XXX WHERE user_id = {user_id}"58 sql, parameters = update_format_with_args(sql, kwargs)59 db.execute(sql, parameters)60 db.commit()61# Удаление пользователя62def delete_userx(**kwargs):63 with sqlite3.connect(path_to_db) as db:64 sql = "DELETE FROM storage_users WHERE "65 sql, parameters = get_format_args(sql, kwargs)66 db.execute(sql, parameters)67 db.commit()68# Получение пользователя69def get_userx(**kwargs):70 with sqlite3.connect(path_to_db) as db:71 sql = "SELECT * FROM storage_users WHERE "72 sql, parameters = get_format_args(sql, kwargs)73 get_response = db.execute(sql, parameters)74 get_response = get_response.fetchone()75 return get_response76# Получение пользователей77def get_usersx(**kwargs):78 with sqlite3.connect(path_to_db) as db:79 sql = "SELECT * FROM storage_users WHERE "80 sql, parameters = get_format_args(sql, kwargs)81 get_response = db.execute(sql, parameters)82 get_response = get_response.fetchall()83 return get_response84# Получение всех пользователей85def get_all_usersx():86 with sqlite3.connect(path_to_db) as db:87 get_response = db.execute("SELECT * FROM storage_users")88 get_response = get_response.fetchall()89 return get_response90# Получение платежных систем91def get_paymentx():92 with sqlite3.connect(path_to_db) as db:93 get_response = db.execute("SELECT * FROM storage_payment")94 get_response = get_response.fetchone()95 return get_response96# Изменение платежных систем97def update_paymentx(**kwargs):98 with sqlite3.connect(path_to_db) as db:99 sql = f"UPDATE storage_payment SET XXX "100 sql, parameters = update_format_with_args(sql, kwargs)101 db.execute(sql, parameters)102 db.commit()103# Получение настроек104def get_settingsx():105 with sqlite3.connect(path_to_db) as db:106 get_response = db.execute("SELECT * FROM storage_settings")107 get_response = get_response.fetchone()108 return get_response109# Обновление настроек110def update_settingsx(**kwargs):111 with sqlite3.connect(path_to_db) as db:112 sql = f"UPDATE storage_settings SET XXX "113 sql, parameters = update_format_with_args(sql, kwargs)114 db.execute(sql, parameters)115 db.commit()116# Добавление пополнения в БД117def add_refillx(user_id, user_login, user_name, comment, amount, receipt, way_pay, dates, dates_unix):118 with sqlite3.connect(path_to_db) as db:119 db.execute("INSERT INTO storage_refill "120 "(user_id, user_login, user_name, comment, amount, receipt, way_pay, dates, dates_unix) "121 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",122 [user_id, user_login, user_name, comment, amount, receipt, way_pay, dates, dates_unix])123 db.commit()124# Получение пополнения125def get_refillx(what_select, **kwargs):126 with sqlite3.connect(path_to_db) as db:127 sql = f"SELECT {what_select} FROM storage_refill WHERE "128 sql, parameters = get_format_args(sql, kwargs)129 get_response = db.execute(sql, parameters)130 get_response = get_response.fetchone()131 return get_response132# Получение пополнений133def get_refillsx(what_select, **kwargs):134 with sqlite3.connect(path_to_db) as db:135 sql = f"SELECT {what_select} FROM storage_refill WHERE "136 sql, parameters = get_format_args(sql, kwargs)137 get_response = db.execute(sql, parameters)138 get_response = get_response.fetchall()139 return get_response140# Получение всех пополнений141def get_all_refillx():142 with sqlite3.connect(path_to_db) as db:143 sql = "SELECT * FROM storage_refill"144 get_response = db.execute(sql)145 get_response = get_response.fetchall()146 return get_response147# Добавление категории в БД148def add_categoryx(category_id, category_name):149 with sqlite3.connect(path_to_db) as db:150 db.execute("INSERT INTO storage_category "151 "(category_id, category_name) "152 "VALUES (?, ?)",153 [category_id, category_name])154 db.commit()155# Изменение категории156def update_categoryx(category_id, **kwargs):157 with sqlite3.connect(path_to_db) as db:158 sql = f"UPDATE storage_category SET XXX WHERE category_id = {category_id}"159 sql, parameters = update_format_with_args(sql, kwargs)160 db.execute(sql, parameters)161 db.commit()162# Получение категории163def get_categoryx(what_select, **kwargs):164 with sqlite3.connect(path_to_db) as db:165 sql = f"SELECT {what_select} FROM storage_category WHERE "166 sql, parameters = get_format_args(sql, kwargs)167 get_response = db.execute(sql, parameters)168 get_response = get_response.fetchone()169 return get_response170# Получение категорий171def get_categoriesx(what_select, **kwargs):172 with sqlite3.connect(path_to_db) as db:173 sql = f"SELECT {what_select} FROM storage_category WHERE "174 sql, parameters = get_format_args(sql, kwargs)175 get_response = db.execute(sql, parameters)176 get_response = get_response.fetchall()177 return get_response178# Получение всех категорий179def get_all_categoriesx():180 with sqlite3.connect(path_to_db) as db:181 sql = "SELECT * FROM storage_category"182 get_response = db.execute(sql)183 get_response = get_response.fetchall()184 return get_response185# Очистка категорий186def clear_categoryx():187 with sqlite3.connect(path_to_db) as db:188 sql = "DELETE FROM storage_category"189 db.execute(sql)190 db.commit()191# Удаление товаров192def remove_categoryx(**kwargs):193 with sqlite3.connect(path_to_db) as db:194 sql = "DELETE FROM storage_category WHERE "195 sql, parameters = get_format_args(sql, kwargs)196 db.execute(sql, parameters)197 db.commit()198# Добавление категории в БД199def add_positionx(position_id, position_name, position_price, position_discription, position_image, position_date,200 category_id):201 with sqlite3.connect(path_to_db) as db:202 db.execute("INSERT INTO storage_position "203 "(position_id, position_name, position_price, position_discription, position_image, position_date, category_id) "204 "VALUES (?, ?, ?, ?, ?, ?, ?)",205 [position_id, position_name, position_price, position_discription, position_image,206 position_date, category_id])207 db.commit()208# Изменение позиции209def update_positionx(position_id, **kwargs):210 with sqlite3.connect(path_to_db) as db:211 sql = f"UPDATE storage_position SET XXX WHERE position_id = {position_id}"212 sql, parameters = update_format_with_args(sql, kwargs)213 db.execute(sql, parameters)214 db.commit()215# Получение категории216def get_positionx(what_select, **kwargs):217 with sqlite3.connect(path_to_db) as db:218 sql = f"SELECT {what_select} FROM storage_position WHERE "219 sql, parameters = get_format_args(sql, kwargs)220 get_response = db.execute(sql, parameters)221 get_response = get_response.fetchone()222 return get_response223# Получение категорий224def get_positionsx(what_select, **kwargs):225 with sqlite3.connect(path_to_db) as db:226 sql = f"SELECT {what_select} FROM storage_position WHERE "227 sql, parameters = get_format_args(sql, kwargs)228 get_response = db.execute(sql, parameters)229 get_response = get_response.fetchall()230 return get_response231# Получение всех категорий232def get_all_positionsx():233 with sqlite3.connect(path_to_db) as db:234 sql = "SELECT * FROM storage_position"235 get_response = db.execute(sql)236 get_response = get_response.fetchall()237 return get_response238# Очистка категорий239def clear_positionx():240 with sqlite3.connect(path_to_db) as db:241 sql = "DELETE FROM storage_position"242 db.execute(sql)243 db.commit()244# Удаление позиций245def remove_positionx(**kwargs):246 with sqlite3.connect(path_to_db) as db:247 sql = "DELETE FROM storage_position WHERE "248 sql, parameters = get_format_args(sql, kwargs)249 db.execute(sql, parameters)250 db.commit()251# Добавление категории в БД252def add_itemx(category_id, position_id, get_all_items, user_id, user_name):253 with sqlite3.connect(path_to_db) as db:254 for item_data in get_all_items:255 if not item_data.isspace() and item_data is not "":256 item_id = [random.randint(100000, 999999)]257 db.execute("INSERT INTO storage_item "258 "(item_id, item_data, position_id, category_id, creator_id, creator_name, add_date) "259 "VALUES (?, ?, ?, ?, ?, ?, ?)",260 [item_id[0], item_data, position_id, category_id, user_id, user_name,261 datetime.datetime.today().replace(microsecond=0)])262 db.commit()263# Изменение категории264def update_itemx(item_id, **kwargs):265 with sqlite3.connect(path_to_db) as db:266 sql = f"UPDATE storage_item SET XXX WHERE item_id = {item_id}"267 sql, parameters = update_format_with_args(sql, kwargs)268 db.execute(sql, parameters)269 db.commit()270# Получение категории271def get_itemx(what_select, **kwargs):272 with sqlite3.connect(path_to_db) as db:273 sql = f"SELECT {what_select} FROM storage_item WHERE "274 sql, parameters = get_format_args(sql, kwargs)275 get_response = db.execute(sql, parameters)276 get_response = get_response.fetchone()277 return get_response278# Получение категорий279def get_itemsx(what_select, **kwargs):280 with sqlite3.connect(path_to_db) as db:281 sql = f"SELECT {what_select} FROM storage_item WHERE "282 sql, parameters = get_format_args(sql, kwargs)283 get_response = db.execute(sql, parameters)284 get_response = get_response.fetchall()285 return get_response286# Получение всех категорий287def get_all_itemsx():288 with sqlite3.connect(path_to_db) as db:289 sql = "SELECT * FROM storage_item"290 get_response = db.execute(sql)291 get_response = get_response.fetchall()292 return get_response293# Очистка категорий294def clear_itemx():295 with sqlite3.connect(path_to_db) as db:296 sql = "DELETE FROM storage_item"297 db.execute(sql)298 db.commit()299# Удаление товаров300def remove_itemx(**kwargs):301 with sqlite3.connect(path_to_db) as db:302 sql = "DELETE FROM storage_item WHERE "303 sql, parameters = get_format_args(sql, kwargs)304 db.execute(sql, parameters)305 db.commit()306# Покупка товаров307def buy_itemx(get_items, get_count):308 with sqlite3.connect(path_to_db) as db:309 send_count = 0310 save_items = []311 for select_send_item in get_items:312 if send_count != get_count:313 send_count += 1314 save_items.append(f"{send_count}. {select_send_item[2]}")315 sql, parameters = get_format_args("DELETE FROM storage_item WHERE ", {"item_id": select_send_item[1]})316 db.execute(sql, parameters)317 split_len = len(f"{send_count}. {select_send_item[2]}")318 else:319 break320 db.commit()321 return save_items, send_count, split_len322# Добавление покупки в БД323def add_purchasex(user_id, user_login, user_name, receipt, item_count, item_price, item_price_one_item,324 item_position_id,325 item_position_name, item_buy, balance_before, balance_after, buy_date, buy_date_unix):326 with sqlite3.connect(path_to_db) as db:327 db.execute("INSERT INTO storage_purchases "328 "(user_id, user_login, user_name, receipt, item_count, item_price, item_price_one_item, item_position_id, "329 "item_position_name, item_buy, balance_before, balance_after, buy_date, buy_date_unix) "330 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",331 [user_id, user_login, user_name, receipt, item_count, item_price, item_price_one_item,332 item_position_id, item_position_name, item_buy, balance_before, balance_after, buy_date,333 buy_date_unix])334 db.commit()335# Получение покупки336def get_purchasex(what_select, **kwargs):337 with sqlite3.connect(path_to_db) as db:338 sql = f"SELECT {what_select} FROM storage_purchases WHERE "339 sql, parameters = get_format_args(sql, kwargs)340 get_response = db.execute(sql, parameters)341 get_response = get_response.fetchone()342 return get_response343# Получение покупок344def get_purchasesx(what_select, **kwargs):345 with sqlite3.connect(path_to_db) as db:346 sql = f"SELECT {what_select} FROM storage_purchases WHERE "347 sql, parameters = get_format_args(sql, kwargs)348 get_response = db.execute(sql, parameters)349 get_response = get_response.fetchall()350 return get_response351# Получение всех покупок352def get_all_purchasesx():353 with sqlite3.connect(path_to_db) as db:354 sql = "SELECT * FROM storage_purchases"355 get_response = db.execute(sql)356 get_response = get_response.fetchall()357 return get_response358# Последние 10 покупок359def last_purchasesx(user_id):360 with sqlite3.connect(path_to_db) as db:361 sql = "SELECT * FROM storage_purchases WHERE user_id = ? ORDER BY increment DESC LIMIT 10"362 get_response = db.execute(sql, [user_id])363 get_response = get_response.fetchall()364 return get_response365# Создание всех таблиц для БД366def create_bdx():367 with sqlite3.connect(path_to_db) as db:368 # Создание БД с хранением данных пользователей369 check_sql = db.execute("PRAGMA table_info(storage_users)")370 check_sql = check_sql.fetchall()371 check_create_users = [c for c in check_sql]372 if len(check_create_users) == 7:373 print(" DB был найден (1/8)")374 else:375 db.execute("CREATE TABLE storage_users("376 "increment INTEGER PRIMARY KEY AUTOINCREMENT, "377 "user_id INTEGER, user_login TEXT, user_name TEXT, "378 "balance INTEGER, all_refill INTEGER, reg_date TIMESTAMP)")379 print(" DB не была найдена (1/8) | Создание DB...")380 # Создание БД с хранением данных платежных систем381 check_sql = db.execute("PRAGMA table_info(storage_payment)")382 check_sql = check_sql.fetchall()383 check_create_payment = [c for c in check_sql]384 if len(check_create_payment) == 6:385 print(" DB был найден (2/8)")386 else:387 db.execute("CREATE TABLE storage_payment("388 "qiwi_login TEXT, qiwi_token TEXT, "389 "qiwi_private_key TEXT, qiwi_nickname TEXT, "390 "way_payment TEXT, status TEXT)")391 db.execute("INSERT INTO storage_payment("392 "qiwi_login, qiwi_token, "393 "qiwi_private_key, qiwi_nickname, "394 "way_payment, status) "395 "VALUES (?, ?, ?, ?, ?, ?)",396 ["None", "None", "None", "None", "form", "False"])397 print(" DB не была найдена (2/8) | Создание DB...")398 # Создание БД с хранением настроек399 check_sql = db.execute("PRAGMA table_info(storage_settings)")400 check_sql = check_sql.fetchall()401 check_create_settings = [c for c in check_sql]402 if len(check_create_settings) == 6:403 print(" DB была найдена (3/8)")404 else:405 db.execute("CREATE TABLE storage_settings("406 "contact INTEGER, faq TEXT, "407 "status TEXT, status_buy TEXT,"408 "profit_buy TEXT, profit_refill TEXT)")409 sql = "INSERT INTO storage_settings(" \410 "contact, faq, status, status_buy, profit_buy, profit_refill) " \411 "VALUES (?, ?, ?, ?, ?, ?)"412 now_unix = int(time.time())413 parameters = ("ℹ Тех Подержка.\n"414 "➖➖➖➖➖➖➖➖➖➖➖➖➖\n"415 f"{bot_description}",416 "ℹ Информация.\n"417 "➖➖➖➖➖➖➖➖➖➖➖➖➖\n"418 f"{bot_description}",419 "True", "True", now_unix, now_unix)420 db.execute(sql, parameters)421 print(" DB не была найдена (3/8) | Создание DB...")422 # Создание БД с хранением пополнений пользователей423 check_sql = db.execute("PRAGMA table_info(storage_refill)")424 check_sql = check_sql.fetchall()425 check_create_refill = [c for c in check_sql]426 if len(check_create_refill) == 10:427 print(" DB была найдена (4/8)")428 else:429 db.execute("CREATE TABLE storage_refill("430 "increment INTEGER PRIMARY KEY AUTOINCREMENT,"431 "user_id INTEGER, user_login TEXT, "432 "user_name TEXT, comment TEXT, "433 "amount TEXT, receipt TEXT, "434 "way_pay TEXT, dates TIMESTAMP, "435 "dates_unix TEXT)")436 print(" DB не была найдена (4/8) | Создание DB...")437 # Создание БД с хранением категорий438 check_sql = db.execute("PRAGMA table_info(storage_category)")439 check_sql = check_sql.fetchall()440 check_create_category = [c for c in check_sql]441 if len(check_create_category) == 3:442 print(" DB была найдена (5/8)")443 else:444 db.execute("CREATE TABLE storage_category("445 "increment INTEGER PRIMARY KEY AUTOINCREMENT,"446 "category_id INTEGER, category_name TEXT)")447 print(" DB не была найдена (5/8) | Создание DB...")448 # Создание БД с хранением позиций449 check_sql = db.execute("PRAGMA table_info(storage_position)")450 check_sql = check_sql.fetchall()451 check_create_position = [c for c in check_sql]452 if len(check_create_position) == 8:453 print(" DB была найдена (6/8)")454 else:455 db.execute("CREATE TABLE storage_position("456 "increment INTEGER PRIMARY KEY AUTOINCREMENT,"457 "position_id INTEGER, position_name TEXT, "458 "position_price INTEGER, position_discription TEXT,"459 "position_image TEXT, position_date TIMESTAMP, "460 "category_id INTEGER)")461 print(" DB не была найдена (6/8) | Создание DB...")462 # Создание БД с хранением товаров463 check_sql = db.execute("PRAGMA table_info(storage_item)")464 check_sql = check_sql.fetchall()465 check_create_item = [c for c in check_sql]466 if len(check_create_item) == 8:467 print(" DB была найдена (7/8)")468 else:469 db.execute("CREATE TABLE storage_item("470 "increment INTEGER PRIMARY KEY AUTOINCREMENT,"471 "item_id INTEGER, item_data TEXT, "472 "position_id INTEGER, category_id INTEGER, "473 "creator_id INTEGER, creator_name TEXT, "474 "add_date TIMESTAMP)")475 print(" DB не была найдена (7/8) | Создание DB...")476 # Создание БД с хранением покупок477 check_sql = db.execute("PRAGMA table_info(storage_purchases)")478 check_sql = check_sql.fetchall()479 check_create_purchases = [c for c in check_sql]480 if len(check_create_purchases) == 15:481 print(" DB была найдена (8/8)")482 print(" ~~~~ Бот Запушен ~~~~")483 print(f"\n {username}")484 else:485 db.execute("CREATE TABLE storage_purchases("486 "increment INTEGER PRIMARY KEY AUTOINCREMENT,"487 "user_id INTEGER, user_login TEXT, "488 "user_name TEXT, receipt TEXT, "489 "item_count INTEGER, item_price TEXT, "490 "item_price_one_item TEXT, item_position_id INTEGER, "491 "item_position_name TEXT, item_buy TEXT, "492 "balance_before TEXT, balance_after TEXT, "493 "buy_date TIMESTAMP, buy_date_unix TEXT)")494 print(" DB не была найдена (8/8) | Создание DB...")...

Full Screen

Full Screen

test_decorator_cache.py

Source:test_decorator_cache.py Github

copy

Full Screen

...109 TestWSGIController.setUp(self)110 environ.update(self.environ)111 112 def test_no_cache(self):113 self.assertRaises(Exception, lambda: self.get_response(action='test_default_cache_decorator'))114class TestCacheDecorator(TestWSGIController):115 def setUp(self):116 app, bad_app = make_cache_controller()117 self.app = app118 TestWSGIController.setUp(self)119 environ.update(self.environ)120 def test_default_cache_decorator(self):121 sap.g.counter = 0122 self.get_response(action='test_default_cache_decorator_invalidate')123 response = self.get_response(action='test_default_cache_decorator_invalidate')124 assert 'text/html' in response.headers['content-type']125 assert 'Counter=1' in response126 response = self.get_response(action='test_default_cache_decorator_invalidate')127 assert 'Counter=1' in response128 129 def test_default_cache_decorator(self):130 sap.g.counter = 0131 self.get_response(action='test_invalidate_cache')132 response = self.get_response(action='test_default_cache_decorator')133 assert 'text/html' in response.headers['content-type']134 assert 'Counter=1' in response135 response = self.get_response(action='test_default_cache_decorator')136 assert 'Counter=1' in response137 138 response = self.get_response(action='test_get_cache_decorator', _url='/?param=123')139 assert 'Counter=2' in response140 response = self.get_response(action='test_get_cache_decorator', _url="/?param=123")141 assert 'Counter=2' in response142 143 response = self.get_response(action='test_expire_cache_decorator')144 assert 'Counter=3' in response145 response = self.get_response(action='test_expire_cache_decorator')146 assert 'Counter=3' in response147 time.sleep(2)148 response = self.get_response(action='test_expire_cache_decorator')149 assert 'Counter=4' in response150 151 response = self.get_response(action='test_key_cache_decorator', id=1)152 assert 'Counter=5' in response153 response = self.get_response(action='test_key_cache_decorator', id=2)154 assert 'Counter=6' in response155 response = self.get_response(action='test_key_cache_decorator', id=1)156 assert 'Counter=5' in response157 158 response = self.get_response(action='test_keyslist_cache_decorator', id=1, id2=2)159 assert 'Counter=7' in response160 response = self.get_response(action='test_keyslist_cache_decorator', id=1, id2=2)161 assert 'Counter=7' in response162 163 response = self.get_response(action='test_get_cache_default', _url='/?param=1243')164 assert 'Counter=8' in response165 response = self.get_response(action='test_get_cache_default', _url="/?param=1243")166 assert 'Counter=8' in response167 response = self.get_response(action='test_get_cache_default', _url="/?param=123")168 assert 'Counter=9' in response169 response = self.get_response(action='test_default_cache_decorator_func')170 assert 'text/html' in response.headers['content-type']171 assert 'Counter=10' in response172 response = self.get_response(action='test_default_cache_decorator_func')173 assert 'Counter=10' in response174 175 response = self.get_response(action='test_response_cache_func', use_cache_status=True)176 177 assert 'Counter=10' in response178 179 response = self.get_response(action='test_response_cache_func', use_cache_status=False,180 test_args=dict(status=404))181 assert 'Counter=10' in response182 183 184 def test_dbm_cache_decorator(self):185 sap.g.counter = 0186 self.get_response(action="test_invalidate_dbm_cache")187 188 response = self.get_response(action="test_dbm_cache_decorator")189 assert "Counter=1" in response190 response = self.get_response(action="test_dbm_cache_decorator")191 assert "Counter=1" in response192 193 self.get_response(action="test_invalidate_dbm_cache")194 response = self.get_response(action="test_dbm_cache_decorator")195 assert "Counter=2" in response196 sap.g.counter = 0197 response = self.get_response(action="test_expire_dbm_cache_decorator")198 assert "Counter=1" in response199 response = self.get_response(action="test_expire_dbm_cache_decorator")200 assert "Counter=1" in response201 time.sleep(2)202 response = self.get_response(action="test_expire_dbm_cache_decorator")203 assert "Counter=2" in response204 205 def test_cache_key(self):206 from pylons.decorators.cache import beaker_cache, create_cache_key207 208 key = create_cache_key(TestCacheDecorator.test_default_cache_decorator)209 assert key == ('%s.TestCacheDecorator' % self.__module__, 'test_default_cache_decorator')210 211 response = self.get_response(action='test_invalidate_cache')212 response = self.get_response(action='test_default_cache_decorator')213 assert 'Counter=1' in response214 response = self.get_response(action='test_default_cache_decorator')215 assert 'Counter=1' in response216 response = self.get_response(action='test_invalidate_cache')217 response = self.get_response(action='test_default_cache_decorator')218 assert 'Counter=2' in response219 def test_cache_key_dupe(self):220 response = self.get_response(action='test_cache_key_dupe',221 _url='/test_cache_key_dupe?id=1')222 time.sleep(0.1)223 response2 = self.get_response(action='test_cache_key_dupe',224 _url='/test_cache_key_dupe?id=2&id=1')225 assert str(response) != str(response2)226 227 def test_header_cache(self):228 response = self.get_response(action='test_header_cache')229 assert response.headers['content-type'] == 'application/special'230 assert response.headers['x-powered-by'] == 'pylons'231 assert 'x-dont-include' not in response.headers232 output = response.body233 time.sleep(1)234 response = self.get_response(action='test_header_cache')235 assert response.body == output236 assert response.headers['content-type'] == 'application/special'237 assert response.headers['x-powered-by'] == 'pylons'238 assert 'x-dont-include' not in response.headers239 240 def test_nocache(self):241 import pylons242 sap.g.counter = 0243 pylons.config['cache_enabled'] = 'False'244 response = self.get_response(action='test_default_cache_decorator')245 assert 'Counter=1' in response246 response = self.get_response(action='test_default_cache_decorator')247 assert 'Counter=2' in response...

Full Screen

Full Screen

checker.py

Source:checker.py Github

copy

Full Screen

...13 if sysarg == "":14 async with borg.conversation(bot) as conv:15 try:16 await conv.send_message("/start")17 await conv.get_response()18 await conv.send_message("/au ")19 audio = await conv.get_response()20 await borg.send_message(event.chat_id, audio.text)21 await event.delete()22 except YouBlockedUserError:23 await event.edit("Error: unblock @Carol5_bot and retry!")24 elif "@" in sysarg:25 async with borg.conversation(bot) as conv:26 try:27 await conv.send_message("/start")28 await conv.get_response()29 await conv.send_message("/au " + sysarg)30 audio = await conv.get_response()31 await borg.send_message(event.chat_id, audio.text)32 await event.delete()33 except YouBlockedUserError:34 await event.edit("Error: unblock @Carol5_bot and try again!")35 elif "" in sysarg:36 async with borg.conversation(bot) as conv:37 try:38 await conv.send_message("/start")39 await conv.get_response()40 await conv.send_message("/au " + sysarg)41 audio = await conv.get_response()42 await borg.send_message(event.chat_id, audio.text)43 await event.delete()44 except YouBlockedUserError:45 await event.edit("Error: unblock @Carol5_bot `and try again!")46#made by LEGENDX2247@borg.on(admin_cmd("ch ?(.*)"))48async def _(event):49 if event.fwd_from:50 return51 sysarg = event.pattern_match.group(1)52 if sysarg == "":53 async with borg.conversation(bot) as conv:54 try:55 await conv.send_message("/start")56 await conv.get_response()57 await conv.send_message("/ch ")58 audio = await conv.get_response()59 await borg.send_message(event.chat_id, audio.text)60 await event.delete()61 except YouBlockedUserError:62 await event.edit("Error: unblock @Carol5_bot and retry!")63 elif "@" in sysarg:64 async with borg.conversation(bot) as conv:65 try:66 await conv.send_message("/start")67 await conv.get_response()68 await conv.send_message("/ch " + sysarg)69 audio = await conv.get_response()70 await borg.send_message(event.chat_id, audio.text)71 await event.delete()72 except YouBlockedUserError:73 await event.edit("Error: unblock @Carol5_bot and try again!")74 elif "" in sysarg:75 async with borg.conversation(bot) as conv:76 try:77 await conv.send_message("/start")78 await conv.get_response()79 await conv.send_message("/ch " + sysarg)80 audio = await conv.get_response()81 await borg.send_message(event.chat_id, audio.text)82 await event.delete()83 except YouBlockedUserError:84 await event.edit("Error: unblock @Carol5_bot `and try again!")85@borg.on(admin_cmd("stripe ?(.*)"))86async def _(event):87 if event.fwd_from:88 return89 sysarg = event.pattern_match.group(1)90 if sysarg == "":91 async with borg.conversation(bot) as conv:92 try:93 await conv.send_message("/start")94 await conv.get_response()95 await conv.send_message("/st ")96 audio = await conv.get_response()97 await borg.send_message(event.chat_id, audio.text)98 await event.delete()99 except YouBlockedUserError:100 await event.edit("Error: unblock @Carol5_bot and retry!")101 elif "@" in sysarg:102 async with borg.conversation(bot) as conv:103 try:104 await conv.send_message("/start")105 await conv.get_response()106 await conv.send_message("/st " + sysarg)107 audio = await conv.get_response()108 await borg.send_message(event.chat_id, audio.text)109 await event.delete()110 except YouBlockedUserError:111 await event.edit("Error: unblock @Carol5_bot and try again!")112 elif "" in sysarg:113 async with borg.conversation(bot) as conv:114 try:115 await conv.send_message("/start")116 await conv.get_response()117 await conv.send_message("/st " + sysarg)118 audio = await conv.get_response()119 await borg.send_message(event.chat_id, audio.text)120 await event.delete()121 except YouBlockedUserError:122 await event.edit("Error: unblock @Carol5_bot `and try again!")123@borg.on(admin_cmd("vbv ?(.*)"))124async def _(event):125 if event.fwd_from:126 return127 sysarg = event.pattern_match.group(1)128 if sysarg == "":129 async with borg.conversation(bot) as conv:130 try:131 await conv.send_message("/start")132 await conv.get_response()133 await conv.send_message("/vbv ")134 audio = await conv.get_response()135 await borg.send_message(event.chat_id, audio.text)136 await event.delete()137 except YouBlockedUserError:138 await event.edit("Error: unblock @Carol5_bot and retry!")139 elif "@" in sysarg:140 async with borg.conversation(bot) as conv:141 try:142 await conv.send_message("/start")143 await conv.get_response()144 await conv.send_message("/vbv " + sysarg)145 audio = await conv.get_response()146 await borg.send_message(event.chat_id, audio.text)147 await event.delete()148 except YouBlockedUserError:149 await event.edit("Error: unblock @Carol5_bot and try again!")150 elif "" in sysarg:151 async with borg.conversation(bot) as conv:152 try:153 await conv.send_message("/start")154 await conv.get_response()155 await conv.send_message("/vbv " + sysarg)156 audio = await conv.get_response()157 await borg.send_message(event.chat_id, audio.text)158 await event.delete()159 except YouBlockedUserError:160 await event.edit("Error: unblock @Carol5_bot `and try again!")161@borg.on(admin_cmd("bin ?(.*)"))162async def _(event):163 if event.fwd_from:164 return165 sysarg = event.pattern_match.group(1)166 if sysarg == "":167 async with borg.conversation(bot) as conv:168 try:169 await conv.send_message("/start")170 await conv.get_response()171 await conv.send_message("/bin ")172 audio = await conv.get_response()173 await borg.send_message(event.chat_id, audio.text)174 await event.delete()175 except YouBlockedUserError:176 await event.edit("Error: unblock @Carol5_bot and retry!")177 elif "@" in sysarg:178 async with borg.conversation(bot) as conv:179 try:180 await conv.send_message("/start")181 await conv.get_response()182 await conv.send_message("/bin " + sysarg)183 audio = await conv.get_response()184 await borg.send_message(event.chat_id, audio.text)185 await event.delete()186 except YouBlockedUserError:187 await event.edit("Error: unblock @Carol5_bot and try again!")188 elif "" in sysarg:189 async with borg.conversation(bot) as conv:190 try:191 await conv.send_message("/start")192 await conv.get_response()193 await conv.send_message("/bin " + sysarg)194 audio = await conv.get_response()195 await borg.send_message(event.chat_id, audio.text)196 await event.delete()197 except YouBlockedUserError:198 await event.edit("Error: unblock @Carol5_bot `and try again!")199@borg.on(admin_cmd("key ?(.*)"))200async def _(event):201 if event.fwd_from:202 return203 sysarg = event.pattern_match.group(1)204 if sysarg == "":205 async with borg.conversation(bot) as conv:206 try:207 await conv.send_message("/start")208 await conv.get_response()209 await conv.send_message("/key ")210 audio = await conv.get_response()211 await borg.send_message(event.chat_id, audio.text)212 await event.delete()213 except YouBlockedUserError:214 await event.edit("Error: unblock @Carol5_bot and retry!")215 elif "@" in sysarg:216 async with borg.conversation(bot) as conv:217 try:218 await conv.send_message("/start")219 await conv.get_response()220 await conv.send_message("/key " + sysarg)221 audio = await conv.get_response()222 await borg.send_message(event.chat_id, audio.text)223 await event.delete()224 except YouBlockedUserError:225 await event.edit("Error: unblock @Carol5_bot and try again!")226 elif "" in sysarg:227 async with borg.conversation(bot) as conv:228 try:229 await conv.send_message("/start")230 await conv.get_response()231 await conv.send_message("/key " + sysarg)232 audio = await conv.get_response()233 await borg.send_message(event.chat_id, audio.text)234 await event.delete()235 except YouBlockedUserError:...

Full Screen

Full Screen

middleware.py

Source:middleware.py Github

copy

Full Screen

2 def __init__(self,get_response):3 self.get_response=get_response4 def __call__(self,request):5 print("This line printed at pre-processing of request")6 response=self.get_response(request)7 print("This line printed at post processing of request")8 return response9from django.http import HttpResponse10class AppMaintenanceMiddelware(object):11 def __init__(self,get_response):12 self.get_response=get_response13 def __call__(self,request):14 return HttpResponse('<h1>Currently Application Under maintenance please after two days</h1>')15class ErrorMessageMiddleware(object):16 def __init__(self,get_response):17 self.get_response=get_response18 def __call__(self,request):19 response=self.get_response(request)20 return response21 def process_exception(self,request,exception):22 s1='<h1>Currently we are facing some technical problems..please try after some time</h1><hr>'23 s2='<h2>Raised Exception:{}</h2>'.format(exception.__class__.__name__)24 s3='<h2>Exception Description/Message:{}</h2>'.format(exception)25 return HttpResponse(s1+s2+s3)26class FirstMiddleware(object):27 def __init__(self,get_response):28 self.get_response=get_response29 def __call__(self,request):30 print("This line printed by first middleware at pre processing of request")31 response=self.get_response(request)32 print("This line printed by first middleware at the post processing of request")33 return response34class SecondMiddleware(object):35 def __init__(self,get_response):36 self.get_response=get_response37 def __call__(self,request):38 print("This line printed by Second middleware at pre processing of request")39 response=self.get_response(request)40 print("This line printed by second middleware at the post processing of request")41 return response42class ThirdMiddleware(object):43 def __init__(self,get_response):44 self.get_response=get_response45 def __call__(self,request):46 print("This line printed by third middleware at pre processing of request")47 response=self.get_response(request)48 print("This line printed by third middleware at the post processing of request")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful