Best Python code snippet using localstack_python
logic.py
Source:logic.py  
# -*- coding: utf-8 -*-
import asyncio
import logging
import socket

from . import datakeeper as dk
from . import output

logger = logging.getLogger(__name__)


class Logic():
    '''Class that contains the business logic of this application'''

    def __init__(self, config, func_enqueue):
        '''Store the config and the enqueue coroutine and create the data keeper'''
        self.config = config
        self.func_enqueue = func_enqueue
        self.data = dk.DataKeeper(config)

    def initialize_data(self):
        '''Reload the config and status'''
        self.data.initialize()

    async def ping(self, destination, interface, ping6=False):
        '''Asynchronously execute the ping command to check reachability

        "ping6" is used when requested via ``ping6`` or when ``destination``
        contains a colon. Returns the exit code of the ping process; the
        caller treats 0 as "reachable".
        '''
        command = 'ping6' if (ping6 or ':' in destination) else 'ping'
        proc = await asyncio.create_subprocess_exec(
            command, '-q', '-c', '1', '-w', '1', '-W', '1', '-I', interface, destination,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # Output is captured only to keep it off the console; only the exit code matters
        await proc.communicate()
        return proc.returncode

    def is_hostname(self, peername):
        '''Checks whether the provided peer is defined by hostname (in contrast to IP address)'''
        if peername is None:
            return False
        if ':' in peername:
            # ":" indicates an IPv6 address and is not allowed in a hostname
            return False
        parts = peername.split('.')
        looks_like_ipv4 = (len(parts) == 4) and all(part.isnumeric() for part in parts)
        return not looks_like_ipv4

    def endpoint_is_hostname(self, endpoint):
        '''Checks whether the provided endpoint is defined by hostname (in contrast to IP address)'''
        if endpoint is None:
            return False
        # rpartition also works with IPv6 because the port follows the last colon
        return self.is_hostname(endpoint.rpartition(':')[0])

    async def do_periodically(self):
        '''Tasks to be executed periodically each cycle (called by scheduler coroutine)

        Refreshes the status, advances the per-peer status ("undefined",
        "up:ok", "down:waiting", "down:checking", "down:backingoff",
        "down:slowchecking", "down", "disabled"), enqueues "update_peer"
        requests where a re-resolution is due, pings the peers that are due
        for a reachability check and finally outputs the new status.
        '''
        logger.debug('Executing periodic tasks')
        # Updates the WireGuard status information
        self.data.update_status()
        # Get config attributes
        cycles_wait = self.config.cycles_wait
        cycles_checking = self.config.cycles_checking
        cycles_checkperiod = self.config.cycles_checkperiod
        cycles_slowcheckingperiod = self.config.cycles_slowcheckingperiod
        ping_interval = self.config.ping_interval
        ping_failafternum = self.config.ping_failafternum
        # Iterate through all peers of all interfaces and determine new status
        ping_plan = []
        for interface, interfacedata, peer, peerdata in self.data.peeriterator():
            status = peerdata.get('status', 'undefined')
            cycle_counter = peerdata.get('cycle-counter', 0)
            update_peer = False
            # Act based on current state
            next_status = 'unchanged'
            if peerdata.get('handshake-status', 'failed') not in ['none', 'failed']:
                # NOTE(review): with ping_interval == 0 a peer in a "down:*" status is
                # never moved back to "up:ok" here - confirm this is intended.
                if peerdata.get('ping-address') is None:
                    # Default to the first allowed IP without its prefix length
                    peerdata['ping-address'] = peerdata['allowed-ips'][0].partition('/')[0]
                if ping_interval > 0:
                    if cycle_counter % ping_interval == 0:
                        next_status = 'ping'
                if (status == 'undefined') and (next_status == 'unchanged'):
                    next_status = 'up:ok'
            elif (status == 'undefined') or (status == 'up:ok'):
                if self.endpoint_is_hostname(peerdata.get('config_endpoint')):
                    next_status = 'down:waiting' if (cycles_wait > 0) and (status != 'undefined') else 'down:checking'
                else:  # no further check of peer needed
                    if peerdata.get('endpoint') is None:
                        next_status = 'disabled'
                    else:
                        next_status = 'down'
            elif status == 'down:waiting':
                if cycle_counter >= cycles_wait:
                    next_status = 'down:checking'
            elif status == 'down:checking':
                if cycle_counter >= cycles_checking:
                    next_status = 'down:backingoff'
                else:
                    if cycle_counter % cycles_checkperiod == 0:
                        update_peer = True
            elif status == 'down:backingoff':
                if cycle_counter >= peerdata['backingoff-limit']:
                    update_peer = True
                    peerdata['cycle-counter'] = 0
                    # Double the waiting time until the slow-checking period is reached
                    peerdata['backingoff-limit'] = 2 * peerdata['backingoff-limit']
                    if peerdata['backingoff-limit'] >= cycles_slowcheckingperiod:
                        next_status = 'down:slowchecking'
            elif status == 'down:slowchecking':
                if cycle_counter >= cycles_slowcheckingperiod:
                    update_peer = True
                    # Re-entering the same status resets the cycle counter below
                    next_status = 'down:slowchecking'
            elif status == 'down':
                pass
            elif status == 'disabled':
                pass
            else:
                logger.critical('Unknown status [{0}]'.format(status))
            # Count cycles
            peerdata['cycle-counter'] = peerdata.get('cycle-counter', 0) + 1
            # Change state as needed
            if next_status == 'ping':
                if status == 'undefined':
                    peerdata['cycle-counter'] = 0
                    status = 'down:checking'
                ping_plan.append((interface, interfacedata, peer, peerdata))
                next_status = None
            elif next_status == 'down:backingoff':
                peerdata['backingoff-limit'] = 2 * cycles_checkperiod
            elif next_status == 'down:slowchecking':
                peerdata['backingoff-limit'] = None
            elif next_status == 'unchanged':
                next_status = None
            if next_status is not None:
                logger.info('Changing status of [{interface}:{peer}] to [{next}] after {cycle_counter} cycles'.format(interface=interface, peer=peer, next=next_status, cycle_counter=cycle_counter))
                peerdata['cycle-counter'] = 0
                status = next_status
            # Update peer if requested
            if update_peer:
                config_endpoint = peerdata.get('config_endpoint')
                logger.debug('Requesting to check for update of [{interface}:{peer}], endpoint [{config_endpoint}]'.format(interface=interface, peer=peer, config_endpoint=config_endpoint))
                await self.func_enqueue('update_peer', {'interface': interface, 'peer': peer, 'config_endpoint': config_endpoint, 'endpoint': peerdata.get('endpoint')})
            peerdata['status'] = status
        # Check reachability by pinging peers (all pings run concurrently)
        if ping_plan:
            ping_results = await asyncio.gather(
                *(self.ping(planned[3]['ping-address'], planned[0]) for planned in ping_plan)
            )
            for (interface, _, peer, peerdata), returncode in zip(ping_plan, ping_results):
                if returncode == 0:
                    if peerdata['status'] != 'up:ok':
                        logger.info('Changing status of [{interface}:{peer}] to [up:ok] after successful ping'.format(interface=interface, peer=peer))
                        peerdata['status'] = 'up:ok'
                        peerdata['cycle-counter'] = 0
                    peerdata['ping-failcounter'] = 0
                else:
                    peerdata['ping-failcounter'] = peerdata.get('ping-failcounter', 0) + 1
                    if peerdata['ping-failcounter'] >= ping_failafternum:
                        if peerdata['status'] != 'down:waiting':
                            logger.info('Changing status of [{interface}:{peer}] to [down:waiting] after failed ping'.format(interface=interface, peer=peer))
                            peerdata['status'] = 'down:waiting'
                        peerdata['cycle-counter'] = 0
        # Output new status
        await output.output_status(self.config.outputs, self.data)

    @staticmethod
    def _split_endpoint(endpoint):
        '''Split "host:port" or "[ipv6]:port" into (host, port); port is None if absent

        Surrounding square brackets are removed from the host part.
        '''
        host, separator, port = endpoint.rpartition(':')
        if separator and port.isdigit():
            return host.strip('[]'), port
        # No trailing ":<port>": the whole string is the host
        return endpoint.strip('[]'), None

    def update_peer(self, interface, peer, config_endpoint, endpoint):
        '''Checks whether peer needs to be updated and does it if needed

        ``config_endpoint`` is the configured endpoint ("hostname:port"),
        ``endpoint`` the currently active one (IPv4 "1.1.1.1:51712", IPv6
        "[2003:db:cf0c:f100:dea6:32ff:fe9a:859d]:51712") or None. The configured
        host is resolved; if the resolved address differs from the active one,
        the new endpoint is handed to the data keeper.
        '''
        config_host, config_port = self._split_endpoint(config_endpoint)
        # A peer without an active endpoint has no address to compare against
        current_host = self._split_endpoint(endpoint)[0] if endpoint else None
        logger.info('Resolving [{0}]'.format(config_host))
        needed_endpoint = None
        try:
            needed_endpoint = socket.getaddrinfo(config_host, 0)[0][4][0]  # get ip address
        except socket.gaierror as e:
            # Something like "socket.gaierror: [Errno -3] Try again" can happen here
            logger.warning('Error resolving interface endpoint [{0}]: {1}'.format(config_host, str(e)))
        if (needed_endpoint is not None) and (needed_endpoint != current_host):
            logger.info('Changing interface endpoint [{0}] to changed IP [{1}]'.format(config_host, needed_endpoint))
            if config_port is not None:
                if ':' in needed_endpoint:
                    # IPv6 addresses need brackets so the port separator stays unambiguous
                    needed_endpoint = '[' + needed_endpoint + ']'
                needed_endpoint += ':' + config_port
            self.data.set_endpoint(interface, peer, needed_endpoint)

    async def process_queue(self, item):
        '''Process an item from the event queue (called by queue listener coroutine)'''
        data = item.get('data', dict())
        if item.get('command') == 'update_peer':
            self.update_peer(data['interface'], data['peer'], data['config_endpoint'], data['endpoint'])
        else:
            # NOTE(review): the snippet is truncated here ("..."); the handling of
            # other commands is not visible.
            ...
# ah_repository_sync.py
Source:ah_repository_sync.py  
1#!/usr/bin/python2# coding: utf-8 -*-3# (c) 2020, Sean Sullivan <@sean-m-sullivan>4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)5from __future__ import absolute_import, division, print_function6__metaclass__ = type7ANSIBLE_METADATA = {"metadata_version": "1.1", "status": ["preview"], "supported_by": "community"}8DOCUMENTATION = """9---10module: ah_repository_sync11author: "Tom Page (@Tompage1994)"12short_description: Configure a repository.13description:14    - Configure an Automation Hub remote Repository. See15      U(https://www.ansible.com/) for an overview.16options:17    name:18      description:19        - Repository name. Probably one of community or rh-certified.20      required: True21      type: str22    wait:23      description:24        - Wait for the repository to finish syncing before returning.25      required: false26      default: True27      type: bool28    interval:29      description:30        - The interval to request an update from Automation Hub.31      required: False32      default: 133      type: float34    timeout:35      description:36        - If waiting for the project to update this will abort after this37          amount of seconds38      type: int39extends_documentation_fragment: redhat_cop.ah_configuration.auth40"""41EXAMPLES = """42- name: Sync rh-certified repo without waiting43  ah_repository_sync:44    name: rh-certified45    wait: false46- name: Sync community repo and wait up to 60 seconds47  ah_repository_sync:48    name: community49    wait: true50    timeout: 6051"""52from ..module_utils.ah_module import AHModule53import time54def main():55    # Any additional arguments that are not fields of the item can be added here56    argument_spec = dict(57        name=dict(required=True),58        wait=dict(default=True, type="bool"),59        interval=dict(default=1.0, type="float"),60        timeout=dict(default=None, type="int"),61    )62    # Create a module for ourselves63    
module = AHModule(argument_spec=argument_spec)64    # Extract our parameters65    name = module.params.get("name")66    wait = module.params.get("wait")67    interval = module.params.get("interval")68    timeout = module.params.get("timeout")69    sync_endpoint = "api/galaxy/content/{0}/v3/sync".format(name)70    config_endpoint = "{0}/config".format(sync_endpoint)71    repository = module.get_only(config_endpoint, name_or_id=name, key="req_url")72    if repository is None:73        module.fail_json(msg="Unable to find repository")74    result = module.post_endpoint(sync_endpoint)75    if result["status_code"] != 200:76        module.fail_json(msg="Failed to update project, see response for details", response=result)77    module.json_output["changed"] = True78    module.json_output["task"] = result["json"]["task"]79    if not wait:80        module.exit_json(**module.json_output)81    # Grab our start time to compare against for the timeout82    start = time.time()83    result = module.get_only(config_endpoint, name_or_id=name, key="req_url")84    while not result["last_sync_task"]["finished_at"]:85        if timeout and timeout < time.time() - start:86            module.json_output["msg"] = "Monitoring of {0} - {1} aborted due to timeout".format("repository_sync", name)87            module.wait_sync_output(result)88            module.fail_json(**module.json_output)89        time.sleep(interval)90        result = module.get_only(config_endpoint, name_or_id=name, key="req_url")91        module.json_output["status"] = result["last_sync_task"]["state"]92    if result["last_sync_task"]["state"] == "failed":93        module.json_output["msg"] = "The {0} - {1}, failed".format("repository_sync", name)94        module.wait_sync_output(result)95        module.fail_json(**module.json_output)96    module.wait_sync_output(result)97    module.exit_json(**module.json_output)98if __name__ == "__main__":...instrument.py
Source:instrument.py  
import json
import logging

import numpy as np
import requests


class Instrument:
    """HTTP client for an instrument exposing ``/config`` and ``/data`` endpoints."""

    def __init__(self, host: str, port: int, timeout: float | None = None):
        """Build the endpoint URLs for the instrument at ``host:port``.

        ``timeout`` is passed to every HTTP request; the default ``None``
        keeps the previous behaviour of waiting without a limit.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.url = f"http://{self.host}:{self.port}"
        self.config_endpoint = f"{self.url}/config"
        self.data_endpoint = f"{self.url}/data"

    def get_config(self) -> dict[str, float | int]:
        """Fetch the current configuration and return the decoded JSON body."""
        response = requests.get(self.config_endpoint, timeout=self.timeout)
        return json.loads(response.content)

    def set_config(self, freq_min: float, freq_max: float, n_freq: int):
        """Print the configuration and post it to the config endpoint as JSON."""
        config = {
            "freq_min": freq_min,
            "freq_max": freq_max,
            "n_freq": n_freq,
        }
        print(f"Configuration used: {config}")
        requests.post(self.config_endpoint, json=config, timeout=self.timeout)

    def reset_config(self):
        """Send a DELETE request to the config endpoint."""
        requests.delete(self.config_endpoint, timeout=self.timeout)

    def get_data(self) -> np.ndarray:
        """Fetch the data endpoint and return the JSON body as a NumPy array on HTTP 200."""
        response = requests.get(self.data_endpoint, timeout=self.timeout)
        if response.status_code == 200:
            return np.array(json.loads(response.content))
        else:
            # NOTE(review): the snippet is truncated here ("..."); the handling of
            # non-200 responses is not visible.
            ...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
