Best Python code snippet using localstack_python
firewall_rule.py
Source:firewall_rule.py  
# ... (the snippet opens at the tail of an earlier command whose beginning is
# elided in the source; the visible fragment is kept verbatim below)
#                       new_name)
#         stdout('Firewall rule updated successfully.', ctx)
#     except Exception as e:
#         stderr(e, ctx)


def get_firewall_rule(ctx, gateway_name, id):
    """Build the FirewallRule resource for a gateway.

    Calls restore_session() with a VDC required, then wraps the client
    stored on the click context in a FirewallRule object.

    :param ctx: click context; ``ctx.obj['client']`` supplies the client.
    :param gateway_name: gateway name handed to FirewallRule.
    :param id: firewall rule id handed to FirewallRule.
    :return: the FirewallRule resource object.
    """
    restore_session(ctx, vdc_required=True)
    return FirewallRule(ctx.obj['client'], gateway_name, id)


# NOTE: the commands below are described with comments rather than
# docstrings on purpose -- click uses a command's docstring as its help
# text, so adding one would change the CLI output.
# Every command follows the same shape: resolve the rule, call one method on
# it, report through stdout(), and route any exception to stderr().


@firewall.command('enable', short_help='enable firewall rule')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def enabled_firewall_rule(ctx, name, id):
    # Switch the rule on.
    try:
        get_firewall_rule(ctx, name, id).enable_disable_firewall_rule(True)
        stdout('Firewall rule enabled successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command('disable', short_help='disable firewall rule')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def disabled_firewall_rule(ctx, name, id):
    # Switch the rule off.
    try:
        get_firewall_rule(ctx, name, id).enable_disable_firewall_rule(False)
        stdout('Firewall rule disabled successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command('delete', short_help='delete firewall rule')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def delete_firewall_rule(ctx, name, id):
    # Remove the rule.
    try:
        get_firewall_rule(ctx, name, id).delete()
        stdout('Firewall rule deleted successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command('info', short_help='info about firewall rule')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def info_firewall_rule(ctx, name, id):
    # Print whatever info_firewall_rule() returns for the rule.
    try:
        rule = get_firewall_rule(ctx, name, id)
        stdout(rule.info_firewall_rule(), ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command('list-source', short_help="list of firewall rule's source")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def list_firewall_rule_source(ctx, name, id):
    # Print the rule's 'source' entries.
    try:
        rule = get_firewall_rule(ctx, name, id)
        stdout(rule.list_firewall_rule_source_destination('source'), ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command(
    'reorder', short_help='reorder firewall rule position on gateway')
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
@click.option(
    '--index',
    'new_index',
    required=True,
    metavar='<int>',
    help='new index of the firewall rule')
def update_firewall_rule_sequence(ctx, name, id, new_index):
    # Move the rule to the position given by --index (passed on as received).
    try:
        rule = get_firewall_rule(ctx, name, id)
        rule.update_firewall_rule_sequence(new_index)
        stdout('Firewall rule sequence updated successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command(
    'delete-source',
    short_help="delete firewall rule's source value of a firewall rule")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
@click.argument('source_value', metavar='<source_value>', required=True)
def delete_firewall_rule_source(ctx, name, id, source_value):
    # Drop one value from the rule's 'source' entries.
    try:
        rule = get_firewall_rule(ctx, name, id)
        rule.delete_firewall_rule_source_destination(source_value, 'source')
        stdout('Firewall rule source deleted successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command(
    'list-destination', short_help="list of firewall rule's destination")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def list_firewall_rule_destination(ctx, name, id):
    # Print the rule's 'destination' entries.
    try:
        rule = get_firewall_rule(ctx, name, id)
        stdout(rule.list_firewall_rule_source_destination('destination'), ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command(
    'delete-destination',
    short_help="delete firewall rule's destination value of a firewall rule")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
@click.argument(
    'destination_value', metavar='<destination_value>', required=True)
def delete_firewall_rule_destination(ctx, name, id, destination_value):
    # Drop one value from the rule's 'destination' entries.
    try:
        rule = get_firewall_rule(ctx, name, id)
        rule.delete_firewall_rule_source_destination(
            destination_value, 'destination')
        stdout('Firewall rule destination deleted successfully', ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command('list-service', short_help="list firewall rule's services")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
def list_firewall_rule_service(ctx, name, id):
    # Print whatever list_firewall_rule_service() returns for the rule.
    try:
        rule = get_firewall_rule(ctx, name, id)
        stdout(rule.list_firewall_rule_service(), ctx)
    except Exception as err:
        stderr(err, ctx)


@firewall.command(
    'delete-service',
    short_help="delete firewall rule's service of a firewall rule")
@click.pass_context
@click.argument('name', metavar='<name>', required=True)
@click.argument('id', metavar='<id>', required=True)
@click.argument('protocol', metavar='<protocol>', required=True)
def delete_firewall_rule_service(ctx, name, id, protocol):
    # Drop the service identified by <protocol> from the rule.
    try:
        rule = get_firewall_rule(ctx, name, id)
        rule.delete_firewall_rule_service(protocol)
        stdout("Firewall rule's service deleted successfully", ctx)
    except Exception as err:
        ...  # the source snippet is cut off here; the handler body is not shown
# gateway_firewall_service.py  (title of the next snippet on the page)
Source:gateway_firewall_service.py  
...17                "id": int(fw_rule["ID"]),18                "type": str(fw_rule["ruleType"])19            })20        return response21    def get_firewall_rule(self, fw_rule_name):22        fw_rules = self.get_firewall_rules()['msg']23        for fw_rule in fw_rules:24            if fw_rule["name"] == fw_rule_name:25                return FirewallRule(client=self.gateway.client,26                                    gateway_name=self.gateway.name,27                                    resource_id=fw_rule["id"])28        msg = "Firewall rule {0} does not exists"29        raise EntityNotFoundException(msg.format(fw_rule_name))30    def manage_states(self, state=None):31        if state == "present":32            return self.add()33        if state == "update":34            return self.update()35        if state == "absent":36            return self.delete()37        raise Exception("Please provide a valid state for the service")38    def manage_operations(self, operation=None):39        if operation == "list":40            return self.get_firewall_rules()41        raise Exception("Please provide a valid operation for the service")42    def _update_response(self, response, msg, warnings):43        if response['msg']:44            response['msg'] = msg.format(response['msg'])45        if response['warnings']:46            response['warnings'] = warnings.format(response['warnings'])47        return response48    def _prepare_service_values(self, services):49        if services is not None:50            for service in services:51                for name, value in service.copy().items():52                    service[name] = {53                        value["source_port"]: value["destination_port"]54                    }55        return services56    def _prepare_route_values(self, route_values):57        response = list()58        if route_values is not None:59            for route_value in route_values:60                for route, value in route_value.items():61       
             if value[0].lower() == 'any':62                        return ['any']63                    response.append("{0}:{1}".format(value[0], route))64        return response65    def add(self):66        response = dict()67        response['changed'] = False68        response['warnings'] = list()69        response['msg'] = list()70        msg = 'Firewall rule(s) {0} have been created'71        warnings = 'Firewall rule(s) {0} are already present'72        for service_param in self.service_params:73            name = service_param.get("name")74            action = service_param.get("action") or 'accept'75            firewall_type = service_param.get("type") or 'User'76            enabled = service_param.get("enabled") or True77            logging_enabled = service_param.get("logging_enabled") or False78            try:79                self.get_firewall_rule(name)80            except EntityNotFoundException:81                self.gateway.add_firewall_rule(name=name, action=action,82                                               type=firewall_type,83                                               enabled=enabled,84                                               logging_enabled=logging_enabled)85                try:86                    self.update([service_param])87                except Exception:88                    self.delete([service_param])89                    raise Exception(traceback.format_exc())90                else:91                    response['msg'].append(name)92                    response['changed'] = True93            else:94                response['warnings'].append(name)95        return self._update_response(response, msg, warnings)96    def update(self, service_params=None):97        response = dict()98        response['changed'] = False99        response['msg'] = list()100        response['warnings'] = list()101        msg = 'Firewall rule(s) {0} have been updated'102        warnings = 'Firewall rule(s) {0} are not present'103        
service_params = service_params or self.service_params104        for service_param in service_params:105            name = service_param.get("name")106            new_name = service_param.get("new_name") or name107            services = service_param.get("services") or None108            source_values = service_param.get("source_values") or None109            destination_values = service_param.get(110                "destination_values") or None111            try:112                firewall_rule = self.get_firewall_rule(name)113                services = self._prepare_service_values(services)114                source_values = self._prepare_route_values(source_values)115                destination_values = self._prepare_route_values(116                    destination_values)117                if 'any' in destination_values or 'any' in source_values:118                    continue119                firewall_rule.edit(source_values=source_values,120                                   services=services,121                                   destination_values=destination_values,122                                   new_name=new_name)123                response['msg'].append(name)124                response['changed'] = True125            except EntityNotFoundException:126                response['warnings'].append(name)127            except BadRequestException as ex:128                raise Exception(ex)129        return self._update_response(response, msg, warnings)130    def delete(self, service_params=None):131        response = dict()132        response['changed'] = False133        response['msg'] = list()134        response['warnings'] = list()135        msg = 'Firewall rule(s) {0} have been deleted'136        warnings = 'Firewall rule(s) {0} are not present'137        service_params = service_params or self.service_params138        for service_param in service_params:139            try:140                name = service_param.get("name")141                
firewall_rule = self.get_firewall_rule(name)142            except EntityNotFoundException:143                response['warnings'].append(name)144            else:145                firewall_rule.delete()146                response['msg'].append(name)147                response['changed'] = True...fwaas_mixin.py
Source:fwaas_mixin.py  
...44        if not firewall_acl:45            LOG.warning('could not fetch the firewall_acl matching '46                        'the filter "%s"', vspk_filter)47        return firewall_acl48    def get_firewall_rule(self, ent=None, vspk_filter=None,49                          by_fw_rule_id=None):50        """get a firewall rule.51        @params: enterprise object52                 vspk_filter following vspk filter structure53        @return: get_firewall_rule object54        @Example:55        self.vsd.get_firewall_rule(ent=ent1,56            vspk_filter='externalID == "{}"'.format(ext_id))57        """58        if vspk_filter:59            if ent:60                if not isinstance(ent, self.vspk_helper.vspk.NUEnterprise):61                    LOG.error('a enterprise is required')62                    return None63            else:64                ent = self.vspk_helper.get_default_enterprise()65            firewall_rule = ent.firewall_rules.get_first(filter=vspk_filter)66        elif by_fw_rule_id:67            firewall_rule = self.get_firewall_rule(68                ent, self.vspk_helper.get_external_id_filter(by_fw_rule_id))69        else:70            LOG.error('a qualifier is required')71            return None72        if not firewall_rule:73            LOG.warning('could not fetch the firewall_rule matching '74                        'the filter "%s"', vspk_filter)75        return firewall_rule76    def get_firewall_acls(self, ent=None, vspk_filter=None):77        return VspkHelper.get_all(78            parent=self.vspk_helper.get_default_enterprise()79            if ent is None else ent,80            filter=vspk_filter,81            fetcher_str="firewall_acls")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
