Best Python code snippet using SeleniumLibrary
crawler_template.py
Source:crawler_template.py  
...47        self.log_clean()48        self.__SETTINGS__()49        self.main()50    def __SETTINGS__(self):51        self.log_start(extra=1)52        # BROWSER CONFIG53        ################54        # Configured for MacOS55        self.CHROME_PATH = "/Users/home/Library/Application Support/Google/Chrome/"56        self.CHROME_PROFILE = "Profile 5"57        # Don't change this. It's the only option for slenium_stealth58        self.USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.53 Safari/537.36"59        # PROJECT SPECIFIC60    def main(self):61        self.log_start(extra=1)62        self.web()63    ###################################64    # GENERAL HELPERS AND UTILS BEGIN #65    ###################################66    # SELENIUM CONFIG67    #################68    def web(self):69        self.log_start()70        self.web_options()71        self.web_driver()72        self.web_action()73        self.web_shh()74        self.web_clear()75    def web_options(self):76        self.log_start()77        self.options = wd.ChromeOptions()78        self.options.add_argument(f"--user-data-dir={self.CHROME_PATH}")79        self.options.add_argument(f"--profile-directory={self.CHROME_PROFILE}")80        self.options.add_argument("--user-agent=%s" % self.USER_AGENT)81        self.options.add_argument("start-maximized")82        self.options.add_experimental_option("excludeSwitches", ["enable-automation"])83        self.options.add_experimental_option("useAutomationExtension", 0)84    def web_driver(self):85        self.log_start()86        self.driver = sv(CHROME_DRIVER)87        self.driver = wd.Chrome(service=self.driver, options=self.options)88        self.driver.set_script_timeout(1000)89        self.driver.implicitly_wait(30)90    def web_action(self):91        self.log_start()92        self.action = ac(self.driver)93    def web_shh(self):94        self.log_start()95        shh(96            
driver=self.driver,97            user_agent=self.USER_AGENT,98            languages=["en-US", "en"],99            vendor="Google Inc.",100            platform="Win32",101            webgl_vendor="Intel Inc.",102            renderer="Intel Iris OpenGL Engine",103            fix_hairline=1,104            run_on_insecure_origins=0,105        )106    def web_clear(self):107        self.log_start()108        # I think they place a blocked cookie in browser ...109        # sneaky little fuckers. Ruins the proxy rotation110        self.driver.delete_all_cookies()111    # SELENIUM HELPERS112    ##################113    def web_go(self, url, get=0, https=1):114        self.log_start()115        if https:116            self.driver.get(f"https://{url}")117        else:118            self.driver.get(f"{url}")119        if get:120            return self.bs_html()121    def web_scroll(self, count):122        self.log_start()123        for i in range(count):124            l0(f"{stack()[1][3]} status ({i}/{count})")125            html = self.bs_html()126            body = self.bs_element(html, key="body")127            l0("Down keys to be sent.")128            body.send_keys(Keys.PAGE_DOWN)129            self.zzz(mn=0.25 , mx=1)130    def web_type(self, element, txt):131        self.log_start()132        element.click()133        for char in txt:134            self.zzz(mn=0.1, mx=0.2)135            pf = [1] * 59136            pf.append(0)137            pf = choice(pf)138            if pf == 1:139                element.send_keys(char)140            else:141                element.send_keys(choice(abc))142                element.send_keys(Keys.BACK_SPACE)143                element.send_keys(char)144    # BS4 HELPERS145    #############146    def bs_children(self, element):147        self.log_start()148        elements = element.children149        for e in elements:150            if len(str(e)) > 1:151                yield bs(str(e), "html.parser")152    def bs_click(self, html, 
key, val=0, n=0):153        self.log_start()154        if val:155            element = html.find_all(attrs={key: val})[n]156        else:157            element = html.find_all(key)[n]158        xpath = self.bs_xpath(element)159        wdw(self.driver, 30).until(160            ec.element_to_be_clickable((By.XPATH, xpath))161        ).click()162        self.zzz()163    def bs_element(self, html, key, val=0, n=0):164        self.log_start()165        if val:166            element = html.find_all(attrs={key: val})[n]167        else:168            element = html.find_all(key)[n]169        xpath = self.bs_xpath(element)170        l0(f"grabbed xpath: {xpath}")171        element = self.driver.find_element(By.XPATH, xpath)172        l0(f"found element: {element}")173        return element174    def bs_html(self, pretty=0):175        self.log_start()176        src = self.driver.page_source177        l0("grabbed page src")178        html = bs(src, "html.parser")179        l0("parsed page src")180        if pretty:181            l0("making html pretty")182            return html.prettify()183        l0("html complete")184        return html185    def bs_read(self, page):186        self.log_start()187        f = open(f"{DIR_HTML}/{page}.html", "r")188        html = f.read()189        soup = bs(html, "html.parser")190        self.bs_save(page=page, html=soup)191        return soup192    def bs_save(self, page, html=0):193        self.log_start()194        if html:195            html = html.prettify()196        else:197            html = self.bs_html(pretty=1)198        with open(f"{DIR_HTML}{page}.html", "w") as fout:199            fout.write(html)200    def bs_title(self):201        html = self.bs_html(pretty=1)202        soup = bs(html, "html.parser")203        title = soup.title.string204        title = self.str_clean(str(title))205        return title206    def bs_xpath(self, element):207        self.log_start()208        """209        Generate xpath of soup element210       
 :param element: bs4 text or node211        :return: xpath as string212        """213        components = []214        child = element if element.name else element.parent215        for parent in child.parents:216            """217            @type parent: bs4.element.Tag218            """219            previous = islice(parent.children, 0, parent.contents.index(child))220            xpath_tag = child.name221            xpath_index = sum(1 for i in previous if i.name == xpath_tag) + 1222            components.append(223                xpath_tag if xpath_index == 1 else "%s[%d]" % (xpath_tag, xpath_index)224            )225            child = parent226        components.reverse()227        return "/%s" % "/".join(components)228    # LOGGING HELPERS229    #################230    def log_config(self):231        t = str(dt.now()).split(".")[0].replace(" ", "@")232        lbc(233            filename=f"{DIR_LOGS}{t}.log",234            encoding="utf-8",235            format="%(asctime)s - %(levelname)s - %(message)s",236            datefmt="%Y-%m-%d@%H:%M:%S",237            level=0,238        )239        lsh().setLevel(0)240    def log_clean(self, store=3):241        self.log_start()242        logs = listdir(DIR_LOGS)243        names = []244        for i in range(len(logs)):245            names.append(str(logs[i]).split(".")[0])246        names.sort(key=lambda date: dt.strptime(date, "%Y-%m-%d@%H:%M:%S"))247        keepers = names[-store:]248        for f in logs:249            name = str(f).split(".")[0]250            if name not in keepers:251                remove(f"{DIR_LOGS}{f}")252    def log_start(self, nap=0, extra=0, n=3):253        if extra:254            look = ">" * 45255        else:256            look = ">" * 15257        name = str(stack()[1][n])258        l0(f"{look} starting {name} ")259        if nap:260            self.zzz()261        return name262    def log_except(self, e):263        look = "!" 
* 15264        return l3(f"{look} error at {stack()[1][3]}: {e}")265    # JSON HELPERS266    ##############267    def json_dump(self, content, dest):268        self.log_start()269        pth = f"{DIR_ROOT}/{uu()}{uu()}.json"270        with open(pth, "w") as f:271            dump(content, f, indent=4)272        try:273            with open(pth) as f:274                q = load(f)275            replace(pth, dest)276        except:277            remove(pth)278    def json_load(self, pth):279        self.log_start()280        try:281            with open(pth, "r") as json_file:282                data = load(json_file)283        except FileNotFoundError as e:284            self.log_except(e)285            data = {}286            self.json_dump(data, pth)287        return data288    # OTHER289    #######290    # string cleaning hahaha .. get it? like sPring cleaning hahaha!291    def str_clean(self, s, json=0):292        self.log_start()293        s = sub(r"â¦", "...", s)294        s = sub(r"[`ââââ¸â¸â¸â¸â¸â¸]", "'", s)295        s = sub(r"[ââ]|(\'\')|(,,)", '"', s)296        s = sub(r"\s+", " ", s).strip()297        if json:298            l0("cleaning json")299            s = s.replace("'", '"')300        return s301    def zzz(self, mn=1, mx=5):302        t = round(uniform(mn, mx), 7)303        if t > 5:304            l0(f"sleeping for {t}")305        sleep(t)306if __name__ == "__main__":...test_double_license.py
Source:test_double_license.py  
...52		return dobData53	u'''æ·»å å人ææ'''54	def add_double_license_001(self):55		#æ¥å¿å¼å§è®°å½56		self.log.log_start("add_double_license")57		#è·åå人ææçæ°æ®58		dobData = self.get_double_data("add_double_license")59		for dataRow in range(len(dobData)):60			data = dobData[dataRow]61			try:62				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®63				if dataRow != 0:64					self.double.click_double_license_button(data[1])65					self.authElem.click_all_approver()66					self.authElem.click_all_candidate()67					self.authElem.click_start_association()68					self.authElem.click_create_relate()69					self.cmf.click_login_msg_button()70					#ç¹å»è¿å71					self.authElem.click_child_page_back_button()72					self.log.log_detail(u"æ·»å åäººæææå", True)73			except Exception as e:74				print ("add_double_license fail: ") + str(e)75		self.log.log_end("add_double_license")76	u'''å人审æ¹åç»ç«¯å®¡æ¹'''77	def same_termina_approvel_002(self):78		self.cmf.select_role_by_text(u"è¿ç»´æä½å")79		#æ¥å¿å¼å§è®°å½80		self.log.log_start("same_termina_approvel")81		#è·åå人审æ¹ç³è¯·çæ°æ®82		dobData = self.get_double_data("double_license_sso")83		for dataRow in range(len(dobData)):84			data = dobData[dataRow]85			try:86				#妿æ¯ç¬¬1è¡,è¯»åæ°æ®87				if dataRow == 1:88					self.double.send_double_license_applicant(data)89					self.double.check_ico_len(data[1])90					self.loginElem.quit()91			except Exception as e:92				print ("same_termina_approvel fail: ") + str(e)93		self.log.log_end("same_termina_approvel")94	u'''å人审æ¹ç³è¯·äººå·²ä¸çº¿å®¡æ¹è¿æ'''95	def termina_expired_approvel_003(self):96		self.comsuit.use_new_user_login()97		#æ¥å¿å¼å§è®°å½98		self.log.log_start("Expired_approvel")99		#è·åå人审æ¹ç³è¯·çæ°æ®100		dobData = self.get_double_data("double_license_sso")101		expData = self.get_double_data("termina_approvel")102		for dataRow in range(len(dobData)):103			data = dobData[dataRow]104			try:105				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®106				if dataRow == 2:107					self.double.send_double_license_applicant(data)108					number = 
self.acproval.get_new_process_number()109					self.loginElem.quit()110					self.double.approver_by_process_approval(expData, number)111			except Exception as e:112				print ("Expired_approvel fail: ") + str(e)113		self.log.log_end("Expired_approvel")114	u'''å人审æ¹å®¡æ¹äººæç»ç³è¯·'''115	def termina_deny_approvel_004(self):116		self.comsuit.use_new_user_login()117		#æ¥å¿å¼å§è®°å½118		self.log.log_start("deny_double_approvel")119		#è·åå人审æ¹ç³è¯·çæ°æ®120		dobData = self.get_double_data("double_license_sso")121		expData = self.get_double_data("deny_approvel")122		for dataRow in range(len(dobData)):123			data = dobData[dataRow]124			try:125				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®126				if dataRow == 2:127					self.double.send_double_license_applicant(data)128					number = self.acproval.get_new_process_number()129					self.double.approver_remote_approval(expData, number)130					self.cmf.select_menu(u"è¿ç»´æä½", u"SSO")131					self.double.check_ico_len(data[1])132					self.loginElem.quit()133			except Exception as e:134				print ("deny_double_approvel fail: ") + str(e)135		self.log.log_end("deny_double_approvel")136	u'''å人审æ¹å®¡æ¹äººåæç³è¯·'''137	def termina_agree_approvel_005(self):138		self.comsuit.use_new_user_login()139		#æ¥å¿å¼å§è®°å½140		self.log.log_start("agree_double_approvel")141		#è·åå人审æ¹ç³è¯·çæ°æ®142		dobData = self.get_double_data("double_license_sso")143		expData = self.get_double_data("agree_approvel")144		for dataRow in range(len(dobData)):145			data = dobData[dataRow]146			try:147				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®148				if dataRow == 2:149					self.double.send_double_license_applicant(data)150					number = self.acproval.get_new_process_number()151					self.double.approver_remote_approval(expData, number)152					self.cmf.select_menu(u"è¿ç»´æä½", u"SSO")153					self.double.check_ico_len(data[1])154					self.loginElem.quit()155			except Exception as e:156				print ("agree_double_approvel fail: ") + str(e)157		self.log.log_end("agree_double_approvel")158	u'''访é®å®¡æ¹æµç¨ä»»å¡æ¥è¯¢'''159	
def double_query_process_task_006(self):160		#æ¥å¿å¼å§è®°å½161		self.log.log_start("double_query_process_task")162		#è·åæµç¨ä»»å¡æ¥è¯¢çæ°æ®163		taskData = self.get_double_data("process_task")164		for dataRow in range(len(taskData)):165			data = taskData[dataRow]166			try:167				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®168				if dataRow != 0:169					if dataRow == 1:170						self.acproval.user_login(data[1])171					self.flow.query_process_task(data)172					self.log.log_detail(data[0], True)173			except Exception as e:174				print ("double_query_process_task fail: ") + str(e)175		self.log.log_end("double_query_process_task")176	u'''访é®å®¡æ¹ä¸ªäººå岿¥è¯¢'''177	def double_query_personal_history_007(self):178		#æ¥å¿å¼å§è®°å½179		self.log.log_start("double_query_personal_history")180		#è·å个人å岿¥è¯¢çæ°æ®181		perData = self.get_double_data("personal_history")182		for dataRow in range(len(perData)):183			data = perData[dataRow]184			try:185				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®186				if dataRow != 0:187					self.flow.query_personal_history(data)188					self.log.log_detail(data[0], True)189			except Exception as e:190				print ("double_query_personal_history fail: ") + str(e)191		self.loginElem.quit()192		self.log.log_end("double_query_personal_history")193	u'''访é®å®¡æ¹ç³è¯·å岿¥è¯¢'''194	def double_query_apply_history_008(self):195		self.comsuit.use_new_user_login()196		#æ¥å¿å¼å§è®°å½197		self.log.log_start("double_query_apply_history")198		#è·åç³è¯·å岿¥è¯¢çæ°æ®199		applyData = self.get_double_data("apply_history")200		for dataRow in range(len(applyData)):201			data = applyData[dataRow]202			try:203				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®204				if dataRow != 0:205					self.flow.query_apply_history(data)206					self.log.log_detail(data[0], True)207			except Exception as e:208				print ("double_query_apply_history fail: ") + str(e)209		self.log.log_end("double_query_apply_history")210	u'''访é®å®¡æ¹å
¨é¨å岿¥è¯¢'''211	def double_query_all_history_009(self):212		self.comsuit.dep_switch_to_sys()213		#æ¥å¿å¼å§è®°å½214		self.log.log_start("double_query_all_history")215		#è·åå
¨é¨å岿¥è¯¢çæ°æ®216		allData = self.get_double_data("all_history")217		for dataRow in range(len(allData)):218			data = allData[dataRow]219			try:220				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®221				if dataRow != 0:222					self.flow.query_all_history(data)223					self.log.log_detail(data[0], True)224			except Exception as e:225				print ("double_query_all_history fail: ") + str(e)226		self.log.log_end("double_query_all_history")227	u'''访é®å®¡æ¹é¨é¨å岿¥è¯¢'''228	def double_query_department_history_010(self):229		self.comsuit.sys_switch_to_dep()230		#æ¥å¿å¼å§è®°å½231		self.log.log_start("double_query_department_history")232		#è·åæµç¨ä»»å¡æ¥è¯¢çæ°æ®233		deprtData = self.get_double_data("department_history")234		for dataRow in range(len(deprtData)):235			data = deprtData[dataRow]236			try:237				#妿䏿¯ç¬¬1è¡,è¯»åæ°æ®238				if dataRow != 0:239					self.flow.query_department_history(data)240					self.log.log_detail(data[0], True)241			except Exception as e:242				print ("double_query_department_history fail: ") + str(e)...simplemovingaverage.py
Source:simplemovingaverage.py  
import datetime
import sys

import numpy as np
import pandas as pd

from live_trading.logging_ import logging_
from .signals import Signals

# Fractional drop of the ask price (relative to the held price) that triggers
# a stop-loss.
STOP_LOSS = .2
# Start selling this many minutes before end of business.
MINS_SELL_BEFORE_EOB = 15


class SimpleMovingAverage(Signals):
    """Buy/sell signal helpers based on a long and a short simple moving average.

    :param strategy: owning strategy object; this class reads its ``lt``,
        ``debugging``, ``eob_hour``/``eob_min``, ``ma_counter`` and logging
        attributes
    :param long_periods: number of most recent samples in the long average
    :param short_periods: number of most recent samples in the short average
    """

    def __init__(self, strategy, long_periods, short_periods):
        super(SimpleMovingAverage, self).__init__()
        self.strategy = strategy
        self.long_periods = long_periods
        self.short_periods = short_periods

    def _begin_log(self):
        """Create the per-call logger, tagged ``<ClassName>,<calling method>``, and register it."""
        caller = sys._getframe(1).f_code.co_name
        log = logging_.Logging(self.strategy.runtime_tm,
                               datetime.datetime.now(),
                               str(self.__class__.__name__ + ',' + caller))
        self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())
        return log

    def __buy_conditions(self, ticker):
        """Return whether buying is allowed for ``ticker``.

        The moving-average gate is currently disabled, so this always
        evaluates the single placeholder condition and returns True.
        """
        log = self._begin_log()
        return_minimal = True
        status = bool(return_minimal)
        if not status:
            self.strategy.add_to_manager(self.strategy.strat_name, 'n False buy_conditions', 'increment_1')
        log.log(return_minimal)
        return status

    def __sell_conditions(self, ticker):
        """Return True only if all sell conditions hold for ``ticker``.

        Conditions: the current price is above the held price, the
        end-of-business cutoff has passed, and the short moving average is
        not above the long one.
        """
        log = self._begin_log()
        price_higher_than_bought = self.__price_higher_than_bought(ticker)
        # The method is defined as `sell_at_eob_f`; the former call to
        # `self.__sell_at_eob_f()` was name-mangled to an attribute that does
        # not exist on this class.
        sell_at_eob = self.sell_at_eob_f()
        try:
            short_ma_greater_than_long = self.__moving_averages(ticker)
        except Exception as e:
            # No usable averages: treat as "short above long", which blocks the sell.
            short_ma_greater_than_long = True
            print('this may be problematic')
        status = bool(price_higher_than_bought
                      and sell_at_eob
                      and not short_ma_greater_than_long)
        if not status:
            self.strategy.add_to_manager(self.strategy.strat_name, 'n False sell_conditions', 'increment_1')
        log.log(price_higher_than_bought,
                short_ma_greater_than_long,
                sell_at_eob)
        return status

    def price_size_select(self, ticker, ix, side):
        """Return ``(price, size)`` from the order book of ``ticker``.

        :param ticker: object exposing ``domBids`` and ``domAsks``
        :param ix: book level to read when the book has more than one entry
        :param side: ``1`` selects bids, anything else selects asks
        """
        log = self._begin_log()
        items = ticker.domBids if side == 1 else ticker.domAsks
        _len = self.strategy.lt._ticker_len_n_type_check(items)
        # The dummy-data and live branches were identical, so they are merged.
        if _len > 1:
            price, size = items[ix].price, items[ix].size
        else:
            price, size = items.price, items.size
        log.log(price,
                size)
        return price, size

    def sell_at_eob_f(self):
        """Return True once the time is past end-of-business minus ``MINS_SELL_BEFORE_EOB``.

        The cutoff is derived from ``strategy.eob_hour`` / ``strategy.eob_min``.
        Previously the minute was hard-coded as ``30 - MINS_SELL_BEFORE_EOB``
        whenever ``eob_min >= MINS_SELL_BEFORE_EOB``, which is only right for
        a close at half past the hour.
        """
        log = self._begin_log()
        if self.strategy.debugging.dummy_time:
            _now = self.strategy.lt._manipulated_time(datetime.datetime.now())
        else:
            _now = datetime.datetime.now()
        cutoff_minutes = (int(self.strategy.eob_hour) * 60
                          + int(self.strategy.eob_min)
                          - MINS_SELL_BEFORE_EOB)
        eob_time = datetime.time(*divmod(cutoff_minutes, 60))
        sell_at_eob = self.strategy.lt.us_tz_op(_now).time() > eob_time
        log.log(sell_at_eob)
        return sell_at_eob

    def __price_higher_than_bought(self, ticker):
        """Return True if the current best ask is above the price the position was bought at."""
        log = self._begin_log()
        symbol = ticker.contract.symbol
        held_ix = self.strategy.lt.portfolio_instr.index(symbol)
        held_price_ix = 2
        held_price = self.strategy.lt.portfolio[held_ix][held_price_ix]
        rank = 0
        # Guard on the same book that is indexed below (was checking domBids).
        if self.strategy.lt._ticker_len_n_type_check(ticker.domAsks) > 1:
            curr_price = ticker.domAsks[rank].price
        else:
            curr_price = ticker.domAsks.price
        log.log(self.strategy.lt.portfolio,
                curr_price)
        return held_price < curr_price

    def stop_loss_f(self, ticker):
        """Return True if the best ask has fallen more than ``STOP_LOSS`` below the held price."""
        log = self._begin_log()
        if self.strategy.lt._ticker_len_n_type_check(ticker.domAsks) > 1:
            current_ask_price = ticker.domAsks[0].price
        else:
            current_ask_price = ticker.domAsks.price
        # NOTE(review): here the portfolio is indexed by symbol, while
        # __price_higher_than_bought indexes it by position and column —
        # confirm which layout `strategy.lt.portfolio` really has.
        change = (current_ask_price - self.strategy.lt.portfolio[ticker.contract.symbol]) / current_ask_price
        log.log(current_ask_price,
                change)
        return change < -STOP_LOSS

    def __moving_averages(self, ticker):
        """Return True if the short moving average is at or above the long one.

        :raises Exception: ``'not defined'`` when neither average is available
        """
        log = self._begin_log()
        long_ma, short_ma = self.__ma_calc('collected', ticker)
        # NaN never compares equal to itself, so `== np.nan` could not detect
        # the "not enough data" result; test with pd.isna instead.
        if pd.isna(long_ma) and pd.isna(short_ma):
            raise Exception('not defined')
        short_ma_greater_than_long = not (long_ma > short_ma)
        log.log(long_ma,
                short_ma)
        return short_ma_greater_than_long

    def __ma_calc(self, by, ticker):
        """Compute the long and short moving averages of the collected prices for ``ticker``.

        Only ``by == 'collected'`` is handled. Rows are filtered to this
        symbol, side 1 (bid) and rank 0; both averages are NaN when fewer
        than ``long_periods`` rows exist.
        """
        log = self._begin_log()
        long_ma, short_ma = None, None
        if by == 'collected':
            rank = 0
            # side: bid = 1, ask = 0
            side = 1
            # NOTE(review): inside this class the attribute name below is
            # name-mangled to `_SimpleMovingAverage__hist_data_winners_df`;
            # verify the strategy object really exposes it under that name.
            hist = self.strategy.__hist_data_winners_df
            self.filtered_df = hist[(hist['symbol'] == ticker.contract.symbol) &
                                    (hist['side'] == side) &
                                    (hist['rank'] == rank)]
            _length = self.filtered_df.shape[0]
            if _length < self.long_periods:
                long_ma, short_ma = np.nan, np.nan
            else:
                # Newest first; sorted once and reused for both windows.
                newest_first = self.filtered_df.sort_values('unix_ts', ascending=False)
                long_ma = newest_first[0:self.long_periods]['price'].mean()
                short_ma = newest_first[0:self.short_periods]['price'].mean()
                _unix_ts = newest_first['unix_ts'].iloc[0]
                add_df = pd.DataFrame([[ticker.contract.symbol,
                                        _unix_ts,
                                        short_ma,
                                        long_ma]])
                if self.strategy.ma_counter == 0:
                    self._ma_collection_df = add_df
                else:
                    self._ma_collection_df = pd.concat([self._ma_collection_df, add_df])
        log.log(long_ma,
                short_ma)
        # NOTE(review): the excerpt is cut off here. __moving_averages unpacks
        # two values from this method, so a `return long_ma, short_ma`
        # presumably follows — confirm against the full file.
# Next snippet on the page: rootfinder.py
Source:rootfinder.py  
import jax.numpy as jnp
from jax import jit, grad, vmap
from jax import random
from jax import lax
try:
    # These helpers no longer exist in current JAX releases; keep the import
    # working on old installs without breaking the module on new ones.
    from jax.ops import index, index_update
except ImportError:
    index = index_update = None
from jax.flatten_util import ravel_pytree
from functools import partial


def _shrink_bracket(lo, hi, s_lo, s_hi, mid, s_mid):
    """Move whichever endpoint shares the sign of f(mid) onto ``mid``.

    When f(mid) is exactly zero both endpoints collapse onto ``mid`` so the
    caller's loop stops with the root. (The previous multiply-by-sign update
    set both endpoints to 0 in that case.)

    :return: ``(lo, hi, s_lo, s_hi)`` after the update
    """
    exact = s_mid == 0
    move_lo = exact | (s_mid == s_lo)
    move_hi = exact | (s_mid == s_hi)
    return (jnp.where(move_lo, mid, lo),
            jnp.where(move_hi, mid, hi),
            jnp.where(move_lo, s_mid, s_lo),
            jnp.where(move_hi, s_mid, s_hi))


@partial(jit, static_argnums=(0,))
def dual_bisect_method(
    func,
    aL=-1e5, bL=-1e-5, aR=1e-5, bR=1e5,
    tol=1e-6, maxiter=100):
    """Bisect two brackets of a scalar function in lock-step.

    :param func: scalar function of one scalar argument (static under ``jit``)
    :param aL, bL: endpoints of the left bracket
    :param aR, bR: endpoints of the right bracket
    :param tol: iteration stops once the two bracket widths sum to ``tol`` or less
    :param maxiter: maximum number of bisection steps
    :return: ``[cL, cR]``, the midpoints of the final left and right brackets
    """
    batch_func = vmap(func, (0))
    # An explicit dtype keeps the loop-carried values' types fixed across iterations.
    ends = jnp.array([aL, bL, aR, bR], dtype=float)
    signs = jnp.sign(batch_func(ends)).astype(ends.dtype)
    steps_left = jnp.asarray(maxiter - 1.0, dtype=ends.dtype)
    init_val = [ends[0], ends[1], ends[2], ends[3],
                signs[0], signs[1], signs[2], signs[3], steps_left]

    def cond_fun(val):
        aL, bL, aR, bR, _, _, _, _, left = val
        return ((bL - aL) + (bR - aR) > tol) & (left >= 0)

    def body_fun(val):
        aL, bL, aR, bR, saL, sbL, saR, sbR, left = val
        # new center points
        cL = (aL + bL) / 2.0
        cR = (aR + bR) / 2.0
        mid_signs = jnp.sign(batch_func(jnp.array([cL, cR]))).astype(aL.dtype)
        aL, bL, saL, sbL = _shrink_bracket(aL, bL, saL, sbL, cL, mid_signs[0])
        aR, bR, saR, sbR = _shrink_bracket(aR, bR, saR, sbR, cR, mid_signs[1])
        return [aL, bL, aR, bR, saL, sbL, saR, sbR, left - 1]

    aL, bL, aR, bR, _, _, _, _, _ = lax.while_loop(cond_fun, body_fun, init_val)
    cL = (aL + bL) / 2.0
    cR = (aR + bR) / 2.0
    return [cL, cR]


@partial(jit, static_argnums=(0,))
def choose_start(
    func,
    log_start = -3.0, log_space = 1.0 / 5.0):
    """Grow a symmetric pair of start points until ``func`` is non-positive at both.

    Starting from ``±10**log_start``, each side whose function value is
    still positive is pushed out to ``±10**(log_start + i * log_space)`` for
    i = 1, 2, ...; a side whose value is already non-positive is left alone.
    There is no iteration cap.

    :return: ``[aL, bR]``, the negative and positive start points
    """
    batch_func = vmap(func, (0))

    def magnitude(step):
        return jnp.power(10.0, log_start + step * log_space)

    i = 0
    first = magnitude(i)
    ends = jnp.array([-1.0 * first, first], dtype=float)
    vals = batch_func(ends)
    init_val = [ends[0], ends[1], vals[0], vals[1], i]

    def cond_fun(val):
        aL, bR, aL_val, bR_val, i = val
        return jnp.maximum(aL_val, 0.0) + jnp.maximum(bR_val, 0.0) > 0.0

    def body_fun(val):
        aL, bR, aL_val, bR_val, i = val
        i = i + 1
        reach = magnitude(i)
        # Only move a side that is still positive; a value of exactly zero
        # keeps its point (the sign-product form reset that point to 0).
        aL = jnp.where(aL_val > 0.0, -1.0 * reach, aL)
        bR = jnp.where(bR_val > 0.0, reach, bR)
        vals = batch_func(jnp.array([aL, bR]))
        return [aL, bR, vals[0], vals[1], i]

    aL, bR, _, _, _ = lax.while_loop(cond_fun, body_fun, init_val)
    return [aL, bR]


@partial(jit, static_argnums=(0,))
def bisect_method(
    func, a=-1e5, b=-1e-5,
    tol=1e-6, maxiter=100):
    """Find a root of ``func`` in ``[a, b]`` by bisection.

    :param func: scalar function of one scalar argument (static under ``jit``)
    :param a, b: bracket endpoints, ``a < b``
    :param tol: iteration stops once ``b - a <= tol``
    :param maxiter: maximum number of bisection steps
    :return: midpoint of the final bracket
    """
    batch_func = vmap(func, (0))
    ends = jnp.array([a, b], dtype=float)
    signs = jnp.sign(batch_func(ends)).astype(ends.dtype)
    steps_left = jnp.asarray(maxiter - 1.0, dtype=ends.dtype)
    init_val = [ends[0], ends[1], signs[0], signs[1], steps_left]

    def cond_fun(val):
        a, b, _, _, left = val
        return (b - a > tol) & (left >= 0)

    def body_fun(val):
        a, b, sa, sb, left = val
        # new center point
        c = (a + b) / 2.0
        sc = jnp.sign(func(c)).astype(a.dtype)
        a, b, sa, sb = _shrink_bracket(a, b, sa, sb, c, sc)
        return [a, b, sa, sb, left - 1]

    a, b, _, _, _ = lax.while_loop(cond_fun, body_fun, init_val)
    return (a + b) / 2.0


@partial(jit, static_argnums=(0,))
def single_choose_start(
    func,
    log_start = -3.0, log_space = 1.0 / 5.0):
    """Grow a single positive start point until ``func`` is non-positive there.

    The loop condition also carries a penalty term that pulls it toward
    stopping once more than 100 steps have been taken.
    """
    i = 0
    a = jnp.asarray(1.0 * jnp.power(10.0, log_start + i * log_space), dtype=float)
    a_val = func(a)
    init_val = [a, a_val, i]

    def cond_fun(val):
        a, a_val, i = val
        return jnp.maximum(a_val, 0.0) + 100 * jnp.minimum(100. - i, 0.0) > 0.0

    def body_fun(val):
        a, a_val, i = val
        i = i + 1
        # Keep `a` unless the value there is still positive (a value of
        # exactly zero used to reset `a` to 0).
        a = jnp.where(a_val > 0.0, jnp.power(10.0, log_start + i * log_space), a)
        a_val = func(a)
        return [a, a_val, i]

    val = lax.while_loop(cond_fun, body_fun, init_val)
    a, a_val, i = val
    # NOTE(review): the excerpt is cut off here; the function presumably goes
    # on to return `a` — confirm against the full file.

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub: from setting up the prerequisites and running your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you
# become proficient with different test automation frameworks, e.g. Selenium,
# Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
