Best Python code snippet using pytractor_python
bus_vehicle_handler_test.py
Source:bus_vehicle_handler_test.py  
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
"""
- LICENCE
The MIT License (MIT)
Copyright (c) 2016 Eleftherios Anagnostopoulos for Ericsson AB (EU FP7 CityPulse Project)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
- DESCRIPTION OF DOCUMENTS
-- MongoDB Database Documents:
address_document: {
    '_id', 'name', 'node_id', 'point': {'longitude', 'latitude'}
}
bus_line_document: {
    '_id', 'bus_line_id', 'bus_stops': [{'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}}]
}
bus_stop_document: {
    '_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}
}
bus_stop_waypoints_document: {
    '_id', 'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[edge_object_id]]
}
bus_vehicle_document: {
    '_id', 'bus_vehicle_id', 'maximum_capacity',
    'routes': [{'starting_datetime', 'ending_datetime', 'timetable_id'}]
}
detailed_bus_stop_waypoints_document: {
    '_id', 'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[edge_document]]
}
edge_document: {
    '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
    'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
    'max_speed', 'road_type', 'way_id', 'traffic_density'
}
node_document: {
    '_id', 'osm_id', 'tags', 'point': {'longitude', 'latitude'}
}
point_document: {
    '_id', 'osm_id', 'point': {'longitude', 'latitude'}
}
timetable_document: {
    '_id', 'timetable_id', 'bus_line_id', 'bus_vehicle_id',
    'timetable_entries': [{
        'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'departure_datetime', 'arrival_datetime', 'number_of_onboarding_passengers',
        'number_of_deboarding_passengers', 'number_of_current_passengers',
        'route': {
            'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
            'distances_from_starting_node', 'times_from_starting_node',
            'distances_from_previous_node', 'times_from_previous_node'
        }
    }],
    'travel_requests': [{
        '_id', 'client_id', 'bus_line_id',
        'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
        'departure_datetime', 'arrival_datetime',
        'starting_timetable_entry_index', 'ending_timetable_entry_index'
    }]
}
traffic_event_document: {
    '_id', 'event_id', 'event_type', 'event_level', 'point': {'longitude', 'latitude'}, 'datetime'
}
travel_request_document: {
    '_id', 'client_id', 'bus_line_id',
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'departure_datetime', 'arrival_datetime',
    'starting_timetable_entry_index', 'ending_timetable_entry_index'
}
way_document: {
    '_id', 'osm_id', 'tags', 'references'
}
-- Route Generator Responses:
get_route_between_two_bus_stops: {
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'route': {
        'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
        'distances_from_starting_node', 'times_from_starting_node',
        'distances_from_previous_node', 'times_from_previous_node'
    }
}
get_route_between_multiple_bus_stops: [{
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'route': {
        'total_distance', 'total_time', 'node_osm_ids', 'points', 'edges',
        'distances_from_starting_node', 'times_from_starting_node',
        'distances_from_previous_node', 'times_from_previous_node'
    }
}]
get_waypoints_between_two_bus_stops: {
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[{
        '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'max_speed', 'road_type', 'way_id', 'traffic_density'
    }]]
}
get_waypoints_between_multiple_bus_stops: [{
    'starting_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'ending_bus_stop': {'_id', 'osm_id', 'name', 'point': {'longitude', 'latitude'}},
    'waypoints': [[{
        '_id', 'starting_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'ending_node': {'osm_id', 'point': {'longitude', 'latitude'}},
        'max_speed', 'road_type', 'way_id', 'traffic_density'
    }]]
}]
"""
import time
import os
import sys

# The project root must be on sys.path before the `src` package can be imported.
sys.path.append(os.path.join(os.path.dirname(__file__), '../'))

from src.common.logger import log
from src.look_ahead.bus_vehicle_handler import BusVehicleHandler

__author__ = 'Eleftherios Anagnostopoulos'
__email__ = 'eanagnostopoulos@hotmail.com'
# The comma between the two entries is required: without it Python joins the
# adjacent string literals into a single list element.
__credits__ = [
    'Azadeh Bararsani (Senior Researcher at Ericsson AB) - email: azadeh.bararsani@ericsson.com',
    'Aneta Vulgarakis Feljan (Senior Researcher at Ericsson AB) - email: aneta.vulgarakis@ericsson.com'
]


class BusVehicleHandlerTester(object):
    """
    Thin wrapper around BusVehicleHandler which logs the start, the end and
    the elapsed time of every call that it forwards to the handler.
    """

    def __init__(self):
        self.module_name = 'bus_vehicle_handler_tester'
        self.log_type = 'INFO'
        # The construction of the handler is timed and logged like any other operation.
        self.bus_vehicle_handler = self._run_timed(
            'initialize_bus_vehicle_handler',
            BusVehicleHandler
        )

    def _run_timed(self, operation_name, operation, **kwargs):
        """
        Log the start of an operation, execute it, and log its elapsed time.

        The attributes log_message, start_time and elapsed_time are updated
        as a side effect, so that they always describe the latest operation.
        If the operation raises, the exception propagates and no
        'finished' entry is logged.

        :param operation_name: str, prefix of the generated log messages
        :param operation: callable, invoked with the provided keyword arguments
        :param kwargs: keyword arguments forwarded to the operation
        :return: the value returned by the operation
        """
        self.log_message = operation_name + ': starting'
        log(module_name=self.module_name, log_type=self.log_type, log_message=self.log_message)
        self.start_time = time.time()
        result = operation(**kwargs)
        self.elapsed_time = time.time() - self.start_time
        self.log_message = operation_name + ': finished - elapsed_time = ' \
                           + str(self.elapsed_time) + ' sec'
        log(module_name=self.module_name, log_type=self.log_type, log_message=self.log_message)
        return result

    def test_clear_bus_vehicle_documents_collection(self):
        """
        Delete all the documents of the BusVehicleDocuments collection.

        :return: number_of_deleted_documents: int
        """
        return self._run_timed(
            'test_clear_bus_vehicle_documents_collection',
            self.bus_vehicle_handler.clear_bus_vehicle_documents_collection
        )

    def test_delete_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
        """
        Delete a bus_vehicle_document.

        :param object_id: ObjectId
        :param bus_vehicle_id: int
        :return: True if the document was successfully deleted, otherwise False.
        """
        return self._run_timed(
            'test_delete_bus_vehicle_document',
            self.bus_vehicle_handler.delete_bus_vehicle_document,
            object_id=object_id,
            bus_vehicle_id=bus_vehicle_id
        )

    def test_delete_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None):
        """
        Delete multiple bus_vehicle_documents.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :return: number_of_deleted_documents: int
        """
        return self._run_timed(
            'test_delete_bus_vehicle_documents',
            self.bus_vehicle_handler.delete_bus_vehicle_documents,
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids
        )

    def test_generate_bus_vehicle_document(self, maximum_capacity):
        """
        Generate a new bus_vehicle_document.

        :param maximum_capacity: int
        :return: new_object_id: ObjectId
        """
        return self._run_timed(
            'test_generate_bus_vehicle_document',
            self.bus_vehicle_handler.generate_bus_vehicle_document,
            maximum_capacity=maximum_capacity
        )

    def test_generate_bus_vehicle_documents(self, maximum_capacity, number_of_bus_vehicle_documents):
        """
        Generate multiple bus_vehicle_documents.

        :param maximum_capacity: int
        :param number_of_bus_vehicle_documents: int
        :return: new_object_ids: [ObjectIds]
        """
        return self._run_timed(
            'test_generate_bus_vehicle_documents',
            self.bus_vehicle_handler.generate_bus_vehicle_documents,
            maximum_capacity=maximum_capacity,
            number_of_bus_vehicle_documents=number_of_bus_vehicle_documents
        )

    def test_insert_bus_vehicle_document(self, bus_vehicle_document=None, bus_vehicle_id=None,
                                         maximum_capacity=None, routes=None):
        """
        Insert a new bus_vehicle_document or update, if it already exists in the database.

        :param bus_vehicle_document
        :param bus_vehicle_id: int
        :param maximum_capacity: int
        :param routes: [{'starting_datetime', 'ending_datetime', 'timetable_id'}]
        :return: new_object_id: ObjectId
        """
        return self._run_timed(
            'test_insert_bus_vehicle_document',
            self.bus_vehicle_handler.insert_bus_vehicle_document,
            bus_vehicle_document=bus_vehicle_document,
            bus_vehicle_id=bus_vehicle_id,
            maximum_capacity=maximum_capacity,
            routes=routes
        )

    def test_insert_bus_vehicle_documents(self, bus_vehicle_documents, insert_many=False):
        """
        Insert multiple bus_vehicle_documents or update existing ones.

        :param bus_vehicle_documents:
        :param insert_many: bool
        :return: new_object_ids: [ObjectId]
        """
        return self._run_timed(
            'test_insert_bus_vehicle_documents',
            self.bus_vehicle_handler.insert_bus_vehicle_documents,
            bus_vehicle_documents=bus_vehicle_documents,
            insert_many=insert_many
        )

    def test_print_bus_vehicle_document(self, object_id=None, bus_vehicle_id=None):
        """
        Print a bus_vehicle_document.

        :param object_id: ObjectId
        :param bus_vehicle_id: int
        :return: None
        """
        # The result of the handler is deliberately discarded: this method returns None.
        self._run_timed(
            'test_print_bus_vehicle_document',
            self.bus_vehicle_handler.print_bus_vehicle_document,
            object_id=object_id,
            bus_vehicle_id=bus_vehicle_id
        )

    def test_print_bus_vehicle_documents(self, object_ids=None, bus_vehicle_ids=None, counter=None):
        """
        Print multiple bus_vehicle_documents.

        :param object_ids: [ObjectId]
        :param bus_vehicle_ids: [int]
        :param counter: int
        :return: None
        """
        # The result of the handler is deliberately discarded: this method returns None.
        self._run_timed(
            'test_print_bus_vehicle_documents',
            self.bus_vehicle_handler.print_bus_vehicle_documents,
            object_ids=object_ids,
            bus_vehicle_ids=bus_vehicle_ids,
            counter=counter
        )


if __name__ == '__main__':
    bus_vehicle_handler_tester = BusVehicleHandlerTester()
    while True:
        time.sleep(0.01)
        selection = raw_input(
            '\n0. exit'
            '\n1. test_clear_bus_vehicle_documents_collection'
            '\n2. test_delete_bus_vehicle_document'
            '\n3. test_delete_bus_vehicle_documents'
            '\n4. test_generate_bus_vehicle_document'
            '\n5. test_generate_bus_vehicle_documents'
            '\n6. test_insert_bus_vehicle_document'
            '\n7. test_insert_bus_vehicle_documents'
            '\n8. test_print_bus_vehicle_document'
            '\n9. test_print_bus_vehicle_documents'
            '\nSelection: '
        )
        # 0. exit
        if selection == '0':
            break
        # 1. 
test_clear_bus_vehicle_documents_collection344        elif selection == '1':345            bus_vehicle_handler_tester.test_clear_bus_vehicle_documents_collection()346        # 2. test_delete_bus_vehicle_document347        elif selection == '2':348            bus_vehicle_id = int(349                raw_input(350                    '\n2. test_delete_bus_vehicle_document'351                    '\nbus_vehicle_id: '352                )353            )354            bus_vehicle_handler_tester.test_delete_bus_vehicle_document(355                object_id=None,356                bus_vehicle_id=bus_vehicle_id357            )358        # 3. test_delete_bus_vehicle_documents359        elif selection == '3':360            bus_vehicle_ids = []361            bus_vehicle_handler_tester.test_delete_bus_vehicle_documents(362                object_ids=None,363                bus_vehicle_ids=bus_vehicle_ids364            )365        # 4. test_generate_bus_vehicle_document366        elif selection == '4':367            maximum_capacity = int(368                raw_input(369                    '\n4. test_generate_bus_vehicle_document'370                    '\nmaximum_capacity: '371                )372            )373            bus_vehicle_handler_tester.test_generate_bus_vehicle_document(374                maximum_capacity=maximum_capacity375            )376        # 5. test_generate_bus_vehicle_documents377        elif selection == '5':378            maximum_capacity = int(379                raw_input(380                    '\n5. 
test_generate_bus_vehicle_documents'381                    '\nmaximum_capacity: '382                )383            )384            number_of_bus_vehicle_documents = int(385                raw_input(386                    '\nnumber_of_bus_vehicle_documents: '387                )388            )389            bus_vehicle_handler_tester.test_generate_bus_vehicle_documents(390                maximum_capacity=maximum_capacity,391                number_of_bus_vehicle_documents=number_of_bus_vehicle_documents392            )393        # 6. test_insert_bus_vehicle_document394        elif selection == '6':395            pass396        # 7. test_insert_bus_vehicle_documents397        elif selection == '7':398            pass399        # 8. test_print_bus_vehicle_document400        elif selection == '8':401            bus_vehicle_id = int(402                raw_input(403                    '\n8. test_print_bus_vehicle_document'404                    '\nbus_vehicle_id: '405                )406            )407            bus_vehicle_handler_tester.test_print_bus_vehicle_document(408                object_id=None,409                bus_vehicle_id=bus_vehicle_id410            )411        # 9. test_print_bus_vehicle_documents412        elif selection == '9':413            bus_vehicle_handler_tester.test_print_bus_vehicle_documents()414        else:...bans.py
Source:bans.py  
1import html2from telegram import ParseMode, Update3from telegram.error import BadRequest4from telegram.ext import CallbackContext, CommandHandler, Filters, run_async5from telegram.utils.helpers import mention_html6from Sherlock import (7    DEMONS,8    DEV_USERS,9    DRAGONS,10    LOGGER,11    OWNER_ID,12    TIGERS,13    WOLVES,14    dispatcher,15)16from Sherlock.modules.disable import DisableAbleCommandHandler17from Sherlock.modules.helper_funcs.chat_status import (18    bot_admin,19    can_restrict,20    connection_status,21    is_user_admin,22    is_user_ban_protected,23    is_user_in_chat,24    user_admin,25    user_can_ban,26)27from Sherlock.modules.helper_funcs.extraction import extract_user_and_text28from Sherlock.modules.helper_funcs.string_handling import extract_time29from Sherlock.modules.log_channel import gloggable, loggable30@run_async31@connection_status32@bot_admin33@can_restrict34@user_admin35@user_can_ban36@loggable37def ban(update: Update, context: CallbackContext) -> str:38    chat = update.effective_chat39    user = update.effective_user40    message = update.effective_message41    log_message = ""42    bot = context.bot43    args = context.args44    user_id, reason = extract_user_and_text(message, args)45    if not user_id:46        message.reply_text("I doubt that's a user.")47        return log_message48    try:49        member = chat.get_member(user_id)50    except BadRequest as excp:51        if excp.message == "User not found":52            message.reply_text("Can't seem to find this person.")53            return log_message54        else:55            raise56    if user_id == bot.id:57        message.reply_text("Oh yeah, ban myself, noob!")58        return log_message59    if is_user_ban_protected(chat, user_id, member) and user not in DEV_USERS:60        if user_id == OWNER_ID:61            message.reply_text("Trying to put me against a God level disaster huh?")62            return log_message63        elif user_id in DEV_USERS:64      
      message.reply_text("I can't act against our own.")65            return log_message66        elif user_id in DRAGONS:67            message.reply_text(68                "Fighting this Dragon here will put civilian lives at risk."69            )70            return log_message71        elif user_id in DEMONS:72            message.reply_text(73                "Bring an order from Heroes association to fight a Demon disaster."74            )75            return log_message76        elif user_id in TIGERS:77            message.reply_text(78                "Bring an order from Heroes association to fight a Tiger disaster."79            )80            return log_message81        elif user_id in WOLVES:82            message.reply_text("Wolf abilities make them ban immune!")83            return log_message84        else:85            message.reply_text("This user has immunity and cannot be banned.")86            return log_message87    log = (88        f"<b>{html.escape(chat.title)}:</b>\n"89        f"#BANNED\n"90        f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"91        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"92    )93    if reason:94        log += "\n<b>Reason:</b> {}".format(reason)95    try:96        chat.kick_member(user_id)97        # bot.send_sticker(chat.id, BAN_STICKER)  # banhammer marie sticker98        reply = (99            f"<code>â</code><b>Ban Event</b>\n"100            f"<code> </code><b>⢠ User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"101        )102        if reason:103            reply += f"\n<code> </code><b>⢠ Reason:</b> \n{html.escape(reason)}"104        bot.sendMessage(chat.id, reply, parse_mode=ParseMode.HTML, quote=False)105        return log106    except BadRequest as excp:107        if excp.message == "Reply message not found":108            # Do not reply109            message.reply_text("Banned!", quote=False)110            return 
log111        else:112            LOGGER.warning(update)113            LOGGER.exception(114                "ERROR banning user %s in chat %s (%s) due to %s",115                user_id,116                chat.title,117                chat.id,118                excp.message,119            )120            message.reply_text("Uhm...that didn't work...")121    return log_message122@run_async123@connection_status124@bot_admin125@can_restrict126@user_admin127@user_can_ban128@loggable129def sban(update: Update, context: CallbackContext) -> str:130    chat = update.effective_chat131    user = update.effective_user132    message = update.effective_message133    log_message = ""134    bot = context.bot135    args = context.args136    user_id, reason = extract_user_and_text(message, args)137    update.effective_message.delete()138    if not user_id:139        return log_message140    try:141        member = chat.get_member(user_id)142    except BadRequest as excp:143        if excp.message == "User not found":144            return log_message145        else:146            raise147    if user_id == bot.id:148        return log_message149    if is_user_ban_protected(chat, user_id, member):150        return log_message151    if user_id == 777000 or user_id == 1087968824:152        return log_message153    log = (154        f"<b>{html.escape(chat.title)}:</b>\n"155        f"#SBANNED\n"156        f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"157        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"158    )159    if reason:160        log += "\n<b>Reason:</b> {}".format(reason)161    try:162        chat.kick_member(user_id)163        return log164    except BadRequest as excp:165        if excp.message == "Reply message not found":166            return log167        else:168            LOGGER.warning(update)169            LOGGER.exception(170                "ERROR banning user %s in chat %s (%s) due to %s",171             
   user_id,172                chat.title,173                chat.id,174                excp.message,175            )176    return log_message177@run_async178@connection_status179@bot_admin180@can_restrict181@user_admin182@user_can_ban183@loggable184def temp_ban(update: Update, context: CallbackContext) -> str:185    chat = update.effective_chat186    user = update.effective_user187    message = update.effective_message188    log_message = ""189    bot, args = context.bot, context.args190    user_id, reason = extract_user_and_text(message, args)191    if not user_id:192        message.reply_text("I doubt that's a user.")193        return log_message194    try:195        member = chat.get_member(user_id)196    except BadRequest as excp:197        if excp.message == "User not found":198            message.reply_text("I can't seem to find this user.")199            return log_message200        else:201            raise202    if user_id == bot.id:203        message.reply_text("I'm not gonna BAN myself, are you crazy?")204        return log_message205    if is_user_ban_protected(chat, user_id, member):206        message.reply_text("I don't feel like it.")207        return log_message208    if not reason:209        message.reply_text("You haven't specified a time to ban this user for!")210        return log_message211    split_reason = reason.split(None, 1)212    time_val = split_reason[0].lower()213    if len(split_reason) > 1:214        reason = split_reason[1]215    else:216        reason = ""217    bantime = extract_time(message, time_val)218    if not bantime:219        return log_message220    log = (221        f"<b>{html.escape(chat.title)}:</b>\n"222        "#TEMP BANNED\n"223        f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"224        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}\n"225        f"<b>Time:</b> {time_val}"226    )227    if reason:228        log += "\n<b>Reason:</b> 
{}".format(reason)229    try:230        chat.kick_member(user_id, until_date=bantime)231        # bot.send_sticker(chat.id, BAN_STICKER)  # banhammer marie sticker232        bot.sendMessage(233            chat.id,234            f"Banned! User {mention_html(member.user.id, html.escape(member.user.first_name))} "235            f"will be banned for {time_val}.",236            parse_mode=ParseMode.HTML,237        )238        return log239    except BadRequest as excp:240        if excp.message == "Reply message not found":241            # Do not reply242            message.reply_text(243                f"Banned! User will be banned for {time_val}.", quote=False244            )245            return log246        else:247            LOGGER.warning(update)248            LOGGER.exception(249                "ERROR banning user %s in chat %s (%s) due to %s",250                user_id,251                chat.title,252                chat.id,253                excp.message,254            )255            message.reply_text("Well damn, I can't ban that user.")256    return log_message257@run_async258@connection_status259@bot_admin260@can_restrict261@user_admin262@user_can_ban263@loggable264def stemp_ban(update: Update, context: CallbackContext) -> str:265    chat = update.effective_chat266    user = update.effective_user267    message = update.effective_message268    log_message = ""269    bot, args = context.bot, context.args270    user_id, reason = extract_user_and_text(message, args)271    update.effective_message.delete()272    if not user_id:273        return log_message274    try:275        member = chat.get_member(user_id)276    except BadRequest as excp:277        if excp.message == "User not found":278            return log_message279        else:280            raise281    if user_id == bot.id:282        return log_message283    if is_user_ban_protected(chat, user_id, member):284        return log_message285    if not reason:286        message.reply_text("You haven't 
specified a time to ban this user for!")287        return log_message288    split_reason = reason.split(None, 1)289    time_val = split_reason[0].lower()290    if len(split_reason) > 1:291        reason = split_reason[1]292    else:293        reason = ""294    bantime = extract_time(message, time_val)295    if not bantime:296        return log_message297    log = (298        f"<b>{html.escape(chat.title)}:</b>\n"299        "#STEMP BANNED\n"300        f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"301        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}\n"302        f"<b>Time:</b> {time_val}"303    )304    if reason:305        log += "\n<b>Reason:</b> {}".format(reason)306    try:307        chat.kick_member(user_id, until_date=bantime)308        # bot.send_sticker(chat.id, BAN_STICKER)  # banhammer marie sticker309        return log310    except BadRequest as excp:311        if excp.message == "Reply message not found":312            # Do not reply313            return log314        else:315            LOGGER.warning(update)316            LOGGER.exception(317                "ERROR banning user %s in chat %s (%s) due to %s",318                user_id,319                chat.title,320                chat.id,321                excp.message,322            )323    return log_message324@run_async325@connection_status326@bot_admin327@can_restrict328@user_admin329@user_can_ban330@loggable331def kick(update: Update, context: CallbackContext) -> str:332    chat = update.effective_chat333    user = update.effective_user334    message = update.effective_message335    log_message = ""336    bot, args = context.bot, context.args337    user_id, reason = extract_user_and_text(message, args)338    if not user_id:339        message.reply_text("I doubt that's a user.")340        return log_message341    try:342        member = chat.get_member(user_id)343    except BadRequest as excp:344        if excp.message == "User not 
found":345            message.reply_text("I can't seem to find this user.")346            return log_message347        else:348            raise349    if user_id == bot.id:350        message.reply_text("Yeahhh I'm not gonna do that.")351        return log_message352    if is_user_ban_protected(chat, user_id):353        message.reply_text("I really wish I could kick this user....")354        return log_message355    res = chat.unban_member(user_id)  # unban on current user = kick356    if res:357        # bot.send_sticker(chat.id, BAN_STICKER)  # banhammer marie sticker358        bot.sendMessage(359            chat.id,360            f"User Kicked Woops! {mention_html(member.user.id, html.escape(member.user.first_name))}.",361            parse_mode=ParseMode.HTML,362        )363        log = (364            f"<b>{html.escape(chat.title)}:</b>\n"365            f"#KICKED\n"366            f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"367            f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"368        )369        if reason:370            log += f"\n<b>Reason:</b> {reason}"371        return log372    else:373        message.reply_text("Well damn, I can't kick that user.")374    return log_message375@run_async376@connection_status377@bot_admin378@can_restrict379@user_admin380@user_can_ban381@loggable382def skick(update: Update, context: CallbackContext) -> str:383    chat = update.effective_chat384    user = update.effective_user385    message = update.effective_message386    log_message = ""387    bot, args = context.bot, context.args388    user_id, reason = extract_user_and_text(message, args)389    update.effective_message.delete()390    if not user_id:391        return log_message392    try:393        member = chat.get_member(user_id)394    except BadRequest as excp:395        if excp.message == "User not found":396            return log_message397        else:398            raise399    if user_id == 
bot.id:400        return log_message401    if is_user_ban_protected(chat, user_id):402        return log_message403    res = chat.unban_member(user_id)  # unban on current user = kick404    if res:405        log = (406            f"<b>{html.escape(chat.title)}:</b>\n"407            f"#SKICKED\n"408            f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"409            f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"410        )411        if reason:412            log += f"\n<b>Reason:</b> {reason}"413        return log414    return log_message415@run_async416@bot_admin417@can_restrict418def kickme(update: Update, context: CallbackContext):419    user_id = update.effective_message.from_user.id420    if is_user_admin(update.effective_chat, user_id):421        update.effective_message.reply_text("I wish I could... but you're an admin.")422        return423    res = update.effective_chat.unban_member(user_id)  # unban on current user = kick424    if res:425        update.effective_message.reply_text("*kicks you out of the group*")426    else:427        update.effective_message.reply_text("Huh? 
I can't :/")428@run_async429@connection_status430@bot_admin431@can_restrict432@user_admin433@user_can_ban434@loggable435def unban(update: Update, context: CallbackContext) -> str:436    message = update.effective_message437    user = update.effective_user438    chat = update.effective_chat439    log_message = ""440    bot, args = context.bot, context.args441    user_id, reason = extract_user_and_text(message, args)442    if not user_id:443        message.reply_text("I doubt that's a user.")444        return log_message445    try:446        member = chat.get_member(user_id)447    except BadRequest as excp:448        if excp.message == "User not found":449            message.reply_text("I can't seem to find this user.")450            return log_message451        else:452            raise453    if user_id == bot.id:454        message.reply_text("How would I unban myself if I wasn't here...?")455        return log_message456    if is_user_in_chat(chat, user_id):457        message.reply_text("Isn't this person already here??")458        return log_message459    chat.unban_member(user_id)460    message.reply_text("Yep, this user can join!")461    log = (462        f"<b>{html.escape(chat.title)}:</b>\n"463        f"#UNBANNED\n"464        f"<b>Admin:</b> {mention_html(user.id, html.escape(user.first_name))}\n"465        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"466    )467    if reason:468        log += f"\n<b>Reason:</b> {reason}"469    return log470@run_async471@connection_status472@bot_admin473@can_restrict474@gloggable475def selfunban(context: CallbackContext, update: Update) -> str:476    message = update.effective_message477    user = update.effective_user478    bot, args = context.bot, context.args479    if user.id not in DRAGONS or user.id not in TIGERS:480        return481    try:482        chat_id = int(args[0])483    except:484        message.reply_text("Give a valid chat ID.")485        return486    chat = 
bot.getChat(chat_id)487    try:488        member = chat.get_member(user.id)489    except BadRequest as excp:490        if excp.message == "User not found":491            message.reply_text("I can't seem to find this user.")492            return493        else:494            raise495    if is_user_in_chat(chat, user.id):496        message.reply_text("Aren't you already in the chat??")497        return498    chat.unban_member(user.id)499    message.reply_text("Yep, I have unbanned you.")500    log = (501        f"<b>{html.escape(chat.title)}:</b>\n"502        f"#UNBANNED\n"503        f"<b>User:</b> {mention_html(member.user.id, html.escape(member.user.first_name))}"504    )505    return log506__help__ = """507*Kicks:*508 ⪠/kick <userhandle>*:* Kicks a user out of the group, (via handle, or reply)509 ⪠/skick <userhandle>*:* Silently kicks a user out of the group, (via handle, or reply)510 ⪠/kickme*:* Kicks the user who used the command.511 512*Bans:*513 ⪠/ban <userhandle>*:* Bans a user. (via handle, or reply)514 ⪠/sban <userhandle>*:* Silently bans a user without leaving any message. (via handle, or reply)515 ⪠/tban <userhandle> x(m/h/d)*:* Bans a user for `x` time. (via handle, or reply). `m` = `minutes`, `h` = `hours`, `d` = `days`.516 ⪠/stban <userhandle> x(m/h/d)*:* Silently bans a user for `x` time. (via handle, or reply). `m` = `minutes`, `h` = `hours`, `d` = `days`.517 ⪠/unban <userhandle>*:* Unbans a user. (via handle, or reply)518_NOTE:_519 If you set Log Channels, you will get logs of Silent kick and bans. 
Check *Logger* module to know more about Log Channel.520"""521BAN_HANDLER = CommandHandler("ban", ban)522TEMPBAN_HANDLER = CommandHandler(["tban"], temp_ban)523STEMPBAN_HANDLER = CommandHandler(["stban"], stemp_ban)524KICK_HANDLER = CommandHandler("kick", kick)525SKICK_HANDLER = CommandHandler("skick", skick)526UNBAN_HANDLER = CommandHandler("unban", unban)527ROAR_HANDLER = CommandHandler("roar", selfunban)528KICKME_HANDLER = DisableAbleCommandHandler("kickme", kickme, filters=Filters.group)529SBAN_HANDLER = CommandHandler("sban", sban)530dispatcher.add_handler(BAN_HANDLER)531dispatcher.add_handler(TEMPBAN_HANDLER)532dispatcher.add_handler(STEMPBAN_HANDLER)533dispatcher.add_handler(KICK_HANDLER)534dispatcher.add_handler(SKICK_HANDLER)535dispatcher.add_handler(UNBAN_HANDLER)536dispatcher.add_handler(ROAR_HANDLER)537dispatcher.add_handler(KICKME_HANDLER)538dispatcher.add_handler(SBAN_HANDLER)539__mod_name__ = "Bans"540__handlers__ = [541    BAN_HANDLER,542    TEMPBAN_HANDLER,543    STEMPBAN_HANDLER,544    KICK_HANDLER,545    SKICK_HANDLER,546    UNBAN_HANDLER,547    ROAR_HANDLER,548    KICKME_HANDLER,549    SBAN_HANDLER,...itunesrpc.py
Source:itunesrpc.py  
...64# none of this is automatically uploaded, but it should be65# sent within error reports on GitHub, as it may help determine66# your issue67os.remove("log")  # delete the old log file.68log_message("Starting system log dump.")69log_message("Machine Architecture: " + platform.machine())70log_message("Machine Version:" + platform.version())71log_message("Machine Platform: " + platform.platform())72log_message("Machine Unix Name: " + str(platform.uname()))73log_message("OS Type: " + platform.system())74log_message("Processor: " + platform.processor())75log_message(76    "Total RAM: " + str(round(psutil.virtual_memory().total / (1024.0**3))) + " GB"77)78log_message("End of system logs.\n")79log_message("Starting iTunesRPC logs.")80# DEFINITIONS81def push_playing(o, DiscordRPC, dict, last_pos, paused_track, moved_playhead):82    paused = False83    # Get all relevant information84    # TRACK INFO85    track = o.CurrentTrack.Name86    # OTHER INFO87    artist = o.CurrentTrack.Artist88    album = o.CurrentTrack.Album89    # SAVE DATA TO FILE, FOR WINDOW90    curr = str({"song": track, "artist": artist, "album": album})91    with open(92        "current_song_info", "w", encoding="utf-8"93    ) as current:  # save with context manager to allow for encoding= variable.94        current.write(curr)95    # MODIFY TRACK TO HAVE PAUSED IF PAUSED ON APPLE MUSIC96    if paused_track:97        track = "[PAUSED] " + track98    file_path = os.getcwd() + "\\temporary.png"99    o.CurrentTrack.Artwork.Item(1).SaveArtworkToFile(file_path)100    artwork_url = networking.get("temporary.png", domain, track, artist, album)101    artwork_url = ast.literal_eval(str(artwork_url))102    artwork_url = str(artwork_url[1]) + str(artwork_url[2])103    artwork_url = "https://" + domain + "/itrpc/" + artwork_url104    # os.remove(file_path)105    # log_message INFO106    log_message("Track: " + track)107    log_message("Artist: " + artist)108    log_message("Album: " + album)109    
log_message("Artwork URL: " + str(artwork_url))110    pause_button = f"https://{domain}/itrpc/pause.png"111    play_button = f"https://{domain}/itrpc/play.png"112    # timestamps for computing how far into the song we are113    if paused_track is False:114        starttime = int(time.time()) - o.PlayerPosition115        endtime = int(time.time()) + (o.CurrentTrack.Duration - o.PlayerPosition)116    try:117        if moved_playhead:118            DiscordRPC.clear()  # get rid of the current status: the left count won't refresh otherwise.119            time.sleep(0.1)120            # if we don't pause for a tiny amount the .update will send, and discord will121            # forget the .clear command.122        if paused_track is True:123            if paused is not True:124                DiscordRPC.update(125                    details=track,126                    state=artist,127                    large_image=artwork_url,128                    large_text=album,129                    small_image=pause_button,130                    small_text="Paused on Apple Music",131                    buttons=buttons,132                )133                paused = True134        else:135            if last_pos is not False:136                DiscordRPC.update(137                    details=track,138                    state=artist,139                    start=starttime,140                    end=endtime,141                    large_image=artwork_url,142                    large_text=album,143                    small_image=play_button,144                    small_text="Playing on Apple Music",145                    buttons=buttons,146                )147            else:148                last_pos = o.CurrentTrack.Duration - o.PlayerPosition149    except Exception:150        # Discord is closed if we error here.151        # Let's re open it.152        log_message("..........................................")153        log_message(".           Discord is closed...         
.")154        log_message(".          Attempting to open it         .")155        log_message(". Waiting global_pause+3 seconds before  .")156        log_message(".               continuing.              .")157        log_message("..........................................")158        DiscordRPC = False159        opened = False160        while DiscordRPC is False:161            try:162                DiscordRPC = pypresence.Presence(secret, pipe=0)163                DiscordRPC.connect()164                log_message("Hooked to Discord.")165            except Exception:166                if not opened:167                    os.system(discord_command)168                    log_message("..........................................")169                    log_message(".           Discord is closed...         .")170                    log_message(".          Attempting to open it         .")171                    log_message(". Waiting global_pause+3 seconds before  .")172                    log_message(".               continuing.              .")173                    log_message("..........................................")174                    opened = True175                time.sleep(global_pause + 3)176                continue177        # Now we have re opened Discord, let's post the message to Discord.178        time.sleep(global_pause)179        # Since we may have had Discord closed for a while, we need to update our items.180        # Let's re call this definition, as it ensures we get the most recent values.181        # We can send our original arguments to this. 
It isn't a massive deal.182        DiscordRPC, track, artist, album, last_pos, paused = push_playing(183            o, DiscordRPC, dict, last_pos, paused_track, moved_playhead184        )185    # Finally, regardless of what happened, let's return all our values.186    return (DiscordRPC, track, artist, album, last_pos, paused)187# SYSTRAY DEFINITIONS188# window definitions defined at start of program.189def exit_program(systray):190    global shutdown_systray191    shutdown_systray = True192def toggle_rpc(systray):193    global toggled194    toggled = not toggled195    time.sleep(global_pause + 1)196    DiscordRPC.clear()197# SYSTRAY MENU OPTIONS AND MAKING THE ICON198menu_options = (199    ("Show Window", None, itrpc_window.start),200    ("Toggle Rich Presence", None, toggle_rpc),201    ("Shutdown iTunesRPC Safely", None, exit_program),202)203systray = SysTrayIcon("icon.ico", "iTunesRPC", menu_options)204systray.start()205log_message("Started Systray icon.")206# GETTING THE ITUNES COM CONNECTION207o = win32com.client.gencache.EnsureDispatch(208    "iTunes.Application"209)  # connect to the COM of iTunes.Application210log_message("Hooked to iTunes COM.")211# NOTE: win32com.client.gencache.EnsureDispatch will force open the application if not already open212# CONNECTING TO DISCORD213DiscordRPC = False214opened = False215while DiscordRPC is False:216    if shutdown_systray:217        log_message("No connection to DiscordRPC, so not closing.")218        systray.shutdown()219        log_message("Shutdown the Systray icon.")220        quit("Shutdown the Python program.")221    try:222        DiscordRPC = pypresence.Presence(secret, pipe=0)223        DiscordRPC.connect()224        log_message("Hooked to Discord.")225    except Exception:226        if not opened:227            os.system(discord_command)228            log_message("..........................................")229            log_message(".           Discord is closed...         
.")230            log_message(". Waiting for it to open before starting .")231            log_message(". Waiting global_pause+3 seconds before  .")232            log_message(".               continuing.              .")233            log_message("..........................................")234            opened = True235        time.sleep(236            global_pause + 3237        )  # takes a while to open discord on lower end hardware so account for that here238        continue239# GET LAST POSITION OF TRACK240stopped = True241while stopped:242    try:243        last_pos = o.CurrentTrack.Duration - o.PlayerPosition244        # LAST TRACK = THE TRACK THAT IS CURRENTLY PLAYED. IT MAKES SENSE IN245        # CODE AS LAST_TRACK IS THE TRACK PLAYED {global_pause} SECONDS AGO246        last_track = o.CurrentTrack.Name247        track = o.CurrentTrack.Name248        stopped = False249    except Exception:250        DiscordRPC.clear()251        o = win32com.client.gencache.EnsureDispatch("iTunes.Application")252        log_message("..........................................")253        log_message(".    iTunes is not playing anything...   .")254        log_message(". Waiting for it to play before starting .")255        log_message(".           Waiting 10 seconds.          
.")256        log_message("..........................................")257        time.sleep(10)258# LOOP VARIABLES259special_push = False  # this is used to determine if another function has already pushed260# to RPC, as we don't want to repeat for loads of different items.261stopped = False  # track stopped (iTunes has no track selected)262paused = False  # track paused (iTunes has a track selected)263first_run = True  # first ran the program.264shutdown_systray = False  # shutdown the program from the systray265running = True  # run var266skipped = False  # skipping song.267toggled = True  # showing RP?268# LOOP269while running:270    if toggled:271        if first_run:272            last_pos = o.CurrentTrack.Duration - o.PlayerPosition273            time.sleep(global_pause)274            first_run = False275        try:276            placeholder = o.CurrentTrack.Name  # try to get current track name277        except Exception:278            stopped = True279        if stopped is False:280            log_message("------------------")281            # update the last track to be the variable that was playing 5 seconds ago and282            # get the new current track and store it as track283            last_track = track284            track = o.CurrentTrack.Name285            log_message("Last Track: " + last_track)286            log_message("Current Track: " + track)287            log_message("Last Playhead Position: " + str(last_pos))288            log_message(289                "Current Playhead Position: "290                + str((o.CurrentTrack.Duration - o.PlayerPosition))291            )292            log_message(293                "Position Difference: "294                + str(last_pos - (o.CurrentTrack.Duration - o.PlayerPosition))295            )296            log_message("Pushing the following info...")297            if last_track != track:  # if we changed tracks.298                special_push = True299                skipped = True300                
log_message("Changed track. Getting regular fetch from push_playing.")301                DiscordRPC, track, artist, album, last_pos, paused = push_playing(302                    o, DiscordRPC, dict, last_pos, False, False303                )304            if not paused or not skipped:305                if (306                    last_pos - (o.CurrentTrack.Duration - o.PlayerPosition)307                    < global_pause - 1308                    and last_pos - (o.CurrentTrack.Duration - o.PlayerPosition) >= 0309                ):310                    special_push = True311                    # we are paused312                    log_message("Paused. Sending pause message to RPC.")313                    DiscordRPC, track, artist, album, last_pos, paused = push_playing(314                        o, DiscordRPC, dict, last_pos, True, False315                    )316                else:317                    paused = False318                if (319                    (last_pos - (o.CurrentTrack.Duration - o.PlayerPosition) < 0)320                    or (321                        last_pos - (o.CurrentTrack.Duration - o.PlayerPosition)322                        > global_pause + 1323                    )324                ) and last_track == track:325                    # we have rewound or fast forwarded within the song. let's make sure we account for that when calling push_playing326                    # this could also happen when a new song has started. 
that is why last_track == track is in this if statement327                    log_message(328                        "Track position moved over global_pause value, likely skipped forward/backward in the song."329                    )330                    special_push = True331                    DiscordRPC, track, artist, album, last_pos, paused = push_playing(332                        o, DiscordRPC, dict, last_pos, False, True333                    )334                if special_push is False:335                    DiscordRPC, track, artist, album, last_pos, paused = push_playing(336                        o, DiscordRPC, dict, last_pos, False, False337                    )338            else:339                skipped = False340            special_push = False341            # get the last position of the track. used for pause342            last_pos = o.CurrentTrack.Duration - o.PlayerPosition343            time.sleep(global_pause)344        else:345            DiscordRPC.clear()346            try:347                del o348            except Exception:349                pass350            o = win32com.client.gencache.EnsureDispatch("iTunes.Application")351            log_message("..........................................")352            log_message(".    iTunes is not playing anything...   .")353            log_message(". Waiting for it to play before starting .")354            log_message(".           Waiting 10 seconds.          .")355            log_message("..........................................")356            time.sleep(10)357            stopped = False358            try:359                track = o.CurrentTrack.Name360            except Exception as e:361                log_message(e)362                stopped = True363        if shutdown_systray:364            running = False365            log_message("------------------")366            log_message("Shutting down.")367    else:368        log_message("RPC is toggled off. 
Not showing status.")369        log_message("Waiting 1 second to check if toggled is enabled.")370        time.sleep(1)371# SHUTDOWN372DiscordRPC.close()373log_message("Closed connection to DiscordRPC.")374systray.shutdown()375log_message("Shutdown the Systray icon.")376o.Quit()377log_message("Closed iTunes connection.")378p = open("config", "r")379prev = ast.literal_eval(p.read())380p.close()381prev["gui_window_isOpen"] = False382update = open("config", "w")383update.write(str(prev))384update.close()385log_message(386    "Set GUI isOpen to False to ensure we don't get hanging on next open."387)  # If this value is left as true, on the next launch of the app, it is possible that the window will freeze....xmpp_service_stub.py
Source:xmpp_service_stub.py  
1#!/usr/bin/env python2#3# Copyright 2007 Google Inc.4#5# Licensed under the Apache License, Version 2.0 (the "License");6# you may not use this file except in compliance with the License.7# You may obtain a copy of the License at8#9#     http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16#17"""Stub version of the XMPP API, writes messages to logs."""18import logging19import os20from google.appengine.api import apiproxy_stub21from google.appengine.api import app_identity22from google.appengine.api.xmpp import xmpp_service_pb23from google.appengine.runtime import apiproxy_errors24INVALID_JID_CHARACTERS = ',;()[]'25class XmppServiceStub(apiproxy_stub.APIProxyStub):26  """Python only xmpp service stub.27  This stub does not use an XMPP network. 
It prints messages to the console28  instead of sending any stanzas.29  """30  THREADSAFE = True31  def __init__(self, log=logging.info, service_name='xmpp'):32    """Initializer.33    Args:34      log: A logger, used for dependency injection.35      service_name: Service name expected for all calls.36    """37    super(XmppServiceStub, self).__init__(service_name)38    self.log = log39  def _Dynamic_GetPresence(self, request, response):40    """Implementation of XmppService::GetPresence.41    Returns online if the first character of the JID comes before 'm' in the42    alphabet, otherwise returns offline.43    Args:44      request: A PresenceRequest.45      response: A PresenceResponse.46    """47    self._GetFrom(request.from_jid())48    self._FillInPresenceResponse(request.jid(), response)49  def _Dynamic_BulkGetPresence(self, request, response):50    self._GetFrom(request.from_jid())51    for jid in request.jid_list():52      subresponse = response.add_presence_response()53      self._FillInPresenceResponse(jid, subresponse)54  def _FillInPresenceResponse(self, jid, response):55    """Arbitrarily fill in a presence response or subresponse."""56    response.set_is_available(jid[0] < 'm')57    response.set_valid(self._ValidateJid(jid))58    response.set_presence(1)59  def _Dynamic_SendMessage(self, request, response):60    """Implementation of XmppService::SendMessage.61    Args:62      request: An XmppMessageRequest.63      response: An XmppMessageResponse .64    """65    from_jid = self._GetFrom(request.from_jid())66    log_message = []67    log_message.append('Sending an XMPP Message:')68    log_message.append('    From:')69    log_message.append('       ' + from_jid)70    log_message.append('    Body:')71    log_message.append('       ' + request.body())72    log_message.append('    Type:')73    log_message.append('       ' + request.type())74    log_message.append('    Raw Xml:')75    log_message.append('       ' + str(request.raw_xml()))76    
log_message.append('    To JIDs:')77    for jid in request.jid_list():78      log_message.append('       ' + jid)79    self.log('\n'.join(log_message))80    for jid in request.jid_list():81      if self._ValidateJid(jid):82        response.add_status(xmpp_service_pb.XmppMessageResponse.NO_ERROR)83      else:84        response.add_status(xmpp_service_pb.XmppMessageResponse.INVALID_JID)85  def _Dynamic_SendInvite(self, request, response):86    """Implementation of XmppService::SendInvite.87    Args:88      request: An XmppInviteRequest.89      response: An XmppInviteResponse .90    """91    from_jid = self._GetFrom(request.from_jid())92    self._ParseJid(request.jid())93    log_message = []94    log_message.append('Sending an XMPP Invite:')95    log_message.append('    From:')96    log_message.append('       ' + from_jid)97    log_message.append('    To: ' + request.jid())98    self.log('\n'.join(log_message))99  def _Dynamic_SendPresence(self, request, response):100    """Implementation of XmppService::SendPresence.101    Args:102      request: An XmppSendPresenceRequest.103      response: An XmppSendPresenceResponse .104    """105    from_jid = self._GetFrom(request.from_jid())106    log_message = []107    log_message.append('Sending an XMPP Presence:')108    log_message.append('    From:')109    log_message.append('       ' + from_jid)110    log_message.append('    To: ' + request.jid())111    if request.type():112      log_message.append('    Type: ' + request.type())113    if request.show():114      log_message.append('    Show: ' + request.show())115    if request.status():116      log_message.append('    Status: ' + request.status())117    self.log('\n'.join(log_message))118  def _ParseJid(self, jid):119    """Parse the given JID.120    Also tests that the given jid:121      * Contains one and only one @.122      * Has one or zero resources.123      * Has a node.124      * Does not contain any invalid characters.125    Args:126      jid: The JID to validate127 
   Returns:128      A tuple (node, domain, resource) representing the JID.129    Raises:130      apiproxy_errors.ApplicationError if the requested JID is invalid using the131        criteria listed above.132    """133    if set(jid).intersection(INVALID_JID_CHARACTERS):134      self.log('Invalid JID: characters "%s" not supported.  JID: %s',135               INVALID_JID_CHARACTERS,136               jid)137      raise apiproxy_errors.ApplicationError(138          xmpp_service_pb.XmppServiceError.INVALID_JID)139    node, domain, resource = ('', '', '')140    at = jid.find('@')141    if at == -1:142      self.log('Invalid JID: No \'@\' character found. JID: %s', jid)143      raise apiproxy_errors.ApplicationError(144          xmpp_service_pb.XmppServiceError.INVALID_JID)145    node = jid[:at]146    if not node:147      self.log('Invalid JID: No node. JID: %s', jid)148      raise apiproxy_errors.ApplicationError(149          xmpp_service_pb.XmppServiceError.INVALID_JID)150    rest = jid[at+1:]151    if rest.find('@') > -1:152      self.log('Invalid JID: Second \'@\' character found. JID: %s',153               jid)154      raise apiproxy_errors.ApplicationError(155          xmpp_service_pb.XmppServiceError.INVALID_JID)156    slash = rest.find('/')157    if slash == -1:158      domain = rest159      resource = 'bot'160    else:161      domain = rest[:slash]162      resource = rest[slash+1:]163    if resource.find('/') > -1:164      self.log('Invalid JID: Second \'/\' character found. 
JID: %s',165               jid)166      raise apiproxy_errors.ApplicationError(167          xmpp_service_pb.XmppServiceError.INVALID_JID)168    return node, domain, resource169  def _ValidateJid(self, jid):170    """Validate the given JID using self._ParseJid."""171    try:172      self._ParseJid(jid)173      return True174    except apiproxy_errors.ApplicationError:175      return False176  def _GetFrom(self, requested):177    """Validates that the from JID is valid.178    The JID uses the display-app-id for all apps to simulate a common case179    in production (alias === display-app-id).180    Args:181      requested: The requested from JID.182    Returns:183      string, The from JID.184    Raises:185      apiproxy_errors.ApplicationError if the requested JID is invalid.186    """187    full_appid = os.environ.get('APPLICATION_ID')188    partition, _, display_app_id = (189        app_identity.app_identity._ParseFullAppId(full_appid))190    if requested == None or requested == '':191      return display_app_id + '@appspot.com/bot'192    node, domain, resource = self._ParseJid(requested)193    if domain == 'appspot.com' and node == display_app_id:194      return node + '@' + domain + '/' + resource195    elif domain == display_app_id + '.appspotchat.com':196      return node + '@' + domain + '/' + resource197    self.log('Invalid From JID: Must be appid@appspot.com[/resource] or '198             'node@appid.appspotchat.com[/resource]. 
JID: %s', requested)199    raise apiproxy_errors.ApplicationError(200        xmpp_service_pb.XmppServiceError.INVALID_JID)201  def _Dynamic_CreateChannel(self, request, response):202    """Implementation of XmppService::CreateChannel.203    Args:204      request: A CreateChannelRequest.205      response: A CreateChannelResponse.206    """207    log_message = []208    log_message.append('Sending a Create Channel:')209    log_message.append('    Client ID:')210    log_message.append('       ' + request.application_key())211    if request.duration_minutes():212      log_message.append('    Duration minutes: ' +213                         str(request.duration_minutes()))214    self.log('\n'.join(log_message))215  def _Dynamic_SendChannelMessage(self, request, response):216    """Implementation of XmppService::SendChannelMessage.217    Args:218      request: A SendMessageRequest.219      response: A SendMessageRequest.220    """221    log_message = []222    log_message.append('Sending a Channel Message:')223    log_message.append('    Client ID:')224    log_message.append('       ' + request.application_key())225    log_message.append('    Message:')226    log_message.append('       ' + str(request.message()))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
