Best Python code snippet using localstack_python
pipeline.py
Source:pipeline.py  
import asyncio
import logging
from typing import Dict, List, Iterator, Optional, Iterable, Union, Tuple

from prometheus_client import Info, start_http_server
from telethon import events
from tqdm import tqdm

from gif_pipeline import _version
from gif_pipeline.chat_builder import ChannelBuilder, WorkshopBuilder
from gif_pipeline.database import Database
from gif_pipeline.chat import Chat, Channel, WorkshopGroup
from gif_pipeline.chat_config import ChannelConfig, WorkshopConfig
from gif_pipeline.helpers.channel_fwd_tag_helper import ChannelFwdTagHelper
from gif_pipeline.helpers.chart_helper import ChartHelper
from gif_pipeline.helpers.chunk_split_helper import ChunkSplitHelper
from gif_pipeline.helpers.delete_helper import DeleteHelper
from gif_pipeline.helpers.download_helper import DownloadHelper
from gif_pipeline.helpers.duplicate_helper import DuplicateHelper
from gif_pipeline.helpers.fa_helper import FAHelper
from gif_pipeline.helpers.ffprobe_helper import FFProbeHelper
from gif_pipeline.helpers.find_helper import FindHelper
from gif_pipeline.helpers.imgur_gallery_helper import ImgurGalleryHelper
from gif_pipeline.helpers.menu_helper import MenuHelper
from gif_pipeline.helpers.merge_helper import MergeHelper
from gif_pipeline.helpers.msg_helper import MSGHelper
from gif_pipeline.helpers.public.public_tag_helper import PublicTagHelper
from gif_pipeline.helpers.reverse_helper import ReverseHelper
from gif_pipeline.helpers.scene_split_helper import SceneSplitHelper
from gif_pipeline.helpers.schedule_helper import ScheduleHelper
from gif_pipeline.helpers.send_helper import GifSendHelper
from gif_pipeline.helpers.stabilise_helper import StabiliseHelper
from gif_pipeline.helpers.subscription_helper import SubscriptionHelper
from gif_pipeline.helpers.tag_helper import TagHelper
from gif_pipeline.helpers.telegram_gif_helper import TelegramGifHelper
from gif_pipeline.helpers.update_yt_dl_helper import UpdateYoutubeDlHelper
from gif_pipeline.helpers.video_crop_helper import VideoCropHelper
from gif_pipeline.helpers.video_cut_helper import VideoCutHelper
from gif_pipeline.helpers.video_helper import VideoHelper
from gif_pipeline.helpers.video_rotate_helper import VideoRotateHelper
from gif_pipeline.helpers.zip_helper import ZipHelper
from gif_pipeline.menu_cache import MenuCache
from gif_pipeline.message import Message, MessageData
from gif_pipeline.startup_monitor import StartupMonitor, StartupState
from gif_pipeline.tag_manager import TagManager
from gif_pipeline.tasks.task_worker import TaskWorker, Bottleneck
from gif_pipeline.telegram_client import TelegramClient, message_data_from_telegram, chat_id_from_telegram
from gif_pipeline.utils import tqdm_gather

logger = logging.getLogger(__name__)

# Prometheus metric publishing the running version (populated in PipelineConfig.__init__)
version_info = Info(
    "gif_pipeline_version",
    "Version of gif pipeline currently running"
)
# Port the prometheus metrics HTTP server listens on
PROM_PORT = 7180


class PipelineConfig:
    """
    Holds the settings read from the config dictionary and builds the Pipeline from them.
    """

    def __init__(self, config: Dict):
        """
        Read pipeline settings out of a parsed config dictionary.

        Side effects: starts the prometheus HTTP server on PROM_PORT, publishes the version info metric, and
        creates the StartupMonitor (set to LOADING_CONFIG).

        :param config: Parsed configuration. "channels", "workshop_groups", "api_id" and "api_hash" are read by
            direct indexing; "pipeline_bot_token", "public_bot_token" and "api_keys" are optional.
        """
        start_http_server(PROM_PORT)
        version_info.info({
            "version": _version.__VERSION__
        })
        self.startup_monitor = StartupMonitor()
        self.startup_monitor.set_state(StartupState.LOADING_CONFIG)
        self.channels = [ChannelConfig.from_json(x) for x in config['channels']]
        self.workshops = [WorkshopConfig.from_json(x) for x in config["workshop_groups"]]
        # A channel's queue is treated as an additional workshop
        self.workshops += [chan.queue for chan in self.channels if chan.queue is not None]
        self.api_id = config["api_id"]
        self.api_hash = config["api_hash"]
        # Pipeline bot, handles video editing and sending to channels
        self.pipeline_bot_token = config.get("pipeline_bot_token")
        # Public bot, handles public queries for gifs and searches
        self.public_bot_token = config.get("public_bot_token")
        # API keys for external services
        self.api_keys = config.get("api_keys", {})

    def initialise_pipeline(self) -> 'Pipeline':
        """
        Create the database and telegram client, load all configured chats, and build the Pipeline.

        The startup monitor state is advanced before each stage.

        :return: The constructed Pipeline (helpers are not yet initialised)
        """
        self.startup_monitor.set_state(StartupState.CREATING_DATABASE)
        database = Database()
        self.startup_monitor.set_state(StartupState.CONNECTING_TELEGRAM)
        client = TelegramClient(self.api_id, self.api_hash, self.pipeline_bot_token, self.public_bot_token)
        client.synchronise_async(client.initialise())
        channels, workshops = client.synchronise_async(self.initialise_chats(database, client))
        self.startup_monitor.set_state(StartupState.CREATING_PIPELINE)
        pipe = Pipeline(database, client, channels, workshops, self.api_keys, self.startup_monitor)
        return pipe

    async def initialise_chats(
            self,
            database: Database,
            client: TelegramClient
    ) -> Tuple[List[Channel], List[WorkshopGroup]]:
        """
        Build Channel and WorkshopGroup objects for every configured chat, including their messages.

        Message initialisers for all workshops and then all channels are gathered in one batch; the results are
        then dealt back out to each chat using the per-chat counts recorded beforehand.

        :param database: Database handed to the chat builders
        :param client: Telegram client handed to the chat builders and the created chats
        :return: Tuple of (channels, workshops)
        """
        download_bottleneck = Bottleneck(3)
        workshop_builder = WorkshopBuilder(database, client, download_bottleneck)
        channel_builder = ChannelBuilder(database, client, download_bottleneck)
        # Get chat data for chat config
        self.startup_monitor.set_state(StartupState.INITIALISING_CHAT_DATA)
        logger.info("Initialising workshop data")
        workshop_data = await workshop_builder.get_chat_data(self.workshops)
        logger.info("Initialising channel data")
        channel_data = await channel_builder.get_chat_data(self.channels)

        message_inits = []
        self.startup_monitor.set_state(StartupState.LISTING_WORKSHOP_MESSAGES)
        logger.info("Listing messages in workshops")
        workshop_message_lists = await workshop_builder.get_message_inits(self.workshops, workshop_data)
        workshop_message_counts = [len(x) for x in workshop_message_lists]
        message_inits += [init for message_list in workshop_message_lists for init in message_list]
        self.startup_monitor.set_state(StartupState.LISTING_CHANNEL_MESSAGES)
        logger.info("Listing messages in channels")
        channel_message_lists = await channel_builder.get_message_inits(self.channels, channel_data)
        channel_message_counts = [len(x) for x in channel_message_lists]
        message_inits += [init for message_list in channel_message_lists for init in message_list]

        self.startup_monitor.set_state(StartupState.DOWNLOADING_MESSAGES)
        logger.info("Downloading messages")
        all_messages = await tqdm_gather(message_inits, desc="Downloading messages")

        # Results come back in the order the initialisers were queued (workshops first, then channels),
        # so a running offset is enough to hand each chat its own slice.
        offset = 0
        logger.info("Creating workshops")
        self.startup_monitor.set_state(StartupState.CREATING_WORKSHOPS)
        workshop_dict = {}
        for work_conf, work_data, message_count in zip(self.workshops, workshop_data, workshop_message_counts):
            work_messages = all_messages[offset:offset + message_count]
            offset += message_count
            workshop_dict[work_conf.handle] = WorkshopGroup(work_data, work_conf, work_messages, client)

        logger.info("Creating channels")
        self.startup_monitor.set_state(StartupState.CREATING_CHANNELS)
        channels = []
        for chan_conf, chan_data, message_count in zip(self.channels, channel_data, channel_message_counts):
            chan_messages = all_messages[offset:offset + message_count]
            offset += message_count
            queue = None
            if chan_conf.queue:
                # The queue was registered as a workshop in __init__, so it is looked up by handle
                queue = workshop_dict[chan_conf.queue.handle]
            channels.append(Channel(chan_data, chan_conf, chan_messages, client, queue))
        workshops = list(workshop_dict.values())

        logger.info("Cleaning up excess files from chats")
        self.startup_monitor.set_state(StartupState.CLEANING_UP_CHAT_FILES)
        for chat in tqdm([*channels, *workshops], desc="Cleaning up excess files from chats"):
            chat.cleanup_excess_files()
        logger.info("Initialised channels and workshops")
        return channels, workshops


class Pipeline:
    """
    Owns the chats, helpers and telegram client, and routes telegram events to the helpers.
    """

    def __init__(
            self,
            database: Database,
            client: TelegramClient,
            channels: List[Channel],
            workshops: List[WorkshopGroup],
            api_keys: Dict[str, Dict[str, str]],
            startup_monitor: StartupMonitor
    ):
        """
        Store the collaborators and create the task worker, menu cache and download bottleneck.

        Helpers are not created here; call initialise_helpers() afterwards.

        :param database: Database used by the pipeline and its helpers
        :param client: Telegram client used to receive events and passed to helpers
        :param channels: Channels managed by this pipeline
        :param workshops: Workshop groups managed by this pipeline
        :param api_keys: API keys for external services, keyed by service name
        :param startup_monitor: Monitor whose state is advanced during startup
        """
        self.database = database
        self.channels = channels
        self.workshops = workshops
        self.client = client
        self.api_keys = api_keys
        self.worker = TaskWorker(3)
        # Both helper registries are keyed by helper.name
        self.helpers = {}
        self.public_helpers = {}
        self.menu_cache = MenuCache(database)  # MenuHelper later populates this from database
        self.download_bottleneck = Bottleneck(3)
        self.startup_monitor = startup_monitor

    @property
    def all_chats(self) -> List[Chat]:
        """Return a new list containing every channel followed by every workshop."""
        return [*self.channels, *self.workshops]

    @property
    def all_chat_ids(self) -> List[int]:
        """Return the chat ID of every channel and workshop."""
        return [chat.chat_data.chat_id for chat in self.all_chats]

    def chat_by_id(self, chat_id: int) -> Optional[Chat]:
        """Return the first chat whose chat ID equals chat_id, or None if there is none."""
        for chat in self.all_chats:
            if chat.chat_data.chat_id == chat_id:
                return chat
        return None

    def chat_by_handle(self, name: str) -> Optional[Chat]:
        """Return the first chat matching the handle (leading "@" characters are ignored), or None."""
        name = name.lstrip("@")
        for chat in self.all_chats:
            if chat.chat_data.matches_handle(name):
                return chat
        return None

    def channel_by_handle(self, name: str) -> Optional[Channel]:
        """Return the first channel matching the handle (leading "@" characters are ignored), or None."""
        name = name.lstrip("@")
        for chat in self.channels:
            if chat.chat_data.matches_handle(name):
                return chat
        return None

    def initialise_helpers(self) -> None:
        """
        Create every helper, register them by name, and run their startup routines.

        Fills self.helpers and self.public_helpers. The startup monitor state is advanced before each stage.
        The ImgurGalleryHelper is only created when an "imgur" entry exists in the API keys.
        """
        logger.info("Initialising helpers")
        self.startup_monitor.set_state(StartupState.INITIALISING_DUPLICATE_DETECTOR)
        duplicate_helper = self.client.synchronise_async(self.initialise_duplicate_detector())
        self.startup_monitor.set_state(StartupState.INITIALISING_HELPERS)
        tag_manager = TagManager(self.channels, self.workshops, self.database)
        delete_helper = DeleteHelper(self.database, self.client, self.worker, self.menu_cache)
        menu_helper = MenuHelper(self.database, self.client, self.worker, self, delete_helper, tag_manager)
        twitter_keys = self.api_keys.get("twitter", {})
        send_helper = GifSendHelper(self.database, self.client, self.worker, self.channels, menu_helper, twitter_keys)
        schedule_helper = ScheduleHelper(
            self.database,
            self.client,
            self.worker,
            self.channels,
            menu_helper,
            send_helper,
            delete_helper,
            tag_manager
        )
        download_helper = DownloadHelper(self.database, self.client, self.worker)
        subscription_helper = SubscriptionHelper(
            self.database,
            self.client,
            self.worker,
            self,
            duplicate_helper,
            download_helper,
            self.api_keys
        )
        ffprobe_helper = FFProbeHelper(self.database, self.client, self.worker)
        helpers = [
            duplicate_helper,
            menu_helper,
            TelegramGifHelper(self.database, self.client, self.worker),
            VideoRotateHelper(self.database, self.client, self.worker),
            VideoCutHelper(self.database, self.client, self.worker),
            VideoCropHelper(self.database, self.client, self.worker),
            download_helper,
            StabiliseHelper(self.database, self.client, self.worker),
            VideoHelper(self.database, self.client, self.worker),
            MSGHelper(self.database, self.client, self.worker),
            FAHelper(self.database, self.client, self.worker),
            SceneSplitHelper(self.database, self.client, self.worker, menu_helper),
            ChunkSplitHelper(self.database, self.client, self.worker, ffprobe_helper),
            send_helper,
            delete_helper,
            MergeHelper(self.database, self.client, self.worker),
            ReverseHelper(self.database, self.client, self.worker),
            ffprobe_helper,
            ZipHelper(self.database, self.client, self.worker),
            TagHelper(self.database, self.client, self.worker, tag_manager),
            ChannelFwdTagHelper(self.database, self.client, self.worker),
            UpdateYoutubeDlHelper(self.database, self.client, self.worker),
            ChartHelper(self.database, self.client, self.worker, self, tag_manager),
            schedule_helper,
            subscription_helper,
            FindHelper(self.database, self.client, self.worker, duplicate_helper, download_helper)
        ]
        if "imgur" in self.api_keys:
            helpers.append(
                ImgurGalleryHelper(self.database, self.client, self.worker, self.api_keys["imgur"]["client_id"]))
        for helper in helpers:
            self.helpers[helper.name] = helper
        # Check yt-dl install
        self.startup_monitor.set_state(StartupState.INSTALLING_YT_DL)
        self.client.synchronise_async(download_helper.check_yt_dl())
        # Load menus from database
        self.startup_monitor.set_state(StartupState.LOADING_MENUS)
        self.client.synchronise_async(menu_helper.refresh_from_database())
        # Load schedule helper and subscription helper
        self.startup_monitor.set_state(StartupState.INITIALISING_SCHEDULES)
        self.client.synchronise_async(schedule_helper.initialise())
        self.startup_monitor.set_state(StartupState.INITIALISING_SUBSCRIPTIONS)
        self.client.synchronise_async(subscription_helper.initialise())
        # Helpers complete
        logger.info(f"Initialised {len(self.helpers)} helpers")
        # Set up public helpers
        self.startup_monitor.set_state(StartupState.INITIALISING_PUBLIC_HELPERS)
        public_helpers = [
            PublicTagHelper(self.database, self.client, self.worker, tag_manager)
        ]
        for helper in public_helpers:
            self.public_helpers[helper.name] = helper
        logger.info(f"Initialised {len(self.public_helpers)} public helpers")

    async def initialise_duplicate_detector(self) -> DuplicateHelper:
        """Create the DuplicateHelper and initialise its hashes from the workshops."""
        helper = DuplicateHelper(self.database, self.client, self.worker)
        logger.info("Initialising DuplicateHelper")
        await helper.initialise_hashes(self.workshops)
        logger.info("Initialised DuplicateHelper")
        return helper

    def watch_workshop(self) -> None:
        """
        Register all event handlers on the client and block until the client disconnects.

        New-message and edit handlers are registered for the known chat IDs only; the public-message, delete
        and callback-query handlers are registered without a chat filter.
        """
        # Startup is complete, mark the pipeline as running
        self.startup_monitor.set_running()
        logger.info("Watching workshop")
        # Set up handlers
        self.client.add_message_handler(self.on_new_message, self.all_chat_ids)
        self.client.add_public_message_handler(self.pass_message_to_public_handlers)
        self.client.add_edit_handler(self.on_edit_message, self.all_chat_ids)
        self.client.add_delete_handler(self.on_deleted_message)
        self.client.add_callback_query_handler(self.on_callback_query)
        self.client.client.run_until_disconnected()

    async def on_edit_message(self, event: events.MessageEdited.Event) -> None:
        """
        Replace the stored copy of an edited message with a freshly built one and save it.

        Events from chats this pipeline does not know are ignored. Helpers are not notified of edits.
        """
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event.message))
        if chat is None:
            logger.debug("Ignoring edited message in other chat, which must have slipped through")
            return
        # Convert to our custom Message object. This will update message data, but not the video, for edited messages
        logger.info(f"Edited message in chat: {chat}")
        message_data = message_data_from_telegram(event.message)
        new_message = await self.download_bottleneck.await_run(
            Message.from_message_data(message_data, chat.chat_data, self.client)
        )
        chat.remove_message(message_data)
        chat.add_message(new_message)
        self.database.save_message(new_message.message_data)
        logger.info(f"Edited message initialised: {new_message}")

    async def on_new_message(self, event: events.NewMessage.Event) -> None:
        """
        Build and store a Message for a new telegram message, then pass it to the helpers.

        Events from chats this pipeline does not know are ignored.
        """
        # This is called just for new messages
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event.message))
        if chat is None:
            logger.debug("Ignoring new message in other chat, which must have slipped through")
            return
        # Convert to our custom Message object
        logger.info(f"New message in chat: {chat}")
        message_data = message_data_from_telegram(event.message)
        new_message = await self.download_bottleneck.await_run(
            Message.from_message_data(message_data, chat.chat_data, self.client)
        )
        chat.add_message(new_message)
        self.database.save_message(new_message.message_data)
        logger.info(f"New message initialised: {new_message}")
        # Pass to helpers
        await self.pass_message_to_handlers(new_message, chat)

    async def pass_message_to_handlers(self, new_message: Message, chat: Optional[Chat] = None) -> None:
        """
        Offer a message to the helpers, then recursively offer any reply messages they return.

        If any helper reports the message as priority, only the priority helpers receive it; otherwise every
        helper does. Helper exceptions are logged rather than raised.

        :param new_message: The message to hand to helpers
        :param chat: The chat the message is in; looked up from the message's chat ID when omitted
        """
        if chat is None:
            chat = self.chat_by_id(new_message.chat_data.chat_id)
        # If any helpers say that a message is priority, send only to those helpers
        priority_helpers = {
            key: val for key, val in self.helpers.items() if val.is_priority(chat, new_message)
        }
        # Copy the full registry so the zip below stays aligned with the gathered results
        helpers = priority_helpers or dict(self.helpers)
        # Call the helpers
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_new_message(chat, new_message) for helper in helpers.values()),
            return_exceptions=True
        )
        # Handle helper results
        for helper, result in zip(helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle message {new_message}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)

    async def pass_message_to_public_handlers(self, event: events.NewMessage.Event) -> None:
        """Offer a public message event to every public helper, logging any exception a helper raises."""
        logger.info(f"New public message: {event}")
        helper_results: Iterable[Union[BaseException, Optional[List[MessageData]]]] = await asyncio.gather(
            *(helper.on_new_message(event.message) for helper in self.public_helpers.values()),
            return_exceptions=True
        )
        for helper, result in zip(self.public_helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Public helper {helper} threw an exception trying to handle message {event}.",
                    exc_info=result
                )

    async def on_deleted_message(self, event: events.MessageDeleted.Event) -> None:
        """
        Notify helpers of each deleted message, then drop it from the menu cache, database and chat.

        The event's chat ID may be None (get_messages_for_delete_event searches the workshops in that case),
        so the owning chat is resolved from the message itself when the event does not identify one.
        """
        # Get messages
        event_chat = self.chat_by_id(event.chat_id)
        messages = self.get_messages_for_delete_event(event)
        for message in messages:
            chat = event_chat
            if chat is None:
                # Event carried no usable chat ID, fall back to the chat the stored message belongs to
                chat = self.chat_by_id(message.chat_data.chat_id)
            # Tell helpers
            helper_results = await asyncio.gather(
                *(helper.on_deleted_message(chat, message) for helper in self.helpers.values()),
                return_exceptions=True
            )
            for helper, result in zip(self.helpers.keys(), helper_results):
                if isinstance(result, BaseException):
                    logger.error(
                        f"Helper {helper} threw an exception trying to handle deleting message {message}.",
                        exc_info=result
                    )
            # If it's a menu, remove that
            self.menu_cache.remove_menu_by_message(message)
            # Remove messages from store
            logger.info(f"Deleting message {message} from chat: {message.chat_data}")
            message.delete(self.database)
            if chat is not None:
                chat.remove_message(message.message_data)

    def get_messages_for_delete_event(self, event: events.MessageDeleted.Event) -> List[Message]:
        """
        Find the stored messages a delete event refers to.

        :param event: The delete event; its chat ID may be None
        :return: Matching messages from all workshops when the chat ID is None, from the identified chat when it
            is known, or an empty list when the chat is unknown
        """
        deleted_ids = set(event.deleted_ids)
        if event.chat_id is None:
            return [
                message
                for workshop in self.workshops
                for message in workshop.messages
                if message.message_data.message_id in deleted_ids
            ]
        chat = self.chat_by_id(event.chat_id)
        if chat is None:
            return []
        return [message for message in chat.messages if message.message_data.message_id in deleted_ids]

    async def on_callback_query(self, event: events.CallbackQuery.Event) -> None:
        """
        Route a button press to the helpers and answer the callback query once.

        Presses on messages without a cached menu go to on_stateless_callback. Presses by senders the menu does
        not allow, or on menus already clicked, are answered with an explanation and not passed to helpers.
        Reply messages returned by helpers are passed on to pass_message_to_handlers.
        """
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event))
        if chat is None:
            logger.debug("Ignoring callback query in other chat, which must have slipped through")
            return
        # Get the menu
        menu = self.menu_cache.get_menu_by_message_id(event.chat_id, event.message_id)
        if not menu:
            # Handle stateless menu callbacks
            logger.debug("Received a callback for a stateless menu")
            return await self.on_stateless_callback(event, chat)
        # Check button was pressed by someone who was allowed to press it
        if not menu.menu.allows_sender(event.sender_id):
            logger.info("User tried to press a button on a menu that wasn't theirs")
            await event.answer("This is not your menu, you are not authorised to use it.")
            return
        # Check if menu has already been clicked
        if menu.clicked:
            # Menu already clicked
            logger.info("Callback received for a menu which has already been clicked")
            await event.answer("That menu has already been clicked.")
            return
        # Hand callback queries to helpers
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_callback_query(event.data, menu, event.sender_id) for helper in self.helpers.values()),
            return_exceptions=True
        )
        answered = False
        for helper, result in zip(self.helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle callback query {event}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)
            # Check for result is None because empty list would be an answer, None is not
            if result is not None and not answered:
                await event.answer()
                # Record that the query was answered, so later helper results do not answer it again
                answered = True

    async def on_stateless_callback(self, event: events.CallbackQuery.Event, chat: Chat) -> None:
        """
        Route a button press on a message with no cached menu to the helpers.

        Reply messages returned by helpers are passed on to pass_message_to_handlers.

        :param event: The callback query event
        :param chat: The chat the pressed message is in
        """
        # Get message
        msg = chat.message_by_id(event.message_id)
        # Handle callback query
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_stateless_callback(event.data, chat, msg, event.sender_id) for helper in self.helpers.values()),
            return_exceptions=True
        )
        answered = False
        for helper, result in zip(self.helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle stateless callback query: {event}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)
            # Check for result is None because empty list would be an answer, None is not
            if result is not None and not answered:
                # NOTE(review): this excerpt is cut off at this point; the body of this branch is not visible.
                # The parallel branch in on_callback_query answers the event here - restore from the full file.
                ...
Source:controllers.py  
...127        :param service: the service128        """129        if self.startup_monitor is None:130            raise ValueError("No startup monitor set")131        return self.startup_monitor(service)132class DockerisedServiceController(133        Generic[DockerisedServiceType], ContainerisedServiceController[DockerisedServiceType], metaclass=ABCMeta):134    """135    Controller of Docker containers running a service brought up for testing.136    """137    @staticmethod138    def _call_detector_with_correct_arguments(detector: Callable, line: str, service: DockerisedServiceType) -> bool:139        """140        Calls the given detector with either line as the only argument or both line and service, depending on the141        detector's signature.142        :param detector: the detector to call143        :param line: the log line to give to the detector144        :param service: the service being started145        :return: the detector's return value146        """147        number_of_parameters = len(signature(detector).parameters)148        if number_of_parameters == 1:149            return detector(line)150        else:151            return detector(line, service)152    def __init__(self, service_model: Type[ServiceType], repository: str, tag: str, ports: List[int], *,153                 start_timeout: int=math.inf, start_tries: int=math.inf, additional_run_settings: Dict[str, Any]=None,154                 pull: bool=True,155                 start_log_detector: LogListener=None,156                 persistent_error_log_detector: LogListener=None,157                 transient_error_log_detector: LogListener=None,158                 startup_monitor: Callable[[ServiceType], bool]=None,159                 start_http_detector: Callable[[Response], bool]=None,160                 start_http_detection_endpoint: str=""):161        """162        Constructor.163        :param service_model: see `ServiceController.__init__`164        :param repository: the repository of the 
service to start165        :param tag: the repository tag of the service to start166        :param ports: the ports the service exposes167        :param start_timeout: timeout for starting containers168        :param start_tries: number of times to try starting the containerised service169        :param additional_run_settings: other run settings (see https://docker-py.readthedocs.io/en/1.2.3/api/#create_container)170        :param pull: whether to always pull from source repository171        :param start_log_detector: callable that detects if the service is ready for use from the logs172        :param persistent_error_log_detector: callable that detects if the service is unable to start173        :param transient_error_log_detector: callable that detects if the service encountered a transient error174        :param start_http_detector: callable that detects if the service is ready for use based on given HTTP response175        :param start_http_detection_endpoint: endpoint to call that should respond if the service has started176        """177        if startup_monitor and (start_log_detector or persistent_error_log_detector or transient_error_log_detector or178                                start_http_detector):179            raise ValueError("Cannot set `startup_monitor` in conjunction with any other detector")180        super().__init__(service_model, start_timeout, start_tries, startup_monitor=startup_monitor)181        self.repository = repository182        self.tag = tag183        self.ports = ports184        self.run_settings = additional_run_settings if additional_run_settings is not None else {}185        self.pull = pull186        self.start_log_detector = start_log_detector187        self.persistent_error_log_detector = persistent_error_log_detector188        self.transient_error_log_detector = transient_error_log_detector189        self.start_http_detector = start_http_detector190        self.start_http_detection_endpoint = 
start_http_detection_endpoint191        self._log_iterator: Dict[Service, Iterator] = dict()192    def _start(self, service: DockerisedServiceType, runtime_configuration: Dict):193        if self.pull:194            image = docker_client.images.pull(self.repository, tag=self.tag)195        else:196            image = docker_client.images.get(f"{self.repository}:{self.tag}")197        service.name = f"{self.repository.split('/')[-1]}-{uuid4()}"198        service.ports = {port: _get_open_port() for port in self.ports}199        service.controller = self200        create_kwargs = dict(self.run_settings)201        create_kwargs.update(runtime_configuration)202        container = docker_client.containers.create(203            image=image.id,204            name=service.name,205            ports=service.ports,206            detach=True,207            **create_kwargs)208        service.container = container209        container.start()210    def _stop(self, service: DockerisedServiceType):211        if service in self._log_iterator:212            del self._log_iterator[service]213        if service.container:214            try:215                service.container.stop()216                service.container.remove(force=True)217            except NotFound:218                pass219    def _wait_until_started(self, service: DockerisedServiceType):220        if self.startup_monitor is not None:221            return self.startup_monitor(service)222        else:223            if self.start_log_detector:224                self._wait_until_log_indicates_start(service)225            if self.start_http_detector:226                self._wait_until_http_indicates_start(service)227    def _wait_until_log_indicates_start(self, service: DockerisedServiceType):228        """229        Blocks until container log indicates that the service has started.230        :param service: starting service231        :raises ServiceStartException: raised if service cannot be started232        """233      
  log_stream = service.container.logs(stream=True)234        for line in log_stream:235            # XXX: Although non-streamed logs are returned as a string, the generator returns bytes!?...conftest.py
Source:conftest.py  
...59    threading.Thread(target=startup_monitor).start()60def _trigger_stop():61    localstack_stop.set()62    startup_monitor_event.set()63def startup_monitor() -> None:64    """65    The startup monitor is a thread that waits for the startup_monitor_event and, once the event is true, starts a66    localstack instance in it's own thread context.67    """68    logger.info("waiting on localstack_start signal")69    startup_monitor_event.wait()70    if localstack_stop.is_set():71        # this is called if _trigger_stop() is called before any test has requested the localstack_runtime fixture.72        logger.info("ending startup_monitor")73        localstack_stopped.set()74        return75    if is_env_true("TEST_SKIP_LOCALSTACK_START") or os.environ.get("TEST_TARGET") == "AWS_CLOUD":76        logger.info("TEST_SKIP_LOCALSTACK_START is set, not starting localstack")77        localstack_started.set()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
