How to use get_firewall_rule method in localstack

Best Python code snippet using localstack_python

firewall_rule.py

Source:firewall_rule.py Github

copy

Full Screen

...239 new_name)240 stdout('Firewall rule updated successfully.', ctx)241 except Exception as e:242 stderr(e, ctx)243def get_firewall_rule(ctx, gateway_name, id):244 """Get the firewall rule resource.245 It will restore sessions if expired. It will reads the client and246 creates the FirewallRule resource object.247 """248 restore_session(ctx, vdc_required=True)249 client = ctx.obj['client']250 resource = FirewallRule(client, gateway_name, id)251 return resource252@firewall.command('enable', short_help='enable firewall rule')253@click.pass_context254@click.argument('name', metavar='<name>', required=True)255@click.argument('id', metavar='<id>', required=True)256def enabled_firewall_rule(ctx, name, id):257 try:258 firewall_rule_resource = get_firewall_rule(ctx, name, id)259 firewall_rule_resource.enable_disable_firewall_rule(True)260 stdout('Firewall rule enabled successfully', ctx)261 except Exception as e:262 stderr(e, ctx)263@firewall.command('disable', short_help='disable firewall rule')264@click.pass_context265@click.argument('name', metavar='<name>', required=True)266@click.argument('id', metavar='<id>', required=True)267def disabled_firewall_rule(ctx, name, id):268 try:269 firewall_rule_resource = get_firewall_rule(ctx, name, id)270 firewall_rule_resource.enable_disable_firewall_rule(False)271 stdout('Firewall rule disabled successfully', ctx)272 except Exception as e:273 stderr(e, ctx)274@firewall.command('delete', short_help='delete firewall rule')275@click.pass_context276@click.argument('name', metavar='<name>', required=True)277@click.argument('id', metavar='<id>', required=True)278def delete_firewall_rule(ctx, name, id):279 try:280 firewall_rule_resource = get_firewall_rule(ctx, name, id)281 firewall_rule_resource.delete()282 stdout('Firewall rule deleted successfully', ctx)283 except Exception as e:284 stderr(e, ctx)285@firewall.command('info', short_help='info about firewall rule')286@click.pass_context287@click.argument('name', metavar='<name>', required=True)288@click.argument('id', metavar='<id>', required=True)289def info_firewall_rule(ctx, name, id):290 try:291 firewall_rule_resource = get_firewall_rule(ctx, name, id)292 result = firewall_rule_resource.info_firewall_rule()293 stdout(result, ctx)294 except Exception as e:295 stderr(e, ctx)296@firewall.command('list-source', short_help='list of firewall rule\'s source')297@click.pass_context298@click.argument('name', metavar='<name>', required=True)299@click.argument('id', metavar='<id>', required=True)300def list_firewall_rule_source(ctx, name, id):301 try:302 firewall_rule_resource = get_firewall_rule(ctx, name, id)303 result = firewall_rule_resource.list_firewall_rule_source_destination(304 'source')305 stdout(result, ctx)306 except Exception as e:307 stderr(e, ctx)308@firewall.command(309 'reorder', short_help='reorder firewall rule position on gateway')310@click.pass_context311@click.argument('name', metavar='<name>', required=True)312@click.argument('id', metavar='<id>', required=True)313@click.option(314 '--index',315 'new_index',316 required=True,317 metavar='<int>',318 help='new index of the firewall rule')319def update_firewall_rule_sequence(ctx, name, id, new_index):320 try:321 firewall_rule_resource = get_firewall_rule(ctx, name, id)322 firewall_rule_resource.update_firewall_rule_sequence(new_index)323 stdout('Firewall rule sequence updated successfully', ctx)324 except Exception as e:325 stderr(e, ctx)326@firewall.command(327 'delete-source',328 short_help='delete firewall rule\'s source value of a firewall rule')329@click.pass_context330@click.argument('name', metavar='<name>', required=True)331@click.argument('id', metavar='<id>', required=True)332@click.argument('source_value', metavar='<source_value>', required=True)333def delete_firewall_rule_source(ctx, name, id, source_value):334 try:335 firewall_rule_resource = get_firewall_rule(ctx, name, id)336 firewall_rule_resource.delete_firewall_rule_source_destination(337 source_value, 'source')338 stdout('Firewall rule source deleted successfully', ctx)339 except Exception as e:340 stderr(e, ctx)341@firewall.command(342 'list-destination', short_help='list of firewall rule\'s destination')343@click.pass_context344@click.argument('name', metavar='<name>', required=True)345@click.argument('id', metavar='<id>', required=True)346def list_firewall_rule_destination(ctx, name, id):347 try:348 firewall_rule_resource = get_firewall_rule(ctx, name, id)349 result = firewall_rule_resource.list_firewall_rule_source_destination(350 'destination')351 stdout(result, ctx)352 except Exception as e:353 stderr(e, ctx)354@firewall.command(355 'delete-destination',356 short_help='delete firewall rule\'s destination value of a firewall rule')357@click.pass_context358@click.argument('name', metavar='<name>', required=True)359@click.argument('id', metavar='<id>', required=True)360@click.argument(361 'destination_value', metavar='<destination_value>', required=True)362def delete_firewall_rule_destination(ctx, name, id, destination_value):363 try:364 firewall_rule_resource = get_firewall_rule(ctx, name, id)365 firewall_rule_resource.delete_firewall_rule_source_destination(366 destination_value, 'destination')367 stdout('Firewall rule destination deleted successfully', ctx)368 except Exception as e:369 stderr(e, ctx)370@firewall.command('list-service', short_help='list firewall rule\'s services')371@click.pass_context372@click.argument('name', metavar='<name>', required=True)373@click.argument('id', metavar='<id>', required=True)374def list_firewall_rule_service(ctx, name, id):375 try:376 firewall_rule_resource = get_firewall_rule(ctx, name, id)377 result = firewall_rule_resource.list_firewall_rule_service()378 stdout(result, ctx)379 except Exception as e:380 stderr(e, ctx)381@firewall.command(382 'delete-service',383 short_help='delete firewall rule\'s service of a firewall rule')384@click.pass_context385@click.argument('name', metavar='<name>', required=True)386@click.argument('id', metavar='<id>', required=True)387@click.argument('protocol', metavar='<protocol>', required=True)388def delete_firewall_rule_service(ctx, name, id, protocol):389 try:390 firewall_rule_resource = get_firewall_rule(ctx, name, id)391 firewall_rule_resource.delete_firewall_rule_service(protocol)392 stdout('Firewall rule\'s service deleted successfully', ctx)393 except Exception as e:...

Full Screen

Full Screen

gateway_firewall_service.py

Source:gateway_firewall_service.py Github

copy

Full Screen

...17 "id": int(fw_rule["ID"]),18 "type": str(fw_rule["ruleType"])19 })20 return response21 def get_firewall_rule(self, fw_rule_name):22 fw_rules = self.get_firewall_rules()['msg']23 for fw_rule in fw_rules:24 if fw_rule["name"] == fw_rule_name:25 return FirewallRule(client=self.gateway.client,26 gateway_name=self.gateway.name,27 resource_id=fw_rule["id"])28 msg = "Firewall rule {0} does not exists"29 raise EntityNotFoundException(msg.format(fw_rule_name))30 def manage_states(self, state=None):31 if state == "present":32 return self.add()33 if state == "update":34 return self.update()35 if state == "absent":36 return self.delete()37 raise Exception("Please provide a valid state for the service")38 def manage_operations(self, operation=None):39 if operation == "list":40 return self.get_firewall_rules()41 raise Exception("Please provide a valid operation for the service")42 def _update_response(self, response, msg, warnings):43 if response['msg']:44 response['msg'] = msg.format(response['msg'])45 if response['warnings']:46 response['warnings'] = warnings.format(response['warnings'])47 return response48 def _prepare_service_values(self, services):49 if services is not None:50 for service in services:51 for name, value in service.copy().items():52 service[name] = {53 value["source_port"]: value["destination_port"]54 }55 return services56 def _prepare_route_values(self, route_values):57 response = list()58 if route_values is not None:59 for route_value in route_values:60 for route, value in route_value.items():61 if value[0].lower() == 'any':62 return ['any']63 response.append("{0}:{1}".format(value[0], route))64 return response65 def add(self):66 response = dict()67 response['changed'] = False68 response['warnings'] = list()69 response['msg'] = list()70 msg = 'Firewall rule(s) {0} have been created'71 warnings = 'Firewall rule(s) {0} are already present'72 for service_param in self.service_params:73 name = service_param.get("name")74 action = service_param.get("action") or 'accept'75 firewall_type = service_param.get("type") or 'User'76 enabled = service_param.get("enabled") or True77 logging_enabled = service_param.get("logging_enabled") or False78 try:79 self.get_firewall_rule(name)80 except EntityNotFoundException:81 self.gateway.add_firewall_rule(name=name, action=action,82 type=firewall_type,83 enabled=enabled,84 logging_enabled=logging_enabled)85 try:86 self.update([service_param])87 except Exception:88 self.delete([service_param])89 raise Exception(traceback.format_exc())90 else:91 response['msg'].append(name)92 response['changed'] = True93 else:94 response['warnings'].append(name)95 return self._update_response(response, msg, warnings)96 def update(self, service_params=None):97 response = dict()98 response['changed'] = False99 response['msg'] = list()100 response['warnings'] = list()101 msg = 'Firewall rule(s) {0} have been updated'102 warnings = 'Firewall rule(s) {0} are not present'103 service_params = service_params or self.service_params104 for service_param in service_params:105 name = service_param.get("name")106 new_name = service_param.get("new_name") or name107 services = service_param.get("services") or None108 source_values = service_param.get("source_values") or None109 destination_values = service_param.get(110 "destination_values") or None111 try:112 firewall_rule = self.get_firewall_rule(name)113 services = self._prepare_service_values(services)114 source_values = self._prepare_route_values(source_values)115 destination_values = self._prepare_route_values(116 destination_values)117 if 'any' in destination_values or 'any' in source_values:118 continue119 firewall_rule.edit(source_values=source_values,120 services=services,121 destination_values=destination_values,122 new_name=new_name)123 response['msg'].append(name)124 response['changed'] = True125 except EntityNotFoundException:126 response['warnings'].append(name)127 except BadRequestException as ex:128 raise Exception(ex)129 return self._update_response(response, msg, warnings)130 def delete(self, service_params=None):131 response = dict()132 response['changed'] = False133 response['msg'] = list()134 response['warnings'] = list()135 msg = 'Firewall rule(s) {0} have been deleted'136 warnings = 'Firewall rule(s) {0} are not present'137 service_params = service_params or self.service_params138 for service_param in service_params:139 try:140 name = service_param.get("name")141 firewall_rule = self.get_firewall_rule(name)142 except EntityNotFoundException:143 response['warnings'].append(name)144 else:145 firewall_rule.delete()146 response['msg'].append(name)147 response['changed'] = True...

Full Screen

Full Screen

fwaas_mixin.py

Source:fwaas_mixin.py Github

copy

Full Screen

...44 if not firewall_acl:45 LOG.warning('could not fetch the firewall_acl matching '46 'the filter "%s"', vspk_filter)47 return firewall_acl48 def get_firewall_rule(self, ent=None, vspk_filter=None,49 by_fw_rule_id=None):50 """get a firewall rule.51 @params: enterprise object52 vspk_filter following vspk filter structure53 @return: get_firewall_rule object54 @Example:55 self.vsd.get_firewall_rule(ent=ent1,56 vspk_filter='externalID == "{}"'.format(ext_id))57 """58 if vspk_filter:59 if ent:60 if not isinstance(ent, self.vspk_helper.vspk.NUEnterprise):61 LOG.error('a enterprise is required')62 return None63 else:64 ent = self.vspk_helper.get_default_enterprise()65 firewall_rule = ent.firewall_rules.get_first(filter=vspk_filter)66 elif by_fw_rule_id:67 firewall_rule = self.get_firewall_rule(68 ent, self.vspk_helper.get_external_id_filter(by_fw_rule_id))69 else:70 LOG.error('a qualifier is required')71 return None72 if not firewall_rule:73 LOG.warning('could not fetch the firewall_rule matching '74 'the filter "%s"', vspk_filter)75 return firewall_rule76 def get_firewall_acls(self, ent=None, vspk_filter=None):77 return VspkHelper.get_all(78 parent=self.vspk_helper.get_default_enterprise()79 if ent is None else ent,80 filter=vspk_filter,81 fetcher_str="firewall_acls")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful