Best Python code snippet using locust
client.py
Source:client.py  
# -*- coding: utf-8 -*-
import logging
import hmac
import hashlib
import uuid
import json
import re
import time
import random
from datetime import datetime
import gzip
from io import BytesIO
import warnings

from .compat import (
    compat_urllib_parse, compat_urllib_error,
    compat_urllib_request, compat_urllib_parse_urlparse)
from .errors import (
    ClientErrorCodes, ClientError, ClientLoginError, ClientLoginRequiredError,
    ClientCookieExpiredError, ClientThrottledError)
from .constants import Constants
from .http import ClientCookieJar
from .endpoints import (
    AccountsEndpointsMixin, DiscoverEndpointsMixin, FeedEndpointsMixin,
    FriendshipsEndpointsMixin, LiveEndpointsMixin, MediaEndpointsMixin,
    MiscEndpointsMixin, LocationsEndpointsMixin, TagsEndpointsMixin,
    UsersEndpointsMixin, UploadEndpointsMixin, UsertagsEndpointsMixin,
    CollectionsEndpointsMixin,
    ClientDeprecationWarning, ClientPendingDeprecationWarning,
    ClientExperimentalWarning
)

logger = logging.getLogger(__name__)
# Force Client deprecation warnings to always appear
warnings.simplefilter('always', ClientDeprecationWarning)
warnings.simplefilter('always', ClientPendingDeprecationWarning)
warnings.simplefilter('default', ClientExperimentalWarning)


class Client(AccountsEndpointsMixin, DiscoverEndpointsMixin, FeedEndpointsMixin,
             FriendshipsEndpointsMixin, LiveEndpointsMixin, MediaEndpointsMixin,
             MiscEndpointsMixin, LocationsEndpointsMixin, TagsEndpointsMixin,
             UsersEndpointsMixin, UploadEndpointsMixin, UsertagsEndpointsMixin,
             CollectionsEndpointsMixin, object):
    """Main API client class for the private app api."""

    API_URL = 'https://i.instagram.com/api/{version!s}/'

    USER_AGENT = Constants.USER_AGENT
    IG_SIG_KEY = Constants.IG_SIG_KEY
    IG_CAPABILITIES = Constants.IG_CAPABILITIES
    SIG_KEY_VERSION = Constants.SIG_KEY_VERSION

    def __init__(self, username, password, **kwargs):
        """
        Build the HTTP opener and log in unless a saved cookie is supplied.

        :param username: Login username
        :param password: Login password
        :param kwargs: See below

        :Keyword Arguments:
            - **auto_patch**: Patch the api objects to match the public API. Default: False
            - **drop_incompat_keys**: Remove api object keys that is not in the public API. Default: False
            - **timeout**: Timeout interval in seconds. Default: 15
            - **api_url**: Override the default api url base
            - **cookie**: Saved cookie string from a previous session
            - **settings**: A dict of settings from a previous session
            - **on_login**: Callback after successful login
            - **proxy**: Specify a proxy ex: 'http://127.0.0.1:8888' (ALPHA)
            - **custom_ssl_context**: SSL context handed to the HTTPS handler
            - **guid** / **uuid**, **device_id**, **ad_id**, **signature_key**,
              **key_version**, **ig_capabilities**: Override the generated or
              default identifiers; each falls back to ``settings`` and then to
              a generated value or class constant
            - **user_agent**: Full user agent string; when given (or present in
              ``settings``) it takes precedence over the individual
              **app_version**, **android_release**, **android_version** and
              **phone_*** device keyword arguments
        :raises ClientCookieExpiredError: if the saved cookie has expired
        :raises ValueError: if ``proxy`` has no scheme or network location
        :raises ClientLoginRequiredError: if no cookie is supplied and the
            username or password is missing
        :return:
        """
        self.username = username
        self.password = password
        self.auto_patch = kwargs.pop('auto_patch', False)
        self.drop_incompat_keys = kwargs.pop('drop_incompat_keys', False)
        self.api_url = kwargs.pop('api_url', None) or self.API_URL
        self.timeout = kwargs.pop('timeout', 15)
        self.on_login = kwargs.pop('on_login', None)
        self.logger = logger

        user_settings = kwargs.pop('settings', None) or {}
        self.uuid = (
            kwargs.pop('guid', None) or kwargs.pop('uuid', None) or
            user_settings.get('uuid') or self.generate_uuid(False))
        self.device_id = (
            kwargs.pop('device_id', None) or user_settings.get('device_id') or
            self.generate_deviceid())
        self.signature_key = (
            kwargs.pop('signature_key', None) or user_settings.get('signature_key') or
            self.IG_SIG_KEY)
        self.key_version = (
            kwargs.pop('key_version', None) or user_settings.get('key_version') or
            self.SIG_KEY_VERSION)
        self.ig_capabilities = (
            kwargs.pop('ig_capabilities', None) or user_settings.get('ig_capabilities') or
            self.IG_CAPABILITIES)

        # to maintain backward compat for user_agent kwarg
        custom_ua = kwargs.pop('user_agent', '') or user_settings.get('user_agent')
        if custom_ua:
            # The property setter parses the string into the device attributes.
            self.user_agent = custom_ua
        else:
            self.app_version = (
                kwargs.pop('app_version', None) or user_settings.get('app_version') or
                Constants.APP_VERSION)
            self.android_release = (
                kwargs.pop('android_release', None) or user_settings.get('android_release') or
                Constants.ANDROID_RELEASE)
            self.android_version = int(
                kwargs.pop('android_version', None) or user_settings.get('android_version') or
                Constants.ANDROID_VERSION)
            self.phone_manufacturer = (
                kwargs.pop('phone_manufacturer', None) or user_settings.get('phone_manufacturer') or
                Constants.PHONE_MANUFACTURER)
            self.phone_device = (
                kwargs.pop('phone_device', None) or user_settings.get('phone_device') or
                Constants.PHONE_DEVICE)
            self.phone_model = (
                kwargs.pop('phone_model', None) or user_settings.get('phone_model') or
                Constants.PHONE_MODEL)
            self.phone_dpi = (
                kwargs.pop('phone_dpi', None) or user_settings.get('phone_dpi') or
                Constants.PHONE_DPI)
            self.phone_resolution = (
                kwargs.pop('phone_resolution', None) or user_settings.get('phone_resolution') or
                Constants.PHONE_RESOLUTION)
            self.phone_chipset = (
                kwargs.pop('phone_chipset', None) or user_settings.get('phone_chipset') or
                Constants.PHONE_CHIPSET)

        cookie_string = kwargs.pop('cookie', None) or user_settings.get('cookie')
        cookie_jar = ClientCookieJar(cookie_string=cookie_string)
        if cookie_string and cookie_jar.expires_earliest and int(time.time()) >= cookie_jar.expires_earliest:
            raise ClientCookieExpiredError('Oldest cookie expired at {0!s}'.format(cookie_jar.expires_earliest))
        cookie_handler = compat_urllib_request.HTTPCookieProcessor(cookie_jar)

        proxy_handler = None
        proxy = kwargs.pop('proxy', None)
        if proxy:
            warnings.warn('Proxy support is alpha.', UserWarning)
            parsed_url = compat_urllib_parse_urlparse(proxy)
            if parsed_url.netloc and parsed_url.scheme:
                proxy_address = '{0!s}://{1!s}'.format(parsed_url.scheme, parsed_url.netloc)
                proxy_handler = compat_urllib_request.ProxyHandler({'https': proxy_address})
            else:
                raise ValueError('Invalid proxy argument: {0!s}'.format(proxy))
        handlers = []
        if proxy_handler:
            handlers.append(proxy_handler)

        # Allow user to override custom ssl context where possible
        custom_ssl_context = kwargs.pop('custom_ssl_context', None)
        try:
            httpshandler = compat_urllib_request.HTTPSHandler(context=custom_ssl_context)
        except TypeError:
            # py version < 2.7.9
            httpshandler = compat_urllib_request.HTTPSHandler()

        handlers.extend([
            compat_urllib_request.HTTPHandler(),
            httpshandler,
            cookie_handler])
        opener = compat_urllib_request.build_opener(*handlers)
        opener.cookie_jar = cookie_jar
        self.opener = opener

        # ad_id must be initialised after cookie_jar/opener because
        # it relies on self.authenticated_user_name
        self.ad_id = (
            kwargs.pop('ad_id', None) or user_settings.get('ad_id') or
            self.generate_adid())

        if not cookie_string:   # [TODO] There's probably a better way than to depend on cookie_string
            if not self.username or not self.password:
                raise ClientLoginRequiredError('login_required', code=400)
            self.login()

        self.logger.debug('USERAGENT: {0!s}'.format(self.user_agent))
        super(Client, self).__init__()

    @property
    def settings(self):
        """Helper property that extracts the settings that you should cache
        in addition to username and password."""
        return {
            'uuid': self.uuid,
            'device_id': self.device_id,
            'ad_id': self.ad_id,
            'cookie': self.cookie_jar.dump(),
            'created_ts': int(time.time())
        }

    @property
    def user_agent(self):
        """Returns the useragent string that the client is currently using."""
        return Constants.USER_AGENT_FORMAT % {
            'app_version': self.app_version,
            'android_version': self.android_version,
            'android_release': self.android_release,
            'brand': self.phone_manufacturer,
            'device': self.phone_device,
            'model': self.phone_model,
            'dpi': self.phone_dpi,
            'resolution': self.phone_resolution,
            'chipset': self.phone_chipset}

    @user_agent.setter
    def user_agent(self, value):
        """Override the useragent string with your own

        :raises ValueError: if ``value`` does not match
            ``Constants.USER_AGENT_EXPRESSION``
        """
        mobj = re.search(Constants.USER_AGENT_EXPRESSION, value)
        if not mobj:
            raise ValueError('User-agent specified does not fit format required: {0!s}'.format(
                Constants.USER_AGENT_EXPRESSION))
        self.app_version = mobj.group('app_version')
        self.android_release = mobj.group('android_release')
        self.android_version = int(mobj.group('android_version'))
        self.phone_manufacturer = mobj.group('manufacturer')
        self.phone_device = mobj.group('device')
        self.phone_model = mobj.group('model')
        self.phone_dpi = mobj.group('dpi')
        self.phone_resolution = mobj.group('resolution')
        self.phone_chipset = mobj.group('chipset')

    @staticmethod
    def generate_useragent(**kwargs):
        """
        Helper method to generate a useragent string based on device parameters

        Any missing or falsy value falls back to the matching ``Constants``
        default.

        :param kwargs:
            - **app_version**
            - **android_version**
            - **android_release**
            - **phone_manufacturer**
            - **phone_device**
            - **phone_model**
            - **phone_dpi**
            - **phone_resolution**
            - **phone_chipset**
        :return: A compatible user agent string
        """
        return Constants.USER_AGENT_FORMAT % {
            'app_version': kwargs.pop('app_version', None) or Constants.APP_VERSION,
            'android_version': int(kwargs.pop('android_version', None) or Constants.ANDROID_VERSION),
            'android_release': kwargs.pop('android_release', None) or Constants.ANDROID_RELEASE,
            'brand': kwargs.pop('phone_manufacturer', None) or Constants.PHONE_MANUFACTURER,
            'device': kwargs.pop('phone_device', None) or Constants.PHONE_DEVICE,
            'model': kwargs.pop('phone_model', None) or Constants.PHONE_MODEL,
            'dpi': kwargs.pop('phone_dpi', None) or Constants.PHONE_DPI,
            'resolution': kwargs.pop('phone_resolution', None) or Constants.PHONE_RESOLUTION,
            'chipset': kwargs.pop('phone_chipset', None) or Constants.PHONE_CHIPSET}

    @staticmethod
    def validate_useragent(value):
        """
        Helper method to validate a useragent string for format correctness

        :param value: The user agent string to check
        :raises ValueError: if ``value`` does not match
            ``Constants.USER_AGENT_EXPRESSION``
        :return: dict with the re-rendered ``user_agent`` string and the
            ``parsed_params`` extracted from it
        """
        mobj = re.search(Constants.USER_AGENT_EXPRESSION, value)
        if not mobj:
            raise ValueError('User-agent specified does not fit format required: {0!s}'.format(
                Constants.USER_AGENT_EXPRESSION))
        parse_params = {
            'app_version': mobj.group('app_version'),
            'android_version': int(mobj.group('android_version')),
            'android_release': mobj.group('android_release'),
            'brand': mobj.group('manufacturer'),
            'device': mobj.group('device'),
            'model': mobj.group('model'),
            'dpi': mobj.group('dpi'),
            'resolution': mobj.group('resolution'),
            'chipset': mobj.group('chipset')
        }
        return {
            'user_agent': Constants.USER_AGENT_FORMAT % parse_params,
            'parsed_params': parse_params
        }

    def get_cookie_value(self, key):
        """Return the value of the first cookie whose name matches ``key``
        case-insensitively, or ``None`` when there is no such cookie."""
        for cookie in self.cookie_jar:
            if cookie.name.lower() == key.lower():
                return cookie.value
        return None

    @property
    def csrftoken(self):
        """The client's current csrf token"""
        return self.get_cookie_value('csrftoken')

    @property
    def token(self):
        """For compatibility. Equivalent to :meth:`csrftoken`"""
        return self.csrftoken

    @property
    def authenticated_user_id(self):
        """The current authenticated user id"""
        return self.get_cookie_value('ds_user_id')

    @property
    def authenticated_user_name(self):
        """The current authenticated user name"""
        return self.get_cookie_value('ds_user')

    @property
    def phone_id(self):
        """Current phone ID. For use in certain functions."""
        return self.generate_uuid(return_hex=False, seed=self.device_id)

    @property
    def timezone_offset(self):
        """Timezone offset in seconds. For use in certain functions."""
        return int(round((datetime.now() - datetime.utcnow()).total_seconds()))

    @property
    def rank_token(self):
        """``'<authenticated_user_id>_<uuid>'``, or ``None`` when there is no
        authenticated user id."""
        if not self.authenticated_user_id:
            return None
        return '{0!s}_{1!s}'.format(self.authenticated_user_id, self.uuid)

    @property
    def authenticated_params(self):
        """Dict holding the csrf token, uuid and authenticated user id under
        the ``_csrftoken``, ``_uuid`` and ``_uid`` keys."""
        return {
            '_csrftoken': self.csrftoken,
            '_uuid': self.uuid,
            '_uid': self.authenticated_user_id
        }

    @property
    def cookie_jar(self):
        """The client's cookiejar instance."""
        return self.opener.cookie_jar

    @property
    def default_headers(self):
        """Headers sent with every request. A new dict is built on each
        access, with a randomised connection speed value."""
        return {
            'User-Agent': self.user_agent,
            'Connection': 'close',
            'Accept': '*/*',
            'Accept-Language': 'en-US',
            'Accept-Encoding': 'gzip, deflate',
            'X-IG-Capabilities': self.ig_capabilities,
            'X-IG-Connection-Type': 'WIFI',
            'X-IG-Connection-Speed': '{0:d}kbps'.format(random.randint(1000, 5000)),
        }

    @property
    def radio_type(self):
        """For use in certain endpoints"""
        return 'wifi-none'

    def _generate_signature(self, data):
        """
        Generates the signature for a data string

        :param data: content to be signed
        :return: hex HMAC-SHA256 digest of ``data`` keyed with ``signature_key``
        """
        return hmac.new(
            self.signature_key.encode('ascii'), data.encode('ascii'),
            digestmod=hashlib.sha256).hexdigest()

    @classmethod
    def generate_uuid(cls, return_hex=False, seed=None):
        """
        Generate uuid

        :param return_hex: Return in hex format
        :param seed: Seed value to generate a consistent uuid
        :return: The uuid as a string (hex without dashes if ``return_hex``)
        """
        if seed:
            # Derive the uuid from the md5 digest so the same seed always
            # yields the same value.
            m = hashlib.md5()
            m.update(seed.encode('utf-8'))
            new_uuid = uuid.UUID(m.hexdigest())
        else:
            new_uuid = uuid.uuid1()
        if return_hex:
            return new_uuid.hex
        return str(new_uuid)

    @classmethod
    def generate_deviceid(cls, seed=None):
        """
        Generate an android device ID

        :param seed: Seed value to generate a consistent device ID
        :return: ``'android-'`` followed by the first 16 hex chars of a uuid
        """
        return 'android-{0!s}'.format(cls.generate_uuid(True, seed)[:16])

    def generate_adid(self, seed=None):
        """
        Generate an Advertising ID based on the login username since
        the Google Ad ID is a personally identifying but resettable ID.

        :param seed: Optional seed; defaults to the authenticated user name,
            then to the login username
        :return: A uuid string
        """
        modified_seed = seed or self.authenticated_user_name or self.username
        if modified_seed:
            # Do some trivial mangling of original seed
            sha2 = hashlib.sha256()
            sha2.update(modified_seed.encode('utf-8'))
            modified_seed = sha2.hexdigest()
        return self.generate_uuid(False, modified_seed)

    @staticmethod
    def _read_response(response):
        """
        Extract the response body from a http response.

        :param response: Response object exposing ``info()`` and ``read()``
        :return: The body decoded as utf8, gunzipped first when the
            ``Content-Encoding`` header is ``gzip``
        """
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            res = gzip.GzipFile(fileobj=buf).read().decode('utf8')
        else:
            res = response.read().decode('utf8')
        return res

    def _call_api(self, endpoint, params=None, query=None, return_response=False, unsigned=False, version='v1'):
        """
        Calls the private api.

        :param endpoint: endpoint path that should end with '/', example 'discover/explore/'
        :param params: POST parameters
        :param query: GET url query parameters
        :param return_response: return the response instead of the parsed json object
        :param unsigned: use post params as-is without signing
        :param version: for the versioned api base url. Default 'v1'.
        :raises ClientLoginRequiredError: if the api reports ``login_required``
        :raises ClientThrottledError: on a ``TOO_MANY_REQUESTS`` http error
        :raises ClientError: on any other http error, or on a response whose
            status is not ``ok`` (oembed responses excepted)
        :return:
        """
        url = '{0}{1}'.format(self.api_url.format(version=version), endpoint)
        if query:
            url += ('?' if '?' not in endpoint else '&') + compat_urllib_parse.urlencode(query)

        headers = self.default_headers
        data = None
        if params or params == '':
            headers['Content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
            if params == '':    # force post if empty string
                data = ''.encode('ascii')
            else:
                if not unsigned:
                    json_params = json.dumps(params, separators=(',', ':'))
                    hash_sig = self._generate_signature(json_params)
                    post_params = {
                        'ig_sig_key_version': self.key_version,
                        'signed_body': hash_sig + '.' + json_params
                    }
                else:
                    # direct form post
                    post_params = params
                data = compat_urllib_parse.urlencode(post_params).encode('ascii')

        req = compat_urllib_request.Request(url, data, headers=headers)
        try:
            self.logger.debug('REQUEST: {0!s} {1!s}'.format(url, req.get_method()))
            self.logger.debug('DATA: {0!s}'.format(data))
            response = self.opener.open(req, timeout=self.timeout)
        except compat_urllib_error.HTTPError as e:
            error_msg = e.reason
            error_response = self._read_response(e)
            self.logger.debug('RESPONSE: {0:d} {1!s}'.format(e.code, error_response))
            try:
                error_obj = json.loads(error_response)
                if error_obj.get('message') == 'login_required':
                    raise ClientLoginRequiredError(
                        error_obj.get('message'), code=e.code,
                        error_response=json.dumps(error_obj))
                elif e.code == ClientErrorCodes.TOO_MANY_REQUESTS:
                    raise ClientThrottledError(
                        error_obj.get('message'), code=e.code,
                        error_response=json.dumps(error_obj))
                elif error_obj.get('message'):
                    error_msg = '{0!s}: {1!s}'.format(e.reason, error_obj['message'])
            except (ClientLoginError, ClientLoginRequiredError, ClientThrottledError):
                # Let the specific client errors raised above propagate.
                raise
            except Exception as ex:
                # do nothing else, prob can't parse json
                # Logger.warn is a deprecated alias; use warning().
                self.logger.warning('Error parsing error response: {}'.format(str(ex)))
            raise ClientError(error_msg, e.code, error_response)

        if return_response:
            return response

        response_content = self._read_response(response)
        self.logger.debug('RESPONSE: {0:d} {1!s}'.format(response.code, response_content))
        json_response = json.loads(response_content)

        if json_response.get('message', '') == 'login_required':
            raise ClientLoginRequiredError(
                json_response.get('message'), code=response.code,
                error_response=json.dumps(json_response))

        # not from oembed or an ok response
        if not json_response.get('provider_url') and json_response.get('status', '') != 'ok':
            raise ClientError(
                json_response.get('message', 'Unknown error'), code=response.code,
                error_response=json.dumps(json_response))
        # NOTE(review): the listing is cut off here ("..."); whatever follows
        # in this method and class is not visible and is intentionally not
        # reconstructed.
# ... (client.py listing truncated; the next file on the page is analytics.py)
Source:analytics.py  
import codecs
import datetime
import json
import os
import platform
import ssl
from typing import Optional

import certifi

from util.database import Database
from vendor.instagram_private_api import Client
from api.user.user import User
from util.errors import NetworkError


class Analytics(Database):
    """Database-backed helper that logs in to Instagram through the private
    API client and stores per-account statistics."""

    def __init__(self):
        """Initialise the database layer and connect with the credentials
        found under ``general_settings['instagram_account']``."""
        super().__init__()
        self.api: Optional[Client] = None
        credentials = self.general_settings['instagram_account']
        self.connectAccount(credentials['username'], credentials['password'])

    @staticmethod
    def to_json(python_object):
        """``json.dumps`` ``default`` hook: encode ``bytes`` as a tagged
        base64 dict.

        :raises TypeError: for any other non-serializable object
        """
        if isinstance(python_object, bytes):
            return {'__class__': 'bytes',
                    '__value__': codecs.encode(python_object, 'base64').decode()}
        raise TypeError(repr(python_object) + ' is not JSON serializable')

    @staticmethod
    def from_json(json_object):
        """``json.loads`` ``object_hook``: turn the tagged dicts produced by
        :meth:`to_json` back into ``bytes``; other objects pass through."""
        if '__class__' in json_object and json_object['__class__'] == 'bytes':
            return codecs.decode(json_object['__value__'].encode(), 'base64')
        return json_object

    def onLoginCallback(self, api, username):
        """Replace the cached session settings for ``username`` after a login.

        :param api: the client that just logged in; its ``settings`` are cached
        :param username: account the cached session belongs to
        """
        cache_settings = api.settings
        # The statement has a single placeholder, so pass exactly one value
        # (the original passed two, which does not match the query).
        self.execute("DELETE FROM InstagramTokens WHERE `username` = %s", (username,))
        # Write to the table that connectAccount() reads the token from; the
        # original inserted into InstagramAccounts, so a saved session was
        # never found again.
        self.insert("InstagramTokens", {
            "username": username,
            "token": json.dumps(cache_settings, default=self.to_json)
        })

    @staticmethod
    def _build_ssl_context():
        """Return a certificate- and hostname-verifying SSL context, adding
        the certifi CA bundle on macOS."""
        # create_default_context() enables CERT_REQUIRED and check_hostname
        # and loads the system default certificates.
        ssl_context = ssl.create_default_context()
        if platform.system().lower() == 'darwin':
            ssl_context.load_verify_locations(cafile=certifi.where())
        return ssl_context

    def connectAccount(self, username, password=None):
        """Create ``self.api``, reusing a cached session when one is stored.

        :param username: Instagram login name
        :param password: Instagram password; only needed when no usable cached
            session exists
        :return: ``True`` on success, or a ``NetworkError`` when a fresh login
            is needed but no password was given
        """
        ssl_context = self._build_ssl_context()

        saved_account = self.selectOne(
            "SELECT * FROM InstagramTokens WHERE `username` = %s ORDER BY `id` DESC LIMIT 1",
            username)
        if saved_account is not None:
            try:
                cache_settings = json.loads(saved_account['token'], object_hook=self.from_json)
                self.api = Client(auto_patch=True, drop_incompat_keys=False,
                                  username=username, password=password,
                                  settings=cache_settings, custom_ssl_context=ssl_context)
                return True
            except Exception as exc:
                # Deliberate best effort: any failure restoring the cached
                # session falls through to a fresh login below. Report the
                # cause instead of discarding it.
                print("Cannot import old account: {}".format(exc))

        if password is None:
            return NetworkError(title="Cannot connect to account", message="No password given")

        self.api = Client(auto_patch=True, drop_incompat_keys=False,
                          username=username, password=password,
                          on_login=lambda x: self.onLoginCallback(x, username),
                          custom_ssl_context=ssl_context)
        return True

    def getAccountDetails(self, user_id, username):
        """Return the stored statistics for ``user_id``, refreshing them when
        nothing is stored or a different ``username`` is requested.

        :param user_id: owner of the stored account row
        :param username: Instagram username; may be ``None`` or empty to use
            the stored row as-is
        :return: the account row / details dict, or a ``NetworkError``
        """
        account = self.selectOne(
            "SELECT `username`, `followers`, `following`, `posts_count`, `median_likes`, `uid` "
            "FROM InstagramAccounts WHERE `user_id` = %s ORDER BY `updated` DESC LIMIT 1",
            user_id)
        has_username = username is not None and username != ""
        if account is not None and not (has_username and username != account['username']):
            return account

        if not has_username:
            return NetworkError("No username provided", "No username has been provided")
        try:
            return self.getUpdateAccountDetails(user_id, username)
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit.
            return NetworkError(
                "Invalid username",
                "Cannot retrieve account. The username is either invalid or the account is private.")

    def getUpdateAccountDetails(self, user_id, username):
        """Fetch fresh statistics for ``username``, store them and return them.

        :param user_id: owner to store the row under
        :param username: Instagram username to look up
        :return: dict with ``username``, ``followers``, ``following``,
            ``posts_count``, ``median_likes`` and ``uid``
        """
        username_info = self.api.username_info(username)['user']
        user_feed = self.api.username_feed(user_name=username, count=64)['items']
        all_likes = [item['likes']['count'] for item in user_feed]
        # NOTE(review): despite its name this value is the arithmetic mean of
        # the like counts, kept as in the original -- confirm which is wanted.
        # An empty feed now yields 0 instead of raising ZeroDivisionError,
        # which getAccountDetails() reported as an invalid username.
        median_likes = round(sum(all_likes) / len(all_likes)) if all_likes else 0

        details = {
            "username": username,
            "followers": username_info['follower_count'],
            "following": username_info['following_count'],
            "posts_count": username_info['counts']['media'],
            "median_likes": median_likes,
            "uid": username_info['id']
        }
        # The stored row is the returned details plus the owner and raw feed.
        self.insert(
            "InstagramAccounts",
            {"user_id": user_id, **details, "feed": json.dumps(user_feed)},
            check_table_column=False)
        return details


if __name__ == "__main__":
    analytics = Analytics()
# ... (analytics.py listing truncated on the page)
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
