How to use the startup_monitor method in localstack

Best Python code snippets using localstack_python

pipeline.py

Source: pipeline.py (GitHub)


import asyncio
import logging
from typing import Dict, List, Iterator, Optional, Iterable, Union, Tuple
from prometheus_client import Info, start_http_server
from telethon import events
from tqdm import tqdm
from gif_pipeline import _version
from gif_pipeline.chat_builder import ChannelBuilder, WorkshopBuilder
from gif_pipeline.database import Database
from gif_pipeline.chat import Chat, Channel, WorkshopGroup
from gif_pipeline.chat_config import ChannelConfig, WorkshopConfig
from gif_pipeline.helpers.channel_fwd_tag_helper import ChannelFwdTagHelper
from gif_pipeline.helpers.chart_helper import ChartHelper
from gif_pipeline.helpers.chunk_split_helper import ChunkSplitHelper
from gif_pipeline.helpers.delete_helper import DeleteHelper
from gif_pipeline.helpers.download_helper import DownloadHelper
from gif_pipeline.helpers.duplicate_helper import DuplicateHelper
from gif_pipeline.helpers.fa_helper import FAHelper
from gif_pipeline.helpers.ffprobe_helper import FFProbeHelper
from gif_pipeline.helpers.find_helper import FindHelper
from gif_pipeline.helpers.imgur_gallery_helper import ImgurGalleryHelper
from gif_pipeline.helpers.menu_helper import MenuHelper
from gif_pipeline.helpers.merge_helper import MergeHelper
from gif_pipeline.helpers.msg_helper import MSGHelper
from gif_pipeline.helpers.public.public_tag_helper import PublicTagHelper
from gif_pipeline.helpers.reverse_helper import ReverseHelper
from gif_pipeline.helpers.scene_split_helper import SceneSplitHelper
from gif_pipeline.helpers.schedule_helper import ScheduleHelper
from gif_pipeline.helpers.send_helper import GifSendHelper
from gif_pipeline.helpers.stabilise_helper import StabiliseHelper
from gif_pipeline.helpers.subscription_helper import SubscriptionHelper
from gif_pipeline.helpers.tag_helper import TagHelper
from gif_pipeline.helpers.telegram_gif_helper import TelegramGifHelper
from gif_pipeline.helpers.update_yt_dl_helper import UpdateYoutubeDlHelper
from gif_pipeline.helpers.video_crop_helper import VideoCropHelper
from gif_pipeline.helpers.video_cut_helper import VideoCutHelper
from gif_pipeline.helpers.video_helper import VideoHelper
from gif_pipeline.helpers.video_rotate_helper import VideoRotateHelper
from gif_pipeline.helpers.zip_helper import ZipHelper
from gif_pipeline.menu_cache import MenuCache
from gif_pipeline.message import Message, MessageData
from gif_pipeline.startup_monitor import StartupMonitor, StartupState
from gif_pipeline.tag_manager import TagManager
from gif_pipeline.tasks.task_worker import TaskWorker, Bottleneck
from gif_pipeline.telegram_client import TelegramClient, message_data_from_telegram, chat_id_from_telegram
from gif_pipeline.utils import tqdm_gather
logger = logging.getLogger(__name__)
version_info = Info(
    "gif_pipeline_version",
    "Version of gif pipeline currently running"
)
PROM_PORT = 7180
class PipelineConfig:
    def __init__(self, config: Dict):
        start_http_server(PROM_PORT)
        version_info.info({
            "version": _version.__VERSION__
        })
        self.startup_monitor = StartupMonitor()
        self.startup_monitor.set_state(StartupState.LOADING_CONFIG)
        self.channels = [ChannelConfig.from_json(x) for x in config['channels']]
        self.workshops = [WorkshopConfig.from_json(x) for x in config["workshop_groups"]]
        self.workshops += [chan.queue for chan in self.channels if chan.queue is not None]
        self.api_id = config["api_id"]
        self.api_hash = config["api_hash"]
        # Pipeline bot, handles video editing and sending to channels
        self.pipeline_bot_token = config.get("pipeline_bot_token")
        # Public bot, handles public queries for gifs and searches
        self.public_bot_token = config.get("public_bot_token")
        # API keys for external services
        self.api_keys = config.get("api_keys", {})
    def initialise_pipeline(self) -> 'Pipeline':
        self.startup_monitor.set_state(StartupState.CREATING_DATABASE)
        database = Database()
        self.startup_monitor.set_state(StartupState.CONNECTING_TELEGRAM)
        client = TelegramClient(self.api_id, self.api_hash, self.pipeline_bot_token, self.public_bot_token)
        client.synchronise_async(client.initialise())
        channels, workshops = client.synchronise_async(self.initialise_chats(database, client))
        self.startup_monitor.set_state(StartupState.CREATING_PIPELINE)
        pipe = Pipeline(database, client, channels, workshops, self.api_keys, self.startup_monitor)
        return pipe
    async def initialise_chats(
            self,
            database: Database,
            client: TelegramClient
    ) -> Tuple[List[Channel], List[WorkshopGroup]]:
        download_bottleneck = Bottleneck(3)
        workshop_builder = WorkshopBuilder(database, client, download_bottleneck)
        channel_builder = ChannelBuilder(database, client, download_bottleneck)
        # Get chat data for chat config
        self.startup_monitor.set_state(StartupState.INITIALISING_CHAT_DATA)
        logger.info("Initialising workshop data")
        workshop_data = await workshop_builder.get_chat_data(self.workshops)
        logger.info("Initialising channel data")
        channel_data = await channel_builder.get_chat_data(self.channels)
        message_inits = []
        self.startup_monitor.set_state(StartupState.LISTING_WORKSHOP_MESSAGES)
        logger.info("Listing messages in workshops")
        workshop_message_lists = await workshop_builder.get_message_inits(self.workshops, workshop_data)
        workshop_message_counts = [len(x) for x in workshop_message_lists]
        message_inits += [init for message_list in workshop_message_lists for init in message_list]
        self.startup_monitor.set_state(StartupState.LISTING_CHANNEL_MESSAGES)
        logger.info("Listing messages in channels")
        channel_message_lists = await channel_builder.get_message_inits(self.channels, channel_data)
        channel_message_counts = [len(x) for x in channel_message_lists]
        message_inits += [init for message_list in channel_message_lists for init in message_list]
        self.startup_monitor.set_state(StartupState.DOWNLOADING_MESSAGES)
        logger.info("Downloading messages")
        all_messages = await tqdm_gather(message_inits, desc="Downloading messages")
        logger.info("Creating workshops")
        self.startup_monitor.set_state(StartupState.CREATING_WORKSHOPS)
        workshop_dict = {}
        for work_conf, work_data, message_count in zip(self.workshops, workshop_data, workshop_message_counts):
            work_messages = all_messages[:message_count]
            all_messages = all_messages[message_count:]
            workshop_dict[work_conf.handle] = WorkshopGroup(work_data, work_conf, work_messages, client)
        logger.info("Creating channels")
        self.startup_monitor.set_state(StartupState.CREATING_CHANNELS)
        channels = []
        for chan_conf, chan_data, message_count in zip(self.channels, channel_data, channel_message_counts):
            chan_messages = all_messages[:message_count]
            all_messages = all_messages[message_count:]
            queue = None
            if chan_conf.queue:
                queue = workshop_dict[chan_conf.queue.handle]
            channels.append(Channel(chan_data, chan_conf, chan_messages, client, queue))
        workshops = list(workshop_dict.values())
        logger.info("Cleaning up excess files from chats")
        self.startup_monitor.set_state(StartupState.CLEANING_UP_CHAT_FILES)
        for chat in tqdm([*channels, *workshops], desc="Cleaning up excess files from chats"):
            chat.cleanup_excess_files()
        logger.info("Initialised channels and workshops")
        return channels, workshops
class Pipeline:
    def __init__(
            self,
            database: Database,
            client: TelegramClient,
            channels: List[Channel],
            workshops: List[WorkshopGroup],
            api_keys: Dict[str, Dict[str, str]],
            startup_monitor: StartupMonitor
    ):
        self.database = database
        self.channels = channels
        self.workshops = workshops
        self.client = client
        self.api_keys = api_keys
        self.worker = TaskWorker(3)
        self.helpers = {}
        self.public_helpers = {}
        self.menu_cache = MenuCache(database)  # MenuHelper later populates this from database
        self.download_bottleneck = Bottleneck(3)
        self.startup_monitor = startup_monitor
    @property
    def all_chats(self) -> List[Chat]:
        channels = [x for x in self.channels]  # type: List[Chat]
        for workshop in self.workshops:
            channels.append(workshop)
        return channels
    @property
    def all_chat_ids(self) -> List[int]:
        return [chat.chat_data.chat_id for chat in self.all_chats]
    def chat_by_id(self, chat_id: int) -> Optional[Chat]:
        for chat in self.all_chats:
            if chat.chat_data.chat_id == chat_id:
                return chat
        return None
    def chat_by_handle(self, name: str) -> Optional[Chat]:
        name = name.lstrip("@")
        for chat in self.all_chats:
            if chat.chat_data.matches_handle(name):
                return chat
        return None
    def channel_by_handle(self, name: str) -> Optional[Channel]:
        name = name.lstrip("@")
        for chat in self.channels:
            if chat.chat_data.matches_handle(name):
                return chat
        return None
    def initialise_helpers(self) -> None:
        logger.info("Initialising helpers")
        self.startup_monitor.set_state(StartupState.INITIALISING_DUPLICATE_DETECTOR)
        duplicate_helper = self.client.synchronise_async(self.initialise_duplicate_detector())
        self.startup_monitor.set_state(StartupState.INITIALISING_HELPERS)
        tag_manager = TagManager(self.channels, self.workshops, self.database)
        delete_helper = DeleteHelper(self.database, self.client, self.worker, self.menu_cache)
        menu_helper = MenuHelper(self.database, self.client, self.worker, self, delete_helper, tag_manager)
        twitter_keys = self.api_keys.get("twitter", {})
        send_helper = GifSendHelper(self.database, self.client, self.worker, self.channels, menu_helper, twitter_keys)
        schedule_helper = ScheduleHelper(
            self.database,
            self.client,
            self.worker,
            self.channels,
            menu_helper,
            send_helper,
            delete_helper,
            tag_manager
        )
        download_helper = DownloadHelper(self.database, self.client, self.worker)
        subscription_helper = SubscriptionHelper(
            self.database,
            self.client,
            self.worker,
            self,
            duplicate_helper,
            download_helper,
            self.api_keys
        )
        ffprobe_helper = FFProbeHelper(self.database, self.client, self.worker)
        helpers = [
            duplicate_helper,
            menu_helper,
            TelegramGifHelper(self.database, self.client, self.worker),
            VideoRotateHelper(self.database, self.client, self.worker),
            VideoCutHelper(self.database, self.client, self.worker),
            VideoCropHelper(self.database, self.client, self.worker),
            download_helper,
            StabiliseHelper(self.database, self.client, self.worker),
            VideoHelper(self.database, self.client, self.worker),
            MSGHelper(self.database, self.client, self.worker),
            FAHelper(self.database, self.client, self.worker),
            SceneSplitHelper(self.database, self.client, self.worker, menu_helper),
            ChunkSplitHelper(self.database, self.client, self.worker, ffprobe_helper),
            send_helper,
            delete_helper,
            MergeHelper(self.database, self.client, self.worker),
            ReverseHelper(self.database, self.client, self.worker),
            ffprobe_helper,
            ZipHelper(self.database, self.client, self.worker),
            TagHelper(self.database, self.client, self.worker, tag_manager),
            ChannelFwdTagHelper(self.database, self.client, self.worker),
            UpdateYoutubeDlHelper(self.database, self.client, self.worker),
            ChartHelper(self.database, self.client, self.worker, self, tag_manager),
            schedule_helper,
            subscription_helper,
            FindHelper(self.database, self.client, self.worker, duplicate_helper, download_helper)
        ]
        if "imgur" in self.api_keys:
            helpers.append(
                ImgurGalleryHelper(self.database, self.client, self.worker, self.api_keys["imgur"]["client_id"]))
        for helper in helpers:
            self.helpers[helper.name] = helper
        # Check yt-dl install
        self.startup_monitor.set_state(StartupState.INSTALLING_YT_DL)
        self.client.synchronise_async(download_helper.check_yt_dl())
        # Load menus from database
        self.startup_monitor.set_state(StartupState.LOADING_MENUS)
        self.client.synchronise_async(menu_helper.refresh_from_database())
        # Load schedule helper and subscription helper
        self.startup_monitor.set_state(StartupState.INITIALISING_SCHEDULES)
        self.client.synchronise_async(schedule_helper.initialise())
        self.startup_monitor.set_state(StartupState.INITIALISING_SUBSCRIPTIONS)
        self.client.synchronise_async(subscription_helper.initialise())
        # Helpers complete
        logger.info(f"Initialised {len(self.helpers)} helpers")
        # Set up public helpers
        self.startup_monitor.set_state(StartupState.INITIALISING_PUBLIC_HELPERS)
        public_helpers = [
            PublicTagHelper(self.database, self.client, self.worker, tag_manager)
        ]
        for helper in public_helpers:
            self.public_helpers[helper.name] = helper
        logger.info(f"Initialised {len(self.public_helpers)} public helpers")
    async def initialise_duplicate_detector(self) -> DuplicateHelper:
        helper = DuplicateHelper(self.database, self.client, self.worker)
        logger.info("Initialising DuplicateHelper")
        await helper.initialise_hashes(self.workshops)
        logger.info("Initialised DuplicateHelper")
        return helper
    def watch_workshop(self) -> None:
        # Start prometheus server
        self.startup_monitor.set_running()
        logger.info("Watching workshop")
        # Set up handlers
        self.client.add_message_handler(self.on_new_message, self.all_chat_ids)
        self.client.add_public_message_handler(self.pass_message_to_public_handlers)
        self.client.add_edit_handler(self.on_edit_message, self.all_chat_ids)
        self.client.add_delete_handler(self.on_deleted_message)
        self.client.add_callback_query_handler(self.on_callback_query)
        self.client.client.run_until_disconnected()
    async def on_edit_message(self, event: events.MessageEdited.Event):
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event.message))
        if chat is None:
            logger.debug("Ignoring edited message in other chat, which must have slipped through")
            return
        # Convert to our custom Message object. This will update message data, but not the video, for edited messages
        logger.info(f"Edited message in chat: {chat}")
        message_data = message_data_from_telegram(event.message)
        new_message = await self.download_bottleneck.await_run(
            Message.from_message_data(message_data, chat.chat_data, self.client)
        )
        chat.remove_message(message_data)
        chat.add_message(new_message)
        self.database.save_message(new_message.message_data)
        logger.info(f"Edited message initialised: {new_message}")
    async def on_new_message(self, event: events.NewMessage.Event) -> None:
        # This is called just for new messages
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event.message))
        if chat is None:
            logger.debug("Ignoring new message in other chat, which must have slipped through")
            return
        # Convert to our custom Message object. This will update message data, but not the video, for edited messages
        logger.info(f"New message in chat: {chat}")
        message_data = message_data_from_telegram(event.message)
        new_message = await self.download_bottleneck.await_run(
            Message.from_message_data(message_data, chat.chat_data, self.client)
        )
        chat.add_message(new_message)
        self.database.save_message(new_message.message_data)
        logger.info(f"New message initialised: {new_message}")
        # Pass to helpers
        await self.pass_message_to_handlers(new_message, chat)
    async def pass_message_to_handlers(self, new_message: Message, chat: Chat = None):
        if chat is None:
            chat = self.chat_by_id(new_message.chat_data.chat_id)
        # If any helpers say that a message is priority, send only to those helpers
        priority = any(helper.is_priority(chat, new_message) for helper in self.helpers.values())
        helpers = {key: val for key, val in self.helpers.items() if not priority or val.is_priority(chat, new_message)}
        # Call the helpers
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_new_message(chat, new_message) for helper in helpers.values()),
            return_exceptions=True
        )
        # Handle helper results
        for helper, result in zip(helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle message {new_message}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)
    async def pass_message_to_public_handlers(self, event: events.NewMessage.Event):
        logger.info(f"New public message: {event}")
        helper_results: Iterable[Union[BaseException, Optional[List[MessageData]]]] = await asyncio.gather(
            *(helper.on_new_message(event.message) for helper in self.public_helpers.values()),
            return_exceptions=True
        )
        for helper, result in zip(self.public_helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Public helper {helper} threw an exception trying to handle message {event}.",
                    exc_info=result
                )
    async def on_deleted_message(self, event: events.MessageDeleted.Event):
        # Get messages
        chat = self.chat_by_id(event.chat_id)
        messages = self.get_messages_for_delete_event(event)
        for message in messages:
            # Tell helpers
            helper_results = await asyncio.gather(
                *(helper.on_deleted_message(chat, message) for helper in self.helpers.values()),
                return_exceptions=True
            )
            results_dict = dict(zip(self.helpers.keys(), helper_results))
            for helper, result in results_dict.items():
                if isinstance(result, Exception):
                    logger.error(
                        f"Helper {helper} threw an exception trying to handle deleting message {message}.",
                        exc_info=result
                    )
            # If it's a menu, remove that
            self.menu_cache.remove_menu_by_message(message)
            # Remove messages from store
            logger.info(f"Deleting message {message} from chat: {message.chat_data}")
            message.delete(self.database)
            chat.remove_message(message.message_data)
    def get_messages_for_delete_event(self, event: events.MessageDeleted.Event) -> Iterator[Message]:
        deleted_ids = event.deleted_ids
        if event.chat_id is None:
            return [
                message
                for workshop in self.workshops
                for message in workshop.messages
                if message.message_data.message_id in deleted_ids
            ]
        chat = self.chat_by_id(event.chat_id)
        if chat is None:
            return []
        return [message for message in chat.messages if message.message_data.message_id in deleted_ids]
    async def on_callback_query(self, event: events.CallbackQuery.Event):
        # Get chat, check it's one we know
        chat = self.chat_by_id(chat_id_from_telegram(event))
        if chat is None:
            logger.debug("Ignoring new message in other chat, which must have slipped through")
            return
        # Get the menu
        menu = self.menu_cache.get_menu_by_message_id(event.chat_id, event.message_id)
        if not menu:
            # Handle stateless menu callbacks
            logger.debug("Received a callback for a stateless menu")
            return await self.on_stateless_callback(event, chat)
        # Check button was pressed by someone who was allowed to press it
        if not menu.menu.allows_sender(event.sender_id):
            logger.info("User tried to press a button on a menu that wasn't theirs")
            await event.answer("This is not your menu, you are not authorised to use it.")
            return
        # Check if menu has already been clicked
        if menu.clicked:
            # Menu already clicked
            logger.info("Callback received for a menu which has already been clicked")
            await event.answer("That menu has already been clicked.")
            return
        # Hand callback queries to helpers
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_callback_query(event.data, menu, event.sender_id) for helper in self.helpers.values()),
            return_exceptions=True
        )
        answered = False
        for helper, result in zip(self.helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle callback query {event}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)
            # Check for result is None because empty list would be an answer, None is not
            if result is not None and not answered:
                await event.answer()
    async def on_stateless_callback(self, event: events.CallbackQuery.Event, chat: Chat) -> None:
        # Get message
        msg = chat.message_by_id(event.message_id)
        # Handle callback query
        helper_results: Iterable[Union[BaseException, Optional[List[Message]]]] = await asyncio.gather(
            *(helper.on_stateless_callback(event.data, chat, msg, event.sender_id) for helper in self.helpers.values()),
            return_exceptions=True
        )
        answered = False
        for helper, result in zip(self.helpers.keys(), helper_results):
            if isinstance(result, BaseException):
                logger.error(
                    f"Helper {helper} threw an exception trying to handle stateless callback query: {event}.",
                    exc_info=result
                )
            elif result:
                for reply_message in result:
                    await self.pass_message_to_handlers(reply_message)
            # Check for result is None because empty list would be an answer, None is not
            if result is not None and not answered:
...
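The gif_pipeline.startup_monitor module itself is not shown on this page, but the calls above outline its interface: set_state() is passed a StartupState member at each phase of start-up, and set_running() is called once the message handlers are registered. Below is a minimal, hypothetical sketch of such a monitor, assuming it publishes the current phase as a Prometheus enum metric (the pipeline already runs a Prometheus HTTP server via start_http_server); the project's real implementation and its full set of states may differ.

from enum import Enum, auto

from prometheus_client import Enum as PromEnum


class StartupState(Enum):
    # Only a few of the states used in pipeline.py above; the real module defines more.
    LOADING_CONFIG = auto()
    CREATING_DATABASE = auto()
    CONNECTING_TELEGRAM = auto()
    CREATING_PIPELINE = auto()
    INITIALISING_HELPERS = auto()
    RUNNING = auto()


class StartupMonitor:
    # Hypothetical minimal monitor: exposes the current start-up phase as a Prometheus enum metric.

    def __init__(self) -> None:
        self._metric = PromEnum(
            "gif_pipeline_startup_state",
            "Current startup phase of the gif pipeline",
            states=[state.name for state in StartupState],
        )

    def set_state(self, state: StartupState) -> None:
        self._metric.state(state.name)

    def set_running(self) -> None:
        self.set_state(StartupState.RUNNING)

With a monitor shaped like this, a plain /metrics scrape (or a dashboard panel) shows exactly which phase a slow start-up is currently stuck in.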


controllers.py

Source: controllers.py (GitHub)


...
        :param service: the service
        """
        if self.startup_monitor is None:
            raise ValueError("No startup monitor set")
        return self.startup_monitor(service)
class DockerisedServiceController(
        Generic[DockerisedServiceType], ContainerisedServiceController[DockerisedServiceType], metaclass=ABCMeta):
    """
    Controller of Docker containers running a service brought up for testing.
    """
    @staticmethod
    def _call_detector_with_correct_arguments(detector: Callable, line: str, service: DockerisedServiceType) -> bool:
        """
        Calls the given detector with either line as the only argument or both line and service, depending on the
        detector's signature.
        :param detector: the detector to call
        :param line: the log line to give to the detector
        :param service: the service being started
        :return: the detector's return value
        """
        number_of_parameters = len(signature(detector).parameters)
        if number_of_parameters == 1:
            return detector(line)
        else:
            return detector(line, service)
    def __init__(self, service_model: Type[ServiceType], repository: str, tag: str, ports: List[int], *,
                 start_timeout: int=math.inf, start_tries: int=math.inf, additional_run_settings: Dict[str, Any]=None,
                 pull: bool=True,
                 start_log_detector: LogListener=None,
                 persistent_error_log_detector: LogListener=None,
                 transient_error_log_detector: LogListener=None,
                 startup_monitor: Callable[[ServiceType], bool]=None,
                 start_http_detector: Callable[[Response], bool]=None,
                 start_http_detection_endpoint: str=""):
        """
        Constructor.
        :param service_model: see `ServiceController.__init__`
        :param repository: the repository of the service to start
        :param tag: the repository tag of the service to start
        :param ports: the ports the service exposes
        :param start_timeout: timeout for starting containers
        :param start_tries: number of times to try starting the containerised service
        :param additional_run_settings: other run settings (see https://docker-py.readthedocs.io/en/1.2.3/api/#create_container)
        :param pull: whether to always pull from source repository
        :param start_log_detector: callable that detects if the service is ready for use from the logs
        :param persistent_error_log_detector: callable that detects if the service is unable to start
        :param transient_error_log_detector: callable that detects if the service encountered a transient error
        :param start_http_detector: callable that detects if the service is ready for use based on given HTTP response
        :param start_http_detection_endpoint: endpoint to call that should respond if the service has started
        """
        if startup_monitor and (start_log_detector or persistent_error_log_detector or transient_error_log_detector or
                                start_http_detector):
            raise ValueError("Cannot set `startup_monitor` in conjunction with any other detector")
        super().__init__(service_model, start_timeout, start_tries, startup_monitor=startup_monitor)
        self.repository = repository
        self.tag = tag
        self.ports = ports
        self.run_settings = additional_run_settings if additional_run_settings is not None else {}
        self.pull = pull
        self.start_log_detector = start_log_detector
        self.persistent_error_log_detector = persistent_error_log_detector
        self.transient_error_log_detector = transient_error_log_detector
        self.start_http_detector = start_http_detector
        self.start_http_detection_endpoint = start_http_detection_endpoint
        self._log_iterator: Dict[Service, Iterator] = dict()
    def _start(self, service: DockerisedServiceType, runtime_configuration: Dict):
        if self.pull:
            image = docker_client.images.pull(self.repository, tag=self.tag)
        else:
            image = docker_client.images.get(f"{self.repository}:{self.tag}")
        service.name = f"{self.repository.split('/')[-1]}-{uuid4()}"
        service.ports = {port: _get_open_port() for port in self.ports}
        service.controller = self
        create_kwargs = dict(self.run_settings)
        create_kwargs.update(runtime_configuration)
        container = docker_client.containers.create(
            image=image.id,
            name=service.name,
            ports=service.ports,
            detach=True,
            **create_kwargs)
        service.container = container
        container.start()
    def _stop(self, service: DockerisedServiceType):
        if service in self._log_iterator:
            del self._log_iterator[service]
        if service.container:
            try:
                service.container.stop()
                service.container.remove(force=True)
            except NotFound:
                pass
    def _wait_until_started(self, service: DockerisedServiceType):
        if self.startup_monitor is not None:
            return self.startup_monitor(service)
        else:
            if self.start_log_detector:
                self._wait_until_log_indicates_start(service)
            if self.start_http_detector:
                self._wait_until_http_indicates_start(service)
    def _wait_until_log_indicates_start(self, service: DockerisedServiceType):
        """
        Blocks until container log indicates that the service has started.
        :param service: starting service
        :raises ServiceStartException: raised if service cannot be started
        """
        log_stream = service.container.logs(stream=True)
        for line in log_stream:
            # XXX: Although non-streamed logs are returned as a string, the generator returns bytes!?
...
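In this controller, startup_monitor is an alternative to the log and HTTP detectors: when it is set, _wait_until_started() simply calls it with the starting service and returns its result, and combining it with any *_detector argument raises a ValueError in the constructor. The monitor is therefore responsible for blocking until the service is usable. A sketch of one possible caller-supplied monitor is below; it assumes (as set up in _start() above) that service.ports maps each container port to a published host port, and it is illustrative rather than part of the library.

import socket
import time


def wait_for_open_port(service) -> bool:
    # Block until the first published port accepts TCP connections, then report success.
    host_port = next(iter(service.ports.values()))
    while True:
        try:
            with socket.create_connection(("localhost", host_port), timeout=1):
                return True
        except OSError:
            time.sleep(0.5)

Such a function would then be passed as startup_monitor=wait_for_open_port when constructing a concrete controller, in place of start_log_detector or start_http_detector.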


conftest.py

Source: conftest.py (GitHub)


...
    threading.Thread(target=startup_monitor).start()
def _trigger_stop():
    localstack_stop.set()
    startup_monitor_event.set()
def startup_monitor() -> None:
    """
    The startup monitor is a thread that waits for the startup_monitor_event and, once the event is true, starts a
    localstack instance in its own thread context.
    """
    logger.info("waiting on localstack_start signal")
    startup_monitor_event.wait()
    if localstack_stop.is_set():
        # this is called if _trigger_stop() is called before any test has requested the localstack_runtime fixture.
        logger.info("ending startup_monitor")
        localstack_stopped.set()
        return
    if is_env_true("TEST_SKIP_LOCALSTACK_START") or os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        logger.info("TEST_SKIP_LOCALSTACK_START is set, not starting localstack")
        localstack_started.set()
...
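The pattern here is event-driven: the monitor thread is started once (top of the snippet), parks on startup_monitor_event, and only boots localstack when something sets that event; _trigger_stop() sets both localstack_stop and the monitor event so the thread can exit cleanly even if localstack was never requested. A session fixture that wants the runtime therefore only has to set the event and wait for localstack_started. The sketch below is illustrative, not localstack's actual conftest code: the event names are taken from the snippet, the fixture name localstack_runtime comes from the comment in it, and the timeout value is arbitrary.

import threading

import pytest

# In the real conftest these events already exist at module level (see the snippet above).
startup_monitor_event = threading.Event()  # wakes the startup_monitor thread
localstack_started = threading.Event()     # set once localstack is up (or start was skipped)
localstack_stop = threading.Event()        # asks localstack to shut down
localstack_stopped = threading.Event()     # set once localstack has shut down


@pytest.fixture(scope="session")
def localstack_runtime():
    # Ask the startup_monitor thread to start localstack, then block until it reports readiness.
    startup_monitor_event.set()
    assert localstack_started.wait(timeout=600), "localstack did not start in time"
    yield
    # Shutdown is typically triggered once per session, e.g. from a hook that calls _trigger_stop().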


