How to use user_id method in tempest

Best Python code snippet using tempest_python

disasters.py

Source:disasters.py Github

copy

Full Screen

# ... (listing starts at line 10 of disasters.py; the earlier lines are not shown)
from tg_bot.modules.helper_funcs.chat_status import whitelist_plus, dev_plus, sudo_plus
from tg_bot.modules.helper_funcs.extraction import extract_user
from tg_bot.modules.log_channel import gloggable

ELEVATED_USERS_FILE = os.path.join(os.getcwd(), 'tg_bot/elevated_users.json')


def _load_elevated_users():
    """Read and return the parsed contents of ELEVATED_USERS_FILE."""
    with open(ELEVATED_USERS_FILE, 'r') as infile:
        return json.load(infile)


def _save_elevated_users(data) -> None:
    """Write *data* back to ELEVATED_USERS_FILE as indented JSON."""
    with open(ELEVATED_USERS_FILE, 'w') as outfile:
        json.dump(data, outfile, indent=4)


def check_user_id(user_id: int, bot: Bot) -> Optional[str]:
    """Validate the target of a promote/demote command.

    Returns an error message to send back when *user_id* is falsy or is the
    bot's own id, and None when the id is acceptable.
    """
    if not user_id:
        return "That...is a chat! baka ka omae?"
    if user_id == bot.id:
        return "This does not work that way."
    return None


#I added extra new lines 
disasters = """ *"Disaster Levels"*
\n*Heroes Association* - Devs who can access the bots server and can execute, edit, modify bot code. Can also manage other Disasters
\n*LEGEND* - Only one exists, bot owner. 
\n*Dragons* - Have super user access, can gban, manage disasters lower than them and are admins in phantom.
\n*HACKER* - Have access go globally ban users across this bot 
\n*Tigers* - Same as wolves but can unban themselves if banned.
\n*Wolves* - Cannot be banned, muted flood kicked but can be manually banned by admins.
"""


# do not async, not a handler 
def send_disasters(update):
    """Reply to the triggering message with the Markdown-formatted `disasters` text."""
    update.effective_message.reply_text(disasters, parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True)


@run_async
@dev_plus
@gloggable
def addsudo(bot: Bot, update: Update, args: List[str]) -> str:
    """Add the target user to SUDO_USERS ("Dragon") and persist the change.

    The user is removed from the support / whitelist lists first if present.
    Returns the HTML log entry, or "" when nothing was changed (invalid
    target, or the user is already a sudo user).
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before touching the API: the original called bot.getChat()
    # first, so an unresolved target reached the API before this check ran.
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)
    rt = ""

    data = _load_elevated_users()

    if user_id in SUDO_USERS:
        message.reply_text("This member is already a Sudo user")
        return ""

    if user_id in SUPPORT_USERS:
        rt += "Requested HA to promote a Hacker Disaster to Dragon."
        data['supports'].remove(user_id)
        SUPPORT_USERS.remove(user_id)

    if user_id in WHITELIST_USERS:
        rt += "Requested HA to promote a Wolf Disaster to Dragon."
        data['whitelists'].remove(user_id)
        WHITELIST_USERS.remove(user_id)

    data['sudos'].append(user_id)
    SUDO_USERS.append(user_id)
    _save_elevated_users(data)

    update.effective_message.reply_text(
        rt + "\nSuccessfully set Disaster level of {} to Dragon!".format(user_member.first_name))

    log_message = (f"#SUDO\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    # Outside private chats the entry is prefixed with the escaped chat title.
    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


@run_async
@sudo_plus
@gloggable
def addsupport(bot: Bot, update: Update, args: List[str]) -> str:
    """Add the target user to SUPPORT_USERS and persist the change.

    The user is removed from the sudo / whitelist lists first if present.
    Returns the HTML log entry, or "" when the target is invalid or is
    already a support user.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)
    rt = ""

    data = _load_elevated_users()

    if user_id in SUDO_USERS:
        rt += "Requested HA to deomote this Dragon to Demon"
        data['sudos'].remove(user_id)
        SUDO_USERS.remove(user_id)

    if user_id in SUPPORT_USERS:
        message.reply_text("This user is already a Support User.")
        return ""

    if user_id in WHITELIST_USERS:
        rt += "Requested HA to promote this Wolf Disaster to hacker"
        data['whitelists'].remove(user_id)
        WHITELIST_USERS.remove(user_id)

    data['supports'].append(user_id)
    SUPPORT_USERS.append(user_id)
    _save_elevated_users(data)

    update.effective_message.reply_text(rt + f"\n{user_member.first_name} was added as a Hacker Disaster!")

    log_message = (f"#SUPPORT\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


@run_async
@sudo_plus
@gloggable
def addwhitelist(bot: Bot, update: Update, args: List[str]) -> str:
    """Add the target user to WHITELIST_USERS ("Wolf") and persist the change.

    The user is removed from the sudo / support lists first if present.
    Returns the HTML log entry, or "" when the target is invalid or is
    already whitelisted.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)
    rt = ""

    data = _load_elevated_users()

    if user_id in SUDO_USERS:
        rt += "This member is a Dragon Disaster, Demoting to Wolf."
        data['sudos'].remove(user_id)
        SUDO_USERS.remove(user_id)

    if user_id in SUPPORT_USERS:
        rt += "This user is already a Hacker Disaster, Demoting to Wolf."
        data['supports'].remove(user_id)
        SUPPORT_USERS.remove(user_id)

    if user_id in WHITELIST_USERS:
        message.reply_text("This user is already a Wolf Disaster.")
        return ""

    data['whitelists'].append(user_id)
    WHITELIST_USERS.append(user_id)
    _save_elevated_users(data)

    update.effective_message.reply_text(
        rt + f"\nSuccessfully promoted {user_member.first_name} to a Wolf Disaster!")

    log_message = (f"#WHITELIST\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)} \n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


# NOTE(review): the listing defined `addtiger` twice with near-identical
# bodies, first under @sudo_plus and then under @dev_plus. The later
# definition rebinds the name, so only that one is kept here. Confirm that
# @dev_plus is the intended permission level.
@run_async
@dev_plus
@gloggable
def addtiger(bot: Bot, update: Update, args: List[str]) -> str:
    """Add the target user to TIGER_USERS and persist the change.

    The user is removed from the sudo / support / whitelist lists first if
    present. Returns the HTML log entry, or "" when the target is invalid or
    is already a tiger.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)
    rt = ""

    data = _load_elevated_users()

    if user_id in SUDO_USERS:
        rt += "This member is a Dragon Disaster, Demoting to Tiger."
        data['sudos'].remove(user_id)
        SUDO_USERS.remove(user_id)

    if user_id in SUPPORT_USERS:
        rt += "This user is already a Hacker Disaster, Demoting to Tiger."
        data['supports'].remove(user_id)
        SUPPORT_USERS.remove(user_id)

    if user_id in WHITELIST_USERS:
        rt += "This user is already a Wolf Disaster, Demoting to Tiger."
        data['whitelists'].remove(user_id)
        WHITELIST_USERS.remove(user_id)

    if user_id in TIGER_USERS:
        message.reply_text("This user is already a Tiger.")
        return ""

    data['tigers'].append(user_id)
    TIGER_USERS.append(user_id)
    _save_elevated_users(data)

    update.effective_message.reply_text(
        rt + f"\nSuccessfully promoted {user_member.first_name} to a Tiger Disaster!")

    log_message = (f"#TIGER\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)} \n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


@run_async
@dev_plus
@gloggable
def removesudo(bot: Bot, update: Update, args: List[str]) -> str:
    """Remove the target user from SUDO_USERS and persist the change.

    Returns the HTML log entry, or "" when the target is invalid or is not
    a sudo user.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)

    data = _load_elevated_users()

    if user_id not in SUDO_USERS:
        message.reply_text("This user is not a Dragon Disaster!")
        return ""

    message.reply_text("Requested HA to demote this user to Civilian")
    SUDO_USERS.remove(user_id)
    data['sudos'].remove(user_id)
    _save_elevated_users(data)

    log_message = (f"#UNSUDO\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = "<b>{}:</b>\n".format(html.escape(chat.title)) + log_message

    return log_message


@run_async
@sudo_plus
@gloggable
def removesupport(bot: Bot, update: Update, args: List[str]) -> str:
    """Remove the target user from SUPPORT_USERS and persist the change.

    Returns the HTML log entry, or "" when the target is invalid or is not
    a support user.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)

    data = _load_elevated_users()

    if user_id not in SUPPORT_USERS:
        message.reply_text("This user is not a Demon level Disaster!")
        return ""

    message.reply_text("Requested HA to demote this user to Civilian")
    SUPPORT_USERS.remove(user_id)
    data['supports'].remove(user_id)
    _save_elevated_users(data)

    log_message = (f"#UNSUPPORT\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


@run_async
@sudo_plus
@gloggable
def removewhitelist(bot: Bot, update: Update, args: List[str]) -> str:
    """Remove the target user from WHITELIST_USERS and persist the change.

    Returns the HTML log entry, or "" when the target is invalid or is not
    whitelisted.
    """
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)

    # Validate before the API lookup (the original did these the other way round).
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    user_member = bot.getChat(user_id)

    data = _load_elevated_users()

    if user_id not in WHITELIST_USERS:
        message.reply_text("This user is not a Wolf Disaster!")
        return ""

    message.reply_text("Demoting to normal user")
    WHITELIST_USERS.remove(user_id)
    data['whitelists'].remove(user_id)
    _save_elevated_users(data)

    log_message = (f"#UNWHITELIST\n"
                   f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                   f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")

    if chat.type != 'private':
        log_message = f"<b>{html.escape(chat.title)}:</b>\n" + log_message

    return log_message


# NOTE(review): the listing cuts this function off part-way through, so its
# visible code is reproduced unchanged. It still calls bot.getChat() before
# check_user_id(); reorder it like the handlers above once the full body is
# available.
@run_async
@sudo_plus
@gloggable
def removetiger(bot: Bot, update: Update, args: List[str]) -> str:
    message = update.effective_message
    user = update.effective_user
    chat = update.effective_chat
    user_id = extract_user(message, args)
    user_member = bot.getChat(user_id)
    reply = check_user_id(user_id, bot)
    if reply:
        message.reply_text(reply)
        return ""
    with open(ELEVATED_USERS_FILE, 'r') as infile:
        data = json.load(infile)
    if user_id in TIGER_USERS:
        message.reply_text("Demoting to normal user")
        TIGER_USERS.remove(user_id)
        data['tigers'].remove(user_id)
        with open(ELEVATED_USERS_FILE, 'w') as outfile:
            json.dump(data, outfile, indent=4)
        log_message = (f"#UNTIGER\n"
                       f"<b>Admin:</b> {mention_html(user.id, user.first_name)}\n"
                       f"<b>User:</b> {mention_html(user_member.id, user_member.first_name)}")
        # ... (listing truncated here)

Full Screen

Full Screen

test_ir_filters.py

Source:test_ir_filters.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo import exceptions
from odoo.tests.common import TransactionCase, ADMIN_USER_ID


def noid(seq):
    """ Drop the 'id' and 'action_id' keys from every dict of seq (in place) and return seq """
    for record in seq:
        for key in ('id', 'action_id'):
            record.pop(key, None)
    return seq


def _expected(name, user_id, is_default=False, context='{}'):
    """ Build the dict get_filters() is expected to yield for one filter (after noid()) """
    return {
        'name': name,
        'is_default': is_default,
        'user_id': user_id,
        'domain': '[]',
        'context': context,
        'sort': '[]',
    }


class FiltersCase(TransactionCase):
    def build(self, model, *args):
        """ Create one record of `model` per vals dict in args, as ADMIN_USER_ID """
        Model = self.env[model].with_user(ADMIN_USER_ID)
        for vals in args:
            Model.create(vals)


class TestGetFilters(FiltersCase):
    def setUp(self):
        super().setUp()
        demo = self.env['res.users'].name_search('demo')[0]
        self.USER_NG = demo
        self.USER_ID = demo[0]

    def test_own_filters(self):
        names = ('a', 'b', 'c', 'd')
        self.build('ir.filters', *[
            {'name': name, 'user_id': self.USER_ID, 'model_id': 'ir.filters'}
            for name in names
        ])

        filters = self.env['ir.filters'].with_user(self.USER_ID).get_filters('ir.filters')

        self.assertItemsEqual(
            noid(filters),
            [_expected(name, self.USER_NG) for name in names])

    def test_global_filters(self):
        names = ('a', 'b', 'c', 'd')
        self.build('ir.filters', *[
            {'name': name, 'user_id': False, 'model_id': 'ir.filters'}
            for name in names
        ])

        filters = self.env['ir.filters'].with_user(self.USER_ID).get_filters('ir.filters')

        self.assertItemsEqual(
            noid(filters),
            [_expected(name, False) for name in names])

    def test_no_third_party_filters(self):
        owners = (
            ('a', False),
            ('b', ADMIN_USER_ID),
            ('c', self.USER_ID),
            ('d', ADMIN_USER_ID),
        )
        self.build('ir.filters', *[
            {'name': name, 'user_id': owner, 'model_id': 'ir.filters'}
            for name, owner in owners
        ])

        filters = self.env['ir.filters'].with_user(self.USER_ID).get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', False),
            _expected('c', self.USER_NG),
        ])


class TestOwnDefaults(FiltersCase):
    def setUp(self):
        super().setUp()
        demo = self.env['res.users'].name_search('demo')[0]
        self.USER_NG = demo
        self.USER_ID = demo[0]

    def test_new_no_filter(self):
        """
        A new @is_default filter created while no filter exists yet ends up
        carrying the default flag
        """
        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        Filters.create_or_replace(dict(
            name='a', model_id='ir.filters', user_id=self.USER_ID, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', self.USER_NG, is_default=True),
        ])

    def test_new_filter_not_default(self):
        """
        A new @is_default filter created next to existing non-default
        filters ends up carrying the flag
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
            {'name': 'b', 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        Filters.create_or_replace(dict(
            name='c', model_id='ir.filters', user_id=self.USER_ID, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', self.USER_NG),
            _expected('b', self.USER_NG),
            _expected('c', self.USER_NG, is_default=True),
        ])

    def test_new_filter_existing_default(self):
        """
        A new @is_default filter created while another filter is already
        @is_default takes the flag over: it *moves* from the old filter to
        the new one
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
            {'name': 'b', 'is_default': True, 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        Filters.create_or_replace(dict(
            name='c', model_id='ir.filters', user_id=self.USER_ID, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', self.USER_NG),
            _expected('b', self.USER_NG),
            _expected('c', self.USER_NG, is_default=True),
        ])

    def test_update_filter_set_default(self):
        """
        Switching an existing filter to @is_default while another filter
        holds the flag moves the flag
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
            {'name': 'b', 'is_default': True, 'user_id': self.USER_ID, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        Filters.create_or_replace(dict(
            name='a', model_id='ir.filters', user_id=self.USER_ID, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', self.USER_NG, is_default=True),
            _expected('b', self.USER_NG),
        ])


class TestGlobalDefaults(FiltersCase):
    def setUp(self):
        super().setUp()
        demo = self.env['res.users'].name_search('demo')[0]
        self.USER_NG = demo
        self.USER_ID = demo[0]

    def test_new_filter_not_default(self):
        """
        A new global @is_default filter created next to existing non-default
        global filters ends up carrying the flag
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': False, 'model_id': 'ir.filters'},
            {'name': 'b', 'user_id': False, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        Filters.create_or_replace(dict(
            name='c', model_id='ir.filters', user_id=False, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', False),
            _expected('b', False),
            _expected('c', False, is_default=True),
        ])

    def test_new_filter_existing_default(self):
        """
        Creating a global @is_default filter while another global filter is
        already @is_default must raise
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': False, 'model_id': 'ir.filters'},
            {'name': 'b', 'is_default': True, 'user_id': False, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        with self.assertRaises(exceptions.Warning):
            Filters.create_or_replace(dict(
                name='c', model_id='ir.filters', user_id=False, is_default=True))

    def test_update_filter_set_default(self):
        """
        Switching an existing global filter to @is_default while another
        global filter holds the flag must raise
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': False, 'model_id': 'ir.filters'},
            {'name': 'b', 'is_default': True, 'user_id': False, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        with self.assertRaises(exceptions.Warning):
            Filters.create_or_replace(dict(
                name='a', model_id='ir.filters', user_id=False, is_default=True))

    def test_update_default_filter(self):
        """
        Replacing the filter that currently is the global default must not raise
        """
        self.build(
            'ir.filters',
            {'name': 'a', 'user_id': False, 'model_id': 'ir.filters'},
            {'name': 'b', 'is_default': True, 'user_id': False, 'model_id': 'ir.filters'},
        )

        Filters = self.env['ir.filters'].with_user(self.USER_ID)
        context_value = "{'some_key': True}"
        Filters.create_or_replace(dict(
            name='b', model_id='ir.filters', user_id=False,
            context=context_value, is_default=True))

        filters = Filters.get_filters('ir.filters')

        self.assertItemsEqual(noid(filters), [
            _expected('a', False),
            _expected('b', False, is_default=True, context=context_value),
        ])


class TestReadGroup(TransactionCase):
    """Test function read_group with groupby on a many2one field to a model
    (in test, "user_id" to "res.users") which is ordered by an inherited not stored field (in
    test, "name" inherited from "res.partners").
    """

    def test_read_group_1(self):
        Users = self.env['res.users']
        self.assertEqual(Users._order, "name, login", "Model res.users must be ordered by name, login")
        self.assertFalse(Users._fields['name'].store, "Field name is not stored in res.users")
        Filters = self.env['ir.filters']
        filter_a = Filters.create(dict(name="Filter_A", model_id="ir.filters"))
        filter_b = Filters.create(dict(name="Filter_B", model_id="ir.filters"))
        filter_b.write(dict(user_id=False))
        res = Filters.read_group([], ['name', 'user_id'], ['user_id'])
        # ... (listing truncated here)

Full Screen

Full Screen

connection_sql.py

Source:connection_sql.py Github

copy

Full Screen

import threading
import time
from typing import Union

from sqlalchemy import Column, String, Boolean, UnicodeText, Integer

from tg_bot.modules.sql import SESSION, BASE


class ChatAccessConnectionSettings(BASE):
    """Per-chat flag stored in the "access_connection" table."""
    __tablename__ = "access_connection"
    chat_id = Column(String(14), primary_key=True)
    allow_connect_to_chat = Column(Boolean, default=True)

    def __init__(self, chat_id, allow_connect_to_chat):
        self.chat_id = str(chat_id)
        # The column is Boolean; the original stored str(flag), which made
        # even "False" a truthy value on the instance.
        self.allow_connect_to_chat = bool(allow_connect_to_chat)

    def __repr__(self):
        return "<Chat access settings ({}) is {}>".format(self.chat_id, self.allow_connect_to_chat)


class Connection(BASE):
    """One row per user in the "connection" table: the chat that user is connected to."""
    __tablename__ = "connection"
    user_id = Column(Integer, primary_key=True)
    chat_id = Column(String(14))

    def __init__(self, user_id, chat_id):
        self.user_id = user_id
        self.chat_id = str(chat_id)  # Ensure String


class ConnectionHistory(BASE):
    """Row of the "connection_history" table, keyed by (user_id, chat_id)."""
    __tablename__ = "connection_history"
    user_id = Column(Integer, primary_key=True)
    chat_id = Column(String(14), primary_key=True)
    chat_name = Column(UnicodeText)
    conn_time = Column(Integer)

    def __init__(self, user_id, chat_id, chat_name, conn_time):
        self.user_id = user_id
        self.chat_id = str(chat_id)
        self.chat_name = str(chat_name)
        self.conn_time = int(conn_time)

    def __repr__(self):
        return "<connection user {} history {}>".format(self.user_id, self.chat_id)


ChatAccessConnectionSettings.__table__.create(checkfirst=True)
Connection.__table__.create(checkfirst=True)
ConnectionHistory.__table__.create(checkfirst=True)

CHAT_ACCESS_LOCK = threading.RLock()
CONNECTION_INSERTION_LOCK = threading.RLock()
CONNECTION_HISTORY_LOCK = threading.RLock()

# In-memory mirror of connection_history:
# {user_id: {conn_time: {'chat_name': ..., 'chat_id': ...}}}
HISTORY_CONNECT = {}


def allow_connect_to_chat(chat_id: Union[str, int]) -> bool:
    """Return the stored flag for *chat_id*, or False when the chat has no row."""
    try:
        chat_setting = SESSION.query(ChatAccessConnectionSettings).get(str(chat_id))
        if chat_setting:
            return chat_setting.allow_connect_to_chat
        return False
    finally:
        SESSION.close()


def set_allow_connect_to_chat(chat_id: Union[int, str], setting: bool):
    """Create or update the row for *chat_id* with *setting* and commit."""
    with CHAT_ACCESS_LOCK:
        chat_setting = SESSION.query(ChatAccessConnectionSettings).get(str(chat_id))
        if not chat_setting:
            chat_setting = ChatAccessConnectionSettings(chat_id, setting)

        chat_setting.allow_connect_to_chat = setting
        SESSION.add(chat_setting)
        SESSION.commit()


def connect(user_id, chat_id):
    """Replace any existing Connection row of *user_id* with one for *chat_id*; returns True."""
    with CONNECTION_INSERTION_LOCK:
        prev = SESSION.query(Connection).get(int(user_id))
        if prev:
            SESSION.delete(prev)
        connect_to_chat = Connection(int(user_id), chat_id)
        SESSION.add(connect_to_chat)
        SESSION.commit()
        return True


def get_connected_chat(user_id):
    """Return the Connection row of *user_id*, or None when there is none."""
    try:
        return SESSION.query(Connection).get(int(user_id))
    finally:
        SESSION.close()


def curr_connection(chat_id):
    """Look up a Connection row by primary key using str(chat_id)."""
    # NOTE(review): Connection's primary key is user_id (Integer), so this
    # fetches by user id, not by the chat_id column. Behaviour is kept as-is;
    # confirm against callers whether a filter on Connection.chat_id was meant.
    try:
        return SESSION.query(Connection).get(str(chat_id))
    finally:
        SESSION.close()


def disconnect(user_id):
    """Delete the Connection row of *user_id*.

    Returns True when a row was deleted, False when the user had none.
    """
    with CONNECTION_INSERTION_LOCK:
        # Local renamed: the original called it `disconnect`, shadowing this function.
        current = SESSION.query(Connection).get(int(user_id))
        if current:
            SESSION.delete(current)
            SESSION.commit()
            return True

        SESSION.close()
        return False


def add_history_conn(user_id, chat_id, chat_name):
    """Record a connection of *user_id* to *chat_id* in the DB and in HISTORY_CONNECT.

    An existing entry for the same chat is replaced. Otherwise, when the user
    already has 5 or more stored rows, older cached entries are dropped
    before the new one is added.
    """
    # Normalise once. The original tested `chat_id in getchat_id` with the raw
    # value but indexed with str(chat_id), so an int chat_id never matched the
    # (string) cached ids and stale cache entries were left behind. It also
    # compared the Integer user_id column against str(user_id).
    user_id = int(user_id)
    chat_id = str(chat_id)

    with CONNECTION_HISTORY_LOCK:
        conn_time = int(time.time())
        cached = HISTORY_CONNECT.get(user_id)
        if cached:
            counting = SESSION.query(ConnectionHistory.user_id).filter(
                ConnectionHistory.user_id == user_id).count()
            # chat_id -> conn_time key of the cached entry for that chat
            getchat_id = {entry['chat_id']: stamp for stamp, entry in cached.items()}

            if chat_id in getchat_id:
                todeltime = getchat_id[chat_id]
                delold = SESSION.query(ConnectionHistory).get((user_id, chat_id))
                if delold:
                    SESSION.delete(delold)
                    cached.pop(todeltime)
            elif counting >= 5:
                # Reverse the cached keys and drop everything past the first four.
                todel = list(cached)
                todel.reverse()
                todel = todel[4:]
                for x in todel:
                    chat_old = cached[x]['chat_id']
                    delold = SESSION.query(ConnectionHistory).get((user_id, str(chat_old)))
                    if delold:
                        SESSION.delete(delold)
                        cached.pop(x)
        else:
            HISTORY_CONNECT[user_id] = {}

        delold = SESSION.query(ConnectionHistory).get((user_id, chat_id))
        if delold:
            SESSION.delete(delold)
        record = ConnectionHistory(user_id, chat_id, chat_name, conn_time)
        SESSION.add(record)
        SESSION.commit()
        HISTORY_CONNECT[user_id][conn_time] = {'chat_name': chat_name, 'chat_id': chat_id}


def get_history_conn(user_id):
    """Return the cached history dict of *user_id*, creating an empty one if needed."""
    return HISTORY_CONNECT.setdefault(int(user_id), {})


def clear_history_conn(user_id):
    """Delete every cached history entry of *user_id* from the DB and the cache; returns True."""
    # .get(): the original indexed directly and raised KeyError for a user
    # with no cached history.
    cached = HISTORY_CONNECT.get(int(user_id), {})
    for x in list(cached):
        chat_old = cached[x]['chat_id']
        delold = SESSION.query(ConnectionHistory).get((int(user_id), str(chat_old)))
        if delold:
            SESSION.delete(delold)
            cached.pop(x)
    SESSION.commit()
    return True


def __load_user_history():
    """Rebuild HISTORY_CONNECT from all rows of the connection_history table."""
    global HISTORY_CONNECT
    try:
        qall = SESSION.query(ConnectionHistory).all()
        HISTORY_CONNECT = {}
        for x in qall:
            HISTORY_CONNECT.setdefault(x.user_id, {})[x.conn_time] = {
                'chat_name': x.chat_name, 'chat_id': x.chat_id}
    finally:
        SESSION.close()

# ... (listing truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful