How to use with_command method in testcontainers-python

Best Python code snippet using testcontainers-python_python

sql.py

Source:sql.py Github

copy

Full Screen

...25 except:26 traceback.print_exc()27 self.db.rollback()28 return ''29 def with_command(self, cmd):30 try:31 self.connect()32 print(cmd)33 self.cursor.execute(cmd) 34 self.db.commit()35 result = self.cursor.fetchall()36 self.db.close()37 return result38 except:39 traceback.print_exc()40 self.db.rollback()41 return ''42 def drop(self, userID):43 self.with_command(f'drop table {userID}')44 def showTable(self):45 tables = []46 for data in self.with_command("show tables"):47 tables.append(data['Tables_in_users'])48 return tables49class Chat(SQL):50 def __init__(self):51 self.database = 'chat_info'52 def get_invitation(self, userID):53 sql = f"select applicantID, applicantNickName from invitation where invitedID='{userID}'"54 return self.with_command(sql) # [{'applicantID':'chen01', 'applicantNickName':'Chen'}, {...}]55 56 def add_invitation(self, applicantID, invitedID, applicantNickName):57 sql = f"""INSERT INTO invitation(applicantID, invitedID, applicantNickName)58 VALUES ('{applicantID}', '{invitedID}', '{applicantNickName}')"""59 self.with_command(sql)60 def check_repeat_invitation(self, applicantID, invitedID):61 sql = f"select invitedID from invitation where applicantID='{applicantID}'"62 result = self.with_command(sql)63 invited_list = []64 for item in result:65 invited_list.append(item['invitedID'])66 if invitedID in invited_list: return True67 return False68 def confirm_invitation(self, userID, friendID, userNickName, friendNickName): 69 # build roomID from history70 lock_room.acquire()71 self.connect()72 sql = f"select * from room_info"73 self.cursor.execute(sql)74 results = self.cursor.fetchone() # {'roomID':0, 'delete_room':0}75 self.db.close()76 roomID = 'room{}'.format(results['roomID']+1)77 sql = f"UPDATE room_info SET roomID='{results['roomID']+1}' WHERE roomID='{results['roomID']}'"78 self.with_command(sql)79 lock_room.release()80 # build room81 room = Rooms()82 room.create(roomID)83 84 # add friend and roomID85 user = Users()86 user.add_friend(userID, friendID, roomID, friendNickName)87 user.add_friend(friendID, userID, roomID, userNickName)88 # delete invitation 89 self.delete_invitation(friendID, userID)90 91 def delete_invitation(self, applicantID, invitedID):92 sql = f"DELETE FROM invitation WHERE applicantID='{applicantID}' AND invitedID='{invitedID}';"93 self.with_command(sql)94class Users(SQL):95 def __init__(self):96 self.database = 'users'97 def create(self, userID):98 sql = f"""CREATE TABLE {userID} (99 friendID char(20) NOT NULL,100 roomID char(20) NOT NULL,101 nickName char(20) DEFAULT NULL,102 PRIMARY KEY (friendID)103 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;"""104 self.with_command(sql)105 def create_user_info(self):106 sql = f"""CREATE TABLE user_info (107 userID char(20) NOT NULL,108 password char(20) NOT NULL,109 nickName char(20) DEFAULT NULL,110 PRIMARY KEY (userID)111 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;"""112 self.with_command(sql)113 def add_user(self, userID, password, nickName):114 self.create(userID)115 sql = f"""INSERT INTO user_info(userID, password, nickName)116 VALUES ('{userID}', '{password}', '{nickName}')"""117 self.with_command(sql)118 def get_userInfo(self): # {'chen01':{'password':'123', 'nickName':''}}119 sql = f"SELECT * from user_info"120 result = self.with_command(sql)121 users = {}122 for item in result:123 users[item['userID']] = {'password':item['password'], 'nickName':item['nickName']}124 return users125 def add_friend(self, userID, friendID, roomID, nickName):126 sql = f"""INSERT INTO {userID}(friendID, roomID, nickName)127 VALUES ('{friendID}', '{roomID}', '{nickName}')"""128 self.with_command(sql)129 def get_friends(self, userID):130 sql = f"""SELECT friendID, nickName from {userID}"""131 return self.with_command(sql) # [{'friendID':'chen01', 'nickName':'Chen'}, {...}]132 def insert(self, userID, friendID, roomID):133 sql = f"""INSERT INTO {userID}(friendID, roomID)134 VALUES ('{friendID}', '{roomID}')"""135 self.with_command(sql)136 def select(self, userID, value=''):137 if not value:138 sql = f"SELECT * from {userID}"139 else:140 sql = f"SELECT {value} from {userID}"141 result = self.with_command(sql)142 if value:143 list1 = []144 for row in result:145 list1.append(row[value])146 print(list1)147 return list1148 else:149 for row in result:150 print(row)151 return result152 def roomID(self, userID, friendID):153 sql = f"SELECT roomID from {userID} where friendID='{friendID}'"154 data = self.with_command(sql)155 return data[0]['roomID']156 def userList(self):157 sql = f"show tables"158 result = self.with_command(sql)159 data = []160 for key in result:161 data.append(key['Tables_in_users'])162 return data163class Rooms(SQL):164 def __init__(self):165 self.database = 'rooms'166 def create(self, roomID):167 sql = f"""CREATE TABLE {roomID} (168 id int(10) NOT NULL AUTO_INCREMENT,169 userID char(20) NOT NULL,170 message text DEFAULT NULL,171 time TIMESTAMP NOT NULL,172 PRIMARY KEY (id)173 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;"""174 self.with_command(sql)175 def insert(self, roomID, userID, message):176 sql = f"""INSERT INTO {roomID}(userID, message)177 VALUES ('{userID}', '{message}')"""178 self.with_command(sql)179 180 def loadMsg(self, roomID, loadMsgNumber = 10):181 sql = f"SELECT * from {roomID} ORDER BY id DESC Limit 0, {loadMsgNumber}"182 result = self.with_command(sql)183 return result[::-1]184 def history(self, roomID, ID):185 if ID <= 1:186 return187 msgNumber = 3188 msgNumber = ID - 1 if ID <= msgNumber else msgNumber189 start = ID-1-msgNumber if ID-1-msgNumber>0 else 0190 sql = f"SELECT * from {roomID} ORDER BY id Limit {start}, {msgNumber}"191 return self.with_command(sql)192 def newMsg(self, roomID, ID):193 #ID = ID-3194 self.connect()195 sql = f"SELECT * from {roomID} ORDER BY id DESC Limit 0, 1"196 self.cursor.execute(sql)197 results = self.cursor.fetchone()198 endID = results['id']199 if endID == ID: 200 self.db.close()201 return ''202 203 loadMsgNumber = endID - ID204 sql = f"SELECT * from {roomID} ORDER BY id Limit {ID}, {loadMsgNumber}"205 self.cursor.execute(sql)206 results = self.cursor.fetchall()207 print(results)208 self.db.close()209 return results210 def all_room(self):211 sql = 'show tables;'212 result = self.with_command(sql)213 rooms = []214 for key in result:215 rooms.append(key['Tables_in_rooms'])216 print('all room', rooms)217if __name__ == '__main__':218 user = Users()219 user.get_userInfo()220 room = Rooms()221 chat = Chat()...

Full Screen

Full Screen

doc_cmd.py

Source:doc_cmd.py Github

copy

Full Screen

1from __future__ import unicode_literals2import os3import subprocess4from .. import repo5from .. import color6from ..uis import get_ui7from .. import content8from ..utils import resolve_citekey, resolve_citekey_list9from ..completion import CiteKeyCompletion10# doc --+- add $file $key [[-L|--link] | [-M|--move]] [-f|--force]11# +- remove $key [$key [...]] [-f|--force]12# +- export $key [$path]13# +- open $key [-w|--with $cmd]14# supplements attach, open15def parser(subparsers, conf):16 doc_parser = subparsers.add_parser(17 'doc',18 help='manage the document relating to a publication')19 doc_subparsers = doc_parser.add_subparsers(20 title='document actions', dest='action',21 help='actions to interact with the documents')22 doc_subparsers.required = True23 add_parser = doc_subparsers.add_parser('add', help='add a document to a publication')24 add_parser.add_argument('-f', '--force', action='store_true', dest='force', default=False,25 help='force overwriting an already assigned document')26 add_parser.add_argument('document', nargs=1, help='document file to assign')27 add_parser.add_argument('citekey', nargs=1, help='citekey of the publication'28 ).completer = CiteKeyCompletion(conf)29 add_exclusives = add_parser.add_mutually_exclusive_group()30 add_exclusives.add_argument(31 '-L', '--link', action='store_true', dest='link', default=False,32 help='do not copy document files, just create a link')33 add_exclusives.add_argument(34 '-M', '--move', action='store_true', dest='move', default=False,35 help='move document instead of of copying (ignored if --link)')36 remove_parser = doc_subparsers.add_parser('remove', help='remove assigned documents from publications')37 remove_parser.add_argument('citekeys', nargs='+', help='citekeys of the publications'38 ).completer = CiteKeyCompletion(conf)39 remove_parser.add_argument('-f', '--force', action='store_true', dest='force',40 default=False,41 help='force removing assigned documents')42 # favor key+ path over: key43 export_parser = doc_subparsers.add_parser('export', help='export assigned documents to given path')44 export_parser.add_argument('citekeys', nargs='+',45 help='citekeys of the documents to export'46 ).completer = CiteKeyCompletion(conf)47 export_parser.add_argument('path', nargs=1, help='directory to export the files to')48 open_parser = doc_subparsers.add_parser('open', help='open an assigned document')49 open_parser.add_argument('citekey', nargs=1, help='citekey of the document to open'50 ).completer = CiteKeyCompletion(conf)51 open_parser.add_argument('-w', '--with', dest='cmd', help='command to open the file with')52 return doc_parser53def command(conf, args):54 ui = get_ui()55 rp = repo.Repository(conf)56 # print(args)57 # ui.exit()58 if args.action == 'add':59 citekey = resolve_citekey(rp, conf, args.citekey[0], ui=ui, exit_on_fail=True)60 paper = rp.pull_paper(citekey)61 if paper.docpath is not None and not args.force:62 msg = ("The publication {} has already the document {} assigned.{newline}Overwrite? "63 ).format(color.dye_out(paper.citekey, 'citekey'),64 color.dye_out(paper.docpath, 'filepath'),65 newline=os.linesep)66 if not ui.input_yn(question=msg, default='n'):67 ui.exit(0)68 else:69 rp.remove_doc(paper.citekey)70 document = args.document[0]71 if args.link:72 rp.push_doc(paper.citekey, document, copy=False)73 else:74 rp.push_doc(paper.citekey, document, copy=True)75 if not args.link and args.move:76 content.remove_file(document)77 # FIXME: coherence with add command, the destination location should be given when copying/moving.78 ui.message('{} added to {}'.format(79 color.dye_out(document, 'filepath'),80 color.dye_out(paper.citekey, 'citekey')))81 elif args.action == 'remove':82 for key in resolve_citekey_list(rp, conf, args.citekeys, ui=ui, exit_on_fail=True):83 paper = rp.pull_paper(key)84 # if there is no document (and the user cares) -> inform + continue85 if paper.docpath is None and not args.force:86 ui.message('Publication {} has no assigned document. Not removed.'.format(87 color.dye_out(paper.citekey, 'citekey')))88 continue89 if not args.force:90 msg = 'Do you really want to remove {} from {} ?'.format(color.dye_out(paper.docpath, 'filepath'),91 color.dye_out(paper.citekey, 'citekey'))92 if not ui.input_yn(question=msg, default='n'):93 continue94 rp.remove_doc(paper.citekey)95 elif args.action == 'export':96 if os.path.isdir(args.path[0]):97 path = args.path[0]98 if not path.endswith('/'):99 path += '/'100 else:101 ui.error('{} is not a directory.'.format(102 color.dye_err(args.path[0], 'filepath')))103 ui.exit(1)104 for key in resolve_citekey_list(rp, conf, args.citekeys, ui=ui, exit_on_fail=True):105 try:106 paper = rp.pull_paper(key)107 doc = paper.docpath108 if doc is None:109 ui.message('Publication {} has no document assigned.'.format(110 color.dye_out(paper.citekey, 'citekey')))111 else:112 real_doc_path = rp.pull_docpath(key)113 dest_path = path + os.path.basename(real_doc_path)114 content.copy_content(real_doc_path, dest_path)115 except (ValueError, IOError) as e:116 ui.error(str(e))117 elif args.action == 'open':118 with_command = args.cmd119 citekey = resolve_citekey(rp, conf, args.citekey[0], ui=ui, exit_on_fail=True)120 paper = rp.pull_paper(citekey)121 if paper.docpath is None:122 ui.error('No document associated with the entry {}.'.format(123 color.dye_err(citekey, 'citekey')))124 ui.exit()125 if with_command is None:126 with_command = conf['main']['open_cmd']127 if with_command is None: # default in conf have not been changed128 pass # TODO platform specific129 try:130 docpath = content.system_path(rp.databroker.real_docpath(paper.docpath))131 cmd = with_command.split()132 cmd.append(docpath)133 subprocess.Popen(cmd)134 ui.message('{} opened.'.format(color.dye_out(docpath, 'filepath')))135 except OSError:136 ui.error("Command does not exist: %s." % with_command)137 ui.exit(127)...

Full Screen

Full Screen

bot.py

Source:bot.py Github

copy

Full Screen

1import hikari2import tanjun3import lavaplayer4import asyncio5TOKEN = "..." # replace with your token6bot = hikari.GatewayBot(TOKEN)7client = tanjun.Client.from_gateway_bot(bot)8lavalink = lavaplayer.Lavalink(9 host="localhost", # Lavalink host10 port=2333, # Lavalink port11 password="youshallnotpass", # Lavalink password12)13component = tanjun.Component()14@component.with_listener(hikari.StartedEvent)15async def on_start(event: hikari.StartedEvent):16 lavalink.set_user_id(bot.get_me().id)17 lavalink.set_event_loop(asyncio.get_event_loop())18 lavalink.connect()19# ------------------------------- #20@component.with_command21@tanjun.as_slash_command("join", "Join a voice channel")22async def join(ctx: tanjun.abc.SlashContext):23 states = bot.cache.get_voice_states_view_for_guild(ctx.guild_id)24 voice_state = [state async for state in states.iterator().filter(lambda i: i.user_id == ctx.author.id)]25 if not voice_state:26 await ctx.respond("you are not in a voice channel")27 return28 channel_id = voice_state[0].channel_id29 await bot.update_voice_state(ctx.guild_id, channel_id, self_deaf=True)30 await lavalink.wait_for_connection(ctx.guild_id)31 await ctx.respond(f"done join to <#{channel_id}>")32@component.with_command33@tanjun.as_slash_command("leave", "Leave a voice channel")34async def leave(ctx: tanjun.abc.SlashContext):35 await bot.update_voice_state(ctx.guild_id, None)36 await ctx.respond("done leave the voice channel")37@component.with_command38@tanjun.with_str_slash_option("query", "query to search")39@tanjun.as_slash_command("play", "Play command")40async def play(ctx: tanjun.abc.SlashContext, query: str):41 result = await lavalink.auto_search_tracks(query) # search for the query42 if not result:43 await ctx.respond("not found result for your query")44 return45 elif isinstance(result, lavaplayer.TrackLoadFailed):46 await ctx.respond("Track load failed, try again later.\n```{}```".format(result.message))47 return48 elif isinstance(result, lavaplayer.PlayList):49 await lavalink.add_to_queue(ctx.guild_id, result.tracks, ctx.author.id)50 await ctx.respond(f"added {len(result.tracks)} tracks to queue")51 return 52 await lavalink.play(ctx.guild_id, result[0], ctx.author.id) # play the first result53 await ctx.respond(f"[{result[0].title}]({result[0].uri})") # send the embed54@component.with_command55@tanjun.as_slash_command("stop", "Stop command")56async def stop(ctx: tanjun.abc.SlashContext):57 await lavalink.stop(ctx.guild_id)58 await ctx.respond("done stop the music")59@component.with_command60@tanjun.as_slash_command("pause", "Pause command")61async def pause(ctx: tanjun.abc.SlashContext):62 await lavalink.pause(ctx.guild_id)63 await ctx.respond("done pause the music")64@component.with_command65@tanjun.as_slash_command("resume", "Resume command")66async def resume(ctx: tanjun.abc.SlashContext):67 await lavalink.resume(ctx.guild_id)68 await ctx.respond("done resume the music")69@component.with_command70@tanjun.with_int_slash_option("position", "position to seek")71@tanjun.as_slash_command("seek", "Seek command")72async def seek(ctx: tanjun.abc.SlashContext, position: int):73 await lavalink.seek(ctx.guild_id, position)74 await ctx.respond(f"done seek to {position}")75@component.with_command76@tanjun.with_int_slash_option("volume", "volume to set")77@tanjun.as_slash_command("volume", "Volume command")78async def volume(ctx: tanjun.abc.SlashContext, volume: int):79 await lavalink.volume(ctx.guild_id, volume)80 await ctx.respond(f"done set volume to {volume}")81@component.with_command82@tanjun.as_slash_command("queue", "Queue command")83async def queue(ctx: tanjun.abc.SlashContext):84 queue = await lavalink.queue(ctx.guild_id)85 if not queue:86 await ctx.respond("queue is empty")87 return88 await ctx.respond("```\n" + "\n".join([f"{i + 1}. {track.title}" for i, track in enumerate(queue)]) + "```")89@component.with_command90@tanjun.as_slash_command("np", "Now playing command")91async def np(ctx: tanjun.abc.SlashContext):92 node = await lavalink.get_guild_node(ctx.guild_id)93 if not node or not node.queue:94 await ctx.respond("nothing playing")95 return96 await ctx.respond(f"[{node.queue[0].title}]({node.queue[0].uri})")97@component.with_command98@tanjun.with_bool_slash_option("status", "status to set repeat mode")99@tanjun.as_slash_command("repeat", "Repeat command")100async def repeat(ctx: tanjun.abc.SlashContext, status: bool):101 await lavalink.repeat(ctx.guild_id, status)102 await ctx.respond(f"done set repeat mode to {status}")103@component.with_command104@tanjun.as_slash_command("shuffle", "Shuffle command")105async def shuffle(ctx: tanjun.abc.SlashContext):106 await lavalink.shuffle(ctx.guild_id)107 await ctx.respond("done shuffle the queue")108# ------------------------------- #109# On voice state update the bot will update the lavalink node110@component.with_listener(hikari.VoiceStateUpdateEvent)111async def voice_state_update(event: hikari.VoiceStateUpdateEvent):112 await lavalink.raw_voice_state_update(event.guild_id, event.state.user_id, event.state.session_id, event.state.channel_id)113@component.with_listener(hikari.VoiceServerUpdateEvent)114async def voice_server_update(event: hikari.VoiceServerUpdateEvent):115 await lavalink.raw_voice_server_update(event.guild_id, event.endpoint, event.token)116client.add_component(component)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run testcontainers-python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful