Best Python code snippet using localstack_python
music.py
Source:music.py  
...110            except discord.errors.NotFound:111                pass112        self.menu_content = []113        self.menu_type = "NONE"114    async def send_notifications(self, messages):115        for message in self.notifications:116            try:117                await message.delete()118            except discord.errors.NotFound:119                pass120        self.notifications = []121        if type(messages) is list:122            for message in messages:123                self.notifications.append(await self.context.send(message))124        else:125            self.notifications.append(await self.context.send(messages))126    async def send_embed(self, embed):127        if self.embed:128            await self.embed.delete()129        self.embed = self.context.send(embed=embed)130    async def join_channel(self, ctx):131        self.context = ctx132        if not ctx.author.voice:133            await self.send_notifications("> You are not in a voice channel.")134            return False135        channel = self.context.author.voice.channel136        await super().connect(channel.id)137        return True138    async def leave_channel(self):139        await self.destroy()140    async def play_song(self, query, ctx):141        self.context = ctx142        query = query.strip("<>")143        # If it is not a URL144        if re.match(URL_REGEX, query):145            tracks = await self.wavelink.get_tracks(query)146            await self.add_track_to_queue(ctx, tracks[0], query)147            return148        query = f"ytsearch:{query}"149        # Search and add the song to queue150        tracks = await self.wavelink.get_tracks(query)151        if not tracks:152            await self.send_notifications("> No songs found for that query.")153            return154        await self.clear_search_messages()155        self.menu_type = "SONG"156        for track in tracks[:min(len(OPTIONS), len(tracks))]:157            self.menu_content.append(track)158      
  embed = discord.Embed(159            title=f"Results found for {query}...",160            timestamp=dt.datetime.utcnow()161        )162        # embed.set_author(name =  "Query Results")163        embed.set_footer(text=f"Requested by {ctx.author.display_name}")  # icon_url=ctx.author.avatar.url164        embed = discord.Embed(165            title=f"Choose the song:",166            description=(167                "\n".join(168                    f"**{i + 1}.** {t.title} ({t.length // 60000}:{str(t.length % 60).zfill(2)})"169                    for i, t in enumerate(tracks[:min(len(OPTIONS), len(tracks))])170                )171            ),172            colour=ctx.author.colour,173            timestamp=dt.datetime.utcnow()174        )175        self.select_embed = await ctx.send(embed=embed)176        for emoji in list(OPTIONS.keys())[:min(len(tracks), len(OPTIONS))]:177            try:178                await self.select_embed.add_reaction(emoji)179            except discord.errors.NotFound:180                pass181    async def add_track_to_queue(self, ctx, track, search_query):182        self.context = ctx183        if not track:184            await self.send_notifications("> No song found.")185        # Todo one day186        # if isinstance(tracks, wavelink.TrackPlaylist):187        #    self.queue.add(*tracks.tracks)188        song = Song(track=track, search_query=search_query)189        self.queue.add_song_to_queue(song)190        await self.send_notifications(f"> Added {track.title} to queue.")191        if not self.is_playing:192            await self.next_song(self.context)193    async def next_song(self, ctx=None):194        if ctx:195            self.context = ctx196        song = self.queue.next_song()197        if song:198            self.timestamp = time.time()199            self.accumulated_time = 0200            await self.play(song.track)201            if self.play_menu:202                await self.play_menu.delete()203            
self.play_menu = await self.context.send(song.track.uri)204            await self.play_menu.add_reaction("â¸")205            await self.play_menu.add_reaction("â¶")206            if self.queue.position > 0:207                await self.play_menu.add_reaction("â®")208            try:209                await self.play_menu.add_reaction("â")210                await self.play_menu.add_reaction("â¹")211                await self.play_menu.add_reaction("ð")212                await self.play_menu.add_reaction("ð")213                await self.play_menu.add_reaction("ð¿")214                await self.play_menu.add_reaction("ðµ")215                await self.play_menu.add_reaction("ð")216                await self.play_menu.add_reaction("ð")217                await self.play_menu.add_reaction("â¹ï¸")218            except discord.errors.NotFound:219                pass220        else:221            # await self.send_notifications(f"> No more songs in queue.")222            pass223    async def shuffle_queue(self, ctx=None):224        if ctx:225            self.context = ctx226        self.queue.shuffle()227        await self.send_notifications("> Shuffled Queue.")228    async def pause(self, ctx=None):229        if ctx:230            self.context = ctx231        if not self.is_playing:232            await self.send_notifications("> No song playing.")233            return234        if self.is_paused:235            await self.send_notifications("> Already paused.")236            return237        self.accumulated_time = time.time() - self.timestamp238        self.timestamp = 0239        await self.set_pause(True)240    async def resume(self, ctx=None):241        if ctx:242            self.context = ctx243        if not self.is_playing:244            await self.send_notifications("> No song playing.")245            return246        if not self.is_paused:247            await self.send_notifications("> Already playing a song.")248            return249        self.timestamp = 
time.time()250        await self.set_pause(False)251    async def skip(self, ctx=None):252        if ctx:253            self.context = ctx254        if not self.is_playing:255            await self.send_notifications("> No song playing.")256            return257        await self.stop()258        await self.send_notifications("> Skipped.")259    async def back(self, ctx=None):260        if ctx:261            self.context = ctx262        if self.queue.position <= 0:263            await self.send_notifications("> No previous song to go back to.")264            return265        if not self.is_playing:266            await self.send_notifications("> No song playing.")267            return268        self.queue.position -= 2269        await self.stop()270    async def repeat(self, ctx=None):271        if ctx:272            self.context = ctx273        if not self.is_playing:274            await self.send_notifications("> No song playing.")275            return276        await self.seek(0)277    async def increase_volume(self, ctx=None, value=5):278        if ctx:279            self.context = ctx280        await self.set_volume(min(self.volume + value, 100))281        await self.send_notifications(f"> Set volume to {self.volume}.")282    async def decrease_volume(self, ctx=None, value=5):283        if ctx:284            self.context = ctx285        await self.set_volume(max(self.volume - value, 0))286        await self.send_notifications(f"> Set volume to {self.volume}.")287    async def define_volume(self, ctx, value):288        self.context = ctx289        await self.set_volume(max(min(value, 100), 0))290        await self.send_notifications(f"> Set volume to {self.volume}.")291    async def show_lyrics(self, ctx=None):292        if ctx:293            self.context = ctx294        song = self.queue.get_current_song()295        if not song:296            await self.send_notifications(f"> No song playing.")297            return298        if song.name:299            lyrics = 
Lyrics.get_lyrics(search_string=song.name)300            if lyrics:301                embed = discord.Embed(302                    title=f"Lyrics for {song.author} - {song.name}",303                    timestamp=dt.datetime.utcnow()304                )305                # embed.set_author(name =  "Query Results")306                embed.set_footer(text=f"Requested by {ctx.author.display_name}")  # icon_url=ctx.author.avatar.url307                embed.add_field(name="Lyrics", value=lyrics, inline=False)308                await ctx.send(embed=embed)309                return310        print(f"song name = {song.name}")311        await self.send_notifications("> No lyrics found for current song.")312    async def now_playing(self, ctx=None):313        if ctx:314            self.context = ctx315        if not self.is_playing:316            await self.send_notifications("> No song playing.")317            return318        song = self.queue.get_current_song()319        if not song.name:320            video_title = song.track.title321            spotify_data = Spotify.spotify.search(video_title)322            if len(spotify_data['tracks']['items']) == 0:  # Spotify got results323                await self.send_notifications("> Couldn't get information about song to extract lyrics.")324                return325            item = spotify_data['tracks']['items'][0]326            song.name = item['name']327            song.author = item['artists'][0]['name']328            song.album_name = item['album']['name']329            song.year = item['album']['release_date'][:4]330            self.queue.queue[self.queue.position] = song331        seconds = self.accumulated_time332        if self.timestamp != 0:333            seconds += time.time() - self.timestamp334        passed = Utility.format_seconds(seconds)335        full = Utility.format_seconds(int(self.queue.queue[self.queue.position].track.duration / 1000))336        embed = discord.Embed(337            title=f"{song.author} 
- {song.name}",338            timestamp=dt.datetime.utcnow()339        )340        embed.add_field(name="Details", value=f"Album : {song.album_name}({song.year})\n Time : {passed} / {full}",341                        inline=False)342        await self.context.send(embed=embed)343    async def reset(self, ctx=None):344        if ctx:345            self.context = ctx346        self.queue.queue = []347        self.queue.position = -1348        self.timestamp = 0349        self.accumulated_time = 0350        if self.is_playing:351            await self.stop()352    async def full_reset(self, ctx=None):353        if ctx:354            self.context = ctx355        await self.reset()356        if self.is_connected:357            await self.disconnect()358        await self.send_notifications("> Queue emptied.")359    async def list_songs_in_queue(self, ctx):360        self.context = ctx361        songs = self.queue.get_remaining_songs()362        if len(songs) == 0:363            await self.send_notifications("> Queue is empty.")364            return365        embed = discord.Embed(366            title="Queue",367            timestamp=dt.datetime.utcnow()368        )369        embed.set_footer(text=f"Requested by {ctx.author.display_name}")  # icon_url=ctx.author.avatar.url370        embed.add_field(name="Currently Playing", value=songs[0].track.title, inline=False)371        if len(songs) > 1:372            embed.add_field(name="Next Up", value="\n".join(song.track.title for song in songs[1:]), inline=False)373        await self.send_embed(embed=embed)374    async def list_songs_in_history(self, ctx):375        self.context = ctx376        songs = self.queue.get_previous_songs()377        if len(songs) == 0:378            await self.send_notifications(">Queue is empty.")379            return380        embed = discord.Embed(381            title="Queue",382            timestamp=dt.datetime.utcnow()383        )384        embed.set_footer(text=f"Requested by 
{ctx.author.display_name}")  # icon_url=ctx.author.avatar.url385        embed.add_field(name="Next Up", value="\n".join(song.track.title for song in songs), inline=False)386        await self.send_embed(embed=embed)387    async def add_song_to_playlist(self, playlist_name, author_id, ctx=None):388        if ctx:389            self.context = ctx390        if not self.is_playing:391            await self.send_notifications("> No song currently playing to be added.")392            return393        playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)394        if len(playlists) == 0:395            await self.send_notifications(f"> No playlist named {Utility.format_input(playlist_name)}.")396            return397        song = self.queue.get_current_song()398        added = Database.Database.insert_song_into_playlist(author_id=author_id, playlist_name=playlist_name,399                                                            video_id=song.track.ytid)400        if added:401            if song.name and song.author:402                await self.send_notifications(403                    f"> Added {Utility.format_input(song.author)} - {Utility.format_input(song.name)} to {Utility.format_input(playlist_name)}.")404            else:405                await self.send_notifications(406                    f"> Added {Utility.format_input(song.track.title)} to {Utility.format_input(playlist_name)}.")407        else:408            if song.name and song.author:409                await self.send_notifications(410                    f"> {Utility.format_input(song.author)} - {Utility.format_input(song.name)} already exists in {Utility.format_input(playlist_name)}.")411            else:412                await self.send_notifications(413                    f"> Failed to add {Utility.format_input(song.track.title)} to {Utility.format_input(playlist_name)}.")414    async def delete_song_from_playlist(self, playlist_name, author_id, ctx=None):415   
     if ctx:416            self.context = ctx417        if not self.is_playing:418            await self.send_notifications("> No song currently playing to be removed.")419            return420        playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)421        if len(playlists) == 0:422            await self.send_notifications(f"> No playlist named {Utility.format_input(playlist_name)}.")423            return424        song = self.queue.get_current_song()425        added = Database.Database.delete_song_from_playlist(author_id=author_id, playlist_name=playlist_name,426                                                            video_id=song.track.ytid)427        if added:428            if song.name and song.author:429                await self.send_notifications(430                    f"> Deleted {Utility.format_input(song.author)} - {Utility.format_input(song.name)} from {Utility.format_input(playlist_name)}.")431            else:432                await self.send_notifications(433                    f"> Deleted {Utility.format_input(song.track.title)} from {Utility.format_input(playlist_name)}.")434        else:435            if song.name and song.author:436                await self.send_notifications(437                    f"> Failed to delete {Utility.format_input(song.author)} - {Utility.format_input(song.name)} from {Utility.format_input(playlist_name)}.")438            else:439                await self.send_notifications(440                    f"> Failed to delete {Utility.format_input(song.track.title)} from {Utility.format_input(playlist_name)}.")441    async def delete_song_from_queue(self, index, ctx):442        self.context = ctx443        try:444            index = int(index)445        except ValueError:446            await self.send_notifications("> You need to provide the index integer.")447            return448        if index <= 0:449            await self.send_notifications("> Index must be a positive 
value.")450            return451        index = index + self.queue.position452        if index > len(self.queue.queue) - 1:453            await self.send_notifications("> Index doesn't exist.")454            return455        del self.queue.queue[index]456        await self.send_notifications("> Removed song from Queue.")457    async def play_playlist(self, playlist_name, ctx=None, author_id=None):458        if ctx:459            self.context = ctx460            author_id = ctx.message.author.id461        else:462            if not author_id:463                await self.send_notifications("> Can't play playlist...")464                return465        await self.reset()466        if playlist_name == "":467            playlists = Database.Database.get_playlists(author_id=author_id, ordered=True)468            if len(playlists) == 0:469                await self.send_notifications("> You have no playlists yet.")470                return471            await self.clear_search_messages()472            self.menu_type = "PLAYLIST"473            # Show them474            for playlist in playlists:475                message = await self.context.send(f"> {playlist[0]}\n")476                self.menu_messages.append(message)477                self.menu_content.append(playlist[0])  # The name478                await message.add_reaction("â¡ï¸")479            self.menu_user_request_id = ctx.message.author.id480            return481        playlists = Database.Database.get_playlists(author_id=author_id, playlist_name=playlist_name)482        if len(playlists) == 0:483            await self.send_notifications(f"> No playlist named:{Utility.format_input(playlist_name)}.")484            return485        video_ids = Database.Database.get_songs_from_playlist(author_id=author_id,486                                                              playlist_name=playlist_name)487        if len(video_ids) == 0:488            await self.send_notifications(f"> 
{Utility.format_input(playlist_name)} has no songs.")489            return490        for video_id in video_ids:491            video_id = video_id[0]492            url = f"https://www.youtube.com/watch?v={video_id}"493            tracks = await self.wavelink.get_tracks(url)494            song = Song(track=tracks[0])495            self.queue.queue.append(song)496        await self.send_notifications(f"> Playing {Utility.format_input(playlist_name)} with {len(video_ids)} songs.")497        await self.next_song()498    async def list_playlists(self, ctx):499        self.context = ctx500        self.menu_user_request_id = ctx.message.author.id501        playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, ordered=True)502        if len(playlists) == 0:503            await self.send_notifications("> You have no playlists yet.")504            return505        await self.clear_search_messages()506        self.menu_type = "PLAYLIST"507        # Show them508        for playlist in playlists:509            message = await ctx.send("> " + Utility.captitalize_words(playlist[0] + "\n"))510            self.menu_messages.append(message)511            self.menu_content.append(playlist[0])  # The name512            await message.add_reaction("â¡ï¸")513            await message.add_reaction("â")514    async def save_queue_as_playlist(self, playlist_name, ctx):515        self.context = ctx516        if playlist_name == "":517            await self.send_notifications("> You forgot the name of the playlist.")518            return519        playlists = Database.Database.get_playlists(author_id=ctx.message.id, playlist_name=playlist_name)520        if len(playlists) != 0:521            await self.send_notifications("> Playlist already exists in database.")522            return523        inserted = Database.Database.insert_playlist(author_id=ctx.message.author.id, playlist_name=playlist_name)524        if inserted:525            await self.send_notifications(f"> 
Created playlist {Utility.format_input(playlist_name)}.")526        else:527            await self.send_notifications("> Failed to create playlist.")528        for index in range(self.queue.position, len(self.queue.queue)):529            Database.Database.insert_song_into_playlist(author_id=ctx.message.author.id,530                                                        video_id=self.queue.queue[index].video_id,531                                                        playlist_name=playlist_name)532    async def create_playlist(self, playlist_name, ctx):533        self.context = ctx534        if playlist_name == "":535            await self.send_notifications("> You forgot the name of the playlist.")536            return537        playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)538        if len(playlists) != 0:539            await self.send_notifications("> Playlist already exists in database.")540            return541        inserted = Database.Database.insert_playlist(playlist_name, ctx.message.author.id)542        if inserted:543            await self.send_notifications(f"> Created playlist {playlist_name}.")544        else:545            await self.send_notifications("> Failed to create playlist.")546    async def rename_playlist(self, input, ctx):547        self.context = ctx548        params = input.split("->")549        if len(params) < 2:550            await self.send_notifications("> Please separate previous name and new name with \"->\".")551            return552        playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=params[0])553        if len(playlists) == 0:554            await self.send_notifications(f"> No playlist named {params[0]}.")555            return556        Database.Database.rename_playlist(ctx.message.author.id, params[0], params[1])557        await self.send_notifications(f"> Renamed {params[0]} to {params[1]}.")558    async def 
delete_playlist(self, playlist_name, ctx=None):559        if ctx:560            self.context = ctx561        if playlist_name:562            playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)563            if len(playlists) == 0:564                await self.context.send(f"> No playlist named {Utility.format_input(playlist_name)}.")565            Database.Database.delete_playlist(author_id=ctx.message.author.id, playlist_name=playlist_name)566            await self.send_notifications(f"> Deleted {Utility.format_input(playlist_name)}.")567        else:568            # TODO569            print("Implement me")570    async def list_playlists_to_add_remove_song(self, payload):571        if not self.is_playing:572            return573        self.menu_user_request_id = payload.user_id574        await self.clear_search_messages()575        playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id, ordered=True)576        if len(playlists) == 0:577            await self.send_notifications("> You have no playlists yet.")578            return579        self.menu_type = "ADD_REMOVE_SONG_TO_PLAYLIST"580        # Show them581        for playlist in playlists:582            message = await self.context.send(f"> {playlist[0]}\n")583            self.menu_messages.append(message)584            self.menu_content.append(585                {"playlist_name": playlist[0], "video_id": self.queue.get_current_song().track.ytid})586            if Database.Database.song_in_playlist(author_id=self.menu_user_request_id, playlist_name=playlist[0],587                                                  video_id=self.queue.get_current_song().track.ytid):588                await message.add_reaction("â")589            else:590                await message.add_reaction("â
")591    async def list_songs_in_playlist(self, playlist_name, ctx):592        self.context = ctx593        if playlist_name == "":594            await self.send_notifications("> Specify playlist's name.")595            return596        playlists = Database.Database.get_playlists(author_id=ctx.message.author.id, playlist_name=playlist_name)597        if len(playlists) == 0:598            await self.send_notifications("> Playlist does not exist.")599            return600        video_ids = Database.Database.get_songs_from_playlist(author_id=ctx.message.author.id,601                                                              playlist_name=playlist_name)602        if len(video_ids) == 0:603            await self.send_notifications(f"> {Utility.format_input(playlist_name)} has no songs.")604            return605        embed = discord.Embed(606            title=f"{playlist_name}",607            timestamp=dt.datetime.utcnow()608        )609        embed.set_footer(text=f"Requested by {ctx.author.display_name}")610        for song in video_ids:611            video_id = song[0]612            url = f"https://www.youtube.com/watch?v={video_id}"613            tracks = await self.wavelink.get_tracks(url)614            song = Song(track=tracks[0])615            title = tracks[0].title616            if song.author and song.name:617                title = f"{song.author} - {song.name}"618            value = ""619            if song.album_name:620                value += f"{song.album_name}"621            if song.year:622                value += f" ({song.year})"623            if not value:624                value = "album unknown"625            embed.add_field(name=title, value=value, inline=False)626        await self.context.send_notifications(embed=embed)627    async def play_album(self, album_name, ctx):628        self.context = ctx629        await self.clear_search_messages()630        if input == "":631            await self.send_notifications("> Specify album to 
search.")632            return633        spotify_data = Spotify.spotify.search_album(album_name)634        if len(spotify_data["albums"]["items"]) == 0:635            await self.send_notifications("> No results found.")636            return637        self.menu_type = "ALBUM"638        albums = spotify_data['albums']['items']639        albums = albums[:min(10, len(albums))]640        for album in albums:641            album_id = album['id']642            artist = album['artists'][0]['name']643            album_name = album['name']644            album_year = album['release_date'][:4]645            message = await self.context.send(646                f"{Utility.format_input(album_name)} ({album_year}) by {Utility.format_input(artist)}.")647            self.menu_messages.append(message)648            self.menu_content.append(649                {"artist": artist, "album": album_name, "album_year": album_year, "album_id": album_id})650            await message.add_reaction("â¡ï¸")651    async def search_select(self, payload):652        if self.context.message.author.voice:653            if not await self.join_channel(self.context):654                return655        content = None656        for index, message in enumerate(self.menu_messages):657            if message.id == payload.message_id:658                content = self.menu_content[index]659                break660        if self.menu_type == "SONG":661            index = OPTIONS[payload.emoji.name]662            song = Song(track=self.menu_content[index])663            await self.clear_search_messages()664            self.queue.queue.append(song)665            if not self.is_playing:666                await self.next_song()667        elif self.menu_type == "ALBUM":668            await self.clear_search_messages()669            await self.reset()670            tracks = Spotify.spotify.get_album(content["album_id"])['tracks']['items']671            for track in tracks:672                song_name = track['name']673  
              query = f"ytsearch:{content['artist']} - {song_name}"674                # Search and add the song to queue675                tracks = await self.wavelink.get_tracks(query)676                if not tracks:677                    await self.send_notifications(678                        f"> Couldn't find song: {Utility.format_input(content['artist'])} - {Utility.format_input(song_name)}.")679                    continue680                song = Song(track=tracks[0], name=song_name, author=content["artist"], album_name=content["album"],681                            year=content["album_year"])682                self.queue.queue.append(song)683            await self.send_notifications(684                F"> Playing {Utility.format_input(content['album'])} ({Utility.format_input(content['album_year'])}) by {Utility.format_input(content['artist'])}  with {len(self.queue.queue)} songs.")685            await self.next_song()686        elif self.menu_type == "PLAYLIST":687            await self.clear_search_messages()688            if payload.emoji.name == "â¡ï¸":689                await self.play_playlist(playlist_name=content, author_id=self.menu_user_request_id)690            elif payload.emoji.name == "â":691                await self.delete_playlist(playlist_name=content)692        elif self.menu_type == "ADD_REMOVE_SONG_TO_PLAYLIST":693            await self.clear_search_messages()694            # author_id = payload.user_id695            if payload.emoji.name == "â
":696                playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id,697                                                            playlist_name=content["playlist_name"])698                if len(playlists) == 0:699                    await self.send_notifications(700                        f"> No playlist named {Utility.format_input(content['playlist_name'])}.")701                    return702                video_id = content["video_id"]703                added = Database.Database.insert_song_into_playlist(author_id=self.menu_user_request_id,704                                                                    playlist_name=content["playlist_name"],705                                                                    video_id=video_id)706                if added:707                    await self.send_notifications(f"> Added song to {Utility.format_input(content['playlist_name'])}.")708                else:709                    await self.send_notifications(710                        f"> Song already exists in {Utility.format_input(content['playlist_name'])}.")711            elif payload.emoji.name == "â":712                playlists = Database.Database.get_playlists(author_id=self.menu_user_request_id,713                                                            playlist_name=content["playlist_name"])714                if len(playlists) == 0:715                    await self.send_notifications(716                        f"> No playlist named {Utility.format_input(content['playlist_name'])}.")717                    return718                video_id = content["video_id"]719                removed = Database.Database.delete_song_from_playlist(author_id=self.menu_user_request_id,720                                                                      playlist_name=content["playlist_name"],721                                                                      video_id=video_id)722                if removed:723                    
await self.send_notifications(724                        f"> Deleted song from {Utility.format_input(content['playlist_name'])}.")725                else:726                    await self.send_notifications(727                        f"> Song doesn't exist in {Utility.format_input(content['playlist_name'])}.")728        else:729            print("IMPLEMENT ME " + self.menu_type)730        await self.clear_search_messages()731class Music(commands.Cog, wavelink.WavelinkMixin):732    def __init__(self, bot):733        self.bot = bot734        self.wavelink = wavelink.Client(bot=bot)735        self.bot.loop.create_task(self.start_nodes())736    @wavelink.WavelinkMixin.listener()737    async def on_node_ready(self, node):738        print(f"Wavelink node {node.identifier} ready.")739    async def start_nodes(self):740        await self.bot.wait_until_ready()...uploader.py
Source:uploader.py  
"""
Upload Respa reservations into Exchange as calendar events.
"""
import logging

from django.utils.encoding import force_text

from resources.models import Reservation
from respa_exchange.ews.calendar import CreateCalendarItemRequest, DeleteCalendarItemRequest, UpdateCalendarItemRequest

log = logging.getLogger(__name__)


def _build_subject(res):
    """
    Build a subject line for the given Reservation, to be sent to Exchange.

    An explicit ``event_subject`` wins; otherwise the subject is "Respa"
    followed by the reserver's name or, failing that, the reserving user.

    :type res: resources.models.Reservation
    :return: str
    """
    if res.event_subject:
        return res.event_subject
    bits = ["Respa"]
    if res.reserver_name:
        bits.append(res.reserver_name)
    elif res.user_id:
        # The user object itself is appended; force_text() below renders it.
        bits.append(res.user)
    return " - ".join(force_text(bit) for bit in bits)


def _build_body(res):
    """
    Build the body of the Exchange appointment for a given Reservation.

    :type res: resources.models.Reservation
    :return: str
    """
    return res.event_description or ''


def _build_location(exres, res):
    """
    Build the location string of the Exchange appointment.

    Uses the resource name (with the unit name in parentheses when the
    resource has a unit) and falls back to the principal's email address
    when the resource has no name.

    :type exres: respa_exchange.models.ExchangeReservation
    :type res: resources.models.Reservation
    :return: str
    """
    resource = res.resource
    if resource.name:
        if resource.unit:
            return "%s (%s)" % (resource.name, resource.unit.name)
        return resource.name
    return exres.principal_email


def _get_calendar_item_props(exres):
    """
    Collect the calendar item properties for an ExchangeReservation.

    :type exres: respa_exchange.models.ExchangeReservation
    :return: dict with start, end, subject, body and location, plus
             required_attendees when the reserving user has an email
    """
    res = exres.reservation
    assert isinstance(res, Reservation)
    ret = dict(
        start=res.begin,
        end=res.end,
        subject=_build_subject(res),
        body=_build_body(res),
        location=_build_location(exres, res)
    )
    if res.user and res.user.email:
        ret['required_attendees'] = [res.user.email]
    return ret


def _should_send_notifications(res):
    """
    Tell whether the remote request for this reservation should notify.

    Notifications are on by default; a truthy ``_skip_notifications``
    attribute on the reservation turns them off.

    :type res: resources.models.Reservation
    :return: bool
    """
    return not getattr(res, '_skip_notifications', False)


def create_on_remote(exres):
    """
    Create and link up an appointment for an ExchangeReservation.

    Does nothing unless the reservation is in the CONFIRMED state. The item
    id returned by the create request is stored on ``exres`` and saved.

    :param exres: Exchange Reservation
    :type exres: respa_exchange.models.ExchangeReservation
    """
    res = exres.reservation
    if res.state != Reservation.CONFIRMED:
        return
    assert isinstance(res, Reservation)

    ccir = CreateCalendarItemRequest(
        principal=force_text(exres.principal_email),
        item_props=_get_calendar_item_props(exres),
        send_notifications=_should_send_notifications(res)
    )
    exres.item_id = ccir.send(exres.exchange.get_ews_session())
    exres.save()
    log.info("Created calendar item for %s", exres)


def update_on_remote(exres):
    """
    Update (or delete) the Exchange appointment for an ExchangeReservation.

    A DENIED or CANCELLED reservation is deleted on the remote instead of
    being updated. Otherwise the item id returned by the update request is
    stored on ``exres`` and saved.

    :param exres: Exchange Reservation
    :type exres: respa_exchange.models.ExchangeReservation
    """
    res = exres.reservation
    if res.state in (Reservation.DENIED, Reservation.CANCELLED):
        return delete_on_remote(exres)

    # TODO: Should we try and track the state of the object to avoid sending superfluous updates?
    ucir = UpdateCalendarItemRequest(
        principal=force_text(exres.principal_email),
        item_id=exres.item_id,
        update_props=_get_calendar_item_props(exres),
        send_notifications=_should_send_notifications(res)
    )
    exres.item_id = ucir.send(exres.exchange.get_ews_session())
    exres.save()
    log.info("Updated calendar item for %s", exres)


def delete_on_remote(exres):
    """
    Delete the Exchange appointment for an ExchangeReservation.

    :param exres: Exchange Reservation
    :type exres: respa_exchange.models.ExchangeReservation
    """
    dcir = DeleteCalendarItemRequest(
        # Coerced to text like the create/update requests do.
        principal=force_text(exres.principal_email),
        item_id=exres.item_id,
        send_notifications=_should_send_notifications(exres.reservation)
    )
    dcir.send(exres.exchange.get_ews_session())
    log.info("Deleted %s", exres)
Source:test_indexer_daemon.py  
1#!/usr/bin/env python2# coding: utf-83import os4import sys5import json6from uuid import uuid47import unittest8from unittest import mock9import importlib10from datetime import datetime11pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))  # noqa12sys.path.insert(0, pkg_root)  # noqa13from dss import Replica14from dss.util.version import datetime_to_version_format15from tests.infra import testmode16daemon_app = importlib.import_module('daemons.dss-index.app')17class TestIndexerDaemon(unittest.TestCase):18    @classmethod19    def setUpClass(cls):20        pass21    @classmethod22    def tearDownClass(cls):23        pass24    @testmode.standalone25    def test_launch_from_operator_queue(self):26        key = f"bundles/{uuid4()}.{datetime_to_version_format(datetime.utcnow())}"27        tests = [(True, "index_object"),28                 (False, "index_object"),29                 (None, "index_object")]30        for send_notifications, expected_call in tests:31            if send_notifications is not None:32                msg = dict(replica="aws", key=key, send_notifications=send_notifications)33            else:34                msg = dict(replica="aws", key=key)35            event = dict(body=json.dumps(msg))36            with mock.patch("daemons.dss-index.app.Indexer") as indexer:37                with mock.patch("daemons.dss-index.app.CompositeIndexBackend") as backend:38                    daemon_app.launch_from_operator_queue(dict(Records=[event]), {})39                    name, args, kwargs = indexer.mock_calls[-1]40                    self.assertIn(expected_call, name)41                    args, kwargs = backend.call_args_list[0]42                    self.assertEqual(send_notifications, kwargs['notify'])43    @testmode.standalone44    def test_handle_event(self):45        replica = Replica.aws  # This value is not important for this test46        key = f"bundles/{uuid4()}.{datetime_to_version_format(datetime.utcnow())}"47        tests = 
[(operator_initiated, send_notifications)48                 for send_notifications in (True, False, None)49                 for operator_initiated in (True, False)]50        for operator_initiated, send_notifications in tests:51            with self.subTest(operator_initiated=operator_initiated, send_notifications=send_notifications):52                with mock.patch("daemons.dss-index.app.Indexer") as indexer:53                    with mock.patch("daemons.dss-index.app.CompositeIndexBackend") as backend:54                        daemon_app._handle_event(replica, key, {}, send_notifications, operator_initiated)55                        name, args, kwargs = indexer.mock_calls[-1]56                        if operator_initiated:57                            self.assertIn("index_object", name)58                        else:59                            self.assertIn("process_new_indexable_object", name)60                        args, kwargs = backend.call_args_list[0]61                        self.assertEqual(send_notifications, kwargs['notify'])62if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
