Best Python code snippet using localstack_python
fetching.py
Source:fetching.py  
1"""2The only place in this package that directly interacts with the AWS API.3Contains the cached fetcher that ensures that expensive calls to the AWS API4are cached.5"""6import time7from timeit import default_timer8from loguru import logger9from prometheus_ecs_discoverer import s, telemetry, toolbox10from prometheus_ecs_discoverer.caching import SlidingCache11# Telemetry ====================================================================12CLUSTERS = telemetry.gauge("clusters", "Fetched clusters.")13CINSTANCES = telemetry.gauge(14    "container_instances", "Fetched container instances.", ("cluster",)15)16TASKS = telemetry.gauge("tasks", "Fetched tasks.", ("cluster",))17DURATION = telemetry.histogram(18    "api_requests_duration_seconds", "Duration of requests to the AWS API.", ("method",)19)20# ==============================================================================21class CachedFetcher:22    """Work with the AWS API leveraging a sliding cache.23    Reduces the amount of request made to the AWS API helping to stay below the24    request limits. Only implements necessary methods. So not a generic class.25    Rember to flush all caches with `flush_caches` after every "full round".26    """27    def __init__(28        self,29        ecs_client,30        ec2_client,31        throttle_interval_seconds: float = 0.1,32        should_throttle: bool = False,33    ):34        """35        :param ecs_client: Boto3 ECS client.36        :param ec2_client: Boto3 EC2 client.37        :param throttle_interval_seconds: Time to sleep after every single38            request made to the AWS API.39        :param should_throttle: If process should go to sleep after a request40            made to the AWS API.41        """42        self.ecs = ecs_client43        self.ec2 = ec2_client44        self.throttle_interval_seconds = throttle_interval_seconds45        self.should_throttle = should_throttle46        self.task_cache = SlidingCache(name="task_cache")47        self.task_definition_cache = SlidingCache(name="task_definition_cache")48        self.container_instance_cache = SlidingCache(name="container_instance_cache")49        self.ec2_instance_cache = SlidingCache(name="ec2_instance_cache")50    def flush_caches(self) -> None:51        """Flush all caches. Should be called at the end of a round."""52        self.task_cache.flush()53        self.task_definition_cache.flush()54        self.container_instance_cache.flush()55        self.ec2_instance_cache.flush()56    # ==========================================================================57    def get_arns(self, method: str, key: str, **aws_api_parameters) -> list:58        """Get the ARNs with a given method and key.59        Args:60            method: AWS API method to use.61            key: Key to extract from the response(s).62        Returns:63            list: List of ARNs.64        """65        arns = []66        total_start_time = default_timer()67        start_time = total_start_time68        for page in self.ecs.get_paginator(method).paginate(**aws_api_parameters):69            DURATION.labels(method).observe(max(default_timer() - start_time, 0))70            arns += page.get(key, [])71            if self.should_throttle:72                time.sleep(self.throttle_interval_seconds)73            start_time = default_timer()74        if s.DEBUG:75            logger.bind(**aws_api_parameters).debug("{} {}.", method, key)76        if s.PRINT_STRUCTS:77            toolbox.pstruct(arns, f"{key} {method}")78        return arns79    def get_cluster_arns(self) -> list:80        """Get cluster ARNs."""81        arns = self.get_arns("list_clusters", "clusterArns")82        CLUSTERS.set(len(arns))83        return arns84    def get_container_instance_arns(self, cluster_arn: str) -> list:85        """Get container instance ARNs for given cluster ARN."""86        arns = self.get_arns(87            "list_container_instances", "containerInstanceArns", cluster=cluster_arn88        )89        CINSTANCES.labels(cluster_arn).set(len(arns))90        return arns91    def get_task_definition_arns(self) -> list:92        """Get task definition ARNs."""93        return self.get_arns("list_task_definitions", "taskDefinitionArns")94    def get_task_arns(self, cluster_arn: str) -> list:95        """Get task ARNs for given cluster ARN."""96        arns = self.get_arns("list_tasks", "taskArns", cluster=cluster_arn)97        TASKS.labels(cluster_arn).set(len(arns))98        return arns99    # ==========================================================================100    def get_tasks(self, cluster_arn: str, task_arns: list = None) -> dict:101        """Get task descriptions from cache / AWS API.102        Returns:103            dict: Keys are the task ARNs, values the respective task descriptions.104        """105        def uncached_fetch(task_arns: list) -> dict:106            logger.bind(cluster_arn=cluster_arn, task_arns=task_arns).debug(107                "Fetch tasks from AWS with describe_tasks."108            ) if s.DEBUG else None109            tasks = []110            chunked_task_arns = toolbox.chunk_list(task_arns, 100)111            for task_arns_chunk in chunked_task_arns:112                start_time = default_timer()113                _t = self.ecs.describe_tasks(cluster=cluster_arn, tasks=task_arns_chunk)[114                    "tasks"115                ]116                tasks += list(117                    filter(lambda x: x.get("lastStatus", None) == "RUNNING", _t)118                )119                DURATION.labels("describe_tasks").observe(120                    max(default_timer() - start_time, 0)121                )122                if self.should_throttle:123                    time.sleep(self.throttle_interval_seconds)124            if s.PRINT_STRUCTS:125                toolbox.pstruct(tasks, "describe_tasks")126            return toolbox.list_to_dict(tasks, "taskArn")127        if task_arns is None:128            task_arns = self.get_task_arns(cluster_arn)129        return self.task_cache.get_multiple(task_arns, uncached_fetch)130    def get_task_definition(self, arn: str) -> dict:131        """Get single task definition descriptions from cache / AWS API.132        Returns:133            dict: Key is the task definition ARN, value the task definition134                description.135        """136        def uncached_fetch(arn: str) -> dict:137            logger.bind(arn=arn).debug(138                "Fetch task definition from AWS with describe_task_definition."139            ) if s.DEBUG else None140            start_time = default_timer()141            task_definition: dict = self.ecs.describe_task_definition(taskDefinition=arn)[142                "taskDefinition"143            ]144            DURATION.labels("describe_task_definition").observe(145                max(default_timer() - start_time, 0)146            )147            if s.PRINT_STRUCTS:148                toolbox.pstruct(task_definition, "fetched task definition")149            if self.should_throttle:150                time.sleep(self.throttle_interval_seconds)151            return task_definition152        return self.task_definition_cache.get_single(arn, uncached_fetch)153    def get_task_definitions(self, arns: list = None) -> dict:154        """Get task definition descriptions from cache / AWS API.155        Every given ARN corresponds with a (cached) call.156        Returns:157            dict: Keys are the task definition ARNs, values the respective task158                definition descriptions.159        """160        def uncached_fetch(arns: list) -> dict:161            logger.bind(arns=arns).debug(162                "Fetch task definitions from AWS with describe_task_definition."163            ) if s.DEBUG else None164            descriptions = {}165            for arn in arns:166                start_time = default_timer()167                response = self.ecs.describe_task_definition(taskDefinition=arn)168                DURATION.labels("describe_task_definition").observe(169                    max(default_timer() - start_time, 0)170                )171                response_arn = response["taskDefinition"]["taskDefinitionArn"]172                descriptions[response_arn] = response["taskDefinition"]173                if self.should_throttle:174                    time.sleep(self.throttle_interval_seconds)175            if s.PRINT_STRUCTS:176                toolbox.pstruct(descriptions, "fetched task definitions")177            return descriptions178        if arns is None:179            arns = self.get_task_definition_arns()180        return self.task_definition_cache.get_multiple(181            arns,182            uncached_fetch,183        )184    def get_container_instances(self, cluster_arn: str, arns: list = None) -> dict:185        """Get container instance descriptions from cache / AWS API.186        Returns:187            dict: Keys are the container instance ARNs, values the respective188                container instance descriptions.189        """190        def uncached_fetch(arns: list) -> dict:191            logger.bind(arns=arns).debug(192                "Fetch container instances from AWS with describe_container_instances."193            ) if s.DEBUG else None194            lst = []195            arns_chunks = toolbox.chunk_list(arns, 100)196            for arns_chunk in arns_chunks:197                start_time = default_timer()198                lst += self.ecs.describe_container_instances(199                    cluster=cluster_arn, containerInstances=arns_chunk200                )["containerInstances"]201                DURATION.labels("describe_container_instances").observe(202                    max(default_timer() - start_time, 0)203                )204                if self.should_throttle:205                    time.sleep(self.throttle_interval_seconds)206            dct = toolbox.list_to_dict(lst, "containerInstanceArn")207            if s.PRINT_STRUCTS:208                toolbox.pstruct(dct, "describe_container_instances")209            return dct210        if arns is None:211            arns = self.get_container_instance_arns(cluster_arn)212        return self.container_instance_cache.get_multiple(213            arns,214            uncached_fetch,215        )216    def get_ec2_instances(self, instance_ids: list) -> dict:217        """Get EC2 instance descriptions from cache / AWS API.218        Returns:219            dict: Keys are the EC2 instance ARNs, values the respective220                EC2 instance descriptions.221        """222        def uncached_fetch(instance_ids: list) -> dict:223            logger.bind(instance_ids=instance_ids).debug(224                "Fetch EC2 instances from AWS with describe_instances."225            ) if s.DEBUG else None226            instances_list = []227            ids_chunks = toolbox.chunk_list(instance_ids, 100)228            for ids_chunk in ids_chunks:229                start_time = default_timer()230                response = self.ec2.describe_instances(InstanceIds=ids_chunk)231                for reservation in response["Reservations"]:232                    instances_list += reservation["Instances"]233                DURATION.labels("describe_instances").observe(234                    max(default_timer() - start_time, 0)235                )236                if self.should_throttle:237                    time.sleep(self.throttle_interval_seconds)238            dct = toolbox.list_to_dict(instances_list, "InstanceId")239            if s.PRINT_STRUCTS:240                toolbox.pstruct(dct, "ec2.describe_instances")241            return dct242        return self.ec2_instance_cache.get_multiple(243            instance_ids,244            uncached_fetch,...main.py
Source:main.py  
1"""2Entry to PromED. Contains a lot of instrumentation and is responsible for3looping the discovery. It daemonizes the functionality.4"""5import sys6import time7from timeit import default_timer8import boto39from botocore.config import Config10from loguru import logger11from prometheus_client import Histogram, start_http_server12from prometheus_ecs_discoverer import discovery, fetching, marshalling, s, telemetry13INTERVAL_BREACHED_COUNTER = telemetry.counter(14    "execution_breaches_total",15    "Number of times the discovery round took longer than the configured interval.",16)17INTERVAL_BREACHED_COUNTER.inc(0)18def configure_logging() -> None:19    """Configure Loguru logging."""20    logger.remove()21    if s.LOG_JSON:22        fmt = "{message}"23        logger.add(sys.stderr, format=fmt, serialize=True, level=s.LOG_LEVEL)24    else:25        fmt = "<green>{time:HH:mm:ss}</green> <level>{level}</level> <cyan>{function}</cyan> {message} <dim>{extra}</dim>"26        logger.add(sys.stderr, colorize=True, format=fmt, level=s.LOG_LEVEL)27    if s.BOTO3_DEBUG:28        import boto329        boto3.set_stream_logger(name="botocore")30def expose_info() -> None:31    """Expose a gauge with info label values."""32    telemetry.info(33        {34            "interval_seconds": str(s.INTERVAL),35        }36    )37    INTERVAL_INFO = telemetry.gauge(38        "info_interval_seconds", "Configured interval in seconds."39    )40    INTERVAL_INFO.set(s.INTERVAL)41def get_interval_histogram(interval: int) -> Histogram:42    """Create histogram with buckets that fit the given interval.43    10 buckets below the interval and two buckets with 10 second steps larger44    than the interval.45    Args:46        interval (int): Interval PromED is running at.47    Returns:48        Histogram: Prometheus Histogram object.49    """50    steps = 1051    step_size = round(interval / steps, 0)52    return telemetry.histogram(53        "round_duration_seconds",54        "Histogram for duration",55        buckets=tuple(56            [x * step_size for x in range(steps)]57            + [58                interval + 10,59                interval + 20,60                float("inf"),61            ]62        ),63    )64def main():65    interval: int = s.INTERVAL66    output_dir: str = s.OUTPUT_DIRECTORY67    should_throttle: bool = s.WARMUP_THROTTLE68    configure_logging()69    expose_info()70    logger.info("Welcome to PromED, the Prometheus ECS Discoverer.")71    logger.bind(settings=s.as_dict()).info("Here is the used configuration.")72    DURATION_HISTOGRAM = get_interval_histogram(interval)73    if s.PROMETHEUS_START_HTTP_SERVER:74        port = s.PROMETHEUS_SERVER_PORT75        logger.bind(port=port).info("Start Prometheus HTTP server to expose metrics.")76        start_http_server(port=port)77    logger.info("Create Boto3 session.")78    session = boto3.Session()79    config = Config(retries={"max_attempts": s.MAX_RETRY_ATTEMPTS, "mode": "standard"})80    logger.info("Create Boto3 clients and CachedFetcher.")81    fetcher = fetching.CachedFetcher(82        session.client("ecs", config=config),83        session.client("ec2", config=config),84        should_throttle=should_throttle,85        throttle_interval_seconds=s.THROTTLE_INTERVAL_SECONDS,86    )87    logger.info("Create PrometheusEcsDiscoverer.")88    discoverer = discovery.PrometheusEcsDiscoverer(fetcher)89    if should_throttle:90        logger.info("First discovery round will be throttled down.")91    logger.info("Ready for discovery. The discoverer will run until interrupted.")92    first_round = True93    while True:94        logger.info("Start new discovery round.")95        start_time = default_timer()96        if not first_round:97            discoverer.fetcher.should_throttle = False98        targets = discoverer.discover()99        marshalling.write_targets_to_file(targets, output_dir)100        if first_round and should_throttle:101            fetcher.should_throttle = False102            first_round = False103        duration = max(default_timer() - start_time, 0)104        logger.bind(duration=duration).info("Finished discovery round.")105        DURATION_HISTOGRAM.observe(duration)106        if duration > interval:107            logger.bind(duration=duration).warning(108                "Discovery round took longer than the configured interval. Please investigate."109            )110            INTERVAL_BREACHED_COUNTER.inc()111        time_left = max(interval - duration, 0)112        time.sleep(time_left)113if __name__ == "__main__":...plex_detector.py
Source:plex_detector.py  
1import asyncio2import traceback3from plexapi.server import PlexServer4from helpers import PlexInhibitor, InhibitSource5import logging6logging.getLogger(__name__).setLevel(logging.DEBUG)7class PlexDetector:8    """Detects if anyone is streaming on a Plex server, and if so it determines if qbittorrent should have its upload9    throttled"""10    def __init__(self, plex_url, plex_token, interface_class=PlexInhibitor):11        logging.info(f"Initializing plexDetector, connecting to {plex_url}")12        self.plex_url = plex_url13        self.plex_token = plex_token14        try:15            self.plex_server = PlexServer(self.plex_url, self.plex_token)16        except Exception as e:17            logging.error(f"Failed to connect to {plex_url}: {e}")18            self.plex_server = None19        self.interface_class = interface_class20        logging.info(f"Connected to {plex_url}")21        self.interface_class.connected_to_plex = True22    def _get_activity(self):23        try:24            sessions = self.plex_server.sessions()25            should_throttle = False26            self.interface_class.total_sessions = 027            for session in sessions:28                if session.players[0].state == "playing" or session.players[0].state == "buffering":29                    if session.session[0].location == "lan":30                        continue31                    should_throttle = True32                    self.interface_class.total_sessions += 133            return should_throttle34        except Exception as e:35            logging.error(f"Failed to get plex activity: {e}\n{traceback.format_exc()}")36            self.interface_class.connected_to_plex = False37    def get_activity(self):38        return self._get_activity()39    async def run(self):40        while not self.interface_class.shutdown:41            logging.debug("Checking plex activity")42            if self._get_activity():43                self.interface_class.should_inhibit = True44            else:45                self.interface_class.should_inhibit = False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
