How to use _filter_node method in localstack

Best Python code snippet using localstack_python

local_file_system.py

Source:local_file_system.py Github

copy

Full Screen

"""Local file system support: executor-backed file readers, restore helpers and a directory explorer."""
import asyncio
import io
import logging
import os
from concurrent.futures import Executor, ThreadPoolExecutor
from datetime import MINYEAR, datetime
from fnmatch import fnmatch
from pathlib import Path
from typing import AsyncIterable, BinaryIO, Dict, Iterable, List, Optional, Tuple, Union

from . import file_filter, protocol

logger = logging.getLogger(__name__)

# pylint: disable=invalid-name
default_executor: Optional[Executor] = None


def _get_default_executor():
    """Return the module-wide thread pool, creating it with 10 workers on first use."""
    # pylint: disable=global-statement
    global default_executor
    if default_executor is None:
        default_executor = ThreadPoolExecutor(10)
    return default_executor


class AsyncFile(protocol.FileReader):
    """Unbuffered binary file whose blocking reads and writes are run in an executor.

    ``read`` may fetch more bytes from the OS than the caller asked for. The surplus is kept in
    ``_buffer`` and ``_offset`` records how much of that buffer has already been handed out.
    """

    _file: BinaryIO
    # Read-ahead data not yet fully returned to the caller (empty when nothing is pending).
    _buffer: bytes = bytes()
    # Number of bytes of ``_buffer`` that have already been returned.
    _offset: int = 0
    # File size captured with fstat when the file was opened.
    _size: int
    file_path: Path

    def __init__(self, file_path: Path, mode: str, executor=None, **kwargs):
        """Open ``file_path`` synchronously in binary, unbuffered mode.

        :param file_path: File to open.
        :param mode: Open mode without the ``b`` suffix (for example ``'r'`` or ``'x'``).
        :param executor: Executor used by ``read`` and ``write``; ``None`` means the loop default.
        :param kwargs: Passed through to ``Path.open``.
        """
        self.file_path = file_path
        self._executor = executor
        self._file = file_path.open(mode + "b", buffering=False, **kwargs)
        try:
            self._size = os.fstat(self._file.fileno()).st_size
        except BaseException:
            # Do not leak the descriptor if the size cannot be determined.
            self._file.close()
            raise

    @classmethod
    async def open(cls, file_path: Path, mode: str, executor=None, **kwargs) -> "AsyncFile":
        """Open a file without blocking the event loop; uses the module thread pool by default."""
        if executor is None:
            executor = _get_default_executor()
        return await asyncio.get_running_loop().run_in_executor(
            executor, lambda: AsyncFile(file_path, mode, executor, **kwargs))

    async def read(self, num_bytes: int = -1) -> bytes:
        """Read up to ``num_bytes`` bytes, or everything remaining when ``num_bytes`` is negative."""
        if num_bytes >= 0:
            if self._buffer:
                # Serve the request from the read-ahead buffer without touching the file.
                next_offset = self._offset + min(num_bytes, len(self._buffer) - self._offset)
                result = self._buffer[self._offset: next_offset]
                if len(self._buffer) == next_offset:
                    self._buffer = bytes()
                    self._offset = 0
                else:
                    self._offset = next_offset
                return result

            buffer = await asyncio.get_running_loop().run_in_executor(
                self._executor, self._file.read, protocol.READ_SIZE)
            if len(buffer) > num_bytes:
                # Got more than requested: keep the chunk and hand out only the requested prefix.
                self._buffer = buffer
                self._offset = num_bytes
                return self._buffer[:self._offset]
            return buffer

        result = await asyncio.get_running_loop().run_in_executor(self._executor, self._file.read, -1)
        if self._buffer:
            # Anything still buffered logically precedes what was just read from the file.
            result = self._buffer[self._offset:] + result
            self._buffer = bytes()
            self._offset = 0
        return result

    async def write(self, buffer: bytes):
        """Write ``buffer`` to the file in the executor."""
        await asyncio.get_running_loop().run_in_executor(self._executor, self._file.write, buffer)

    def seek(self, offset: int, whence: int):
        """Move the logical position and discard any read-ahead data."""
        if self._buffer:
            if whence == os.SEEK_CUR:
                # The OS position is ahead of the logical position by the unread part of the
                # buffer, so a relative seek has to be corrected by that amount.
                offset -= len(self._buffer) - self._offset
            self._buffer = bytes()
            self._offset = 0
        self._file.seek(offset, whence)

    def tell(self) -> int:
        """Return the logical position: the offset of the next byte ``read`` would return."""
        # The OS position sits at the end of the read-ahead buffer; step back over the part of
        # the buffer that has not been returned to the caller yet.
        return self._file.tell() - (len(self._buffer) - self._offset)

    def close(self):
        """Close the underlying file."""
        self._file.close()

    @property
    def file_size(self) -> Optional[int]:
        """Size of the file as measured when it was opened."""
        return self._size


class BytesReader(protocol.FileReader):
    """``FileReader`` serving an in-memory ``bytes`` value."""

    def __init__(self, content: bytes):
        self._reader = io.BytesIO(content)

    async def read(self, num_bytes: Optional[int] = None) -> bytes:
        """Read up to ``num_bytes`` bytes; ``None`` reads everything that is left."""
        return self._reader.read(num_bytes)

    def close(self):
        """Nothing to release for an in-memory reader."""

    @property
    def file_size(self) -> Optional[int]:
        """Total number of bytes held by the reader."""
        return self._reader.getbuffer().nbytes


async def async_stat(file_path: Path, executor=None):
    """Run ``file_path.stat()`` in an executor (the module thread pool by default)."""
    if executor is None:
        executor = _get_default_executor()
    return await asyncio.get_running_loop().run_in_executor(executor, file_path.stat)


async def _restore_directory(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Create ``child_path`` as a directory, optionally replacing a non-directory in the way.

    :raises ValueError: if ``content`` is supplied; directories carry no content.
    """
    logger.info("Restoring directory %s", child_path)
    # Validate before touching the file system so a bad call cannot destroy the existing entry.
    if content is not None:
        raise ValueError("Content cannot be supplied for directory")
    if clobber_existing and (child_path.is_symlink() or (child_path.exists() and not child_path.is_dir())):
        child_path.unlink()
    child_path.mkdir(parents=False, exist_ok=True)


async def _restore_regular(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Write ``content`` to a newly created regular file at ``child_path``.

    :raises ValueError: if ``content`` is ``None``.
    """
    if content is None:
        # Fail before anything is removed or created.
        raise ValueError("Content must be supplied for regular file")
    if clobber_existing and (child_path.is_symlink() or child_path.exists()):
        # This deliberately will fail if the child is a directory. We don't want to remove an entire directory tree
        logger.debug("Removing original %s", child_path)
        child_path.unlink()
    logger.info("Restoring file %s", child_path)
    # NOTE(review): relies on AsyncFile supporting the context-manager protocol, presumably
    # inherited from protocol.FileReader -- confirm.
    with AsyncFile(child_path, 'x') as file:
        bytes_read = await content.read(protocol.READ_SIZE)
        while bytes_read:
            await file.write(bytes_read)
            bytes_read = await content.read(protocol.READ_SIZE)


async def _restore_link(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Create a symbolic link at ``child_path`` whose target is the bytes read from ``content``.

    :raises ValueError: if ``content`` is ``None``.
    """
    logger.info("Restoring symbolic link %s", child_path)
    if content is None:
        # Fail before the existing entry is removed.
        raise ValueError("Content must be supplied for symbolic link")
    if clobber_existing and (child_path.is_symlink() or child_path.exists()):
        child_path.unlink()
    chunks = []
    chunk = await content.read(protocol.READ_SIZE)
    while chunk:
        chunks.append(chunk)
        chunk = await content.read(protocol.READ_SIZE)
    os.symlink(dst=child_path, src=b"".join(chunks))


async def _restore_pipe(child_path: Path, content: Optional[protocol.FileReader], clobber_existing: bool):
    """Create a FIFO at ``child_path`` unless one is already there.

    :raises ValueError: if ``content`` yields any data; pipes carry no content.
    """
    logger.info("Restoring child %s", child_path)
    if content is not None:
        if await content.read(1):
            raise ValueError("Cannot restore pipe with content")
    if clobber_existing:
        if child_path.is_symlink():
            child_path.unlink()
        elif child_path.is_fifo():
            # Already a pipe: nothing to do.
            return
        elif child_path.exists():
            child_path.unlink()
    elif child_path.is_fifo():
        return
    os.mkfifo(child_path)


class LocalDirectoryExplorer(protocol.DirectoryExplorer):
    """Lists, opens and restores the children of one local directory, honouring filters."""

    # Placeholder inode yielded for directories that are excluded but still have to be descended
    # into because a filter exception lives somewhere beneath them.
    _EXCLUDED_DIR_INODE = protocol.Inode(
        type=protocol.FileType.DIRECTORY, mode=0, modified_time=datetime(year=MINYEAR, month=1, day=1),
        size=None, uid=0, gid=0, hash=None)

    _INCLUDED_FILE_TYPES = {
        # Here we implicitly ignore device files and sockets as they are not properly supported
        protocol.FileType.DIRECTORY,
        protocol.FileType.REGULAR,
        protocol.FileType.LINK,
        protocol.FileType.PIPE,
    }

    # Restore routine for each supported file type.
    _RESTORE_TYPES = {
        protocol.FileType.DIRECTORY: _restore_directory,
        protocol.FileType.REGULAR: _restore_regular,
        protocol.FileType.LINK: _restore_link,
        protocol.FileType.PIPE: _restore_pipe,
    }

    def __init__(self, base_path: Path,
                 filter_node: Optional[file_filter.FilterPathNode],
                 ignore_patterns: List[str],
                 all_files: Dict[Tuple[int, int], protocol.Inode]):
        """
        :param base_path: Directory this explorer lists.
        :param filter_node: Filter applying to ``base_path``, or ``None`` when there is none.
        :param ignore_patterns: ``fnmatch`` patterns; children whose name matches are skipped.
        :param all_files: Inodes keyed by ``(st_dev, st_ino)``, shared with the explorers created by
            ``get_child``.
        """
        self._base_path = base_path
        self._all_files = all_files
        self._ignore_patterns = ignore_patterns
        self._filter_node = filter_node
        # Per-directory cache of child name -> inode, filled by _stat_child.
        self._children = {}

    def iter_children(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        """Return an async iterator of ``(name, inode)`` for the children to process.

        :raises ValueError: if the filter node is neither INCLUDE nor EXCLUDE.
        """
        if self._filter_node is None or self._filter_node.filter_type is protocol.FilterType.INCLUDE:
            return self._iter_included_directory()
        if self._filter_node.filter_type is protocol.FilterType.EXCLUDE:
            return self._iter_excluded_directory()
        raise ValueError(f"Normalized filter node had type {self._filter_node.filter_type}. This should have been "
                         f"either {protocol.FilterType.INCLUDE} or {protocol.FilterType.EXCLUDE}")

    async def _iter_included_directory(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        """Yield every child of an included directory that is not filtered out."""
        for child in self._base_path.iterdir():
            child_name = child.name
            if self._filter_node is not None and child_name in self._filter_node.exceptions and \
                    self._filter_node.exceptions[child_name].filter_type is protocol.FilterType.EXCLUDE:
                # If this child is explicitly excluded ...
                exception_count = len(self._filter_node.exceptions[child_name].exceptions)
                if exception_count:
                    # ... but has exceptions of its own, still expose it as a placeholder directory.
                    logger.debug("Skipping %s on filter with %s exceptions", child, exception_count)
                    yield child_name, self._EXCLUDED_DIR_INODE.copy()
                else:
                    logger.debug("Skipping %s on filter", child)
                continue

            if self._should_pattern_ignore(child):
                logger.debug("Skipping file for pattern %s", child)
                continue

            inode = self._stat_child(child_name)
            if inode.type not in self._INCLUDED_FILE_TYPES:
                logger.debug("Skipping %s for type %s", child, inode.type)
                continue

            yield child.name, inode

    async def _iter_excluded_directory(self) -> AsyncIterable[Tuple[str, protocol.Inode]]:
        """Yield only the filter exceptions of an excluded directory."""
        # This LocalDirectoryExplorer has been created for an excluded directory, but there may be exceptions.
        logger.debug("Listing %s exceptions for %s ", len(self._filter_node.exceptions), self._base_path)
        for child_name, exception in self._filter_node.exceptions.items():
            child = self._base_path / child_name
            if exception.filter_type is protocol.FilterType.INCLUDE:
                # Exception to include this child.
                if self._should_pattern_ignore(child):
                    logger.warning("File explicitly included but then excluded by pattern %s", child)
                elif child.exists():
                    yield child_name, self._stat_child(child_name)

            elif child.is_dir():
                # Looks like there is a child of the child that's the real exception.
                yield child_name, self._EXCLUDED_DIR_INODE.copy()

            elif exception.exceptions:
                # This is an edge case. An INCLUDE filter can be made for a child directory where the parent is
                # not actually a directory. Remember filters can name files that don't actually exist.
                # Say the user EXCLUDEs /foo and INCLUDEs /foo/bar/baz in filters, then creates a file
                # (not a directory) named /foo/bar. We warn the user and do NOT back up /foo/bar in any way,
                # because at this point we know /foo/bar is excluded and /foo/bar/baz doesn't exist.
                child_exception = exception
                meaningful_name = self._base_path / child_name
                try:
                    while child_exception.filter_type is not protocol.FilterType.INCLUDE:
                        # Descend from the node currently being inspected, one level per iteration.
                        next_name = next(iter(child_exception.exceptions))
                        meaningful_name = meaningful_name / next_name
                        child_exception = child_exception.exceptions[next_name]
                    logger.warning("%s was included but %s is actually a file! Ignoring filters under %s",
                                   meaningful_name, self._base_path / child_name, self._base_path / child_name)
                except StopIteration:
                    # This clause really should never occur. Meaningless exceptions are supposed to be pruned...
                    # We are currently on an EXCLUDE which has exceptions so there should be an INCLUDE in its
                    # children. But let's not break just because we failed to write a more meaningful warning.
                    logger.warning("%s/.../%s was included but %s is actually a file! Ignoring filters under %s",
                                   self._base_path, child_name, self._base_path / child_name,
                                   self._base_path / child_name)

    def _should_pattern_ignore(self, child: Path) -> bool:
        """Return True when the child's name matches any of the ignore patterns."""
        child_name = child.name
        for pattern in self._ignore_patterns:
            if fnmatch(child_name, pattern):
                logger.debug("Skipping %s on pattern %s", child, pattern)
                return True
        return False

    def _stat_child(self, child: str) -> protocol.Inode:
        """Return the inode for the named child, reusing cached or already-known inodes."""
        inode = self._children.get(child)
        if inode is not None:
            return inode

        file_path = self._base_path / child
        file_stat = file_path.lstat()
        key = (file_stat.st_dev, file_stat.st_ino)
        # A known (device, inode) pair means the same underlying file has been seen before;
        # reuse that Inode object rather than building a second one.
        inode = self._all_files.get(key)
        if inode is None:
            inode = protocol.Inode.from_stat(file_stat, None)
            if inode.type is not protocol.FileType.DIRECTORY:
                self._all_files[key] = inode
        self._children[child] = inode
        return inode

    def __str__(self) -> str:
        return str(self._base_path)

    async def inode(self) -> protocol.Inode:
        """Return the inode of this directory, or the placeholder inode when it is excluded."""
        if self._filter_node is not None and self._filter_node.filter_type is protocol.FilterType.EXCLUDE:
            return self._EXCLUDED_DIR_INODE.copy()
        stat = self._base_path.lstat()
        inode = protocol.Inode.from_stat(stat, hash_value=None)
        self._all_files[(stat.st_dev, stat.st_ino)] = inode
        return inode

    async def open_child(self, name: str) -> protocol.FileReader:
        """Open the named child for reading.

        Regular files are opened from disk, links yield their encoded target and pipes yield
        no content.

        :raises ValueError: for any other file type.
        """
        child_type = self._stat_child(name).type
        child_path = self._base_path / name

        if child_type is protocol.FileType.REGULAR:
            return AsyncFile(child_path, 'r')

        if child_type is protocol.FileType.LINK:
            return BytesReader(os.readlink(child_path).encode())

        if child_type is protocol.FileType.PIPE:
            return BytesReader(bytes(0))

        raise ValueError(f"Cannot open child of type {child_type}")

    async def restore_child(self, name: str, type_: protocol.FileType, content: Optional[protocol.FileReader],
                            clobber_existing: bool):
        """Restore the named child using the routine registered for ``type_``.

        :raises ValueError: if ``type_`` has no restore routine.
        """
        try:
            restore_function = self._RESTORE_TYPES[type_]
        except KeyError:
            raise ValueError(f"Cannot restore file of type {type_}") from None

        child_path = self._base_path / name
        # The cached inode (if any) no longer describes what is about to be on disk.
        self._children.pop(name, None)
        await restore_function(child_path=child_path, content=content, clobber_existing=clobber_existing)

    async def restore_meta(self, name: str, meta: protocol.Inode, toggle: Dict[str, bool]):
        """Apply mode, ownership and modification time from ``meta`` to the named child.

        ``toggle`` may switch off ``'mode'``, ``'uid'``, ``'gid'`` or ``'modified_time'``; a key
        that is absent counts as enabled. Symbolic links are never followed.
        """
        child_path = self._base_path / name
        if toggle.get('mode', True):
            os.chmod(child_path, mode=meta.mode, follow_symlinks=False)

        change_uid = toggle.get('uid', True)
        change_gid = toggle.get('gid', True)
        if change_uid or change_gid:
            os.chown(
                path=child_path,
                # -1 leaves that id unchanged.
                uid=meta.uid if change_uid else -1,
                gid=meta.gid if change_gid else -1,
                follow_symlinks=False,
            )

        if toggle.get('modified_time', True):
            mod_time = meta.modified_time.timestamp()
            os.utime(path=child_path, times=(mod_time, mod_time), follow_symlinks=False)

    def get_child(self, name: str) -> protocol.DirectoryExplorer:
        """Return an explorer for the named sub-directory, carrying over the matching filter exception."""
        return type(self)(
            base_path=self._base_path / name,
            filter_node=self._filter_node.exceptions.get(name) if self._filter_node is not None else None,
            ignore_patterns=self._ignore_patterns,
            all_files=self._all_files,
        )

    def get_path(self, name: Optional[str]) -> str:
        """Return the path of the named child, or of this directory when ``name`` is ``None``."""
        if name is None:
            return str(self._base_path)
        return str(self._base_path / name)


class LocalFileSystemExplorer:
    """Factory for ``LocalDirectoryExplorer`` objects that all share one inode table."""

    _all_files: Dict[Tuple[int, int], protocol.Inode]

    def __init__(self):
        self._all_files = {}

    def __call__(self, directory_root: Union[str, Path],
                 filters: Iterable[protocol.Filter] = ()) -> LocalDirectoryExplorer:
        """Create an explorer rooted at ``directory_root``.

        :raises FileNotFoundError: if the path does not exist.
        :raises ValueError: if the path exists but is not a directory.
        """
        base_path = Path(directory_root)
        if not base_path.is_dir():
            if not base_path.exists():
                raise FileNotFoundError(f"Backup path doesn't exist: {base_path}")
            raise ValueError(f"Backup path is not a directory: {base_path}")

        ignore_patterns, root_filter_node = file_filter.normalize_filters(filters)
        # NOTE(review): the published listing is cut off ("...") right after the last argument
        # below; only the closing parenthesis has been supplied. Anything that followed in the
        # original file is not reproduced here -- confirm against the full source.
        return LocalDirectoryExplorer(
            base_path=Path(base_path),
            ignore_patterns=ignore_patterns,
            filter_node=root_filter_node,
            all_files=self._all_files,
        )

Full Screen

Full Screen

db.py

Source:db.py Github

copy

Full Screen

...34 union_id=None, id__in=None, **kwargs):35 clone = self._clone()36 clone._filter_union_id(union_id)37 clone._filter_prefer(prefer, prefer_id)38 clone._filter_node(node)39 clone._filter_assets(assets)40 clone._filter_other(kwargs)41 clone._filter_id_in(id__in)42 return clone43 def _filter_union_id(self, union_id):44 if not union_id:45 return46 cleaned_union_id = union_id.split('_')47 # 如果union_id通不过本检查,代表可能不是本backend, 应该返回空48 if not self._check_union_id(union_id, cleaned_union_id):49 self.queryset = self.queryset.none()50 return51 return self._perform_filter_union_id(union_id, cleaned_union_id)52 def _check_union_id(self, union_id, cleaned_union_id):53 return union_id and len(cleaned_union_id) == self.union_id_length54 def _perform_filter_union_id(self, union_id, union_id_cleaned):55 self.queryset = self.queryset.filter(union_id=union_id)56 def _filter_assets(self, assets):57 assets_id = self.make_assets_as_id(assets)58 if assets_id:59 self.queryset = self.queryset.filter(asset_id__in=assets_id)60 def _filter_node(self, node):61 pass62 def _filter_id_in(self, ids):63 if ids and isinstance(ids, list):64 self.queryset = self.queryset.filter(union_id__in=ids)65 @staticmethod66 def clean_kwargs(kwargs):67 return {k: v for k, v in kwargs.items() if v}68 def _filter_other(self, kwargs):69 kwargs = self.clean_kwargs(kwargs)70 if kwargs:71 self.queryset = self.queryset.filter(**kwargs)72 def _filter_prefer(self, prefer, prefer_id):73 pass74 def search(self, item):75 qs = []76 for i in ['hostname', 'ip', 'username']:77 kwargs = {i + '__startswith': item}78 qs.append(Q(**kwargs))79 q = reduce(lambda x, y: x | y, qs)80 clone = self._clone()81 clone.queryset = clone.queryset.filter(q).distinct()82 return clone83class SystemUserBackend(DBBackend):84 model = SystemUser.assets.through85 backend = 'system_user'86 prefer = backend87 base_score = 088 union_id_length = 289 def _filter_prefer(self, prefer, prefer_id):90 if prefer and prefer != self.prefer:91 self.queryset = 
self.queryset.none()92 if prefer_id:93 self.queryset = self.queryset.filter(systemuser__id=prefer_id)94 def _perform_filter_union_id(self, union_id, union_id_cleaned):95 system_user_id, asset_id = union_id_cleaned96 self.queryset = self.queryset.filter(97 asset_id=asset_id, systemuser__id=system_user_id,98 )99 def _perform_delete_by_union_id(self, union_id_cleaned):100 system_user_id, asset_id = union_id_cleaned101 system_user = get_object_or_none(SystemUser, pk=system_user_id)102 asset = get_object_or_none(Asset, pk=asset_id)103 if all((system_user, asset)):104 system_user.assets.remove(asset)105 def _filter_node(self, node):106 if node:107 self.queryset = self.queryset.filter(asset__nodes__id=node.id)108 def get_annotate(self):109 kwargs = dict(110 hostname=F("asset__hostname"),111 ip=F("asset__ip"),112 username=F("systemuser__username"),113 password=F("systemuser__password"),114 private_key=F("systemuser__private_key"),115 public_key=F("systemuser__public_key"),116 score=F("systemuser__priority") + self.base_score,117 version=Value(0, IntegerField()),118 date_created=F("systemuser__date_created"),119 date_updated=F("systemuser__date_updated"),120 asset_username=Concat(F("asset__id"), Value("_"),121 F("systemuser__username"),122 output_field=CharField()),123 union_id=Concat(F("systemuser_id"), Value("_"), F("asset_id"),124 output_field=CharField()),125 org_id=F("asset__org_id"),126 backend=Value(self.backend, CharField())127 )128 return kwargs129 def get_filter(self):130 return dict(131 systemuser__username_same_with_user=False,132 )133 def all(self):134 kwargs = self.get_annotate()135 filters = self.get_filter()136 qs = self.model.objects.all().annotate(**kwargs)137 if current_org.org_id() is not None:138 filters['org_id'] = current_org.org_id()139 qs = qs.filter(**filters)140 qs = self.qs_to_values(qs)141 return qs142class DynamicSystemUserBackend(SystemUserBackend):143 backend = 'system_user_dynamic'144 prefer = 'system_user'145 union_id_length = 3146 def 
get_annotate(self):147 kwargs = super().get_annotate()148 kwargs.update(dict(149 username=F("systemuser__users__username"),150 asset_username=Concat(151 F("asset__id"), Value("_"),152 F("systemuser__users__username"),153 output_field=CharField()154 ),155 union_id=Concat(156 F("systemuser_id"), Value("_"), F("asset_id"),157 Value("_"), F("systemuser__users__id"),158 output_field=CharField()159 ),160 users_count=Count('systemuser__users'),161 ))162 return kwargs163 def _perform_filter_union_id(self, union_id, union_id_cleaned):164 system_user_id, asset_id, user_id = union_id_cleaned165 self.queryset = self.queryset.filter(166 asset_id=asset_id, systemuser_id=system_user_id,167 union_id=union_id,168 )169 def _perform_delete_by_union_id(self, union_id_cleaned):170 system_user_id, asset_id, user_id = union_id_cleaned171 system_user = get_object_or_none(SystemUser, pk=system_user_id)172 if not system_user:173 return174 system_user.users.remove(user_id)175 if system_user.users.count() == 0:176 system_user.assets.remove(asset_id)177 def get_filter(self):178 return dict(179 users_count__gt=0,180 systemuser__username_same_with_user=True181 )182class AdminUserBackend(DBBackend):183 model = Asset184 backend = 'admin_user'185 prefer = backend186 base_score = 200187 def _filter_prefer(self, prefer, prefer_id):188 if prefer and prefer != self.backend:189 self.queryset = self.queryset.none()190 if prefer_id:191 self.queryset = self.queryset.filter(admin_user__id=prefer_id)192 def _filter_node(self, node):193 if node:194 self.queryset = self.queryset.filter(nodes__id=node.id)195 def _perform_filter_union_id(self, union_id, union_id_cleaned):196 admin_user_id, asset_id = union_id_cleaned197 self.queryset = self.queryset.filter(198 id=asset_id, admin_user_id=admin_user_id,199 )200 def _perform_delete_by_union_id(self, union_id_cleaned):201 raise PermissionError(_("Could not remove asset admin user"))202 def all(self):203 qs = self.model.objects.all().annotate(204 asset_id=F("id"),205 
username=F("admin_user__username"),206 password=F("admin_user__password"),207 private_key=F("admin_user__private_key"),208 public_key=F("admin_user__public_key"),209 score=Value(self.base_score, IntegerField()),210 version=Value(0, IntegerField()),211 date_updated=F("admin_user__date_updated"),212 asset_username=Concat(F("id"), Value("_"), F("admin_user__username"), output_field=CharField()),213 union_id=Concat(F("admin_user_id"), Value("_"), F("id"), output_field=CharField()),214 backend=Value(self.backend, CharField()),215 )216 qs = self.qs_to_values(qs)217 return qs218class AuthbookBackend(DBBackend):219 model = AuthBook220 backend = 'db'221 prefer = backend222 base_score = 400223 def _filter_node(self, node):224 if node:225 self.queryset = self.queryset.filter(asset__nodes__id=node.id)226 def _filter_prefer(self, prefer, prefer_id):227 if not prefer or not prefer_id:228 return229 if prefer.lower() == "admin_user":230 model = AdminUser231 elif prefer.lower() == "system_user":232 model = SystemUser233 else:234 self.queryset = self.queryset.none()235 return236 obj = get_object_or_none(model, pk=prefer_id)237 if obj is None:...

Full Screen

Full Screen

skydive_source.py

Source:skydive_source.py Github

copy

Full Screen

...77 "next": ("int", lambda r: r[2]),78 }79 record.update(added_fields)80 return record81def _filter_node(type_value):82 def _action(cnx):83 return cnx.socket.lookup_nodes(84 'G.V().Has("Type", "{}")'.format(type_value))85 return _action86def _filter_node_list(type_value, fields):87 def _action(cnx):88 nodes = cnx.socket.lookup_nodes(89 'G.V().Has("Type", "{}")'.format(type_value))90 return [91 (node.id, elt)92 for node in nodes93 for elt in reduce(lambda v, k: v.get(k, []), fields, node.metadata)94 ]95 return _action96def _filter_rel(type_value):97 def _action(cnx):98 return cnx.socket.lookup_edges(99 'G.E().Has("RelationType", "{}")'.format(type_value))100 return _action101def _metadata(field):102 if isinstance(field, list):103 return lambda node: reduce(lambda v, k: v.get(k), field, node.metadata)104 return lambda node: node.metadata[field]105def _id(node):106 return node.id107def _parent(edge):108 return edge.parent109def _child(edge):110 return edge.child111def elements_of_rule(elt, cnx):112 """Split elements of actions or filters of an Openflow rule113 :param elt: should be either 'actions' or 'filters'114 :param cnx: an open connection to Skydive115 :return: a list of string, each is an atomic action or filter.116 """117 return [118 (rule.id, e, i)119 for rule in _filter_node('ofrule')(cnx)120 for i, e in enumerate(rule.metadata[elt].split(',') + ['end'])121 ]122#: Describes how to bind values extracted from the python skydive client.123TABLES = {124 "sk_host": (125 _filter_node("host"),126 {127 "id": ("id", _id),128 "name": ("string", _metadata("Name")),129 "platform": ("string", _metadata("Platform"))130 }131 ),132 "sk_ovsswitch": (133 _filter_node("openvswitch"),134 {135 "id": ("id", _id),136 "name": ("string", _metadata("Name")),137 }138 ),139 "sk_ovsbridge": (140 _filter_node("ovsbridge"),141 {142 "id": ("id", _id),143 "name": ("string", _metadata("Name")),144 }145 ),146 "sk_ovsport": (147 _filter_node("ovsport"),148 {149 "id": ("id", _id),150 "name": 
("string", _metadata("Name")),151 }152 ),153 "sk_patch": (154 _filter_node("patch"),155 {156 "id": ("id", _id),157 "name": ("string", _metadata("Name")),158 "index": ("int", _metadata("OfPort")),159 "peer": ("string", _metadata(["Ovs", "Options", "peer"])),160 "mac": ("string", _metadata("MAC")),161 }162 ),163 "sk_internal": (164 _filter_node("internal"),165 {166 "id": ("id", _id),167 "name": ("string", _metadata("Name")),168 "index": ("int", _metadata("OfPort")),169 "mac": ("string", _metadata("MAC")),170 }171 ),172 "sk_internal_ip": (173 _filter_node_list("internal", ["IPV4"]),174 {175 "id": ("id", lambda p: p[0]),176 "ip": ("ip_address", lambda p: primitives.ip_of_cidr(p[1])),177 "mask": ("ip_address", lambda p: primitives.mask_of_network(p[1])),178 "prefix": (179 "ip_address",180 lambda p: primitives.prefix_of_network(p[1])),181 }182 ),183 "sk_rule": (184 _filter_node("ofrule"),185 {186 "id": ("id", _id),187 "priority": ("int", _metadata("priority")),188 "table": ("int", _metadata("table"))189 }190 ),191 "sk_owns": (192 _filter_rel("ownership"),193 {194 "owner": ("id", _parent),195 "item": ("id", _child)196 }197 ),198 "sk_l2": (...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful