Best Python code snippet using localstack_python
test_twitch_chatbot.py
Source:test_twitch_chatbot.py  
from threading import Thread
from time import sleep
from typing import Tuple, List
from dataclasses import dataclass, field

from teamfightchaticts.settings import TwitchSettings
from teamfightchaticts.tft_command import TFTCommand
from teamfightchaticts.twitch_connection import TwitchConnection, LineBuffer


@dataclass
class IrcSocketMock:
    """In-memory stand-in for the socket object handed to TwitchConnection.

    ``recv`` serves ``text_to_return`` in consecutive ``bufsize``-byte chunks
    and ``send`` accumulates everything written into ``text_received`` so the
    tests can inspect what the connection sent.
    """
    text_to_return: str
    text_received: str = ''
    closed: bool = True
    buffer_id: int = 0  # index of the next chunk that recv() will serve
    encoding: str = 'utf-8'

    def connect(self, _: Tuple[str, int]):
        """Mark the mock as open; the address is ignored."""
        self.closed = False

    def close(self):
        """Mark the mock as closed."""
        self.closed = True

    def recv(self, bufsize: int) -> bytes:
        """Return the next ``bufsize``-byte chunk, or ``b''`` once exhausted."""
        text_enc = self.text_to_return.encode(self.encoding)
        start = self.buffer_id * bufsize
        self.buffer_id += 1
        sleep(0.01)
        # Slicing past the end of a bytes object yields b'', so no explicit
        # bounds check is required.
        return text_enc[start:start + bufsize]

    def send(self, data: bytes) -> int:
        """Record ``data`` and return the number of bytes accepted."""
        self.text_received += data.decode(self.encoding)
        sleep(0.01)
        # The signature promises an int (as socket.send does); previously
        # this method fell through and returned None.
        return len(data)


@dataclass
class RemoteControlMock:
    """Collects every command passed to ``execute_cmd`` for later inspection."""
    received_commands: List[TFTCommand] = field(init=False, default_factory=list)

    def execute_cmd(self, tft_cmd: TFTCommand):
        """Append ``tft_cmd`` to ``received_commands``."""
        self.received_commands.append(tft_cmd)


def msg_padding(sequence: str, repetitions: int, separator: str = ''):
    """Return ``sequence`` repeated ``repetitions`` times, joined by ``separator``."""
    return separator.join([sequence] * repetitions)


def test_should_connect_to_chat():
    conn_settings = TwitchSettings('foobar.com', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    connection.connect_to_server()
    exp_text_buffer = 'PASS somepwd\nNICK my_chatbot\nJOIN #twitch_test\n'
    socket: IrcSocketMock = connection.websocket
    assert not socket.closed and socket.text_received == exp_text_buffer


# def test_should_fail_to_connect_to_chat_after_timeout():
#     # TODO: use asyncio for this feature. threads are very clunky to achieve this.
#     conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
#     connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(''), timeout_seconds=0.5)
#     connection.connect_to_server()
#     sleep(0.6)
#     print(connection)
#     assert connection.irc is None
# # TODO: make this test work


def test_should_disconnect_from_chat_gracefully():
    conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n" + msg_padding(' ', 1024) \
        + "\r\n::w3w4\r\n::lock\r\n::some text\r\n::lvl\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    # The assertion below compares against plain strings, so the listener is
    # expected to receive str messages.
    msgs_received: List[str] = []
    shutdown_requested = False

    def observe_twitch_chat():
        connection.connect_to_server()
        connection.register_message_listener(msgs_received.append)
        # The lambda reads the enclosing variable each time it is called, so
        # flipping the flag below is visible to the receiving loop.
        connection.receive_messages_as_daemon(lambda: shutdown_requested)

    # daemon=True: the join below is bounded by a timeout, so a receiver that
    # ignores the shutdown flag must not keep the interpreter alive.
    conn_thread = Thread(target=observe_twitch_chat, daemon=True)
    conn_thread.start()
    sleep(1)
    shutdown_requested = True
    conn_thread.join(timeout=0.1)
    assert {'w3w4', 'lock', 'lvl'} <= set(msgs_received)


def test_line_buffer():
    def process_read_buffer(text_buffer, remainder):
        buf = LineBuffer(remainder)
        return buf.process(text_buffer), buf.remainder

    assert process_read_buffer('', '') == ([], '')
    assert process_read_buffer('', 'abc') == ([], 'abc')
    assert process_read_buffer('abc\r\n', '') == (['abc'], '')
    assert process_read_buffer('abc\r\ndef', '') == (['abc'], 'def')
    assert process_read_buffer('abc\r\ndef\r\nghi', '') == (['abc', 'def'], 'ghi')
    assert process_read_buffer('\ndef', 'abc\r') == (['abc'], 'def')
    assert process_read_buffer('\r\ndef', 'abc') == (['abc'], 'def')
    assert process_read_buffer('abc\r\ndef\r\n', '') == (['abc', 'def'], '')


def test_should_send_chat_pong():
    conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n" + msg_padding(' ', 1024) \
        + "\r\n::w3w4\r\n::lock\r\n::some text\r\nPING :tmi.twitch.tv\r\n::lvl\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    shutdown_requested = False

    def observe_twitch_chat():
        connection.connect_to_server()
        connection.receive_messages_as_daemon(lambda: shutdown_requested)

    # daemon=True for the same reason as in the disconnect test above.
    conn_thread = Thread(target=observe_twitch_chat, daemon=True)
    conn_thread.start()
    sleep(1)
    shutdown_requested = True
    conn_thread.join(timeout=0.1)
    socket: IrcSocketMock = connection.websocket
    assert 'PONG :tmi.twitch.tv' in socket.text_received

# ... (listing truncated here in the source page; next snippet: subscription.py)
Source:subscription.py  
#!/usr/bin/env python3
# NOTE: the star import stays first so that names imported below keep
# precedence over anything config happens to export, as in the original.
from config import *
import logging
import multiprocessing
import signal
import threading
import time

from google.cloud import pubsub_v1

shutdown_requested = False  # Whether this program should shut down due to SIGINT/SIGTERM


def subscribe(subscription_name, worker, give_up=False):
    """Pull jobs from a Pub/Sub subscription one at a time and run each in a subprocess.

    Args:
        subscription_name: Name of the subscription under GCLOUD_PROJECT_ID.
        worker: Callable run in a ``multiprocessing.Process`` with the decoded
            message payload as its only argument. A zero exit code counts as
            success.
        give_up: When true, a failed job whose publish time is more than
            600 seconds in the past is acknowledged anyway instead of being
            left for redelivery.

    The function loops until the module-level ``shutdown_requested`` flag is
    set, and sets that flag itself on exit so the deadline-renewal thread
    terminates.
    """
    global shutdown_requested
    message = None  # The current active message
    lock = threading.Lock()
    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(GCLOUD_PROJECT_ID, subscription_name)

    def renew_deadline():
        """Repeatedly give the active message more time to be processed to prevent it being resent"""
        while True:
            with lock:
                # Read the shared reference under the lock so it cannot be
                # cleared between the None check and the attribute access.
                active = message
                if active is not None:
                    try:
                        client.modify_ack_deadline(subscription=subscription_path,
                                                   ack_ids=[active.ack_id],
                                                   ack_deadline_seconds=SUB_ACK_DEADLINE)
                        logging.debug('Reset ack deadline for {} for {}s'.format(
                            active.message.data.decode(), SUB_ACK_DEADLINE))
                    except Exception as e:
                        logging.warning('Could not reset ack deadline', exc_info=e)
            if active is None and shutdown_requested:
                break
            # Sleep on every iteration. Previously the loop only slept after
            # a successful renewal, so it spun at full speed while idle and
            # retried in a tight loop when renewal kept failing.
            time.sleep(SUB_SLEEP_TIME)

    watcher = threading.Thread(target=renew_deadline)
    watcher.start()

    # Repeatedly check for new jobs until SIGINT/SIGTERM received
    logging.info('Listening for jobs')
    try:
        while not shutdown_requested:
            response = client.pull(request={'subscription': subscription_path, 'max_messages': 1})
            if not response.received_messages:
                logging.info('Job queue is empty')
                time.sleep(SUB_SLEEP_TIME)
                continue
            if len(response.received_messages) > 1:
                logging.warning('Received more than one job when only one expected')
            with lock:
                message = response.received_messages[0]
            job = message.message.data.decode()
            logging.info('Beginning: {}'.format(job))

            process = multiprocessing.Process(target=worker, args=(job,))
            process.start()
            process.join()

            if process.exitcode == 0:
                # Success; acknowledge and return
                try:
                    client.acknowledge(subscription=subscription_path, ack_ids=[message.ack_id])
                    logging.info('Ending and acknowledged: {}'.format(job))
                except Exception as e:
                    logging.error('Could not end and acknowledge: {}'.format(job), exc_info=e)
            elif give_up and (time.time() - message.message.publish_time.timestamp()) > 600:
                # Failure; give up and acknowledge
                try:
                    client.acknowledge(subscription=subscription_path, ack_ids=[message.ack_id])
                    logging.error('Failed but acknowledged: {}'.format(job))
                except Exception as e:
                    logging.error('Failed but could not acknowledge: {}'.format(job), exc_info=e)
            else:
                # Failure; refuse to acknowledge
                logging.error('Failed, not acknowledged: {}'.format(job))

            # Stop extending this message's deadline in the "watcher" thread
            with lock:
                message = None
    except Exception as e:
        logging.critical('Exception encountered. ', exc_info=e)
    finally:
        # If there is an exception, make sure the "watcher" thread shuts down.
        # The watcher only exits once no message is active, so an in-flight
        # message must be cleared here as well; otherwise the thread would
        # keep renewing it forever.
        with lock:
            message = None
        shutdown_requested = True


def graceful_exit(signal, frame):
    """Signal handler: request shutdown by setting the module-level flag."""
    global shutdown_requested
    shutdown_requested = True
    logging.warning('Requesting shutdown due to signal {}'.format(signal))


signal.signal(signal.SIGINT, graceful_exit)

# ... (listing truncated here in the source page; next snippet: twitch_chatbot.py)
Source:twitch_chatbot.py  
from threading import Thread
from typing import Dict, Optional
from dataclasses import dataclass, field

from teamfightchaticts.tft_command import TFTCommand
from teamfightchaticts.twitch_connection import TwitchConnection
from teamfightchaticts.tft_remote_control import TFTRemoteControl


@dataclass
class TwitchTFTChatbotState:
    """Vote tally for chat commands.

    ``cmd_counts`` maps each command to the number of votes received so far;
    a command becomes executable once its count reaches ``pool``.
    """
    # default_factory gives every state object its own TFTCommand instead of
    # one instance created at class-definition time and shared by all.
    last_cmd: Optional[TFTCommand] = field(default_factory=lambda: TFTCommand(''))
    cmd_counts: Dict[TFTCommand, int] = field(default_factory=dict)
    pool: int = 10

    def update_state(self, tft_cmd: TFTCommand):
        """Register one vote for ``tft_cmd``."""
        # cmd_counts is a plain dict, so ``+= 1`` raised KeyError on the
        # first vote for any command; start missing entries at zero.
        self.cmd_counts[tft_cmd] = self.cmd_counts.get(tft_cmd, 0) + 1

    def reset_counts(self):
        """Remember the currently winning command (if any) and clear all votes."""
        self.last_cmd = self.cmd_to_execute
        self.cmd_counts = {}

    @property
    def cmd_to_execute(self) -> Optional[TFTCommand]:
        """First command whose vote count has reached ``pool``, or None."""
        return next(
            (cmd for cmd, votes in self.cmd_counts.items() if votes >= self.pool),
            None,
        )


@dataclass
class TwitchTFTChatbot:
    """Feeds messages from a TwitchConnection into the vote state and passes
    winning commands to the remote control.
    """
    connection: TwitchConnection
    tft_remote_control: TFTRemoteControl
    # A default *instance* here would be shared by every chatbot (and is
    # rejected as an unhashable default by dataclasses on Python 3.11+).
    state: TwitchTFTChatbotState = field(default_factory=TwitchTFTChatbotState)
    thread: Optional[Thread] = field(init=False, default=None)
    shutdown_requested: bool = False

    def start_bot(self, pool_size: int):
        """Start the receiving thread with ``pool_size`` as the vote threshold.

        Does nothing while a shutdown is in progress.
        """
        if self.shutdown_requested:
            return
        self.state.pool = pool_size
        self.shutdown_requested = False
        self.thread = Thread(target=self._receive_twitch_messages)
        self.thread.start()

    def stop_bot(self):
        """Request shutdown and wait briefly for the receiving thread."""
        self.shutdown_requested = True
        # stop_bot may be called before start_bot, in which case there is no
        # thread to join.
        if self.thread is not None:
            self.thread.join(timeout=0.1)
        # NOTE(review): the flag is cleared even if the thread did not finish
        # within the timeout; confirm the receive loop cannot miss the request.
        self.shutdown_requested = False

    def _receive_twitch_messages(self):
        """Thread target: connect, register the listener and receive until shutdown."""
        self.connection.connect_to_server()
        self.connection.register_message_listener(self._process_tft_cmd)
        self.connection.receive_messages_as_daemon(lambda: self.shutdown_requested)

    def _process_tft_cmd(self, msg: str):
        """Count ``msg`` as a vote and execute a command once it wins."""
        tft_cmd = TFTCommand(msg)
        self.state.update_state(tft_cmd)
        cmd_exec = self.state.cmd_to_execute
        # vote for next command not complete yet
        if not cmd_exec:
            return
        # same command twice
        if cmd_exec == self.state.last_cmd:
            self.state.last_cmd = TFTCommand('')
            return
        # command is ok, go execute it
        self.tft_remote_control.execute_cmd(cmd_exec)

# ... (listing truncated here in the source page)
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
