Best Python code snippet using molecule_python
test_cloud_vendor_prefixes.py
Source:test_cloud_vendor_prefixes.py  
1import json2import tempfile3from pathlib import Path4import pytest5from netlookup.network_sets import base6from netlookup.network import NetworkError7from netlookup.prefixes import Prefixes8from netlookup.network_sets.aws import AWS, AWSPrefix9from netlookup.network_sets.azure import Azure, AzurePrefix10from netlookup.network_sets.gcp import GCP, GCPPrefix11from netlookup.network_sets.google import GoogleServices, GoogleServicePrefix12from ..constants import MATCH_PREFIXES_LOOKUPS, INVALID_ADDRESS_LOOKUPS13DATA_PATH = Path(__file__).parent.absolute().joinpath('data')14TEST_DATA_PREFIX_LEN = 408515TEST_NETWORK_ADDRESSES = (16    {17        'address': '8.34.208.0',18        'network': '8.34.208.0/20',19    },20    {21        'address': '8.34.223.10',22        'network': '8.34.208.0/20',23    },24    {25        'address': '8.34.223.254',26        'network': '8.34.208.0/20',27    },28    {29        'address': '8.34.223.255',30        'network': '8.34.208.0/20',31    },32    {33        'address': '8.34.224.0',34    },35    {36        'address': '2c0f:fb50:4000::0',37        'network': '2c0f:fb50:4000::/36'38    },39    {40        'address': '2c0f:fb50:4000::10',41        'network': '2c0f:fb50:4000::/36'42    },43    {44        'address': '2c0f:fb50:4fff:ffff:ffff:ffff:ffff:fffe',45        'network': '2c0f:fb50:4000::/36'46    },47    {48        'address': '2c0f:fb50:4fff:ffff:ffff:ffff:ffff:ffff',49        'network': '2c0f:fb50:4000::/36'50    },51    {52        'address': '2c0f:fb50:5000::0'53    }54)55TEST_JSON_REQUIRED_FIELDS = ('type', 'cidr')56TEST_FILTER_COUNTS = {57    'invalidvalue': 0,58    'aws': 1414,59    'azure': 2574,60    'gcp': 72,61    'google': 25,62}63@pytest.fixture64def mock_open_permission_denied(monkeypatch):65    """66    Fixture to mock67    """68    # pylint: disable=unused-argument69    def permission_denied(*args):70        """71        Always return false for os.access72        """73        raise PermissionError74    monkeypatch.setattr(base.Path, 'open', permission_denied)75def validate_prefix_list(network_set, loader_class):76    """77    Validate prefix list78    """79    for network in network_set:80        assert isinstance(network, loader_class)81    for network in network_set:82        assert isinstance(network, loader_class)83        value = network.__repr__()84        assert isinstance(value, str)85        assert value == str(network)86def test_cloud_vendor_prefixes_networks_no_data():87    """88    Test creating networks with no data using new temporary directory89    """90    tempdir = tempfile.mkdtemp()91    prefixes = Prefixes(cache_directory=tempdir)92    assert len(prefixes) == 093def test_cloud_vendor_prefixes_missing_directory():94    """95    Test creating cache directory96    """97    tempdir = tempfile.mkdtemp()98    cache_directory = Path(tempdir, 'test')99    assert not cache_directory.exists()100    prefixes = Prefixes(cache_directory=cache_directory)101    assert cache_directory.exists()102    assert len(prefixes) == 0103# pylint: disable=unused-argument,redefined-outer-name104def test_cloud_vendor_prefixes_missing_directory_create_error(mock_path_mkdir_permission_denied):105    """106    Test creating cache directory107    """108    cache_directory = Path('/D78B28E2-14AC-47B7-90DF-DC2694651038')109    assert not cache_directory.exists()110    with pytest.raises(NetworkError):111        Prefixes(cache_directory=cache_directory)112    assert not cache_directory.exists()113def test_cloud_vendor_prefixes_networks_iterator_load():114    """115    Test loading networks as side effect of iterator116    """117    tempdir = tempfile.mkdtemp()118    prefixes = Prefixes(cache_directory=tempdir)119    for vendor in prefixes.vendors:120        assert len(vendor) == 0121        next(vendor)122        assert len(vendor) > 0123def test_cloud_vendor_prefixes_networks_save():124    """125    Test saving networks as side effect of iterator126    """127    tempdir = tempfile.mkdtemp()128    prefixes = Prefixes(cache_directory=tempdir)129    for vendor in prefixes.vendors:130        assert len(vendor) == 0131        next(vendor)132        assert len(vendor) > 0133    prefixes.save()134    prefixes = Prefixes(cache_directory=tempdir)135    for vendor in prefixes.vendors:136        assert len(vendor) > 0137# pylint: disable=unused-argument138def test_cloud_vendor_prefixes_networks_save_no_permission(mock_open_permission_denied):139    """140    Test saving networks to files without permissions141    """142    tempdir = tempfile.mkdtemp()143    prefixes = Prefixes(cache_directory=tempdir)144    for vendor in prefixes.vendors:145        assert len(vendor) == 0146        next(vendor)147        assert len(vendor) > 0148    with pytest.raises(NetworkError):149        prefixes.save()150def test_cloud_vendor_networks_loading():151    """152    Test creating networks with test data, ensuring correct number of networks is loaded153    """154    prefixes = Prefixes(cache_directory=DATA_PATH)155    assert len(prefixes) == TEST_DATA_PREFIX_LEN156def test_cloud_vendor_prefix_filtering():157    """158    Test filtering cloud vendor prefixes159    """160    prefixes = Prefixes(cache_directory=DATA_PATH)161    for name, count in TEST_FILTER_COUNTS.items():162        matches = prefixes.filter_type(name)163        assert len(matches) == count164def test_cloud_vendor_prefix_find_valid_addresses():165    """166    Test finding of addresses from cloud vendor prefixes167    """168    prefixes = Prefixes(cache_directory=DATA_PATH)169    for value in MATCH_PREFIXES_LOOKUPS:170        address = prefixes.find(value)171        assert address is not None172def test_cloud_vendor_prefix_find_invalid_addresses():173    """174    Test finding of invalid address values from cloud vendor prefixes175    """176    prefixes = Prefixes(cache_directory=DATA_PATH)177    for value in INVALID_ADDRESS_LOOKUPS:178        with pytest.raises(NetworkError):179            prefixes.find(value)180def test_cloud_vendor_prefix_lookup():181    """182    Test looking up networks from test data183    """184    prefixes = Prefixes(cache_directory=DATA_PATH)185    for testcase in TEST_NETWORK_ADDRESSES:186        prefix = prefixes.find(testcase['address'])187        network = testcase.get('network', None)188        if network is not None:189            assert prefix is not None190            assert str(prefix.cidr) == network191        else:192            assert prefix is None193def test_cloud_vendor_json_formatting():194    """195    Test all loaded networks can be formatted as JSON196    """197    prefixes = Prefixes(cache_directory=DATA_PATH)198    for prefix in prefixes:199        testdata = json.loads(json.dumps(prefix.as_dict()))200        for field in TEST_JSON_REQUIRED_FIELDS:201            assert field in testdata202            assert testdata[field] is not None203def test_cloud_vendor_prefixes_invalid_vendor():204    """205    Test cases for AWS prefixes206    """207    prefixes = Prefixes(cache_directory=DATA_PATH)208    with pytest.raises(NetworkError):209        prefixes.get_vendor('8522A483-A895-40F3-B5BE-93DDDCA58E51')210def test_cloud_vendor_prefixes_aws():211    """212    Test cases for AWS prefixes213    """214    prefixes = Prefixes(cache_directory=DATA_PATH)215    vendor = prefixes.get_vendor('aws')216    assert isinstance(vendor, AWS)217    validate_prefix_list(vendor, AWSPrefix)218    regions = vendor.regions219    assert isinstance(regions, list)220    for region in regions:221        assert isinstance(region, str)222def test_cloud_vendor_prefixes_azure():223    """224    Test cases for Azure prefixes225    """226    prefixes = Prefixes(cache_directory=DATA_PATH)227    vendor = prefixes.get_vendor('azure')228    assert isinstance(vendor, Azure)229    validate_prefix_list(vendor, AzurePrefix)230def test_cloud_vendor_prefixes_gcp():231    """232    Test cases for GCP prefixes233    """234    prefixes = Prefixes(cache_directory=DATA_PATH)235    vendor = prefixes.get_vendor('gcp')236    assert isinstance(vendor, GCP)237    validate_prefix_list(vendor, GCPPrefix)238def test_cloud_vendor_prefixes_google_services():239    """240    Test cases for google services prefixes241    """242    prefixes = Prefixes(cache_directory=DATA_PATH)243    vendor = prefixes.get_vendor('google')244    assert isinstance(vendor, GoogleServices)...prefixes.py
Source:prefixes.py  
...12    """13    def __init__(self, cache_directory=None):14        super().__init__()15        if cache_directory is None:16            cache_directory = get_cache_directory()17        self.cache_directory = Path(cache_directory).expanduser()18        self.vendors = [19            AWS(cache_directory=self.cache_directory),20            Azure(cache_directory=self.cache_directory),21            GCP(cache_directory=self.cache_directory),22            GoogleServices(cache_directory=self.cache_directory),23        ]24        self.load()25    def update(self):26        """27        Fetch and update cached prefix data28        """29        for vendor in self.vendors:30            try:...FileCache.py
Source:FileCache.py  
1from __future__ import absolute_import, division, print_function, unicode_literals2import gzip3import json4import os5_NONE = object()6class FileCache(object):7    """A two-level cache, which stores expensive results in memory and on disk.8    """9    def __init__(self, cache_directory, creator, open=gzip.open, suffix='.gz'):10        self.cache_directory = cache_directory11        self.creator = creator12        self.open = open13        self.suffix = suffix14        self.cached_data = {}15        if not os.path.exists(self.cache_directory):16            os.makedirs(self.cache_directory)17    def get_file_data(self, name):18        if os.path.exists(filename):19            return json.load(self.open(filename))20        result = self.creator(name)21        return result22    def get_data(self, name, save_in_cache, can_create, default=None):23        name = str(name)24        result = self.cached_data.get(name, _NONE)25        if result is _NONE:26            filename = os.path.join(self.cache_directory, name) + self.suffix27            if os.path.exists(filename):28                result = json.load(self.open(filename)) or _NONE29            if result is _NONE and can_create:30                result = self.creator(name)31                if save_in_cache:32                    json.dump(result, self.open(filename, 'w'))33        return default if result is _NONE else result34    def _files(self):35        return os.listdir(self.cache_directory)36    def cache_list(self):37        for f in self._files():38            if f.endswith(self.suffix):39                yield f[:-len(self.suffix)]40    def file_count(self):41        return len(self._files())42    def clear(self):43        """Clears both local files and memory."""44        self.cached_data = {}45        for f in self._files():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
