Best Python code snippet using localstack_python
utils.py
Source:utils.py  
from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import SpecialUsers
from Acquisition import aq_inner
from Acquisition import aq_parent
from binascii import hexlify
from ftw.upgrade.exceptions import CyclicDependencies
from ftw.upgrade.exceptions import UpgradeNotFound
from ftw.upgrade.jsonapi.exceptions import AbortTransactionWithStreamedResponse
from ftw.upgrade.jsonapi.exceptions import APIError
from ftw.upgrade.jsonapi.exceptions import CyclicDependenciesWrapper
from ftw.upgrade.jsonapi.exceptions import MethodNotAllowed
from ftw.upgrade.jsonapi.exceptions import MissingParam
from ftw.upgrade.jsonapi.exceptions import UnauthorizedWrapper
from ftw.upgrade.jsonapi.exceptions import UpgradeNotFoundWrapper
from ftw.upgrade.utils import get_tempfile_authentication_directory
from OFS.interfaces import IApplication
from zExceptions import Unauthorized
from zope.interface import alsoProvides
from zope.security import checkPermission
import json
import os
import re
import stat
import transaction


try:
    from plone.protect.interfaces import IDisableCSRFProtection
except ImportError:
    DISABLE_CSRF = False
else:
    DISABLE_CSRF = True


try:
    from inspect import getfullargspec
except ImportError:
    # Python2.7 compatibility
    from inspect import getargspec as getfullargspec


class ErrorHandling(object):
    """Context manager for handling API errors and responding as JSON.

    Exceptions leaving the ``with`` block are passed through
    ``wrap_exception``. When the result is an ``APIError`` it is written to
    the response as a JSON list ``["ERROR", message, details]`` and the
    exception is suppressed. Any other exception propagates unchanged.
    """

    # Maps exception types raised elsewhere to the wrapper class that is
    # instantiated with the original exception in ``wrap_exception``.
    exception_wrappers = {
        CyclicDependencies: CyclicDependenciesWrapper,
        UpgradeNotFound: UpgradeNotFoundWrapper,
        Unauthorized: UnauthorizedWrapper}

    def __init__(self, response):
        """Store the ``response`` object errors are written to."""
        self.response = response

    def __enter__(self):
        return self

    def __exit__(self, _type, exc, _traceback):
        """Convert a known error into a JSON response.

        Returns ``True`` (suppressing the exception) when the error was
        handled here, a falsy value otherwise so that the exception (if any)
        keeps propagating.
        """
        if isinstance(exc, AbortTransactionWithStreamedResponse):
            if isinstance(self.wrap_exception(exc.original_exception),
                          APIError):
                # The original exception is a known API error: report it
                # through the regular JSON error path below.
                exc = exc.original_exception
            else:
                # Not an API error: abort the transaction and swallow the
                # exception without writing an error body.
                transaction.abort()
                return True

        exc = self.wrap_exception(exc)
        if not isinstance(exc, APIError):
            # Also reached with ``exc is None`` when no exception occurred.
            return False

        self.response.setStatus(exc.response_code, exc.message)
        self.response.setHeader('Content-Type',
                                'application/json; charset=utf-8')
        exc.process_error(self.response)
        self.response.setBody(
            json.dumps(['ERROR', exc.message, exc.details]) + '\n')
        self.response.flush()
        return True

    def wrap_exception(self, original_exception):
        """Return the wrapper registered for ``original_exception``.

        When no type in ``exception_wrappers`` matches, the exception is
        returned unchanged.
        """
        for original_type, wrapper_type in self.exception_wrappers.items():
            if isinstance(original_exception, original_type):
                return wrapper_type(original_exception)
        return original_exception


def action(method, rename_params=None):
    """Decorates an API action.

    The action is protected to only respond to one HTTP method
    and protects the action by the cmf.ManagePortal permission.
    Known API errors are written as JSON to the response.

    ``method`` is the only HTTP method the action accepts.
    ``rename_params`` optionally maps argument names of the decorated
    function to the names reported to the client (in ``MissingParam``
    errors and in the discovery information).
    """
    # A fresh dict per decoration instead of a shared mutable default.
    if rename_params is None:
        rename_params = {}

    def wrap_action(func):
        def action_wrapper(self):
            with ErrorHandling(self.request.RESPONSE):
                if self.request.method != method:
                    raise MethodNotAllowed(method)

                # Must run before the permission check: it may switch the
                # security manager to the system-upgrade user.
                perform_tempfile_authentication(self.context, self.request)
                if not checkPermission('cmf.ManagePortal', self.context):
                    raise Unauthorized()

                if DISABLE_CSRF:
                    alsoProvides(self.request, IDisableCSRFProtection)

                params = extract_action_params(
                    func, self.request, rename_params)
                return func(self, **params)

        action_wrapper.__doc__ = func.__doc__
        action_wrapper.__name__ = func.__name__
        # Metadata consumed by ``get_action_discovery_information``.
        action_wrapper.action_info = {
            'method': method,
            'rename_params': rename_params,
            'name': func.__name__,
            'doc': func.__doc__,
            'argspec': getfullargspec(func)}
        return action_wrapper
    return wrap_action


def jsonify(func):
    """Action decorator for converting response data to JSON.

    The result of ``func`` is serialized with ``json.dumps`` unless the
    response already carries a JSON content type, in which case the result
    is returned untouched.
    """
    def json_wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        response = self.request.RESPONSE
        if 'json' in (response.getHeader('Content-Type') or ''):
            # already converted to json, e.g. on error.
            return result

        response.setHeader('Content-Type', 'application/json; charset=utf-8')
        return json.dumps(result, indent=4) + '\n'

    json_wrapper.__doc__ = func.__doc__
    json_wrapper.__name__ = func.__name__
    # Keep the metadata of an inner ``action`` decorator discoverable.
    json_wrapper.action_info = getattr(func, 'action_info', None)
    return json_wrapper


def extract_action_params(func, request, rename_params=None):
    """Build the keyword arguments for ``func`` from ``request.form``.

    Raises ``MissingParam`` (using the renamed parameter name, if any) when
    a required argument is absent from the form or has a falsy value.
    Returns a dict with the form values whose names match an argument of
    ``func``; the first argument (``self``) is never taken from the form.
    """
    rename_params = rename_params or {}
    form = request.form
    argspec = getfullargspec(func)

    for arg_name in get_required_args(argspec):
        if not form.get(arg_name, None):
            raise MissingParam(rename_params.get(arg_name, arg_name))

    # Skip the first argument (``self``), consistent with
    # ``get_required_args``: a form field named "self" would otherwise
    # collide with the positional ``self`` when calling the action.
    accepted = argspec.args[1:]
    return {name: form[name] for name in form if name in accepted}


def get_action_discovery_information(view):
    """Describe all API actions available on ``view``.

    Every attribute of ``view`` carrying a truthy ``action_info`` is
    reported as a dict with the keys ``name``, ``description``,
    ``required_params`` and ``request_method``. The result is ordered by
    attribute name.
    """
    result = []
    for name in sorted(dir(view)):
        if name == '__call__':
            continue

        func = getattr(view, name, None)
        if not func:
            continue

        action_info = getattr(func, 'action_info', None)
        if not action_info:
            continue

        rename_params = action_info['rename_params']
        required_params = [
            rename_params.get(arg_name, arg_name)
            for arg_name in sorted(get_required_args(action_info['argspec']))]

        # ``doc`` is None for actions without a docstring; fall back to an
        # empty description instead of failing in ``re.sub``.
        description = re.sub(r'\s+', ' ', action_info['doc'] or '').strip()

        result.append({
            'name': action_info['name'],
            'description': description,
            'required_params': required_params,
            'request_method': action_info['method'].upper()})
    return result


def get_required_args(argspec):
    """Return the argument names of ``argspec`` that have no default value.

    The first argument is always skipped (it is ``self`` for the action
    methods this module inspects).
    """
    if not argspec.defaults:
        return argspec.args[1:]
    return argspec.args[1:-len(argspec.defaults)]


def perform_tempfile_authentication(context, request):
    """When the "x-ftw.upgrade-tempfile-auth" header is set, authentication is
    initialized based on a tempfile value for verifying that the client and the
    server is on the same machine with the same user.
    When necessary, a system-upgrade user is created with Manager role and
    the security is set to this user.

    Nothing happens when the current user is not the anonymous
    ``SpecialUsers.nobody`` or when the header is missing. An invalid header
    raises ``ValueError`` (see
    ``validate_tempfile_authentication_header_value``).
    """
    if getSecurityManager().getUser() != SpecialUsers.nobody:
        return

    auth_value = request.getHeader('x-ftw.upgrade-tempfile-auth')
    if not auth_value:
        return

    validate_tempfile_authentication_header_value(auth_value)
    user = get_system_upgrade_user(context)
    newSecurityManager(request, user)
    transaction.get().setUser(user.getId(), '')


def validate_tempfile_authentication_header_value(header_value):
    """Verify a ``<tempfile name>:<hash>`` authentication header value.

    The value is accepted when the named file exists in the tempfile
    authentication directory, grants no permissions to "others", is exactly
    64 bytes in size and its content equals the hash from the header.

    Raises ``ValueError`` for every failed check.
    """
    # Anchored at both ends so that trailing characters after the 64
    # character hash are rejected by the format check already.
    if not re.match(r'^tmp\w{6,8}:\w{64}$', header_value):
        raise ValueError(
            'tempfile auth: invalid x-ftw.upgrade-tempfile-auth header value.')

    filename, authhash = header_value.split(':')
    # NOTE(review): ``directory`` is expected to be a path object offering
    # joinpath / isfile / stat / getsize -- confirm against
    # ftw.upgrade.utils.get_tempfile_authentication_directory.
    directory = get_tempfile_authentication_directory(os.getcwd())
    filepath = directory.joinpath(filename)
    if not filepath.isfile():
        raise ValueError('tempfile auth: tempfile does not exist.')

    # Verify that "others" do not have any permissions on this file.
    if filepath.stat().st_mode & stat.S_IRWXO:
        raise ValueError('tempfile auth: tempfile is accesible by "others".')

    if filepath.getsize() != 64:
        raise ValueError('tempfile auth: tempfile size is invalid.')

    with open(filepath, 'r') as authfile:
        if authfile.read() != authhash:
            raise ValueError('tempfile auth: authentication failed.')


def get_system_upgrade_user(context):
    """Return the "system-upgrade" user of the application root.

    Walks up the acquisition parents of ``context`` until an object
    providing ``IApplication`` is found. The user is created in its
    ``acl_users`` with a random password and the Manager role when it does
    not exist yet.
    """
    while not IApplication.providedBy(context):
        context = aq_parent(aq_inner(context))

    acl_users = context.acl_users
    if not acl_users.getUserById('system-upgrade'):
        acl_users.userFolderAddUser(
            'system-upgrade', hexlify(os.urandom(16)), ['Manager'], None)
    return acl_users.getUserById('system-upgrade')


# (The remainder of this listing is truncated in the source.)
def parse_bool(string):...
rename_columns_op.py
Source:rename_columns_op.py  
1from curation.remodeling.operations.base_op import BaseOp23RENAME_PARAMS = {4    "command": "rename_columns",5    "required_parameters": {6        "column_mapping": dict,7        "ignore_missing": bool8    },9    "optional_parameters": {}10}111213class RenameColumnsOp (BaseOp):14    """ Rename columns in a dataframe.1516    Notes: The required parameters are:17        - column_mapping (dict) The names of the columns to be removed.18        - ignore_missing (bool) If true, the names in remove_names that are not columns in df should be ignored.1920    Raises:21        KeyError if ignore_missing is false and a column name in column_mapping is not in the dataframe.2223    """2425    def __init__(self, parameters):26        super().__init__(RENAME_PARAMS["command"], RENAME_PARAMS["required_parameters"],27                         RENAME_PARAMS["optional_parameters"])28        self.check_parameters(parameters)29        self.column_mapping = parameters['column_mapping']30        if parameters['ignore_missing']:31            self.error_handling = 'ignore'32        else:33            self.error_handling = 'raise'3435    def do_op(self, dispatcher, df, name, sidecar=None):36        """ Rename columns as specified in column_mapping dictionary.3738        Args:39            dispatcher (Dispatcher) - dispatcher object for context40            df (DataFrame) - The DataFrame to be remodeled.41            name (str) - Unique identifier for the dataframe -- often the original file path.42            sidecar (Sidecar or file-like)   Only needed for HED operations4344        Returns:45            Dataframe - a new dataframe after processing.4647        Raises:48            KeyError - when ignore_missing is false and column_mapping has columns not in df.4950        """51
...Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
