Best Python code snippet using lisa_python
fw_zone.py
Source:fw_zone.py  
...353            raise FirewallError(COMMAND_FAILED, msg)354        if not enable:355            self.remove_chain(zone, table, chain)356    def add_interface(self, zone, interface, sender=None):357        self._fw.check_panic()358        _zone = self._fw.check_zone(zone)359        _obj = self._zones[_zone]360        interface_id = self.__interface_id(interface)361        if interface_id in _obj.settings["interfaces"]:362            raise FirewallError(ZONE_ALREADY_SET)363        if self.get_zone_of_interface(interface) != None:364            raise FirewallError(ZONE_CONFLICT)365        self.__interface(True, _zone, interface)366        _obj.settings["interfaces"][interface_id] = \367            self.__gen_settings(0, sender)368        # add information whether we add to default or specific zone369        _obj.settings["interfaces"][interface_id]["__default__"] = (not zone or zone == "")370        return _zone371    def change_zone_of_interface(self, zone, interface, sender=None):372        self._fw.check_panic()373        _old_zone = self.get_zone_of_interface(interface)374        _new_zone = self._fw.check_zone(zone)375        if _new_zone == _old_zone:376            return _old_zone377        if _old_zone != None:378            self.remove_interface(_old_zone, interface)379        return self.add_interface(zone, interface, sender)380    def change_default_zone(self, old_zone, new_zone):381        self._fw.check_panic()382        self.__interface(True, new_zone, "+", True)383        if old_zone != None and old_zone != "":384            self.__interface(False, old_zone, "+", True)385    def remove_interface(self, zone, interface):386        self._fw.check_panic()387        zoi = self.get_zone_of_interface(interface)388        if zoi == None:389            raise FirewallError(UNKNOWN_INTERFACE, interface)390        _zone = zoi if zone == "" else self._fw.check_zone(zone)391        if zoi != _zone:392            raise FirewallError(ZONE_CONFLICT)393        _obj = self._zones[_zone]394        interface_id = self.__interface_id(interface)395        self.__interface(False, _zone, interface)396        if interface_id in _obj.settings["interfaces"]:397            del _obj.settings["interfaces"][interface_id]398        return _zone399    def query_interface(self, zone, interface):400        return self.__interface_id(interface) in self.get_settings(zone)["interfaces"]401    def list_interfaces(self, zone):402        return sorted(self.get_settings(zone)["interfaces"].keys())403    # SOURCES404    def check_source(self, source):405        if checkIPnMask(source):406            return "ipv4"407        elif checkIP6nMask(source):408            return "ipv6"409        else:410            raise FirewallError(INVALID_ADDR, source)411    def __source_id(self, source):412        ipv = self.check_source(source)413        return (ipv, source)414    def __source(self, enable, zone, ipv, source):415        rules = [ ]416        for table in ZONE_CHAINS:417            for chain in ZONE_CHAINS[table]:418                # create needed chains if not done already419                if enable:420                    self.add_chain(zone, table, chain)421                # handle trust and block zone directly, accept or reject422                # others will be placed into the proper zone chains423                opt = INTERFACE_ZONE_OPTS[chain]424                # transform INTERFACE_ZONE_OPTS for source address425                if opt == "-i":426                    opt = "-s"427                if opt == "-o":428                    opt = "-d"429                target = self._zones[zone].target.format(430                    chain=SHORTCUTS[chain], zone=zone)431                if target in [ "REJECT", "%%REJECT%%" ] and \432                        chain not in [ "INPUT", "FORWARD", "OUTPUT" ]:433                    # REJECT is only valid in the INPUT, FORWARD and434                    # OUTPUT chains, and user-defined chains which are 435                    # only called from those chains436                    continue437                if target == "DROP" and table == "nat":438                    # DROP is not supported in nat table439                    continue440                # append rule441                action = "-g" if "_ZONE_" in target else "-j"442                rule = [ "%s_ZONES_SOURCE" % chain, "-t", table,443                         opt, source, action, target ]444                rules.append((ipv, rule))445        # handle rules446        ret = self._fw.handle_rules(rules, enable)447        if ret:448            (cleanup_rules, msg) = ret449            self._fw.handle_rules(cleanup_rules, not enable)450            log.debug2(msg)451            raise FirewallError(COMMAND_FAILED, msg)452        if not enable:453            self.remove_chain(zone, table, chain)454    def add_source(self, zone, source, sender=None):455        self._fw.check_panic()456        _zone = self._fw.check_zone(zone)457        _obj = self._zones[_zone]458        source_id = self.__source_id(source)459        if source_id in _obj.settings["sources"]:460            raise FirewallError(ZONE_ALREADY_SET)461        if self.get_zone_of_source(source) != None:462            raise FirewallError(ZONE_CONFLICT)463        self.__source(True, _zone, source_id[0], source_id[1])464        _obj.settings["sources"][source_id] = \465            self.__gen_settings(0, sender)466        # add information whether we add to default or specific zone467        _obj.settings["sources"][source_id]["__default__"] = (not zone or zone == "")468        return _zone469    def change_zone_of_source(self, zone, source, sender=None):470        self._fw.check_panic()471        _old_zone = self.get_zone_of_source(source)472        _new_zone = self._fw.check_zone(zone)473        if _new_zone == _old_zone:474            return _old_zone475        if _old_zone != None:476            self.remove_source(_old_zone, source)477        return self.add_source(zone, source, sender)478    def remove_source(self, zone, source):479        self._fw.check_panic()480        zos = self.get_zone_of_source(source)481        if zos == None:482            raise FirewallError(UNKNOWN_SOURCE, source)483        _zone = zos if zone == "" else self._fw.check_zone(zone)484        if zos != _zone:485            raise FirewallError(ZONE_CONFLICT)486        _obj = self._zones[_zone]487        source_id = self.__source_id(source)488        self.__source(False, _zone, source_id[0], source_id[1])489        if source_id in _obj.settings["sources"]:490            del _obj.settings["sources"][source_id]491        return _zone492    def query_source(self, zone, source):493        return self.__source_id(source) in self.get_settings(zone)["sources"]494    def list_sources(self, zone):495        return [ k[1] for k in self.get_settings(zone)["sources"].keys() ]496    # RICH LANGUAGE497    def check_rule(self, rule):498        rule.check()499    def __rule_id(self, rule):500        self.check_rule(rule)501        return (str(rule))502    def __rule_source(self, source, command):503        if source:504            if source.invert:505                command.append("!")506            command += [ "-s", source.addr ]507    def __rule_destination(self, destination, command):508        if destination:509            if destination.invert:510                command.append("!")511            command += [ "-d", destination.addr ]512    def __rule_limit(self, limit):513        if limit:514            return [ "-m", "limit", "--limit", limit.value ]515        return [ ]516    def __rule_log(self, ipv, table, target, rule, command, rules):517        if not rule.log:518            return519        chain = "%s_log" % target520        _command = command[:]521        _command += [ "-j", "LOG" ]522        if rule.log.prefix:523            _command += [ "--log-prefix", '%s' % rule.log.prefix ]524        if rule.log.level:525            _command += [ "--log-level", '%s' % rule.log.level ]526        _command += self.__rule_limit(rule.log.limit)527        rules.append((ipv, table, chain, _command))528    def __rule_audit(self, ipv, table, target, rule, command, rules):529        if not rule.audit:530            return531        chain = "%s_log" % target532        _command = command[:]533        if type(rule.action) == Rich_Accept:534            _type = "accept"535        elif type(rule.action) == Rich_Reject:536            _type = "reject"537        elif type(rule.action) ==  Rich_Drop:538            _type = "drop"539        else:540            _type = "unknown"541        _command += [ "-j", "AUDIT", "--type", _type ]542        _command += self.__rule_limit(rule.audit.limit)543        rules.append((ipv, table, chain, _command))544    def __rule_action(self, ipv, table, target, rule, command, rules):545        if not rule.action:546            return547        _command = command[:]548        if type(rule.action) == Rich_Accept:549            chain = "%s_allow" % target550            _command += [ "-j", "ACCEPT" ]551        elif type(rule.action) == Rich_Reject:552            chain = "%s_deny" % target553            _command += [ "-j", "REJECT" ]554            if rule.action.type:555                _command += [ "--reject-with", rule.action.type ]556        elif type(rule.action) ==  Rich_Drop:557            chain = "%s_deny" % target558            _command += [ "-j", "DROP" ]559        else:560            raise FirewallError(INVALID_RULE,561                                "Unknown action %s" % type(rule.action))562        _command += self.__rule_limit(rule.action.limit)563        rules.append((ipv, table, chain, _command))564    def __rule(self, enable, zone, rule, mark_id):565                       566        chains = [ ]567        modules = [ ]568        rules = [ ]569        if rule.family != None:570            ipvs = [ rule.family ]571        else:572            ipvs = [ "ipv4", "ipv6" ]573        for ipv in ipvs:574            # SERVICE575            if type(rule.element) == Rich_Service:576                svc = self._fw.service.get_service(rule.element.name)577                if len(svc.destination) > 0:578                    if ipv not in svc.destination:579                        # destination is set, only use if it contains ipv580                        raise FirewallError(INVALID_RULE,581                                            "Service %s is not usable with %s" % 582                                            (rule.element.name, ipv))583                    if svc.destination[ipv] != "" and rule.destination:584                        # we can not use two destinations at the same time585                        raise FirewallError(INVALID_RULE,586                                            "Destination conflict with service.")587                table = "filter"588                chains.append([table, "INPUT" ])589                if type(rule.action) == Rich_Accept:590                    # only load modules for accept action591                    modules += svc.modules592                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],593                                                    zone=zone)594                # create rules595                for (port,proto) in svc.ports:596                    table = "filter"597                    command = [ ]598                    self.__rule_source(rule.source, command)599                    self.__rule_destination(rule.destination, command)600                    if proto in [ "tcp", "udp" ]:601                        command += [ "-m", proto, "-p", proto ]602                    else:603                        if ipv == "ipv4":604                            command += [ "-p", proto ]605                        else:606                            command += [ "-m", "ipv6header", "--header", proto ]607                    if port:608                        command += [ "--dport", "%s" % portStr(port) ]609                    if ipv in svc.destination and svc.destination[ipv] != "":610                        command += [ "-d",  svc.destination[ipv] ]611                    command += [ "-m", "conntrack", "--ctstate", "NEW" ]612                    self.__rule_log(ipv, table, target, rule, command, rules)613                    self.__rule_audit(ipv, table, target, rule, command, rules)614                    self.__rule_action(ipv, table, target, rule, command, rules)615            # PORT616            elif type(rule.element) == Rich_Port:617                port = rule.element.port618                protocol = rule.element.protocol619                self.check_port(port, protocol)620                table = "filter"621                chains.append([ table, "INPUT" ])622                target = self._zones[zone].target.format(chain=SHORTCUTS["INPUT"],623                                                         zone=zone)624                command = [ ]625                self.__rule_source(rule.source, command)626                self.__rule_destination(rule.destination, command)627                command += [ "-m", protocol, "-p", protocol,628                           "--dport", portStr(port),629                           "-m", "conntrack", "--ctstate", "NEW" ]630                self.__rule_log(ipv, table, target, rule, command, rules)631                self.__rule_audit(ipv, table, target, rule, command, rules)632                self.__rule_action(ipv, table, target, rule, command, rules)633            # PROTOCOL634            elif type(rule.element) == Rich_Protocol:635                protocol = rule.element.value636                self.check_protocol(protocol)637                table = "filter"638                chains.append([ table, "INPUT" ])639                target = self._zones[zone].target.format(chain=SHORTCUTS["INPUT"],640                                                         zone=zone)641                command = [ ]642                self.__rule_source(rule.source, command)643                self.__rule_destination(rule.destination, command)644                command += [ "-p", protocol, 645                             "-m", "conntrack", "--ctstate", "NEW" ]646                self.__rule_log(ipv, table, target, rule, command, rules)647                self.__rule_audit(ipv, table, target, rule, command, rules)648                self.__rule_action(ipv, table, target, rule, command, rules)649            # MASQUERADE650            elif type(rule.element) == Rich_Masquerade:651                if enable:652                    enable_ip_forwarding(ipv)653                chains.append([ "nat", "POSTROUTING" ])654                chains.append([ "filter", "FORWARD_OUT" ])655                # POSTROUTING656                target = DEFAULT_ZONE_TARGET.format(657                    chain=SHORTCUTS["POSTROUTING"], zone=zone)658                command = [ ]659                self.__rule_source(rule.source, command)660                self.__rule_destination(rule.destination, command)661                command += [ "-j", "MASQUERADE" ]662                rules.append((ipv, "nat", "%s_allow" % target, command))663                # FORWARD_OUT664                target = DEFAULT_ZONE_TARGET.format(665                    chain=SHORTCUTS["FORWARD_OUT"], zone=zone)666                command = [ ]667                # reverse source/destination !668                self.__rule_source(rule.destination, command)669                self.__rule_destination(rule.source, command)670                command += [ "-j", "ACCEPT" ]671                rules.append((ipv, "filter", "%s_allow" % target, command))672            # FORWARD PORT673            elif type(rule.element) == Rich_ForwardPort:674                if enable:675                    enable_ip_forwarding(ipv)676                port = rule.element.port677                protocol = rule.element.protocol678                toport = rule.element.to_port679                toaddr = rule.element.to_address680                self.check_forward_port(ipv, port, protocol, toport, toaddr)681                if enable:682                    mark_id = self._fw.new_mark()683                filter_chain = "INPUT" if not toaddr else "FORWARD_IN"684                chains.append([ "mangle", "PREROUTING" ])685                chains.append([ "nat", "PREROUTING" ])686                chains.append([ "filter", filter_chain ])687                mark_str = "0x%x" % mark_id688                port_str = portStr(port)689                dest = [ ]690                to = ""691                if toaddr:692                    to += toaddr693                if toport and toport != "":694                    toport_str = portStr(toport)695                    dest = [ "--dport", toport_str ]696                    to += ":%s" % portStr(toport, "-")697                mark = [ "-m", "mark", "--mark", mark_str ]698                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["PREROUTING"],699                                                    zone=zone)700                command = [ ]701                self.__rule_source(rule.source, command)702                command += [ "-p", protocol, "--dport", port_str,703                             "-j", "MARK", "--set-mark", mark_str ]704                rules.append((ipv, "mangle", "%s_allow" % target, command))705                # local and remote706                command = [ "-p", protocol ] + mark + \707                    [ "-j", "DNAT", "--to-destination", to ]708                rules.append((ipv, "nat", "%s_allow" % target, command))709                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS[filter_chain],710                                                    zone=zone)711                command = [ "-m", "conntrack", "--ctstate", "NEW" ] + \712                    mark + [ "-j", "ACCEPT" ]713                rules.append((ipv, "filter", "%s_allow" % target, command))714                if not enable:715                    self._fw.del_mark(mark_id)716            # ICMP BLOCK717            elif type(rule.element) == Rich_IcmpBlock:718                ict = self._fw.icmptype.get_icmptype(rule.element.name)719                if rule.action and type(rule.action) == Rich_Accept:720                    # icmp block might have reject or drop action, but not accept721                    raise FirewallError(INVALID_RULE,722                                        "IcmpBlock not usable with accept action")723                if ict.destination and ipv not in ict.destination:724                    raise FirewallError(INVALID_RULE,725                                        "IcmpBlock %s not usable with %s" % 726                                        (rule.element.name, ipv))727                table = "filter"728                chains.append([ table, "INPUT" ])729                chains.append([ table, "FORWARD_IN" ])730                if ipv == "ipv4":731                    proto = [ "-p", "icmp" ]732                    match = [ "-m", "icmp", "--icmp-type", rule.element.name ]733                else:734                    proto = [ "-p", "ipv6-icmp" ]735                    match = [ "-m", "icmp6", "--icmpv6-type", rule.element.name ]736                # INPUT737                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],738                                                    zone=zone)739                command = [ ]740                self.__rule_source(rule.source, command)741                self.__rule_destination(rule.destination, command)742                command += proto + match743                self.__rule_log(ipv, table, target, rule, command, rules)744                self.__rule_audit(ipv, table, target, rule, command, rules)745                if rule.action:746                    self.__rule_action(ipv, table, target, rule, command, rules)747                else:748                    command += [ "-j", "%%REJECT%%" ]749                    rules.append((ipv, table, "%s_deny" % target, command))750                # FORWARD_IN751                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["FORWARD_IN"],752                                                    zone=zone)753                command = [ ]754                self.__rule_source(rule.source, command)755                self.__rule_destination(rule.destination, command)756                command += proto + match757                self.__rule_log(ipv, table, target, rule, command, rules)758                self.__rule_audit(ipv, table, target, rule, command, rules)759                if rule.action:760                    self.__rule_action(ipv, table, target, rule, command, rules)761                else:762                    command += [ "-j", "%%REJECT%%" ]763                    rules.append((ipv, table, "%s_deny" % target, command))764            elif rule.element == None:765                # source action766                table = "filter"767                chains.append([ table, "INPUT" ])768                target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],769                                                    zone=zone)770                command = [ ]771                self.__rule_source(rule.source, command)772                self.__rule_log(ipv, table, target, rule, command, rules)773                self.__rule_audit(ipv, table, target, rule, command, rules)774                self.__rule_action(ipv, table, target, rule, command, rules)775            # EVERYTHING ELSE776            else:777                raise FirewallError(INVALID_RULE, "Unknown element %s" % 778                                    type(rule.element))779        msg = self.handle_cmr(zone, chains, modules, rules, enable)780        if msg != None:781            raise FirewallError(COMMAND_FAILED, msg)782        return mark_id783    def add_rule(self, zone, rule, timeout=0, sender=None):784        _zone = self._fw.check_zone(zone)785        self._fw.check_panic()786        _obj = self._zones[_zone]787        rule_id = self.__rule_id(rule)788        if rule_id in _obj.settings["rules"]:789            raise FirewallError(ALREADY_ENABLED)790        mark = self.__rule(True, _zone, rule, None)791        _obj.settings["rules"][rule_id] = \792            self.__gen_settings(timeout, sender, mark=mark)793        return _zone794    def remove_rule(self, zone, rule):795        _zone = self._fw.check_zone(zone)796        self._fw.check_panic()797        _obj = self._zones[_zone]798        rule_id = self.__rule_id(rule)799        if not rule_id in _obj.settings["rules"]:800            raise FirewallError(NOT_ENABLED)801        if "mark" in _obj.settings["rules"][rule_id]:802            mark = _obj.settings["rules"][rule_id]["mark"]803        else:804            mark = None805        self.__rule(False, _zone, rule, mark)806        if rule_id in _obj.settings["rules"]:807            del _obj.settings["rules"][rule_id]808        return _zone809    def query_rule(self, zone, rule):810        return self.__rule_id(rule) in self.get_settings(zone)["rules"]811    def list_rules(self, zone):812        return list(self.get_settings(zone)["rules"].keys())813    # SERVICES814    def check_service(self, service):815        self._fw.check_service(service)816    def __service_id(self, service):817        self.check_service(service)818        return service819    def __service(self, enable, zone, service):820        svc = self._fw.service.get_service(service)821        if enable:822            self.add_chain(zone, "filter", "INPUT")823        rules = [ ]824        for ipv in [ "ipv4", "ipv6" ]:825            if len(svc.destination) > 0 and ipv not in svc.destination:826                # destination is set, only use if it contains ipv827                continue828            # handle rules829            for (port,proto) in svc.ports:830                target = DEFAULT_ZONE_TARGET.format(831                    chain=SHORTCUTS["INPUT"], zone=zone)832                rule = [ "%s_allow" % (target), "-t", "filter" ]833                if proto in [ "tcp", "udp" ]:834                    rule += [ "-m", proto, "-p", proto ]835                else:836                    if ipv == "ipv4":837                        rule += [ "-p", proto ]838                    else:839                        rule += [ "-m", "ipv6header", "--header", proto ]840                if port:841                    rule += [ "--dport", "%s" % portStr(port) ]842                if ipv in svc.destination and svc.destination[ipv] != "":843                    rule += [ "-d",  svc.destination[ipv] ]844                rule += [ "-m", "conntrack", "--ctstate", "NEW" ]845                rule += [ "-j", "ACCEPT" ]846                rules.append((ipv, rule))847        cleanup_rules = None848        cleanup_modules = None849        msg = None850        # handle rules851        ret = self._fw.handle_rules(rules, enable)852        if ret == None: # no error, handle modules853            mod_ret = self._fw.handle_modules(svc.modules, enable)854            if mod_ret != None: # error loading modules855                (cleanup_modules, msg) = mod_ret856                cleanup_rules = rules857        else: # ret != None858            (cleanup_rules, msg) = ret859        if cleanup_rules!= None or cleanup_modules != None:860            if cleanup_rules:861                self._fw.handle_rules(cleanup_rules, not enable)862            if cleanup_modules:863                self._fw.handle_modules(cleanup_modules, not enable)864            raise FirewallError(COMMAND_FAILED, msg)865        if not enable:866            self.remove_chain(zone, "filter", "INPUT")867    def add_service(self, zone, service, timeout=0, sender=None):868        _zone = self._fw.check_zone(zone)869        self._fw.check_panic()870        _obj = self._zones[_zone]871        service_id = self.__service_id(service)872        if service_id in _obj.settings["services"]:873            raise FirewallError(ALREADY_ENABLED)874        self.__service(True, _zone, service)875        _obj.settings["services"][service_id] = \876            self.__gen_settings(timeout, sender)877        return _zone878    def remove_service(self, zone, service):879        _zone = self._fw.check_zone(zone)880        self._fw.check_panic()881        _obj = self._zones[_zone]882        service_id = self.__service_id(service)883        if not service_id in _obj.settings["services"]:884            raise FirewallError(NOT_ENABLED)885        self.__service(False, _zone, service)886        if service_id in _obj.settings["services"]:887            del _obj.settings["services"][service_id]888        return _zone889    def query_service(self, zone, service):890        return (self.__service_id(service) in self.get_settings(zone)["services"])891    def list_services(self, zone):892        return sorted(self.get_settings(zone)["services"].keys())893    # PORTS894    def check_port(self, port, protocol):895        self._fw.check_port(port)896        self._fw.check_protocol(protocol)897    def __port_id(self, port, protocol):898        self.check_port(port, protocol)899        return (portStr(port, "-"), protocol)900    def __port(self, enable, zone, port, protocol):901        if enable:902            self.add_chain(zone, "filter", "INPUT")903        rules = [ ]904        for ipv in [ "ipv4", "ipv6" ]:905            target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],906                                                     zone=zone)907            rules.append((ipv, [ "%s_allow" % (target),908                                 "-t", "filter",909                                 "-m", protocol, "-p", protocol,910                                 "--dport", portStr(port),911                                 "-m", "conntrack", "--ctstate", "NEW",912                                 "-j", "ACCEPT" ]))913        # handle rules914        ret = self._fw.handle_rules(rules, enable)915        if ret:916            (cleanup_rules, msg) = ret917            self._fw.handle_rules(cleanup_rules, not enable)918            raise FirewallError(COMMAND_FAILED, msg)919        920        if not enable:921            self.remove_chain(zone, "filter", "INPUT")922    def add_port(self, zone, port, protocol, timeout=0, sender=None):923        _zone = self._fw.check_zone(zone)924        self._fw.check_panic()925        _obj = self._zones[_zone]926        port_id = self.__port_id(port, protocol)927        if port_id in _obj.settings["ports"]:928            raise FirewallError(ALREADY_ENABLED)929        self.__port(True, _zone, port, protocol)930        _obj.settings["ports"][port_id] = \931            self.__gen_settings(timeout, sender)932        return _zone933    def remove_port(self, zone, port, protocol):934        _zone = self._fw.check_zone(zone)935        self._fw.check_panic()936        _obj = self._zones[_zone]937        port_id = self.__port_id(port, protocol)938        if not port_id in _obj.settings["ports"]:939            raise FirewallError(NOT_ENABLED)940        self.__port(False, _zone, port, protocol)941        if port_id in _obj.settings["ports"]:942            del _obj.settings["ports"][port_id]943        return _zone944    def query_port(self, zone, port, protocol):945        return self.__port_id(port, protocol) in self.get_settings(zone)["ports"]946    def list_ports(self, zone):947        return list(self.get_settings(zone)["ports"].keys())948    # PROTOCOLS949    def check_protocol(self, protocol):950        if not checkProtocol(protocol):951            raise FirewallError(INVALID_PROTOCOL, protocol)952    # MASQUERADE953    def __masquerade_id(self):954        return True955    def __masquerade(self, enable, zone):956        if enable:957            self.add_chain(zone, "nat", "POSTROUTING")958            self.add_chain(zone, "filter", "FORWARD_OUT")959            enable_ip_forwarding("ipv4")960        rules = [ ]961        for ipv in [ "ipv4" ]: # IPv4 only!962            target = DEFAULT_ZONE_TARGET.format(963                chain=SHORTCUTS["POSTROUTING"], zone=zone)964            rules.append((ipv, [ "%s_allow" % (target), "!", "-i", "lo",965                                 "-t", "nat", "-j", "MASQUERADE" ]))966            # FORWARD_OUT967            target = DEFAULT_ZONE_TARGET.format(968                chain=SHORTCUTS["FORWARD_OUT"], zone=zone)969            rules.append((ipv, [ "%s_allow" % (target),970                                 "-t", "filter", "-j", "ACCEPT" ]))971        # handle rules972        ret = self._fw.handle_rules(rules, enable)973        if ret:974            (cleanup_rules, msg) = ret975            self._fw.handle_rules(cleanup_rules, not enable)976            raise FirewallError(COMMAND_FAILED, msg)977        if not enable:978            self.remove_chain(zone, "nat", "POSTROUTING")979            self.remove_chain(zone, "filter", "FORWARD_OUT")980    def add_masquerade(self, zone, timeout=0, sender=None):981        _zone = self._fw.check_zone(zone)982        self._fw.check_panic()983        _obj = self._zones[_zone]984        masquerade_id = self.__masquerade_id()985        if masquerade_id in _obj.settings["masquerade"]:986            raise FirewallError(ALREADY_ENABLED)987        self.__masquerade(True, _zone)988        _obj.settings["masquerade"][masquerade_id] = \989            self.__gen_settings(timeout, sender)990        return _zone991    def remove_masquerade(self, zone):992        _zone = self._fw.check_zone(zone)993        self._fw.check_panic()994        _obj = self._zones[_zone]995        masquerade_id = self.__masquerade_id()996        if masquerade_id not in _obj.settings["masquerade"]:997            raise FirewallError(NOT_ENABLED)998        self.__masquerade(False, _zone)999        if masquerade_id in _obj.settings["masquerade"]:1000            del _obj.settings["masquerade"][masquerade_id]1001        return _zone1002    def query_masquerade(self, zone):1003        return self.__masquerade_id() in self.get_settings(zone)["masquerade"]1004    # PORT FORWARDING1005    def check_forward_port(self, ipv, port, protocol, toport=None, toaddr=None):1006        self._fw.check_port(port)1007        self._fw.check_protocol(protocol)1008        if toport:1009            self._fw.check_port(toport)1010        if toaddr:1011            check_single_address(ipv, toaddr)1012        if not toport and not toaddr:1013            raise FirewallError(INVALID_FORWARD)1014    def __forward_port_id(self, port, protocol, toport=None, toaddr=None):1015        self.check_forward_port("ipv4", port, protocol, toport, toaddr)1016        return (portStr(port, "-"), protocol,1017                portStr(toport, "-"), str(toaddr))1018    def __forward_port(self, enable, zone, port, protocol, toport=None,1019                       toaddr=None, mark_id=None):1020        mark_str = "0x%x" % mark_id1021        port_str = portStr(port)1022        dest = [ ]1023        to = ""1024        if toaddr:1025            to += toaddr1026        filter_chain = "INPUT" if not toaddr else "FORWARD_IN"1027        if toport and toport != "":1028            toport_str = portStr(toport)1029            dest = [ "--dport", toport_str ]1030            to += ":%s" % portStr(toport, "-")1031        mark = [ "-m", "mark", "--mark", mark_str ]1032        if enable:1033            self.add_chain(zone, "mangle", "PREROUTING")1034            self.add_chain(zone, "nat", "PREROUTING")1035            self.add_chain(zone, "filter", filter_chain)1036            enable_ip_forwarding("ipv4")1037        rules = [ ]1038        for ipv in [ "ipv4" ]: # IPv4 only!1039            target = DEFAULT_ZONE_TARGET.format(1040                chain=SHORTCUTS["PREROUTING"], zone=zone)1041            rules.append((ipv, [ "%s_allow" % (target),1042                                 "-t", "mangle",1043                                 "-p", protocol, "--dport", port_str,1044                                 "-j", "MARK", "--set-mark", mark_str ]))1045            # local and remote1046            rules.append((ipv, [ "%s_allow" % (target),1047                                 "-t", "nat",1048                                 "-p", protocol ] + mark + \1049                              [ "-j", "DNAT", "--to-destination", to ]))1050            target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS[filter_chain],1051                                                zone=zone)1052            rules.append((ipv, [ "%s_allow" % (target),1053                                 "-t", "filter",1054                                 "-m", "conntrack", "--ctstate", "NEW" ] + \1055                               mark + [ "-j", "ACCEPT" ]))1056        # handle rules1057        ret = self._fw.handle_rules(rules, enable)1058        if ret:1059            (cleanup_rules, msg) = ret1060            self._fw.handle_rules(cleanup_rules, not enable)1061            if enable:1062                self._fw.del_mark(mark_id)1063            raise FirewallError(COMMAND_FAILED, msg)1064        if not enable:1065            self.remove_chain(zone, "mangle", "PREROUTING")1066            self.remove_chain(zone, "nat", "PREROUTING")1067            self.remove_chain(zone, "filter", filter_chain)1068    def add_forward_port(self, zone, port, protocol, toport=None,1069                         toaddr=None, timeout=0, sender=None):1070        _zone = self._fw.check_zone(zone)1071        self._fw.check_panic()1072        _obj = self._zones[_zone]1073        forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1074        if forward_id in _obj.settings["forward_ports"]:1075            raise FirewallError(ALREADY_ENABLED)1076        mark = self._fw.new_mark()1077        self.__forward_port(True, _zone, port, protocol, toport, toaddr,1078                            mark_id=mark)1079        1080        _obj.settings["forward_ports"][forward_id] = \1081            self.__gen_settings(timeout, sender, mark=mark)1082        return _zone1083    def remove_forward_port(self, zone, port, protocol, toport=None,1084                            toaddr=None):1085        _zone = self._fw.check_zone(zone)1086        self._fw.check_panic()1087        _obj = self._zones[_zone]1088        forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1089        if not forward_id in _obj.settings["forward_ports"]:1090            raise FirewallError(NOT_ENABLED)1091        mark = _obj.settings["forward_ports"][forward_id]["mark"]1092        self.__forward_port(False, _zone, port, protocol, toport, toaddr,1093                            mark_id=mark)1094        if forward_id in _obj.settings["forward_ports"]:1095            del _obj.settings["forward_ports"][forward_id]1096        self._fw.del_mark(mark)1097        return _zone1098    def query_forward_port(self, zone, port, protocol, toport=None,1099                           toaddr=None):1100        forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1101        return forward_id in self.get_settings(zone)["forward_ports"]1102    def list_forward_ports(self, zone):1103        return list(self.get_settings(zone)["forward_ports"].keys())1104    # ICMP BLOCK1105    def check_icmp_block(self, icmp):1106        self._fw.check_icmptype(icmp)1107    def __icmp_block_id(self, icmp):1108        self.check_icmp_block(icmp)1109        return icmp1110    def __icmp_block(self, enable, zone, icmp):1111        ict = self._fw.icmptype.get_icmptype(icmp)1112        if enable:1113            self.add_chain(zone, "filter", "INPUT")1114            self.add_chain(zone, "filter", "FORWARD_IN")1115        rules = [ ]1116        for ipv in [ "ipv4", "ipv6" ]:1117            if ict.destination and ipv not in ict.destination:1118                continue1119            if ipv == "ipv4":1120                proto = [ "-p", "icmp" ]1121                match = [ "-m", "icmp", "--icmp-type", icmp ]1122            else:1123                proto = [ "-p", "ipv6-icmp" ]1124                match = [ "-m", "icmp6", "--icmpv6-type", icmp ]1125            target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],1126                                                     zone=zone)1127            rules.append((ipv, [ "%s_deny" % (target),1128                                 "-t", "filter", ] + proto + \1129                              match + [ "-j", "%%REJECT%%" ]))1130            target = DEFAULT_ZONE_TARGET.format(1131                chain=SHORTCUTS["FORWARD_IN"], zone=zone)1132            rules.append((ipv, [ "%s_deny" % (target),1133                                 "-t", "filter", ] + proto + \1134                              match + [ "-j", "%%REJECT%%" ]))1135        # handle rules1136        ret = self._fw.handle_rules(rules, enable)1137        if ret:1138            (cleanup_rules, msg) = ret1139            self._fw.handle_rules(cleanup_rules, not enable)1140            raise FirewallError(COMMAND_FAILED, msg)1141        if not enable:1142            self.remove_chain(zone, "filter", "INPUT")1143            self.remove_chain(zone, "filter", "FORWARD_IN")1144    def add_icmp_block(self, zone, icmp, timeout=0, sender=None):1145        _zone = self._fw.check_zone(zone)1146        self._fw.check_panic()1147        _obj = self._zones[_zone]1148        icmp_id = self.__icmp_block_id(icmp)1149        if icmp_id in _obj.settings["icmp_blocks"]:1150            raise FirewallError(ALREADY_ENABLED)1151        self.__icmp_block(True, _zone, icmp)1152        _obj.settings["icmp_blocks"][icmp_id] = \1153            self.__gen_settings(timeout, sender)1154        return _zone1155    def remove_icmp_block(self, zone, icmp):1156        _zone = self._fw.check_zone(zone)1157        self._fw.check_panic()1158        _obj = self._zones[_zone]1159        icmp_id = self.__icmp_block_id(icmp)1160        if not icmp_id in _obj.settings["icmp_blocks"]:1161            raise FirewallError(NOT_ENABLED)1162        self.__icmp_block(False, _zone, icmp)1163        if icmp_id in _obj.settings["icmp_blocks"]:1164            del _obj.settings["icmp_blocks"][icmp_id]1165        return _zone1166    def query_icmp_block(self, zone, icmp):1167        return self.__icmp_block_id(icmp) in self.get_settings(zone)["icmp_blocks"]1168    def list_icmp_blocks(self, zone):...urls.py
Source:urls.py  
1"""resolute URL Configuration2The `urlpatterns` list routes URLs to views. For more information please see:3    https://docs.djangoproject.com/en/2.0/topics/http/urls/4Examples:5Function views6    1. Add an import:  from my_app import views7    2. Add a URL to urlpatterns:  path('', views.home, name='home')8Class-based views9    1. Add an import:  from other_app.views import Home10    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')11Including another URLconf12    1. Import the include() function: from django.urls import include, path13    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))14"""15from django.contrib import admin16from django.urls import path17from . import views18urlpatterns = [19    path('', views.index, name='index'),20    path('power/', views.power, name='power'),21    path('last_read/', views.last_read, name='last_read'),22    path('get_last_read/', views.get_last_read, name='get_last_read'),23    path('score_card/', views.score_card, name='score_card'),24    path('voltage/', views.voltage, name='voltage'),25    path('current/', views.current, name='current'),26    path('readings/', views.readings, name='readings'),27    path('energy_readings/', views.energy_readings, name='energy_readings'),28    path('get_energy_readings/', views.get_energy_readings, name='get_energy_readings'),29    path('get_line_readings/', views.get_line_readings, name='get_line_readings'),30    path('max_demand/', views.max_demand, name='max_demand'),31    path('fetch_vals_period/', views.fetch_vals_period, name='fetch_vals_period'),32    path('fetch_device_vals/', views.fetch_vals_period_per_device, name='fetch_vals_period_per_device'),33    path('load_datalogs/', views.load_datalogs, name='load_datalogs'),34    path('load_readings/', views.load_readings, name='load_readings'),35    path('get_yesterday_today_usage/', views.get_yesterday_today_usage, name='get_yesterday_today_usage'),36    path('get_line_readings_log/', views.get_line_readings_log, name='get_line_readings_log'),37    path('get_capacity_factors/', views.get_capacity_factors, name='get_capacity_factors'),38    path('messaging/', views.messaging, name='messaging'),39    path('fetch_messages/', views.fetch_messages, name='fetch_messages'),40    path('check_new_message/', views.check_new_message, name='check_new_message'),41    path('send_message/', views.send_message, name='send_message'),42    path('simple_upload/', views.simple_upload, name='simple_upload'),43    path('upload_cdd/', views.upload_cdd, name='upload_cdd'),44    path('all_customers/', views.all_customers, name='all_customers'),45    path('view_customer/<int:id>', views.view_customer, name='view_customer'),46    path('view_profile/', views.view_profile, name='view_profile'),47    path('upload_image/', views.upload_image, name='upload_image'),48    path('update_details/', views.update_details, name='update_details'),49    path('add_user/', views.add_user, name='add_user'),50    path('edit_user/', views.edit_user, name='edit_user'),51    path('update_branch/', views.update_branch, name='update_branch'),52    path('update_device/', views.update_device, name='update_device'),53    path('create_branch/', views.create_branch, name='create_branch'),54    path('create_device/', views.create_device, name='create_device'),55    path('update_historic_scores/', views.update_historic_scores, name='update_historic_scores'),56    path('get_total_energy/', views.get_total_energy, name='get_total_energy'),57    path('get_stats/', views.get_stats, name='get_stats'),58    # path('detail_view', views.detail_view, name='detail_view'),59    # path('locationpost', views.locationpost, name='locationpost'),60    # path('incidents', views.incidents, name='incidents'),61    # path('table', views.table, name='table'),62    # path('logs', views.logs, name='logs'),63    # # path('track', views.track, name='track')64    # path('check/<slug:slug>/', views.check, name='check'),65    # path('collection_check/<int:id>/', views.collection_check, name='collection_check'),66    # path('track/<slug:slug>/', views.track, name='track'),67    # path('trail/<slug:slug>/', views.trail, name='trail'),68    # path('mapping/<slug:slug>/', views.mapping, name='mapping'),69    # path('get_lat_lng/<slug:id>', views.get_lat_lng, name='get_lat_lng'),# FOR PROFILE REALTIME MAP70    # path('get_latlng/<slug:username>', views.get_latlng, name='get_latlng'),71    # path('get_latlng_incident/<slug:username>/<slug:incident>/', views.get_latlng_incident, name='get_latlng_incident'),72    # path('post_latlng', views.post_latlng, name='post_latlng'),73    # path('check_panic', views.check_panic, name='check_panic'),74    # path('create_panic', views.create_panic, name='create_panic'),75    # path('resolve_panic_mobile', views.resolve_panic_mobile, name='resolve_panic_mobile'),76    # path('resolve_panic', views.resolve_panic, name='resolve_panic'),77    # path('recurring_gps_post', views.recurring_gps_post, name='recurring_gps_post'),78    # path('farmers', views.farmers, name='farmers'),79    # path('herdsmen', views.herdsmen, name='herdsmen'),80    # path('get_client_data', views.get_client_data, name='get_client_data'),81    # path('profile_page/<int:target_id>/<slug:is_farmer>', views.profile_page, name='profile_page')82    83    # path('test', views.test, name='test'),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
