How to use auth_header method in webdriver_manager

Best Python code snippet using webdriver_manager

async_splunk_client.py

Source:async_splunk_client.py Github

copy

Full Screen

"""
Copyright (C) 2009-2021 Splunk Inc. All Rights Reserved.

Module providing client for making asynchronous requests to Splunk Core
"""
import json
from http import HTTPStatus
from spacebridgeapp.rest.clients.async_non_ssl_client import AsyncNonSslClient
from spacebridgeapp.request.request_processor import JWTAuthHeader
from spacebridgeapp.util import constants
from spacebridgeapp.util.string_utils import append_path_to_uri
from spacebridgeapp.logging import setup_logging
import urllib.parse as urllib

LOGGER = setup_logging(constants.SPACEBRIDGE_APP_NAME + "_async_splunk_client.log", "async_splunk_client")

# Path suffixes appended to a saved-search uri (see get_saved_searches_uri)
HISTORY = '/history'
DISPATCH = '/dispatch'


class AsyncSplunkClient(AsyncNonSslClient):
    """
    Client for handling asynchronous requests to Splunk API
    """

    def __init__(self, uri):
        """
        :param uri: string representing uri to make request to
        """
        self.uri = uri
        super().__init__()

    def async_get_app_list_request(self, auth_header, params=None):
        """
        Make async request to Splunk /apps/local api
        :param auth_header: Value for the Authorization header
        :param params: [tuples]
        :return: async request object
        """
        uri = self.get_app_list_uri()
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def get_app_list_uri(self):
        """
        Construct Splunk /apps/local api to retrieve app list
        https://docs.splunk.com/Documentation/Splunk/7.2.5/RESTREF/RESTapps#apps.2Flocal
        :return: uri string
        """
        return self.get_rest_endpoint_uri('apps/local')

    def async_get_dashboard_list_request(self, auth_header, owner="-", app_name="-", params=None):
        """
        Make async request to Splunk data/ui/views api
        :param auth_header: Value for the Authorization header
        :param owner: [string]
        :param app_name: [string]
        :param params: [tuples]
        :return: async request object
        """
        uri = self.get_dashboard_list_uri(owner, app_name)
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    async def async_get_all_users(self, auth_header):
        """
        Fetch the names of the users listed by services/authentication/users
        :param auth_header: Value for the Authorization header
        :return: tuple (response code, list of user names); the list is empty when the response is not OK
        """
        uri = '%sservices/authentication/users' % self.uri
        response = await self.async_get_request(uri, auth_header=auth_header, params={'output_mode': 'json'})
        if response.code != HTTPStatus.OK:
            return response.code, []
        parsed = await response.json()
        return HTTPStatus.OK, [entry['name'] for entry in parsed['entry']]

    async def async_get_sign_credentials(self, auth_header):
        """
        Fetch public and private keys necessary for signing messages. Needed for sending push notifications which
        don't have access to a system auth token to fetch the keys from passwords.conf
        :param auth_header: Value for the Authorization header
        :return: tuple (response code, parsed json body); the body is an empty dict when the response is not OK
        """
        uri = '{}services/ssg/cloudgateway/sign_credentials'.format(self.uri)
        response = await self.async_get_request(uri, auth_header=auth_header, params={'output_mode': 'json'})
        if response.code != HTTPStatus.OK:
            return response.code, {}
        parsed = await response.json()
        return HTTPStatus.OK, parsed

    async def async_get_users_roles_mapping(self, auth_header):
        """
        Returns a map of all Splunk users viewable using the permissions of the supplied authtoken to a list of the
        roles the user belongs to
        :param auth_header: Value for the Authorization header
        :return: tuple (response code, {user name: roles}); the map is empty when the response is not OK
        """
        uri = '{}services/authentication/users'.format(self.uri)
        params = {
            'count': 0,
            'output_mode': 'json',
        }
        response = await self.async_get_request(uri, auth_header=auth_header, params=params)
        if response.code != HTTPStatus.OK:
            return response.code, {}
        parsed = await response.json()
        user_role_mapping = {entry['name']: entry["content"]['roles'] for entry in parsed["entry"]}
        return HTTPStatus.OK, user_role_mapping

    def async_get_viewable_roles(self, auth_header):
        """
        Make async get request to services/authorization/roles
        :param auth_header: Value for the Authorization header
        :return: async request object
        """
        uri = '{}services/authorization/roles?output_mode=json'.format(self.uri)
        return self.async_get_request(uri, auth_header=auth_header)

    def async_get_role(self, auth_header, role_name):
        """
        Make async get request to services/authorization/roles/{role_name}
        :param auth_header: Value for the Authorization header
        :param role_name: name of the role, inserted into the uri as-is
        :return: async request object
        """
        uri = '{splunkd_uri}services/authorization/roles/{role_name}?output_mode=json'.format(
            splunkd_uri=self.uri, role_name=role_name)
        return self.async_get_request(uri=uri, auth_header=auth_header)

    def get_dashboard_list_uri(self, owner, app_name):
        """
        Construct Splunk data/ui/views api to retrieve dashboard list
        http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTknowledge#data.2Fui.2Fviews
        :param owner:
        :param app_name:
        :return: uri string
        """
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/data/ui/views')

    def async_get_dashboard_request(self, auth_header, owner="-", app_name="search", dashboard_name=None,
                                    params=None):
        """
        Make async request to Splunk data/ui/views/[dashboard_name]
        :param owner:
        :param app_name:
        :param dashboard_name:
        :param auth_header: Value for the Authorization header
        :param params:
        :return: async request object
        """
        uri = self.get_dashboard_uri(owner, app_name, dashboard_name)
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def get_dashboard_uri(self, owner, app_name, dashboard_name):
        """
        Construct Splunk data/ui/views/[dashboard_name] api to retrieve dashboard
        http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTknowledge#data.2Fui.2Fviews.2F.7Bname.7D
        :param owner:
        :param app_name:
        :param dashboard_name:
        :return: uri string
        """
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/data/ui/views/{dashboard_name}')

    def async_ar_workspace_get_request(self, auth_header, dashboard_name=''):
        """
        Make async get request to Splunk services/kvstore/ar_workspace rest API
        """
        uri = self.get_ar_workspace_uri()
        params = {'dashboard_id': dashboard_name}
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def async_ar_workspace_set_request(self, auth_header, dashboard_name='', workspace_data=''):
        """
        Make async post request to Splunk services/kvstore/ar_workspace rest API
        """
        uri = self.get_ar_workspace_uri()
        params = {'dashboard_id': dashboard_name}
        return self.async_post_request(uri=uri, params=params, data=workspace_data, auth_header=auth_header)

    def get_ar_workspace_uri(self):
        """ Construct URI for the ar_workspaces REST API, for reading and writing ar_workspace data
        :return: uri string
        """
        return self.get_rest_endpoint_uri('kvstore/ar_workspace')

    def async_get_search_data_request(self, auth_header, owner='admin', app_name='search', data=''):
        """
        Call Splunk API to retrieve visualization data asynchronously
        """
        uri = self.get_search_data_uri(owner, app_name)
        return self.async_post_request(uri=uri, data=data, auth_header=auth_header)

    def get_search_data_uri(self, owner, app_name):
        """
        Construct uri for synchronous search query
        http://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTsearch#search.2Fjobs.2Fexport
        """
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/search/jobs')

    def async_get_search_job_request(self, auth_header, owner='admin', app_name='search', search_id='',
                                     params=None):
        """
        Make async call to get search job metadata
        """
        uri = self.get_search_job_uri(owner, app_name, search_id)
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def async_delete_search_job_request(self, auth_header, owner='admin', app_name='search', search_id='',
                                        params=None):
        """
        Make async call to delete a search job by sid
        """
        uri = self.get_search_job_uri(owner, app_name, search_id)
        return self.async_delete_request(uri=uri, params=params, auth_header=auth_header)

    def get_search_job_uri(self, owner, app_name, sid):
        """
        Construct uri to search job by sid
        http://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D
        """
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/search/jobs/{sid}')

    def async_get_search_job_results_preview_request(self, auth_header, owner='admin', app_name='search', search_id='',
                                                     params=None):
        """
        Make async call to get search job results
        """
        uri = self.get_search_job_results_preview_uri(owner, app_name, search_id)
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def get_search_job_results_preview_uri(self, owner, app_name, sid):
        """
        Construct uri to search job results by sid
        http://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTsearch#search.2Fjobs.2F.7Bsearch_id.7D.2Fresults_preview
        """
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/search/jobs/{sid}/results_preview')

    def get_login_uri(self):
        """
        Construct uri for the login api which returns a splunk cookie using username and password info
        :return: uri string
        """
        return self.get_rest_endpoint_uri('auth/login?output_mode=json', encoded=False)

    def async_get_splunk_cookie(self, username, password):
        """
        Get a user's splunk cookie by hitting the 'auth/login' api.
        Reference: http://docs.splunk.com/Documentation/Splunk/7.0.3/RESTREF/RESTaccess#auth.2Flogin
        """
        uri = self.get_login_uri()
        data = {'username': username, 'password': password}
        return self.async_post_request(uri, data=data, auth_header=None)

    def get_token_uri(self, token_id=""):
        """
        Construct uri for the tokens api which returns user's token info
        :param token_id: optional path segment appended after authorization/tokens/
        :return: uri string
        """
        return self.get_rest_endpoint_uri(f'authorization/tokens/{token_id}')

    def async_get_JWT_validation(self, username, auth_header):
        """
        Validate JWT token by hitting the 'authorization/tokens' api successfully
        """
        uri = self.get_token_uri()
        params = {'output_mode': 'json', 'username': username}
        return self.async_get_request(uri, params=params, auth_header=auth_header)

    def async_create_new_JWT_token(self, username, auth_header):
        """
        Create new JWT token
        :param username:
        :param auth_header:
        :return: async request object
        """
        uri = self.get_token_uri()
        data = {
            "name": username,
            "not_before": "+0d",
            "audience": constants.CLOUDGATEWAY,
            "expires_on": "+14d"
        }
        params = {'output_mode': 'json'}
        return self.async_post_request(uri, auth_header=auth_header, data=data, params=params)

    def async_delete_old_JWT_token(self, username, id, auth_header):
        """
        Delete JWT token
        :param username: used as the path segment of the tokens uri
        :param id: sent as the 'id' query parameter
        :param auth_header:
        :return: async request object
        """
        uri = self.get_token_uri(username)
        params = {
            "id": id
        }
        return self.async_delete_request(uri, auth_header=auth_header, params=params)

    def async_get_app_info(self, app_name, auth_header, params=None):
        """
        Make async request to Splunk /apps/local/[app_name] to retrieve friendly app name
        :param app_name:
        :param auth_header: Value for the Authorization header
        :param params:
        :return: async request object
        """
        uri = self.get_app_info_uri(app_name)
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def get_app_info_uri(self, app_name):
        """
        List information about the {app_name} app.
        http://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTapps#apps.2Flocal.2F.7Bname.7D
        """
        return self.get_rest_endpoint_uri(f'apps/local/{app_name}?output_mode=json', encoded=False)

    def async_get_current_context(self, auth_header):
        """
        Make async request to Splunk /authentication/current-context api to retrieve current user's context
        """
        uri = self.get_current_context_uri()
        return self.async_get_request(uri=uri, auth_header=auth_header)

    def get_current_context_uri(self):
        """
        Construct uri for the current-context api which returns the user information for the current context
        http://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTaccess#authentication.2Fcurrent-context
        """
        return self.get_rest_endpoint_uri('authentication/current-context?output_mode=json', encoded=False)

    def async_get_saved_searches(self, auth_header, owner, app_name, ref):
        """
        Make async get request to Splunk /saved/searches/{ref} to retrieve saved search metadata
        """
        params = {'output_mode': 'json', 'count': 0}
        rest_uri = self.get_saved_searches_uri(owner, app_name, ref)
        return self.async_get_request(uri=rest_uri, params=params, auth_header=auth_header)

    def async_get_saved_searches_history(self, auth_header, owner, app_name, ref):
        """
        Make async get request to Splunk /saved/searches/{ref}/history to retrieve saved search history
        """
        # Params will get last scheduled saved_search executed
        params = {'output_mode': 'json',
                  'count': '1',
                  'sort_dir': 'desc',
                  'sort_key': 'start',
                  'search': '(isScheduled=true AND (isDone=true OR isRealTimeSearch=true))'}
        rest_uri = self.get_saved_searches_uri(owner, app_name, ref, HISTORY)
        return self.async_get_request(uri=rest_uri, params=params, auth_header=auth_header)

    def async_post_saved_searches_dispatch(self, auth_header, owner, app_name, ref, data=None):
        """
        Make async post request to Splunk /saved/searches/{ref}/dispatch to trigger saved search job
        """
        rest_uri = self.get_saved_searches_uri(owner, app_name, ref, DISPATCH)
        return self.async_post_request(uri=rest_uri, data=data, auth_header=auth_header)

    def get_saved_searches_uri(self, owner, app_name, ref, path=''):
        """
        Construct uri for saved searches by ref
        https://docs.splunk.com/Documentation/Splunk/7.1.2/RESTREF/RESTsearch#saved.2Fsearches.2F.7Bname.7D
        :param path: optional suffix appended after {ref} (e.g. HISTORY or DISPATCH)
        """
        # servicesNS can not dispatch searches with no app context
        if app_name is None or app_name == '-':
            return self.get_rest_endpoint_uri(f'saved/searches/{ref}{path}')
        return self.get_namespaced_rest_endpoint_uri(f'{owner}/{app_name}/saved/searches/{ref}{path}')

    def get_rest_endpoint_uri(self, path, encoded=True):
        """
        Create uri for splunk custom rest endpoints (services/{path})
        :param path: path placed after 'services/'
        :param encoded: forwarded unchanged to append_path_to_uri
        """
        return append_path_to_uri(self.uri, f'services/{path}', encoded=encoded)

    def get_namespaced_rest_endpoint_uri(self, path, encoded=True):
        """
        Create uri for splunk custom rest endpoints under a namespace (servicesNS/{path})
        :param path: path placed after 'servicesNS/'
        :param encoded: forwarded unchanged to append_path_to_uri
        """
        return append_path_to_uri(self.uri, f'servicesNS/{path}', encoded=encoded)

    def async_get_deployment_info(self, auth_header):
        """
        Fetch information from the deployment_info endpoint which contains information such as the splapp's public
        keys, mdm public keys, etc
        :param auth_header: auth header used for authentication to kvstore
        :return: async request object for the deployment information
        """
        uri = self.get_rest_endpoint_uri('ssg/kvstore/deployment_info')
        return self.async_get_request(uri=uri, auth_header=auth_header)

    def async_get_registration_query(self, auth_header, auth_code, device_name):
        """
        Makes a call to the handler which is called by the registration page when opening the login modal
        when registering a new device
        :param auth_code: auth code used for registration to spacebridge
        :param device_name: device name being registered
        :return: Temp key for temporary record saved in KVStore and confirmation code
        """
        uri = self.get_rest_endpoint_uri('ssg/registration/query')
        params = {'auth_code': auth_code,
                  'device_name': device_name}
        return self.async_get_request(uri=uri, params=params, auth_header=auth_header)

    def async_post_registration_confirmation(self, auth_header, auth_code, temp_key, device_name):
        """
        Makes a call to the handler which is called by the registration page when the user completes the confirmation
        code/login modal
        :raises TypeError: when auth_header is neither a JWTAuthHeader nor has username and password attributes
        """
        params = {'auth_code': auth_code}
        if isinstance(auth_header, JWTAuthHeader):
            uri = self.get_rest_endpoint_uri('ssg/registration/saml')
            data = json.dumps({'temp_key': temp_key})
        elif all(hasattr(auth_header, attr) for attr in ('username', 'password')):
            uri = self.get_rest_endpoint_uri('ssg/registration/confirmation')
            # TODO: Unclear if device_name is actually used
            data = json.dumps({'username': auth_header.username,
                               'password': auth_header.password,
                               'temp_key': temp_key,
                               'device_name': device_name})
        else:
            raise TypeError('Unsupported auth header type')
        return self.async_post_request(uri=uri, params=params, data=data, auth_header=auth_header)

    def async_create_role(self, auth_header, name, imported_roles=None, capabilities=None):
        """Creates a new Splunk role with the given name, inherited roles, and capabilities."""
        # Note that we do not JSON serialize the payload. This endpoint only accepts data in a
        # application/x-www-form-urlencoded body.
        data = {'name': name}
        if imported_roles:
            data['imported_roles'] = imported_roles
        if capabilities:
            data['capabilities'] = capabilities
        return self.async_post_request(
            uri=self.get_rest_endpoint_uri('authorization/roles?output_mode=json', encoded=False),
            auth_header=auth_header, data=data)

    def async_update_role(self, auth_header, name, imported_roles=None, capabilities=None):
        """
        Updates an existing Splunk role with the given parameters.
        :raises ValueError: when both imported_roles and capabilities are None
        """
        # We check against None in this case because we want to explicitly allow callers to set imported roles or
        # capabilities to an empty list. We cannot use an empty list if either field is None because we would be erasing
        # any imported roles and capabilities already associated with the given role.
        #
        # Note that we do not JSON serialize the payload. This endpoint only accepts data in a
        # application/x-www-form-urlencoded body.
        data = {}
        if imported_roles is not None:
            data['imported_roles'] = imported_roles or ''
        if capabilities is not None:
            data['capabilities'] = capabilities or ''
        if not data:
            raise ValueError('Must specify at least one of imported_roles or capabilities')
        uri = self.get_rest_endpoint_uri('authorization/roles/{name}?output_mode=json'.format(name=name), encoded=False)
        return self.async_post_request(uri=uri, auth_header=auth_header, data=data)

    def async_delete_role(self, auth_header, name):
        """Deletes a role by name."""
        uri = self.get_rest_endpoint_uri('authorization/roles/{name}?output_mode=json'.format(name=name), encoded=False)
        return self.async_delete_request(uri=uri, auth_header=auth_header)

    def async_fetch_companion_apps(self, auth_header, app_id=None):
        """
        Make async get request to ssg/registration/companion_app, optionally narrowed to one app_id
        """
        if app_id:
            uri = '{}services/ssg/registration/companion_app/{}'.format(self.uri, app_id)
        else:
            uri = '{}services/ssg/registration/companion_app'.format(self.uri)
        return self.async_get_request(uri=uri, auth_header=auth_header)

    def get_server_settings_uri(self):
        """
        Construct uri for the services/server/settings endpoint
        :return: uri string
        """
        # Every other hand-built uri in this class is '{}services/...', i.e. self.uri is expected to end with '/'.
        # Strip it before adding our own separator so the result never contains '//services'.
        return "{}/services/server/settings".format(self.uri.rstrip('/'))

    def async_get_server_settings(self, auth_header):
        """
        Make async get request to services/server/settings
        """
        uri = self.get_server_settings_uri()
        return self.async_get_request(uri=uri, auth_header=auth_header, params={'output_mode': 'json'})

    def get_search_ast_url(self):
        """
        Construct uri for the servicesNS/admin/search/search/ast endpoint
        """
        return "{}servicesNS/admin/search/search/ast".format(self.uri)

    def async_post_search_ast(self, auth_header, search_query):
        uri = self.get_search_ast_url()
        form_data = {
            'spl': search_query
        }
        form_data_jsn = json.dumps(form_data)
        params = {'output_mode': 'json'}
        # NOTE: the listing this file was recovered from is truncated here ("...");
        # the remainder of this method is not visible and has not been reconstructed.

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

# Authors: Nikhil Kumar, Abhijit Raman, Nicholas Wille
import json
import requests
import extract
import os
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import pickle
import numpy as np
from flask import Flask, request, redirect, g, render_template, Response, make_response
from flask_caching import Cache
from urllib.parse import quote
from math import pi

app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})

# API Keys
client_id = "INSERT_CLIENT_ID_HERE"
client_secret = "INSERT_CLIENT_SECRET_KEY_HERE"

# API URLs
auth_url = "https://accounts.spotify.com/authorize"
token_url = "https://accounts.spotify.com/api/token"
base_url = "https://api.spotify.com/v1"

# Redirect uri and authorization scopes
redirect_uri = "https://myspotifydata.azurewebsites.net/home"
scope = "user-top-read user-read-recently-played playlist-read-collaborative playlist-read-private"

# UNCOMMENT TO USE FOR LOCAL TESTING
# redirect_uri = "http://127.0.0.1:8000/home"

# Image folder configuration
app.config['UPLOAD_FOLDER'] = "/static/"

# Query parameters for authorization
auth_query = {
    "response_type": "code",
    "redirect_uri": redirect_uri,
    "scope": scope,
    "show_dialog": "false",
    "client_id": client_id
}

# (DataFrame column, audio-feature key) pairs, in the column order used both for
# the training CSV (get_dataframe) and for the model input (model_predict)
_FEATURE_COLUMNS = (
    ('Danceability', 'danceability'),
    ('Tempo', 'tempo'),
    ('Instrumentalness', 'instrumentalness'),
    ('Energy', 'energy'),
    ('Acousticness', 'acousticness'),
    ('Valence', 'valence'),
    ('Liveness', 'liveness'),
    ('Loudness', 'loudness'),
    ('Speechiness', 'speechiness'),
)


# Builds the Authorization header dict passed to every Spotify API call
def _bearer_header(access_token):
    return {"Authorization": "Bearer {}".format(access_token)}


# Builds a dataframe with one column per audio feature from a datapoints mapping
def _features_frame(datapoints):
    df = pd.DataFrame()
    for column, key in _FEATURE_COLUMNS:
        df[column] = datapoints[key]
    return df


# Returns a token needed to access the Spotify API.
# Reads the authorization code from the current request's 'code' query argument
# and exchanges it at token_url; only the access token is returned.
def generate_access_token():
    # Requests refresh and access tokens (POST)
    auth_token = request.args['code']
    code_payload = {
        "grant_type": "authorization_code",
        "code": str(auth_token),
        "redirect_uri": redirect_uri,
        'client_id': client_id,
        'client_secret': client_secret,
    }
    post_request = requests.post(token_url, data=code_payload)

    # Tokens returned to application; the refresh token, token type and expiry
    # in the response are not used anywhere in this file
    response_data = json.loads(post_request.text)
    return response_data["access_token"]


# GET a user's top artists
def get_top_artist_data(auth_header, time_range, limit):
    endpoint = "{}/me/top/artists?time_range={}&limit={}".format(base_url, time_range, limit)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.top_artists(data)


# GET a user's top tracks
def get_top_tracks_data(auth_header, time_range, limit):
    endpoint = "{}/me/top/tracks?time_range={}&limit={}".format(base_url, time_range, limit)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.top_tracks(data)


# GET a user's top tracks grouped by their top artists
# (always long-term: 50 tracks and 10 artists)
def get_top_tracks_by_artist(auth_header):
    top_tracks = get_top_tracks_data(auth_header, 'long_term', '50')
    top_artists = get_top_artist_data(auth_header, 'long_term', '10')
    return extract.top_tracks_by_artist(top_tracks, top_artists)


# GET a user's recent listening history
def get_recent_tracks_data(auth_header, limit):
    endpoint = "{}/me/player/recently-played?type=track&limit={}".format(base_url, limit)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.recent_tracks(data)


# GET the track ids from the user's recent listening history
# Returns the ids as one comma-separated string
def get_recent_tracks_ids(auth_header, limit):
    endpoint = "{}/me/player/recently-played?type=track&limit={}".format(base_url, limit)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return ','.join(extract.recent_track_ids(data))


# GET the track ids for the user's top tracks
# Returns the ids as one comma-separated string
def get_top_tracks_ids(auth_header, time_range, limit):
    endpoint = "{}/me/top/tracks?time_range={}&limit={}".format(base_url, time_range, limit)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return ','.join(extract.top_track_ids(data))


# GET the artwork of tracks (track_ids is a comma-separated string)
def get_track_images(auth_header, track_ids):
    endpoint = "{}/tracks?ids={}".format(base_url, track_ids)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.track_images(data)


# GET the images of artists (artist_ids is a comma-separated string)
def get_artist_images(auth_header, artist_ids):
    endpoint = "{}/artists?ids={}".format(base_url, artist_ids)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.artist_images(data)


# GET the image urls of the top tracks
# Each item's element at index 1 is used as the track id
def get_top_track_images(auth_header, tracks):
    track_ids = ','.join(item[1] for item in tracks)
    return get_track_images(auth_header, track_ids)


# GET the image urls of the top artists
# Each item's element at index 1 is used as the artist id
def get_top_artist_images(auth_header, artists):
    artist_ids = ','.join(item[1] for item in artists)
    return get_artist_images(auth_header, artist_ids)


# GET the tracks from a playlist
# Returns the audio-feature dataframe for those tracks, labelled with person_type
def get_tracks_from_playlist(auth_header, list_id, person_type):
    endpoint = "{}/playlists/{}/tracks".format(base_url, list_id)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return get_dataframe(auth_header, data, person_type)


# Function that can be called by a route based on term length
# Returns (top tracks data, images)
def display_top_tracks(term_length):
    # Get the access token from its cookie and use it to access data
    auth_header = _bearer_header(request.cookies.get('token'))
    top_tracks_data = get_top_tracks_data(auth_header, term_length, '30')
    images = get_top_track_images(auth_header, top_tracks_data)
    return (top_tracks_data, images)


# Function that can be called by a route based on term length
# Returns (top artists data, images)
def display_top_artists(term_length):
    # Get the access token from its cookie and use it to access data
    auth_header = _bearer_header(request.cookies.get('token'))
    top_artist_data = get_top_artist_data(auth_header, term_length, '30')
    images = get_top_artist_images(auth_header, top_artist_data)
    return (top_artist_data, images)


# Function that can be called by a route based on term length
# NOTE: term_length is accepted but not used; get_top_tracks_by_artist always
# queries 'long_term'
def display_top_tracks_by_artist(term_length):
    # Get the access token from its cookie and use it to access data
    auth_header = _bearer_header(request.cookies.get('token'))
    data = get_top_tracks_by_artist(auth_header)
    images = get_top_artist_images(auth_header, list(data))
    return (data, images)


# GET audio features for several tracks and store necessary datapoints
def do_audio_analysis(auth_header, track_ids):
    endpoint = "{}/audio-features?ids={}".format(base_url, track_ids)
    response = requests.get(endpoint, headers=auth_header)
    data = json.loads(response.text)
    return extract.get_audio_datapoints(data)


# Graph data using matplotlib and seaborn
# Saves a bar plot of datapoints[tag] to static/<tag>.png unless tag is excluded
def make_graph(datapoints, tag, excluded):
    if tag in excluded:
        return
    plt.figure()
    df = pd.DataFrame()
    df['Top Songs Ranked by Number'] = range(1, len(datapoints[tag]) + 1)
    y_title = tag.capitalize()
    df[y_title] = datapoints[tag]
    title = y_title + " ratings of your top songs"
    ax = sns.barplot(x="Top Songs Ranked by Number", y=y_title, data=df)
    ax.set_title(title)
    fig = ax.get_figure()
    fig.set_size_inches(15, 7.5)
    fig.savefig('static/{t}.png'.format(t=tag))
    plt.close(fig)


# Returns a pandas dataframe containing data from the audio analysis
# One row per track in data['items'], plus a 'Label' column filled with label
def get_dataframe(auth_header, data, label):
    string_ids = ','.join(track['track']['id'] for track in data['items'])
    datapoints = do_audio_analysis(auth_header, string_ids)
    df = _features_frame(datapoints)
    df['Label'] = [label] * len(df.index)
    return df


# Creates a radar chart and saves it to static/personality.png
def make_radar_chart(predictions):
    labels = np.array(["Outgoing", "Mellow", "Solitary"])
    # NOTE(review): predict_personality labels the "passive" playlists 2 and the
    # "outgoing" playlists 0, but here 2 is counted as "Outgoing" and anything
    # other than 1/2 as "Solitary". Confirm which labelling ml/dtc.pkl was
    # trained with before changing either side.
    stats = [0, 0, 0]
    for num in predictions:
        if num == 2:
            stats[0] += 1
        elif num == 1:
            stats[1] += 1
        else:
            stats[2] += 1
    stats = np.array(stats)
    # Repeat the first point at the end so the plotted polygon closes
    stats = np.concatenate((stats, [stats[0]]))
    N = len(labels)
    angles = [n / float(N) * 2 * pi for n in range(N)]
    angles += angles[:1]
    my_dpi = 96
    fig = plt.figure(figsize=(1000/my_dpi, 1000/my_dpi), dpi=my_dpi)
    ax = fig.add_subplot(111, polar=True, facecolor='black', frameon=True)
    ax.set_theta_offset(pi / 2)
    ax.set_theta_direction(-1)
    plt.xticks(angles[:-1], labels, color='black', size=8)
    ax.set_rlabel_position(0)
    ax.plot(angles, stats, color='green', linewidth=2, linestyle='solid')
    ax.fill(angles, stats, color='green', alpha=0.4)
    plt.title("Personality Graph", size=11, y=1.1)
    fig.set_size_inches(15, 7.5)
    fig.savefig("static/personality.png")
    # Close the figure we drew on so figures do not accumulate across requests
    plt.close(fig)


# Return predictions from the ML model
def model_predict(datapoints):
    df = _features_frame(datapoints)
    # The model is unpickled from a file inside the project; pickle must never be
    # pointed at data from an untrusted source
    with open(r"ml/dtc.pkl", 'rb') as model_file:
        dtc_loaded = pickle.load(model_file)
    return dtc_loaded.predict(df)


# Initial route for user authentication with Spotify
@app.route("/")
def index():
    # Redirects the user to the Spotify login page (first thing that happens upon app launch)
    url_args = "&".join(["{}={}".format(key, quote(val)) for key, val in auth_query.items()])
    authorization = "{}/?{}".format(auth_url, url_args)
    response = make_response(redirect(authorization))
    # max_age=0 expires any existing token cookie
    response.set_cookie('token', 'deletion', max_age=0)
    return response


# Homepage of application
@app.route("/home")
def display_top_data():
    with app.app_context():
        cache.clear()

    # Get an access token from its cookie or generate one if it doesn't exist yet
    access_token = request.cookies.get('token')
    if access_token is None:
        access_token = generate_access_token()

    # Use the token to get the necessary authorization header and then obtain data
    auth_header = _bearer_header(access_token)
    recent_tracks_data = get_recent_tracks_data(auth_header, '50')
    recent_track_ids = get_recent_tracks_ids(auth_header, '50')
    track_images = get_track_images(auth_header, recent_track_ids)

    # Store HTML rendering in a response and create a cookie for the access token
    response = make_response(render_template("index.html", recent=recent_tracks_data, images=track_images[0],
                                             links=track_images[1]))
    response.set_cookie('token', access_token, max_age=3600)
    return response


# Page for viewing top tracks in the past 1 month
@app.route("/top-tracks-short-term")
def display_top_tracks_short_term():
    data = display_top_tracks('short_term')
    return render_template("toptracks.html", top_tracks=data[0], images=data[1][0], short_link_status="active",
                           med_link_status="", long_link_status="", links=data[1][1])


# Page for viewing top tracks in the past 6 months
@app.route("/top-tracks-medium-term")
def display_top_tracks_medium_term():
    data = display_top_tracks('medium_term')
    return render_template("toptracks.html", top_tracks=data[0], images=data[1][0], short_link_status="",
                           med_link_status="active", long_link_status="", links=data[1][1])


# Page for viewing all time top tracks
@app.route("/top-tracks-long-term")
def display_top_tracks_long_term():
    data = display_top_tracks('long_term')
    return render_template("toptracks.html", top_tracks=data[0], images=data[1][0], short_link_status="",
                           med_link_status="", long_link_status="active", links=data[1][1])


# Page for viewing top artists in the past month
@app.route("/top-artists-short-term")
def display_top_artists_short_term():
    data = display_top_artists('short_term')
    return render_template("topartists.html", top_artists=data[0], images=data[1][0], short_link_status="active",
                           med_link_status="", long_link_status="", links=data[1][1])


# Page for viewing top artists in the past 6 months
@app.route("/top-artists-medium-term")
def display_top_artists_medium_term():
    data = display_top_artists('medium_term')
    return render_template("topartists.html", top_artists=data[0], images=data[1][0], short_link_status="",
                           med_link_status="active", long_link_status="", links=data[1][1])


# Page for viewing all time top artists
@app.route("/top-artists-long-term")
def display_top_artists_long_term():
    data = display_top_artists('long_term')
    return render_template("topartists.html", top_artists=data[0], images=data[1][0], short_link_status="",
                           med_link_status="", long_link_status="active", links=data[1][1])


# Page for viewing top tracks grouped by artist
@app.route("/top-tracks-by-artist")
def display_top_tracks_by_artist_short_term():
    data = display_top_tracks_by_artist('short_term')
    return render_template("toptracksbyartist.html", content=data[0], images=data[1][0], links=data[1][1])


# Page for viewing the audio analysis graphs
@app.route("/audio-analysis")
@cache.cached(timeout=3600)  # Caches the HTML on this view for fast reload speed
def audio_analysis():
    # Retrieve access token and retrieve track IDs
    auth_header = _bearer_header(request.cookies.get('token'))
    track_ids = get_top_tracks_ids(auth_header, 'medium_term', '30')

    # Generate graphs using datapoints to display on audio analysis page
    datapoints = do_audio_analysis(auth_header, track_ids)
    matplotlib.use('Agg')
    matplotlib.style.use('ggplot')
    sns.set_style('dark')
    excluded = {"speechiness", "loudness", "acousticness", "liveness"}
    for key in datapoints:
        make_graph(datapoints, key, excluded)

    # Render HTML with the desired data
    img_1_file = os.path.join(app.config['UPLOAD_FOLDER'], 'danceability.png')
    img_2_file = os.path.join(app.config['UPLOAD_FOLDER'], 'energy.png')
    img_3_file = os.path.join(app.config['UPLOAD_FOLDER'], 'tempo.png')
    return render_template("audioanalysis.html", img_1=img_1_file, img_2=img_2_file, img_3=img_3_file)


# Page for viewing a user's sentiment analysis
@app.route("/personality-analysis")
@cache.cached(timeout=3600)  # Caches the HTML on this view for fast reload speed
def predict_personality():
    # Retrieve access token and provide authorization header
    auth_header = _bearer_header(request.cookies.get('token'))

    # Lists of playlists with which to build model
    frame_list = []
    outgoing_list = ['37i9dQZF1DX3rxVfibe1L0', '37i9dQZF1DX6GwdWRQMQpq', '37i9dQZF1DXdVbxH0H5oTi',
                     '37i9dQZF1DXdPec7aLTmlC', '37i9dQZF1DWSf2RDTDayIx', '37i9dQZF1DX7KNKjOK0o75']
    mellow_list = ['37i9dQZF1DX6ziVCJnEm59', '37i9dQZF1DWSiZVO2J6WeI', '37i9dQZF1DX4E3UdUs7fUx',
                   '37i9dQZF1DWYiR2Uqcon0X', '37i9dQZF1DWUvQoIOFMFUT', '37i9dQZF1DWXe9gFZP0gtP',
                   '37i9dQZF1DWZqd5JICZI0u', '17HYiAIcwlDEg5RgVkm4L7']
    passive_list = ['37i9dQZF1DX3YSRoSdA634', '37i9dQZF1DX7gIoKXt0gmx', '37i9dQZF1DWX83CujKHHOn',
                    '37i9dQZF1DWSqBruwoIXkA', '37i9dQZF1DWVrtsSlLKzro', '19Bsw8qePEz1l5kQ4jWUvw']

    # Iterate through our playlists and retrieve their songs
    # (labels: passive = 2, mellow = 1, outgoing = 0)
    for item in passive_list:
        frame_list.append(get_tracks_from_playlist(auth_header, item, 2))
    for item in mellow_list:
        frame_list.append(get_tracks_from_playlist(auth_header, item, 1))
    for item in outgoing_list:
        frame_list.append(get_tracks_from_playlist(auth_header, item, 0))

    # Generate csv file using collected data
    result = pd.concat(frame_list)
    result.to_csv(r"ml/tracks.csv", index=False)
    track_ids = get_top_tracks_ids(auth_header, 'long_term', '50')
    datapoints = do_audio_analysis(auth_header, track_ids)
    predictions = model_predict(datapoints)

    # Generate a radar chart and display it to the screen
    make_radar_chart(predictions)
    img_4_file = os.path.join(app.config['UPLOAD_FOLDER'], 'personality.png')
    return render_template("personality.html", img_4=img_4_file)


# Logs the user out of the application
@app.route("/logout")
def logout():
    return redirect("https://www.spotify.com/logout/")


# This is done in order to prevent the browser from caching images
@app.after_request
def disable_cache(r):
    # Directives must be comma-separated; the previous value ran "must-revalidate"
    # and "public" together and "public" contradicts "no-store", so it is dropped
    r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate, max-age=0"
    r.headers["Pragma"] = "no-cache"
    r.headers["Expires"] = "0"
    return r


# # Run the server (uncomment for local testing)
# if __name__ == "__main__":...

Full Screen

Full Screen

backends.py

Source:backends.py Github

copy

Full Screen

"""Authentication backends for the Django auth system and Django REST framework.

Contains:

* ``AuthBackend`` - username/password login by e-mail or phone number.
* ``JWTAuthentication`` / ``RefreshAuthentication`` - per-user-secret JWT auth.
* A family of shared-secret authenticators (Matrix, Whatsapp, Chat, ...) that
  accept ``Authorization: Token <base64(secret)>`` and compare the decoded
  secret with a value from Django settings.
"""
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
import jwt
import datetime, calendar
import hmac
from django.conf import settings
from rest_framework import authentication, exceptions
from ondoc.authentication.models import UserSecretKey, WhiteListedLoginTokens
import base64
from packaging.version import parse

User = get_user_model()


def _decode_header_part(raw):
    """Decode one component of the Authorization header as UTF-8.

    :param raw: bytes taken from the split Authorization header
    :return: the decoded string
    :raises AuthenticationFailed: if the bytes are not valid UTF-8 (previously
        this surfaced as an unhandled ``UnicodeDecodeError``)
    """
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        raise exceptions.AuthenticationFailed('UnAuthorized')


def _read_prefix_and_token(request):
    """Split the Authorization header into ``(prefix, token)``.

    :param request: the incoming request
    :return: ``None`` when the header is missing or empty, otherwise a
        ``(prefix, token)`` tuple of strings
    :raises AuthenticationFailed: if the header does not consist of exactly
        two whitespace-separated parts
    """
    parts = authentication.get_authorization_header(request).split()
    if not parts:
        return None
    if len(parts) != 2:
        raise exceptions.AuthenticationFailed('UnAuthorized')
    return _decode_header_part(parts[0]), _decode_header_part(parts[1])


def _matches_client_version_gate(request):
    """Evaluate the client name/version/platform gate shared by JWT auth and token generation.

    ``JWTAuthentication._authenticate_credentials`` uses a true result (for
    non-agent tokens) to reject users that have no ``UserSecretKey``;
    ``JWTAuthentication.generate_token`` uses it to pick the default expiry
    instead of the 365-day one.

    :param request: the incoming request, or ``None``
    :return: ``True`` when the request passes the gate, ``False`` otherwise
    """
    if not request:
        return False
    meta = request.META
    app_name = meta.get("HTTP_APP_NAME")
    if app_name == 'd_web':
        return False
    if 'HTTP_APP_VERSION' not in meta:
        return True

    version = parse(meta.get('HTTP_APP_VERSION'))
    platform = meta.get('HTTP_PLATFORM')
    if app_name == 'docprime_consumer_app' and version > parse('2.7.2'):
        return True
    if app_name == "doc_prime_partner" and version > parse('2.100.15') and platform == 'android':
        return True
    # NOTE(review): because `and` binds tighter than `or` in the original
    # expression, this iOS check is NOT restricted to "doc_prime_partner".
    # The original indentation suggests it was meant to be; behaviour is
    # preserved here - confirm the intent before tightening it.
    return version > parse('2.200.11') and platform == 'ios'


class AuthBackend(ModelBackend):
    """Django auth backend that logs in ``user_type == 1`` users by e-mail or phone number."""

    def authenticate(self, username=None, password=None):
        """Return the matching user when the password checks out, else ``None``.

        :param username: an e-mail address (anything containing ``@``) or a phone number
        :param password: the raw password to check
        :return: the user instance, or ``None`` if the user does not exist,
            the password is wrong, or no username was supplied
        """
        if username is None:
            # `'@' in None` raises TypeError; treat a missing username as a failed login.
            return None
        if '@' in username:
            kwargs = {'email': username, 'user_type': 1}
        else:
            kwargs = {'phone_number': username, 'user_type': 1}
        try:
            user = User.objects.get(**kwargs)
        except User.DoesNotExist:
            return None
        if user.check_password(password):
            return user
        return None

    def get_user(self, user_id):
        """Return the user with primary key ``user_id``, or ``None`` if there is none."""
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None


class _SharedSecretAuthentication(authentication.BaseAuthentication):
    """Base for authenticators that accept ``<prefix> <base64(secret)>``.

    Subclasses set ``settings_token_name`` to the name of the Django setting
    holding the expected secret.
    """

    authentication_header_prefix = "Token"
    # Name of the attribute on django.conf.settings that holds the expected secret.
    settings_token_name = None

    def authenticate(self, request):
        """Validate the Authorization header against the configured secret.

        :param request: the incoming request
        :return: ``(None, None)`` on success (no user object is attached)
        :raises AuthenticationFailed: if the header is missing, malformed, has
            the wrong prefix, is not valid base64, or does not match the secret
        """
        credentials = _read_prefix_and_token(request)
        if credentials is None:
            raise exceptions.AuthenticationFailed('UnAuthorized')
        prefix, token = credentials
        if prefix.lower() != self.authentication_header_prefix.lower():
            raise exceptions.AuthenticationFailed('UnAuthorized')

        try:
            supplied = base64.b64decode(token)
        except ValueError:
            # binascii.Error (bad padding) and non-ASCII input are both ValueErrors.
            raise exceptions.AuthenticationFailed('UnAuthorized')

        expected = getattr(settings, self.settings_token_name)
        if not isinstance(expected, str):
            # An unset / non-string secret can never match a supplied token.
            raise exceptions.AuthenticationFailed('UnAuthorized')
        # Compare as bytes in constant time so the secret cannot be probed via timing.
        if not hmac.compare_digest(supplied, expected.encode('utf-8')):
            raise exceptions.AuthenticationFailed('UnAuthorized')
        return (None, None)

    def authenticate_header(self, request):
        """Return the header prefix used for the ``WWW-Authenticate`` response header."""
        return self.authentication_header_prefix


class MatrixAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.MATRIX_DOC_AUTH_TOKEN``."""

    settings_token_name = 'MATRIX_DOC_AUTH_TOKEN'


class JWTAuthentication(authentication.BaseAuthentication):
    """JWT authentication where each user's tokens are signed with that user's ``UserSecretKey``."""

    authentication_header_prefix = settings.JWT_AUTH['JWT_AUTH_HEADER_PREFIX']

    def authenticate(self, request):
        """Authenticate the request from its Authorization header.

        :param request: the incoming request; ``request.user`` is reset to ``None`` first
        :return: ``None`` if no Authorization header is present, otherwise ``(user, token)``
        :raises AuthenticationFailed: on a malformed header, wrong prefix, or invalid token
        """
        request.user = None
        credentials = _read_prefix_and_token(request)
        if credentials is None:
            return None
        prefix, token = credentials
        if prefix.lower() != self.authentication_header_prefix.lower():
            raise exceptions.AuthenticationFailed('UnAuthorized')
        return self._authenticate_credentials(request, token)

    def _authenticate_credentials(self, request, token):
        """Verify ``token`` with the owning user's secret key and load that user.

        :param request: the incoming request; ``request.agent`` is set when the
            payload carries an ``agent_id``
        :param token: the encoded JWT
        :return: ``(user, token)``
        :raises AuthenticationFailed: if the token cannot be decoded/verified,
            the user has no secret key where one is required, the user does
            not exist, or the user is inactive
        """
        user_key = None
        user_id, is_agent = JWTAuthentication.get_unverified_user(token)
        if user_id:
            # Evaluate the gate before touching the database, as the original did.
            key_required = not is_agent and _matches_client_version_gate(request)
            user_key_object = UserSecretKey.objects.filter(user_id=user_id).first()
            if user_key_object:
                user_key = user_key_object.key
            elif key_required:
                raise exceptions.AuthenticationFailed("Invalid Login")

        try:
            payload = jwt.decode(token, user_key)
        except Exception:
            # Deliberately broad: a missing key or any decode/verification
            # problem must all end up as an authentication failure.
            raise exceptions.AuthenticationFailed('Invalid authentication.')

        try:
            user = User.objects.get(pk=payload['user_id'])
        except User.DoesNotExist:
            raise exceptions.AuthenticationFailed('No user matching this token was found.')

        if not user.is_active:
            raise exceptions.AuthenticationFailed('This user has been deactivated.')

        if payload.get('agent_id', None) is not None:
            request.agent = payload.get('agent_id')
        return (user, token)

    def authenticate_header(self, request):
        """Return the header prefix used for the ``WWW-Authenticate`` response header."""
        return self.authentication_header_prefix

    @classmethod
    def jwt_payload_handler(cls, user, exp=None):
        """Build the JWT payload for ``user``.

        :param user: the user the token is issued for
        :param exp: expiry datetime; defaults to now + ``settings.JWT_AUTH['JWT_EXPIRATION_DELTA']``
        :return: dict with ``user_id``, ``exp`` and ``orig_iat``
        """
        if not exp:
            exp = datetime.datetime.utcnow() + settings.JWT_AUTH['JWT_EXPIRATION_DELTA']
        return {
            'user_id': user.pk,
            'exp': exp,
            'orig_iat': calendar.timegm(
                datetime.datetime.utcnow().utctimetuple()
            )
        }

    @classmethod
    def appointment_agent_payload_handler(cls, request, created_user, can_book=False):
        """Build a 7-day, non-refreshable payload for an agent acting as ``created_user``.

        :param request: request whose ``user.id`` is recorded as ``agent_id``
        :param created_user: the user the token is issued for
        :param can_book: value stored under ``can_book``
        :return: the payload dict
        """
        return {
            'agent_id': request.user.id,
            'user_id': created_user.pk,
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7),
            'orig_iat': calendar.timegm(
                datetime.datetime.utcnow().utctimetuple()
            ),
            'refresh': False,
            'can_book': can_book
        }

    @staticmethod
    def get_unverified_user(token):
        """Read ``user_id`` and ``agent_id`` from ``token`` WITHOUT verifying its signature.

        :param token: the encoded JWT
        :return: ``(user_id, agent_id)``; either may be ``None``
        :raises AuthenticationFailed: if the token cannot be decoded
        """
        try:
            unverified_payload = jwt.decode(token, verify=False)
        except Exception:
            raise exceptions.AuthenticationFailed('Invalid authentication.')
        return unverified_payload.get('user_id', None), unverified_payload.get('agent_id', None)

    @staticmethod
    def generate_token(user, request=None):
        """Create (or reuse) the user's secret key and issue a signed token.

        Requests that pass the client version gate get the default expiry;
        all others get a 365-day expiry.

        :param user: the user to issue the token for
        :param request: optional request used to inspect client headers
        :return: dict with ``token`` and ``payload``
        """
        user_key, _created = UserSecretKey.objects.get_or_create(user=user)
        if _matches_client_version_gate(request):
            payload = JWTAuthentication.jwt_payload_handler(user)
        else:
            exp = datetime.datetime.utcnow() + datetime.timedelta(days=365)
            payload = JWTAuthentication.jwt_payload_handler(user, exp)
        token = jwt.encode(payload, user_key.key)
        # whitelist = WhiteListedLoginTokens.objects.create(token=token.decode('utf-8'), user=user)
        return {'token': token,
                'payload': payload}

    @classmethod
    def provider_sms_payload_handler(cls, user, appointment):
        """Build a non-refreshable payload expiring 24 hours after the appointment's start.

        :param user: the user the token is issued for
        :param appointment: object providing ``time_slot_start``
        :return: the payload dict
        """
        return {
            'user_id': user.pk,
            'expiration_time': appointment.time_slot_start + datetime.timedelta(hours=24),
            'exp': appointment.time_slot_start + datetime.timedelta(hours=24),
            'orig_iat': calendar.timegm(
                datetime.datetime.utcnow().utctimetuple()
            ),
            'refresh': False
        }


class RefreshAuthentication(JWTAuthentication):
    """Authentication used when refreshing a token.

    NOTE(review): the token's signature and expiry are never verified on this
    path - the user is loaded purely from the unverified ``user_id`` claim.
    That may be intentional (to accept expired tokens), but it also accepts
    forged ones; consider verifying the signature with expiry checking
    disabled. Behaviour is left unchanged here.
    """

    def authenticate(self, request):
        """Authenticate from the Authorization header without checking its prefix.

        :param request: the incoming request; ``request.user`` is reset to ``None`` first
        :return: ``None`` if no Authorization header is present, otherwise ``(user, token)``
        :raises AuthenticationFailed: if the header has no token part or the
            token cannot be used
        """
        request.user = None
        auth_header = authentication.get_authorization_header(request).split()
        if not auth_header:
            return None
        if len(auth_header) < 2:
            # Previously an IndexError (HTTP 500) when only the prefix was sent.
            raise exceptions.AuthenticationFailed('UnAuthorized')
        token = _decode_header_part(auth_header[1])
        return self._authenticate_credentials(request, token)

    @staticmethod
    def get_unverified_user(token):
        """Read ``user_id`` and ``agent_id`` from ``token`` without verifying it.

        Unlike the parent implementation, decode errors propagate unchanged.

        :param token: the encoded JWT
        :return: ``(user_id, agent_id)``; either may be ``None``
        """
        unverified_payload = jwt.decode(token, verify=False)
        return unverified_payload.get('user_id', None), unverified_payload.get('agent_id', None)

    def _authenticate_credentials(self, request, token):
        """Load the user named in the (unverified) token.

        :param request: the incoming request; ``request.agent`` is set to
            ``True`` when the token carries an ``agent_id``
        :param token: the encoded JWT
        :return: ``(user, token)``
        :raises AuthenticationFailed: if the token cannot be decoded or no
            matching user exists
        """
        try:
            user_id, is_agent = self.get_unverified_user(token)
        except Exception:
            raise exceptions.AuthenticationFailed('Invalid authentication.')
        if is_agent:
            request.agent = True
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            raise exceptions.AuthenticationFailed('No user matching this token was found.')
        return (user, token)


class WhatsappAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.WHATSAPP_AUTH_TOKEN``."""

    settings_token_name = 'WHATSAPP_AUTH_TOKEN'


class ChatAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.CHAT_AUTH_TOKEN``."""

    settings_token_name = 'CHAT_AUTH_TOKEN'


class MatrixUserAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.MATRIX_USER_AUTH_TOKEN``."""

    settings_token_name = 'MATRIX_USER_AUTH_TOKEN'


class BajajAllianzAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.BAJAJ_ALLIANZ_AUTH_TOKEN``."""

    settings_token_name = 'BAJAJ_ALLIANZ_AUTH_TOKEN'


class SbiGAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.SBIG_AUTH_TOKEN``."""

    settings_token_name = 'SBIG_AUTH_TOKEN'


class SalespointAuthentication(_SharedSecretAuthentication):
    """Shared-secret authentication checked against ``settings.SPO_DP_AUTH_TOKEN``."""

    settings_token_name = 'SPO_DP_AUTH_TOKEN'


# class CloudLabUserAuthentication(authentication.BaseAuthentication):
#     authentication_header_prefix = "Token"
#
#     def authenticate(self, request):
#         auth_header = authentication.get_authorization_header(request).split()
#         auth_header_prefix = self.authentication_header_prefix
#
#         if not auth_header:
#             raise exceptions.AuthenticationFailed('UnAuthorized')
#
#         if (len(auth_header) == 1) or (len(auth_header) > 2):
#             raise exceptions.AuthenticationFailed('UnAuthorized')
#
#         prefix = auth_header[0].decode('utf-8')
#         token = auth_header[1].decode('utf-8')
#
#         if prefix.lower() != auth_header_prefix.lower():
#             raise exceptions.AuthenticationFailed('UnAuthorized')
#
#         token = base64.b64decode(token)
#
#         if token.decode('utf-8') != settings.CLOUD_LAB_AUTH_TOKEN:
#             raise exceptions.AuthenticationFailed('UnAuthorized')
#
#         return (None, None)
#
#     def authenticate_header(self, request):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run webdriver_manager automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful