Best Python code snippet using autotest_python
equity_barrier_option.py
Source:equity_barrier_option.py  
1###############################################################################2# Copyright (C) 2018, 2019, 2020 Dominic O'Kane3###############################################################################45import numpy as np6from enum import Enum78from ...utils.error import FinError9from ...utils.global_vars import gDaysInYear10from ...products.equity.equity_option import EquityOption11from ...models.process_simulator import FinProcessSimulator12from ...market.curves.discount_curve import DiscountCurve13from ...utils.helpers import label_to_string, check_argument_types14from ...utils.date import Date151617from ...utils.math import N1819# TODO: SOME REDESIGN ON THE MONTE CARLO PROCESS IS PROBABLY NEEDED2021###############################################################################222324class EquityBarrierTypes(Enum):25    DOWN_AND_OUT_CALL = 126    DOWN_AND_IN_CALL = 227    UP_AND_OUT_CALL = 328    UP_AND_IN_CALL = 429    UP_AND_OUT_PUT = 530    UP_AND_IN_PUT = 631    DOWN_AND_OUT_PUT = 732    DOWN_AND_IN_PUT = 83334###############################################################################353637class EquityBarrierOption(EquityOption):38    """ Class to hold details of an Equity Barrier Option. It also39    calculates the option price using Black Scholes for 8 different40    variants on the Barrier structure in enum EquityBarrierTypes. """4142    def __init__(self,43                 expiry_date: Date,44                 strike_price: float,45                 option_type: EquityBarrierTypes,46                 barrier_level: float,47                 num_observations_per_year: (int, float) = 252,48                 notional: float = 1.0):49        """ Create the EquityBarrierOption by specifying the expiry date,50        strike price, option type, barrier level, the number of observations51        per year and the notional. """5253        check_argument_types(self.__init__, locals())5455        self._expiry_date = expiry_date56        self._strike_price = float(strike_price)57        self._barrier_level = float(barrier_level)58        self._num_observations_per_year = int(num_observations_per_year)5960        if option_type not in EquityBarrierTypes:61            raise FinError("Option Type " + str(option_type) + " unknown.")6263        self._option_type = option_type64        self._notional = notional6566###############################################################################6768    def value(self,69              valuation_date: Date,70              stock_price: (float, np.ndarray),71              discount_curve: DiscountCurve,72              dividend_curve: DiscountCurve,73              model):74        """ This prices an Equity Barrier option using the formulae given in75        the paper by Clewlow, Llanos and Strickland December 1994 which can be76        found at7778        https://warwick.ac.uk/fac/soc/wbs/subjects/finance/research/wpaperseries/1994/94-54.pdf79        """8081        if isinstance(valuation_date, Date) == False:82            raise FinError("Valuation date is not a Date")8384        if valuation_date > self._expiry_date:85            raise FinError("Valuation date after expiry date.")8687        if discount_curve._valuation_date != valuation_date:88            raise FinError(89                "Discount Curve valuation date not same as option valuation date")9091        if dividend_curve._valuation_date != valuation_date:92            raise FinError(93                "Dividend Curve valuation date not same as option valuation date")9495        if isinstance(stock_price, int):96            stock_price = float(stock_price)9798        if isinstance(stock_price, float):99            stock_prices = [stock_price]100        else:101            stock_prices = stock_price102103        values = []104        for s in stock_prices:105            v = self._value_one(valuation_date, s, discount_curve,106                                dividend_curve, model)107            values.append(v)108109        if isinstance(stock_price, float):110            return values[0]111        else:112            return np.array(values)113114###############################################################################115116    def _value_one(self,117                   valuation_date: Date,118                   stock_price: (float, np.ndarray),119                   discount_curve: DiscountCurve,120                   dividend_curve: DiscountCurve,121                   model):122        """ This values a single option. Because of its structure it cannot123        easily be vectorised which is why it has been wrapped. """124125        texp = (self._expiry_date - valuation_date) / gDaysInYear126127        if texp < 0:128            raise FinError("Option expires before value date.")129130        texp = max(texp, 1e-6)131132        lnS0k = np.log(stock_price / self._strike_price)133        sqrtT = np.sqrt(texp)134135        r = discount_curve.cc_rate(self._expiry_date)136        q = dividend_curve.cc_rate(self._expiry_date)137138        k = self._strike_price139        s = stock_price140        h = self._barrier_level141142        volatility = model._volatility143        sigmaRootT = volatility * sqrtT144        v2 = volatility * volatility145        mu = r - q146        d1 = (lnS0k + (mu + v2 / 2.0) * texp) / sigmaRootT147        d2 = (lnS0k + (mu - v2 / 2.0) * texp) / sigmaRootT148        df = np.exp(-r * texp)149        dq = np.exp(-q * texp)150151        c = s * dq * N(d1) - k * df * N(d2)152        p = k * df * N(-d2) - s * dq * N(-d1)153#        print("CALL:",c,"PUT:",p)154155        if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL and s <= h:156            return 0.0157        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL and s >= h:158            return 0.0159        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT and s >= h:160            return 0.0161        elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT and s <= h:162            return 0.0163        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL and s <= h:164            return c165        elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL and s >= h:166            return c167        elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT and s >= h:168            return p169        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT and s <= h:170            return p171172        num_observations = 1 + texp * self._num_observations_per_year173174        # Correction by Broadie, Glasserman and Kou, Mathematical Finance, 1997175        # Adjusts the barrier for discrete and not continuous observations176        h_adj = h177        t = texp / num_observations178179        if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:180            h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))181        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:182            h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))183        elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL:184            h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))185        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL:186            h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))187        elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT:188            h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))189        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT:190            h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))191        elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:192            h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))193        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:194            h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))195        else:196            raise FinError("Unknown barrier option type." +197                           str(self._option_type))198199        h = h_adj200201        if abs(volatility) < 1e-5:202            volatility = 1e-5203204        l = (mu + v2 / 2.0) / v2205        y = np.log(h * h / (s * k)) / sigmaRootT + l * sigmaRootT206        x1 = np.log(s / h) / sigmaRootT + l * sigmaRootT207        y1 = np.log(h / s) / sigmaRootT + l * sigmaRootT208        hOverS = h / s209210        if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:211            if h >= k:212                c_do = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \213                    - s * dq * pow(hOverS, 2.0 * l) * N(y1) \214                    + k * df * pow(hOverS, 2.0 * l - 2.0) * N(y1 - sigmaRootT)215                price = c_do216            else:217                c_di = s * dq * pow(hOverS, 2.0 * l) * N(y) \218                    - k * df * pow(hOverS, 2.0 * l - 2.0) * N(y - sigmaRootT)219                price = c - c_di220        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:221            if h <= k:222                c_di = s * dq * pow(hOverS, 2.0 * l) * N(y) \223                    - k * df * pow(hOverS, 2.0 * l - 2.0) * N(y - sigmaRootT)224                price = c_di225            else:226                c_do = s * dq * N(x1) \227                    - k * df * N(x1 - sigmaRootT) \228                    - s * dq * pow(hOverS, 2.0 * l) * N(y1) \229                    + k * df * pow(hOverS, 2.0 * l - 2.0) * N(y1 - sigmaRootT)230                price = c - c_do231        elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL:232            if h >= k:233                c_ui = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \234                    - s * dq * pow(hOverS, 2.0 * l) * (N(-y) - N(-y1)) \235                    + k * df * pow(hOverS, 2.0 * l - 2.0) * \236                    (N(-y + sigmaRootT) - N(-y1 + sigmaRootT))237                price = c_ui238            else:239                price = c240        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL:241            if h > k:242                c_ui = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \243                    - s * dq * pow(hOverS, 2.0 * l) * (N(-y) - N(-y1)) \244                    + k * df * pow(hOverS, 2.0 * l - 2.0) * \245                    (N(-y + sigmaRootT) - N(-y1 + sigmaRootT))246                price = c - c_ui247            else:248                price = 0.0249        elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT:250            if h > k:251                p_ui = -s * dq * pow(hOverS, 2.0 * l) * N(-y) \252                    + k * df * pow(hOverS, 2.0 * l - 2.0) * N(-y + sigmaRootT)253                price = p_ui254            else:255                p_uo = -s * dq * N(-x1) \256                    + k * df * N(-x1 + sigmaRootT) \257                    + s * dq * pow(hOverS, 2.0 * l) * N(-y1) \258                       - k * df * pow(hOverS, 2.0 * l - 2.0) * \259                    N(-y1 + sigmaRootT)260                price = p - p_uo261        elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT:262            if h >= k:263                p_ui = -s * dq * pow(hOverS, 2.0 * l) * N(-y) \264                    + k * df * pow(hOverS, 2.0 * l - 2.0) * N(-y + sigmaRootT)265                price = p - p_ui266            else:267                p_uo = -s * dq * N(-x1) \268                    + k * df * N(-x1 + sigmaRootT) \269                    + s * dq * pow(hOverS, 2.0 * l) * N(-y1) \270                       - k * df * pow(hOverS, 2.0 * l - 2.0) * \271                    N(-y1 + sigmaRootT)272                price = p_uo273        elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:274            if h >= k:275                price = 0.0276            else:277                p_di = -s * dq * N(-x1) \278                    + k * df * N(-x1 + sigmaRootT) \279                    + s * dq * pow(hOverS, 2.0 * l) * (N(y) - N(y1)) \280                       - k * df * pow(hOverS, 2.0 * l - 2.0) * \281                    (N(y - sigmaRootT) - N(y1 - sigmaRootT))282                price = p - p_di283        elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:284            if h >= k:285                price = p286            else:287                p_di = -s * dq * N(-x1) \288                    + k * df * N(-x1 + sigmaRootT) \289                    + s * dq * pow(hOverS, 2.0 * l) * (N(y) - N(y1)) \290                       - k * df * pow(hOverS, 2.0 * l - 2.0) * \291                    (N(y - sigmaRootT) - N(y1 - sigmaRootT))292                price = p_di293        else:294            raise FinError("Unknown barrier option type." +295                           str(self._option_type))296297        v = price * self._notional298        return v299300###############################################################################301302    def value_mc(self,303                 valuation_date: Date,304                 stock_price: float,305                 discount_curve: DiscountCurve,306                 dividend_curve: DiscountCurve,307                 process_type,308                 model_params,309                 numAnnObs: int = 252,310                 num_paths: int = 10000,311                 seed: int = 4242):312        """ A Monte-Carlo based valuation of the barrier option which simulates313        the evolution of the stock price of at a specified number of annual314        observation times until expiry to examine if the barrier has been315        crossed and the corresponding value of the final payoff, if any. It316        assumes a GBM model for the stock price. """317318        texp = (self._expiry_date - valuation_date) / gDaysInYear319        num_time_steps = int(texp * numAnnObs)320        K = self._strike_price321        B = self._barrier_level322        option_type = self._option_type323324        process = FinProcessSimulator()325326        r = discount_curve.zero_rate(self._expiry_date)327328        # TODO - NEED TO DECIDE IF THIS IS PART OF MODEL PARAMS OR NOT ??????????????329330        r = discount_curve.cc_rate(self._expiry_date)331        q = dividend_curve.cc_rate(self._expiry_date)332333        #######################################################################334335        if option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL and stock_price <= B:336            return 0.0337        elif option_type == EquityBarrierTypes.UP_AND_OUT_CALL and stock_price >= B:338            return 0.0339        elif option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT and stock_price <= B:340            return 0.0341        elif option_type == EquityBarrierTypes.UP_AND_OUT_PUT and stock_price >= B:342            return 0.0343344        #######################################################################345346        simple_call = False347        simple_put = False348349        if option_type == EquityBarrierTypes.DOWN_AND_IN_CALL and stock_price <= B:350            simple_call = True351        elif option_type == EquityBarrierTypes.UP_AND_IN_CALL and stock_price >= B:352            simple_call = True353        elif option_type == EquityBarrierTypes.UP_AND_IN_PUT and stock_price >= B:354            simple_put = True355        elif option_type == EquityBarrierTypes.DOWN_AND_IN_PUT and stock_price <= B:356            simple_put = True357358        if simple_put or simple_call:359            Sall = process.get_process(360                process_type, texp, model_params, 1, num_paths, seed)361362        if simple_call:363            c = (np.maximum(Sall[:, -1] - K, 0.0)).mean()364            c = c * np.exp(-r * texp)365            return c366367        if simple_put:368            p = (np.maximum(K - Sall[:, -1], 0.0)).mean()369            p = p * np.exp(-r * texp)370            return p371372        # Get full set of paths373        Sall = process.get_process(process_type, texp, model_params, num_time_steps,374                                   num_paths, seed)375376        (num_paths, num_time_steps) = Sall.shape377378        if option_type == EquityBarrierTypes.DOWN_AND_IN_CALL or \379           option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL or \380           option_type == EquityBarrierTypes.DOWN_AND_IN_PUT or \381           option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:382383            barrierCrossedFromAbove = [False] * num_paths384385            for p in range(0, num_paths):386                barrierCrossedFromAbove[p] = np.any(Sall[p] <= B)387388        if option_type == EquityBarrierTypes.UP_AND_IN_CALL or \389           option_type == EquityBarrierTypes.UP_AND_OUT_CALL or \390           option_type == EquityBarrierTypes.UP_AND_IN_PUT or \391           option_type == EquityBarrierTypes.UP_AND_OUT_PUT:392393            barrierCrossedFromBelow = [False] * num_paths394            for p in range(0, num_paths):395                barrierCrossedFromBelow[p] = np.any(Sall[p] >= B)396397        payoff = np.zeros(num_paths)398        ones = np.ones(num_paths)399400        if option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:401            payoff = np.maximum(Sall[:, -1] - K, 0.0) * \402                (ones - barrierCrossedFromAbove)403        elif option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:404            payoff = np.maximum(Sall[:, -1] - K, 0.0) * barrierCrossedFromAbove405        elif option_type == EquityBarrierTypes.UP_AND_IN_CALL:406            payoff = np.maximum(Sall[:, -1] - K, 0.0) * barrierCrossedFromBelow407        elif option_type == EquityBarrierTypes.UP_AND_OUT_CALL:408            payoff = np.maximum(Sall[:, -1] - K, 0.0) * \409                (ones - barrierCrossedFromBelow)410        elif option_type == EquityBarrierTypes.UP_AND_IN_PUT:411            payoff = np.maximum(K - Sall[:, -1], 0.0) * barrierCrossedFromBelow412        elif option_type == EquityBarrierTypes.UP_AND_OUT_PUT:413            payoff = np.maximum(K - Sall[:, -1], 0.0) * \414                (ones - barrierCrossedFromBelow)415        elif option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:416            payoff = np.maximum(K - Sall[:, -1], 0.0) * \417                (ones - barrierCrossedFromAbove)418        elif option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:419            payoff = np.maximum(K - Sall[:, -1], 0.0) * barrierCrossedFromAbove420        else:421            raise FinError("Unknown barrier option type." +422                           str(self._option_type))423424        v = payoff.mean() * np.exp(- r * texp)425426        return v * self._notional427428###############################################################################429430    def __repr__(self):431        s = label_to_string("OBJECT TYPE", type(self).__name__)432        s += label_to_string("EXPIRY DATE", self._expiry_date)433        s += label_to_string("STRIKE PRICE", self._strike_price)434        s += label_to_string("OPTION TYPE", self._option_type)435        s += label_to_string("BARRIER LEVEL", self._barrier_level)436        s += label_to_string("NUM OBSERVATIONS",437                             self._num_observations_per_year)438        s += label_to_string("NOTIONAL", self._notional, "")439        return s440441###############################################################################442443    def _print(self):444        """ Simple print function for backward compatibility. """445        print(self)446
...barrier.py
Source:barrier.py  
1import re2from grblogtools.parsers.util import typeconvert_groupdict3class BarrierParser:4    # The pattern indicating the initialization of the parser5    barrier_start_pattern = re.compile(6        r"Iter(\s+)Primal(\s+)Dual(\s+)Primal(\s+)Dual(\s+)Compl(\s+)Time"7    )8    barrier_ordering_pattern = re.compile(r"Ordering time: (?P<OrderingTime>[\d\.]+)s")9    # The pattern indicating the barrier progress10    barrier_progress_pattern = re.compile(11        r"\s*(?P<Iteration>\d+)(?P<Indicator>\s|\*)\s+(?P<PObj>[^\s]+)\s+(?P<DObj>[^\s]+)\s+(?P<PRes>[^\s]+)\s+(?P<DRes>[^\s]+)\s+(?P<Compl>[^\s]+)\s+(?P<Time>\d+)s"12    )13    # The pattern indicating the crossover14    barrier_crossover_pattern = re.compile(15        r"\s*Push phase complete: Pinf (?P<PushPhasePInf>[^\s]+), Dinf (?P<PushPhaseDInf>[^\s]+)\s+(?P<PushPhaseEndTime>\d+)s"16    )17    # The pattern indicating the termination of the barrier algorithm18    barrier_termination_patterns = [19        re.compile(20            r"Barrier solved model in (?P<BarIterCount>[^\s]+) iterations and (?P<Runtime>[^\s]+) seconds"21        ),22        re.compile(23            r"Barrier performed (?P<BarIterCount>\d+) iterations in (?P<Runtime>[^\s]+) seconds"24        ),25    ]26    def __init__(self):27        """Initialize the Barrier parser."""28        self._summary = {}29        self._progress = []30        self._started = False31    def parse(self, line: str) -> bool:32        """Parse the given log line to populate summary and progress data.33        Args:34            line (str): A line in the log file.35        Returns:36            bool: Return True if the given line is matched by some pattern.37        """38        barrier_ordering_match = BarrierParser.barrier_ordering_pattern.match(line)39        if barrier_ordering_match:40            self._summary.update(typeconvert_groupdict(barrier_ordering_match))41            return True42        if not self._started:43            match = BarrierParser.barrier_start_pattern.match(line)44            if match:45                self._started = True46                return True47            return False48        progress_match = BarrierParser.barrier_progress_pattern.match(line)49        if progress_match:50            entry = {"Type": "barrier"}51            entry.update(typeconvert_groupdict(progress_match))52            self._progress.append(entry)53            return True54        for barrier_termination_pattern in BarrierParser.barrier_termination_patterns:55            barrier_termination_match = barrier_termination_pattern.match(line)56            if barrier_termination_match:57                self._summary.update(typeconvert_groupdict(barrier_termination_match))58                return True59        crossover_match = BarrierParser.barrier_crossover_pattern.match(line)60        if crossover_match:61            self._summary.update(typeconvert_groupdict(crossover_match))62            return True63        return False64    def get_summary(self) -> dict:65        """Return the current parsed summary."""66        return self._summary67    def get_progress(self) -> list:68        """Return the detailed progress in the barrier method."""...test_barrier.py
Source:test_barrier.py  
1from unittest import TestCase, main2from grblogtools.parsers.barrier import BarrierParser3from grblogtools.parsers.util import parse_block4example_log_barrier = """5Iter       Primal          Dual         Primal    Dual     Compl     Time6    0   4.56435085e+07  1.53061018e+04  1.69e+05 8.58e+00  1.59e+03     2s7    1   3.76722276e+07 -5.41297282e+05  8.07e+04 9.12e+00  8.17e+02     2s8   17   2.17403572e+02  2.17403571e+02  3.93e-14 7.11e-15  7.71e-13     5s9Barrier solved model in 17 iterations and 4.83 seconds (6.45 work units)10Optimal objective 2.17403572e+0211Crossover log...12    57249 DPushes remaining with DInf 0.0000000e+00                 8s13        0 DPushes remaining with DInf 0.0000000e+00                 9s14     9342 PPushes remaining with PInf 1.2118572e-05                 9s15        0 PPushes remaining with PInf 0.0000000e+00                 9s16   Push phase complete: Pinf 0.0000000e+00, Dinf 1.8540725e-14      9s17"""18expected_summary_barrier = {19    "BarIterCount": 17,20    "Runtime": 4.83,21    "PushPhasePInf": 0.0000000e00,22    "PushPhaseDInf": 1.8540725e-14,23    "PushPhaseEndTime": 9,24}25expected_progress_barrier = [26    {27        "Type": "barrier",28        "Iteration": 0,29        "Indicator": " ",30        "PObj": 45643508.5,31        "DObj": 15306.1018,32        "PRes": 169000.0,33        "DRes": 8.58,34        "Compl": 1590.0,35        "Time": 2,36    },37    {38        "Type": "barrier",39        "Iteration": 1,40        "Indicator": " ",41        "PObj": 37672227.6,42        "DObj": -541297.282,43        "PRes": 80700.0,44        "DRes": 9.12,45        "Compl": 817.0,46        "Time": 2,47    },48    {49        "Type": "barrier",50        "Iteration": 17,51        "Indicator": " ",52        "PObj": 217.403572,53        "DObj": 217.403571,54        "PRes": 3.93e-14,55        "DRes": 7.11e-15,56        "Compl": 7.71e-13,57        "Time": 5,58    },59]60class TestBarrier(TestCase):61    def setUp(self):62        self.barrier_parser = BarrierParser()63    def test_get_summary_progress(self):64        parse_block(self.barrier_parser, example_log_barrier)65        self.assertEqual(self.barrier_parser.get_summary(), expected_summary_barrier)66        self.assertEqual(self.barrier_parser.get_progress(), expected_progress_barrier)67if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
