Best Python code snippet using localstack_python
twitch.py
Source:twitch.py  
...67            username: subscription.status68            for username, subscription in zip(usernames, esublist.data)69        } if usernames else {}70        return usernames, statuses71    def delete_event_subscription(self, esub_id):72        response = requests.delete(73            self.sub_url, headers=self.auth_headers(), json={'id': str(esub_id)})74        response.raise_for_status()75        return response76    def event_subscribe(self, esubtype, channel_id):77        """ type esubtype event subscription to channel channel_id78            get channel_id with get_channel_id_from_username79            esubtypes are here:80        """81        data = {82            'client_id': self.app_id,83            "type": esubtype,84            "version": "1",85            "condition": {"broadcaster_user_id": channel_id},86            "transport": {"method": "webhook", "callback": self.callback_url, "secret": self.app_secret}87        }88        response = requests.post(89            self.sub_url, headers=self.auth_headers(), json=data, timeout=10)90        logging.info(response.json())91        try:92            response.raise_for_status()93        except requests.HTTPError as e:94            logging.error(f'Callback url:{self.callback_url}')95            raise e96        return response97    def subscribe(self, twitch_username: str, esubtype: str = 'stream.online', channel_id: int = None):98        channel_id = channel_id or self.get_channel_id_from_username(99            username=twitch_username)100        try:101            response = self.event_subscribe(102                esubtype=esubtype, channel_id=channel_id)103            logging.info(104                f"Status {response.status_code}. Body: {response.json()}")105        except requests.HTTPError as e:106            logging.error(f"Error request: {e.request}")107            raise e108        except Exception as e:109            logging.error(110                f"Error subscribing to {esubtype} response code: {e}")111            raise e112        return response.json()113    def unsubscribe(self, twitch_username: str):114        channel_id = self.get_channel_id_from_username(115            username=twitch_username)116        channel_id = int(channel_id)117        esublist = self.get_event_subscriptions()118        # delete all event subscriptions119        logging.info(esublist)120        for esub in esublist.data:121            logging.info(122                f'channel_id {channel_id} : broadcaster_id {esub.condition.broadcaster_user_id}')123            if esub.condition.broadcaster_user_id == channel_id:124                response = self.delete_event_subscription(esub.id)125                if response.status_code == 204:126                    logging.info(f'Deleted subscription {esub.id}')127                    return response128                else:129                    response.raise_for_status()130                    logging.info(131                        f'response: {response.status_code} {response.json()}')...handler.py
Source:handler.py  
...66            },67        ]68    )69    return response70def delete_event_subscription():71    session = boto3.session.Session()72    client = session.client(service_name='rds',region_name='ap-southeast-2')73    response = client.delete_event_subscription(SubscriptionName='restore-snapshot-subs')74    return response    75def restore(event, context):76    dbname = os.environ["DB_NAME"]77    db_sg = os.environ["DB_SECURITY_GROUP_ID"]78    lambda_sg = os.environ["LAMBDA_SECURITY_GROUP_ID"]79    subnet_group = os.environ["DB_SUBNET_GROUP_NAME"]80    stage = os.environ["STAGE"]81    restore_snapshot(dbname, db_sg, lambda_sg, subnet_group)82    83    if subscription_exists():84       delete_event_subscription()85       print("subscription (restore-snapshot-subs) deleted") 86    response_create_subs = create_event_subscription(dbname,stage)87    subscription_id = response_create_subs["EventSubscription"]["CustSubscriptionId"]88    status = response_create_subs["EventSubscription"]["Status"]89    result = f"the subscription {subscription_id} is {status}" 90    print(result)91    return {"result": result}92if __name__ == "__main__":...main.py
Source:main.py  
...34    for esub in esublist.data:35        logging.info(36            f'channel_id {channel_id} : broadcaster_id {esub.condition.broadcaster_user_id}')37        if esub.condition.broadcaster_user_id == channel_id:38            response = twitch.delete_event_subscription(esub.id)39            if response.status_code == 204:40                logging.info(f'Deleted subscription {esub.id}')41                return response42            else:43                response.raise_for_status()44                logging.info(45                    f'response: {response.status_code} {response.json()}')46                return response47@app.command()48def unsubscribe_all():49    twitch = Twitch(app_id=APP_ID, app_secret=APP_SECRET,50                    callback_url=TWITCH_CALLBACK_URL)51    esublist = twitch.get_event_subscriptions()52    # delete all event subscriptions53    logging.info(esublist)54    for esub in esublist.data:55        response = twitch.delete_event_subscription(esub.id)56        if response.status_code == 204:57            logging.info(f'Deleted subscription {esub.id}')58@app.command()59def check_subscriptions():60    twitch = Twitch(app_id=APP_ID, app_secret=APP_SECRET,61                    callback_url=TWITCH_CALLBACK_URL)62    esublist = twitch.get_event_subscriptions()63    logging.info(esublist)64if __name__ == '__main__':65    app()66# c = curlify.to_curl(response.request)67# print(c)68# import ipdb69# ipdb.set_trace()Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
