Best Python code snippet using slash
client.py
Source:client.py  
1import os2import sys3import ssl4import json5import time6import urllib7import urllib38import certifi9import logging10import pathlib11import requests12import subprocess13import psutil1415from typing import Union16from typing import List17from typing import Dict1819from urllib3.exceptions import InsecureRequestWarning20from ibw.clientportal import ClientPortal2122urllib3.disable_warnings(category=InsecureRequestWarning)23# http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())2425try:26    _create_unverified_https_context = ssl._create_unverified_context27except AttributeError:28    # Legacy Python that doesn't verify HTTPS certificates by default29    pass30else:31    # Handle target environment that doesn't support HTTPS verification32    ssl._create_default_https_context = _create_unverified_https_context3334logging.basicConfig(35    filename='app.log',36    format='%(levelname)s - %(name)s - %(message)s',37    level=logging.DEBUG38)394041class IBClient():4243    def __init__(self, username: str, account: str, client_gateway_path: str = None) -> None:44        """Initalizes a new instance of the IBClient Object.4546        Arguments:47        ----48        username {str} -- Your IB account username for either your paper or regular account.4950        account {str} -- Your IB account number for either your paper or regular account.5152        Keyword Arguments:53        ----54        password {str} -- Your IB account password for either your paper or regular account. (default:{""})5556        Usage:57        ----58            >>> ib_paper_session = IBClient(59                username='IB_PAPER_USERNAME',60                account='IB_PAPER_ACCOUNT',61            )62            >>> ib_paper_session63            >>> ib_regular_session = IBClient(64                username='IB_REGULAR_USERNAME',65                account='IB_REGULAR_ACCOUNT',66            )67            >>> ib_regular_session68        """6970        self.account = account71        self.username = username72        self.client_portal_client = ClientPortal()7374        self.api_version = 'v1/'75        self._operating_system = sys.platform76        self.session_state_path: pathlib.Path = pathlib.Path(__file__).parent.joinpath('server_session.json').resolve()77        self.authenticated = False7879        # Define URL Components80        ib_gateway_host = r"https://localhost"81        ib_gateway_port = r"5000"82        self.ib_gateway_path = ib_gateway_host + ":" + ib_gateway_port83        self.backup_gateway_path = r"https://cdcdyn.interactivebrokers.com/portal.proxy"8485        if client_gateway_path is None:8687            # Grab the Client Portal Path.88            self.client_portal_folder: pathlib.Path = pathlib.Path(__file__).parents[1].joinpath(89                'resources/clientportal.beta.gw'90            ).resolve()9192            # See if it exists.93            if not self.client_portal_folder.exists():94                print("The Client Portal Gateway doesn't exist. You need to download it before using the Library.")95                print("Downloading the Client Portal file...")96                self.client_portal_client.download_and_extract()9798        else:99            self.client_portal_folder = client_gateway_path100101        # Log the initial Info.102        logging.info('''103        Operating System: {op_sys}104        Session State Path: {state_path}105        Client Portal Folder: {client_path}106        '''.format(107            op_sys=self._operating_system,108            state_path=self.session_state_path,109            client_path=self.client_portal_folder110        )111        )112113        # Load the Server State.114        self.server_process = self._server_state(action='load')115116        # Log the response.117        logging.debug('''118            Server Prcoess Init: {serv_proc}119            '''.format(serv_proc=self.server_process)120                      )121122    def create_session(self) -> bool:123        """Creates a new session.124125        Creates a new session with Interactive Broker using the credentials126        passed through when the Robot was initalized.127128        Usage:129        ----130            >>> ib_client = IBClient(131                username='IB_PAPER_username',132                password='IB_PAPER_PASSWORD',133                account='IB_PAPER_account',134            )135            >>> server_response = ib_client.create_session()136            >>> server_response137                True138139        Returns:140        ----141        bool -- True if the session was created, False if wasn't created.142        """143144        # Log the process ID.145        logging.info('Server Process: {serv_proc}'.format(serv_proc=self.server_process))146147        # first let's check if the server is running, if it's not then we can start up.148        if self.server_process is None:149150            # If it's None we need to connect first.151            self.connect(start_server=True)152153            # then make sure the server is updated.154            if self._set_server():155                return True156157        # more than likely it's running let's try and see if we can authenticate.158        auth_response = self.is_authenticated()159160        if auth_response is None:161            time.sleep(2)162            auth_response = self.is_authenticated()163164        if 'authenticated' in auth_response.keys() and auth_response['authenticated'] == True:165166            if self._set_server():167                self.authenticated = True168                return True169170        else:171172            # in this case don't connect, but prompt the user to log in again.173            self.connect(start_server=False)174175            if self._set_server():176                self.authenticated = True177                return True178179    def _set_server(self) -> bool:180        """Sets the server info for the session.181182        Sets the Server for the session, and if the server cannot be set then183        script will halt. Otherwise will return True to continue on in the script.184185        Returns:186        ----187        bool -- True if the server was set, False if wasn't188        """189190        server_update_content = self.update_server_account(account_id=self.account, check=False)191        server_account_content = self.server_accounts()192193        success = '\nNew session has been created and authenticated. Requests will not be limited.\n'.upper()194        failure = '\nCould not create a new session that was authenticated, exiting script.\n'.upper()195196        # Log the response.197        logging.debug('''198            Server Update Response: {auth_resp}199            Server Response: {serv_resp}200            '''.format(201            auth_resp=server_update_content,202            serv_resp=server_account_content203        )204        )205206        # TO DO: Add check market hours here and then check for a mutual fund.207        # if 'news' in self.data_news(conid='265598'):208        #     print(success)209        #     return True210        if server_account_content is not None and 'set' in server_update_content.keys() and server_update_content['set'] == True:211            print(success)212            return True213        elif ('error' in server_update_content.keys()) and (server_update_content['error'] == 'Account already set'):214            print(success)215            return True216        else:217            print(failure)218            sys.exit()219220    def _server_state(self, action: str = 'save') -> Union[None, int]:221        """Determines the server state.222223        Maintains the server state, so we can easily load a previous session,224        save a new session, or delete a closed session.225226        Arguments:227        ----228        action {str} -- The action you wish to take to the `json` file. Can be one of the following options:229230        1. save - saves the current state and overwrites the old one.231        2. load - loads the previous state from a session that has a server still running.232        3. delete - deletes the state because the server has been closed.233234        Returns:235        ----236        Union[None, int] -- The Process ID of the Server.237        """238239        # Define file components.240        file_exists = self.session_state_path.exists()241242        # Log the response.243        logging.debug('''244            Server State: {state}245            State File: {exist}246            '''.format(247            state=action,248            exist=file_exists249        )250        )251252        if action == 'save':253254            # Save the State.255            with open(self.session_state_path, 'w') as server_file:256                json.dump(obj={'server_process_id': self.server_process}, fp=server_file)257258        # If we are loading check the file exists first.259        elif action == 'load' and file_exists:260261            # Load it.262            with open(self.session_state_path, 'r') as server_file:263                server_state = json.load(fp=server_file)264265            # Grab the Process Id.266            proc_id = server_state['server_process_id']267268            # If it's running return the process ID.269            is_running = self._check_if_server_running(process_id=proc_id)270271            if is_running:272                return proc_id273274        # Delete it.275        elif action == 'delete' and file_exists:276            self.session_state_path.unlink()277278    def _check_if_server_running(self, process_id: str) -> bool:279280        if self._operating_system == 'win32':281282            # See if the Process is running.283            with os.popen('tasklist') as task_list:284285                # Grab each task.286                for process in task_list.read().splitlines()[4:]:287288                    if str(process_id) in process:289290                        # Log the response.291                        logging.debug('''292                            Process ID Found: {process}293                            '''.format(294                            process=process295                        )296                        )297298                        process_details = process.split()299                        return True300301        else:302303            if process_id == None:304                return True305306            try:307                os.kill(process_id, 0)308                return True309            except OSError:310                return False311312    def _check_authentication_user_input(self) -> bool:313        """Used to check the authentication of the Server.314315        Returns:316        ----317        bool: `True` if authenticated, `False` otherwise.318        """319320        max_retries = 0321322        while (max_retries > 4 or self.authenticated == False):323324            user_input = input('Would you like to make an authenticated request (Yes/No)? ').upper()325326            if user_input == 'NO':327                self.close_session()328            else:329                auth_response = self.is_authenticated()330331            logging.debug('Check User Auth Inital: {auth_resp}'.format(332                auth_resp=auth_response333            )334            )335336            if 'statusCode' in auth_response.keys() and auth_response['statusCode'] == 401:337                print("Session isn't connected, closing script.")338                self.close_session()339340            elif 'authenticated' in auth_response.keys() and auth_response['authenticated'] == True:341                self.authenticated = True342                break343344            elif 'authenticated' in auth_response.keys() and auth_response['authenticated'] == False:345                valid_resp = self.validate()346                reauth_resp = self.reauthenticate()347                auth_response = self.is_authenticated()348349                try:350                    news_resp = self.data_news(conid='265598')351                    if 'news' in news_resp:352                        self.authenticated = True353354                        #  Log the response.355                        logging.debug('Had to do News Update Response: {auth_resp}'.format(356                            auth_resp=news_resp357                        )358                        )359                        break360                except:361                    pass362363                logging.debug(364                    '''365                    Validate Response: {valid_resp}366                    Reauth Response: {reauth_resp}367                    '''.format(368                        valid_resp=valid_resp,369                        reauth_resp=reauth_resp370                    )371                )372373            #     if self.reauthenticate().get('message','Null') == 'triggered':374            #         self.authenticated = True375            #     else:376            #         self.authenticated = False377378            # elif auth_response.get('authenticated','Null') in (False, 'Null') and auth_response.get('connected','Null') in (False, 'Null'):379380            #     self.validate()381            #     if self.reauthenticate() == True:382            #         self.authenticated = True383            #     else:384            #         self.authenticated = False385386            max_retries += 1387388        return self.authenticated389390    def _check_authentication_non_input(self) -> bool:391392        auth_response = self.is_authenticated()393394        if 'statusCode' in auth_response:395            print("Session isn't connected, closing script.")396            self.close_session()397398        elif 'authenticated' in auth_response and auth_response['authenticated']:399            self.authenticated = True400401        elif 'authenticated' in auth_response and not auth_response['authenticated']:402403            self.validate()404            self.reauthenticate()405406            if self.reauthenticate().get('message', 'Null') == 'triggered':407                self.authenticated = True408            else:409                self.authenticated = False410411        elif auth_response.get('authenticated', 'Null') in (False, 'Null') and auth_response.get('connected', 'Null') in (False, 'Null'):412413            self.validate()414            if self.reauthenticate() == True:415                self.authenticated = True416            else:417                self.authenticated = False418419    def _start_server(self) -> str:420        """Starts the Server.421422        Returns:423        ----424        str: The Server Process ID.425        """426427        # windows will use the command line application.428        if self._operating_system == 'win32':429            IB_WEB_API_PROC = ["cmd", "/k", r"bin\run.bat", r"root\conf.yaml"]430            self.server_process = subprocess.Popen(431                args=IB_WEB_API_PROC,432                cwd=self.client_portal_folder,433                creationflags=subprocess.CREATE_NEW_CONSOLE434            ).pid435436        # mac will use the terminal.437        elif self._operating_system == 'darwin':438            IB_WEB_API_PROC = ["open", "-F", "-a", "Terminal", r"bin/run.sh", r"root/conf.yaml"]439            self.server_process = subprocess.Popen(440                args=IB_WEB_API_PROC,441                cwd=self.client_portal_folder,442            ).pid443444        # if linux445        else:446            IB_WEB_API_PROC = [r"bin/run.sh", r"root/conf.yaml"]447            self.server_process = subprocess.Popen(448                args=IB_WEB_API_PROC,449                cwd=self.client_portal_folder,450                stdout=subprocess.DEVNULL451            ).pid452453            time.sleep(2)454455            for proc in psutil.process_iter():456                if 'java' in proc.name():457                    self.server_process = proc.pid458459        return self.server_process460461    def connect(self, start_server: bool = True) -> bool:462        """Connects the session with the API.463464        Connects the session to the Interactive Broker API by, starting up the Client Portal Gateway,465        prompting the user to log in and then returns the results back to the `create_session` method.466467        Arguments:468        ----469        start_server {bool} -- True if the server isn't running but needs to be started, False if it470            is running and just needs to be authenticated.471472        Returns:473        ----474        bool -- `True` if it was connected.475        """476477        logging.debug('Running Client Folder at: {file_path}'.format(file_path=self.client_portal_folder))478479        # If needed, start the server and save the State.480        if start_server:481            server_state = self._start_server()482            self._server_state(action='save')483484        print("""{}485        The Interactive Broker server is currently starting up, so we can authenticate your session.486            STEP 1: GO TO THE FOLLOWING URL: {}487            STEP 2: LOGIN TO YOUR account WITH YOUR username AND PASSWORD.488            STEP 3: WHEN YOU SEE `Client login succeeds` RETURN BACK TO THE TERMINAL AND TYPE `YES` TO CHECK IF THE SESSION IS AUTHENTICATED.489            SERVER IS RUNNING ON PROCESS ID: {}490        {}491        """.format('-'*80, self.ib_gateway_path + "/sso/Login?forwardTo=22&RL=1&ip2loc=on", self.server_process, '-'*80)492              )493494        # Check the auth status495        auth_status = self._check_authentication_user_input()496497        return auth_status498499    def close_session(self) -> None:500        """Closes the current session and kills the server using Taskkill."""501502        print('\nCLOSING SERVER AND EXITING SCRIPT.')503504        if self._operating_system == 'win32':505506            # kill the process.507            return_code = subprocess.call("TASKKILL /F /PID {} /T".format(self.server_process), creationflags=subprocess.DETACHED_PROCESS)508509        else:510            return_code = os.kill(self.server_process, 9)511512        # delete the state513        self._server_state(action='delete')514515        # and exit.516        sys.exit()517518    def _headers(self, mode: str = 'json') -> Dict:519        """ 520            Returns a dictionary of default HTTP headers for calls to TD Ameritrade API,521            in the headers we defined the Authorization and access token.522523        Arguments:524        ----525        mode {str} -- Defines the content-type for the headers dictionary.526            default is 'json'. Possible values are ['json','form']527528        Returns:529        ----530        Dict531        """532533        if mode == 'json':534            headers = {'Content-Type': 'application/json'}535        elif mode == 'form':536            headers = {'Content-Type': 'application/x-www-form-urlencoded'}537538        return headers539540    def _build_url(self, endpoint: str) -> str:541        """Builds a url for a request.542543        Arguments:544        ----545        endpoint {str} -- The URL that needs conversion to a full endpoint URL.546547        Returns:548        ----549        {srt} -- A full URL path.550        """551552        # otherwise build the URL553        return urllib.parse.unquote(urllib.parse.urljoin(self.ib_gateway_path, self.api_version) + r'portal/' + endpoint)554555    def _make_request(self, endpoint: str, req_type: str, params: Dict = None) -> Dict:556        """Handles the request to the client.557558        Handles all the requests made by the client and correctly organizes559        the information so it is sent correctly. Additionally it will also560        build the URL.561562        Arguments:563        ----564        endpoint {str} -- The endpoint we wish to request.565566        req_type {str} --  Defines the type of request to be made. Can be one of four567            possible values ['GET','POST','DELETE','PUT']568569        params {dict} -- Any arguments that are to be sent along in the request. That570            could be parameters of a 'GET' request, or a data payload of a571            'POST' request.572573        Returns:574        ----575        {Dict} -- A response dictionary.576577        """578579        # first build the url580        url = self._build_url(endpoint=endpoint)581582        # Scenario 1: POST with a payload.583        if req_type == 'POST' and params is not None:584585            # make sure it's a JSON String586            headers = self._headers(mode='json')587588            # grab the response.589            response = requests.post(url, headers=headers, json=params, verify=False)590591        # SCENARIO 2: POST without a payload.592        elif req_type == 'POST' and params is None:593594            # grab the response.595            response = requests.post(url, headers=self._headers(mode='json'), verify=False)596597        # SCENARIO 3: GET without parameters.598        elif req_type == 'GET' and params is None:599600            # grab the response.601            response = requests.get(url, headers=self._headers(mode='json'), verify=False)602603         # SCENARIO 3: GET with parameters.604        elif req_type == 'GET' and params is not None:605606            # grab the response.607            response = requests.get(url, headers=self._headers(mode='json'), params=params, verify=False)608609        # grab the status code610        status_code = response.status_code611612        # grab the response headers.613        response_headers = response.headers614615        # Check to see if it was successful616        if response.ok:617618            if response_headers.get('Content-Type', 'null') == 'application/json;charset=utf-8':619                return response.json()620            else:621                return response.json()622623        elif url == 'https://localhost:5000/v1/portal/iserver/account':624625            if response_headers.get('Content-Type', 'null') == 'application/json;charset=utf-8':626                return response.json()627            else:628                return response.json()629630        # if it was a bad request print it out.631        else:632633            print('')634            print('-'*80)635            print("BAD REQUEST - STATUS CODE: {}".format(status_code))636            print("RESPONSE URL: {}".format(response.url))637            print("RESPONSE HEADERS: {}".format(response.headers))638            print("RESPONSE TEXT: {}".format(response.text))639            print('-'*80)640            print('')641642    def _prepare_arguments_list(self, parameter_list: List[str]) -> str:643        """Prepares the arguments for the request.644645        Some endpoints can take multiple values for a parameter, this646        method takes that list and creates a valid string that can be647        used in an API request. The list can have either one index or648        multiple indexes.649650        Arguments:651        ----652        parameter_list {List} -- A list of paramater values assigned to an argument.653654        Usage:655        ----656            >>> SessionObject._prepare_arguments_list(parameter_list=['MSFT','SQ'])657658        Returns:659        ----660        {str} -- The joined list.661662        """663664        # validate it's a list.665        if type(parameter_list) is list:666667            # specify the delimiter and join the list.668            delimiter = ','669            parameter_list = delimiter.join(parameter_list)670671        return parameter_list672673    """674        SESSION ENDPOINTS675    """676677    def validate(self) -> Dict:678        """Validates the current session for the SSO user."""679680        # define request components681        endpoint = r'sso/validate'682        req_type = 'GET'683        content = self._make_request(endpoint=endpoint, req_type=req_type)684685        return content686687    def tickle(self) -> Dict:688        """Keeps the session open.689690        If the gateway has not received any requests for several minutes an open session will 691        automatically timeout. The tickle endpoint pings the server to prevent the 692        session from ending.693        """694695        # define request components696        endpoint = r'tickle'697        req_type = 'POST'698        content = self._make_request(endpoint=endpoint, req_type=req_type)699700        return content701702    def logout(self) -> Dict:703        """Logs the session out.704705        Logs the user out of the gateway session. Any further activity requires 706        re-authentication.707        """708709        # https://cdcdyn.interactivebrokers.com/portal.proxy/v1/portal/logout710        # https://cdcdyn.interactivebrokers.com/portal.proxy/v1/ibcust/logout711        # https://cdcdyn.interactivebrokers.com/sso/Logout?RL=1712713        # define request components714        endpoint = r'logout'715        req_type = 'POST'716        content = self._make_request(endpoint=endpoint, req_type=req_type)717718        return content719720    def reauthenticate(self) -> Dict:721        """Reauthenticates an existing session.722723        Provides a way to reauthenticate to the Brokerage system as long as there 724        is a valid SSO session, see /sso/validate.725        """726727        # define request components728        endpoint = r'iserver/reauthenticate'729        req_type = 'POST'730731        # this is special, I don't want the JSON content right away.732        content = self._make_request(endpoint=endpoint, req_type=req_type)733734        return content735736    def is_authenticated(self) -> Dict:737        """Checks if session is authenticated.738739        Current Authentication status to the Brokerage system. Market Data and 740        Trading is not possible if not authenticated, e.g. authenticated 741        shows `False`.742        """743744        # define request components745        endpoint = 'iserver/auth/status'746        req_type = 'POST'747        content = self._make_request(endpoint=endpoint, req_type=req_type)748749        return content750751    """752        FUNDAMENTAL DATA ENDPOINTS753    """754755    def fundamentals_summary(self, conid: str) -> Dict:756        """Grabs a financial summary of a company.757758        Return a financial summary for specific Contract ID. The financial summary759        includes key ratios and descriptive components of the Contract ID.760761        Arguments:762        ----        763        conid {str} -- The contract ID.764765        Returns:766        ----767        {Dict} -- The response dictionary.768        """769770        # define request components771        endpoint = 'iserver/fundamentals/{}/summary'.format(conid)772        req_type = 'GET'773        content = self._make_request(endpoint=endpoint, req_type=req_type)774775        return content776777    def fundamentals_financials(self, conid: str, financial_statement: str, period: str = 'annual') -> Dict:778        """779            Return a financial summary for specific Contract ID. The financial summary780            includes key ratios and descriptive components of the Contract ID.781782            NAME: conid783            DESC: The contract ID.784            TYPE: String785786            NAME: financial_statement787            DESC: The specific financial statement you wish to request for the Contract ID. Possible788                  values are ['balance','cash','income']789            TYPE: String790791            NAME: period792            DESC: The specific period you wish to see. Possible values are ['annual','quarter']793            TYPE: String794795            RTYPE: Dictionary796        """797798        # define the period799        if period == 'annual':800            period = True801        else:802            period = False803804        # Build the arguments.805        params = {806            'type': financial_statement,807            'annual': period808        }809810        # define request components811        endpoint = 'fundamentals/financials/{}'.format(conid)812        req_type = 'GET'813        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)814815        return content816817    def fundamentals_key_ratios(self, conid: str) -> Dict:818        """819            Returns analyst ratings for a specific conid.820821            NAME: conid822            DESC: The contract ID.823            TYPE: String824825        """826827        # Build the arguments.828        params = {829            'widgets': 'key_ratios'830        }831832        # define request components833        endpoint = 'fundamentals/landing/{}'.format(conid)834        req_type = 'GET'835        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)836837        return content838839    def fundamentals_dividends(self, conid: str) -> Dict:840        """841            Returns analyst ratings for a specific conid.842843            NAME: conid844            DESC: The contract ID.845            TYPE: String846847        """848849        # Build the arguments.850        params = {851            'widgets': 'dividends'852        }853854        # define request components855        endpoint = 'fundamentals/landing/{}'.format(conid)856        req_type = 'GET'857        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)858859        return content860861    def fundamentals_esg(self, conid: str) -> Dict:862        """863            Returns analyst ratings for a specific conid.864865            NAME: conid866            DESC: The contract ID.867            TYPE: String868869        """870871        # Build the arguments.872        params = {873            'widgets': 'esg'874        }875876        # define request components877        endpoint = 'fundamentals/landing/{}'.format(conid)878        req_type = 'GET'879        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)880881        return content882883    """884        DATA ENDPOINTS885    """886887    def data_news(self, conid: str) -> Dict:888        """889            Return a financial summary for specific Contract ID. The financial summary890            includes key ratios and descriptive components of the Contract ID.891892            NAME: conid893            DESC: The contract ID.894            TYPE: String895896        """897898        # Build the arguments.899        params = {900            'widgets': 'news',901            'lang': 'en'902        }903904        # define request components905        endpoint = 'fundamentals/landing/{}'.format(conid)906        req_type = 'GET'907        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)908909        return content910911    def data_ratings(self, conid: str) -> Dict:912        """913            Returns analyst ratings for a specific conid.914915            NAME: conid916            DESC: The contract ID.917            TYPE: String918919        """920921        # Build the arguments.922        params = {923            'widgets': 'ratings'924        }925926        # define request components927        endpoint = 'fundamentals/landing/{}'.format(conid)928        req_type = 'GET'929        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)930931        return content932933    def _data_events(self, conid: str) -> Dict:934        """935            Returns analyst ratings for a specific conid.936937            NAME: conid938            DESC: The contract ID.939            TYPE: String940941        """942943        # Build the arguments.944        params = {945            'widgets': 'ratings'946        }947948        # define request components949        endpoint = 'fundamentals/landing/{}'.format(conid)950        req_type = 'GET'951        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)952953        return content954955    def data_ownership(self, conid: str) -> Dict:956        """957            Returns analyst ratings for a specific conid.958959            NAME: conid960            DESC: The contract ID.961            TYPE: String962963        """964965        # Build the arguments.966        params = {967            'widgets': 'ownership'968        }969970        # define request components971        endpoint = 'fundamentals/landing/{}'.format(conid)972        req_type = 'GET'973        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)974975        return content976977    def data_competitors(self, conid: str) -> Dict:978        """979            Returns analyst ratings for a specific conid.980981            NAME: conid982            DESC: The contract ID.983            TYPE: String984985        """986987        # Build the arguments.988        params = {989            'widgets': 'competitors'990        }991992        # define request components993        endpoint = 'fundamentals/landing/{}'.format(conid)994        req_type = 'GET'995        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)996997        return content998999    def data_analyst_forecast(self, conid: str) -> Dict:1000        """1001            Returns analyst ratings for a specific conid.10021003            NAME: conid1004            DESC: The contract ID.1005            TYPE: String10061007        """10081009        # Build the arguments.1010        params = {1011            'widgets': 'analyst_forecast'1012        }10131014        # define request components1015        endpoint = 'fundamentals/landing/{}'.format(conid)1016        req_type = 'GET'1017        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)10181019        return content10201021    def market_data(self, conids: List[str], since: str, fields: List[str]) -> Dict:1022        """1023            Get Market Data for the given conid(s). The end-point will return by 1024            default bid, ask, last, change, change pct, close, listing exchange. 1025            See response fields for a list of available fields that can be request 1026            via fields argument. The endpoint /iserver/accounts should be called 1027            prior to /iserver/marketdata/snapshot. To receive all available fields 1028            the /snapshot endpoint will need to be called several times.10291030            NAME: conid1031            DESC: The list of contract IDs you wish to pull current quotes for.1032            TYPE: List<String>10331034            NAME: since1035            DESC: Time period since which updates are required.1036                  Uses epoch time with milliseconds.1037            TYPE: String10381039            NAME: fields1040            DESC: List of fields you wish to retrieve for each quote.1041            TYPE: List<String>10421043        """10441045        # define request components1046        endpoint = 'iserver/marketdata/snapshot'1047        req_type = 'GET'10481049        # join the two list arguments so they are both a single string.1050        conids_joined = self._prepare_arguments_list(parameter_list=conids)10511052        if fields is not None:1053            fields_joined = ",".join(str(n) for n in fields)1054        else:1055            fields_joined = ""10561057        # define the parameters1058        if since is None:1059            params = {1060                'conids': conids_joined,1061                'fields': fields_joined1062            }1063        else:1064            params = {1065                'conids': conids_joined,1066                'since': since,1067                'fields': fields_joined1068            }10691070        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)10711072        return content10731074    def market_data_history(self, conid: str, period: str, bar: str) -> Dict:1075        """1076            Get history of market Data for the given conid, length of data is controlled by period and 1077            bar. e.g. 1y period with bar=1w returns 52 data points.10781079            NAME: conid1080            DESC: The contract ID for a given instrument. If you don't know the contract ID use the1081                  `search_by_symbol_or_name` endpoint to retrieve it.1082            TYPE: String10831084            NAME: period1085            DESC: Specifies the period of look back. For example 1y means looking back 1 year from today.1086                  Possible values are ['1d','1w','1m','1y']1087            TYPE: String10881089            NAME: bar1090            DESC: Specifies granularity of data. For example, if bar = '1h' the data will be at an hourly level.1091                  Possible values are ['5min','1h','1w']1092            TYPE: String10931094        """10951096        # define request components1097        endpoint = 'iserver/marketdata/history'1098        req_type = 'GET'1099        params = {1100            'conid': conid,1101            'period': period,1102            'bar': bar1103        }1104        content = self._make_request(endpoint=endpoint, req_type=req_type, params=params)11051106        return content11071108    """1109        SERVER ACCOUNTS ENDPOINTS1110    """11111112    def server_accounts(self):1113        """11141115            Returns a list of accounts the user has trading access to, their1116            respective aliases and the currently selected account. Note this1117            endpoint must be called before modifying an order or querying1118            open orders.11191120        """11211122        # define request components1123        endpoint = 'iserver/accounts'1124        req_type = 'GET'1125        content = self._make_request(endpoint=endpoint, req_type=req_type)11261127        return content11281129    def update_server_account(self, account_id: str, check: bool = False) -> Dict:1130        """1131            If an user has multiple accounts, and user wants to get orders, trades, 1132            etc. of an account other than currently selected account, then user 1133            can update the currently selected account using this API and then can 1134            fetch required information for the newly updated account.11351136            NAME: account_id1137            DESC: The account ID you wish to set for the API Session. This will be used to1138                  grab historical data and make orders.1139            TYPE: String11401141        """11421143        # define request components1144        endpoint = 'iserver/account'1145        req_type = 'POST'1146        params = {1147            'acctId': account_id1148        }11491150        content = self._make_request(1151            endpoint=endpoint,1152            req_type=req_type,1153            params=params1154        )11551156        return content11571158    def server_account_pnl(self):1159        """1160            Returns an object containing PnLfor the selected account and its models 1161            (if any).1162        """11631164        # define request components1165        endpoint = 'iserver/account/pnl/partitioned'1166        req_type = 'GET'1167        content = self._make_request(endpoint=endpoint, req_type=req_type)11681169        return content11701171    """1172        CONTRACT ENDPOINTS1173    """11741175    def symbol_search(self, symbol: str) -> Dict:1176        """1177            Performs a symbol search for a given symbol and returns information related to the1178            symbol including the contract id.1179        """11801181        # define the request components1182        endpoint = 'iserver/secdef/search'1183        req_type = 'POST'1184        payload = {'symbol': symbol}1185        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)11861187        return content11881189    def contract_details(self, conid: str) -> Dict:1190        """1191            Get contract details, you can use this to prefill your order before you submit an order.11921193            NAME: conid1194            DESC: The contract ID you wish to get details for.1195            TYPE: String11961197            RTYPE: Dictionary1198        """11991200        # define the request components1201        endpoint = '/iserver/contract/{conid}/info'.format(conid=conid)1202        req_type = 'GET'1203        content = self._make_request(endpoint=endpoint, req_type=req_type)12041205        return content12061207    def contracts_definitions(self, conids: List[str]) -> Dict:1208        """1209            Returns a list of security definitions for the given conids.12101211            NAME: conids1212            DESC: A list of contract IDs you wish to get details for.1213            TYPE: List<Integer>12141215            RTYPE: Dictionary1216        """12171218        # define the request components1219        endpoint = '/trsrv/secdef'1220        req_type = 'POST'1221        payload = {1222            'conids': conids1223        }1224        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)12251226        return content12271228    def futures_search(self, symbols: List[str]) -> Dict:1229        """1230            Returns a list of non-expired future contracts for given symbol(s).12311232            NAME: Symbol1233            DESC: List of case-sensitive symbols separated by comma.1234            TYPE: List<String>12351236            RTYPE: Dictionary1237        """12381239        # define the request components1240        endpoint = '/trsrv/futures'1241        req_type = 'GET'1242        payload = {'symbols': "{}".format(','.join(symbols))}1243        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)12441245        return content12461247    def symbols_search_list(self, symbols: List[str]) -> Dict:1248        """1249            Returns a list of non-expired future contracts for given symbol(s).12501251            NAME: Symbol1252            DESC: List of case-sensitive symbols separated by comma.1253            TYPE: List<String>12541255            RTYPE: Dictionary1256        """12571258        # define the request components1259        endpoint = '/trsrv/stocks'1260        req_type = 'GET'1261        payload = {'symbols': '{}'.format(','.join(symbols))}1262        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)12631264        return content12651266    """1267        PORTFOLIO ACCOUNTS ENDPOINTS1268    """12691270    def portfolio_accounts(self):1271        """1272            In non-tiered account structures, returns a list of accounts for which the 1273            user can view position and account information. This endpoint must be called prior 1274            to calling other /portfolio endpoints for those accounts. For querying a list of accounts 1275            which the user can trade, see /iserver/accounts. For a list of subaccounts in tiered account 1276            structures (e.g. financial advisor or ibroker accounts) see /portfolio/subaccounts.12771278        """12791280        # define request components1281        endpoint = 'portfolio/accounts'1282        req_type = 'GET'1283        content = self._make_request(endpoint=endpoint, req_type=req_type)12841285        return content12861287    def portfolio_sub_accounts(self):1288        """1289            Used in tiered account structures (such as financial advisor and ibroker accounts) to return a 1290            list of sub-accounts for which the user can view position and account-related information. This 1291            endpoint must be called prior to calling other /portfolio endpoints for those subaccounts. To 1292            query a list of accounts the user can trade, see /iserver/accounts.12931294        """12951296        # define request components1297        endpoint = r'âportfolio/subaccounts'1298        req_type = 'GET'1299        content = self._make_request(endpoint=endpoint, req_type=req_type)13001301        return content13021303    def portfolio_account_info(self, account_id: str) -> Dict:1304        """1305            Used in tiered account structures (such as financial advisor and ibroker accounts) to return a 1306            list of sub-accounts for which the user can view position and account-related information. This 1307            endpoint must be called prior to calling other /portfolio endpoints for those subaccounts. To 1308            query a list of accounts the user can trade, see /iserver/accounts.13091310            NAME: account_id1311            DESC: The account ID you wish to return info for.1312            TYPE: String13131314        """13151316        # define request components1317        endpoint = r'portfolio/{}/meta'.format(account_id)1318        req_type = 'GET'1319        content = self._make_request(endpoint=endpoint, req_type=req_type)13201321        return content13221323    def portfolio_account_summary(self, account_id: str) -> Dict:1324        """1325            Returns information about margin, cash balances and other information 1326            related to specified account. See also /portfolio/{accountId}/ledger. 1327            /portfolio/accounts or /portfolio/subaccounts must be called 1328            prior to this endpoint.13291330            NAME: account_id1331            DESC: The account ID you wish to return info for.1332            TYPE: String13331334        """13351336        # define request components1337        endpoint = r'portfolio/{}/summary'.format(account_id)1338        req_type = 'GET'1339        content = self._make_request(endpoint=endpoint, req_type=req_type)13401341        return content13421343    def portfolio_account_ledger(self, account_id: str) -> Dict:1344        """1345            Information regarding settled cash, cash balances, etc. in the account's 1346            base currency and any other cash balances hold in other currencies. /portfolio/accounts 1347            or /portfolio/subaccounts must be called prior to this endpoint. The list of supported 1348            currencies is available at https://www.interactivebrokers.com/en/index.php?f=3185.13491350            NAME: account_id1351            DESC: The account ID you wish to return info for.1352            TYPE: String13531354        """13551356        # define request components1357        endpoint = r'portfolio/{}/ledger'.format(account_id)1358        req_type = 'GET'1359        content = self._make_request(endpoint=endpoint, req_type=req_type)13601361        return content13621363    def portfolio_account_allocation(self, account_id: str) -> Dict:1364        """1365            Information about the account's portfolio allocation by Asset Class, Industry and 1366            Category. /portfolio/accounts or /portfolio/subaccounts must be called prior to 1367            this endpoint.13681369            NAME: account_id1370            DESC: The account ID you wish to return info for.1371            TYPE: String13721373        """13741375        # define request components1376        endpoint = r'portfolio/{}/allocation'.format(account_id)1377        req_type = 'GET'1378        content = self._make_request(endpoint=endpoint, req_type=req_type)13791380        return content13811382    def portfolio_accounts_allocation(self, account_ids: List[str]) -> Dict:1383        """1384            Similar to /portfolio/{accountId}/allocation but returns a consolidated view of of all the 1385            accounts returned by /portfolio/accounts. /portfolio/accounts or /portfolio/subaccounts must 1386            be called prior to this endpoint.13871388            NAME: account_ids1389            DESC: A list of Account IDs you wish to return alloacation info for.1390            TYPE: List<String>13911392        """13931394        # define request components1395        endpoint = r'portfolio/allocation'1396        req_type = 'POST'1397        payload = account_ids1398        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)13991400        return content14011402    def portfolio_account_positions(self, account_id: str, page_id: int = 0) -> Dict:1403        """1404            Returns a list of positions for the given account. The endpoint supports paging, 1405            page's default size is 30 positions. /portfolio/accounts or /portfolio/subaccounts 1406            must be called prior to this endpoint.14071408            NAME: account_id1409            DESC: The account ID you wish to return positions for.1410            TYPE: String14111412            NAME: page_id1413            DESC: The page you wish to return if there are more than 1. The1414                  default value is `0`.1415            TYPE: String141614171418            ADDITIONAL ARGUMENTS NEED TO BE ADDED!!!!!1419        """14201421        # define request components1422        endpoint = r'portfolio/{}/positions/{}'.format(account_id, page_id)1423        req_type = 'GET'1424        content = self._make_request(endpoint=endpoint, req_type=req_type)14251426        return content14271428    #1429    #   RENAME THIS1430    #14311432    def portfolio_account_position(self, account_id: str, conid: str) -> Dict:1433        """1434            Returns a list of all positions matching the conid. For portfolio models the conid 1435            could be in more than one model, returning an array with the name of the model it 1436            belongs to. /portfolio/accounts or /portfolio/subaccounts must be called prior to 1437            this endpoint.14381439            NAME: account_id1440            DESC: The account ID you wish to return positions for.1441            TYPE: String14421443            NAME: conid1444            DESC: The contract ID you wish to find matching positions for.1445            TYPE: String14461447        """14481449        # define request components1450        endpoint = r'portfolio/{}/position/{}'.format(account_id, conid)1451        req_type = 'GET'1452        content = self._make_request(endpoint=endpoint, req_type=req_type)14531454        return content14551456    #1457    #   GET MORE DETAILS ON THIS1458    #14591460    def portfolio_positions_invalidate(self, account_id: str) -> Dict:1461        """1462            Invalidates the backend cache of the Portfolio. ???14631464            NAME: account_id1465            DESC: The account ID you wish to return positions for.1466            TYPE: String14671468        """14691470        # define request components1471        endpoint = r'portfolio/{}/positions/invalidate'.format(account_id)1472        req_type = 'POST'1473        content = self._make_request(endpoint=endpoint, req_type=req_type)14741475        return content14761477    def portfolio_positions(self, conid: str) -> Dict:1478        """1479            Returns an object of all positions matching the conid for all the selected accounts. 1480            For portfolio models the conid could be in more than one model, returning an array 1481            with the name of the model it belongs to. /portfolio/accounts or /portfolio/subaccounts 1482            must be called prior to this endpoint.14831484            NAME: conid1485            DESC: The contract ID you wish to find matching positions for.1486            TYPE: String          1487        """14881489        # define request components1490        endpoint = r'portfolio/positions/{}'.format(conid)1491        req_type = 'GET'1492        content = self._make_request(endpoint=endpoint, req_type=req_type)14931494        return content14951496    """1497        TRADES ENDPOINTS1498    """14991500    def trades(self):1501        """1502            Returns a list of trades for the currently selected account for current day and 1503            six previous days.1504        """15051506        # define request components1507        endpoint = r'iserver/account/trades'1508        req_type = 'GET'1509        content = self._make_request(endpoint=endpoint, req_type=req_type)15101511        return content15121513    """1514        ORDERS ENDPOINTS1515    """15161517    def get_live_orders(self):1518        """1519            The end-point is meant to be used in polling mode, e.g. requesting every 1520            x seconds. The response will contain two objects, one is notification, the 1521            other is orders. Orders is the list of orders (cancelled, filled, submitted) 1522            with activity in the current day. Notifications contains information about 1523            execute orders as they happen, see status field.15241525        """15261527        # define request components1528        endpoint = r'iserver/account/orders'1529        req_type = 'GET'1530        content = self._make_request(endpoint=endpoint, req_type=req_type)15311532        return content15331534    def place_order(self, account_id: str, order: dict) -> Dict:1535        """1536            Please note here, sometimes this end-point alone can't make sure you submit the order 1537            successfully, you could receive some questions in the response, you have to to answer 1538            them in order to submit the order successfully. You can use "/iserver/reply/{replyid}" 1539            end-point to answer questions.15401541            NAME: account_id1542            DESC: The account ID you wish to place an order for.1543            TYPE: String15441545            NAME: order1546            DESC: Either an IBOrder object or a dictionary with the specified payload.1547            TYPE: IBOrder or Dict15481549        """15501551        if type(order) is dict:1552            order = order1553        else:1554            order = order.create_order()15551556        # define request components1557        endpoint = r'iserver/account/{}/order'.format(account_id)1558        req_type = 'POST'1559        content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)15601561        return content15621563    def place_orders(self, account_id: str, orders: List[Dict]) -> Dict:1564        """1565            An extension of the `place_order` endpoint but allows for a list of orders. Those orders may be1566            either a list of dictionary objects or a list of IBOrder objects.15671568            NAME: account_id1569            DESC: The account ID you wish to place an order for.1570            TYPE: String15711572            NAME: orders1573            DESC: Either a list of IBOrder objects or a list of dictionaries with the specified payload.1574            TYPE: List<IBOrder Object> or List<Dictionary>15751576        """15771578        # EXTENDED THIS1579        if type(orders) is list:1580            orders = orders1581        else:1582            orders = orders15831584        # define request components1585        endpoint = r'iserver/account/{}/orders'.format(account_id)1586        req_type = 'POST'1587        content = self._make_request(endpoint=endpoint, req_type=req_type, params=orders)15881589        return content15901591    def place_order_scenario(self, account_id: str, order: dict) -> Dict:1592        """1593            This end-point allows you to preview order without actually submitting the 1594            order and you can get commission information in the response.15951596            NAME: account_id1597            DESC: The account ID you wish to place an order for.1598            TYPE: String15991600            NAME: order1601            DESC: Either an IBOrder object or a dictionary with the specified payload.1602            TYPE: IBOrder or Dict16031604        """16051606        if type(order) is dict:1607            order = order1608        else:1609            order = order.create_order()16101611        # define request components1612        endpoint = r'iserver/account/{}/order/whatif'.format(account_id)1613        req_type = 'POST'1614        content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)16151616        return content16171618    def modify_order(self, account_id: str, customer_order_id: str, order: dict) -> Dict:1619        """1620            Modifies an open order. The /iserver/accounts endpoint must first1621            be called.16221623            NAME: account_id1624            DESC: The account ID you wish to place an order for.1625            TYPE: String16261627            NAME: customer_order_id1628            DESC: The customer order ID for the order you wish to MODIFY.1629            TYPE: String16301631            NAME: order1632            DESC: Either an IBOrder object or a dictionary with the specified payload.1633            TYPE: IBOrder or Dict16341635        """16361637        if type(order) is dict:1638            order = order1639        else:1640            order = order.create_order()16411642        # define request components1643        endpoint = r'iserver/account/{}/order/{}'.format(account_id, customer_order_id)1644        req_type = 'POST'1645        content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)16461647        return content16481649    def delete_order(self, account_id: str, customer_order_id: str) -> Dict:1650        """1651            Deletes the order specified by the customer order ID.16521653            NAME: account_id1654            DESC: The account ID you wish to place an order for.1655            TYPE: String16561657            NAME: customer_order_id1658            DESC: The customer order ID for the order you wish to DELETE.1659            TYPE: String16601661        """1662        # define request components1663        endpoint = r'iserver/account/{}/order/{}'.format(account_id, customer_order_id)1664        req_type = 'DELETE'1665        content = self._make_request(endpoint=endpoint, req_type=req_type)16661667        return content16681669    """1670        ORDERS ENDPOINTS1671    """16721673    def get_scanners(self):1674        """1675            Returns an object contains four lists contain all parameters for scanners.16761677            RTYPE Dictionary1678        """1679        # define request components1680        endpoint = r'iserver/scanner/params'1681        req_type = 'GET'1682        content = self._make_request(endpoint=endpoint, req_type=req_type)16831684        return content16851686    def run_scanner(self, instrument: str, scanner_type: str, location: str, size: str = '25', filters: List[dict] = None) -> Dict:1687        """1688            Run a scanner to get a list of contracts.16891690            NAME: instrument1691            DESC: The type of financial instrument you want to scan for.1692            TYPE: String16931694            NAME: scanner_type1695            DESC: The Type of scanner you wish to run, defined by the scanner code.1696            TYPE: String16971698            NAME: location1699            DESC: The geographic location you wish to run the scan. For example (STK.US.MAJOR)1700            TYPE: String17011702            NAME: size1703            DESC: The number of results to return back. Defaults to 25.1704            TYPE: String17051706            NAME: filters1707            DESC: A list of dictionaries where the key is the filter you wish to set and the value is the value you want set1708                  for that filter.1709            TYPE: List<Dictionaries>17101711            RTYPE Dictionary1712        """17131714        # define request components1715        endpoint = r'iserver/scanner/run'1716        req_type = 'POST'1717        payload = {1718            "instrument": instrument,1719            "type": scanner_type,1720            "filter": filters,1721            "location": location,1722            "size": size1723        }1724        print(payload)17251726        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)17271728        return content17291730    def customer_info(self):1731        """1732            Returns Applicant Id with all owner related entities     17331734            RTYPE Dictionary1735        """17361737        # define request components1738        endpoint = r'ibcust/entity/info'1739        req_type = 'GET'1740        content = self._make_request(endpoint=endpoint, req_type=req_type)17411742        return content17431744    def get_unread_messages(self):1745        """1746            Returns the unread messages associated with the account.17471748            RTYPE Dictionary1749        """17501751        # define request components1752        endpoint = r'fyi/unreadnumber'1753        req_type = 'GET'1754        content = self._make_request(endpoint=endpoint, req_type=req_type)17551756        return content17571758    def get_subscriptions(self):1759        """1760            Return the current choices of subscriptions, we can toggle the option.17611762            RTYPE Dictionary1763        """17641765        # define request components1766        endpoint = r'fyi/settings'1767        req_type = 'GET'1768        content = self._make_request(endpoint=endpoint, req_type=req_type)17691770        return content17711772    def change_subscriptions_status(self, type_code: str, enable: bool = True) -> Dict:1773        """1774            Turns the subscription on or off.17751776            NAME: type_code1777            DESC: The subscription code you wish to change the status for.1778            TYPE: String17791780            NAME: enable1781            DESC: True if you want the subscription turned on, False if you want it turned of.1782            TYPE: Boolean17831784            RTYPE Dictionary1785        """17861787        # define request components1788        endpoint = r'fyi/settings/{}'1789        req_type = 'POST'1790        payload = {'enable': enable}1791        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)17921793        return content17941795    def subscriptions_disclaimer(self, type_code: str) -> Dict:1796        """1797            Returns the disclaimer for the specified subscription.17981799            NAME: type_code1800            DESC: The subscription code you wish to change the status for.1801            TYPE: String18021803            RTYPE Dictionary1804        """18051806        # define request components1807        endpoint = r'fyi/disclaimer/{}'1808        req_type = 'GET'1809        content = self._make_request(endpoint=endpoint, req_type=req_type)18101811        return content18121813    def mark_subscriptions_disclaimer(self, type_code: str) -> Dict:1814        """1815            Sets the specified disclaimer to read.18161817            NAME: type_code1818            DESC: The subscription code you wish to change the status for.1819            TYPE: String18201821            RTYPE Dictionary1822        """18231824        # define request components1825        endpoint = r'fyi/disclaimer/{}'1826        req_type = 'PUT'1827        content = self._make_request(endpoint=endpoint, req_type=req_type)18281829        return content18301831    def subscriptions_delivery_options(self):1832        """1833            Options for sending fyis to email and other devices.18341835            RTYPE Dictionary1836        """18371838        # define request components1839        endpoint = r'fyi/deliveryoptions'1840        req_type = 'GET'1841        content = self._make_request(endpoint=endpoint, req_type=req_type)18421843        return content18441845    def mutual_funds_portfolios_and_fees(self, conid: str) -> Dict:1846        """1847            Grab the Fees and objectives for a specified mutual fund.18481849            NAME: conid1850            DESC: The Contract ID for the mutual fund.1851            TYPE: String18521853            RTYPE Dictionary1854        """18551856        # define request components1857        endpoint = r'fundamentals/mf_profile_and_fees/{mutual_fund_id}'.format(mutual_fund_id=conid)1858        req_type = 'GET'1859        content = self._make_request(endpoint=endpoint, req_type=req_type)18601861        return content18621863    def mutual_funds_performance(self, conid: str, risk_period: str, yield_period: str, statistic_period: str) -> Dict:1864        """1865            Grab the Lip Rating for a specified mutual fund.18661867            NAME: conid1868            DESC: The Contract ID for the mutual fund.1869            TYPE: String18701871            NAME: yield_period1872            DESC: The Period threshold for yield information1873                  possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1874            TYPE: String18751876            NAME: risk_period1877            DESC: The Period threshold for risk information1878                  possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1879            TYPE: String18801881            NAME: statistic_period1882            DESC: The Period threshold for statistic information1883                  possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1884            TYPE: String18851886            RTYPE Dictionary1887        """18881889        # define request components1890        endpoint = r'fundamentals/mf_performance/{mutual_fund_id}'.format(mutual_fund_id=conid)1891        req_type = 'GET'1892        payload = {1893            'risk_period': None,1894            'yield_period': None,1895            'statistic_period': None1896        }1897        content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)1898
...analysis.py
Source:analysis.py  
1from pathlib import Path2from dataclasses import dataclass3import pandas as pd4import re5import os6import moviepy.editor as mpy7import moviepy.tools8import moviepy.config9import json10import bbox11import cv212# TODO:13# - option to choose locale14# - docstrings15# - support for undistort16events_log_filename = "events.csv"17session_state_filename = "session_state.json"18name_locale = "Asia/Jerusalem"19def split_name_datetime(s):20    """21    Split a string with format {name}_%Y%m%d_%H%M%S into name and date.22    Return name (string), dt (np.datetime64)23    """24    match = re.search("(.*)_([0-9]*)[-_]([0-9]*)", s)25    dt = pd.to_datetime(match.group(2) + " " + match.group(3), format="%Y%m%d %H%M%S")26    name = match.group(1)27    return name, dt28def read_timeseries_csv(path: Path, time_col="time", tz="utc") -> pd.DataFrame:29    """30    Read csv file into a pandas DataFrame. Creates a DatetimeIndex and sets31    the timezone to the specified one.32    - path: csv file path33    - time_col: The name of the column to be used as a DatetimeIndex34    - tz: The timezone of the time column (see DatetimeIndex.tz_localize)35    """36    df = pd.read_csv(path)37    if hasattr(time_col, "__getitem__"):38        for col in time_col:39            if col in df.columns:40                time_col = col41                break42    df.index = pd.to_datetime(df[time_col], unit="s").dt.tz_localize(tz)43    df.drop(columns=[time_col], inplace=True)44    return df45def is_timestamp_contained(46    tdf: pd.DataFrame, timestamp: pd.Timestamp, time_col=None47) -> bool:48    """49    Return True if the timestamp is contained within the time range of the50    supplied timeseries dataframe.51    tdf: Timeseries dataframe (pd.DataFrame) with a time column.52    timestamp: The timestamp to test53    time_col: The name of the time column. When equals None the dataframe index54              will be used.55    """56    if time_col is None:57        beginning = tdf.index[0]58        end = tdf.index[-1]59    else:60        beginning = tdf[time_col].iloc[0]61        end = tdf[time_col].iloc[-1]62    return beginning < timestamp < end63def idx_for_time(df: pd.DataFrame, timestamp: pd.Timestamp, time_col=None) -> int:64    """65    Return the closest row index to the supplied timestamp.66    df: The dataframe to search67    timestamp: The timestamp that will be used to find the index68    time_col: The name of the time column. When equals None the dataframe index69              will be used.70    """71    if time_col is None:72        return df.index.get_loc(timestamp, method="nearest")73    else:74        return (df[time_col] - timestamp).abs().argmin()75def format_timedelta(td: pd.Timedelta, use_colons=True):76    total_secs = int(td.total_seconds())77    hrs = total_secs // 360078    mins = (total_secs % 3600) // 6079    secs = (total_secs % 3600) % 6080    if use_colons:81        return f"{hrs:02}:{mins:02}:{secs:02}"82    else:83        return f"{hrs:02}{mins:02}{secs:02}"84def format_timestamp(ts: pd.Timestamp, fmt="%m-%d %H:%M:%S", tz="Asia/Jerusalem"):85    return pd.to_datetime(ts.tz_convert(tz)).strftime(fmt)86def background_for_ts(info, ts, infix):87    imgs = [p for p in info.images if infix in p.name]88    im_tss = [split_name_datetime(p.stem)[1].tz_localize("Asia/Jerusalem").tz_convert("utc") for p in imgs]89    df = pd.DataFrame(data={'ts': im_tss, 'ts_diff': pd.Series(im_tss) - ts, 'path': imgs}, columns=['ts', 'ts_diff', 'path'])90    row = df[df.ts_diff < pd.Timedelta(0)].max()91    return cv2.imread(str(row.path))92def extract_clip(vid_path, start_frame: int, end_frame: int, output_path):93    """94    Extract a subclip of a video file without re-encoding it.95    vid_path: Path of the input video file (pathlib.Path or str).96    start_frame, end_frame: start and end of the subclip in frame numbers.97    output_path: Path for the output video file (pathlib.Path or str).98    """99    fps = mpy.VideoFileClip(str(vid_path)).fps100    start_time = start_frame / fps101    end_time = end_frame / fps102    ffmpeg_extract_subclip(vid_path, int(start_time), int(end_time), str(output_path))103def ffmpeg_extract_subclip(filename, t1, t2, targetname=None):104    """105    Makes a new video file playing video file ``filename`` between106    the times ``t1`` and ``t2``.107    from: https://zulko.github.io/moviepy/_modules/moviepy/video/io/ffmpeg_tools.html108    """109    name, ext = os.path.splitext(filename)110    if not targetname:111        T1, T2 = [int(1000 * t) for t in [t1, t2]]112        targetname = "%sSUB%d_%d.%s" % (name, T1, T2, ext)113    cmd = [114        moviepy.config.get_setting("FFMPEG_BINARY"),115        "-y",116        "-ss",117        "%0.2f" % t1,118        "-i",119        filename,120        "-t",121        "%0.2f" % (t2 - t1),122        "-map",123        "0",124        "-vcodec",125        "copy",126        "-acodec",127        "copy",128        targetname,129    ]130    moviepy.tools.subprocess_call(cmd, logger=None)131def sessions_df(session_data_root: Path) -> pd.DataFrame:132    """133    Find all sessions under the supplied session_data_root argument.134    Return a pandas dataframe with columns `name` and `dir` and a135    DatetimeIndex containing the session start time.136    """137    exp_dirs = list(filter(lambda p: p.is_dir(), session_data_root.glob("*")))138    dts = []139    names = []140    for exp_dir in exp_dirs:141        name, dt = split_name_datetime(exp_dir.stem)142        dts.append(dt)143        names.append(name)144    df = pd.DataFrame(columns=["name", "dir"], index=dts)145    df.name = names146    df.dir = exp_dirs147    return df.sort_index()148def session_stats(session_dir) -> dict:149    """150    Return a dictionary with statistics of the session in the supplied151    session_dir (Path or str) argument.152    """153    path = Path(session_dir)154    video_files = list(path.glob("*.mp4")) + list(path.glob("*.avi"))155    image_files = list(path.glob("*.png")) + list(path.glob("*.jpg"))156    csv_files = list(path.glob("*.csv"))157    return {158        "video_count": len(list(video_files)),159        "image_count": len(list(image_files)),160        "csv_count": len(list(csv_files)),161    }162def sessions_stats_df(sessions: pd.DataFrame) -> pd.DataFrame:163    """164    Return a dataframe containing statistics for each session in the supplied165    sessions argument.166    """167    df = pd.DataFrame(columns=["video_count", "image_count", "csv_count"])168    vids = []169    imgs = []170    csvs = []171    for dir in sessions.dir:172        stats = session_stats(dir)173        vids.append(stats["video_count"])174        imgs.append(stats["image_count"])175        csvs.append(stats["csv_count"])176    df.video_count = vids177    df.image_count = imgs178    df.csv_count = csvs179    return df180def read_database_table(conn, table: str, t0: pd.Timestamp, t1: pd.Timestamp):181    """182    Read data from a database table within the supplied time range.183    - conn: Database connection object (see database.make_connection)184    - table: Database table name185    - t0, t1: the time range of the returned dataframe186    """187    st0 = t0.isoformat()188    st1 = t1.isoformat()189    df = pd.read_sql(190        f"SELECT * FROM {table} WHERE \"time\" BETWEEN '{st0}' AND '{st1}'", conn191    )192    return df193@dataclass(init=False, repr=False)194class VideoInfo:195    """196    Represents a single timestamped video file.197    Expects a timestamps csv file in the same directory with the same name198    as the video file but with a `.csv` suffix199    Attributes:200        name: Video name (string)201        time: Video start time202        path: Video path203        timestamp_path: Timestamps csv path204        timestamps: The timestamps csv loaded using read_timeseries_csv()205        frame_count: Number of frames in the video (based on the timestamps file)206        duration: The total duration of the video (based on the timestamps file)207        src_id: The video image source id (based on the name attribute).208    """209    name: str210    time: pd.Timestamp211    path: Path212    timestamp_path: Path213    timestamps: pd.DataFrame214    frame_count: int215    duration: pd.Timestamp216    src_id: str217    def __init__(self, path: Path):218        """219        Return a new VideoInfo instance for the video file at the supplied path.220        """221        self.name, self.time = split_name_datetime(path.stem)222        self.time = self.time.tz_localize(name_locale).tz_convert("utc")223        self.timestamp_path = path.parent / (path.stem + ".csv")224        if not self.timestamp_path.exists():225            self.timestamp_path = None226            self.duration = None227            self.timestamps = None228            self.frame_count = None229        else:230            self.timestamps = read_timeseries_csv(231                self.timestamp_path, time_col=["time", "timestamp"]232            )233            self.duration = self.timestamps.index[-1] - self.timestamps.index[0]234            self.frame_count = self.timestamps.shape[0]235        self.path = path236        split = self.name.split("_")237        if len(split) == 1:238            self.src_id = self.name239        else:240            self.src_id = split[241                -1242            ]  # NOTE: what happens when both src_id and name have underscores?243    def __repr__(self):244        return f"\nVideoInfo(name: {self.name},\n\ttime: {self.time},\n\tpath: {self.path},\n\ttimestamp_path: {self.timestamp_path},\n\tframe_count: {self.frame_count},\n\tduration: {self.duration})"245@dataclass(init=False)246class VideoPosition:247    """248    Represents a time position in a specific video.249    video: The VideoInfo representing the video matching this position250    timestamp: The time of the video position251    frame: The frame number of the video position (based on the timestamp)252    """253    video: VideoInfo254    timestamp: pd.Timestamp255    frame: int = None256    def __init__(self, video: VideoInfo, timestamp: pd.Timestamp):257        """258        Return a new instance with the supplied video and timestamp.259        """260        self.video = video261        self.timestamp = timestamp262        if self.video.timestamps is not None:263            self.frame = idx_for_time(self.video.timestamps, timestamp)264@dataclass(init=False)265class SessionInfo:266    """267    Represents a single session.268    Attributes:269        dir: The session directory270        session_state_path: The path of the session state json file271        session_state: The last recorded session state.272        videos: A list of all videos contained within the session273        images: A list of all image paths found in the session274        event_log_path: The path of the session event log275        event_log: A timeseries dataframe of the session event log276        head_bbox: A timeseries dataframe of the animal head bounding boxes277        head_centroids: A timeseries dataframe of the animal head centroids278        csvs: A list of paths to all other csvs found in the session.279    All of the dataframes in this class, as well as the session_state, are280    loaded on first access (lazily), except for VideoInfo.timestamps dataframe281    which is loaded when the object is created. To reload the data create a282    new object.283    """284    name: str285    time: pd.Timestamp286    dir: Path287    videos: [VideoInfo]288    images: [Path]289    event_log_path: Path290    csvs: [Path]291    session_state_path: Path292    session_state: dict293    def __init__(self, session_dir):294        """295        Instantiate a SessionInfo for the session at the supplied session_dir296        argument (Path or str).297        """298        session_dir = Path(session_dir)299        if not session_dir.exists():300            raise ValueError(f"Session directory doesn't exist: {str(session_dir)}")301        self.name, self.time = split_name_datetime(session_dir.stem)302        self.time = self.time.tz_localize(name_locale).tz_convert("utc")303        self.dir = session_dir304        self.videos = [305            VideoInfo(p)306            for p in list(session_dir.glob("*.mp4")) + list(session_dir.glob("*.avi"))307        ]308        ts_paths = [v.timestamp_path for v in self.videos]309        self.csvs = []310        for csv_path in [p for p in session_dir.glob("*.csv") if p not in ts_paths]:311            if events_log_filename in csv_path.name:312                self.event_log_path = csv_path313            else:314                self.csvs.append(csv_path)315        self.images = list(session_dir.glob("*.jpg")) + list(session_dir.glob("*.png"))316        self.session_state_path = session_dir / "session_state.json"317        if not self.session_state_path.exists():318            # Legacy file name319            self.session_state_path = session_dir / "exp_state.json"320            if not self.session_state_path.exists():321                self.session_state_path = None322        self._session_state = None323        self._event_log = None324        self._head_bbox = None325    @property326    def session_state(self) -> dict:327        if self._session_state is not None:328            return self._session_state329        if self.session_state_path is None:330            self._session_state = None331        else:332            with open(self.session_state_path, "r") as f:333                self._session_state = json.load(f)334        return self._session_state335    @property336    def event_log(self) -> pd.DataFrame:337        if self._event_log is not None:338            return self._event_log339        self._event_log = read_timeseries_csv(self.event_log_path)340        return self._event_log341    def filter_videos(342        self, videos=None, src_id: str = None, ts: pd.Timestamp = None343    ) -> [VideoInfo]:344        """345        Filter videos according to image source id or start time.346        videos: A list of VideoInfo objects to filter. When equals None, all347                session videos are used.348        src_id: When not None, return only videos recorded from this image349                source id.350        ts: When not None, return only videos with this start time.351        """352        if videos is None:353            videos = self.videos354        if src_id is not None:355            videos = filter(lambda v: v.src_id == src_id, videos)356        if ts is not None:357            videos = filter(lambda v: v.time == ts, videos)358        return list(videos)359    def video_position_at_time(360        self, timestamp: pd.Timestamp, videos=None361    ) -> [VideoPosition]:362        """363        Find all video files and frame numbers matching the supplied timestamp.364        Return a list of VideoPosition objects, one for each video that was365        recording during the time denoted by timestamp.366        videos: A list of VideoInfos that will be searched. When this is None,367                all of the videos in the session will be searched.368        """369        res = []370        if videos is None:371            videos = self.videos372        for vid in videos:373            if vid.timestamps is None:374                print(f"WARNING: Video {vid.name}, {vid.time} has no timestamps")375                return376            if is_timestamp_contained(vid.timestamps, timestamp):377                res.append(VideoPosition(vid, timestamp))378        return res379    def extract_clip(380        self,381        src_id: str,382        start_time: pd.Timestamp,383        end_time: pd.Timestamp,384        output_dir: Path,385        file_prefix: str,386    ):387        """388        Extract a clip from the session in this session.389        src_id: The image source id to use.390        start_time: Clip start time.391        end_time: Clip end time.392        output_dir: The directory that will contain the clip video file.393        file_prefix: An additional prefix for the output video filename.394        """395        videos = self.filter_videos(src_id=src_id)396        start_pos = self.video_position_at_time(start_time, videos)397        end_pos = self.video_position_at_time(end_time, videos)398        if len(start_pos) != 1:399            raise ValueError("start_time matched multiple videos or no videos")400        if len(end_pos) != 1:401            raise ValueError("end_time matched multiple videos or no videos")402        if start_pos[0].video.path != end_pos[0].video.path:403            raise ValueError("start_time and end_time matched different videos")404        start_pos, end_pos = start_pos[0], end_pos[0]405        relative_ts = start_pos.timestamp - start_pos.video.time406        fts = format_timedelta(relative_ts, use_colons=False)407        clip_path = (408            output_dir / f"{file_prefix}_{fts}_{start_pos.frame}_{end_pos.frame}.mp4"409        )410        extract_clip(start_pos.video.path, start_pos.frame, end_pos.frame, clip_path)411    @property412    def head_bbox(self):413        if self._head_bbox is not None:414            return self._head_bbox415        bbox_csvs = [p for p in self.csvs if p.name == "head_bbox.csv"]416        if len(bbox_csvs) == 0:417            return None418        self._head_bbox = read_timeseries_csv(bbox_csvs[0])419        return self._head_bbox420    @property421    def head_centroids(self):422        head_bbox = self.head_bbox423        centroids = bbox.xyxy_to_centroid(head_bbox[["x1", "y1", "x2", "y2"]].values)424        df = pd.DataFrame(centroids, columns=["x", "y"])425        df.index = head_bbox.index426        df["confidence"] = head_bbox.confidence...session_reader.py
Source:session_reader.py  
1#!/usr/bin/python32import xml.etree.ElementTree as ET3import logging4import os5import json6import sys, traceback7from COREIfx import msg_ifx8from COREIfx.imnparser import imnparser9from pyparsing import nestedExpr, originalTextFor10#from core.api.tlv.coreapi import CoreConfMessage, CoreEventMessage11class SessionReader():12    13    def __init__(self, session_number=None):14        logging.debug("SessionReader(): instantiated")15        if session_number == None:16            #just get one from /tmp/pycore***17            logging.error("SessionReader(): init(): No session number was provided")18            return None19        self.session_number = session_number20        logging.debug("SessionReader(): init(): Retrieving imn filename")21        services_resp = str(msg_ifx.send_command('-s'+self.session_number+' SESSION flags=STRING --tcp -l'))22        self.imnfilename = ""23        for line in services_resp.splitlines():24            if "FILE: " in line:25                #we know this line has the imn filename26                ###TODO### Need to make sure we get the correct filename associated with the session27                self.imnfilename = line.split("FILE: ")[1].split("|")[0]28                break29        if self.imnfilename == "":30            logging.error("SessionReader(): init(): No associated imn file found. Exiting")31            return None32        logging.debug("SessionReader(): init(): Found filename: " + str(self.imnfilename))33        self.xmlfilename = os.path.join("/tmp","pycore."+str(session_number),"session-deployed.xml")       34        state = self.get_session_state()35        logging.debug("SessionReader(): init(): Current session state: " + str(state))36        if state == None:37            logging.error("SessionReader(): init(): Session state is not available: " + str(state))38            return None39        self.conditional_conns = self.relevant_session_to_JSON()40    def get_session_state(self):41        logging.debug("SessionReader(): get_session_state() instantiated")42        try:43        #check first if directory exists44            session_path = os.path.dirname(self.xmlfilename)45            session_state_path = os.path.join(session_path,"state")46            if os.path.exists(session_path) == False or os.path.exists(session_state_path) == False:47                logging.debug("SessionReader(): get_session_state() session does not exist!")48                return None49            logging.debug("SessionReader(): get_session_state() session found!")50            #read state file51            state_file_state = open(session_state_path,"r").readlines()[0]52            logging.debug("SessionReader(): get_session_state(): State: " + str(state_file_state))53            return state_file_state54        except Exception as e:55            exc_type, exc_value, exc_traceback = sys.exc_info()56            logging.error("SessionReader(): get_session_state(): An error occured ")57            traceback.print_exception(exc_type, exc_value, exc_traceback)58            return None59    def relevant_session_to_JSON(self):60        logging.debug("SessionReader(): relevant_session_to_JSON() instantiated")61        try:62            try:63                iparser = imnparser(self.imnfilename)64            except Exception:65                logging.error("SessionReader(): relevant_session_to_JSON(): imn scenario file not found: " + str(self.imnfilename))66                logging.error("SessionReader(): relevant_session_to_JSON(): " + self.session_number + " and ensure the imn file exists and is loaded in CORE")67                return None68            try:69                tree = ET.parse(self.xmlfilename)70            except Exception:71                logging.error("SessionReader(): relevant_session_to_JSON(): XML file not found: " + str(self.xmlfilename))72                logging.error("SessionReader(): relevant_session_to_JSON(): Make sure " + self.session_number + " has been run at least once or is running." )73                return None74            root = tree.getroot()75            conditional_conns = {}76            name_id_map = {}77            switches = []78            switch_ids = []79            links = root.find('links').findall('link')80            device_services = {}81            switch_services = {}82            switch_service_configurations = {}83            #find all devices (non-switch/hub/wireless) and identify their name/id mappings and services84            logging.debug("SessionReader(): relevant_session_to_JSON(): " + "finding devices")85            for device in root.find('devices').findall('device'):86                if device.attrib["type"] == "cc_dec_node_ovs":87                    switches.append(device)88                    switch_ids.append(device.attrib["id"])89                #keep track of mappings90                name_id_map[device.attrib["name"]] = device.attrib["id"]91                services = ""92                for service in device.find('services').findall('service'):93                    services += " " + str(service.attrib["name"])94                #store the services that are enabled for this device95                device_services[device.attrib["id"]] = services96            #find service files for switch type nodes and then store them97            services = root.find('service_configurations')98            if services != None:99                services = services.findall('service')100                node_num = ""101                service_name = ""102                filename = ""103                file_code = ""104                for service in services:105                    #Pulling service name and node id from tags106                    node_num = service.attrib["node"]107                    service_name = service.attrib["name"]108                    if node_num in switch_ids:109                        service_name = service.attrib["name"]110                        #Pulling filenames from tags111                        files = service.find("files").findall("file")112                        for filenode in files:113                            filename = filenode.attrib["name"]114                            #Pulling source code from tags115                            file_code = filenode.text116                            switch_service_configurations[(node_num, service_name, filename)] = file_code117            #now obtain the "enabled" services for all switch nodes118            logging.debug("SessionReader(): relevant_session_to_JSON(): " + "obtaining enabled services for switches")119            logging.debug("SessionReader(): relevant_session_to_JSON():Switch services found: " + str(switch_services)) 120            #Iterating through all switch nodes to consolidate source files and conditional connections121            for node in switches:122                logging.debug("SessionReader(): relevant_session_to_JSON(): " + "traversing through switches; processing CC_DecisionNode_OVS")123                cc_dec_node_number = node.attrib["id"]124                #check if this is a decision node125                #if it isn't move on to the next switch126                if "CC_DecisionNode_OVS" not in device_services[cc_dec_node_number]:127                    continue128                #get the source code for files129                #First get data from xml file130                #if it's not there, then it means that values are from default, so we use coresendmsg131                if (cc_dec_node_number, "CC_DecisionNode_OVS", "MyMonitor.sh") in switch_service_configurations:132                    monitor_code = switch_service_configurations[(cc_dec_node_number, "CC_DecisionNode_OVS", "MyMonitor.sh")]133                else: 134                    monitor_code = self.get_node_file(cc_dec_node_number, "CC_DecisionNode_OVS", "MyMonitor.sh")135                if (cc_dec_node_number, "CC_DecisionNode_OVS", "MyTrigger.py") in switch_service_configurations:136                    trigger_code = switch_service_configurations[(cc_dec_node_number, "CC_DecisionNode_OVS", "MyTrigger.py")]137                else: 138                    trigger_code = self.get_node_file(cc_dec_node_number, "CC_DecisionNode_OVS", "MyTrigger.py")139                if (cc_dec_node_number, "CC_DecisionNode_OVS", "MySwapper.py") in switch_service_configurations:140                    swapper_code = switch_service_configurations[(cc_dec_node_number, "CC_DecisionNode_OVS", "MySwapper.py")]141                else: 142                    swapper_code = self.get_node_file(cc_dec_node_number, "CC_DecisionNode_OVS", "MySwapper.py")143                #setup entry for node144                conditional_conn = {}145                conditional_conn["name"] = node.attrib["name"]146                conditional_conn["MyMonitor.sh"] = monitor_code147                conditional_conn["MyTrigger.py"] = trigger_code148                conditional_conn["MySwapper.py"] = swapper_code149                logging.debug("SessionReader(): relevant_session_to_JSON(): Processing node: " + str(conditional_conn))150                #now find all connected nodes                151                connected_nodes = []152                for link in links:153                    connected_node = {}154                    #find interface for node connected to switch155                    if link.attrib["node1"] == cc_dec_node_number:156                        connected_node["number"] = link.attrib["node2"]157                        cc_dec_node_ifx = link.find("iface1")158                        cc_node_ifx = link.find("iface2")159                    if link.attrib["node2"] == cc_dec_node_number:160                        connected_node["number"] = link.attrib["node1"]161                        cc_dec_node_ifx = link.find("iface2")162                        cc_node_ifx = link.find("iface1")163                    if "number" not in connected_node:164                        continue165                    connected_node["node_type"] = "SWITCH"166                    #wlan, switch, don't have ifx information167                    if cc_node_ifx != None:168                        #if we do have ifx information, we know it's a layer 3 model (router)169                        connected_node["node_type"] = "router"170                        #found a connection, now we have to add all details 171                        #process remote node (cc_node)172                        connected_node["cc_nic"] = cc_node_ifx.attrib["name"]173                        connected_node["cc_mac"] = cc_node_ifx.attrib["mac"]174                        if "ip4" in cc_node_ifx.attrib:175                            connected_node["cc_ip4"] = cc_node_ifx.attrib["ip4"]176                            connected_node["cc_ip4_mask"] = cc_node_ifx.attrib["ip4_mask"]177                        if "ip6" in cc_node_ifx.attrib:178                            connected_node["cc_ip6"] = cc_node_ifx.attrib["ip6"]179                            connected_node["cc_ip6_mask"] = cc_node_ifx.attrib["ip6_mask"]180                    #by default we consider this a cc_node181                    connected_node["role"] = "cc_node"182                    # #if node has the CC_Node service enabled, then we know this is a cc_node; otherwise, it's a gw183                    # if connected_node["number"] in device_services:184                    #     if "CC_Node" in device_services[connected_node["number"]]:185                    #         #we know this is a good node186                    #         connected_node["role"] = "cc_node"187                    # #now check if the connected switches/nets have the CC_Node service enabled188                    # if connected_node["number"] in switch_services:189                    #     if "CC_Node" in switch_services[connected_node["number"]]:190                    #         #we know this is a good node191                    #         connected_node["role"] = "cc_node"192                    #add information about the cc_dec node associated with this link193                    connected_node["cc_dec_nic"] = cc_dec_node_ifx.attrib["name"]194                    connected_node["cc_dec_mac"] = cc_dec_node_ifx.attrib["mac"]195                    if "ip4" in cc_dec_node_ifx.attrib:196                        connected_node["cc_dec_ip4"] = cc_dec_node_ifx.attrib["ip4"]197                        connected_node["cc_dec_ip4_mask"] = cc_dec_node_ifx.attrib["ip4_mask"]198                    if "ip6" in cc_dec_node_ifx.attrib:199                        connected_node["cc_dec_ip6"] = cc_dec_node_ifx.attrib["ip6"]200                        connected_node["cc_dec_ip6_mask"] = cc_dec_node_ifx.attrib["ip6_mask"]201                    connected_node["connected"] = "False"202                    connected_nodes.append(connected_node)203                conditional_conn["connected_nodes"] = connected_nodes204                conditional_conns[node.attrib["id"]] = conditional_conn205            return conditional_conns206        except Exception:207            logging.error("SessionReader(): relevant_session_to_JSON(): Error in relevant_session_to_JSON(): An error occured ")208            exc_type, exc_value, exc_traceback = sys.exc_info()209            traceback.print_exception(exc_type, exc_value, exc_traceback)210            return None211    def get_node_file(self, node_id, service_name, filename):212        logging.debug("SessionReader(): get_node_file(): instantiated")213        #First check if file exists:214        logging.debug("SessionReader(): get_node_file(): checking if file exists")215        res = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node_id +' OBJECT=services OPAQUE=service:'+service_name+' TYPE=1 -l --tcp'))216        file_exists = False217        for line in res.splitlines():218            if filename in line:219                file_exists = True220                break221        if file_exists == False:222            return ""223        #Get file contents224        logging.debug("SessionReader(): get_node_file(): getting file contents")225        res_code = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node_id +' OBJECT=services OPAQUE=service:'+service_name+':'+filename+' TYPE=1 -l --tcp'))226        file_code = ""227        code_section = False228        for code_line in res_code.splitlines():229            if code_line.startswith("  DATA:"):230                code_section = True231                file_code += code_line.split("DATA: ")[1] + "\n"232                continue233            if code_section:234                file_code += code_line + "\n"235        return file_code236    def get_conditional_conns(self, cc_dec_number):237        logging.debug("SessionReader(): get_conditional_conns(): instantiated")238        return self.conditional_conns[cc_dec_number]239    def get_node_services(self, node):240        logging.debug("SessionReader(): get_node_services(): instantiated")241        res_services = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node.attrib["id"] +' OBJECT=services OPAQUE=service' +' TYPE=1 -l --tcp'))242        return res_services243if __name__ == "__main__":244    logging.basicConfig(level=logging.DEBUG)245    logging.debug("SessionReader(): instantiated")246    247    if len(sys.argv) < 2:248        logging.error("Usage: python test_session_reader.py <session-number>")249        exit()       250    251    sr = SessionReader(sys.argv[1])252    logging.debug("Printing All")253    state = sr.get_session_state254    logging.debug("Current session state: " + str(state))255    if state == None:256        logging.debug("Exiting since session data is not available: " + str(state))257        exit()...fabric.py
Source:fabric.py  
1from srlinux.data import ColumnFormatter, TagValueFormatter, Border, Data, Borders, Alignment2from srlinux.mgmt.cli import CliPlugin3from srlinux.schema import FixedSchemaRoot4from srlinux.syntax import Syntax5from srlinux.location import build_path6from srlinux.mgmt.cli.plugins.reports.gnmi_lite import GNMIHandlerLite7import datetime8############################ INPUTs here... ############################# 9                                                                        # 10interfaces = []                                                         # fill the interfaces list in or a pattern in the uplink descriptions11description = 'spine'                                                   # if both given: interfaces list has precedence over the description pattern12uplink_peer_group = 'spine'                                             # this code assumes Leaf uplinks have eBPG peering with Spine/DCGW 13rr_peer_group = 'EVPN-NVO'                                              # and iBGP EVPN with the route-reflector(s) 14uplink_network_instance = "default"                                     # networks instance is ideally default15                                                                        #16#########################################################################17class Plugin(CliPlugin):18    19    def load(self, cli, **_kwargs):20        fabric = cli.show_mode.add_command(Syntax('fabric', help='shows how to give the input parameters for "show fabric" commands'))21        help = fabric.add_command(Syntax('help', help='requires uplinks, route-reflector, statistics or summary keywords'),update_location=False, callback=self._show_help)22        summary = fabric.add_command(Syntax('summary', help='shows uplinks, route-reflector and statistics all together'), update_location=False, callback=self._show_summary, schema=self._get_schema())23        uplink = fabric.add_command(Syntax('uplink', help='shows uplinks in a table'), update_location=False, callback=self._show_uplinks, schema=self._get_schema())24        route_reflector = fabric.add_command(Syntax('route-reflector', help='shows route-reflectors in a table'), update_location=False, callback=self._show_rr, schema=self._get_schema())25        statistics =  fabric.add_command(Syntax('statistics', help='shows statistics in a table'), update_location=False, callback=self._show_stats, schema=self._get_schema())26    def _show_help (self,state,output,**_kwargs):27        print('''28        This 'show fabric' command shows you statistics and the status of the uplinks and BGP peerings. 29        Therefore it requires some inputs that need to be added in the 'fabric.py' file.30        31        '/opt/srlinux/python/virtual-env/lib/python3.6/site-packages/srlinux/mgmt/cli/plugins/reports/fabric.py'32        33        Example:34        interfaces = []             # fill the interfaces list in or the description pattern35        description = 'spine' 36        uplink_peer_group = 'spine'37        rr_peer_group = 'EVPN-NVO'38        ''')39    def _show_summary(self, state, output, **_kwargs):40        header = f'Fabric Connectivity Report'41        result_header = self._populate_header(header)42        self._set_formatters_header(result_header)43        output.print_data(result_header)44        self._show_uplinks(state,output)45        self._show_rr(state,output)46        self._show_stats(state,output)47    48    def _show_uplinks(self, state, output, **_kwargs):49        result = Data(self._get_schema())50        self._set_formatter_uplink(result)51        with output.stream_data(result):   52          self._populate_data(result, state)53  54    def _show_rr(self, state, output, **_kwargs):55        result_rr = Data(self._get_schema())56        self._set_formatters_rr(result_rr)57        with output.stream_data(result_rr):   58          self._populate_data_rr(result_rr, state)59        60    def _show_stats(self, state, output, **_kwargs):61        result_stats = Data(self._get_schema())62        self._set_formatters_stats(result_stats)63        with output.stream_data(result_stats):   64          self._populate_data_stats(result_stats, state)   65    66    def _get_header_schema(self):67        root = FixedSchemaRoot()68        root.add_child(69            'header',70            fields=['Summary']71        )72        return root73    def _get_schema(self):74        root = FixedSchemaRoot()75        uplink_header = root.add_child(76            'uplink_header',77            fields=['uplinks']78        )79        uplink_header.add_child(80            'uplink_child',81            key='Local Interface',82            fields=['Local Router', 'Link Status','eBGP Status','Remote Router', 'Remote Interface']83        )84        rr_header = root.add_child(85            'rr_header',86            fields=['Route Reflectors']87        )88        rr_header.add_child(89            'rr_child',90            key='Route Reflector Address',91            fields=['iBGP Status', 'Neighbor Description', 'Rx/Active/Tx', 'Uptime (hh:mm:ss)']92        )93        stats_header = root.add_child(94            'stats_header',95            fields=['Uplink Stats']96        )97        stats_header.add_child(98            'stats_child',99            key='Local Interface',100            fields=['Traffic Bps In/Out','Packets In/Out', 'Errored In/Out', 'FCS Err', 'CRC Err','Transceiver Volt']101        )102        return root103    def _fetch_state(self, state, uplink, interf):104        int_oper_state_path = build_path(f'/interface[name={interf[0]}]/subinterface[index={interf[1]}]/oper-state')105        self.int_oper_state_data = state.server_data_store.stream_data(int_oper_state_path, recursive=True)106        107        sys_name_path = build_path(f'/system/name/host-name')108        self.sys_name_data = state.server_data_store.stream_data(sys_name_path, recursive=True)109        110        lldp_neighbor_path = build_path(f'/system/lldp/interface[name={interf[0]}]/neighbor')111        self.lldp_neighbor_data = state.server_data_store.stream_data(lldp_neighbor_path, recursive=True)112        113        session_state_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor[peer-address={uplink}]')114        self.session_state_data = state.server_data_store.stream_data(session_state_path, recursive=True)115 116    def _fetch_state_rr(self, state, rr):117        rr_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor[peer-address={rr}]')118        self.rr_data = state.server_data_store.stream_data(rr_path, recursive=True)    119        120        time_path = build_path(f'/system/information/current-datetime')121        self.time_data = state.server_data_store.stream_data(time_path, recursive=True) 122        123        # rr_tcp_path = build_path(f'/network-instance[name={uplink_network_instance}]/tcp/connections[remote-address={rr}]')124        # self.rr_tcp_data = state.server_data_store.stream_data(rr_tcp_path, recursive=True)        125    def _fetch_state_stats(self, state, uplink):          126        gen_path = build_path(f'/interface[name={uplink}]')127        self.gen_data = state.server_data_store.stream_data(gen_path, recursive=True)128               129    def _time_handler (self, dt0, dt1):130        now = datetime.datetime(int(dt0[:4]),int(dt0[5:7]),int(dt0[8:10]),int(dt0[11:13]),int(dt0[14:16]),int(dt0[17:19]))       131        then = datetime.datetime(int(dt1[:4]),int(dt1[5:7]),int(dt1[8:10]),int(dt1[11:13]),int(dt1[14:16]),int(dt1[17:19]))       132        return (now-then)133        134    def _populate_header(self, header):135        result_header = Data(self._get_header_schema())136        data = result_header.header.create()137        data.summary = header138        return result_header139    def _populate_peer_list(self, state, group):140        peer_list_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor/peer-group')141        peer_list_data = state.server_data_store.stream_data(peer_list_path, recursive=True) 142        peer_list = []143        for peer in peer_list_data.network_instance.get().protocols.get().bgp.get().neighbor.items():144            if peer.peer_group == group :145                peer_list.append(peer.peer_address) 146        if not peer_list: print(f"No peer in <<{group}>> group!")147        return peer_list148                149    def _populate_interface_list(self, state, description):150        int_list_path = build_path(f'/interface[name=*]/subinterface[index=*]/description')151        int_list_data = state.server_data_store.stream_data(int_list_path, recursive=True) 152        int_list = []153        for i in int_list_data.interface.items():154            if description in i.subinterface.get().description:155                interfaces.append(f'{i.name}.{i.subinterface.get().index}')156        if not interfaces: print(f"No interface has <<{description}>> in it's description!")157    def _populate_data(self, result, state):158        uplink_peer_list = self._populate_peer_list(state,uplink_peer_group)       159        result.synchronizer.flush_fields(result)160        if not interfaces: self._populate_interface_list(state,description)161        i=0162        data = result.uplink_header.create()163        for interface in interfaces:164          server_data = self._fetch_state(state, uplink_peer_list[i], interface.split('.'))165          data_child = data.uplink_child.create(interfaces[i].split('.')[0])166          data_child.local_router = self.sys_name_data.system.get().name.get().host_name or '<Unknown>'167          data_child.link_status = self.int_oper_state_data.interface.get().subinterface.get().oper_state or '<Unknown>'168          data_child.ebgp_status = self.session_state_data.network_instance.get().protocols.get().bgp.get().neighbor.get().session_state or '<Unknown>'169          data_child.remote_router = self.lldp_neighbor_data.system.get().lldp.get().interface.get().neighbor.get().system_name or '<Unknown>'170          data_child.remote_router = self.lldp_neighbor_data.system.get().lldp.get().interface.get().neighbor.get().system_name or '<Unknown>'171          data_child.remote_interface = self.lldp_neighbor_data.system.get().lldp.get().interface.get().neighbor.get().port_id or '<Unknown>'172          data_child.synchronizer.flush_fields(data_child)173          i=i+1174        result.synchronizer.flush_children(result.uplink_header)  175        return result176    def _populate_data_rr(self, result, state):                   177        rr_peer_list = self._populate_peer_list(state,rr_peer_group)178        result.synchronizer.flush_fields(result)179        i=0180        data = result.rr_header.create()181        for rr in rr_peer_list: 182          server_data = self._fetch_state_rr(state, rr)183          data_child = data.rr_child.create(rr)184          data_child.ibgp_status = self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().session_state or '<Unknown>'185          data_child.neighbor_description = self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().description or '<Unknown>'186          data_child.rx_active_tx = f'{self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().evpn.get().received_routes}/'\187                                    f'{self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().evpn.get().active_routes}/'\188                                    f'{self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().evpn.get().sent_routes}'189          data_child.uptime__hh_mm_ss_ = self._time_handler(self.time_data.system.get().information.get().current_datetime, self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get().last_established)190          data_child.synchronizer.flush_fields(data_child)191          i=i+1192        result.synchronizer.flush_children(result.rr_header)  193        return result194    def _populate_data_stats(self, result, state):195        uplink_peer_list = self._populate_peer_list(state,uplink_peer_group)196        result.synchronizer.flush_fields(result)197        if not interfaces: self._populate_interface_list(state,description)198        i=0199        data = result.stats_header.create()200        for interface in interfaces:201          server_data = self._fetch_state_stats(state, interface.split('.')[0])202          data_child = data.stats_child.create(interface.split('.')[0])203          data_child.traffic_bps_in_out = f'{self.gen_data.interface.get().traffic_rate.get().in_bps}/{self.gen_data.interface.get().traffic_rate.get().in_bps}'204          data_child.packets_in_out = f'{self.gen_data.interface.get().statistics.get().in_unicast_packets}/{self.gen_data.interface.get().statistics.get().out_unicast_packets or "<Unknown>"}'205          data_child.errored_in_out = f'{self.gen_data.interface.get().statistics.get().in_error_packets }/{self.gen_data.interface.get().statistics.get().out_error_packets}'206          data_child.fcs_err = self.gen_data.interface.get().statistics.get().in_fcs_error_packets207          data_child.crc_err = self.gen_data.interface.get().ethernet.get().statistics.get().in_crc_error_frames208          try: data_child.transceiver_volt =  self.gen_data.interface.get().transceiver.get().voltage.get().latest_value or 'N/A'209          except: data_child.transceiver_volt = 'N/A'210          data_child.synchronizer.flush_fields(data_child)211          i=i+1212        result.synchronizer.flush_children(result.stats_header)  213        return result214  215    def _set_formatters_header(self, data):216        data.set_formatter('/header',Border(TagValueFormatter()))  217  218    def _set_formatters_stats(self, data):219        data.set_formatter('/stats_header/stats_child', ColumnFormatter())    220    221    def _set_formatters_rr(self, data):222        data.set_formatter('/rr_header/rr_child', ColumnFormatter())223    def _set_formatter_uplink(self, data):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
