How to use session_state_path method in Slash

Best Python code snippet using slash

client.py

Source:client.py Github

copy

Full Screen

import os
import sys
import ssl
import json
import time
import urllib
import urllib.parse
import urllib3
import certifi
import logging
import pathlib
import requests
import subprocess
import psutil

from typing import Union
from typing import List
from typing import Dict

from urllib3.exceptions import InsecureRequestWarning
from ibw.clientportal import ClientPortal

# The local gateway is reached over HTTPS with `verify=False`, so the
# resulting urllib3 warnings are silenced.
urllib3.disable_warnings(category=InsecureRequestWarning)
# http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())

# NOTE(review): this replaces the process-wide default HTTPS context with an
# unverified one. It is kept because it is an existing import-time side
# effect, but it affects every `ssl` user in the process.
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy Python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

logging.basicConfig(
    filename='app.log',
    format='%(levelname)s - %(name)s - %(message)s',
    level=logging.DEBUG
)


class IBClient():
    """Client for a locally running Interactive Brokers Client Portal Gateway.

    The object starts (or re-attaches to) the gateway process, walks the user
    through authentication and exposes the REST endpoints as methods that
    return the decoded JSON response.
    """

    # Number of times the user is asked to confirm the login before giving up.
    _MAX_AUTH_ATTEMPTS = 5

    def __init__(self, username: str, account: str, client_gateway_path: str = None) -> None:
        """Initalizes a new instance of the IBClient Object.

        Arguments:
        ----
        username {str} -- Your IB account username for either your paper or regular account.

        account {str} -- Your IB account number for either your paper or regular account.

        Keyword Arguments:
        ----
        client_gateway_path {str} -- Folder that contains the Client Portal Gateway.
            When omitted, `resources/clientportal.beta.gw` (resolved relative to the
            folder above this module's folder) is used and downloaded if it does
            not exist. (default: {None})

        Usage:
        ----
            >>> ib_paper_session = IBClient(
                username='IB_PAPER_USERNAME',
                account='IB_PAPER_ACCOUNT',
            )
            >>> ib_paper_session
            >>> ib_regular_session = IBClient(
                username='IB_REGULAR_USERNAME',
                account='IB_REGULAR_ACCOUNT',
            )
            >>> ib_regular_session
        """

        self.account = account
        self.username = username
        self.client_portal_client = ClientPortal()

        self.api_version = 'v1/'
        self._operating_system = sys.platform
        self.session_state_path: pathlib.Path = pathlib.Path(__file__).parent.joinpath('server_session.json').resolve()
        self.authenticated = False

        # Define URL Components
        ib_gateway_host = r"https://localhost"
        ib_gateway_port = r"5000"
        self.ib_gateway_path = ib_gateway_host + ":" + ib_gateway_port
        self.backup_gateway_path = r"https://cdcdyn.interactivebrokers.com/portal.proxy"

        if client_gateway_path is None:

            # Grab the Client Portal Path.
            self.client_portal_folder: pathlib.Path = pathlib.Path(__file__).parents[1].joinpath(
                'resources/clientportal.beta.gw'
            ).resolve()

            # See if it exists.
            if not self.client_portal_folder.exists():
                print("The Client Portal Gateway doesn't exist. You need to download it before using the Library.")
                print("Downloading the Client Portal file...")
                self.client_portal_client.download_and_extract()

        else:
            self.client_portal_folder = client_gateway_path

        # Log the initial Info.
        logging.info('''
            Operating System: {op_sys}
            Session State Path: {state_path}
            Client Portal Folder: {client_path}
            '''.format(
                op_sys=self._operating_system,
                state_path=self.session_state_path,
                client_path=self.client_portal_folder
            )
        )

        # Load the Server State (PID of a still-running gateway, or None).
        self.server_process = self._server_state(action='load')

        # Log the response.
        logging.debug('''
            Server Prcoess Init: {serv_proc}
            '''.format(serv_proc=self.server_process)
        )

    def create_session(self) -> bool:
        """Creates a new session.

        Creates a new session with Interactive Broker using the credentials
        passed through when the Robot was initalized.

        Usage:
        ----
            >>> ib_client = IBClient(
                username='IB_PAPER_username',
                account='IB_PAPER_account',
            )
            >>> server_response = ib_client.create_session()
            >>> server_response
            True

        Returns:
        ----
        bool -- True if the session was created, False if wasn't created.
        """

        # Log the process ID.
        logging.info('Server Process: {serv_proc}'.format(serv_proc=self.server_process))

        # first let's check if the server is running, if it's not then we can start up.
        if self.server_process is None:

            # If it's None we need to connect first.
            self.connect(start_server=True)

            # then make sure the server is updated.
            if self._set_server():
                return True

        # more than likely it's running let's try and see if we can authenticate.
        auth_response = self.is_authenticated()

        if auth_response is None:
            # Give the gateway a moment and ask once more.
            time.sleep(2)
            auth_response = self.is_authenticated()

        # A missing response (still None) is treated as "not authenticated".
        already_authenticated = bool(auth_response) and auth_response.get('authenticated') is True

        if not already_authenticated:
            # in this case don't start the server, but prompt the user to log in again.
            self.connect(start_server=False)

        if self._set_server():
            self.authenticated = True
            return True

        return False

    def _set_server(self) -> bool:
        """Sets the server info for the session.

        Sets the Server for the session, and if the server cannot be set then
        script will halt. Otherwise will return True to continue on in the script.

        Returns:
        ----
        bool -- True if the server was set. On failure the process exits via
            `sys.exit()`, so `False` is never actually returned.
        """

        server_update_content = self.update_server_account(account_id=self.account, check=False)
        server_account_content = self.server_accounts()

        success = '\nNew session has been created and authenticated. Requests will not be limited.\n'.upper()
        failure = '\nCould not create a new session that was authenticated, exiting script.\n'.upper()

        # Log the response.
        logging.debug('''
            Server Update Response: {auth_resp}
            Server Response: {serv_resp}
            '''.format(
                auth_resp=server_update_content,
                serv_resp=server_account_content
            )
        )

        # `_make_request` returns None for a failed request; treat it as empty.
        update = server_update_content or {}

        # TO DO: Add check market hours here and then check for a mutual fund.
        account_was_set = server_account_content is not None and update.get('set') is True
        account_already_set = update.get('error') == 'Account already set'

        if account_was_set or account_already_set:
            print(success)
            return True

        print(failure)
        sys.exit()

    def _server_state(self, action: str = 'save') -> Union[None, int]:
        """Determines the server state.

        Maintains the server state, so we can easily load a previous session,
        save a new session, or delete a closed session.

        Arguments:
        ----
        action {str} -- The action you wish to take to the `json` file. Can be one of the following options:

            1. save - saves the current state and overwrites the old one.
            2. load - loads the previous state from a session that has a server still running.
            3. delete - deletes the state because the server has been closed.

        Returns:
        ----
        Union[None, int] -- For `load`, the Process ID of the Server when the saved
            process is still running; `None` in every other case.
        """

        # Define file components.
        file_exists = self.session_state_path.exists()

        # Log the response.
        logging.debug('''
            Server State: {state}
            State File: {exist}
            '''.format(
                state=action,
                exist=file_exists
            )
        )

        if action == 'save':

            # Save the State.
            with open(self.session_state_path, 'w') as server_file:
                json.dump(obj={'server_process_id': self.server_process}, fp=server_file)

        # If we are loading check the file exists first.
        elif action == 'load' and file_exists:

            # Load it; a corrupt or incomplete file means "no saved server".
            try:
                with open(self.session_state_path, 'r') as server_file:
                    server_state = json.load(fp=server_file)
                proc_id = server_state['server_process_id']
            except (ValueError, KeyError, TypeError):
                logging.debug('Could not read the server state file, ignoring it.')
                return None

            # If it's running return the process ID.
            if self._check_if_server_running(process_id=proc_id):
                return proc_id

        # Delete it.
        elif action == 'delete' and file_exists:
            self.session_state_path.unlink()

        return None

    def _check_if_server_running(self, process_id: Union[int, str, None]) -> bool:
        """Checks whether a process with the given ID currently exists.

        Arguments:
        ----
        process_id {Union[int, str, None]} -- The process ID saved for the server.

        Returns:
        ----
        bool -- `True` if a process with that ID exists, `False` otherwise
            (including when `process_id` is `None` or not a number).
        """

        if process_id is None:
            return False

        try:
            pid = int(process_id)
        except (TypeError, ValueError):
            return False

        # psutil performs an exact PID lookup on every platform.
        is_running = psutil.pid_exists(pid)
        logging.debug('Process ID {pid} running: {running}'.format(pid=pid, running=is_running))

        return is_running

    def _check_authentication_user_input(self) -> bool:
        """Used to check the authentication of the Server.

        Prompts the user, then polls the gateway's authentication status. The
        user is asked at most `_MAX_AUTH_ATTEMPTS` times. Answering `NO` closes
        the session and exits the script.

        Returns:
        ----
        bool: `True` if authenticated, `False` otherwise.
        """

        attempts = 0

        while attempts < self._MAX_AUTH_ATTEMPTS and not self.authenticated:

            attempts += 1

            user_input = input('Would you like to make an authenticated request (Yes/No)? ').upper()

            if user_input == 'NO':
                # Kills the server and exits the script.
                self.close_session()

            auth_response = self.is_authenticated()

            logging.debug('Check User Auth Inital: {auth_resp}'.format(
                    auth_resp=auth_response
                )
            )

            # The request failed (details were printed by `_make_request`); ask again.
            if auth_response is None:
                continue

            if auth_response.get('statusCode') == 401:
                print("Session isn't connected, closing script.")
                self.close_session()

            elif auth_response.get('authenticated') is True:
                self.authenticated = True
                break

            elif auth_response.get('authenticated') is False:
                valid_resp = self.validate()
                reauth_resp = self.reauthenticate()
                self.is_authenticated()

                # A successful news request is taken as proof that the session works.
                try:
                    news_resp = self.data_news(conid='265598')
                except requests.exceptions.RequestException:
                    logging.exception('News request failed while checking authentication.')
                    news_resp = None

                if news_resp and 'news' in news_resp:
                    self.authenticated = True

                    # Log the response.
                    logging.debug('Had to do News Update Response: {auth_resp}'.format(
                            auth_resp=news_resp
                        )
                    )
                    break

                logging.debug(
                    '''
                    Validate Response: {valid_resp}
                    Reauth Response: {reauth_resp}
                    '''.format(
                        valid_resp=valid_resp,
                        reauth_resp=reauth_resp
                    )
                )

        return self.authenticated

    def _check_authentication_non_input(self) -> bool:
        """Checks the authentication of the Server without prompting the user.

        If the session is not authenticated, it is validated and a
        reauthentication is requested; the session counts as authenticated when
        the reauthentication response's `message` is `triggered`.

        Returns:
        ----
        bool: `True` if authenticated, `False` otherwise.
        """

        auth_response = self.is_authenticated()

        if auth_response is None:
            self.authenticated = False

        elif 'statusCode' in auth_response:
            print("Session isn't connected, closing script.")
            self.close_session()

        elif auth_response.get('authenticated'):
            self.authenticated = True

        else:
            self.validate()
            reauth_response = self.reauthenticate() or {}
            self.authenticated = reauth_response.get('message', 'Null') == 'triggered'

        return self.authenticated

    def _start_server(self) -> int:
        """Starts the Server.

        Launches the gateway's run script from `client_portal_folder`, waits two
        seconds and then, if a process whose name contains `java` is found,
        records that process instead of the launcher.

        Returns:
        ----
        int: The Server Process ID.
        """

        # windows will use the command line application.
        if self._operating_system == 'win32':
            IB_WEB_API_PROC = ["cmd", "/k", r"bin\run.bat", r"root\conf.yaml"]
            self.server_process = subprocess.Popen(
                args=IB_WEB_API_PROC,
                cwd=self.client_portal_folder,
                creationflags=subprocess.CREATE_NEW_CONSOLE
            ).pid

        # mac will use the terminal.
        elif self._operating_system == 'darwin':
            IB_WEB_API_PROC = ["open", "-F", "-a", "Terminal", r"bin/run.sh", r"root/conf.yaml"]
            self.server_process = subprocess.Popen(
                args=IB_WEB_API_PROC,
                cwd=self.client_portal_folder,
            ).pid

        # if linux
        else:
            IB_WEB_API_PROC = [r"bin/run.sh", r"root/conf.yaml"]
            self.server_process = subprocess.Popen(
                args=IB_WEB_API_PROC,
                cwd=self.client_portal_folder,
                stdout=subprocess.DEVNULL
            ).pid

        time.sleep(2)

        # NOTE(review): any process named like `java` matches here, not
        # necessarily the gateway; the last match wins.
        for proc in psutil.process_iter():
            try:
                if 'java' in proc.name():
                    self.server_process = proc.pid
            except psutil.Error:
                # The process vanished or is not accessible; skip it.
                continue

        return self.server_process

    def connect(self, start_server: bool = True) -> bool:
        """Connects the session with the API.

        Connects the session to the Interactive Broker API by, starting up the Client Portal Gateway,
        prompting the user to log in and then returns the results back to the `create_session` method.

        Arguments:
        ----
        start_server {bool} -- True if the server isn't running but needs to be started, False if it
            is running and just needs to be authenticated.

        Returns:
        ----
        bool -- `True` if it was connected.
        """

        logging.debug('Running Client Folder at: {file_path}'.format(file_path=self.client_portal_folder))

        # If needed, start the server and save the State.
        if start_server:
            self._start_server()
            self._server_state(action='save')

        print("""{}
            The Interactive Broker server is currently starting up, so we can authenticate your session.
                STEP 1: GO TO THE FOLLOWING URL: {}
                STEP 2: LOGIN TO YOUR account WITH YOUR username AND PASSWORD.
                STEP 3: WHEN YOU SEE `Client login succeeds` RETURN BACK TO THE TERMINAL AND TYPE `YES` TO CHECK IF THE SESSION IS AUTHENTICATED.
                SERVER IS RUNNING ON PROCESS ID: {}
            {}
            """.format('-'*80, self.ib_gateway_path + "/sso/Login?forwardTo=22&RL=1&ip2loc=on", self.server_process, '-'*80)
        )

        # Check the auth status
        return self._check_authentication_user_input()

    def close_session(self) -> None:
        """Closes the current session, kills the server and exits the script.

        The saved server state is deleted and `sys.exit()` is called, so this
        method never returns to the caller.
        """

        print('\nCLOSING SERVER AND EXITING SCRIPT.')

        if self.server_process is not None:

            if self._operating_system == 'win32':

                # kill the process (and its children) using Taskkill.
                subprocess.call(
                    ["TASKKILL", "/F", "/PID", str(self.server_process), "/T"],
                    creationflags=subprocess.DETACHED_PROCESS
                )

            else:
                try:
                    os.kill(self.server_process, 9)  # 9 == SIGKILL
                except OSError:
                    # Already gone (or not ours); still clean up the state below.
                    logging.debug('Could not kill process {pid}.'.format(pid=self.server_process))

        # delete the state
        self._server_state(action='delete')

        # and exit.
        sys.exit()

    def _headers(self, mode: str = 'json') -> Dict:
        """Returns a dictionary of default HTTP headers for calls to the API.

        Arguments:
        ----
        mode {str} -- Defines the content-type for the headers dictionary.
            default is 'json'. Possible values are ['json','form']

        Returns:
        ----
        Dict -- A dictionary holding the `Content-Type` header.

        Raises:
        ----
        ValueError -- If `mode` is not one of the possible values.
        """

        content_types = {
            'json': 'application/json',
            'form': 'application/x-www-form-urlencoded'
        }

        if mode not in content_types:
            raise ValueError("mode must be 'json' or 'form', got {!r}".format(mode))

        return {'Content-Type': content_types[mode]}

    def _build_url(self, endpoint: str) -> str:
        """Builds a url for a request.

        Arguments:
        ----
        endpoint {str} -- The URL that needs conversion to a full endpoint URL.
            A leading `/` is ignored.

        Returns:
        ----
        {str} -- A full URL path.
        """

        base_url = urllib.parse.urljoin(self.ib_gateway_path, self.api_version)

        # Strip a leading slash so the result never contains `portal//`.
        return urllib.parse.unquote(base_url + r'portal/' + endpoint.lstrip('/'))

    def _make_request(self, endpoint: str, req_type: str, params: Dict = None) -> Dict:
        """Handles the request to the client.

        Handles all the requests made by the client and correctly organizes
        the information so it is sent correctly. Additionally it will also
        build the URL.

        Arguments:
        ----
        endpoint {str} -- The endpoint we wish to request.

        req_type {str} -- Defines the type of request to be made. Can be one of four
            possible values ['GET','POST','DELETE','PUT']

        params {dict} -- Any arguments that are to be sent along in the request. For a
            'GET' request they are sent as query parameters, for every other
            request type as a JSON payload.

        Returns:
        ----
        {Dict} -- A response dictionary, or `None` when the request was not
            successful (the failure details are printed).

        Raises:
        ----
        ValueError -- If `req_type` is not one of the possible values.
        """

        # first build the url
        url = self._build_url(endpoint=endpoint)
        headers = self._headers(mode='json')

        if req_type == 'GET':
            response = requests.get(url, headers=headers, params=params, verify=False)
        elif req_type in ('POST', 'PUT', 'DELETE'):
            response = requests.request(req_type, url, headers=headers, json=params, verify=False)
        else:
            raise ValueError("req_type must be 'GET', 'POST', 'PUT' or 'DELETE', got {!r}".format(req_type))

        # Check to see if it was successful
        if response.ok:
            return response.json()

        # The account endpoint's body is returned even for an unsuccessful
        # response, because `_set_server` inspects its `error` field.
        if endpoint.lstrip('/') == 'iserver/account':
            return response.json()

        # if it was a bad request print it out.
        print('')
        print('-'*80)
        print("BAD REQUEST - STATUS CODE: {}".format(response.status_code))
        print("RESPONSE URL: {}".format(response.url))
        print("RESPONSE HEADERS: {}".format(response.headers))
        print("RESPONSE TEXT: {}".format(response.text))
        print('-'*80)
        print('')

        return None

    def _prepare_arguments_list(self, parameter_list: List[str]) -> str:
        """Prepares the arguments for the request.

        Some endpoints can take multiple values for a parameter, this
        method takes that list and creates a valid string that can be
        used in an API request. The list can have either one index or
        multiple indexes.

        Arguments:
        ----
        parameter_list {List} -- A list of paramater values assigned to an argument.
            Items are converted with `str`. Any value that is not a list or
            tuple is returned unchanged.

        Usage:
        ----
            >>> SessionObject._prepare_arguments_list(parameter_list=['MSFT','SQ'])

        Returns:
        ----
        {str} -- The joined list.
        """

        if isinstance(parameter_list, (list, tuple)):
            return ','.join(str(item) for item in parameter_list)

        return parameter_list

    def _fundamentals_landing(self, conid: str, widget: str, **extra_params) -> Dict:
        """Requests one widget of the `fundamentals/landing/{conid}` endpoint.

        Arguments:
        ----
        conid {str} -- The contract ID.

        widget {str} -- The value sent as the `widgets` query parameter.

        extra_params -- Additional query parameters to send along.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        params = {'widgets': widget}
        params.update(extra_params)

        return self._make_request(
            endpoint='fundamentals/landing/{}'.format(conid),
            req_type='GET',
            params=params
        )

    # ------------------------------------------------------------------
    # SESSION ENDPOINTS
    # ------------------------------------------------------------------

    def validate(self) -> Dict:
        """Validates the current session for the SSO user."""

        return self._make_request(endpoint=r'sso/validate', req_type='GET')

    def tickle(self) -> Dict:
        """Keeps the session open.

        If the gateway has not received any requests for several minutes an open session will
        automatically timeout. The tickle endpoint pings the server to prevent the
        session from ending.
        """

        return self._make_request(endpoint=r'tickle', req_type='POST')

    def logout(self) -> Dict:
        """Logs the session out.

        Logs the user out of the gateway session. Any further activity requires
        re-authentication.
        """

        # https://cdcdyn.interactivebrokers.com/portal.proxy/v1/portal/logout
        # https://cdcdyn.interactivebrokers.com/portal.proxy/v1/ibcust/logout
        # https://cdcdyn.interactivebrokers.com/sso/Logout?RL=1

        return self._make_request(endpoint=r'logout', req_type='POST')

    def reauthenticate(self) -> Dict:
        """Reauthenticates an existing session.

        Provides a way to reauthenticate to the Brokerage system as long as there
        is a valid SSO session, see /sso/validate.
        """

        return self._make_request(endpoint=r'iserver/reauthenticate', req_type='POST')

    def is_authenticated(self) -> Dict:
        """Checks if session is authenticated.

        Current Authentication status to the Brokerage system. Market Data and
        Trading is not possible if not authenticated, e.g. authenticated
        shows `False`.
        """

        return self._make_request(endpoint='iserver/auth/status', req_type='POST')

    # ------------------------------------------------------------------
    # FUNDAMENTAL DATA ENDPOINTS
    # ------------------------------------------------------------------

    def fundamentals_summary(self, conid: str) -> Dict:
        """Grabs a financial summary of a company.

        Return a financial summary for specific Contract ID. The financial summary
        includes key ratios and descriptive components of the Contract ID.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='iserver/fundamentals/{}/summary'.format(conid),
            req_type='GET'
        )

    def fundamentals_financials(self, conid: str, financial_statement: str, period: str = 'annual') -> Dict:
        """Requests a financial statement for a specific Contract ID.

        Arguments:
        ----
        conid {str} -- The contract ID.

        financial_statement {str} -- The specific financial statement you wish to request
            for the Contract ID. Possible values are ['balance','cash','income']

        period {str} -- The specific period you wish to see. Possible values are
            ['annual','quarter']. Anything other than 'annual' is sent as
            `annual=False`. (default: {'annual'})

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        # Build the arguments.
        params = {
            'type': financial_statement,
            'annual': period == 'annual'
        }

        return self._make_request(
            endpoint='fundamentals/financials/{}'.format(conid),
            req_type='GET',
            params=params
        )

    def fundamentals_key_ratios(self, conid: str) -> Dict:
        """Requests the `key_ratios` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='key_ratios')

    def fundamentals_dividends(self, conid: str) -> Dict:
        """Requests the `dividends` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='dividends')

    def fundamentals_esg(self, conid: str) -> Dict:
        """Requests the `esg` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='esg')

    # ------------------------------------------------------------------
    # DATA ENDPOINTS
    # ------------------------------------------------------------------

    def data_news(self, conid: str) -> Dict:
        """Requests the `news` widget (with `lang=en`) for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='news', lang='en')

    def data_ratings(self, conid: str) -> Dict:
        """Returns analyst ratings for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='ratings')

    def _data_events(self, conid: str) -> Dict:
        """Requests the `ratings` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        # NOTE(review): the method name suggests an events widget, but the
        # request asks for `ratings` - confirm the intended widget name.
        return self._fundamentals_landing(conid=conid, widget='ratings')

    def data_ownership(self, conid: str) -> Dict:
        """Requests the `ownership` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='ownership')

    def data_competitors(self, conid: str) -> Dict:
        """Requests the `competitors` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='competitors')

    def data_analyst_forecast(self, conid: str) -> Dict:
        """Requests the `analyst_forecast` widget for a specific conid.

        Arguments:
        ----
        conid {str} -- The contract ID.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._fundamentals_landing(conid=conid, widget='analyst_forecast')

    def market_data(self, conids: List[str], since: str = None, fields: List[str] = None) -> Dict:
        """Get Market Data for the given conid(s).

        The end-point will return by default bid, ask, last, change, change pct,
        close, listing exchange. See response fields for a list of available fields
        that can be request via fields argument. The endpoint /iserver/accounts
        should be called prior to /iserver/marketdata/snapshot. To receive all
        available fields the /snapshot endpoint will need to be called several times.

        Arguments:
        ----
        conids {List[str]} -- The list of contract IDs you wish to pull current quotes for.

        since {str} -- Time period since which updates are required. Uses epoch
            time with milliseconds. Left out of the request when `None`.

        fields {List[str]} -- List of fields you wish to retrieve for each quote.
            Sent as an empty string when `None`.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        # join the two list arguments so they are both a single string.
        params = {'conids': self._prepare_arguments_list(parameter_list=conids)}

        if since is not None:
            params['since'] = since

        params['fields'] = ','.join(str(field) for field in fields) if fields is not None else ''

        return self._make_request(
            endpoint='iserver/marketdata/snapshot',
            req_type='GET',
            params=params
        )

    def market_data_history(self, conid: str, period: str, bar: str) -> Dict:
        """Get history of market Data for the given conid.

        Length of data is controlled by period and bar. e.g. 1y period with
        bar=1w returns 52 data points.

        Arguments:
        ----
        conid {str} -- The contract ID for a given instrument. If you don't know the
            contract ID use the `search_by_symbol_or_name` endpoint to retrieve it.

        period {str} -- Specifies the period of look back. For example 1y means looking
            back 1 year from today. Possible values are ['1d','1w','1m','1y']

        bar {str} -- Specifies granularity of data. For example, if bar = '1h' the data
            will be at an hourly level. Possible values are ['5min','1h','1w']

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        params = {
            'conid': conid,
            'period': period,
            'bar': bar
        }

        return self._make_request(
            endpoint='iserver/marketdata/history',
            req_type='GET',
            params=params
        )

    # ------------------------------------------------------------------
    # SERVER ACCOUNTS ENDPOINTS
    # ------------------------------------------------------------------

    def server_accounts(self):
        """Returns the accounts the user has trading access to.

        Returns a list of accounts the user has trading access to, their
        respective aliases and the currently selected account. Note this
        endpoint must be called before modifying an order or querying
        open orders.
        """

        return self._make_request(endpoint='iserver/accounts', req_type='GET')

    def update_server_account(self, account_id: str, check: bool = False) -> Dict:
        """Sets the currently selected account of the session.

        If an user has multiple accounts, and user wants to get orders, trades,
        etc. of an account other than currently selected account, then user
        can update the currently selected account using this API and then can
        fetch required information for the newly updated account.

        Arguments:
        ----
        account_id {str} -- The account ID you wish to set for the API Session. This
            will be used to grab historical data and make orders.

        check {bool} -- Currently unused. (default: {False})

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='iserver/account',
            req_type='POST',
            params={'acctId': account_id}
        )

    def server_account_pnl(self):
        """Returns an object containing PnL for the selected account and its models (if any)."""

        return self._make_request(endpoint='iserver/account/pnl/partitioned', req_type='GET')

    # ------------------------------------------------------------------
    # CONTRACT ENDPOINTS
    # ------------------------------------------------------------------

    def symbol_search(self, symbol: str) -> Dict:
        """Performs a symbol search for a given symbol.

        Returns information related to the symbol including the contract id.

        Arguments:
        ----
        symbol {str} -- The symbol to search for.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='iserver/secdef/search',
            req_type='POST',
            params={'symbol': symbol}
        )

    def contract_details(self, conid: str) -> Dict:
        """Get contract details, you can use this to prefill your order before you submit an order.

        Arguments:
        ----
        conid {str} -- The contract ID you wish to get details for.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='iserver/contract/{conid}/info'.format(conid=conid),
            req_type='GET'
        )

    def contracts_definitions(self, conids: List[str]) -> Dict:
        """Returns a list of security definitions for the given conids.

        Arguments:
        ----
        conids {List[str]} -- A list of contract IDs you wish to get details for.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='trsrv/secdef',
            req_type='POST',
            params={'conids': conids}
        )

    def futures_search(self, symbols: List[str]) -> Dict:
        """Returns a list of non-expired future contracts for given symbol(s).

        Arguments:
        ----
        symbols {List[str]} -- List of case-sensitive symbols; sent comma separated.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='trsrv/futures',
            req_type='GET',
            params={'symbols': ','.join(symbols)}
        )

    def symbols_search_list(self, symbols: List[str]) -> Dict:
        """Requests the `trsrv/stocks` endpoint for the given symbol(s).

        Arguments:
        ----
        symbols {List[str]} -- List of case-sensitive symbols; sent comma separated.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint='trsrv/stocks',
            req_type='GET',
            params={'symbols': ','.join(symbols)}
        )

    # ------------------------------------------------------------------
    # PORTFOLIO ACCOUNTS ENDPOINTS
    # ------------------------------------------------------------------

    def portfolio_accounts(self):
        """Returns the accounts the user can view position and account information for.

        In non-tiered account structures, returns a list of accounts for which the
        user can view position and account information. This endpoint must be called prior
        to calling other /portfolio endpoints for those accounts. For querying a list of accounts
        which the user can trade, see /iserver/accounts. For a list of subaccounts in tiered account
        structures (e.g. financial advisor or ibroker accounts) see /portfolio/subaccounts.
        """

        return self._make_request(endpoint='portfolio/accounts', req_type='GET')

    def portfolio_sub_accounts(self):
        """Returns the sub-accounts the user can view position and account information for.

        Used in tiered account structures (such as financial advisor and ibroker accounts) to return a
        list of sub-accounts for which the user can view position and account-related information. This
        endpoint must be called prior to calling other /portfolio endpoints for those subaccounts. To
        query a list of accounts the user can trade, see /iserver/accounts.
        """

        return self._make_request(endpoint=r'portfolio/subaccounts', req_type='GET')

    def portfolio_account_info(self, account_id: str) -> Dict:
        """Requests the `portfolio/{account_id}/meta` endpoint for an account.

        Arguments:
        ----
        account_id {str} -- The account ID you wish to return info for.

        Returns:
        ----
        {Dict} -- The response dictionary.
        """

        return self._make_request(
            endpoint=r'portfolio/{}/meta'.format(account_id),
            req_type='GET'
        )

    # NOTE(review): the definition below runs past the end of the visible
    # source. Its visible fragment is preserved verbatim (as comments) and
    # must be rejoined with the remainder of the file.
    #
    # def portfolio_account_summary(self, account_id: str) -> Dict:
    #     """
    #     Returns information about margin, cash balances and other information
    #     related to specified account. See also /portfolio/{accountId}/ledger.
1327 /portfolio/accounts or /portfolio/subaccounts must be called 1328 prior to this endpoint.13291330 NAME: account_id1331 DESC: The account ID you wish to return info for.1332 TYPE: String13331334 """13351336 # define request components1337 endpoint = r'portfolio/{}/summary'.format(account_id)1338 req_type = 'GET'1339 content = self._make_request(endpoint=endpoint, req_type=req_type)13401341 return content13421343 def portfolio_account_ledger(self, account_id: str) -> Dict:1344 """1345 Information regarding settled cash, cash balances, etc. in the account's 1346 base currency and any other cash balances hold in other currencies. /portfolio/accounts 1347 or /portfolio/subaccounts must be called prior to this endpoint. The list of supported 1348 currencies is available at https://www.interactivebrokers.com/en/index.php?f=3185.13491350 NAME: account_id1351 DESC: The account ID you wish to return info for.1352 TYPE: String13531354 """13551356 # define request components1357 endpoint = r'portfolio/{}/ledger'.format(account_id)1358 req_type = 'GET'1359 content = self._make_request(endpoint=endpoint, req_type=req_type)13601361 return content13621363 def portfolio_account_allocation(self, account_id: str) -> Dict:1364 """1365 Information about the account's portfolio allocation by Asset Class, Industry and 1366 Category. /portfolio/accounts or /portfolio/subaccounts must be called prior to 1367 this endpoint.13681369 NAME: account_id1370 DESC: The account ID you wish to return info for.1371 TYPE: String13721373 """13741375 # define request components1376 endpoint = r'portfolio/{}/allocation'.format(account_id)1377 req_type = 'GET'1378 content = self._make_request(endpoint=endpoint, req_type=req_type)13791380 return content13811382 def portfolio_accounts_allocation(self, account_ids: List[str]) -> Dict:1383 """1384 Similar to /portfolio/{accountId}/allocation but returns a consolidated view of of all the 1385 accounts returned by /portfolio/accounts. 
/portfolio/accounts or /portfolio/subaccounts must 1386 be called prior to this endpoint.13871388 NAME: account_ids1389 DESC: A list of Account IDs you wish to return alloacation info for.1390 TYPE: List<String>13911392 """13931394 # define request components1395 endpoint = r'portfolio/allocation'1396 req_type = 'POST'1397 payload = account_ids1398 content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)13991400 return content14011402 def portfolio_account_positions(self, account_id: str, page_id: int = 0) -> Dict:1403 """1404 Returns a list of positions for the given account. The endpoint supports paging, 1405 page's default size is 30 positions. /portfolio/accounts or /portfolio/subaccounts 1406 must be called prior to this endpoint.14071408 NAME: account_id1409 DESC: The account ID you wish to return positions for.1410 TYPE: String14111412 NAME: page_id1413 DESC: The page you wish to return if there are more than 1. The1414 default value is `0`.1415 TYPE: String141614171418 ADDITIONAL ARGUMENTS NEED TO BE ADDED!!!!!1419 """14201421 # define request components1422 endpoint = r'portfolio/{}/positions/{}'.format(account_id, page_id)1423 req_type = 'GET'1424 content = self._make_request(endpoint=endpoint, req_type=req_type)14251426 return content14271428 #1429 # RENAME THIS1430 #14311432 def portfolio_account_position(self, account_id: str, conid: str) -> Dict:1433 """1434 Returns a list of all positions matching the conid. For portfolio models the conid 1435 could be in more than one model, returning an array with the name of the model it 1436 belongs to. 
/portfolio/accounts or /portfolio/subaccounts must be called prior to 1437 this endpoint.14381439 NAME: account_id1440 DESC: The account ID you wish to return positions for.1441 TYPE: String14421443 NAME: conid1444 DESC: The contract ID you wish to find matching positions for.1445 TYPE: String14461447 """14481449 # define request components1450 endpoint = r'portfolio/{}/position/{}'.format(account_id, conid)1451 req_type = 'GET'1452 content = self._make_request(endpoint=endpoint, req_type=req_type)14531454 return content14551456 #1457 # GET MORE DETAILS ON THIS1458 #14591460 def portfolio_positions_invalidate(self, account_id: str) -> Dict:1461 """1462 Invalidates the backend cache of the Portfolio. ???14631464 NAME: account_id1465 DESC: The account ID you wish to return positions for.1466 TYPE: String14671468 """14691470 # define request components1471 endpoint = r'portfolio/{}/positions/invalidate'.format(account_id)1472 req_type = 'POST'1473 content = self._make_request(endpoint=endpoint, req_type=req_type)14741475 return content14761477 def portfolio_positions(self, conid: str) -> Dict:1478 """1479 Returns an object of all positions matching the conid for all the selected accounts. 1480 For portfolio models the conid could be in more than one model, returning an array 1481 with the name of the model it belongs to. 
/portfolio/accounts or /portfolio/subaccounts 1482 must be called prior to this endpoint.14831484 NAME: conid1485 DESC: The contract ID you wish to find matching positions for.1486 TYPE: String 1487 """14881489 # define request components1490 endpoint = r'portfolio/positions/{}'.format(conid)1491 req_type = 'GET'1492 content = self._make_request(endpoint=endpoint, req_type=req_type)14931494 return content14951496 """1497 TRADES ENDPOINTS1498 """14991500 def trades(self):1501 """1502 Returns a list of trades for the currently selected account for current day and 1503 six previous days.1504 """15051506 # define request components1507 endpoint = r'iserver/account/trades'1508 req_type = 'GET'1509 content = self._make_request(endpoint=endpoint, req_type=req_type)15101511 return content15121513 """1514 ORDERS ENDPOINTS1515 """15161517 def get_live_orders(self):1518 """1519 The end-point is meant to be used in polling mode, e.g. requesting every 1520 x seconds. The response will contain two objects, one is notification, the 1521 other is orders. Orders is the list of orders (cancelled, filled, submitted) 1522 with activity in the current day. Notifications contains information about 1523 execute orders as they happen, see status field.15241525 """15261527 # define request components1528 endpoint = r'iserver/account/orders'1529 req_type = 'GET'1530 content = self._make_request(endpoint=endpoint, req_type=req_type)15311532 return content15331534 def place_order(self, account_id: str, order: dict) -> Dict:1535 """1536 Please note here, sometimes this end-point alone can't make sure you submit the order 1537 successfully, you could receive some questions in the response, you have to to answer 1538 them in order to submit the order successfully. 
You can use "/iserver/reply/{replyid}" 1539 end-point to answer questions.15401541 NAME: account_id1542 DESC: The account ID you wish to place an order for.1543 TYPE: String15441545 NAME: order1546 DESC: Either an IBOrder object or a dictionary with the specified payload.1547 TYPE: IBOrder or Dict15481549 """15501551 if type(order) is dict:1552 order = order1553 else:1554 order = order.create_order()15551556 # define request components1557 endpoint = r'iserver/account/{}/order'.format(account_id)1558 req_type = 'POST'1559 content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)15601561 return content15621563 def place_orders(self, account_id: str, orders: List[Dict]) -> Dict:1564 """1565 An extension of the `place_order` endpoint but allows for a list of orders. Those orders may be1566 either a list of dictionary objects or a list of IBOrder objects.15671568 NAME: account_id1569 DESC: The account ID you wish to place an order for.1570 TYPE: String15711572 NAME: orders1573 DESC: Either a list of IBOrder objects or a list of dictionaries with the specified payload.1574 TYPE: List<IBOrder Object> or List<Dictionary>15751576 """15771578 # EXTENDED THIS1579 if type(orders) is list:1580 orders = orders1581 else:1582 orders = orders15831584 # define request components1585 endpoint = r'iserver/account/{}/orders'.format(account_id)1586 req_type = 'POST'1587 content = self._make_request(endpoint=endpoint, req_type=req_type, params=orders)15881589 return content15901591 def place_order_scenario(self, account_id: str, order: dict) -> Dict:1592 """1593 This end-point allows you to preview order without actually submitting the 1594 order and you can get commission information in the response.15951596 NAME: account_id1597 DESC: The account ID you wish to place an order for.1598 TYPE: String15991600 NAME: order1601 DESC: Either an IBOrder object or a dictionary with the specified payload.1602 TYPE: IBOrder or Dict16031604 """16051606 if type(order) is 
dict:1607 order = order1608 else:1609 order = order.create_order()16101611 # define request components1612 endpoint = r'iserver/account/{}/order/whatif'.format(account_id)1613 req_type = 'POST'1614 content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)16151616 return content16171618 def modify_order(self, account_id: str, customer_order_id: str, order: dict) -> Dict:1619 """1620 Modifies an open order. The /iserver/accounts endpoint must first1621 be called.16221623 NAME: account_id1624 DESC: The account ID you wish to place an order for.1625 TYPE: String16261627 NAME: customer_order_id1628 DESC: The customer order ID for the order you wish to MODIFY.1629 TYPE: String16301631 NAME: order1632 DESC: Either an IBOrder object or a dictionary with the specified payload.1633 TYPE: IBOrder or Dict16341635 """16361637 if type(order) is dict:1638 order = order1639 else:1640 order = order.create_order()16411642 # define request components1643 endpoint = r'iserver/account/{}/order/{}'.format(account_id, customer_order_id)1644 req_type = 'POST'1645 content = self._make_request(endpoint=endpoint, req_type=req_type, params=order)16461647 return content16481649 def delete_order(self, account_id: str, customer_order_id: str) -> Dict:1650 """1651 Deletes the order specified by the customer order ID.16521653 NAME: account_id1654 DESC: The account ID you wish to place an order for.1655 TYPE: String16561657 NAME: customer_order_id1658 DESC: The customer order ID for the order you wish to DELETE.1659 TYPE: String16601661 """1662 # define request components1663 endpoint = r'iserver/account/{}/order/{}'.format(account_id, customer_order_id)1664 req_type = 'DELETE'1665 content = self._make_request(endpoint=endpoint, req_type=req_type)16661667 return content16681669 """1670 ORDERS ENDPOINTS1671 """16721673 def get_scanners(self):1674 """1675 Returns an object contains four lists contain all parameters for scanners.16761677 RTYPE Dictionary1678 """1679 # define 
request components1680 endpoint = r'iserver/scanner/params'1681 req_type = 'GET'1682 content = self._make_request(endpoint=endpoint, req_type=req_type)16831684 return content16851686 def run_scanner(self, instrument: str, scanner_type: str, location: str, size: str = '25', filters: List[dict] = None) -> Dict:1687 """1688 Run a scanner to get a list of contracts.16891690 NAME: instrument1691 DESC: The type of financial instrument you want to scan for.1692 TYPE: String16931694 NAME: scanner_type1695 DESC: The Type of scanner you wish to run, defined by the scanner code.1696 TYPE: String16971698 NAME: location1699 DESC: The geographic location you wish to run the scan. For example (STK.US.MAJOR)1700 TYPE: String17011702 NAME: size1703 DESC: The number of results to return back. Defaults to 25.1704 TYPE: String17051706 NAME: filters1707 DESC: A list of dictionaries where the key is the filter you wish to set and the value is the value you want set1708 for that filter.1709 TYPE: List<Dictionaries>17101711 RTYPE Dictionary1712 """17131714 # define request components1715 endpoint = r'iserver/scanner/run'1716 req_type = 'POST'1717 payload = {1718 "instrument": instrument,1719 "type": scanner_type,1720 "filter": filters,1721 "location": location,1722 "size": size1723 }1724 print(payload)17251726 content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)17271728 return content17291730 def customer_info(self):1731 """1732 Returns Applicant Id with all owner related entities 17331734 RTYPE Dictionary1735 """17361737 # define request components1738 endpoint = r'ibcust/entity/info'1739 req_type = 'GET'1740 content = self._make_request(endpoint=endpoint, req_type=req_type)17411742 return content17431744 def get_unread_messages(self):1745 """1746 Returns the unread messages associated with the account.17471748 RTYPE Dictionary1749 """17501751 # define request components1752 endpoint = r'fyi/unreadnumber'1753 req_type = 'GET'1754 content = 
self._make_request(endpoint=endpoint, req_type=req_type)17551756 return content17571758 def get_subscriptions(self):1759 """1760 Return the current choices of subscriptions, we can toggle the option.17611762 RTYPE Dictionary1763 """17641765 # define request components1766 endpoint = r'fyi/settings'1767 req_type = 'GET'1768 content = self._make_request(endpoint=endpoint, req_type=req_type)17691770 return content17711772 def change_subscriptions_status(self, type_code: str, enable: bool = True) -> Dict:1773 """1774 Turns the subscription on or off.17751776 NAME: type_code1777 DESC: The subscription code you wish to change the status for.1778 TYPE: String17791780 NAME: enable1781 DESC: True if you want the subscription turned on, False if you want it turned of.1782 TYPE: Boolean17831784 RTYPE Dictionary1785 """17861787 # define request components1788 endpoint = r'fyi/settings/{}'1789 req_type = 'POST'1790 payload = {'enable': enable}1791 content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)17921793 return content17941795 def subscriptions_disclaimer(self, type_code: str) -> Dict:1796 """1797 Returns the disclaimer for the specified subscription.17981799 NAME: type_code1800 DESC: The subscription code you wish to change the status for.1801 TYPE: String18021803 RTYPE Dictionary1804 """18051806 # define request components1807 endpoint = r'fyi/disclaimer/{}'1808 req_type = 'GET'1809 content = self._make_request(endpoint=endpoint, req_type=req_type)18101811 return content18121813 def mark_subscriptions_disclaimer(self, type_code: str) -> Dict:1814 """1815 Sets the specified disclaimer to read.18161817 NAME: type_code1818 DESC: The subscription code you wish to change the status for.1819 TYPE: String18201821 RTYPE Dictionary1822 """18231824 # define request components1825 endpoint = r'fyi/disclaimer/{}'1826 req_type = 'PUT'1827 content = self._make_request(endpoint=endpoint, req_type=req_type)18281829 return content18301831 def 
subscriptions_delivery_options(self):1832 """1833 Options for sending fyis to email and other devices.18341835 RTYPE Dictionary1836 """18371838 # define request components1839 endpoint = r'fyi/deliveryoptions'1840 req_type = 'GET'1841 content = self._make_request(endpoint=endpoint, req_type=req_type)18421843 return content18441845 def mutual_funds_portfolios_and_fees(self, conid: str) -> Dict:1846 """1847 Grab the Fees and objectives for a specified mutual fund.18481849 NAME: conid1850 DESC: The Contract ID for the mutual fund.1851 TYPE: String18521853 RTYPE Dictionary1854 """18551856 # define request components1857 endpoint = r'fundamentals/mf_profile_and_fees/{mutual_fund_id}'.format(mutual_fund_id=conid)1858 req_type = 'GET'1859 content = self._make_request(endpoint=endpoint, req_type=req_type)18601861 return content18621863 def mutual_funds_performance(self, conid: str, risk_period: str, yield_period: str, statistic_period: str) -> Dict:1864 """1865 Grab the Lip Rating for a specified mutual fund.18661867 NAME: conid1868 DESC: The Contract ID for the mutual fund.1869 TYPE: String18701871 NAME: yield_period1872 DESC: The Period threshold for yield information1873 possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1874 TYPE: String18751876 NAME: risk_period1877 DESC: The Period threshold for risk information1878 possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1879 TYPE: String18801881 NAME: statistic_period1882 DESC: The Period threshold for statistic information1883 possible values: ['6M', '1Y', '3Y', '5Y', '10Y']1884 TYPE: String18851886 RTYPE Dictionary1887 """18881889 # define request components1890 endpoint = r'fundamentals/mf_performance/{mutual_fund_id}'.format(mutual_fund_id=conid)1891 req_type = 'GET'1892 payload = {1893 'risk_period': None,1894 'yield_period': None,1895 'statistic_period': None1896 }1897 content = self._make_request(endpoint=endpoint, req_type=req_type, params=payload)1898 ...

Full Screen

Full Screen

analysis.py

Source:analysis.py Github

copy

Full Screen

1from pathlib import Path2from dataclasses import dataclass3import pandas as pd4import re5import os6import moviepy.editor as mpy7import moviepy.tools8import moviepy.config9import json10import bbox11import cv212# TODO:13# - option to choose locale14# - docstrings15# - support for undistort16events_log_filename = "events.csv"17session_state_filename = "session_state.json"18name_locale = "Asia/Jerusalem"19def split_name_datetime(s):20 """21 Split a string with format {name}_%Y%m%d_%H%M%S into name and date.22 Return name (string), dt (np.datetime64)23 """24 match = re.search("(.*)_([0-9]*)[-_]([0-9]*)", s)25 dt = pd.to_datetime(match.group(2) + " " + match.group(3), format="%Y%m%d %H%M%S")26 name = match.group(1)27 return name, dt28def read_timeseries_csv(path: Path, time_col="time", tz="utc") -> pd.DataFrame:29 """30 Read csv file into a pandas DataFrame. Creates a DatetimeIndex and sets31 the timezone to the specified one.32 - path: csv file path33 - time_col: The name of the column to be used as a DatetimeIndex34 - tz: The timezone of the time column (see DatetimeIndex.tz_localize)35 """36 df = pd.read_csv(path)37 if hasattr(time_col, "__getitem__"):38 for col in time_col:39 if col in df.columns:40 time_col = col41 break42 df.index = pd.to_datetime(df[time_col], unit="s").dt.tz_localize(tz)43 df.drop(columns=[time_col], inplace=True)44 return df45def is_timestamp_contained(46 tdf: pd.DataFrame, timestamp: pd.Timestamp, time_col=None47) -> bool:48 """49 Return True if the timestamp is contained within the time range of the50 supplied timeseries dataframe.51 tdf: Timeseries dataframe (pd.DataFrame) with a time column.52 timestamp: The timestamp to test53 time_col: The name of the time column. 
When equals None the dataframe index54 will be used.55 """56 if time_col is None:57 beginning = tdf.index[0]58 end = tdf.index[-1]59 else:60 beginning = tdf[time_col].iloc[0]61 end = tdf[time_col].iloc[-1]62 return beginning < timestamp < end63def idx_for_time(df: pd.DataFrame, timestamp: pd.Timestamp, time_col=None) -> int:64 """65 Return the closest row index to the supplied timestamp.66 df: The dataframe to search67 timestamp: The timestamp that will be used to find the index68 time_col: The name of the time column. When equals None the dataframe index69 will be used.70 """71 if time_col is None:72 return df.index.get_loc(timestamp, method="nearest")73 else:74 return (df[time_col] - timestamp).abs().argmin()75def format_timedelta(td: pd.Timedelta, use_colons=True):76 total_secs = int(td.total_seconds())77 hrs = total_secs // 360078 mins = (total_secs % 3600) // 6079 secs = (total_secs % 3600) % 6080 if use_colons:81 return f"{hrs:02}:{mins:02}:{secs:02}"82 else:83 return f"{hrs:02}{mins:02}{secs:02}"84def format_timestamp(ts: pd.Timestamp, fmt="%m-%d %H:%M:%S", tz="Asia/Jerusalem"):85 return pd.to_datetime(ts.tz_convert(tz)).strftime(fmt)86def background_for_ts(info, ts, infix):87 imgs = [p for p in info.images if infix in p.name]88 im_tss = [split_name_datetime(p.stem)[1].tz_localize("Asia/Jerusalem").tz_convert("utc") for p in imgs]89 df = pd.DataFrame(data={'ts': im_tss, 'ts_diff': pd.Series(im_tss) - ts, 'path': imgs}, columns=['ts', 'ts_diff', 'path'])90 row = df[df.ts_diff < pd.Timedelta(0)].max()91 return cv2.imread(str(row.path))92def extract_clip(vid_path, start_frame: int, end_frame: int, output_path):93 """94 Extract a subclip of a video file without re-encoding it.95 vid_path: Path of the input video file (pathlib.Path or str).96 start_frame, end_frame: start and end of the subclip in frame numbers.97 output_path: Path for the output video file (pathlib.Path or str).98 """99 fps = mpy.VideoFileClip(str(vid_path)).fps100 start_time = start_frame / 
fps101 end_time = end_frame / fps102 ffmpeg_extract_subclip(vid_path, int(start_time), int(end_time), str(output_path))103def ffmpeg_extract_subclip(filename, t1, t2, targetname=None):104 """105 Makes a new video file playing video file ``filename`` between106 the times ``t1`` and ``t2``.107 from: https://zulko.github.io/moviepy/_modules/moviepy/video/io/ffmpeg_tools.html108 """109 name, ext = os.path.splitext(filename)110 if not targetname:111 T1, T2 = [int(1000 * t) for t in [t1, t2]]112 targetname = "%sSUB%d_%d.%s" % (name, T1, T2, ext)113 cmd = [114 moviepy.config.get_setting("FFMPEG_BINARY"),115 "-y",116 "-ss",117 "%0.2f" % t1,118 "-i",119 filename,120 "-t",121 "%0.2f" % (t2 - t1),122 "-map",123 "0",124 "-vcodec",125 "copy",126 "-acodec",127 "copy",128 targetname,129 ]130 moviepy.tools.subprocess_call(cmd, logger=None)131def sessions_df(session_data_root: Path) -> pd.DataFrame:132 """133 Find all sessions under the supplied session_data_root argument.134 Return a pandas dataframe with columns `name` and `dir` and a135 DatetimeIndex containing the session start time.136 """137 exp_dirs = list(filter(lambda p: p.is_dir(), session_data_root.glob("*")))138 dts = []139 names = []140 for exp_dir in exp_dirs:141 name, dt = split_name_datetime(exp_dir.stem)142 dts.append(dt)143 names.append(name)144 df = pd.DataFrame(columns=["name", "dir"], index=dts)145 df.name = names146 df.dir = exp_dirs147 return df.sort_index()148def session_stats(session_dir) -> dict:149 """150 Return a dictionary with statistics of the session in the supplied151 session_dir (Path or str) argument.152 """153 path = Path(session_dir)154 video_files = list(path.glob("*.mp4")) + list(path.glob("*.avi"))155 image_files = list(path.glob("*.png")) + list(path.glob("*.jpg"))156 csv_files = list(path.glob("*.csv"))157 return {158 "video_count": len(list(video_files)),159 "image_count": len(list(image_files)),160 "csv_count": len(list(csv_files)),161 }162def sessions_stats_df(sessions: pd.DataFrame) -> 
pd.DataFrame:163 """164 Return a dataframe containing statistics for each session in the supplied165 sessions argument.166 """167 df = pd.DataFrame(columns=["video_count", "image_count", "csv_count"])168 vids = []169 imgs = []170 csvs = []171 for dir in sessions.dir:172 stats = session_stats(dir)173 vids.append(stats["video_count"])174 imgs.append(stats["image_count"])175 csvs.append(stats["csv_count"])176 df.video_count = vids177 df.image_count = imgs178 df.csv_count = csvs179 return df180def read_database_table(conn, table: str, t0: pd.Timestamp, t1: pd.Timestamp):181 """182 Read data from a database table within the supplied time range.183 - conn: Database connection object (see database.make_connection)184 - table: Database table name185 - t0, t1: the time range of the returned dataframe186 """187 st0 = t0.isoformat()188 st1 = t1.isoformat()189 df = pd.read_sql(190 f"SELECT * FROM {table} WHERE \"time\" BETWEEN '{st0}' AND '{st1}'", conn191 )192 return df193@dataclass(init=False, repr=False)194class VideoInfo:195 """196 Represents a single timestamped video file.197 Expects a timestamps csv file in the same directory with the same name198 as the video file but with a `.csv` suffix199 Attributes:200 name: Video name (string)201 time: Video start time202 path: Video path203 timestamp_path: Timestamps csv path204 timestamps: The timestamps csv loaded using read_timeseries_csv()205 frame_count: Number of frames in the video (based on the timestamps file)206 duration: The total duration of the video (based on the timestamps file)207 src_id: The video image source id (based on the name attribute).208 """209 name: str210 time: pd.Timestamp211 path: Path212 timestamp_path: Path213 timestamps: pd.DataFrame214 frame_count: int215 duration: pd.Timestamp216 src_id: str217 def __init__(self, path: Path):218 """219 Return a new VideoInfo instance for the video file at the supplied path.220 """221 self.name, self.time = split_name_datetime(path.stem)222 self.time = 
self.time.tz_localize(name_locale).tz_convert("utc")223 self.timestamp_path = path.parent / (path.stem + ".csv")224 if not self.timestamp_path.exists():225 self.timestamp_path = None226 self.duration = None227 self.timestamps = None228 self.frame_count = None229 else:230 self.timestamps = read_timeseries_csv(231 self.timestamp_path, time_col=["time", "timestamp"]232 )233 self.duration = self.timestamps.index[-1] - self.timestamps.index[0]234 self.frame_count = self.timestamps.shape[0]235 self.path = path236 split = self.name.split("_")237 if len(split) == 1:238 self.src_id = self.name239 else:240 self.src_id = split[241 -1242 ] # NOTE: what happens when both src_id and name have underscores?243 def __repr__(self):244 return f"\nVideoInfo(name: {self.name},\n\ttime: {self.time},\n\tpath: {self.path},\n\ttimestamp_path: {self.timestamp_path},\n\tframe_count: {self.frame_count},\n\tduration: {self.duration})"245@dataclass(init=False)246class VideoPosition:247 """248 Represents a time position in a specific video.249 video: The VideoInfo representing the video matching this position250 timestamp: The time of the video position251 frame: The frame number of the video position (based on the timestamp)252 """253 video: VideoInfo254 timestamp: pd.Timestamp255 frame: int = None256 def __init__(self, video: VideoInfo, timestamp: pd.Timestamp):257 """258 Return a new instance with the supplied video and timestamp.259 """260 self.video = video261 self.timestamp = timestamp262 if self.video.timestamps is not None:263 self.frame = idx_for_time(self.video.timestamps, timestamp)264@dataclass(init=False)265class SessionInfo:266 """267 Represents a single session.268 Attributes:269 dir: The session directory270 session_state_path: The path of the session state json file271 session_state: The last recorded session state.272 videos: A list of all videos contained within the session273 images: A list of all image paths found in the session274 event_log_path: The path of the session 
event log275 event_log: A timeseries dataframe of the session event log276 head_bbox: A timeseries dataframe of the animal head bounding boxes277 head_centroids: A timeseries dataframe of the animal head centroids278 csvs: A list of paths to all other csvs found in the session.279 All of the dataframes in this class, as well as the session_state, are280 loaded on first access (lazily), except for VideoInfo.timestamps dataframe281 which is loaded when the object is created. To reload the data create a282 new object.283 """284 name: str285 time: pd.Timestamp286 dir: Path287 videos: [VideoInfo]288 images: [Path]289 event_log_path: Path290 csvs: [Path]291 session_state_path: Path292 session_state: dict293 def __init__(self, session_dir):294 """295 Instantiate a SessionInfo for the session at the supplied session_dir296 argument (Path or str).297 """298 session_dir = Path(session_dir)299 if not session_dir.exists():300 raise ValueError(f"Session directory doesn't exist: {str(session_dir)}")301 self.name, self.time = split_name_datetime(session_dir.stem)302 self.time = self.time.tz_localize(name_locale).tz_convert("utc")303 self.dir = session_dir304 self.videos = [305 VideoInfo(p)306 for p in list(session_dir.glob("*.mp4")) + list(session_dir.glob("*.avi"))307 ]308 ts_paths = [v.timestamp_path for v in self.videos]309 self.csvs = []310 for csv_path in [p for p in session_dir.glob("*.csv") if p not in ts_paths]:311 if events_log_filename in csv_path.name:312 self.event_log_path = csv_path313 else:314 self.csvs.append(csv_path)315 self.images = list(session_dir.glob("*.jpg")) + list(session_dir.glob("*.png"))316 self.session_state_path = session_dir / "session_state.json"317 if not self.session_state_path.exists():318 # Legacy file name319 self.session_state_path = session_dir / "exp_state.json"320 if not self.session_state_path.exists():321 self.session_state_path = None322 self._session_state = None323 self._event_log = None324 self._head_bbox = None325 @property326 def 
session_state(self) -> dict:327 if self._session_state is not None:328 return self._session_state329 if self.session_state_path is None:330 self._session_state = None331 else:332 with open(self.session_state_path, "r") as f:333 self._session_state = json.load(f)334 return self._session_state335 @property336 def event_log(self) -> pd.DataFrame:337 if self._event_log is not None:338 return self._event_log339 self._event_log = read_timeseries_csv(self.event_log_path)340 return self._event_log341 def filter_videos(342 self, videos=None, src_id: str = None, ts: pd.Timestamp = None343 ) -> [VideoInfo]:344 """345 Filter videos according to image source id or start time.346 videos: A list of VideoInfo objects to filter. When equals None, all347 session videos are used.348 src_id: When not None, return only videos recorded from this image349 source id.350 ts: When not None, return only videos with this start time.351 """352 if videos is None:353 videos = self.videos354 if src_id is not None:355 videos = filter(lambda v: v.src_id == src_id, videos)356 if ts is not None:357 videos = filter(lambda v: v.time == ts, videos)358 return list(videos)359 def video_position_at_time(360 self, timestamp: pd.Timestamp, videos=None361 ) -> [VideoPosition]:362 """363 Find all video files and frame numbers matching the supplied timestamp.364 Return a list of VideoPosition objects, one for each video that was365 recording during the time denoted by timestamp.366 videos: A list of VideoInfos that will be searched. 
When this is None,367 all of the videos in the session will be searched.368 """369 res = []370 if videos is None:371 videos = self.videos372 for vid in videos:373 if vid.timestamps is None:374 print(f"WARNING: Video {vid.name}, {vid.time} has no timestamps")375 return376 if is_timestamp_contained(vid.timestamps, timestamp):377 res.append(VideoPosition(vid, timestamp))378 return res379 def extract_clip(380 self,381 src_id: str,382 start_time: pd.Timestamp,383 end_time: pd.Timestamp,384 output_dir: Path,385 file_prefix: str,386 ):387 """388 Extract a clip from the session in this session.389 src_id: The image source id to use.390 start_time: Clip start time.391 end_time: Clip end time.392 output_dir: The directory that will contain the clip video file.393 file_prefix: An additional prefix for the output video filename.394 """395 videos = self.filter_videos(src_id=src_id)396 start_pos = self.video_position_at_time(start_time, videos)397 end_pos = self.video_position_at_time(end_time, videos)398 if len(start_pos) != 1:399 raise ValueError("start_time matched multiple videos or no videos")400 if len(end_pos) != 1:401 raise ValueError("end_time matched multiple videos or no videos")402 if start_pos[0].video.path != end_pos[0].video.path:403 raise ValueError("start_time and end_time matched different videos")404 start_pos, end_pos = start_pos[0], end_pos[0]405 relative_ts = start_pos.timestamp - start_pos.video.time406 fts = format_timedelta(relative_ts, use_colons=False)407 clip_path = (408 output_dir / f"{file_prefix}_{fts}_{start_pos.frame}_{end_pos.frame}.mp4"409 )410 extract_clip(start_pos.video.path, start_pos.frame, end_pos.frame, clip_path)411 @property412 def head_bbox(self):413 if self._head_bbox is not None:414 return self._head_bbox415 bbox_csvs = [p for p in self.csvs if p.name == "head_bbox.csv"]416 if len(bbox_csvs) == 0:417 return None418 self._head_bbox = read_timeseries_csv(bbox_csvs[0])419 return self._head_bbox420 @property421 def 
head_centroids(self):422 head_bbox = self.head_bbox423 centroids = bbox.xyxy_to_centroid(head_bbox[["x1", "y1", "x2", "y2"]].values)424 df = pd.DataFrame(centroids, columns=["x", "y"])425 df.index = head_bbox.index426 df["confidence"] = head_bbox.confidence...

Full Screen

Full Screen

session_reader.py

Source:session_reader.py Github

copy

Full Screen

#!/usr/bin/python3
import xml.etree.ElementTree as ET
import logging
import os
import json
import sys, traceback
from COREIfx import msg_ifx
from COREIfx.imnparser import imnparser
from pyparsing import nestedExpr, originalTextFor
#from core.api.tlv.coreapi import CoreConfMessage, CoreEventMessage


class SessionReader():
    """Read a deployed CORE session and collect its conditional connections.

    On construction the reader asks CORE (through ``msg_ifx.send_command``)
    for the imn file behind the session, locates the session's
    ``session-deployed.xml`` under ``/tmp/pycore.<session_number>`` and parses
    it into ``conditional_conns``: a dict keyed by decision-node id.

    Attributes:
        session_number: The session number as a string (``None`` when no
            number was supplied).
        imnfilename: Path of the imn scenario file, ``""`` when not found.
        xmlfilename: Path of the deployed session XML, ``None`` when the
            reader gave up before computing it.
        conditional_conns: Result of ``relevant_session_to_JSON()``, or
            ``None`` when the session could not be read.
    """

    # Service that marks a node as a decision node, and the files that
    # service is made of.
    _DECISION_SERVICE = "CC_DecisionNode_OVS"
    _DECISION_FILES = ("MyMonitor.sh", "MyTrigger.py", "MySwapper.py")

    def __init__(self, session_number=None):
        """Look up the session files and parse the conditional connections.

        Failures are logged and leave the object with the default attribute
        values described on the class; no exception is raised for a missing
        session number, imn file or session state.
        """
        logging.debug("SessionReader(): instantiated")
        # Define every attribute before any early return so a reader that
        # gave up still has a consistent set of attributes.
        self.session_number = session_number
        self.imnfilename = ""
        self.xmlfilename = None
        self.conditional_conns = None
        if session_number is None:
            #just get one from /tmp/pycore***
            logging.error("SessionReader(): init(): No session number was provided")
            return
        # Commands are built by string concatenation, so keep a string.
        self.session_number = str(session_number)
        logging.debug("SessionReader(): init(): Retrieving imn filename")
        services_resp = str(msg_ifx.send_command('-s'+self.session_number+' SESSION flags=STRING --tcp -l'))
        for line in services_resp.splitlines():
            if "FILE: " in line:
                #we know this line has the imn filename
                ###TODO### Need to make sure we get the correct filename associated with the session
                self.imnfilename = line.split("FILE: ")[1].split("|")[0]
                break
        if self.imnfilename == "":
            logging.error("SessionReader(): init(): No associated imn file found. Exiting")
            return
        logging.debug("SessionReader(): init(): Found filename: " + str(self.imnfilename))
        self.xmlfilename = os.path.join("/tmp", "pycore."+self.session_number, "session-deployed.xml")
        state = self.get_session_state()
        logging.debug("SessionReader(): init(): Current session state: " + str(state))
        if state is None:
            logging.error("SessionReader(): init(): Session state is not available: " + str(state))
            return
        self.conditional_conns = self.relevant_session_to_JSON()

    def get_session_state(self):
        """Return the first line of the session's ``state`` file.

        Returns ``None`` when the session directory or the state file does
        not exist, or when reading fails (the error is logged and the
        traceback printed).
        """
        logging.debug("SessionReader(): get_session_state() instantiated")
        try:
            #check first if directory exists
            session_path = os.path.dirname(self.xmlfilename)
            session_state_path = os.path.join(session_path, "state")
            if not os.path.exists(session_path) or not os.path.exists(session_state_path):
                logging.debug("SessionReader(): get_session_state() session does not exist!")
                return None
            logging.debug("SessionReader(): get_session_state() session found!")
            #read state file; the context manager closes the handle
            with open(session_state_path, "r") as state_file:
                state_file_state = state_file.readlines()[0]
            logging.debug("SessionReader(): get_session_state(): State: " + str(state_file_state))
            return state_file_state
        except Exception:
            logging.error("SessionReader(): get_session_state(): An error occured ")
            traceback.print_exc()
            return None

    @staticmethod
    def _copy_iface_attributes(target, iface, prefix):
        """Copy name, mac and any ip4/ip6 data of ``iface`` into ``target``."""
        target[prefix + "_nic"] = iface.attrib["name"]
        target[prefix + "_mac"] = iface.attrib["mac"]
        for family in ("ip4", "ip6"):
            if family in iface.attrib:
                target[prefix + "_" + family] = iface.attrib[family]
                target[prefix + "_" + family + "_mask"] = iface.attrib[family + "_mask"]

    def relevant_session_to_JSON(self):
        """Build the conditional-connection description of the session.

        Returns a dict mapping each decision-node id to a dict with the
        node ``name``, the contents of its three service files and a
        ``connected_nodes`` list (one entry per link touching the node).
        Returns ``None`` when the imn or XML file cannot be read or when any
        other error occurs while parsing.
        """
        logging.debug("SessionReader(): relevant_session_to_JSON() instantiated")
        try:
            try:
                # Only checks that the imn file can be parsed; the result is
                # not needed here.
                imnparser(self.imnfilename)
            except Exception:
                logging.error("SessionReader(): relevant_session_to_JSON(): imn scenario file not found: " + str(self.imnfilename))
                logging.error("SessionReader(): relevant_session_to_JSON(): " + self.session_number + " and ensure the imn file exists and is loaded in CORE")
                return None
            try:
                tree = ET.parse(self.xmlfilename)
            except Exception:
                logging.error("SessionReader(): relevant_session_to_JSON(): XML file not found: " + str(self.xmlfilename))
                logging.error("SessionReader(): relevant_session_to_JSON(): Make sure " + self.session_number + " has been run at least once or is running." )
                return None
            root = tree.getroot()
            conditional_conns = {}
            switches = []
            switch_ids = []
            links = root.find('links').findall('link')
            device_services = {}
            switch_services = {}
            switch_service_configurations = {}

            #find all devices (non-switch/hub/wireless) and identify their services
            logging.debug("SessionReader(): relevant_session_to_JSON(): " + "finding devices")
            for device in root.find('devices').findall('device'):
                if device.attrib["type"] == "cc_dec_node_ovs":
                    switches.append(device)
                    switch_ids.append(device.attrib["id"])
                # A device without a <services> element simply has no services.
                services_elem = device.find('services')
                service_elems = services_elem.findall('service') if services_elem is not None else []
                #store the services that are enabled for this device
                device_services[device.attrib["id"]] = "".join(
                    " " + str(service.attrib["name"]) for service in service_elems
                )

            #find service files for switch type nodes and then store them
            services = root.find('service_configurations')
            if services is not None:
                for service in services.findall('service'):
                    #Pulling service name and node id from tags
                    node_num = service.attrib["node"]
                    service_name = service.attrib["name"]
                    if node_num not in switch_ids:
                        continue
                    files_elem = service.find("files")
                    if files_elem is None:
                        continue
                    for filenode in files_elem.findall("file"):
                        #Pulling filename and source code from tags
                        key = (node_num, service_name, filenode.attrib["name"])
                        switch_service_configurations[key] = filenode.text

            #now obtain the "enabled" services for all switch nodes
            logging.debug("SessionReader(): relevant_session_to_JSON(): " + "obtaining enabled services for switches")
            logging.debug("SessionReader(): relevant_session_to_JSON():Switch services found: " + str(switch_services))

            #Iterating through all switch nodes to consolidate source files and conditional connections
            for node in switches:
                logging.debug("SessionReader(): relevant_session_to_JSON(): " + "traversing through switches; processing CC_DecisionNode_OVS")
                cc_dec_node_number = node.attrib["id"]
                #if this isn't a decision node move on to the next switch
                if self._DECISION_SERVICE not in device_services[cc_dec_node_number]:
                    continue

                #setup entry for node
                conditional_conn = {"name": node.attrib["name"]}
                #First get file contents from the xml file; if they are not
                #there the values are defaults, so ask CORE for them
                for filename in self._DECISION_FILES:
                    key = (cc_dec_node_number, self._DECISION_SERVICE, filename)
                    if key in switch_service_configurations:
                        conditional_conn[filename] = switch_service_configurations[key]
                    else:
                        conditional_conn[filename] = self.get_node_file(cc_dec_node_number, self._DECISION_SERVICE, filename)
                logging.debug("SessionReader(): relevant_session_to_JSON(): Processing node: " + str(conditional_conn))

                #now find all connected nodes
                connected_nodes = []
                for link in links:
                    connected_node = {}
                    #find interface for node connected to switch
                    if link.attrib["node1"] == cc_dec_node_number:
                        connected_node["number"] = link.attrib["node2"]
                        cc_dec_node_ifx = link.find("iface1")
                        cc_node_ifx = link.find("iface2")
                    if link.attrib["node2"] == cc_dec_node_number:
                        connected_node["number"] = link.attrib["node1"]
                        cc_dec_node_ifx = link.find("iface2")
                        cc_node_ifx = link.find("iface1")
                    if "number" not in connected_node:
                        # link does not touch this decision node
                        continue
                    #wlan, switch, don't have ifx information
                    connected_node["node_type"] = "SWITCH"
                    if cc_node_ifx is not None:
                        #if we do have ifx information, we know it's a layer 3 model (router)
                        connected_node["node_type"] = "router"
                        #process remote node (cc_node)
                        self._copy_iface_attributes(connected_node, cc_node_ifx, "cc")
                    # NOTE(review): the snippet this was recovered from had
                    # lost its indentation; the statements below are assumed
                    # to apply to every connected node, not only routers --
                    # confirm against the original file.
                    #by default we consider this a cc_node
                    connected_node["role"] = "cc_node"
                    #add information about the cc_dec node associated with this link
                    self._copy_iface_attributes(connected_node, cc_dec_node_ifx, "cc_dec")
                    connected_node["connected"] = "False"
                    connected_nodes.append(connected_node)
                conditional_conn["connected_nodes"] = connected_nodes
                conditional_conns[node.attrib["id"]] = conditional_conn
            return conditional_conns
        except Exception:
            logging.error("SessionReader(): relevant_session_to_JSON(): Error in relevant_session_to_JSON(): An error occured ")
            traceback.print_exc()
            return None

    def get_node_file(self, node_id, service_name, filename):
        """Fetch the contents of a service file of a node from CORE.

        Returns ``""`` when the service's file listing does not mention
        ``filename``; otherwise the text that follows the ``DATA:`` marker in
        CORE's reply, one ``\\n``-terminated line per reply line.
        """
        logging.debug("SessionReader(): get_node_file(): instantiated")
        #First check if file exists:
        logging.debug("SessionReader(): get_node_file(): checking if file exists")
        res = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node_id +' OBJECT=services OPAQUE=service:'+service_name+' TYPE=1 -l --tcp'))
        if not any(filename in line for line in res.splitlines()):
            return ""
        #Get file contents
        logging.debug("SessionReader(): get_node_file(): getting file contents")
        res_code = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node_id +' OBJECT=services OPAQUE=service:'+service_name+':'+filename+' TYPE=1 -l --tcp'))
        code_lines = []
        code_section = False
        for code_line in res_code.splitlines():
            if code_line.startswith(" DATA:"):
                code_section = True
                # partition() yields "" for an empty DATA line instead of
                # raising IndexError like split(...)[1] would.
                code_lines.append(code_line.partition("DATA: ")[2])
                continue
            if code_section:
                code_lines.append(code_line)
        return "".join(line + "\n" for line in code_lines)

    def get_conditional_conns(self, cc_dec_number):
        """Return the conditional-connection entry of one decision node.

        Raises ``KeyError`` for an unknown node id and ``TypeError`` when the
        session could not be read (``conditional_conns`` is ``None``).
        """
        logging.debug("SessionReader(): get_conditional_conns(): instantiated")
        return self.conditional_conns[cc_dec_number]

    def get_node_services(self, node):
        """Return CORE's raw reply describing the services of ``node``.

        ``node`` is an XML element whose ``id`` attribute names the node.
        """
        logging.debug("SessionReader(): get_node_services(): instantiated")
        res_services = str(msg_ifx.send_command('-s'+self.session_number+' CONFIG NODE='+node.attrib["id"] +' OBJECT=services OPAQUE=service' +' TYPE=1 -l --tcp'))
        return res_services


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logging.debug("SessionReader(): instantiated")

    if len(sys.argv) < 2:
        logging.error("Usage: python test_session_reader.py <session-number>")
        sys.exit()

    sr = SessionReader(sys.argv[1])
    logging.debug("Printing All")
    # Call the method; the bare attribute is a bound method and is never None.
    state = sr.get_session_state()
    logging.debug("Current session state: " + str(state))
    if state is None:
        logging.debug("Exiting since session data is not available: " + str(state))
        sys.exit()
    # ... (the snippet this script was recovered from is truncated here)

Full Screen

Full Screen

fabric.py

Source:fabric.py Github

copy

Full Screen

from srlinux.data import ColumnFormatter, TagValueFormatter, Border, Data, Borders, Alignment
from srlinux.mgmt.cli import CliPlugin
from srlinux.schema import FixedSchemaRoot
from srlinux.syntax import Syntax
from srlinux.location import build_path
from srlinux.mgmt.cli.plugins.reports.gnmi_lite import GNMIHandlerLite
import datetime

############################ INPUTs here... #############################
                                                                        #
interfaces = []                      # fill the interfaces list in or a pattern in the uplink descriptions
description = 'spine'                # if both given: interfaces list has precedence over the description pattern
uplink_peer_group = 'spine'          # this code assumes Leaf uplinks have eBGP peering with Spine/DCGW
rr_peer_group = 'EVPN-NVO'           # and iBGP EVPN with the route-reflector(s)
uplink_network_instance = "default"  # networks instance is ideally default
                                                                        #
#########################################################################


class Plugin(CliPlugin):
    """CLI plugin adding the ``show fabric`` report commands.

    The reports are driven by the module-level settings above: the uplink
    interfaces (given explicitly or matched by description), the BGP peer
    groups of the uplinks and of the route reflectors, and the network
    instance the BGP sessions live in.
    """

    def load(self, cli, **_kwargs):
        """Register ``show fabric`` and its sub-commands on the CLI."""
        fabric = cli.show_mode.add_command(Syntax('fabric', help='shows how to give the input parameters for "show fabric" commands'))
        fabric.add_command(Syntax('help', help='requires uplinks, route-reflector, statistics or summary keywords'), update_location=False, callback=self._show_help)
        fabric.add_command(Syntax('summary', help='shows uplinks, route-reflector and statistics all together'), update_location=False, callback=self._show_summary, schema=self._get_schema())
        fabric.add_command(Syntax('uplink', help='shows uplinks in a table'), update_location=False, callback=self._show_uplinks, schema=self._get_schema())
        fabric.add_command(Syntax('route-reflector', help='shows route-reflectors in a table'), update_location=False, callback=self._show_rr, schema=self._get_schema())
        fabric.add_command(Syntax('statistics', help='shows statistics in a table'), update_location=False, callback=self._show_stats, schema=self._get_schema())

    def _show_help(self, state, output, **_kwargs):
        """Print where and how the report inputs are configured."""
        print('''
        This 'show fabric' command shows you statistics and the status of the uplinks and BGP peerings.
        Therefore it requires some inputs that need to be added in the 'fabric.py' file.

        '/opt/srlinux/python/virtual-env/lib/python3.6/site-packages/srlinux/mgmt/cli/plugins/reports/fabric.py'

        Example:
            interfaces = []                 # fill the interfaces list in or the description pattern
            description = 'spine'
            uplink_peer_group = 'spine'
            rr_peer_group = 'EVPN-NVO'
        ''')

    def _show_summary(self, state, output, **_kwargs):
        """Print the report header followed by all three tables."""
        result_header = self._populate_header('Fabric Connectivity Report')
        self._set_formatters_header(result_header)
        output.print_data(result_header)
        self._show_uplinks(state, output)
        self._show_rr(state, output)
        self._show_stats(state, output)

    def _show_uplinks(self, state, output, **_kwargs):
        """Stream the uplink table."""
        result = Data(self._get_schema())
        self._set_formatter_uplink(result)
        with output.stream_data(result):
            self._populate_data(result, state)

    def _show_rr(self, state, output, **_kwargs):
        """Stream the route-reflector table."""
        result_rr = Data(self._get_schema())
        self._set_formatters_rr(result_rr)
        with output.stream_data(result_rr):
            self._populate_data_rr(result_rr, state)

    def _show_stats(self, state, output, **_kwargs):
        """Stream the uplink statistics table."""
        result_stats = Data(self._get_schema())
        self._set_formatters_stats(result_stats)
        with output.stream_data(result_stats):
            self._populate_data_stats(result_stats, state)

    def _get_header_schema(self):
        """Return the schema of the one-field summary header."""
        root = FixedSchemaRoot()
        root.add_child(
            'header',
            fields=['Summary']
        )
        return root

    def _get_schema(self):
        """Return the schema shared by the uplink, route-reflector and stats tables."""
        root = FixedSchemaRoot()
        uplink_header = root.add_child(
            'uplink_header',
            fields=['uplinks']
        )
        uplink_header.add_child(
            'uplink_child',
            key='Local Interface',
            fields=['Local Router', 'Link Status', 'eBGP Status', 'Remote Router', 'Remote Interface']
        )
        rr_header = root.add_child(
            'rr_header',
            fields=['Route Reflectors']
        )
        rr_header.add_child(
            'rr_child',
            key='Route Reflector Address',
            fields=['iBGP Status', 'Neighbor Description', 'Rx/Active/Tx', 'Uptime (hh:mm:ss)']
        )
        stats_header = root.add_child(
            'stats_header',
            fields=['Uplink Stats']
        )
        stats_header.add_child(
            'stats_child',
            key='Local Interface',
            fields=['Traffic Bps In/Out', 'Packets In/Out', 'Errored In/Out', 'FCS Err', 'CRC Err', 'Transceiver Volt']
        )
        return root

    def _fetch_state(self, state, uplink, interf):
        """Load the state needed for one uplink row onto ``self``.

        ``uplink`` is the BGP peer address and ``interf`` the
        ``[interface-name, subinterface-index]`` pair of the uplink.
        """
        int_oper_state_path = build_path(f'/interface[name={interf[0]}]/subinterface[index={interf[1]}]/oper-state')
        self.int_oper_state_data = state.server_data_store.stream_data(int_oper_state_path, recursive=True)

        sys_name_path = build_path('/system/name/host-name')
        self.sys_name_data = state.server_data_store.stream_data(sys_name_path, recursive=True)

        lldp_neighbor_path = build_path(f'/system/lldp/interface[name={interf[0]}]/neighbor')
        self.lldp_neighbor_data = state.server_data_store.stream_data(lldp_neighbor_path, recursive=True)

        session_state_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor[peer-address={uplink}]')
        self.session_state_data = state.server_data_store.stream_data(session_state_path, recursive=True)

    def _fetch_state_rr(self, state, rr):
        """Load the BGP neighbor state of route reflector ``rr`` and the system time onto ``self``."""
        rr_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor[peer-address={rr}]')
        self.rr_data = state.server_data_store.stream_data(rr_path, recursive=True)

        time_path = build_path('/system/information/current-datetime')
        self.time_data = state.server_data_store.stream_data(time_path, recursive=True)

        # rr_tcp_path = build_path(f'/network-instance[name={uplink_network_instance}]/tcp/connections[remote-address={rr}]')
        # self.rr_tcp_data = state.server_data_store.stream_data(rr_tcp_path, recursive=True)

    def _fetch_state_stats(self, state, uplink):
        """Load the full state of interface ``uplink`` onto ``self``."""
        gen_path = build_path(f'/interface[name={uplink}]')
        self.gen_data = state.server_data_store.stream_data(gen_path, recursive=True)

    def _time_handler(self, dt0, dt1):
        """Return ``dt0 - dt1`` as a ``timedelta``.

        Both arguments are timestamp strings read by fixed character
        positions (year at 0-3, month at 5-6, day at 8-9, hour at 11-12,
        minute at 14-15, second at 17-18); anything after the seconds is
        ignored.
        """
        def _parse(stamp):
            return datetime.datetime(
                int(stamp[:4]), int(stamp[5:7]), int(stamp[8:10]),
                int(stamp[11:13]), int(stamp[14:16]), int(stamp[17:19]),
            )
        return _parse(dt0) - _parse(dt1)

    def _populate_header(self, header):
        """Return a header ``Data`` object whose summary field is ``header``."""
        result_header = Data(self._get_header_schema())
        data = result_header.header.create()
        data.summary = header
        return result_header

    def _populate_peer_list(self, state, group):
        """Return the addresses of the BGP neighbors that belong to ``group``."""
        peer_list_path = build_path(f'/network-instance[name={uplink_network_instance}]/protocols/bgp/neighbor/peer-group')
        peer_list_data = state.server_data_store.stream_data(peer_list_path, recursive=True)
        neighbors = peer_list_data.network_instance.get().protocols.get().bgp.get().neighbor.items()
        peer_list = [peer.peer_address for peer in neighbors if peer.peer_group == group]
        if not peer_list:
            print(f"No peer in <<{group}>> group!")
        return peer_list

    def _populate_interface_list(self, state, description):
        """Fill the module-level ``interfaces`` list by description match.

        Every subinterface whose description contains ``description`` is
        appended as ``'<interface>.<index>'``. The list is module state, so
        the entries stay in place for later commands.
        """
        int_list_path = build_path('/interface[name=*]/subinterface[index=*]/description')
        int_list_data = state.server_data_store.stream_data(int_list_path, recursive=True)
        for i in int_list_data.interface.items():
            if description in i.subinterface.get().description:
                interfaces.append(f'{i.name}.{i.subinterface.get().index}')
        if not interfaces:
            print(f"No interface has <<{description}>> in it's description!")

    def _populate_data(self, result, state):
        """Fill the uplink table: one row per configured uplink interface."""
        uplink_peer_list = self._populate_peer_list(state, uplink_peer_group)
        result.synchronizer.flush_fields(result)
        if not interfaces:
            self._populate_interface_list(state, description)
        data = result.uplink_header.create()
        # NOTE(review): interfaces and peers are paired by position, so this
        # raises IndexError when there are fewer peers than interfaces --
        # confirm the intended pairing.
        for i, interface in enumerate(interfaces):
            self._fetch_state(state, uplink_peer_list[i], interface.split('.'))
            data_child = data.uplink_child.create(interface.split('.')[0])
            lldp_neighbor = self.lldp_neighbor_data.system.get().lldp.get().interface.get().neighbor.get()
            data_child.local_router = self.sys_name_data.system.get().name.get().host_name or '<Unknown>'
            data_child.link_status = self.int_oper_state_data.interface.get().subinterface.get().oper_state or '<Unknown>'
            data_child.ebgp_status = self.session_state_data.network_instance.get().protocols.get().bgp.get().neighbor.get().session_state or '<Unknown>'
            data_child.remote_router = lldp_neighbor.system_name or '<Unknown>'
            data_child.remote_interface = lldp_neighbor.port_id or '<Unknown>'
            data_child.synchronizer.flush_fields(data_child)
        result.synchronizer.flush_children(result.uplink_header)
        return result

    def _populate_data_rr(self, result, state):
        """Fill the route-reflector table: one row per peer in ``rr_peer_group``."""
        rr_peer_list = self._populate_peer_list(state, rr_peer_group)
        result.synchronizer.flush_fields(result)
        data = result.rr_header.create()
        for rr in rr_peer_list:
            self._fetch_state_rr(state, rr)
            data_child = data.rr_child.create(rr)
            neighbor = self.rr_data.network_instance.get().protocols.get().bgp.get().neighbor.get()
            evpn = neighbor.evpn.get()
            data_child.ibgp_status = neighbor.session_state or '<Unknown>'
            data_child.neighbor_description = neighbor.description or '<Unknown>'
            data_child.rx_active_tx = f'{evpn.received_routes}/{evpn.active_routes}/{evpn.sent_routes}'
            data_child.uptime__hh_mm_ss_ = self._time_handler(self.time_data.system.get().information.get().current_datetime, neighbor.last_established)
            data_child.synchronizer.flush_fields(data_child)
        result.synchronizer.flush_children(result.rr_header)
        return result

    def _populate_data_stats(self, result, state):
        """Fill the statistics table: one row per configured uplink interface."""
        # Kept for its "No peer in group" message; the list itself is not
        # used by this table.
        self._populate_peer_list(state, uplink_peer_group)
        result.synchronizer.flush_fields(result)
        if not interfaces:
            self._populate_interface_list(state, description)
        data = result.stats_header.create()
        for interface in interfaces:
            port = interface.split('.')[0]
            self._fetch_state_stats(state, port)
            data_child = data.stats_child.create(port)
            iface = self.gen_data.interface.get()
            traffic_rate = iface.traffic_rate.get()
            statistics = iface.statistics.get()
            # In/Out: the second value is the outgoing rate (was in_bps twice).
            data_child.traffic_bps_in_out = f'{traffic_rate.in_bps}/{traffic_rate.out_bps}'
            data_child.packets_in_out = f'{statistics.in_unicast_packets}/{statistics.out_unicast_packets or "<Unknown>"}'
            data_child.errored_in_out = f'{statistics.in_error_packets}/{statistics.out_error_packets}'
            data_child.fcs_err = statistics.in_fcs_error_packets
            data_child.crc_err = iface.ethernet.get().statistics.get().in_crc_error_frames
            try:
                data_child.transceiver_volt = iface.transceiver.get().voltage.get().latest_value or 'N/A'
            except Exception:
                # Best effort: any failure reading the transceiver voltage is
                # shown as N/A instead of failing the whole table.
                data_child.transceiver_volt = 'N/A'
            data_child.synchronizer.flush_fields(data_child)
        result.synchronizer.flush_children(result.stats_header)
        return result

    def _set_formatters_header(self, data):
        """Render the summary header as a bordered tag/value block."""
        data.set_formatter('/header', Border(TagValueFormatter()))

    def _set_formatters_stats(self, data):
        """Render the statistics rows as columns."""
        data.set_formatter('/stats_header/stats_child', ColumnFormatter())

    def _set_formatters_rr(self, data):
        """Render the route-reflector rows as columns."""
        data.set_formatter('/rr_header/rr_child', ColumnFormatter())

    def _set_formatter_uplink(self, data):
        # The body of this method is cut off in the snippet this file was
        # recovered from; the placeholder is kept exactly as shown there.
        ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful