Best Python code snippet using localstack_python
test_authorization.py
Source:test_authorization.py  
...26        # SAMPLE_V3_TOKEN has OS-TRUST:trust in it.27        token_data = test_token_provider.SAMPLE_V3_TOKEN28        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,29                                          token_data=token_data)30        auth_context = authorization.token_to_auth_context(token)31        self.assertEqual(token, auth_context['token'])32        self.assertTrue(auth_context['is_delegated_auth'])33        self.assertEqual(token_data['token']['user']['id'],34                         auth_context['user_id'])35        self.assertEqual(token_data['token']['user']['domain']['id'],36                         auth_context['user_domain_id'])37        self.assertEqual(token_data['token']['project']['id'],38                         auth_context['project_id'])39        self.assertEqual(token_data['token']['project']['domain']['id'],40                         auth_context['project_domain_id'])41        self.assertNotIn('domain_id', auth_context)42        self.assertNotIn('domain_name', auth_context)43        self.assertEqual(token_data['token']['OS-TRUST:trust']['id'],44                         auth_context['trust_id'])45        self.assertEqual(46            token_data['token']['OS-TRUST:trust']['trustor_user_id'],47            auth_context['trustor_id'])48        self.assertEqual(49            token_data['token']['OS-TRUST:trust']['trustee_user_id'],50            auth_context['trustee_id'])51        self.assertItemsEqual(52            [r['name'] for r in token_data['token']['roles']],53            auth_context['roles'])54        self.assertIsNone(auth_context['consumer_id'])55        self.assertIsNone(auth_context['access_token_id'])56        self.assertNotIn('group_ids', auth_context)57    def test_token_is_domain_scoped(self):58        # Check contents of auth_context when token is domain-scoped.59        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)60        del token_data['token']['project']61        domain_id = 
uuid.uuid4().hex62        domain_name = uuid.uuid4().hex63        token_data['token']['domain'] = {'id': domain_id, 'name': domain_name}64        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,65                                          token_data=token_data)66        auth_context = authorization.token_to_auth_context(token)67        self.assertNotIn('project_id', auth_context)68        self.assertNotIn('project_domain_id', auth_context)69        self.assertEqual(domain_id, auth_context['domain_id'])70        self.assertEqual(domain_name, auth_context['domain_name'])71    def test_token_is_unscoped(self):72        # Check contents of auth_context when the token is unscoped.73        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)74        del token_data['token']['project']75        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,76                                          token_data=token_data)77        auth_context = authorization.token_to_auth_context(token)78        self.assertNotIn('project_id', auth_context)79        self.assertNotIn('project_domain_id', auth_context)80        self.assertNotIn('domain_id', auth_context)81        self.assertNotIn('domain_name', auth_context)82    def test_token_is_for_federated_user(self):83        # When the token is for a federated user then group_ids is in84        # auth_context.85        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)86        group_ids = [uuid.uuid4().hex for x in range(1, 5)]87        federation_data = {'identity_provider': {'id': uuid.uuid4().hex},88                           'protocol': {'id': 'saml2'},89                           'groups': [{'id': gid} for gid in group_ids]}90        token_data['token']['user'][federation_constants.FEDERATION] = (91            federation_data)92        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,93                                          token_data=token_data)94        auth_context = 
authorization.token_to_auth_context(token)95        self.assertItemsEqual(group_ids, auth_context['group_ids'])96    def test_oauth_variables_set_for_oauth_token(self):97        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)98        access_token_id = uuid.uuid4().hex99        consumer_id = uuid.uuid4().hex100        token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id,101                                            'consumer_id': consumer_id}102        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,103                                          token_data=token_data)104        auth_context = authorization.token_to_auth_context(token)105        self.assertEqual(access_token_id, auth_context['access_token_id'])106        self.assertEqual(consumer_id, auth_context['consumer_id'])107    def test_oauth_variables_not_set(self):108        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)109        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,110                                          token_data=token_data)111        auth_context = authorization.token_to_auth_context(token)112        self.assertIsNone(auth_context['access_token_id'])113        self.assertIsNone(auth_context['consumer_id'])114    def test_token_is_not_KeystoneToken_raises_exception(self):115        # If the token isn't a KeystoneToken then an UnexpectedError exception116        # is raised.117        self.assertRaises(exception.UnexpectedError,118                          authorization.token_to_auth_context, {})119    def test_user_id_missing_in_token_raises_exception(self):120        # If there's no user ID in the token then an Unauthorized121        # exception is raised.122        token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)123        del token_data['token']['user']['id']124        token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,125                                          
token_data=token_data)...func.py
Source:func.py  
import base64
import datetime
import io
import json
import logging
from datetime import timedelta

import oci
import requests
from fdk import response
from requests.auth import HTTPBasicAuth

# OAuth client settings keyed by 'idcs' and 'oic'. Filled once by
# initContext() and reused by later handler() calls in the same process.
oauth_apps = {}

# Upper bound, in seconds, on each outbound HTTP call so that a stalled
# endpoint cannot block the authorizer indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

BEARER_PREFIX = 'Bearer '
WWW_AUTHENTICATE = 'Bearer realm="identity.oraclecloud.com"'


def initContext(context):
    """Populate ``oauth_apps`` from the application config and stored secrets.

    Args:
        context: mapping that provides the keys
            ``idcs_introspection_endpoint``, ``idcs_app_client_id``,
            ``idcs_app_client_secret_ocid``, ``back_end_token_endpoint``,
            ``back_end_app_client_id`` and ``back_end_client_secret_ocid``.

    Raises:
        Exception: whatever the config lookup or ``getSecret`` raised; it is
            logged and re-raised unchanged.
    """
    log = logging.getLogger()
    if len(oauth_apps) >= 2:
        # Both entries are already present, so skip the secret lookups.
        log.info('OAuth Apps already stored')
        return

    log.info('Retriving details about the API and backend OAuth Apps')
    try:
        log.info('initContext: Initializing context')
        oauth_apps['idcs'] = {
            'introspection_endpoint': context['idcs_introspection_endpoint'],
            'client_id': context['idcs_app_client_id'],
            'client_secret': getSecret(context['idcs_app_client_secret_ocid']),
        }
        oauth_apps['oic'] = {
            'token_endpoint': context['back_end_token_endpoint'],
            'client_id': context['back_end_app_client_id'],
            'client_secret': getSecret(context['back_end_client_secret_ocid']),
        }
    except Exception as ex:
        log.error('initContext: Failed to get config or secrets')
        print("ERROR [initContext]: Failed to get the configs", ex, flush=True)
        raise


def getSecret(ocid):
    """Return the base64-decoded content of the secret bundle for ``ocid``.

    Args:
        ocid: identifier passed to ``SecretsClient.get_secret_bundle``.

    Returns:
        The secret content decoded from base64 and then from UTF-8.

    Raises:
        Exception: any failure while fetching or decoding; logged and
            re-raised unchanged.
    """
    signer = oci.auth.signers.get_resource_principals_signer()
    try:
        client = oci.secrets.SecretsClient({}, signer=signer)
        bundle = client.get_secret_bundle(ocid).data
        encoded = bundle.secret_bundle_content.content.encode('utf-8')
        return base64.b64decode(encoded).decode('utf-8')
    except Exception as ex:
        # Pass the exception as a logging argument: concatenating it to a str
        # raises TypeError and would hide the real failure.
        logging.getLogger().error("getSecret: Failed to get Secret: %s", ex)
        print("Error [getSecret]: failed to retrieve", ex, flush=True)
        raise


def introspectToken(access_token, introspection_endpoint, client_id, client_secret):
    """POST ``access_token`` to the introspection endpoint and return the JSON reply.

    Args:
        access_token: token value sent in the ``token`` form field.
        introspection_endpoint: URL to POST to.
        client_id: user name for HTTP basic authentication.
        client_secret: password for HTTP basic authentication.

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: request, HTTP-status or JSON-decoding failures; logged and
            re-raised unchanged.
    """
    payload = {'token': access_token}
    headers = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
               'Accept': 'application/json'}
    try:
        reply = requests.post(introspection_endpoint,
                              data=payload,
                              headers=headers,
                              auth=HTTPBasicAuth(client_id, client_secret),
                              timeout=REQUEST_TIMEOUT_SECONDS)
        reply.raise_for_status()
        return reply.json()
    except Exception as ex:
        logging.getLogger().error("introspectToken: Failed to introspect token: %s", ex)
        raise


def getBackEndAuthToken(token_endpoint, client_id, client_secret):
    """Request a ``client_credentials`` token from the back-end token endpoint.

    Args:
        token_endpoint: URL to POST to.
        client_id: user name for HTTP basic authentication.
        client_secret: password for HTTP basic authentication.

    Returns:
        The parsed JSON body of the response.

    Raises:
        Exception: request, HTTP-status or JSON-decoding failures; logged and
            re-raised unchanged.
    """
    payload = {'grant_type': 'client_credentials'}
    headers = {'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8'}
    try:
        reply = requests.post(token_endpoint,
                              data=payload,
                              headers=headers,
                              auth=HTTPBasicAuth(client_id, client_secret),
                              timeout=REQUEST_TIMEOUT_SECONDS)
        reply.raise_for_status()
        return json.loads(reply.text)
    except Exception as ex:
        logging.getLogger().error("getBackEndAuthToken: Failed to get oic token: %s", ex)
        raise


def _expires_at(token_exp, backend_expires_in, now=None):
    """Return the earlier of the two expiries as an ISO-8601 string.

    Args:
        token_exp: POSIX timestamp at which the client token expires.
        backend_expires_in: lifetime of the back-end token in seconds.
        now: timezone-aware reference time; defaults to the current UTC time.
    """
    utc = datetime.timezone.utc
    if now is None:
        now = datetime.datetime.now(utc)
    # Both values are timezone-aware, so the comparison does not depend on
    # the host's local timezone.
    client_expiry = datetime.datetime.fromtimestamp(token_exp, tz=utc)
    backend_expiry = now + timedelta(seconds=backend_expires_in)
    earliest = min(client_expiry, backend_expiry)
    return earliest.astimezone().replace(microsecond=0).isoformat()


def getAuthContext(token, client_apps):
    """Build the auth context dict for the given authorization token.

    Args:
        token: authorization value; a leading ``'Bearer '`` is removed before
            introspection.
        client_apps: mapping with ``'idcs'`` and ``'oic'`` entries shaped like
            the ones ``initContext`` stores in ``oauth_apps``.

    Returns:
        For an active token, a dict with ``active``, ``principal``, ``scope``,
        ``expiresAt`` and ``context``. Otherwise a dict with ``active`` set to
        False and ``wwwAuthenticate``.

    Raises:
        Exception: failures from introspection or the back-end token request.
    """
    idcs = client_apps['idcs']
    # Strip the scheme only when it is actually present, rather than blindly
    # dropping the first seven characters.
    access_token = token[len(BEARER_PREFIX):] if token.startswith(BEARER_PREFIX) else token
    try:
        token_info = introspectToken(access_token,
                                     idcs['introspection_endpoint'],
                                     idcs['client_id'],
                                     idcs['client_secret'])
    except Exception as ex:
        logging.getLogger().error("getAuthContext: Failed to introspect token: %s", ex)
        raise

    if token_info.get('active') is not True:
        # Token is not active: answer with the wwwAuthenticate value.
        return {'active': False, 'wwwAuthenticate': WWW_AUTHENTICATE}

    auth_context = {
        'active': True,
        'principal': token_info['sub'],
        'scope': token_info['scope'],
    }
    oic = client_apps['oic']
    backend_token = getBackEndAuthToken(oic['token_endpoint'],
                                        oic['client_id'],
                                        oic['client_secret'])
    # The decision is valid only until the earlier of the two tokens expires.
    auth_context['expiresAt'] = _expires_at(token_info['exp'], backend_token['expires_in'])
    # Carry the back-end token in the context so it can be mapped to an
    # Authorization header by a request/response transformation policy.
    auth_context['context'] = {
        'back_end_token': BEARER_PREFIX + str(backend_token['access_token']),
    }
    return auth_context


def handler(ctx, data: io.BytesIO=None):
    """Function entry point: authorize the token carried in the request body.

    Args:
        ctx: invocation context; ``ctx.Config()`` supplies the settings read
            by ``initContext``.
        data: request body, a JSON document with a ``token`` field.

    Returns:
        A ``response.Response`` with status 200 and the auth context when the
        token is active, otherwise status 401 with a denial context.
    """
    log = logging.getLogger()
    log.info('Entered Handler')
    initContext(dict(ctx.Config()))

    try:
        gateway_auth = json.loads(data.getvalue())
        auth_context = getAuthContext(gateway_auth['token'], oauth_apps)
    except Exception as ex:
        # Any failure (bad payload, missing token, upstream error) is treated
        # as a denial instead of propagating out of the handler.
        log.error('handler: failed to authorize request: %s', ex, exc_info=True)
        auth_context = {'active': False, 'wwwAuthenticate': WWW_AUTHENTICATE}

    if auth_context['active']:
        log.info('Authorizer returning 200...')
        status_code = 200
    else:
        log.info('Authorizer returning 401...')
        status_code = 401

    # Serialise the dict itself on every path so the body is a JSON object,
    # matching the declared Content-Type, not a JSON-encoded Python repr.
    return response.Response(
        ctx,
        response_data=json.dumps(auth_context),
        status_code=status_code,
        headers={"Content-Type": "application/json"}
    )
Source:authorization.py  
...45* ``group_ids`` (optional): list of group IDs for which the API user has46                            membership if token was for a federated user47"""48LOG = log.getLogger(__name__)49def token_to_auth_context(token):50    if not isinstance(token, token_model.KeystoneToken):51        raise exception.UnexpectedError(_('token reference must be a '52                                          'KeystoneToken type, got: %s') %53                                        type(token))54    auth_context = {'token': token,55                    'is_delegated_auth': False}56    try:57        auth_context['user_id'] = token.user_id58    except KeyError:59        LOG.warning(_LW('RBAC: Invalid user data in token'))60        raise exception.Unauthorized()61    auth_context['user_domain_id'] = token.user_domain_id62    if token.project_scoped:63        auth_context['project_id'] = token.project_id...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
