Best Python code snippet using localstack_python
local_file_system.py
Source: local_file_system.py
"""Local file-system access for backup and restore.

Wraps blocking file I/O in an executor so it can be awaited from asyncio,
and implements DirectoryExplorer over a local directory tree with
include/exclude filter support.
"""
import asyncio
import io
import logging
import os
from concurrent.futures import Executor, ThreadPoolExecutor
from datetime import MINYEAR, datetime
from fnmatch import fnmatch
from pathlib import Path
from typing import AsyncIterable, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union

from . import file_filter, protocol

logger = logging.getLogger(__name__)

# pylint: disable=invalid-name
# Shared thread pool for blocking file operations, created lazily.
default_executor: Optional[Executor] = None


def _get_default_executor() -> Executor:
    """Return the process-wide executor, creating it on first use."""
    # pylint: disable=global-statement
    global default_executor
    if default_executor is None:
        default_executor = ThreadPoolExecutor(10)
    return default_executor


class AsyncFile(protocol.FileReader):
    """A local file wrapped for asynchronous use.

    Every blocking call is pushed onto an executor so the event loop never
    stalls.  Reads fetch ``protocol.READ_SIZE`` bytes at a time; any surplus
    is kept in a read-ahead buffer and served to subsequent reads.
    """

    _file: BinaryIO
    _buffer: bytes = bytes()  # read-ahead bytes not yet returned to the caller
    _offset: int = 0          # index of the next unread byte within _buffer
    _size: int                # file size captured at open time
    file_path: Path

    def __init__(self, file_path: Path, mode: str, executor: Optional[Executor] = None, **kwargs):
        """Open ``file_path`` unbuffered in binary mode (``mode`` + 'b')."""
        self.file_path = file_path
        self._executor = executor
        self._file = file_path.open(mode + "b", buffering=False, **kwargs)
        try:
            self._size = os.fstat(self._file.fileno()).st_size
        except BaseException:  # was a bare `except:`; cleanup then re-raise unchanged
            self._file.close()
            raise

    @classmethod
    async def open(cls, file_path: Path, mode: str, executor: Optional[Executor] = None,
                   **kwargs) -> "AsyncFile":
        """Open a file on the executor so the event loop is not blocked."""
        if executor is None:
            executor = _get_default_executor()
        return await asyncio.get_running_loop().run_in_executor(
            executor, lambda: cls(file_path, mode, executor, **kwargs))

    async def read(self, num_bytes: int = -1) -> bytes:
        """Read up to ``num_bytes`` bytes (everything remaining when negative).

        May return fewer than ``num_bytes`` bytes (a short read), never more.
        """
        if num_bytes >= 0:
            if num_bytes == 0:
                # Avoid a pointless executor round-trip for a zero-length read.
                return b""
            if self._buffer:
                # Serve from the read-ahead buffer first.
                next_offset = self._offset + min(num_bytes, len(self._buffer) - self._offset)
                result = self._buffer[self._offset:next_offset]
                if next_offset == len(self._buffer):
                    self._buffer = b""
                    self._offset = 0
                else:
                    self._offset = next_offset
                return result
            buffer = await asyncio.get_running_loop().run_in_executor(
                self._executor, self._file.read, protocol.READ_SIZE)
            if len(buffer) > num_bytes:
                # Keep the surplus for the next read.
                self._buffer = buffer
                self._offset = num_bytes
                return self._buffer[:self._offset]
            return buffer
        # num_bytes < 0: drain the buffer plus the rest of the file.
        result = await asyncio.get_running_loop().run_in_executor(self._executor, self._file.read, -1)
        if self._buffer:
            result = self._buffer[self._offset:] + result
            self._buffer = b""
            self._offset = 0
        return result

    async def write(self, buffer: bytes):
        """Write ``buffer`` on the executor."""
        await asyncio.get_running_loop().run_in_executor(self._executor, self._file.write, buffer)

    def seek(self, offset: int, whence: int):
        # The read-ahead buffer is only valid at the old position: discard it.
        # NOTE(review): with a relative whence (SEEK_CUR) the raw file position
        # is ahead of the logical one by the unconsumed buffer — confirm
        # callers only perform absolute seeks.
        if self._buffer:
            self._buffer = b""
            self._offset = 0
        self._file.seek(offset, whence)

    def tell(self) -> int:
        """Return the logical stream position seen by the caller."""
        # BUG FIX: was `self._file.tell() - self._offset`.  The raw file is
        # ahead of the logical position by the *unconsumed* part of the
        # read-ahead buffer, i.e. len(_buffer) - _offset, not _offset.
        return self._file.tell() - (len(self._buffer) - self._offset)

    def close(self):
        self._file.close()

    @property
    def file_size(self) -> Optional[int]:
        """File size in bytes, captured when the file was opened."""
        return self._size


class BytesReader(protocol.FileReader):
    """A FileReader over an in-memory bytes payload."""

    def __init__(self, content: bytes):
        self._reader = io.BytesIO(content)

    async def read(self, num_bytes: int = -1) -> bytes:
        # Default was `None`, contradicting the `int` annotation; -1 reads all
        # remaining bytes, exactly as BytesIO.read(None) did.
        return self._reader.read(num_bytes)

    def close(self):
        pass

    @property
    def file_size(self) -> Optional[int]:
        """Total size of the wrapped payload in bytes."""
        return self._reader.getbuffer().nbytes


async def async_stat(file_path: Path, executor: Optional[Executor] = None):
    """`Path.stat` executed on an executor so it can be awaited."""
    if executor is None:
        executor = _get_default_executor()
    return await asyncio.get_running_loop().run_in_executor(executor, file_path.stat)


async def _restore_directory(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Recreate a directory; ``content`` must be None for directories."""
    logger.info("Restoring directory %s", child_path)
    if clobber_existing and (child_path.is_symlink() or (child_path.exists() and not child_path.is_dir())):
        child_path.unlink()
    if content is not None:
        raise ValueError("Content cannot be supplied for directory")
    child_path.mkdir(parents=False, exist_ok=True)


async def _restore_regular(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Recreate a regular file from ``content``."""
    if clobber_existing and (child_path.is_symlink() or child_path.exists()):
        # This deliberately will fail if the child is a directory. We don't want
        # to remove an entire directory tree.
        logger.debug("Removing original %s", child_path)
        child_path.unlink()
    logger.info("Restoring file %s", child_path)
    # NOTE(review): assumes protocol.FileReader supplies context-manager
    # support for AsyncFile — confirm against the protocol module.
    with AsyncFile(child_path, 'x') as file:
        bytes_read = await content.read(protocol.READ_SIZE)
        while bytes_read:
            await file.write(bytes_read)
            bytes_read = await content.read(protocol.READ_SIZE)


async def _restore_link(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Recreate a symlink whose target is the full ``content`` payload."""
    logger.info("Restoring symbolic link %s", child_path)
    if clobber_existing and (child_path.is_symlink() or child_path.exists()):
        child_path.unlink()
    link_content = await content.read(protocol.READ_SIZE)
    extra_bytes = await content.read(protocol.READ_SIZE)
    while extra_bytes:
        link_content += extra_bytes
        extra_bytes = await content.read(protocol.READ_SIZE)
    # os.symlink(src, dst): create `child_path` pointing at `link_content`.
    os.symlink(link_content, child_path)


async def _restore_pipe(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Recreate a FIFO; ``content`` must be empty if supplied."""
    logger.info("Restoring child %s", child_path)
    if content is not None:
        if await content.read(1):
            raise ValueError("Cannot restore pipe with content")
    if clobber_existing:
        if child_path.is_symlink():
            child_path.unlink()
        elif child_path.is_fifo():
            # Already a FIFO: nothing to do.
            return
        elif child_path.exists():
            child_path.unlink()
    elif child_path.is_fifo():
        return
    os.mkfifo(child_path)


class LocalDirectoryExplorer(protocol.DirectoryExplorer):
    """Walks one local directory, applying normalized include/exclude filters."""

    # Placeholder inode yielded for excluded directories that still contain
    # included exceptions further down.
    _EXCLUDED_DIR_INODE = protocol.Inode(
        type=protocol.FileType.DIRECTORY, mode=0, modified_time=datetime(year=MINYEAR, month=1, day=1),
        size=None, uid=0, gid=0, hash=None)
    _INCLUDED_FILE_TYPES = {
        # Here we implicitly ignore device files and sockets as they are not properly supported
        protocol.FileType.DIRECTORY,
        protocol.FileType.REGULAR,
        protocol.FileType.LINK,
        protocol.FileType.PIPE,
    }
    _RESTORE_TYPES = {
        protocol.FileType.DIRECTORY: _restore_directory,
        protocol.FileType.REGULAR: _restore_regular,
        protocol.FileType.LINK: _restore_link,
        protocol.FileType.PIPE: _restore_pipe,
    }

    def __init__(self, base_path: Path,
                 filter_node: Optional[file_filter.FilterPathNode],
                 ignore_patterns: List[str],
                 all_files: Dict[Tuple[int, int], protocol.Inode]):
        self._base_path = base_path
        self._all_files = all_files          # (st_dev, st_ino) -> Inode, shared for hard-link dedup
        self._ignore_patterns = ignore_patterns
        self._filter_node = filter_node
        self._children = {}                  # per-directory stat cache

    def iter_children(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        """Async-iterate (name, inode) pairs according to the filter node."""
        if self._filter_node is None or self._filter_node.filter_type is protocol.FilterType.INCLUDE:
            return self._iter_included_directory()
        if self._filter_node.filter_type is protocol.FilterType.EXCLUDE:
            return self._iter_excluded_directory()
        raise ValueError(f"Normalized filter node had type {self._filter_node.filter_type}. This should have been "
                         f"either {protocol.FilterType.INCLUDE} or {protocol.FilterType.EXCLUDE}")

    async def _iter_included_directory(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        """List a directory that is included (possibly with excluded children)."""
        for child in self._base_path.iterdir():
            child_name = child.name
            if self._filter_node is not None and child_name in self._filter_node.exceptions and \
                    self._filter_node.exceptions[child_name].filter_type is protocol.FilterType.EXCLUDE:
                # If this child is explicitly excluded ...
                exception_count = len(self._filter_node.exceptions[child_name].exceptions)
                if exception_count:
                    # ... but it has included descendants, emit the placeholder.
                    logger.debug("Skipping %s on filter with %s exceptions", child, exception_count)
                    yield child_name, self._EXCLUDED_DIR_INODE.copy()
                else:
                    logger.debug("Skipping %s on filter", child)
                continue
            if self._should_pattern_ignore(child):
                logger.debug("Skipping file for pattern %s", child)
                continue
            inode = self._stat_child(child_name)
            if inode.type not in self._INCLUDED_FILE_TYPES:
                logger.debug("Skipping %s for type %s", child, inode.type)
                continue
            yield child.name, inode

    async def _iter_excluded_directory(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        # This LocalDirectoryExplorer has been created for an excluded directory, but there may be exceptions.
        logger.debug("Listing %s exceptions for %s ", len(self._filter_node.exceptions), self._base_path)
        for child_name, exception in self._filter_node.exceptions.items():
            child = self._base_path / child_name
            if exception.filter_type is protocol.FilterType.INCLUDE:
                # Exception to include this child.
                if self._should_pattern_ignore(child):
                    logger.warning("File explicitly included but then excluded by pattern %s", child)
                elif child.exists():
                    yield child_name, self._stat_child(child_name)
            elif child.is_dir():
                # Looks like there is a child of the child that's the real exception.
                yield child_name, self._EXCLUDED_DIR_INODE.copy()
            elif exception.exceptions:
                # This is an edge case.  An INCLUDE filter can be made for a child directory where the parent is
                # not actually a directory.  Remember filters can name files that don't actually exist.
                # Lets say the user EXCLUDEs /foo and INCLUDEs /foo/bar/baz in filters.
                # Then the user creates a file (not directory) named /foo/bar ... What are we supposed to do now?
                # Let's warn the user they've been a bit stupid and do NOT backup /foo/bar in any way.
                # That's because at this point we know /foo/bar is excluded and /foo/bar/baz doesn't exist.
                child_exception = exception
                meaningful_name = self._base_path / child_name
                try:
                    while child_exception.filter_type is not protocol.FilterType.INCLUDE:
                        # BUG FIX: descend using the *current* node's exceptions
                        # (was `exception.exceptions`, which repeated the first
                        # level's key and broke multi-level descent).
                        meaningful_name = meaningful_name / next(iter(child_exception.exceptions.keys()))
                        child_exception = child_exception.exceptions[meaningful_name.name]
                    logger.warning("%s was included but %s is actually a file!  Ignoring filters under %s",
                                   meaningful_name, self._base_path / child_name, self._base_path / child_name)
                except StopIteration:
                    # This clause really should never occur.  Meaningless exceptions are supposed to be pruned...
                    # We are currently on an EXCLUDE which has exceptions so there should be an INCLUDE in it's
                    # children.  But let's not break just because we failed to write a more meaningful warning.
                    logger.warning("%s/.../%s was included but %s is actually a file!  Ignoring filters under %s",
                                   self._base_path, child_name, self._base_path / child_name,
                                   self._base_path / child_name)

    def _should_pattern_ignore(self, child: Path) -> bool:
        """True when the child's name matches any configured ignore pattern."""
        child_name = child.name
        for pattern in self._ignore_patterns:
            if fnmatch(child_name, pattern):
                logger.debug("Skipping %s on pattern %s", child, pattern)
                return True
        return False

    def _stat_child(self, child: str) -> protocol.Inode:
        """lstat a child, caching per-directory and de-duplicating hard links."""
        inode = self._children.get(child)
        if inode is not None:
            return inode
        file_path = self._base_path / child
        file_stat = file_path.lstat()
        inode = self._all_files.get((file_stat.st_dev, file_stat.st_ino))
        if inode is None:
            inode = protocol.Inode.from_stat(file_stat, None)
        self._children[child] = inode
        if inode.type is not protocol.FileType.DIRECTORY:
            # Record non-directories so hard links share one Inode.
            self._all_files[(file_stat.st_dev, file_stat.st_ino)] = inode
        return inode

    def __str__(self) -> str:
        return str(self._base_path)

    async def inode(self) -> protocol.Inode:
        """Inode of this directory itself (placeholder when excluded)."""
        if self._filter_node is not None and self._filter_node.filter_type is protocol.FilterType.EXCLUDE:
            return self._EXCLUDED_DIR_INODE.copy()
        stat = self._base_path.lstat()
        inode = protocol.Inode.from_stat(stat, hash_value=None)
        self._all_files[(stat.st_dev, stat.st_ino)] = inode
        return inode

    async def open_child(self, name: str) -> protocol.FileReader:
        """Open a child for reading; links/pipes yield synthetic readers."""
        child_type = self._stat_child(name).type
        child_path = self._base_path / name
        if child_type is protocol.FileType.REGULAR:
            return AsyncFile(child_path, 'r')
        if child_type is protocol.FileType.LINK:
            return BytesReader(os.readlink(child_path).encode())
        if child_type is protocol.FileType.PIPE:
            return BytesReader(b"")
        raise ValueError(f"Cannot open child of type {child_type}")

    async def restore_child(self, name: str, type_: protocol.FileType, content: Optional[protocol.FileReader],
                            clobber_existing: bool):
        """Restore one child by dispatching to the type-specific handler."""
        try:
            restore_function = self._RESTORE_TYPES[type_]
        except KeyError:
            raise ValueError(f"Cannot restore file of type {type_}") from None
        child_path = self._base_path / name
        # The child is being replaced; drop any stale cached stat.
        self._children.pop(name, None)
        await restore_function(child_path=child_path, content=content, clobber_existing=clobber_existing)

    async def restore_meta(self, name: str, meta: protocol.Inode, toggle: Dict[str, bool]):
        """Apply mode/ownership/mtime from ``meta``, gated by ``toggle`` flags."""
        child_path = self._base_path / name
        if toggle.get('mode', True):
            # NOTE(review): chmod with follow_symlinks=False is not supported on
            # all platforms (e.g. Linux symlinks) — confirm target platforms.
            os.chmod(child_path, mode=meta.mode, follow_symlinks=False)
        change_uid = toggle.get('uid', True)
        change_gid = toggle.get('gid', True)
        if change_uid or change_gid:
            os.chown(
                path=child_path,
                uid=meta.uid if change_uid else -1,  # -1 leaves the id unchanged
                gid=meta.gid if change_gid else -1,
                follow_symlinks=False,
            )
        if toggle.get('modified_time', True):
            mod_time = meta.modified_time.timestamp()
            os.utime(path=child_path, times=(mod_time, mod_time), follow_symlinks=False)

    def get_child(self, name: str) -> protocol.DirectoryExplorer:
        """Explorer for a child directory, sharing filters and the inode map."""
        return type(self)(
            base_path=self._base_path / name,
            filter_node=self._filter_node.exceptions.get(name) if self._filter_node is not None else None,
            ignore_patterns=self._ignore_patterns,
            all_files=self._all_files,
        )

    def get_path(self, name: Optional[str]) -> str:
        """Path of this directory, or of a named child."""
        if name is None:
            return str(self._base_path)
        return str(self._base_path / name)


class LocalFileSystemExplorer:
    """Factory producing LocalDirectoryExplorer roots that share an inode map."""

    _all_files: Dict[Tuple[int, int], protocol.Inode]

    def __init__(self):
        self._all_files = {}

    def __call__(self, directory_root: Union[str, Path],
                 filters: Iterable[protocol.Filter] = ()) -> LocalDirectoryExplorer:
        """Create an explorer rooted at ``directory_root``.

        Raises FileNotFoundError when the path is missing and ValueError when
        it exists but is not a directory.
        """
        base_path = Path(directory_root)
        if not base_path.is_dir():
            if not base_path.exists():
                raise FileNotFoundError(f"Backup path doesn't exist: {base_path}")
            raise ValueError(f"Backup path is not a directory: {base_path}")
        ignore_patterns, root_filter_node = file_filter.normalize_filters(filters)
        return LocalDirectoryExplorer(
            base_path=Path(base_path),
            ignore_patterns=ignore_patterns,
            filter_node=root_filter_node,
            all_files=self._all_files,
        )
Source: db.py
class DBBackend:
    # NOTE(review): reconstructed from a truncated snippet — the real class
    # header, earlier members (_clone, make_assets_as_id, qs_to_values,
    # union_id_length, model defaults) and the head of filter()'s signature
    # are not visible here; confirm against the full source.

    def filter(self, assets=None, node=None, prefer=None, prefer_id=None,
               union_id=None, id__in=None, **kwargs):
        # NOTE(review): parameter names/order of the hidden signature head
        # inferred from the body — confirm.
        """Return a clone of this backend with every supported filter applied."""
        clone = self._clone()
        clone._filter_union_id(union_id)
        clone._filter_prefer(prefer, prefer_id)
        clone._filter_node(node)
        clone._filter_assets(assets)
        clone._filter_other(kwargs)
        clone._filter_id_in(id__in)
        return clone

    def _filter_union_id(self, union_id):
        """Filter by a composite id; empty the queryset when it is malformed."""
        if not union_id:
            return
        cleaned_union_id = union_id.split('_')
        # If union_id fails this check it probably does not belong to this
        # backend, so return an empty queryset.
        if not self._check_union_id(union_id, cleaned_union_id):
            self.queryset = self.queryset.none()
            return
        return self._perform_filter_union_id(union_id, cleaned_union_id)

    def _check_union_id(self, union_id, cleaned_union_id):
        """A union id is valid when it splits into exactly union_id_length parts."""
        return union_id and len(cleaned_union_id) == self.union_id_length

    def _perform_filter_union_id(self, union_id, union_id_cleaned):
        # Default behaviour: match on the annotated union_id column directly.
        self.queryset = self.queryset.filter(union_id=union_id)

    def _filter_assets(self, assets):
        """Restrict to the given assets (accepts objects or ids)."""
        assets_id = self.make_assets_as_id(assets)
        if assets_id:
            self.queryset = self.queryset.filter(asset_id__in=assets_id)

    def _filter_node(self, node):
        # Subclasses narrow by node where the relation exists.
        pass

    def _filter_id_in(self, ids):
        """Restrict to an explicit list of union ids."""
        if ids and isinstance(ids, list):
            self.queryset = self.queryset.filter(union_id__in=ids)

    @staticmethod
    def clean_kwargs(kwargs):
        """Drop falsy values so they don't become accidental filters."""
        return {k: v for k, v in kwargs.items() if v}

    def _filter_other(self, kwargs):
        kwargs = self.clean_kwargs(kwargs)
        if kwargs:
            self.queryset = self.queryset.filter(**kwargs)

    def _filter_prefer(self, prefer, prefer_id):
        # Subclasses honour backend preference where applicable.
        pass

    def search(self, item):
        """Clone with a prefix match on hostname, ip or username."""
        conditions = [Q(**{field + '__startswith': item})
                      for field in ('hostname', 'ip', 'username')]
        query = reduce(lambda x, y: x | y, conditions)
        clone = self._clone()
        clone.queryset = clone.queryset.filter(query).distinct()
        return clone


class SystemUserBackend(DBBackend):
    """Credentials derived from SystemUser <-> Asset relations."""
    model = SystemUser.assets.through
    backend = 'system_user'
    prefer = backend
    base_score = 0
    union_id_length = 2  # union_id is "<system_user_id>_<asset_id>"

    def _filter_prefer(self, prefer, prefer_id):
        if prefer and prefer != self.prefer:
            self.queryset = self.queryset.none()
        if prefer_id:
            self.queryset = self.queryset.filter(systemuser__id=prefer_id)

    def _perform_filter_union_id(self, union_id, union_id_cleaned):
        system_user_id, asset_id = union_id_cleaned
        self.queryset = self.queryset.filter(
            asset_id=asset_id, systemuser__id=system_user_id,
        )

    def _perform_delete_by_union_id(self, union_id_cleaned):
        system_user_id, asset_id = union_id_cleaned
        system_user = get_object_or_none(SystemUser, pk=system_user_id)
        asset = get_object_or_none(Asset, pk=asset_id)
        if all((system_user, asset)):
            system_user.assets.remove(asset)

    def _filter_node(self, node):
        if node:
            self.queryset = self.queryset.filter(asset__nodes__id=node.id)

    def get_annotate(self):
        """Annotations mapping the through-table rows onto the common schema."""
        kwargs = dict(
            hostname=F("asset__hostname"),
            ip=F("asset__ip"),
            username=F("systemuser__username"),
            password=F("systemuser__password"),
            private_key=F("systemuser__private_key"),
            public_key=F("systemuser__public_key"),
            score=F("systemuser__priority") + self.base_score,
            version=Value(0, IntegerField()),
            date_created=F("systemuser__date_created"),
            date_updated=F("systemuser__date_updated"),
            asset_username=Concat(F("asset__id"), Value("_"),
                                  F("systemuser__username"),
                                  output_field=CharField()),
            union_id=Concat(F("systemuser_id"), Value("_"), F("asset_id"),
                            output_field=CharField()),
            org_id=F("asset__org_id"),
            backend=Value(self.backend, CharField())
        )
        return kwargs

    def get_filter(self):
        return dict(
            systemuser__username_same_with_user=False,
        )

    def all(self):
        kwargs = self.get_annotate()
        filters = self.get_filter()
        qs = self.model.objects.all().annotate(**kwargs)
        # Hoisted: was calling current_org.org_id() twice.
        org_id = current_org.org_id()
        if org_id is not None:
            filters['org_id'] = org_id
        qs = qs.filter(**filters)
        qs = self.qs_to_values(qs)
        return qs


class DynamicSystemUserBackend(SystemUserBackend):
    """System users whose username mirrors each related user's username."""
    backend = 'system_user_dynamic'
    prefer = 'system_user'
    union_id_length = 3  # "<system_user_id>_<asset_id>_<user_id>"

    def get_annotate(self):
        kwargs = super().get_annotate()
        kwargs.update(dict(
            username=F("systemuser__users__username"),
            asset_username=Concat(
                F("asset__id"), Value("_"),
                F("systemuser__users__username"),
                output_field=CharField()
            ),
            union_id=Concat(
                F("systemuser_id"), Value("_"), F("asset_id"),
                Value("_"), F("systemuser__users__id"),
                output_field=CharField()
            ),
            users_count=Count('systemuser__users'),
        ))
        return kwargs

    def _perform_filter_union_id(self, union_id, union_id_cleaned):
        # user id is carried inside union_id itself, hence unused here.
        system_user_id, asset_id, _user_id = union_id_cleaned
        self.queryset = self.queryset.filter(
            asset_id=asset_id, systemuser_id=system_user_id,
            union_id=union_id,
        )

    def _perform_delete_by_union_id(self, union_id_cleaned):
        system_user_id, asset_id, user_id = union_id_cleaned
        system_user = get_object_or_none(SystemUser, pk=system_user_id)
        if not system_user:
            return
        system_user.users.remove(user_id)
        # Drop the asset relation once no users remain.
        if system_user.users.count() == 0:
            system_user.assets.remove(asset_id)

    def get_filter(self):
        return dict(
            users_count__gt=0,
            systemuser__username_same_with_user=True
        )


class AdminUserBackend(DBBackend):
    """Credentials coming from each asset's admin user."""
    model = Asset
    backend = 'admin_user'
    prefer = backend
    base_score = 200

    def _filter_prefer(self, prefer, prefer_id):
        # Consistency: siblings compare against self.prefer (same value here).
        if prefer and prefer != self.prefer:
            self.queryset = self.queryset.none()
        if prefer_id:
            self.queryset = self.queryset.filter(admin_user__id=prefer_id)

    def _filter_node(self, node):
        if node:
            self.queryset = self.queryset.filter(nodes__id=node.id)

    def _perform_filter_union_id(self, union_id, union_id_cleaned):
        admin_user_id, asset_id = union_id_cleaned
        self.queryset = self.queryset.filter(
            id=asset_id, admin_user_id=admin_user_id,
        )

    def _perform_delete_by_union_id(self, union_id_cleaned):
        raise PermissionError(_("Could not remove asset admin user"))

    def all(self):
        qs = self.model.objects.all().annotate(
            asset_id=F("id"),
            username=F("admin_user__username"),
            password=F("admin_user__password"),
            private_key=F("admin_user__private_key"),
            public_key=F("admin_user__public_key"),
            score=Value(self.base_score, IntegerField()),
            version=Value(0, IntegerField()),
            date_updated=F("admin_user__date_updated"),
            asset_username=Concat(F("id"), Value("_"), F("admin_user__username"), output_field=CharField()),
            union_id=Concat(F("admin_user_id"), Value("_"), F("id"), output_field=CharField()),
            backend=Value(self.backend, CharField()),
        )
        qs = self.qs_to_values(qs)
        return qs


class AuthbookBackend(DBBackend):
    """Credentials recorded explicitly in the AuthBook table."""
    model = AuthBook
    backend = 'db'
    prefer = backend
    base_score = 400

    def _filter_node(self, node):
        if node:
            self.queryset = self.queryset.filter(asset__nodes__id=node.id)

    def _filter_prefer(self, prefer, prefer_id):
        if not prefer or not prefer_id:
            return
        if prefer.lower() == "admin_user":
            model = AdminUser
        elif prefer.lower() == "system_user":
            model = SystemUser
        else:
            self.queryset = self.queryset.none()
            return
        obj = get_object_or_none(model, pk=prefer_id)
        if obj is None:
            # NOTE(review): the snippet is truncated here; emptying the
            # queryset for a missing object is the reconstructed intent —
            # confirm the remainder against the full source.
            self.queryset = self.queryset.none()
Source: skydive_source.py
# NOTE(review): this snippet opens mid-function — the visible tail
# (`"next": ("int", lambda r: r[2])`, `record.update(added_fields)`,
# `return record`) belongs to a record-builder whose head is truncated, so
# that incomplete definition is omitted rather than guessed at.


def _filter_node(type_value):
    """Build an action returning all Skydive nodes whose Type is type_value."""
    def _action(cnx):
        return cnx.socket.lookup_nodes(
            'G.V().Has("Type", "{}")'.format(type_value))
    return _action


def _filter_node_list(type_value, fields):
    """Build an action yielding (node_id, element) for every element found
    under the nested ``fields`` path of each matching node's metadata."""
    def _action(cnx):
        nodes = cnx.socket.lookup_nodes(
            'G.V().Has("Type", "{}")'.format(type_value))
        return [
            (node.id, elt)
            for node in nodes
            for elt in reduce(lambda v, k: v.get(k, []), fields, node.metadata)
        ]
    return _action


def _filter_rel(type_value):
    """Build an action returning all Skydive edges of the given RelationType."""
    def _action(cnx):
        return cnx.socket.lookup_edges(
            'G.E().Has("RelationType", "{}")'.format(type_value))
    return _action


def _metadata(field):
    """Extractor for a node metadata field; a list selects a nested path."""
    if isinstance(field, list):
        return lambda node: reduce(lambda v, k: v.get(k), field, node.metadata)
    return lambda node: node.metadata[field]


def _id(node):
    """Extract a node's identifier."""
    return node.id


def _parent(edge):
    """Extract the parent endpoint of an edge."""
    return edge.parent


def _child(edge):
    """Extract the child endpoint of an edge."""
    return edge.child


def elements_of_rule(elt, cnx):
    """Split elements of actions or filters of an Openflow rule

    :param elt: should be either 'actions' or 'filters'
    :param cnx: an open connection to Skydive
    :return: a list of string, each is an atomic action or filter.
    """
    return [
        (rule.id, e, i)
        for rule in _filter_node('ofrule')(cnx)
        for i, e in enumerate(rule.metadata[elt].split(',') + ['end'])
    ]


#: Describes how to bind values extracted from the python skydive client.
TABLES = {
    "sk_host": (
        _filter_node("host"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
            "platform": ("string", _metadata("Platform"))
        }
    ),
    "sk_ovsswitch": (
        _filter_node("openvswitch"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
        }
    ),
    "sk_ovsbridge": (
        _filter_node("ovsbridge"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
        }
    ),
    "sk_ovsport": (
        _filter_node("ovsport"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
        }
    ),
    "sk_patch": (
        _filter_node("patch"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
            "index": ("int", _metadata("OfPort")),
            "peer": ("string", _metadata(["Ovs", "Options", "peer"])),
            "mac": ("string", _metadata("MAC")),
        }
    ),
    "sk_internal": (
        _filter_node("internal"),
        {
            "id": ("id", _id),
            "name": ("string", _metadata("Name")),
            "index": ("int", _metadata("OfPort")),
            "mac": ("string", _metadata("MAC")),
        }
    ),
    "sk_internal_ip": (
        _filter_node_list("internal", ["IPV4"]),
        {
            "id": ("id", lambda p: p[0]),
            "ip": ("ip_address", lambda p: primitives.ip_of_cidr(p[1])),
            "mask": ("ip_address", lambda p: primitives.mask_of_network(p[1])),
            "prefix": (
                "ip_address",
                lambda p: primitives.prefix_of_network(p[1])),
        }
    ),
    "sk_rule": (
        _filter_node("ofrule"),
        {
            "id": ("id", _id),
            "priority": ("int", _metadata("priority")),
            "table": ("int", _metadata("table"))
        }
    ),
    "sk_owns": (
        _filter_rel("ownership"),
        {
            "owner": ("id", _parent),
            "item": ("id", _child)
        }
    ),
    # NOTE(review): the snippet truncates here; at least one further table
    # ("sk_l2", ...) exists in the full source.
}
Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, i.e. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
