How to use rename_params method in localstack

Best Python code snippet using localstack_python

utils.py

Source:utils.py Github

copy

Full Screen

from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import SpecialUsers
from Acquisition import aq_inner
from Acquisition import aq_parent
from binascii import hexlify
from ftw.upgrade.exceptions import CyclicDependencies
from ftw.upgrade.exceptions import UpgradeNotFound
from ftw.upgrade.jsonapi.exceptions import AbortTransactionWithStreamedResponse
from ftw.upgrade.jsonapi.exceptions import APIError
from ftw.upgrade.jsonapi.exceptions import CyclicDependenciesWrapper
from ftw.upgrade.jsonapi.exceptions import MethodNotAllowed
from ftw.upgrade.jsonapi.exceptions import MissingParam
from ftw.upgrade.jsonapi.exceptions import UnauthorizedWrapper
from ftw.upgrade.jsonapi.exceptions import UpgradeNotFoundWrapper
from ftw.upgrade.utils import get_tempfile_authentication_directory
from OFS.interfaces import IApplication
from zExceptions import Unauthorized
from zope.interface import alsoProvides
from zope.security import checkPermission
import json
import os
import re
import stat
import transaction


try:
    from plone.protect.interfaces import IDisableCSRFProtection
except ImportError:
    DISABLE_CSRF = False
else:
    DISABLE_CSRF = True


try:
    from inspect import getfullargspec
except ImportError:
    # Python2.7 compatibility
    from inspect import getargspec as getfullargspec


class ErrorHandling(object):
    """Context manager for handling API errors and responding as JSON.

    Exceptions listed in ``exception_wrappers`` are converted to their
    wrapper type. When the resulting exception is an ``APIError`` it is
    written to the response as JSON and suppressed; any other exception
    propagates unchanged.
    """

    # Maps an exception type to the wrapper type that is instantiated
    # with the original exception (see ``wrap_exception``).
    exception_wrappers = {
        CyclicDependencies: CyclicDependenciesWrapper,
        UpgradeNotFound: UpgradeNotFoundWrapper,
        Unauthorized: UnauthorizedWrapper}

    def __init__(self, response):
        self.response = response

    def __enter__(self):
        return self

    def __exit__(self, _type, exc, _traceback):
        """Render API errors as JSON.

        Returns ``True`` (suppressing the exception) when the error was
        handled here, ``None`` otherwise so that Python re-raises it.
        """
        if isinstance(exc, AbortTransactionWithStreamedResponse):
            if isinstance(self.wrap_exception(exc.original_exception),
                          APIError):
                # The wrapped original is an API error: report that one.
                exc = exc.original_exception
            else:
                # Not an API error: only abort the transaction and
                # suppress the exception without writing an error body.
                transaction.abort()
                return True

        exc = self.wrap_exception(exc)
        if not isinstance(exc, APIError):
            # Also covers ``exc is None`` (the block finished cleanly).
            return

        self.response.setStatus(exc.response_code, exc.message)
        self.response.setHeader('Content-Type',
                                'application/json; charset=utf-8')
        exc.process_error(self.response)
        self.response.setBody(
            json.dumps(['ERROR', exc.message, exc.details]) + '\n')
        self.response.flush()
        return True

    def wrap_exception(self, original_exception):
        """Return the wrapper for ``original_exception`` if one is
        registered in ``exception_wrappers``, else the exception itself.
        """
        for original_type, wrapper_type in self.exception_wrappers.items():
            if isinstance(original_exception, original_type):
                return wrapper_type(original_exception)
        return original_exception


def action(method, rename_params=None):
    """Decorates an API action.

    The action is protected to only respond to one HTTP method
    and protects the action by the cmf.ManagePortal permission.
    Known API errors are written as JSON to the response.

    ``rename_params`` optionally maps argument names of the decorated
    function to the names reported in error messages and in the action
    discovery information.
    """
    # Avoid a shared mutable default; ``action_info['rename_params']`` must
    # always be a dict because discovery calls ``.get`` on it.
    rename_params = {} if rename_params is None else rename_params

    def wrap_action(func):
        def action_wrapper(self):
            with ErrorHandling(self.request.RESPONSE):
                if self.request.method != method:
                    raise MethodNotAllowed(method)

                perform_tempfile_authentication(self.context, self.request)
                if not checkPermission('cmf.ManagePortal', self.context):
                    raise Unauthorized()

                if DISABLE_CSRF:
                    alsoProvides(self.request, IDisableCSRFProtection)

                params = extract_action_params(
                    func, self.request, rename_params)
                return func(self, **params)

        action_wrapper.__doc__ = func.__doc__
        action_wrapper.__name__ = func.__name__
        action_wrapper.action_info = {
            'method': method,
            'rename_params': rename_params,
            'name': func.__name__,
            'doc': func.__doc__,
            'argspec': getfullargspec(func)}
        return action_wrapper
    return wrap_action


def jsonify(func):
    """Action decorator for converting response data to JSON.

    The result is returned untouched when the response already carries a
    JSON content type.
    """
    def json_wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        response = self.request.RESPONSE
        if 'json' in (response.getHeader('Content-Type') or ''):
            # already converted to json, e.g. on error.
            return result

        response.setHeader('Content-Type', 'application/json; charset=utf-8')
        return json.dumps(result, indent=4) + '\n'

    json_wrapper.__doc__ = func.__doc__
    json_wrapper.__name__ = func.__name__
    json_wrapper.action_info = getattr(func, 'action_info', None)
    return json_wrapper


def extract_action_params(func, request, rename_params=None):
    """Build the keyword arguments for calling the action ``func``.

    Raises ``MissingParam`` (using the renamed parameter name, if any) when
    a required argument is absent from, or falsy in, ``request.form``.
    Returns a dict of the form values whose names match an argument of
    ``func``.
    """
    rename_params = rename_params or {}
    form = request.form
    argspec = getfullargspec(func)

    for arg_name in get_required_args(argspec):
        if not form.get(arg_name, None):
            raise MissingParam(rename_params.get(arg_name, arg_name))

    # The first positional argument is the bound instance, which the caller
    # passes positionally. Accepting a form field of the same name would
    # make ``func(self, **params)`` fail with a TypeError.
    accepted_names = argspec.args[1:]
    return {name: form[name] for name in form if name in accepted_names}


def get_action_discovery_information(view):
    """Return a list of dicts describing each API action on ``view``.

    An attribute counts as an action when it has a truthy ``action_info``
    attribute. ``__call__`` is always skipped.
    """
    result = []
    for name in sorted(dir(view)):
        if name == '__call__':
            continue

        func = getattr(view, name, None)
        if not func:
            continue

        action_info = getattr(func, 'action_info', None)
        if not action_info:
            continue

        argspec = action_info['argspec']
        required_params = sorted(get_required_args(argspec))
        rename_params = action_info['rename_params']
        required_params = [rename_params.get(name, name)
                           for name in required_params]

        result.append({
            'name': action_info['name'],
            # ``doc`` is None for actions without a docstring.
            'description': re.sub(
                r'\s+', ' ', action_info['doc'] or '').strip(),
            'required_params': required_params,
            'request_method': action_info['method'].upper()})
    return result


def get_required_args(argspec):
    """Return the positional argument names without a default value,
    excluding the first argument.
    """
    if not argspec.defaults:
        return argspec.args[1:]
    else:
        return argspec.args[1:-len(argspec.defaults)]


def perform_tempfile_authentication(context, request):
    """When the "x-ftw.upgrade-tempfile-auth" header is set, authentication is
    initialized based on a tempfile value for verifying that the client and the
    server are on the same machine with the same user.
    When necessary, a system-upgrade user is created with Manager role and
    the security is set to this user.
    """
    # Nothing to do when the current user is not the ``nobody`` user.
    if getSecurityManager().getUser() != SpecialUsers.nobody:
        return

    auth_value = request.getHeader('x-ftw.upgrade-tempfile-auth')
    if not auth_value:
        return

    validate_tempfile_authentication_header_value(auth_value)
    user = get_system_upgrade_user(context)
    newSecurityManager(request, user)
    transaction.get().setUser(user.getId(), '')


def validate_tempfile_authentication_header_value(header_value):
    """Verify a ``<tempfile name>:<hash>`` authentication header value.

    Raises ``ValueError`` when the value is malformed, the tempfile does not
    exist, is accessible by "others", does not have a size of 64, or does
    not contain the hash given in the header.
    """
    # Anchored at both ends so that trailing data (e.g. a second ":") is
    # rejected here instead of failing the unpacking below.
    if not re.match(r'^tmp\w{6,8}:\w{64}\Z', header_value):
        raise ValueError(
            'tempfile auth: invalid x-ftw.upgrade-tempfile-auth header value.')

    filename, authhash = header_value.split(':')
    directory = get_tempfile_authentication_directory(os.getcwd())
    filepath = directory.joinpath(filename)
    if not filepath.isfile():
        raise ValueError('tempfile auth: tempfile does not exist.')

    # Verify that "others" do not have any permissions on this file.
    if filepath.stat().st_mode & stat.S_IRWXO:
        raise ValueError('tempfile auth: tempfile is accesible by "others".')

    if filepath.getsize() != 64:
        raise ValueError('tempfile auth: tempfile size is invalid.')

    with open(filepath, 'r') as authfile:
        if authfile.read() != authhash:
            raise ValueError('tempfile auth: authentication failed.')


def get_system_upgrade_user(context):
    """Return the "system-upgrade" user of the application root's user
    folder, creating it with a random password and the Manager role when it
    does not exist yet.
    """
    # Walk up the acquisition chain to the application root.
    while not IApplication.providedBy(context):
        context = aq_parent(aq_inner(context))

    acl_users = context.acl_users
    if not acl_users.getUserById('system-upgrade'):
        acl_users.userFolderAddUser(
            'system-upgrade', hexlify(os.urandom(16)), ['Manager'], None)

    return acl_users.getUserById('system-upgrade')


# The snippet is truncated at this point on the source page.
def parse_bool(string):...

Full Screen

Full Screen

rename_columns_op.py

Source:rename_columns_op.py Github

copy

Full Screen

from curation.remodeling.operations.base_op import BaseOp

# Command name and parameter specification passed to BaseOp.__init__.
RENAME_PARAMS = {
    "command": "rename_columns",
    "required_parameters": {
        "column_mapping": dict,
        "ignore_missing": bool
    },
    "optional_parameters": {}
}


class RenameColumnsOp (BaseOp):
    """ Rename columns in a dataframe.

    Notes: The required parameters are:
        - column_mapping (dict) The mapping that specifies how columns are renamed
          (presumably old name -> new name; the renaming code is not shown here).
        - ignore_missing (bool) If true, the names in column_mapping that are not columns in df should be ignored.

    Raises:
        KeyError if ignore_missing is false and a column name in column_mapping is not in the dataframe.

    """

    def __init__(self, parameters):
        """ Check the parameters and store the rename configuration.

        Args:
            parameters (dict) - Must contain 'column_mapping' and 'ignore_missing'.

        """
        super().__init__(RENAME_PARAMS["command"], RENAME_PARAMS["required_parameters"],
                         RENAME_PARAMS["optional_parameters"])
        self.check_parameters(parameters)
        self.column_mapping = parameters['column_mapping']
        # NOTE(review): 'ignore' / 'raise' look like the values accepted by the
        # `errors` argument of pandas DataFrame.rename -- confirm against do_op.
        if parameters['ignore_missing']:
            self.error_handling = 'ignore'
        else:
            self.error_handling = 'raise'

    def do_op(self, dispatcher, df, name, sidecar=None):
        """ Rename columns as specified in column_mapping dictionary.

        Args:
            dispatcher (Dispatcher) - dispatcher object for context
            df (DataFrame) - The DataFrame to be remodeled.
            name (str) - Unique identifier for the dataframe -- often the original file path.
            sidecar (Sidecar or file-like)   Only needed for HED operations

        Returns:
            Dataframe - a new dataframe after processing.

        Raises:
            KeyError - when ignore_missing is false and column_mapping has columns not in df.

        """
        # The method body is elided on the source page.
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful