How to use update_firewall_config method in localstack

Best Python code snippet using localstack_python

_settings4.py

Source:_settings4.py Github

copy

Full Screen

# Copyright (c) 2013 Riverbed Technology, Inc.
#
# This software is licensed under the terms and conditions of the
# MIT License set forth at:
#   https://github.com/riverbed/flyscript/blob/master/LICENSE ("License").
# This software is distributed "AS IS" as set forth in the License.

import copy
import functools
import json
import time
import warnings

from rvbd.common.jsondict import JsonDict

# `basestring` only exists on Python 2; fall back to `str` so that `load`
# keeps working when the module is imported under Python 3.
try:
    _STRING_TYPES = basestring
except NameError:
    _STRING_TYPES = str


def getted(f):
    """Decorator that rejects the call until a configuration is loaded.

    The wrapped method raises LookupError while `self.data` is None,
    i.e. before `get()` (or `load()`) has populated it.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwds):
        if self.data is None:
            raise LookupError('You have to get the configuration first via the get method')
        return f(self, *args, **kwds)
    return wrapper


class BasicSettingsFunctionality(object):
    """This basic mixin is used as a base for all the settings related classes.

    It ensures that the following interface is implemented for all the
    settings:
        get()
        save()
        cancel()
        download()
        load()
    """

    def __init__(self, api):
        # Cached configuration; None until the first get()/load().
        self.data = None
        self._api = api

    def _get(self, f, force=False):
        """Gets the configuration calling the f function.

        `f` is only called when nothing is cached yet or when `force` is
        True. A deep copy of the cached data is returned, so mutating the
        return value does not alter `self.data`.
        """
        if self.data is None or force is True:
            self.data = f()
        return copy.deepcopy(self.data)

    def get(self, force=False):
        """Gets the configuration from the server.

        `force` set to True bypasses the cached copy.
        """
        return self._get(self._api.get, force)

    @getted
    def _save(self, f):
        """Saves the configuration using the function f, then re-reads it."""
        f(self.data)
        # Refresh the cache with what the server actually stored.
        self.get(force=True)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update)

    @getted
    def cancel(self):
        """Cancel pending changes to the configuration
        and reloads the configuration from server.
        """
        return self.get(force=True)

    @getted
    def download(self, path):
        """Download configuration to path.

        `path` must be a complete path, including a filename. The
        configuration is written as JSON.
        """
        data = self.get()
        with open(path, 'w') as f:
            json.dump(data, f)

    def load(self, path_or_obj, save=True):
        """Load the configuration from a path or dict.

        `path_or_obj` is either a string representing a path to a JSON
        file or a dict/list representing the configuration.

        `save` is a flag to automatically save to the server after load,
        default to True.

        Raises ValueError when `path_or_obj` is none of the accepted types.
        """
        if isinstance(path_or_obj, _STRING_TYPES):
            with open(path_or_obj, 'r') as f:
                self.data = json.load(f)
        elif isinstance(path_or_obj, (dict, list)):
            self.data = path_or_obj
        else:
            raise ValueError('path_or_obj must be a filepath, a dict or a list')
        if save is True:
            self.save()


class NoBulk(object):
    """Mixin class to force get of new configuration in not bulk update
    capable settings.

    This basically overrides the save method such that it doesn't perform a
    bulk update on the resource but fetches the new data from the server
    only.
    """

    @getted
    def save(self):
        # This mimics the settings behaviour for all the resources that do
        # not allow bulk update: nothing is pushed, the cache is refreshed.
        self.get(force=True)


class Basic(BasicSettingsFunctionality):
    """Wrapper class around basic system settings."""

    # Defining get/save here in order to not touch _api4.py.
    def get(self, force=False):
        """Get configuration from the server."""
        # `force` must be forwarded, otherwise _save()/cancel() would keep
        # returning the stale cached copy.
        return self._get(self._api.get_basic, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_basic)


class Auth(BasicSettingsFunctionality):
    """Wrapper class around authentication settings."""

    def get(self, force=False):
        """Get configuration from the server."""
        return self._get(self._api.get_auth, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_auth)


class Audit(BasicSettingsFunctionality):
    """Wrapper class around audit configuration."""

    def get(self, force=False):
        """Get configuration from the server."""
        return self._get(self._api.get_audit, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_audit)


class Licenses(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around license configuration."""

    @getted
    def save(self):
        """Refresh the license data and warn that a reboot is required."""
        NoBulk.save(self)
        warnings.warn('Reboot of shark is needed to apply the new configuration')

    @getted
    def add(self, license_keys):
        """Add `license_keys` through the API."""
        # The add wants a list of keys while the delete wants a single key.
        self._api.add_license([license_keys])

    @getted
    def remove(self, key):
        """Delete the license identified by `key`."""
        self._api.delete_license(key)

    @getted
    def clear(self):
        """Delete every license present in the cached configuration."""
        for lic in self.data:
            self._api.delete_license(lic['key'])

    @getted
    def status(self):
        """Return the result of the API `get_status` call."""
        return self._api.get_status()


class Firewall(BasicSettingsFunctionality):
    """Allows to get the current configuration of the firewall and
    set a new one."""

    def get(self, force=False):
        """Get configuration from the server."""
        return self._get(self._api.get_firewall_config, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_firewall_config)


class Certificates(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around the certificates configuration."""

    def _gen_cert_configuration(self, *args, **kwargs):
        """Build the certificate request body.

        Any keyword that is missing or falsy (None, '', 0) is replaced by
        the default shown below.
        """
        return {
            'issued_to': {
                'country': kwargs.get('country') or 'US',
                'email': kwargs.get('email') or '',
                'locality': kwargs.get('locality') or 'San Francisco',
                'organization': kwargs.get('organization') or 'Riverbed Technology',
                'organization_unit': kwargs.get('organization_unit') or '',
                'state': kwargs.get('state') or 'CA'
            },
            'validity': {
                'days': kwargs.get('days') or 365
            }
        }

    @getted
    def save(self):
        """Refresh the certificate data and warn that a reboot is required."""
        NoBulk.save(self)
        warnings.warn('Reboot of shark is needed to apply the new configuration')

    @getted
    def use_profiler_export_certificate_for_web(self):
        """Copies profiler export certificate and use it for webui."""
        self._api.copy_profiler_export_certificate()

    @getted
    def set_certificate_for_web(self, cert):
        """Given a certificate in PEM format, uploads to the server and
        sets as webui certificate.

        The PEM certificate must contain both private key and CA-signed
        public certificate.
        """
        self._api.update_web_certificate({'pem': cert})

    @getted
    def generate_new_certificate_for_web(self, country=None, email=None, locality=None,
                                         organization=None, organization_unit=None,
                                         state=None, days=None):
        """Generates a new certificate for the webui.

        Arguments left as None fall back to the defaults of
        `_gen_cert_configuration`.
        """
        config = self._gen_cert_configuration(
            country=country, email=email, locality=locality,
            organization=organization, organization_unit=organization_unit,
            state=state, days=days)
        self._api.generate_web_certificate(config)

    @getted
    def set_certificate_for_profiler_export(self, cert):
        """Given a certificate in PEM format, uploads to the server and sets
        as profiler export certificate.

        The PEM certificate must contain both private key and CA-signed
        public certificate.
        """
        self._api.update_profiler_export_certificate({'pem': cert})

    @getted
    def generate_new_certificate_for_profiler_export(self, country=None, email=None,
                                                     locality=None, organization=None,
                                                     organization_unit=None, state=None,
                                                     days=None):
        """Generates a new certificate for profiler export.

        Arguments left as None fall back to the defaults of
        `_gen_cert_configuration`.
        """
        config = self._gen_cert_configuration(
            country=country, email=email, locality=locality,
            organization=organization, organization_unit=organization_unit,
            state=state, days=days)
        self._api.generate_profiler_export_certificate(config)

    @getted
    def use_web_interface_certificate_for_profiler_export(self):
        """Copies webui certificate and use it for profiler export."""
        self._api.copy_web_certificate()

    @getted
    def add_profiler_trusted_certificate(self, name, cert):
        """Adds the given PEM certificate to the list of trusted certificates
        under the given name."""
        self._api.add_trusted_profiler_certificate({
            'id': name,
            'pem': cert
        })

    @getted
    def remove_profiler_trusted_certificate(self, name):
        """Removes the certificate registered under `name` from the list of
        trusted certificates."""
        self._api.delete_trusted_profiler_certificate(name)


class ProfilerExport(BasicSettingsFunctionality):
    """Wrapper class around profiler export settings."""

    def get(self, force=False):
        """Get configuration from the server."""
        return self._get(self._api.get_profiler_export, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_profiler_export)


class CorsDomain(BasicSettingsFunctionality):
    """Wrapper class around the CORS domains settings."""

    def get(self, force=False):
        """Get configuration from the server."""
        return self._get(self._api.get_cors_domains, force)

    @getted
    def save(self):
        """Save configuration to the server."""
        self._save(self._api.update_cors_domains)


class Users(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around the users API."""

    @getted
    def add(self, username, password, groups=None, can_be_locked=False):
        """Adds a user to the Shark.

        `username` is a string representing the username

        `password` is the password of the new user

        `groups` is the list of groups the user should be added in
        (defaults to an empty list). Administrators is the administrators
        group. Add user to that group to make the user with administrator
        privileges.

        `can_be_locked` is a boolean representing if the user can be locked
        out from the system or not
        """
        # None sentinel instead of a shared mutable `[]` default.
        if groups is None:
            groups = []
        self._api.add({'name': username,
                       'password': password,
                       'groups': groups,
                       'can_be_locked': can_be_locked
                       })

    @getted
    def delete(self, username):
        """Delete user from the system.

        `username` is the username of the user to be deleted
        """
        self._api.delete(username)

    @getted
    def change_password(self, username, password):
        """Change password of a user.

        The request is sent with an empty `existing_password`.
        """
        self._api.update(username, {'existing_password': '',
                                    'new_password': password})


class Groups(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around the groups API."""

    @getted
    def add(self, name, description='', capabilities=None):
        """Adds a new group to the system.

        `name` is the name of the group

        `description` is the description of the group

        `capabilities` is a list of permissions the group has (defaults to
        an empty list). They can be:
            CAPABILITY_ADMINISTRATOR,
            CAPABILITY_APPLY_VIEWS_ON_FILES,
            CAPABILITY_APPLY_VIEWS_ON_INTERFACES,
            CAPABILITY_SHARE_VIEWS,
            CAPABILITY_CREATE_FILES,
            CAPABILITY_IMPORT_FILES,
            CAPABILITY_EXPORT_FILES,
            CAPABILITY_CREATE_JOBS,
            CAPABILITY_SCHEDULE_WATCHES,
            CAPABILITY_ACCESS_PROBE_FILES
        """
        # None sentinel instead of a shared mutable `[]` default.
        if capabilities is None:
            capabilities = []
        self._api.add({'name': name,
                       'description': description,
                       'capabilities': capabilities})

    @getted
    def delete(self, name):
        """Removes group from the groups in the Shark.

        `name` is the name of the group
        """
        self._api.delete(name)


class Update(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around the update API."""

    def load_iso_from_url(self, url):
        """Ask the server to load an ISO image from `url`."""
        self._api.load_iso_from_url({'url': url})

    def upload_iso(self, f):
        """Pass `f` to the API `upload_iso` call."""
        self._api.upload_iso(f)

    @getted
    def delete_iso(self):
        """Delete the ISO by resetting the update state to NEUTRAL."""
        self._api.delete_iso({'state': 'NEUTRAL', 'reset': True})

    @getted
    def update(self):
        """Start the update and poll until it leaves the RUNNING state.

        Returns the last status fetched from the server.

        Raises SystemError when the cached configuration has an empty
        `init_id`, or when the final state is not NEUTRAL.
        """
        # Compare by value: `is not ''` tests identity and only worked by
        # accident of string interning.
        if self.data['init_id'] == '':
            msg = ('Server does not have any iso image loaded for upload. '
                   'Upload an iso first and save the configuration to proceed.')
            raise SystemError(msg)

        res = self._api.update({'init_id': self.data['init_id'],
                                'state': 'RUNNING'})
        while res['state'] == 'RUNNING':
            time.sleep(5)
            res = self.get(force=True)
        if res['state'] != 'NEUTRAL':
            msg = 'Server returned error while update: %s' % res['comment']
            raise SystemError(msg)
        return res


class Storage(NoBulk, BasicSettingsFunctionality):
    """Wrapper class around the packet storage API."""

    def reinitialize(self, wait=True):
        """Reinitializes the packet storage.

        If `wait` is True it will wait for the packet storage to be back
        again before returning; otherwise the status fetched right after
        the request is returned without polling.

        Raises SystemError when, after waiting, the state is not OK.

        WARNING: This operation will lose all packets in every job
        """
        self._api.reinitialize()
        res = self.get(force=True)
        if not wait:
            return res
        while res['state'] == 'INITIALIZING':
            time.sleep(5)
            res = self.get(force=True)
        if res['state'] != 'OK':
            raise SystemError('Server returned error while reinitializing packet storage')
        return res

    def format(self, percentage_reserved_space=0):
        """Formats the packet storage.

        `percentage_reserved_space` is the percentage of disk reserved
        starting from the outer boundaries of the disk.
        Since I/O operations at the farthest parts of the disk have higher
        latency this is often used to increase performances of the packet
        recorder.
        `percentage_reserved_space` can be any value from 0 (default) to 95.

        Raises ValueError when `percentage_reserved_space` is out of range.

        WARNING: This operation will lose all packets in every job
        """
        # Explicit check rather than `assert`, which is stripped under -O.
        if not 0 <= percentage_reserved_space < 96:
            raise ValueError('percentage_reserved_space must be between 0 and 95')
        self._api.format({'reserved_space': percentage_reserved_space})


class NotImplementedSetting(object):
    """Placeholder for a setting that this API version does not provide."""

    def __init__(self, msg=''):
        self.msg = msg

    def get(self, force=True):
        """Always raises NotImplementedError, with `msg` appended."""
        raise NotImplementedError('This setting is not available for this version of Shark'+self.msg)


class Settings4(object):
    """Interface to various configuration settings on the shark appliance."""

    def __init__(self, shark):
        super(Settings4, self).__init__()
        self.shark = shark
        self.auth = Auth(shark.api.settings)
        self.audit = Audit(shark.api.settings)
        self.basic = Basic(shark.api.settings)
        self.licenses = Licenses(shark.api.licenses)
        self.firewall = Firewall(shark.api.settings)
        self.certificates = Certificates(shark.api.certificates)
        self.profiler_export = ProfilerExport(shark.api.settings)
        self.cors_domain = CorsDomain(shark.api.settings)
        self.users = Users(shark.api.users)
        self.groups = Groups(shark.api.groups)
        self.update = Update(shark.api.update)
        self.storage = Storage(shark.api.storage)

        # For the raw text handlers there's nothing that the
        # high-level API needs to add or hide
        self.get_raw = shark.api.settings.get_raw
        self.update_raw = shark.api.settings.update_raw
        self.reset_raw = shark.api.settings.reset_raw

        # these are now DPI in Shark API 5.0
        self.get_protocol_groups = shark.api.settings.get_protocol_groups
        self.update_protocol_groups = shark.api.settings.update_protocol_groups
        self.get_protocol_names = shark.api.settings.get_protocol_names
        self.update_protocol_names = shark.api.settings.update_protocol_names

        # these have been implemented from API >= 5.0
        self.alerts = NotImplementedSetting()
        # ... (the listing is truncated at this point)

Full Screen

Full Screen

firewallrules.py

Source:firewallrules.py Github

copy

Full Screen

...143 constants.NETWORK_TYPE_OAM)144 oam_address_pool = pecan.request.dbapi.address_pool_get(145 oam_network.pool_uuid)146 try:147 firewall_sig = pecan.request.rpcapi.update_firewall_config(148 pecan.request.context, oam_address_pool.family, contents)149 # push the updated firewall_sig into db150 sp_firewallrules = pecan.request.dbapi.service_parameter_get_one(151 service=constants.SERVICE_TYPE_PLATFORM,152 section=constants.SERVICE_PARAM_SECTION_PLATFORM_SYSINV,153 name=constants.SERVICE_PARAM_NAME_SYSINV_FIREWALL_RULES_ID)154 sp_firewallrules = pecan.request.dbapi.service_parameter_update(155 sp_firewallrules.uuid,156 {'value': firewall_sig, 'personality': constants.CONTROLLER})157 sp_firewallrules_dict = firewallrules_as_dict(sp_firewallrules)158 LOG.info("import_firewallrules sp_firewallrules={}".format(159 sp_firewallrules_dict))160 except Exception as e:161 return dict(success="", error=e.value)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful