How to use _parsed method in fMBT

Best Python code snippet using fMBT_python

Firewall.py

Source:Firewall.py Github

copy

Full Screen

1# firewalld - Linux firewall daemon with time-based capabilities2# Copyright (C) 2020-2021 Johannes Bauer3#4# This file is part of firewalld.5#6# firewalld is free software; you can redistribute it and/or modify7# it under the terms of the GNU General Public License as published by8# the Free Software Foundation; this program is ONLY licensed under9# version 3 of the License, later versions are explicitly excluded.10#11# firewalld is distributed in the hope that it will be useful,12# but WITHOUT ANY WARRANTY; without even the implied warranty of13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the14# GNU General Public License for more details.15#16# You should have received a copy of the GNU General Public License17# along with firewalld; if not, write to the Free Software18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA19#20# Johannes Bauer <JohannesBauer@gmx.de>21import json22import sys23import datetime24import enum25from pyipt.Protocol import Protocol26from pyipt.Rules import Rules, Ruleset27from pyipt.Service import Service28from pyipt.Interface import InterfaceNetwork, InterfaceAddress, InterfaceName29from pyipt.Hostname import Hostname30from pyipt.MultiEnum import ICMPType31from pyipt.Condition import Condition32from pyipt.RegexMatches import PortforwardTarget33from pyipt.Chain import Chain34from pyipt.Exceptions import IncompatibleOptionsException, UnknownTypeError, FirewallRulesetException35from pyipt.Criterion import Criterion36from pyipt.Variables import Variables37class RuleType(enum.Enum):38 Accept = "accept"39 Reject = "reject"40 Drop = "drop"41 Log = "log"42 Masquerade = "masquerade"43 PortForward = "port-forward"44class HighlevelRule():45 _SIMPLE_PARSE_CLASSES = {46 "action": RuleType,47 "proto": Protocol,48 "criterion": Criterion,49 "comment": str,50 "dest-service": Service,51 "src-service": Service,52 "icmp-type": ICMPType,53 "cond": Condition,54 "forward-to": PortforwardTarget,55 "msg": str,56 }57 _COMPLEX_PARSE_CLASSES = {58 "dest-host": Hostname,59 "src-host": Hostname,60 "dest-net": InterfaceNetwork,61 "src-net": InterfaceNetwork,62 "dest-ifaddr": InterfaceAddress,63 "src-ifaddr": InterfaceAddress,64 "dest-if": InterfaceName,65 "src-if": InterfaceName,66 }67 def __init__(self, rule_src, config, variables):68 self._rule_src = rule_src69 self._config = config70 self._parsed = { }71 for (key, value) in self._rule_src.items():72 value = variables.recursive_replace(value)73 if key.startswith("_"):74 continue75 if key in self._SIMPLE_PARSE_CLASSES:76 self._parsed[key] = self._SIMPLE_PARSE_CLASSES[key](value)77 continue78 if key in self._COMPLEX_PARSE_CLASSES:79 self._parsed[key] = self._COMPLEX_PARSE_CLASSES[key](value, self._config)80 continue81 raise UnknownTypeError("Do not know how to parse: %s (in rule '%s')" % (key, str(self._rule_src)))82 self._sanity_check()83 @property84 def action(self):85 return self._parsed["action"]86 @property87 def comment(self):88 if "comment" in self._parsed:89 return self._parsed["comment"]90 else:91 return str(self._rule_src)92 def _sanity_check(self):93 if "action" not in self._parsed:94 raise IncompatibleOptionsException("'action' is not defined in rule: %s" % (str(self._rule_src)))95 if ("service" in self._parsed) and ("proto" in self._parsed):96 raise IncompatibleOptionsException("'service' and 'proto' are mutually exclusive in rule: %s" % (str(self._rule_src)))97 if ("icmp-type" in self._parsed) and ("proto" in self._parsed):98 raise IncompatibleOptionsException("'icmp-type' and 'proto' are mutually exclusive in rule: %s" % (str(self._rule_src)))99 if self.action == RuleType.PortForward:100 if "dest-ifaddr" not in self._parsed:101 raise IncompatibleOptionsException("port forwarding requires 'dest-ifaddr' in rule: %s" % (str(self._rule_src)))102 if "dest-service" not in self._parsed:103 raise IncompatibleOptionsException("port forwarding requires 'dest-service' in rule: %s" % (str(self._rule_src)))104 if "forward-to" not in self._parsed:105 raise IncompatibleOptionsException("port forwarding requires 'forward-to' in rule: %s" % (str(self._rule_src)))106 if (not self._parsed["forward-to"]["relative"]) and (self._parsed["forward-to"]["port"] is not None) and (any((portmap.span_count > 1) for (proto, portmap) in self._parsed["dest-service"])):107 raise IncompatibleOptionsException("port forwarding requires relative port mapping when more than one span is defined in rule: %s" % (str(self._rule_src)))108 hostname = Hostname(self._parsed["forward-to"]["hostname"], self._config)109 if len(hostname) != 1:110 raise IncompatibleOptionsException("port forwarding requires exactly one match for hostname, but found %d (%s) in rule: %s" % (len(hostname), ", ".join(hostname), str(self._rule_src)))111 def insert(self, chain_name, ruleset):112 if "cond" in self._parsed:113 if not self._parsed["cond"].satisfied(ruleset.metadata):114 return115 chain = Chain.parse(chain_name)116 rules = Rules(self.comment)117 rule = rules.new()118 rule.add_fixed(chain.iptables_append())119 if "proto" in self._parsed:120 group = rule.add_group("proto")121 for proto in self._parsed["proto"]:122 group.append([ "-p", proto ])123 if "icmp-type" in self._parsed:124 group = rule.add_group("icmp-type")125 for icmp_type in self._parsed["icmp-type"]:126 group.append([ "-p", "icmp", "--icmp-type", icmp_type ])127 for srcdest in [ "src", "dest" ]:128 if srcdest + "-if" in self._parsed:129 option = {130 "src": "-i",131 "dest": "-o",132 }[srcdest]133 group = rule.add_group(srcdest + "-if")134 for ifname in self._parsed[srcdest + "-if"]:135 group.append([ option, ifname ])136 if srcdest + "-net" in self._parsed:137 option = {138 "src": "-s",139 "dest": "-d",140 }[srcdest]141 group = rule.add_group(srcdest + "-net")142 for network in self._parsed[srcdest + "-net"]:143 group.append([ option, network ])144 if srcdest + "-service" in self._parsed:145 group = rule.add_group(srcdest + "-service")146 if (srcdest == "dest") and self._parsed["action"] == RuleType.PortForward:147 # For port forwarding/DNAT target, the syntax is different148 def dnat_target(incoming_port, forward_to):149 hostname = Hostname(forward_to["hostname"], self._config)150 result = [ "-j", "DNAT", "--to" ]151 if len(hostname) == 0:152 print("Warning: For DNAT/port forwarding, a target is required, but %s could not be resolved successfully." % (forward_to["hostname"]))153 elif len(hostname) != 1:154 print("Warning: For DNAT/port forwarding, a single target is required, but %d were found. Arbitrarily picking the first one." % (len(hostname)))155 if forward_to["port"] is None:156 result.append(hostname[0])157 else:158 if forward_to["relative"]:159 result.append("%s:%d" % (hostname[0], incoming_port + forward_to["port"]))160 else:161 result.append("%s:%d" % (hostname[0], forward_to["port"]))162 return result163 for (proto, port_map) in self._parsed[srcdest + "-service"]:164 for single_port in port_map.single:165 group.append([ "-p", proto, "--dport", str(single_port) ] + dnat_target(single_port, self._parsed["forward-to"]))166 for (begin_range, end_range) in port_map.ranges:167 group.append([ "-p", proto, "--dport", "%d:%d" % (begin_range, end_range) ] + dnat_target(begin_range, self._parsed["forward-to"]))168 else:169 # Not port forwarding (simple ACCEPT/REJCECT/etc.)170 option = {171 "src": "s",172 "dest": "d",173 }[srcdest]174 for (proto, port_map) in self._parsed[srcdest + "-service"]:175 if port_map.port_count == 1:176 group.append([ "-p", proto, "--%sport" % (option), str(port_map[0]) ])177 else:178 if len(port_map.single) > 1:179 group.append([ "-p", proto, "--match", "multiport", "--%sports" % (option), ",".join(str(port) for port in port_map.single) ])180 for (begin_range, end_range) in port_map.ranges:181 group.append([ "-p", proto, "--match", "multiport", "--%sports" % (option), "%d:%d" % (begin_range, end_range) ])182 if srcdest + "-ifaddr" in self._parsed:183 option = {184 "src": "-s",185 "dest": "-d",186 }[srcdest]187 group = rule.add_group(srcdest + "-ifaddr")188 for address in self._parsed[srcdest + "-ifaddr"]:189 group.append([ option, address ])190 if srcdest + "-host" in self._parsed:191 option = {192 "src": "-s",193 "dest": "-d",194 }[srcdest]195 group = rule.add_group(srcdest + "-host")196 for address in self._parsed[srcdest + "-host"]:197 group.append([ option, address ])198 if "criterion" in self._parsed:199 self._parsed["criterion"].apply(rule)200 if self._parsed["action"] in (RuleType.Accept, RuleType.Reject, RuleType.Drop, RuleType.Masquerade, RuleType.Log):201 rule.add_fixed(("-j", self._parsed["action"].value.upper()))202 elif self._parsed["action"] == RuleType.PortForward:203 # We're in nat.PREROUTING204 pass205 else:206 raise NotImplementedError(self._parsed["action"])207 if self._parsed["action"] == RuleType.Log:208 if "msg" in self._parsed:209 rule.add_fixed(("--log-prefix", self._parsed["msg"] + ": "))210 if "comment" in self._parsed:211 rule.add_fixed(("-m", "comment", "--comment", self._parsed["comment"]))212 ruleset.add_rules(rules)213class Firewall():214 def __init__(self, ruleset_filename, args):215 self._ruleset_filename = ruleset_filename216 self._args = args217 def _parse_chain(self, ruleset, chain_name, content):218 if "rules" in content:219 for rulesrc in content["rules"]:220 try:221 hl_rule = HighlevelRule(rulesrc, ruleset.metadata["source"], ruleset.metadata["variables"])222 hl_rule.insert(chain_name, ruleset)223 except FirewallRulesetException as e:224 if not self._args.ignore_errors:225 raise226 else:227 print("Continuing in spite of error: %s (%s)" % (str(e), str(rulesrc)), file = sys.stderr)228 def _initialize_chains(self, ruleset):229 rules = Rules("initializing all chains")230 for (chain_name, content) in ruleset.metadata["source"]["chains"].items():231 chain = Chain.parse(chain_name)232 if "default" in content:233 rule = rules.new()234 rule.add_fixed(chain.iptables_policy(), (content["default"].upper(), ))235 rule = rules.new()236 rule.add_fixed(chain.iptables_flush())237 ruleset.add_rules(rules)238 def _parse_ruleset(self, ruleset):239 self._initialize_chains(ruleset)240 for (chain_name, content) in ruleset.metadata["source"]["chains"].items():241 self._parse_chain(ruleset, chain_name, content)242 def generate(self):243 with open(self._ruleset_filename) as f:244 source = json.load(f)245 source["interfaces-rev"] = { value: key for (key, value) in source["interfaces"].items() }246 metadata = {247 "now": datetime.datetime.now(),248 "source": source,249 "variables": Variables(source.get("variables", { })),250 }251 ruleset = Ruleset(metadata)252 ruleset.add_stat("ruleset_mtime", self._ruleset_filename)253 self._parse_ruleset(ruleset)...

Full Screen

Full Screen

mrz.py

Source:mrz.py Github

copy

Full Screen

1import asn1crypto.core as asn12from datetime import datetime, date3class MachineReadableZone(asn1.OctetString):4 class_ = 15 tag = 316 _parsed = None7 @classmethod8 def load(cls, encoded_data: bytes, strict=False):9 v = super().load(encoded_data, strict)10 clen = len(v.contents) 11 if clen == 90:12 v.format = 'td1'13 elif clen == 72:14 v.format = 'td2'15 elif clen == 88:16 v.format = 'td3'17 else:18 ValueError("Unknown MRZ format")19 return v20 21 def __getitem__(self, key):22 return self.native[key]23 @property24 def country(self) -> str:25 return self['country']26 @property27 def date_of_birth(self) -> date:28 return self['date_of_birth']29 @property30 def date_of_expiry(self) -> date:31 return self['date_of_expiry']32 @property33 def document_code(self) -> str:34 return self['document_code']35 @property36 def document_number(self) -> str:37 return self['document_number']38 @property39 def name(self) -> str:40 ni = self['name_identifiers']41 if len(ni) > 1:42 return ni[-1]43 return ""44 @property45 def nationality(self) -> str:46 return self['nationality']47 @property48 def native(self):49 if self._parsed is None:50 self.parse()51 return self._parsed52 @property53 def optional_data(self) -> str:54 if self.format == 'td1':55 return self['optional_data_2']56 return self['optional_data']57 @property58 def surname(self) -> str:59 ni = self['name_identifiers']60 if len(ni) > 0:61 return ni[0]62 return ""63 def parse(self):64 self._parsed = {}65 if self.format == 'td1':66 self._parse_td1()67 elif self.format == 'td2':68 self._parse_td2()69 elif self.format == 'td3':70 self._parse_td3()71 else:72 ValueError("Cannot parse unknown MRZ format")73 def _parse_td1(self):74 self._parsed['document_code'] = self._read(0, 2)75 self._parsed['country'] = self._read(2, 3)76 self._parsed['document_number'] = self._read(5, 9)77 self._parsed['document_number_cd'] = self._read_with_filter(14, 1) # document number check digit, could be char '<'78 self._parsed['optional_data_1'] = self._read(15, 15)79 self._parsed['date_of_birth'] = self._read_date(30, 6)80 self._parsed['date_of_birth_cd'] = self._read_int(36, 1) # document dob digit81 self._parsed['sex'] = self._read(37, 1)82 self._parsed['date_of_expiry'] = self._read_date(38, 6)83 self._parsed['date_of_expiry_cd'] = self._read_int(44, 1) # document doe digit84 self._parsed['nationality'] = self._read(45, 3)85 self._parsed['optional_data_2'] = self._read(48, 11)86 self._parsed['composite_check_digit'] = self._read_int(59, 1)87 self._parsed['name_identifiers'] = self._read_name_identifiers(60, 30)88 # doc 9303 p10 page 3089 if self._parsed['document_number_cd'] == '<' and len(self._parsed['optional_data_1']) > 0:90 self._parsed['document_number'] += self._parsed['optional_data_1'][:-1]91 self._parsed['document_number_cd'] = self._parsed['optional_data_1'][-1]92 self._parsed['optional_data_1'] = ""93 def _parse_td2(self):94 self._parsed['document_code'] = self._read(0, 2)95 self._parsed['country'] = self._read(2, 3)96 self._parsed['name_identifiers'] = self._read_name_identifiers(5, 31)97 self._parsed['document_number'] = self._read(36, 9)98 self._parsed['document_number_cd'] = self._read_with_filter(45, 1) # document number check digit99 self._parsed['nationality'] = self._read(46, 3)100 self._parsed['date_of_birth'] = self._read_date(49, 6)101 self._parsed['date_of_birth_cd'] = self._read_int(55, 1) # document dob digit102 self._parsed['sex'] = self._read(56, 1)103 self._parsed['date_of_expiry'] = self._read_date(57, 6)104 self._parsed['date_of_expiry_cd'] = self._read_int(63, 1) # document doe digit105 self._parsed['optional_data'] = self._read(64, 7)106 self._parsed['composite_check_digit'] = self._read_int(71, 1)107 def _parse_td3(self):108 self._parsed['document_code'] = self._read(0, 2)109 self._parsed['country'] = self._read(2, 3)110 self._parsed['name_identifiers'] = self._read_name_identifiers(5, 39)111 self._parsed['document_number'] = self._read(44, 9)112 self._parsed['document_number_cd'] = self._read_with_filter(53, 1) # document number check digit113 self._parsed['nationality'] = self._read(54, 3)114 self._parsed['date_of_birth'] = self._read_date(57, 6)115 self._parsed['date_of_birth_cd'] = self._read_int(63, 1) # document dob digit116 self._parsed['sex'] = self._read(64, 1)117 self._parsed['date_of_expiry'] = self._read_date(65, 6)118 self._parsed['date_of_expiry_cd'] = self._read_int(71, 1) # document doe digit119 self._parsed['optional_data'] = self._read(72, 14)120 self._parsed['check_digit'] = self._read_int(86, 1)121 self._parsed['composite_check_digit'] = self._read_int(87, 1)122 def _read_with_filter(self, idx, len):123 return self.contents[idx: idx + len].decode('ascii')124 def _read(self, idx, len):125 return self._read_with_filter(idx, len).rstrip('<')126 def _read_int(self, idx, len):127 return int(self._read(idx, len))128 def _read_date(self, idx, len):129 date = self._read(idx, len)130 return datetime.strptime(date, '%y%m%d').date()131 def _read_name_identifiers(self, idx, size):132 name_field = self._read(idx, size)133 ids = name_field.split('<<')134 for i in range(0, len(ids)):135 ids[i] = ids[i].replace('<', ' ')...

Full Screen

Full Screen

directive.py

Source:directive.py Github

copy

Full Screen

1from grzemplate.debug import DEBUG2from .component import Component3from lxml.html import tostring4from . import parser5class Directive(Component):6 tag = "pd-directive"7 def __init__(self, parser, content, scope, **attrs):8 self._scope = scope9 self.content = content10 self._attrs = attrs11 self._parser = parser12 def _get_env(self):13 return self._scope14 def __repr__(self) -> str:15 return f"<Directive {self.tag} @{id(self)}>"16@parser.register()17class ScopeDirective(Directive):18 tag = "pd-scope"19 def __init__(self, parser, content, scope, **attrs):20 super().__init__(parser, content, scope, **attrs)21 self._parsed = []22 if DEBUG:23 self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )24 if self._attrs.get("pass_scope"):25 for k, v in self._attrs['pass_scope'].items():26 self._scope[k] = v27 for inner in self._attrs['inner_content']:28 self._parsed += self._parser.parseComponent(self, inner)29 if DEBUG:30 self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")31@parser.register()32class ForDirective(Directive):33 tag = "pd-for"34 template_str = '<pd-scope pass_scope="{iterator}" inner_content="{content}"></pd-scope>'35 def __init__(self, parser, content, scope, **attrs):36 super().__init__(parser, content, scope, **attrs)37 self._scope["content"] = self.content38 self._iterator = self._attrs['iter']39 self._iterator_key = self._attrs.get("key", "i")40 self._parsed = []41 if DEBUG:42 self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )43 for x in self._iterator:44 if DEBUG:45 self._parsed += [f"<!-- {self._iterator_key} : {str(x)} -->"]46 self._scope["iterator"] = {self._iterator_key: x}47 self._parsed += self._parser.parseComponent(self)48 if DEBUG:49 self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")50@parser.register()51class IfDirective(Directive):52 tag = "pd-if"53 template_str = '<pd-scope inner_content="{content}"></pd-scope>'54 def __init__(self, parser, content, scope, cond=False, **attrs):55 super().__init__(parser, content, scope, **attrs)56 self._scope["content"] = self.content57 self._parsed = []58 if DEBUG:59 self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )60 if cond:61 self._parsed += self._parser.parseComponent(self)62 if DEBUG:63 self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful