Best Python code snippet using grail_python
env.py
Source:env.py  
from typing import Tuple, Callable, Union

import gym
import numpy as np
import torch
from gym import register

import util
from util.sample import geometricBM


class Info:
    """Market and option parameters that accompany every state of an episode."""

    def __init__(self, strike_price: float, r: float, mu: float, sigma: float, risk_lambda: float, _dt: float,
                 friction: float):
        self.strike_price = strike_price
        self.r = r
        self.mu = mu
        self.sigma = sigma
        self.risk_lambda = risk_lambda
        self._dt = _dt
        self.friction = friction


class State:
    """Position of the environment along the simulated price path."""

    def __init__(self, normalized_asset_price: float, passed_step: int, remaining_step: int):
        self.normalized_asset_price = normalized_asset_price
        self.passed_step = passed_step
        self.remaining_step = remaining_step

    def to_tensor(self, info: Info):
        """
        Pack this state together with ``info`` into one flat tensor.

        :param info: parameters to append to the state features
        :return: tensor laid out as [normal_price, passed_real_time, remaining_real_time, normal_strike_price, r,
            mu, sigma, risk_lambda, friction], i.e. the same column order ``QLBSEnv.step`` builds for
            ``Policy.batch_action``
        """
        return torch.tensor((
            self.normalized_asset_price,
            self.passed_step * info._dt,
            self.remaining_step * info._dt,
            # Use the real time step rather than a hard-coded 1 so that this feature agrees with the
            # normal_strike_price column produced by QLBSEnv.step when _dt != 1.
            util.standard_to_normalized_price(info.strike_price, info.mu, info.sigma,
                                              (self.passed_step + self.remaining_step), info._dt),
            info.r,
            info.mu,
            info.sigma,
            info.risk_lambda,
            info.friction,
        ))


class Policy:
    """Interface of a hedging policy; every method must be overridden by a concrete policy."""

    def __init__(self):
        pass

    def action(self, state: State, info: Info):
        """Choose an action for a single ``(state, info)`` pair."""
        raise NotImplementedError

    def batch_action(self, state_info_tensor, random: bool = True):
        """
        Choose actions for a batch of states.

        :param random: presumably whether actions are sampled stochastically -- confirm against implementations
        :param state_info_tensor: [[normal_price, passed_real_time, remaining_real_time, normal_strike_price, r, mu,
            sigma, risk_lambda, friction]]
        :return: one action per row; ``QLBSEnv.step`` calls ``.reshape(..., order='F')`` on the result
        """
        raise NotImplementedError

    def update(self, delta: float, action: float, state: State, info: Info):
        raise NotImplementedError

    def train_based_on(self, source, target, lr, itr_max):
        raise NotImplementedError


class Baseline:
    """Interface of a value baseline; every method must be overridden by a concrete baseline."""

    def __init__(self):
        pass

    def __call__(self, state: State, info: Info):
        """Estimate the baseline value for a single ``(state, info)`` pair."""
        raise NotImplementedError

    def batch_estimate(self, state_info_tensor):
        """
        Estimate the baseline value for a batch of states.

        :param state_info_tensor: [[normal_price, passed_real_time, remaining_real_time, normal_strike_price, r, mu,
            sigma, risk_lambda, friction]]
        :return:
        """
        raise NotImplementedError

    def update(self, G: float, state: State, info: Info):
        raise NotImplementedError

    def train_based_on(self, source, target, lr, itr_max):
        raise NotImplementedError


class QLBSEnv(gym.Env):
    """
    Option-hedging environment driven by a simulated price path.

    NOTE: ``step`` takes the current policy as an extra argument, so its signature differs from the plain
    ``gym.Env.step(action)`` contract.
    """

    def __init__(self, is_call_option: bool, strike_price: float, max_step: int, mu: float, sigma: float, r: float,
                 risk_lambda: float, friction: float, initial_asset_price: float, risk_simulation_paths: int, *,
                 mutation: Union[float, Callable] = 0.1, _dt: float = 1):
        """
        :param is_call_option: forwarded to ``util.payoff_of_option`` when computing the terminal payoff
        :param strike_price: strike price of the option
        :param max_step: number of steps in an episode
        :param mu: drift passed to ``geometricBM``
        :param sigma: volatility passed to ``geometricBM``
        :param r: rate used for the per-step discount factor ``exp(-r * _dt)``
        :param risk_lambda: weight of the risk penalty in the reward
        :param friction: proportional transaction-cost coefficient
        :param initial_asset_price: starting price of each episode's path
        :param risk_simulation_paths: number of paths simulated inside every ``step``
        :param mutation: either a callable invoked as ``mutation(env)`` on every reset, or the probability with
            which each parameter is independently perturbed on reset
        :param _dt: real time covered by one step
        """
        super().__init__()
        # Setting of the system; stays constant for a long time
        self.is_call_option = is_call_option
        self.info = Info(strike_price=strike_price, r=r, mu=mu, sigma=sigma, risk_lambda=risk_lambda, _dt=_dt,
                         friction=friction)
        # NOTE: gamma is fixed here; later mutations of info.r do not update it.
        self.gamma = np.exp(-r * _dt)
        self.max_step = max_step
        self.initial_asset_price = initial_asset_price
        self.mutation = mutation
        self.risk_simulation_paths = risk_simulation_paths
        # State variables
        self._normalized_price = None
        self._standard_price = None
        self.current_step = 0

    def _describe(self):
        """Build the ``State`` for the current step of the current path."""
        return State(self._normalized_price[self.current_step], self.current_step, self.max_step - self.current_step)

    def reset(self) -> Tuple[State, Info]:
        """
        Mutate the parameters, draw a fresh single price path and rewind to step 0.

        :return: the initial state and the (possibly mutated) ``Info``
        """
        self.mutate_parameters()
        GBM, BM = geometricBM(self.initial_asset_price, self.max_step, 1, self.info.mu, self.info.sigma, self.info._dt)
        self._normalized_price = BM[0, :]
        self._standard_price = GBM[0, :]
        self.current_step = 0
        return self._describe(), self.info

    def step(self, action, pi: Policy) -> Tuple[State, float, bool, dict]:
        """
        Apply ``action`` at the current step and advance the environment by one step.

        The reward is estimated by simulating ``risk_simulation_paths`` fresh paths from the current price and
        letting ``pi`` choose every hedge after the current one.

        :param action: hedge position used for the current step on every simulated path
        :param pi: policy whose ``batch_action`` supplies the hedge for all later steps
        :return: ``(next_state, reward, done, {'risk': risk})``
        """
        t = self.current_step
        # simulate RS paths to compute E[Pi_t|F_t], E[Pi_(t+1)|F_(t+1)], and Var[Pi_t|F_t]
        RS = self.risk_simulation_paths
        t_arr = np.arange(self.max_step - t + 1)
        # One row of step offsets per path; assumes GBM has the same (RS, max_step - t + 1) shape -- TODO confirm
        t_arr_broad = np.broadcast_to(t_arr[np.newaxis, :], (RS, len(t_arr)))
        GBM, BM = geometricBM(self._standard_price[t], self.max_step - t, RS, self.info.mu, self.info.sigma,
                              self.info._dt)
        # Compute hedge position in a batch fashion
        # The normalized strike does not depend on the step, so compute it once for all blocks.
        normal_strike_price = util.standard_to_normalized_price(self.info.strike_price, self.info.mu,
                                                                self.info.sigma, self.max_step, self.info._dt)
        sits = []
        for s in np.arange(t, self.max_step):
            sit = torch.empty((RS, 9))
            sit[:, 0] = torch.tensor(BM[:, s - t])  # normal_price
            sit[:, 1] = s * self.info._dt  # passed_real_time
            sit[:, 2] = (self.max_step - s) * self.info._dt  # remaining_real_time
            sit[:, 3] = normal_strike_price  # normal_strike_price
            sit[:, 4] = self.info.r  # r
            sit[:, 5] = self.info.mu  # mu
            sit[:, 6] = self.info.sigma  # sigma
            sit[:, 7] = self.info.risk_lambda  # risk_lambda
            sit[:, 8] = self.info.friction  # friction
            sits.append(sit)
        # Blocks are stacked step after step (RS rows each), so a column-major reshape yields one row per path
        # and one column per step.
        hedge_long = pi.batch_action(torch.cat(sits, dim=0))
        hedge = hedge_long.reshape(RS, self.max_step - t, order='F')
        # The hedge for the current step is the action under evaluation, not the policy's own choice.
        hedge[:, 0] = action
        # compute discounted stock price and value change
        discount = np.power(self.gamma, t_arr_broad)
        discounted_S = discount * GBM
        discounted_cashflow = hedge * (discounted_S[:, 1:] - discounted_S[:, :-1])
        # Append a zero position so the last difference measures closing the final hedge.
        extended_hedge = np.concatenate((hedge, np.zeros((RS, 1))), axis=1)
        discounted_tc = self.info.friction * discount[:, 1:] * util.abs(
            extended_hedge[:, 1:] - extended_hedge[:, :-1]) * GBM[:, 1:]
        end_value = util.payoff_of_option(self.is_call_option, GBM[:, -1], self.info.strike_price)
        # sum up change and compute the expected portfolio value and its variance
        total_value_change = np.sum(discounted_cashflow, axis=1)
        total_tc = np.sum(discounted_tc, axis=1)
        t_value = end_value * np.power(self.gamma, self.max_step - t) - total_value_change + total_tc
        # Add back the first step's hedge gain and undo one discount step to express the value at t + 1.
        tp1_value = (t_value + discounted_cashflow[:, 0]) / self.gamma
        base_reward = self.gamma * (1 - (t + 1) / self.max_step) * np.mean(tp1_value) - (
                1 - t / self.max_step) * np.mean(t_value)
        # Sample standard deviation across paths (NaN when only a single path is simulated).
        risk = np.std(t_value, ddof=1)
        # clean up and return
        self.current_step = t + 1
        done = self.current_step >= self.max_step
        reward = base_reward - self.info.risk_lambda * risk * self.info._dt
        return self._describe(), reward, done, {'risk': risk}

    def render(self, mode='human'):
        """Rendering is not implemented; this is a no-op."""
        pass

    def mutate_parameters(self):
        """
        Randomly perturb the environment parameters; called at the start of every ``reset``.

        A callable ``mutation`` is handed the environment and fully replaces the built-in scheme. Otherwise each
        parameter is perturbed independently with probability ``mutation`` and clipped to a fixed range.
        """
        if callable(self.mutation):
            self.mutation(self)
            return
        if np.random.rand(1) < self.mutation:
            # multiplicative noise, kept within [0, 2e-2]
            self.info.r = np.clip(self.info.r * (1 + 0.1 * np.random.randn(1)[0]), 0, 2e-2)
        if np.random.rand(1) < self.mutation:
            # the reversion coefficient is 1, so the previous mu cancels and only the noise term remains
            self.info.mu = np.clip(self.info.mu + 1e-3 * np.random.randn(1)[0] - 1 * self.info.mu, -1e-3, 1e-3)
        if np.random.rand(1) < self.mutation:
            # multiplicative noise, kept within [0, 2e-1]
            self.info.sigma = np.clip(self.info.sigma * (1 + 0.1 * np.random.randn(1)[0]), 0, 2e-1)
        if np.random.rand(1) < self.mutation:
            # additive noise with a pull of 0.1 towards 1, kept within [0.9, 1.1]
            self.info.strike_price = np.clip(
                self.info.strike_price + 1e-2 * np.random.randn(1)[0] - 0.1 * (self.info.strike_price - 1), 0.9, 1.1)
        if np.random.rand(1) < self.mutation:
            # additive noise with a pull of 0.1 towards 1, kept within [0.7, 1.3]
            self.initial_asset_price = np.clip(
                self.initial_asset_price + 1e-1 * np.random.randn(1)[0] - 0.1 * (self.initial_asset_price - 1), 0.7,
                1.3)

# ...test_direct_exception_handling.py
Source:test_direct_exception_handling.py  
...4from tests.utils import validate_method_output5failure_exception = TestCase.failureException('Failure exception')6error_exception = Exception('Error exception')7@step8def passed_step():9    pass10@step11def failed_step():12    raise failure_exception13@step14def error_step():15    raise error_exception16def method_fail():17    passed_step()18    failed_step()19    raise Exception('we should not reach this')20@step(step_group=True)21def failed_group():22    passed_step()23    failed_step()24    raise Exception('we should not reach this too')25def method_fail_group():26    failed_group()27    raise Exception('we should not reach this even here')28def method_error():29    passed_step()30    error_step()31    raise Exception('we should not reach this')32@step(step_group=True)33def error_group():34    passed_step()35    error_step()36    raise Exception('we should not reach this too')37def method_error_group():38    error_group()39    raise Exception('we should not reach this even here')40class TestDirectHandling(TestCase):41    def test_method_fail(self):42        try:43            validate_method_output(method_fail, 'PASSED passed step\n'44                                                'FAILED failed step')45        except Exception as inst:46            eq_(inst, failure_exception)47    def test_group_fail(self):48        try:...test_step.py
Source:test_step.py  
1from xayah_test.classes.step_classes import ClassStepPassed, ClassStepFailed2class TestStep:3    def test_step_passed(self, test_result):4        passed_class_name = 'ClassStepPassed'5        ClassStepPassed.run_test_cases()6        result = test_result.create_test_result()7        assert len(result) == 1, "not exactly 1 test scenario in test result"8        test_suite = result.get(passed_class_name)9        class_name = test_suite.get('class_name')10        assert class_name == passed_class_name11        test_cases = test_suite.get('test_cases')12        assert len(test_cases) == 1, 'not exactly 1 test case in test scenario'13        test_case = test_cases[0]14        steps = test_case.get('steps')15        assert len(steps) == 2, 'not exactly 2 steps in test case'16        step1 = steps[0]17        assert step1.get('name') == 'step one'18        assert step1.get('status') == 'passed'19        assert step1.get('category') == 'step'20        step2 = steps[1]21        assert step2.get('name') == 'step two'22        assert step2.get('status') == 'passed'23        assert step2.get('category') == 'step'24    "if previous step failed next step is not run"25    def test_previous_step_failed(self, test_result):26        step_class_name = 'ClassStepFailed'27        ClassStepFailed.run_test_cases()28        result = test_result.create_test_result()29        assert len(result.keys()) == 1, "not exactly 1 test scenario in test result"30        test_scenario = result.get(step_class_name)31        class_name = test_scenario.get('class_name')32        assert class_name == step_class_name33        test_cases = test_scenario.get('test_cases')34        assert len(test_cases) == 1, 'not exactly 1 test case in test scenario'35        test_case = test_cases[0]36        steps = test_case.get('steps')37        assert len(steps) == 2, 'not exactly 2 steps in test case'38        passed_step = steps[0]39        assert passed_step.get('name') == 'passed step 1'40        assert 
passed_step.get('status') == 'passed'41        failed_step = steps[1]42        assert failed_step.get('name') == 'failed step'...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
