Best Python code snippet using fMBT_python
Firewall.py
Source:Firewall.py  
1#	firewalld - Linux firewall daemon with time-based capabilities2#	Copyright (C) 2020-2021 Johannes Bauer3#4#	This file is part of firewalld.5#6#	firewalld is free software; you can redistribute it and/or modify7#	it under the terms of the GNU General Public License as published by8#	the Free Software Foundation; this program is ONLY licensed under9#	version 3 of the License, later versions are explicitly excluded.10#11#	firewalld is distributed in the hope that it will be useful,12#	but WITHOUT ANY WARRANTY; without even the implied warranty of13#	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the14#	GNU General Public License for more details.15#16#	You should have received a copy of the GNU General Public License17#	along with firewalld; if not, write to the Free Software18#	Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA19#20#	Johannes Bauer <JohannesBauer@gmx.de>21import json22import sys23import datetime24import enum25from pyipt.Protocol import Protocol26from pyipt.Rules import Rules, Ruleset27from pyipt.Service import Service28from pyipt.Interface import InterfaceNetwork, InterfaceAddress, InterfaceName29from pyipt.Hostname import Hostname30from pyipt.MultiEnum import ICMPType31from pyipt.Condition import Condition32from pyipt.RegexMatches import PortforwardTarget33from pyipt.Chain import Chain34from pyipt.Exceptions import IncompatibleOptionsException, UnknownTypeError, FirewallRulesetException35from pyipt.Criterion import Criterion36from pyipt.Variables import Variables37class RuleType(enum.Enum):38	Accept = "accept"39	Reject = "reject"40	Drop = "drop"41	Log = "log"42	Masquerade = "masquerade"43	PortForward = "port-forward"44class HighlevelRule():45	_SIMPLE_PARSE_CLASSES = {46		"action":			RuleType,47		"proto":			Protocol,48		"criterion":		Criterion,49		"comment":			str,50		"dest-service":		Service,51		"src-service":		Service,52		"icmp-type":		ICMPType,53		"cond":				Condition,54		"forward-to":		PortforwardTarget,55		"msg":				str,56	}57	_COMPLEX_PARSE_CLASSES = {58		"dest-host":		Hostname,59		"src-host":			Hostname,60		"dest-net":			InterfaceNetwork,61		"src-net":			InterfaceNetwork,62		"dest-ifaddr":		InterfaceAddress,63		"src-ifaddr":		InterfaceAddress,64		"dest-if":			InterfaceName,65		"src-if":			InterfaceName,66	}67	def __init__(self, rule_src, config, variables):68		self._rule_src = rule_src69		self._config = config70		self._parsed = { }71		for (key, value) in self._rule_src.items():72			value = variables.recursive_replace(value)73			if key.startswith("_"):74				continue75			if key in self._SIMPLE_PARSE_CLASSES:76				self._parsed[key] = self._SIMPLE_PARSE_CLASSES[key](value)77				continue78			if key in self._COMPLEX_PARSE_CLASSES:79				self._parsed[key] = self._COMPLEX_PARSE_CLASSES[key](value, self._config)80				continue81			raise UnknownTypeError("Do not know how to parse: %s (in rule '%s')" % (key, str(self._rule_src)))82		self._sanity_check()83	@property84	def action(self):85		return self._parsed["action"]86	@property87	def comment(self):88		if "comment" in self._parsed:89			return self._parsed["comment"]90		else:91			return str(self._rule_src)92	def _sanity_check(self):93		if "action" not in self._parsed:94			raise IncompatibleOptionsException("'action' is not defined in rule: %s" % (str(self._rule_src)))95		if ("service" in self._parsed) and ("proto" in self._parsed):96			raise IncompatibleOptionsException("'service' and 'proto' are mutually exclusive in rule: %s" % (str(self._rule_src)))97		if ("icmp-type" in self._parsed) and ("proto" in self._parsed):98			raise IncompatibleOptionsException("'icmp-type' and 'proto' are mutually exclusive in rule: %s" % (str(self._rule_src)))99		if self.action == RuleType.PortForward:100			if "dest-ifaddr" not in self._parsed:101				raise IncompatibleOptionsException("port forwarding requires 'dest-ifaddr' in rule: %s" % (str(self._rule_src)))102			if "dest-service" not in self._parsed:103				raise IncompatibleOptionsException("port forwarding requires 'dest-service' in rule: %s" % (str(self._rule_src)))104			if "forward-to" not in self._parsed:105				raise IncompatibleOptionsException("port forwarding requires 'forward-to' in rule: %s" % (str(self._rule_src)))106			if (not self._parsed["forward-to"]["relative"]) and (self._parsed["forward-to"]["port"] is not None) and (any((portmap.span_count > 1) for (proto, portmap) in self._parsed["dest-service"])):107				raise IncompatibleOptionsException("port forwarding requires relative port mapping when more than one span is defined in rule: %s" % (str(self._rule_src)))108			hostname = Hostname(self._parsed["forward-to"]["hostname"], self._config)109			if len(hostname) != 1:110				raise IncompatibleOptionsException("port forwarding requires exactly one match for hostname, but found %d (%s) in rule: %s" % (len(hostname), ", ".join(hostname), str(self._rule_src)))111	def insert(self, chain_name, ruleset):112		if "cond" in self._parsed:113			if not self._parsed["cond"].satisfied(ruleset.metadata):114				return115		chain = Chain.parse(chain_name)116		rules = Rules(self.comment)117		rule = rules.new()118		rule.add_fixed(chain.iptables_append())119		if "proto" in self._parsed:120			group = rule.add_group("proto")121			for proto in self._parsed["proto"]:122				group.append([ "-p", proto ])123		if "icmp-type" in self._parsed:124			group = rule.add_group("icmp-type")125			for icmp_type in self._parsed["icmp-type"]:126				group.append([ "-p", "icmp", "--icmp-type", icmp_type ])127		for srcdest in [ "src", "dest" ]:128			if srcdest + "-if" in self._parsed:129				option = {130					"src":	"-i",131					"dest":	"-o",132				}[srcdest]133				group = rule.add_group(srcdest + "-if")134				for ifname in self._parsed[srcdest + "-if"]:135					group.append([ option, ifname ])136			if srcdest + "-net" in self._parsed:137				option = {138					"src":	"-s",139					"dest":	"-d",140				}[srcdest]141				group = rule.add_group(srcdest + "-net")142				for network in self._parsed[srcdest + "-net"]:143					group.append([ option, network ])144			if srcdest + "-service" in self._parsed:145				group = rule.add_group(srcdest + "-service")146				if (srcdest == "dest") and self._parsed["action"] == RuleType.PortForward:147					# For port forwarding/DNAT target, the syntax is different148					def dnat_target(incoming_port, forward_to):149						hostname = Hostname(forward_to["hostname"], self._config)150						result = [ "-j", "DNAT", "--to" ]151						if len(hostname) == 0:152							print("Warning: For DNAT/port forwarding, a target is required, but %s could not be resolved successfully." % (forward_to["hostname"]))153						elif len(hostname) != 1:154							print("Warning: For DNAT/port forwarding, a single target is required, but %d were found. Arbitrarily picking the first one." % (len(hostname)))155						if forward_to["port"] is None:156							result.append(hostname[0])157						else:158							if forward_to["relative"]:159								result.append("%s:%d" % (hostname[0], incoming_port + forward_to["port"]))160							else:161								result.append("%s:%d" % (hostname[0], forward_to["port"]))162						return result163					for (proto, port_map) in self._parsed[srcdest + "-service"]:164						for single_port in port_map.single:165							group.append([ "-p", proto, "--dport", str(single_port) ] + dnat_target(single_port, self._parsed["forward-to"]))166						for (begin_range, end_range) in port_map.ranges:167							group.append([ "-p", proto, "--dport", "%d:%d" % (begin_range, end_range) ] + dnat_target(begin_range, self._parsed["forward-to"]))168				else:169					# Not port forwarding (simple ACCEPT/REJCECT/etc.)170					option = {171						"src":	"s",172						"dest":	"d",173					}[srcdest]174					for (proto, port_map) in self._parsed[srcdest + "-service"]:175						if port_map.port_count == 1:176							group.append([ "-p", proto, "--%sport" % (option), str(port_map[0]) ])177						else:178							if len(port_map.single) > 1:179								group.append([ "-p", proto, "--match", "multiport", "--%sports" % (option), ",".join(str(port) for port in port_map.single) ])180							for (begin_range, end_range) in port_map.ranges:181								group.append([ "-p", proto, "--match", "multiport", "--%sports" % (option), "%d:%d" % (begin_range, end_range) ])182			if srcdest + "-ifaddr" in self._parsed:183				option = {184					"src":	"-s",185					"dest":	"-d",186				}[srcdest]187				group = rule.add_group(srcdest + "-ifaddr")188				for address in self._parsed[srcdest + "-ifaddr"]:189					group.append([ option, address ])190			if srcdest + "-host" in self._parsed:191				option = {192					"src":	"-s",193					"dest":	"-d",194				}[srcdest]195				group = rule.add_group(srcdest + "-host")196				for address in self._parsed[srcdest + "-host"]:197					group.append([ option, address ])198		if "criterion" in self._parsed:199			self._parsed["criterion"].apply(rule)200		if self._parsed["action"] in (RuleType.Accept, RuleType.Reject, RuleType.Drop, RuleType.Masquerade, RuleType.Log):201			rule.add_fixed(("-j", self._parsed["action"].value.upper()))202		elif self._parsed["action"] == RuleType.PortForward:203			# We're in nat.PREROUTING204			pass205		else:206			raise NotImplementedError(self._parsed["action"])207		if self._parsed["action"] == RuleType.Log:208			if "msg" in self._parsed:209				rule.add_fixed(("--log-prefix", self._parsed["msg"] + ": "))210		if "comment" in self._parsed:211			rule.add_fixed(("-m", "comment", "--comment", self._parsed["comment"]))212		ruleset.add_rules(rules)213class Firewall():214	def __init__(self, ruleset_filename, args):215		self._ruleset_filename = ruleset_filename216		self._args = args217	def _parse_chain(self, ruleset, chain_name, content):218		if "rules" in content:219			for rulesrc in content["rules"]:220				try:221					hl_rule = HighlevelRule(rulesrc, ruleset.metadata["source"], ruleset.metadata["variables"])222					hl_rule.insert(chain_name, ruleset)223				except FirewallRulesetException as e:224					if not self._args.ignore_errors:225						raise226					else:227						print("Continuing in spite of error: %s (%s)" % (str(e), str(rulesrc)), file = sys.stderr)228	def _initialize_chains(self, ruleset):229		rules = Rules("initializing all chains")230		for (chain_name, content) in ruleset.metadata["source"]["chains"].items():231			chain = Chain.parse(chain_name)232			if "default" in content:233				rule = rules.new()234				rule.add_fixed(chain.iptables_policy(), (content["default"].upper(), ))235			rule = rules.new()236			rule.add_fixed(chain.iptables_flush())237		ruleset.add_rules(rules)238	def _parse_ruleset(self, ruleset):239		self._initialize_chains(ruleset)240		for (chain_name, content) in ruleset.metadata["source"]["chains"].items():241			self._parse_chain(ruleset, chain_name, content)242	def generate(self):243		with open(self._ruleset_filename) as f:244			source = json.load(f)245		source["interfaces-rev"] = { value: key for (key, value) in source["interfaces"].items() }246		metadata = {247			"now":			datetime.datetime.now(),248			"source":		source,249			"variables":	Variables(source.get("variables", { })),250		}251		ruleset = Ruleset(metadata)252		ruleset.add_stat("ruleset_mtime", self._ruleset_filename)253		self._parse_ruleset(ruleset)...mrz.py
Source:mrz.py  
1import asn1crypto.core as asn12from datetime import datetime, date3class MachineReadableZone(asn1.OctetString):4    class_ = 15    tag    = 316    _parsed = None7    @classmethod8    def load(cls, encoded_data: bytes, strict=False):9        v = super().load(encoded_data, strict)10        clen = len(v.contents) 11        if clen == 90:12            v.format = 'td1'13        elif clen == 72:14            v.format = 'td2'15        elif clen == 88:16            v.format = 'td3'17        else:18            ValueError("Unknown MRZ format")19        return v20    21    def __getitem__(self, key):22        return self.native[key]23    @property24    def country(self) -> str:25        return self['country']26    @property27    def date_of_birth(self) -> date:28        return self['date_of_birth']29    @property30    def date_of_expiry(self) -> date:31        return self['date_of_expiry']32    @property33    def document_code(self) -> str:34        return self['document_code']35    @property36    def document_number(self) -> str:37        return self['document_number']38    @property39    def name(self) -> str:40        ni = self['name_identifiers']41        if len(ni) > 1:42            return ni[-1]43        return ""44    @property45    def nationality(self) -> str:46        return self['nationality']47    @property48    def native(self):49        if self._parsed is None:50            self.parse()51        return self._parsed52    @property53    def optional_data(self) -> str:54        if self.format == 'td1':55            return self['optional_data_2']56        return self['optional_data']57    @property58    def surname(self) -> str:59        ni = self['name_identifiers']60        if len(ni) > 0:61            return ni[0]62        return ""63    def parse(self):64        self._parsed = {}65        if self.format == 'td1':66            self._parse_td1()67        elif self.format == 'td2':68            self._parse_td2()69        elif self.format == 'td3':70            self._parse_td3()71        else:72            ValueError("Cannot parse unknown MRZ format")73    def _parse_td1(self):74        self._parsed['document_code']         = self._read(0, 2)75        self._parsed['country']               = self._read(2, 3)76        self._parsed['document_number']       = self._read(5, 9)77        self._parsed['document_number_cd']    = self._read_with_filter(14, 1) # document number check digit, could be char '<'78        self._parsed['optional_data_1']       = self._read(15, 15)79        self._parsed['date_of_birth']         = self._read_date(30, 6)80        self._parsed['date_of_birth_cd']      = self._read_int(36, 1) # document dob digit81        self._parsed['sex']                   = self._read(37, 1)82        self._parsed['date_of_expiry']        = self._read_date(38, 6)83        self._parsed['date_of_expiry_cd']     = self._read_int(44, 1) # document doe digit84        self._parsed['nationality']           = self._read(45, 3)85        self._parsed['optional_data_2']       = self._read(48, 11)86        self._parsed['composite_check_digit'] = self._read_int(59, 1)87        self._parsed['name_identifiers']      = self._read_name_identifiers(60, 30)88        # doc 9303 p10 page 3089        if self._parsed['document_number_cd'] == '<' and len(self._parsed['optional_data_1']) > 0:90            self._parsed['document_number'] += self._parsed['optional_data_1'][:-1]91            self._parsed['document_number_cd'] = self._parsed['optional_data_1'][-1]92            self._parsed['optional_data_1'] = ""93    def _parse_td2(self):94        self._parsed['document_code']         = self._read(0, 2)95        self._parsed['country']               = self._read(2, 3)96        self._parsed['name_identifiers']      = self._read_name_identifiers(5, 31)97        self._parsed['document_number']       = self._read(36, 9)98        self._parsed['document_number_cd']    = self._read_with_filter(45, 1) # document number check digit99        self._parsed['nationality']           = self._read(46, 3)100        self._parsed['date_of_birth']         = self._read_date(49, 6)101        self._parsed['date_of_birth_cd']      = self._read_int(55, 1) # document dob digit102        self._parsed['sex']                   = self._read(56, 1)103        self._parsed['date_of_expiry']        = self._read_date(57, 6)104        self._parsed['date_of_expiry_cd']     = self._read_int(63, 1) # document doe digit105        self._parsed['optional_data']         = self._read(64, 7)106        self._parsed['composite_check_digit'] = self._read_int(71, 1)107    def _parse_td3(self):108        self._parsed['document_code']         = self._read(0, 2)109        self._parsed['country']               = self._read(2, 3)110        self._parsed['name_identifiers']      = self._read_name_identifiers(5, 39)111        self._parsed['document_number']       = self._read(44, 9)112        self._parsed['document_number_cd']    = self._read_with_filter(53, 1) # document number check digit113        self._parsed['nationality']           = self._read(54, 3)114        self._parsed['date_of_birth']         = self._read_date(57, 6)115        self._parsed['date_of_birth_cd']      = self._read_int(63, 1) # document dob digit116        self._parsed['sex']                   = self._read(64, 1)117        self._parsed['date_of_expiry']        = self._read_date(65, 6)118        self._parsed['date_of_expiry_cd']     = self._read_int(71, 1) # document doe digit119        self._parsed['optional_data']         = self._read(72, 14)120        self._parsed['check_digit']           = self._read_int(86, 1)121        self._parsed['composite_check_digit'] = self._read_int(87, 1)122    def _read_with_filter(self, idx, len):123        return self.contents[idx: idx + len].decode('ascii')124    def _read(self, idx, len):125        return self._read_with_filter(idx, len).rstrip('<')126    def _read_int(self, idx, len):127        return int(self._read(idx, len))128    def _read_date(self, idx, len):129        date = self._read(idx, len)130        return datetime.strptime(date, '%y%m%d').date()131    def _read_name_identifiers(self, idx, size):132        name_field = self._read(idx, size)133        ids = name_field.split('<<')134        for i in range(0, len(ids)):135            ids[i] = ids[i].replace('<', ' ')...directive.py
Source:directive.py  
1from grzemplate.debug import DEBUG2from .component import Component3from lxml.html import tostring4from . import parser5class Directive(Component):6    tag = "pd-directive"7    def __init__(self, parser, content, scope, **attrs):8        self._scope = scope9        self.content = content10        self._attrs = attrs11        self._parser = parser12    def _get_env(self):13        return self._scope14    def __repr__(self) -> str:15        return f"<Directive {self.tag} @{id(self)}>"16@parser.register()17class ScopeDirective(Directive):18    tag = "pd-scope"19    def __init__(self, parser, content, scope, **attrs):20        super().__init__(parser, content, scope, **attrs)21        self._parsed = []22        if DEBUG:23            self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )24        if self._attrs.get("pass_scope"):25            for k, v in self._attrs['pass_scope'].items():26                self._scope[k] = v27        for inner in self._attrs['inner_content']:28            self._parsed += self._parser.parseComponent(self, inner)29        if DEBUG:30            self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")31@parser.register()32class ForDirective(Directive):33    tag = "pd-for"34    template_str = '<pd-scope pass_scope="{iterator}" inner_content="{content}"></pd-scope>'35    def __init__(self, parser, content, scope, **attrs):36        super().__init__(parser, content, scope, **attrs)37        self._scope["content"] = self.content38        self._iterator = self._attrs['iter']39        self._iterator_key = self._attrs.get("key", "i")40        self._parsed = []41        if DEBUG:42            self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )43        for x in self._iterator:44            if DEBUG:45                self._parsed += [f"<!-- {self._iterator_key} : {str(x)} -->"]46            self._scope["iterator"] = {self._iterator_key: x}47            self._parsed += self._parser.parseComponent(self)48        if DEBUG:49            self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")50@parser.register()51class IfDirective(Directive):52    tag = "pd-if"53    template_str = '<pd-scope inner_content="{content}"></pd-scope>'54    def __init__(self, parser, content, scope, cond=False, **attrs):55        super().__init__(parser, content, scope, **attrs)56        self._scope["content"] = self.content57        self._parsed = []58        if DEBUG:59            self._parsed.append( f"<!-- <{self._attrs.get('tag', self.tag)}> -->" )60        if cond:61            self._parsed += self._parser.parseComponent(self)62        if DEBUG:63            self._parsed.append(f"<!-- </{self._attrs.get('tag', self.tag)}> -->")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
