Best Python code snippet using localstack_python
fetch.py
Source:fetch.py  
import logging
import os
from pathlib import Path
import time
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

import yaml
import botocore.exceptions
from botocore.session import Session as Boto

from introspector.error import GFInternal
from introspector.models.raw_import import ERROR_KEY

_log = logging.getLogger(__name__)

_THIS_DIR: Path = Path(os.path.dirname(__file__))

KeyFilter = Callable[[str], bool]


class ClientProxy(object):
  """Wrap a botocore client and expose uniform `list` / `get` helpers.

  Subclasses customise behaviour through the underscore-prefixed hooks
  (`_patch_client`, `_should_import`, `_list_args`, `_paginate_args`,
  `_map_error_code`); the defaults here import everything and add nothing.
  """

  # Seconds to sleep before the single retry of a throttled `list` call.
  THROTTLE_RETRY_DELAY = 3

  def __init__(self, client, service: str):
    self._client = client
    # Set before patching so that a `_patch_client` override may read it.
    self._service = service
    self._patch_client()

  @property
  def _is_gov(self) -> bool:
    """True when the client's region name contains '-gov-'."""
    return '-gov-' in self._client.meta.region_name

  def _patch_client(self) -> None:
    """Hook run once from `__init__`; the base class does nothing."""
    pass

  def _should_import(self, key: str) -> bool:
    """Hook deciding whether the client attribute `key` is a resource."""
    return True

  def resource_names(self) -> Iterator[str]:
    """Yield the names in `dir(client)` accepted by `_should_import`."""
    return filter(self._should_import, dir(self._client))

  def _list_args(self, key: str) -> Dict:
    """Hook: extra keyword arguments for every `list` call of `key`."""
    return {}

  def _paginate_args(self, key: str) -> Dict:
    """Hook: extra keyword arguments used only when `key` is paginated."""
    return {}

  def canonical_name(self, py_name: str) -> str:
    """Translate a python method name into the API operation name."""
    return self._client.meta.method_to_api_mapping[py_name]

  def _map_error_code(self, code: str, resource_name: str) -> Optional[str]:
    """Hook: map an otherwise unhandled error code to an error message.

    Returning None makes `list` re-raise the original `ClientError`.
    """
    return None

  def _output_shape(self, key: str):
    """Return the output shape of the operation behind method `key`."""
    op = self._client.meta.method_to_api_mapping[key]
    return self._client.meta.service_model.operation_model(op).output_shape

  def list(self, key: str, kwargs: Dict[str, Any],
           retry_on_throttle: bool) -> Optional[Tuple[str, Any]]:
    """Call the listing method `key` and collect its complete result.

    Args:
      key: python name of the client method, e.g. 'describe_instances'.
      kwargs: keyword arguments for the call; they take precedence over
        the values supplied by `_list_args` / `_paginate_args`.
      retry_on_throttle: when true, a throttling error is retried once
        after `THROTTLE_RETRY_DELAY` seconds; otherwise it is re-raised.

    Returns:
      `(resource_name, result)` where `resource_name` is the operation
      name with the leading verb (the text before the first '_' of `key`)
      stripped. For a set of known error codes `result` is
      `{ERROR_KEY: <message>}` instead of data. None is returned when the
      error code starts with 'NoSuch'.

    Raises:
      GFInternal: a KeyError was raised while fetching the result.
      botocore.exceptions.ClientError: any error code not handled here or
        by `_map_error_code`, and throttling once retries are exhausted.
    """
    prefix = len(key.split('_')[0])
    resource_name = self._client._PY_TO_OP_NAME[key][prefix:]
    extra_kwargs = dict(self._list_args(key), **kwargs)
    try:
      if self._client.can_paginate(key):
        paginator = self._client.get_paginator(key)
        method_args = dict(self._paginate_args(key), **extra_kwargs)
        result = paginator.paginate(**method_args).build_full_result()
      else:
        output = self._output_shape(key)
        full_result = getattr(self._client, key)(**extra_kwargs)
        # Keep only the members declared by the output shape.
        result = {
            result_key: full_result[result_key]
            for result_key in output.members.keys()
            if result_key in full_result
        }
      return resource_name, result
    except KeyError as e:
      raise GFInternal(
          f'Pagination Exception raise {self._client._PY_TO_OP_NAME[key]}'
      ) from e
    except botocore.exceptions.ParamValidationError as e:
      # TODO: fix this
      _log.debug('%s Needs param input %s', key, e)
      return resource_name, {ERROR_KEY: 'needs param input'}
    except botocore.exceptions.ClientError as e:
      code = e.response.get('Error', {}).get('Code')
      if code == 'UnsupportedOperation':
        _log.info('%s Not supported in this region', resource_name)
        return resource_name, {ERROR_KEY: 'unsupported in region'}
      elif code == 'MissingParameter':
        return resource_name, {ERROR_KEY: 'missing parameter'}
      elif code == 'OptInRequired':
        return resource_name, {ERROR_KEY: 'missing opt-in'}
      elif code in ('AuthFailure', 'AccessDeniedException',
                    'UnauthorizedOperation', 'AccessDenied'):
        _log.warning('Missing permissions for %s %s', self._service, key)
        return resource_name, {ERROR_KEY: 'auth failure'}
      elif code == 'InvalidClientTokenId':
        return resource_name, {ERROR_KEY: 'invalid token'}
      elif code is not None and code.startswith('NoSuch'):
        # No results, return nothing
        return None
      elif code in ('Throttling', 'ThrottlingException'):
        if retry_on_throttle:
          _log.warning('Throttled for %s, retrying', key)
          time.sleep(self.THROTTLE_RETRY_DELAY)
          return self.list(key, kwargs, retry_on_throttle=False)
        _log.error('Throttled for %s, not retrying', key)
        raise
      else:
        mapped = self._map_error_code(code, resource_name)
        if mapped is not None:
          return resource_name, {ERROR_KEY: mapped}
        raise

  def get(self, key: str, kwargs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Call the client method `key` and return its non-None output members.

    Args:
      key: python name of the client method.
      kwargs: keyword arguments passed to the method.

    Returns:
      A dict restricted to the members of the operation's output shape
      whose value is not None, or None when there is no such member, when
      the error code indicates a missing resource, or when the error code
      indicates missing permissions (a warning is logged in that case).

    Raises:
      botocore.exceptions.ClientError: for any other error code.
    """
    try:
      api_call = getattr(self._client, key)
      full_result = api_call(**kwargs)
      output = self._output_shape(key)
      attr_result = {}
      for result_key in output.members.keys():
        value = full_result.get(result_key)
        if value is not None:
          attr_result[result_key] = value
      return attr_result or None
    except botocore.exceptions.ClientError as e:
      # `or ''` also covers a 'Code' entry that is present but None.
      error = e.response.get('Error', {}).get('Code') or ''
      if error.startswith('NoSuch') \
          or error.endswith(('NotFoundError', 'NotFound')) \
          or error == 'ResourceNotFoundException':
        # No results, nothing to return
        return None
      elif error in ('AuthFailure', 'MethodNotAllowed',
                     'AccessDeniedException', 'AccessDenied'):
        # Auth failure
        _log.warning('Missing permissions for %s %s', self._service, key)
        return None
      raise


class EC2ClientProxy(ClientProxy):
  """EC2 flavour: imports `describe_*` calls, minus the skip lists."""

  # python method name -> result key, for operations that get a
  # NextToken/MaxResults pagination config injected by `_patch_client`.
  MISSING_PAGINATION = {
      'describe_coip_pools':
      'PoolIds',
      'describe_elastic_gpus':
      'ElasticGpuSet',
      'describe_instance_type_offerings':
      'InstanceTypeOfferings',
      'describe_local_gateway_route_table_virtual_interface_group_associations':
      'LocalGatewayRouteTableVirtualInterfaceGroupAssociationIds',
      'describe_local_gateway_route_table_vpc_associations':
      'LocalGatewayRouteTableVpcAssociations',
      'describe_local_gateway_route_tables':
      'LocalGatewayRouteTableSet',
      'describe_local_gateway_virtual_interface_groups':
      'LocalGatewayVirtualInterfaceGroups',
      'describe_local_gateway_virtual_interfaces':
      'LocalGatewayVirtualInterfaces',
      'describe_local_gateways':
      'LocalGateways',
      'describe_transit_gateway_peering_attachments':
      'TransitGatewayPeeringAttachments',
      'describe_instance_types':
      'InstanceTypes',
      'describe_transit_gateway_multicast_domains':
      'TransitGatewayMulticastDomains'
  }
  SKIPLIST = [
      'describe_reserved_instances_offerings',
      # unsupported
      'describe_transit_gateway_connect_peers',
      'describe_transit_gateway_connects',
      'describe_spot_price_history',
      # Describes services that *can* have VPC endpoints, not ones that do
      'describe_vpc_endpoint_services',
      # TODO: verify this, i think it's about regional support for long ids
      'describe_aggregate_id_format',
      # TODO: look into this
      'describe_moving_addresses',
      # Failing in some cases, and we don't map
      'describe_id_format',
      # not needed, it's on most return values
      'describe_tags',
      # Not top level
      'describe_instance_attribute'
  ]
  # Additionally skipped when the client region is a '-gov-' region.
  GOV_SKIPLIST = [
      'describe_client_vpn_endpoints', 'describe_managed_prefix_lists',
      'describe_network_insights_analyses', 'describe_network_insights_paths'
  ]
  # Default call arguments per method (caller kwargs override them).
  EXTRA_ARGS = {
      'describe_images': {
          'Filters': [{
              'Name': 'is-public',
              'Values': ['false']
          }]
      },
      'describe_snapshots': {
          'OwnerIds': ['self']
      }
  }
  # PageSize overrides; methods not listed use 100. A None entry is passed
  # through as PageSize=None (presumably "no explicit page size" - confirm
  # against the botocore paginator documentation).
  PAGE_SIZES = {
      'describe_ipv6_pools': 10,
      'describe_public_ipv4_pools': 10,
      'describe_reserved_instances_modifications': None
  }
  # Resource names for which an 'InvalidAction' error is mapped to
  # 'Missing Input params' instead of being raised.
  INVALID_ACTIONS = [
      'FpgaImages', 'TransitGatewayMulticastDomains', 'ByoipCidrs',
      'Ipv6Pools', 'CoipPools',
      'LocalGatewayRouteTableVirtualInterfaceGroupAssociations',
      'LocalGatewayRouteTableVpcAssociations', 'LocalGatewayRouteTables',
      'LocalGatewayVirtualInterfaceGroups', 'LocalGatewayVirtualInterfaces',
      'LocalGateways'
  ]

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    # Load the EC2 service spec that ships next to this module.
    ec2_svc_file = _THIS_DIR / 'svcs' / 'ec2.yml'
    with ec2_svc_file.open('r') as f:
      self._spec = yaml.safe_load(f)

  def _patch_client(self) -> None:
    """Inject pagination configs for the `MISSING_PAGINATION` methods."""
    # Force loading of the pagination config
    self._client.can_paginate('describe_instances')
    assert 'page_config' in self._client._cache
    page_config = self._client._cache['page_config']
    for py_name, result_key in self.MISSING_PAGINATION.items():
      op_name = self._client._PY_TO_OP_NAME[py_name]
      page_config[op_name] = {
          'input_token': 'NextToken',
          'output_token': 'NextToken',
          'limit_key': 'MaxResults',
          'result_key': result_key
      }

  def _should_import(self, key: str) -> bool:
    if not key.startswith('describe_') or key in self.SKIPLIST:
      return False
    return not (self._is_gov and key in self.GOV_SKIPLIST)

  def _list_args(self, key: str) -> Dict:
    return self.EXTRA_ARGS.get(key, {})

  def _paginate_args(self, key: str) -> Dict:
    # TODO: key this off of output shape
    page_size = self.PAGE_SIZES.get(key, 100)
    return {'PaginationConfig': {'PageSize': page_size}}

  def _map_error_code(self, code: str, resource_name: str) -> Optional[str]:
    if code == 'InvalidSpotDatafeed.NotFound':
      return 'Missing subscription'
    elif code == 'InvalidAction' and resource_name in self.INVALID_ACTIONS:
      return 'Missing Input params'
    return None


class S3ClientProxy(ClientProxy):
  """S3 flavour: imports bucket-level `list_bucket_*` / `get_bucket_*`."""

  # python method name -> result key, for operations that get a
  # continuation-token pagination config injected by `_patch_client`.
  MISSING_PAGINATION = {
      'list_bucket_analytics_configurations': 'AnalyticsConfigurationList',
      'list_bucket_inventory_configurations': 'InventoryConfigurationList',
      'list_bucket_metrics_configurations': 'MetricsConfigurationList'
  }
  GET_PREFIX = 'get_bucket_'
  LIST_PREFIX = 'list_bucket_'
  SKIPLIST = [
      # deprecated methods covered by other calls
      'get_bucket_notification',
      'get_bucket_lifecycle',
      'list_bucket_intelligent_tiering_configurations'
  ]
  # Additionally skipped when the client region is a '-gov-' region.
  GOV_SKIPLIST = ['get_bucket_accelerate_configuration']

  def _patch_client(self) -> None:
    """Inject pagination configs for the `MISSING_PAGINATION` methods."""
    # Force loading of the pagination config
    self._client.can_paginate('list_bucket_analytics_configurations')
    page_config = self._client._cache['page_config']
    for py_name, result_key in self.MISSING_PAGINATION.items():
      op_name = self._client._PY_TO_OP_NAME[py_name]
      page_config[op_name] = {
          'input_token': 'ContinuationToken',
          'output_token': 'NextContinuationToken',
          'result_key': result_key
      }

  def _should_import(self, key: str) -> bool:
    if key in self.SKIPLIST:
      return False
    if self._is_gov and key in self.GOV_SKIPLIST:
      return False
    if key.startswith(self.LIST_PREFIX):
      return True
    if key.startswith(self.GET_PREFIX):
      # Only return true if there is not a corresponding list call
      item = key[len(self.GET_PREFIX):]
      list_op = f'{self.LIST_PREFIX}{item}s'
      return not hasattr(self._client, list_op)
    return False


class ELBClientProxy(ClientProxy):
  """ELB flavour: imports `describe_*` calls not in `SKIPLIST`."""

  SKIPLIST = [
      'describe_load_balancers', 'describe_account_limits',
      'describe_instance_health', 'describe_load_balancer_policy_types'
  ]

  def _should_import(self, key: str) -> bool:
    return key.startswith('describe_') and key not in self.SKIPLIST


class LambdaClientProxy(ClientProxy):
  """Lambda flavour: imports `list_*` calls not in `SKIPLIST`."""

  SKIPLIST: List[str] = []

  def _should_import(self, key: str) -> bool:
    return key.startswith('list_') and key not in self.SKIPLIST


class CloudtrailClientProxy(ClientProxy):
  """CloudTrail flavour: imports `describe_*` calls not in `SKIPLIST`."""

  SKIPLIST: List[str] = []

  def _should_import(self, key: str) -> bool:
    return key.startswith('describe_') and key not in self.SKIPLIST


class RDSClientProxy(ClientProxy):
  """RDS flavour: imports `describe_*` calls not in `SKIPLIST`."""

  SKIPLIST = [
      'describe_custom_availability_zones',
      'describe_installation_media',
      'describe_export_tasks',
      'describe_reserved_db_instances_offerings',
      'describe_db_engine_versions',
      'describe_valid_db_instance_modifications',
      'describe_option_group_options',
      'describe_db_proxies',  # Not sure why this comes back with unknown
      'describe_global_clusters',
      # migrating away
      'describe_db_snapshots',
      'describe_db_cluster_snapshots'
  ]

  def _should_import(self, key: str) -> bool:
    return key.startswith('describe_') and key not in self.SKIPLIST


class IAMClientProxy(ClientProxy):
  """IAM flavour: imports `list_*` calls not in `SKIPLIST`."""

  SKIPLIST: List[str] = []
  # python method name -> result key, for operations that get a
  # Marker/MaxItems pagination config injected by `_patch_client`.
  MISSING_PAGINATION = {'list_user_tags': 'Tags', 'list_role_tags': 'Tags'}

  def _should_import(self, key: str) -> bool:
    return key.startswith('list_') and key not in self.SKIPLIST

  def _patch_client(self) -> None:
    """Inject pagination configs for the `MISSING_PAGINATION` methods."""
    # Force loading of the pagination config
    self._client.can_paginate('list_users')
    page_config = self._client._cache['page_config']
    for py_name, result_key in self.MISSING_PAGINATION.items():
      op_name = self._client._PY_TO_OP_NAME[py_name]
      page_config[op_name] = {
          'input_token': 'Marker',
          'limit_key': 'MaxItems',
          'more_results': 'IsTruncated',
          'output_token': 'Marker',
          'result_key': result_key
      }


class AWSFetch(object):
  """Factory building the matching `ClientProxy` for a service name."""

  # Services with a specialised proxy; any other service name falls back
  # to the plain `ClientProxy`.
  SVC_CLASS = {
      'ec2': EC2ClientProxy,
      's3': S3ClientProxy,
      'elb': ELBClientProxy,
      'iam': IAMClientProxy,
      'rds': RDSClientProxy,
      'lambda': LambdaClientProxy,
      'cloudtrail': CloudtrailClientProxy
  }

  def __init__(self, boto: Boto):
    self._boto = boto

  def client(self,
             service: str,
             region: Optional[str] = None) -> 'ClientProxy':
    """Create a client for `service` and wrap it in its proxy class.

    `region` is forwarded as `region_name` only when it is not None.
    """
    client_class = self.SVC_CLASS.get(service, ClientProxy)
    kwargs = {} if region is None else {'region_name': region}
    client = self._boto.create_client(service, **kwargs)
    return client_class(client, service)


class ServiceProxy(object):
  """Keyword-argument facade over a `ClientProxy`, with debug logging."""

  def __init__(self, impl: ClientProxy):
    self._impl = impl

  def resource_names(self) -> Iterator[str]:
    return self._impl.resource_names()

  def list(self, resource: str, **kwargs) -> Optional[Tuple[str, Any]]:
    """Delegate to `ClientProxy.list`, always allowing one throttle retry."""
    _log.debug('calling list %s %s', resource, kwargs)
    return self._impl.list(resource, kwargs, retry_on_throttle=True)

  def get(self, resource: str, **kwargs):
    """Delegate to `ClientProxy.get`."""
    _log.debug('calling get %s %s', resource, kwargs)
    return self._impl.get(resource, kwargs)

  def canonical_name(self, py_name: str) -> str:
    return self._impl.canonical_name(py_name)


class Proxy(object):
  """Entry point handing out per-service proxies backed by `AWSFetch`."""

  @classmethod
  def build(cls, boto: Boto, patch_id: Optional[int] = None) -> 'Proxy':
    """Build a proxy from a botocore session; `patch_id` is unused here."""
    return cls(AWSFetch(boto))

  @classmethod
  def dummy(cls, boto: Boto) -> 'Proxy':
    """Build a proxy from a botocore session (same result as `build`)."""
    return cls(AWSFetch(boto))

  def __init__(self, aws: AWSFetch):
    self._aws = aws

  def service(self,
              service: str,
              region: Optional[str] = None) -> ServiceProxy:
    ...  # NOTE(review): the method body is cut off in this excerpt
app.py
Source:app.py  
...12    if inflight_network_test_details:13        for test_detail in inflight_network_test_details:14            logging.info("test detail >> " + str(test_detail))15            try:16                response = ec2Client.describe_network_insights_analyses(17                    NetworkInsightsAnalysisIds=[18                        test_detail['NetworkInsightsAnalysisId']],19                    NetworkInsightsPathId=test_detail['NetworkInsightsPathId']20                )21            except botocore.exceptions.ClientError as error:22                logging.error(23                    "Call to describe_network_insights_analyses failed")24                raise error25            test_status = response['NetworkInsightsAnalyses'][0]['Status']26            updated_route_details = {27                'Source': test_detail['Source'],28                'Destination': test_detail['Destination'],29                'RouteTag': test_detail['RouteTag'],30                'NetworkInsightsPathId': test_detail['NetworkInsightsPathId'],...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
