Best Python code snippet using tempest_python
qlearning_test.py
Source:qlearning_test.py  
...24    def test_init_dqn(self,25                      mock_parameters,26                      mock_model,27                      mock_replay_memory):28        self._setup_parameters(mock_parameters.return_value)29        mock_model.return_value = self._setup_test_model()30        action_space = spaces.Discrete(2)31        observation_space = spaces.Box(0, 1, (1,))32        sut = QLearning('', observation_space, action_space)33        self.assertEqual(sut._num_actions, 2)34        self.assertIsNone(sut._num_states)35        self.assertEqual(sut._shape_of_inputs, (1,))36        self.assertFalse(sut._discrete_observation_space)37        self.assertIsNone(sut._space_discretizer)38        self.assertIsNone(sut._preprocessor)39        self.assertFalse(hasattr(sut, 'weight_variables'))40        self.assertIsNotNone(sut._trainer)41        mock_model.assert_called_with((1,), 2, '[2]', None)42        mock_replay_memory.assert_called_with(100, False)43    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')44    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')45    def test_init_dqn_prioritized_replay(self,46                                         mock_parameters,47                                         mock_replay_memory):48        self._setup_parameters(mock_parameters.return_value)49        mock_parameters.return_value.use_prioritized_replay = True50        action_space = spaces.Discrete(2)51        observation_space = spaces.Box(0, 1, (1,))52        sut = QLearning('', observation_space, action_space)53        self.assertIsNotNone(sut._weight_variables)54        mock_replay_memory.assert_called_with(100, True)55    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')56    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')57    def test_init_dqn_preprocessing(self,58                                    mock_parameters,59                                    mock_replay_memory):60        
self._setup_parameters(mock_parameters.return_value)61        mock_parameters.return_value.preprocessing = \62            'cntk.contrib.deeprl.agent.shared.preprocessing.AtariPreprocessing'63        mock_parameters.return_value.preprocessing_args = '()'64        action_space = spaces.Discrete(2)65        observation_space = spaces.Box(0, 1, (1,))66        sut = QLearning('', observation_space, action_space)67        # Preprocessor with default arguments.68        self.assertIsNotNone(sut._preprocessor)69        self.assertEqual(sut._preprocessor.output_shape(), (4, 84, 84))70        # Preprocessor with arguments passed as a tuple.71        mock_parameters.return_value.preprocessing_args = '(3,)'72        sut = QLearning('', observation_space, action_space)73        self.assertEqual(sut._preprocessor.output_shape(), (3, 84, 84))74        # Preprocessor with inappropriate arguments.75        mock_parameters.return_value.preprocessing_args = '(3, 4)'76        self.assertRaises(77            TypeError, QLearning, '', observation_space, action_space)78        # Undefined preprocessor.79        mock_parameters.return_value.preprocessing = 'undefined'80        self.assertRaises(81            ValueError, QLearning, '', observation_space, action_space)82    @patch('cntk.contrib.deeprl.agent.qlearning.Models.dueling_network')83    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')84    def test_init_dueling_dqn(self, mock_parameters, mock_model):85        self._setup_parameters(mock_parameters.return_value)86        mock_parameters.return_value.q_representation = 'dueling-dqn'87        mock_parameters.return_value.hidden_layers = '[2, [2], [2]]'88        mock_model.return_value = self._setup_test_model()89        action_space = spaces.Discrete(2)90        observation_space = spaces.Box(0, 1, (1,))91        sut = QLearning('', observation_space, action_space)92        self.assertEqual(sut._num_actions, 2)93        self.assertIsNone(sut._num_states)94        
self.assertEqual(sut._shape_of_inputs, (1,))95        self.assertFalse(sut._discrete_observation_space)96        self.assertIsNone(sut._space_discretizer)97        self.assertIsNone(sut._preprocessor)98        mock_model.assert_called_with((1,), 2, '[2, [2], [2]]', None)99    @patch('cntk.contrib.deeprl.agent.shared.customized_models.conv_dqn')100    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')101    def test_init_customized_q(self, mock_parameters, mock_model):102        self._setup_parameters(mock_parameters.return_value)103        mock_parameters.return_value.q_representation = \104            'cntk.contrib.deeprl.agent.shared.customized_models.conv_dqn'105        mock_model.return_value = self._setup_test_model()106        action_space = spaces.Discrete(2)107        observation_space = spaces.Box(0, 1, (1,))108        sut = QLearning('', observation_space, action_space)109        self.assertEqual(sut._num_actions, 2)110        self.assertIsNone(sut._num_states)111        self.assertEqual(sut._shape_of_inputs, (1,))112        self.assertFalse(sut._discrete_observation_space)113        self.assertIsNone(sut._space_discretizer)114        self.assertIsNone(sut._preprocessor)115        mock_model.assert_called_with((1,), 2, None)116    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')117    def test_init_unsupported_q(self, mock_parameters):118        instance = mock_parameters.return_value119        instance.q_representation = 'undefined'120        instance.preprocessing = ''121        action_space = spaces.Discrete(2)122        observation_space = spaces.Box(0, 1, (1,))123        self.assertRaises(124            ValueError, QLearning, '', observation_space, action_space)125    @patch('cntk.contrib.deeprl.agent.qlearning.Models.feedforward_network')126    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')127    def test_init_dqn_huber_loss(self, mock_parameters, mock_model):128        
self._setup_parameters(mock_parameters.return_value)129        mock_parameters.return_value.use_error_clipping = True130        mock_model.return_value = self._setup_test_model()131        action_space = spaces.Discrete(2)132        observation_space = spaces.Box(0, 1, (1,))133        sut = QLearning('', observation_space, action_space)134        mock_model.assert_called_with((1,), 2, '[2]', huber_loss)135    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')136    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')137    def test_update_q(self,138                      mock_parameters,139                      mock_replay_memory):140        """Test if _update_q_periodically() can finish successfully."""141        self._setup_parameters(mock_parameters.return_value)142        self._setup_replay_memory(mock_replay_memory.return_value)143        action_space = spaces.Discrete(2)144        observation_space = spaces.Box(0, 1, (1,))145        sut = QLearning('', observation_space, action_space)146        sut._trainer.train_minibatch = MagicMock()147        sut._choose_action = MagicMock(side_effect=[148            (1, 'GREEDY'),149            (0, 'GREEDY'),150            (1, 'RANDOM'),151        ])152        action, debug_info = sut.start(np.array([0.1], np.float32))153        self.assertEqual(action, 1)154        self.assertEqual(debug_info['action_behavior'], 'GREEDY')155        self.assertEqual(sut.episode_count, 1)156        self.assertEqual(sut.step_count, 0)157        self.assertEqual(sut._epsilon, 0.1)158        self.assertEqual(sut._trainer.parameter_learners[0].learning_rate(), 0.1)159        self.assertEqual(sut._last_state, np.array([0.1], np.float32))160        self.assertEqual(sut._last_action, 1)161        action, debug_info = sut.step(1, np.array([0.2], np.float32))162        self.assertEqual(action, 0)163        self.assertEqual(debug_info['action_behavior'], 'GREEDY')164        self.assertEqual(sut.episode_count, 1)165        
self.assertEqual(sut.step_count, 1)166        self.assertEqual(sut._epsilon, 0.09)167        # learning rate remains 0.1 as Q is not updated during this time step.168        self.assertEqual(sut._trainer.parameter_learners[0].learning_rate(), 0.1)169        self.assertEqual(sut._last_state, np.array([0.2], np.float32))170        self.assertEqual(sut._last_action, 0)171        action, debug_info = sut.step(2, np.array([0.3], np.float32))172        self.assertEqual(action, 1)173        self.assertEqual(debug_info['action_behavior'], 'RANDOM')174        self.assertEqual(sut.episode_count, 1)175        self.assertEqual(sut.step_count, 2)176        self.assertEqual(sut._epsilon, 0.08)177        self.assertEqual(sut._trainer.parameter_learners[0].learning_rate(), 0.08)178        self.assertEqual(sut._last_state, np.array([0.3], np.float32))179        self.assertEqual(sut._last_action, 1)180        sut.end(3, np.array([0.4], np.float32))181        self.assertEqual(sut.episode_count, 1)182        self.assertEqual(sut.step_count, 3)183        self.assertEqual(sut._epsilon, 0.08)184        # learning rate remains 0.08 as Q is not updated during this time step.185        self.assertEqual(sut._trainer.parameter_learners[0].learning_rate(), 0.08)186    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')187    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')188    def test_update_q_dqn(self,189                          mock_parameters,190                          mock_replay_memory):191        self._setup_parameters(mock_parameters.return_value)192        self._setup_replay_memory(mock_replay_memory.return_value)193        action_space = spaces.Discrete(2)194        observation_space = spaces.Box(0, 1, (1,))195        sut = QLearning('', observation_space, action_space)196        sut._q.eval = \197            MagicMock(return_value=np.array([[[0.2, 0.1]]], np.float32))198        sut._target_q.eval = \199            MagicMock(return_value=np.array([[[0.3, 
0.4]]], np.float32))200        sut._trainer = MagicMock()201        sut._update_q_periodically()202        np.testing.assert_array_equal(203            sut._trainer.train_minibatch.call_args[0][0][sut._input_variables],204            [np.array([0.1], np.float32)])205        # 10 (reward) + 0.9 (gamma) x 0.4 (max q_target) -> update action 0206        np.testing.assert_array_equal(207            sut._trainer.train_minibatch.call_args[0][0][sut._output_variables],208            [np.array([10.36, 0.1], np.float32)])209    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')210    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')211    def test_update_q_dqn_prioritized_replay(self,212                                             mock_parameters,213                                             mock_replay_memory):214        self._setup_parameters(mock_parameters.return_value)215        mock_parameters.return_value.use_prioritized_replay = True216        self._setup_prioritized_replay_memory(mock_replay_memory.return_value)217        action_space = spaces.Discrete(2)218        observation_space = spaces.Box(0, 1, (1,))219        sut = QLearning('', observation_space, action_space)220        def new_q_value(self):221            return np.array([[[0.2, 0.1]]], np.float32)222        sut._q.eval = MagicMock(side_effect=new_q_value)223        sut._target_q.eval = MagicMock(224            return_value=np.array([[[0.3, 0.4]]], np.float32))225        sut._trainer = MagicMock()226        sut._update_q_periodically()227        self.assertEqual(sut._trainer.train_minibatch.call_count, 1)228        np.testing.assert_array_equal(229            sut._trainer.train_minibatch.call_args[0][0][sut._input_variables],230            [231                np.array([0.1], np.float32),232                np.array([0.3], np.float32),233                np.array([0.1], np.float32)234            ])235        np.testing.assert_array_equal(236            
sut._trainer.train_minibatch.call_args[0][0][sut._output_variables],237            [238                # 10 (reward) + 0.9 (gamma) x 0.4 (max q_target)239                np.array([10.36, 0.1], np.float32),240                # 11 (reward) + 0.9 (gamma) x 0.4 (max q_target)241                np.array([0.2, 11.36], np.float32),242                np.array([10.36, 0.1], np.float32)243            ])244        np.testing.assert_almost_equal(245            sut._trainer.train_minibatch.call_args[0][0][sut._weight_variables],246            [247                [0.16666667],248                [0.66666667],249                [0.16666667]250            ])251        self.assertAlmostEqual(252            sut._replay_memory.update_priority.call_args[0][0][3],253            105.2676)  # (10.16 + 0.1)^2254        self.assertAlmostEqual(255            sut._replay_memory.update_priority.call_args[0][0][4],256            129.0496,257            places=6)  # (11.26 + 0.1) ^ 2258    @patch('cntk.contrib.deeprl.agent.qlearning.ReplayMemory')259    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')260    def test_update_q_double_dqn(self,261                                 mock_parameters,262                                 mock_replay_memory):263        self._setup_parameters(mock_parameters.return_value)264        mock_parameters.return_value.double_q_learning = True265        self._setup_replay_memory(mock_replay_memory.return_value)266        action_space = spaces.Discrete(2)267        observation_space = spaces.Box(0, 1, (1,))268        sut = QLearning('', observation_space, action_space)269        sut._q.eval = \270            MagicMock(return_value=np.array([[[0.2, 0.1]]], np.float32))271        sut._target_q.eval = \272            MagicMock(return_value=np.array([[[0.3, 0.4]]], np.float32))273        sut._trainer = MagicMock()274        sut._update_q_periodically()275        # 10 (reward) + 0.9 (gamma) x 0.3 -> update action 0276        
np.testing.assert_array_equal(277            sut._trainer.train_minibatch.call_args[0][0][sut._output_variables],278            [np.array([10.27, 0.1], np.float32)])279    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')280    def test_populate_replay_memory(self, mock_parameters):281        self._setup_parameters(mock_parameters.return_value)282        mock_parameters.return_value.preprocessing = \283            'cntk.contrib.deeprl.agent.shared.preprocessing.SlidingWindow'284        mock_parameters.return_value.preprocessing_args = '(2, )'285        action_space = spaces.Discrete(2)286        observation_space = spaces.Box(0, 1, (1,))287        sut = QLearning('', observation_space, action_space)288        sut._compute_priority = Mock(side_effect=[1, 2, 3])289        sut._choose_action = Mock(290            side_effect=[(0, ''), (0, ''), (1, ''), (1, '')])291        sut._replay_memory = MagicMock()292        sut._update_q_periodically = MagicMock()293        sut.start(np.array([0.1], np.float32))294        sut.step(0.1, np.array([0.2], np.float32))295        sut.step(0.2, np.array([0.3], np.float32))296        sut.end(0.3, np.array([0.4], np.float32))297        self.assertEqual(sut._replay_memory.store.call_count, 3)298        call_args = sut._replay_memory.store.call_args_list[0]299        np.testing.assert_array_equal(300            call_args[0][0],301            np.array([[0], [0.1]], np.float32))302        self.assertEqual(call_args[0][1], 0)303        self.assertEqual(call_args[0][2], 0.1)304        np.testing.assert_array_equal(305            call_args[0][3],306            np.array([[0.1], [0.2]], np.float32))307        self.assertEqual(call_args[0][4], 1)308        call_args = sut._replay_memory.store.call_args_list[2]309        np.testing.assert_array_equal(310            call_args[0][0],311            np.array([[0.2], [0.3]], np.float32))312        self.assertEqual(call_args[0][1], 1)313        self.assertEqual(call_args[0][2], 0.3)314   
     self.assertIsNone(call_args[0][3])315        self.assertEqual(call_args[0][4], 3)316    @patch('cntk.contrib.deeprl.agent.qlearning.QLearningParameters')317    def test_replay_start_size(self, mock_parameters):318        self._setup_parameters(mock_parameters.return_value)319        # Set exploration rate to 0320        mock_parameters.return_value.initial_epsilon = 0321        mock_parameters.return_value.epsilon_decay_step_count = 100322        mock_parameters.return_value.epsilon_minimum = 0323        mock_parameters.return_value.replay_start_size = 3324        action_space = spaces.Discrete(2)325        observation_space = spaces.Box(0, 1, (1,))326        sut = QLearning('', observation_space, action_space)327        sut._trainer = MagicMock()328        sut._replay_memory = MagicMock()329        _, debug = sut.start(np.array([0.1], np.float32))330        self.assertEqual(sut.step_count, 0)331        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)332        self.assertEqual(debug['action_behavior'], 'RANDOM')333        _, debug = sut.step(0.1, np.array([0.2], np.float32))334        self.assertEqual(sut.step_count, 1)335        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)336        self.assertEqual(debug['action_behavior'], 'RANDOM')337        sut.end(0.2, np.array([0.3], np.float32))338        self.assertEqual(sut.step_count, 2)339        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)340        _, debug = sut.start(np.array([0.4], np.float32))341        self.assertEqual(sut.step_count, 2)342        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)343        self.assertEqual(debug['action_behavior'], 'RANDOM')344        a, debug = sut.step(0.3, np.array([0.5], np.float32))345        self.assertEqual(sut.step_count, 3)346        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)347        self.assertEqual(debug['action_behavior'], 'GREEDY')348        a, debug = sut.start(np.array([0.6], 
np.float32))349        self.assertEqual(sut.step_count, 3)350        self.assertEqual(sut._trainer.train_minibatch.call_count, 0)351        self.assertEqual(debug['action_behavior'], 'GREEDY')352        a, debug = sut.step(0.4, np.array([0.7], np.float32))353        self.assertEqual(sut.step_count, 4)354        self.assertEqual(sut._trainer.train_minibatch.call_count, 1)355        self.assertEqual(debug['action_behavior'], 'GREEDY')356    def _setup_parameters(self, parameters):357        parameters.q_representation = 'dqn'358        parameters.hidden_layers = '[2]'359        parameters.initial_epsilon = 0.1360        parameters.epsilon_decay_step_count = 9361        parameters.epsilon_minimum = 0.01362        parameters.initial_eta = 0.1363        parameters.eta_decay_step_count = 9364        parameters.eta_minimum = 0.01365        parameters.momentum = 0.95366        parameters.gradient_clipping_threshold = 10367        parameters.q_update_frequency = 2368        parameters.gamma = 0.9369        parameters.double_q_learning = False370        parameters.replay_start_size = 0...models.py
Source:models.py  
...
from .scene import SceneModel
# If we are using autograd, then we need to use a special version of numpy.
from .config import numpy as np


class PointSource(SubsampledModelComponent):
    """A point source (delta function), evaluated in Fourier space.

    Parameters: ``amplitude`` (coefficient, default 1), ``center_x`` and
    ``center_y`` (default 0).
    """

    def _setup_parameters(self):
        self._add_parameter('amplitude', 1., (None, None), 'AMP',
                            'Point source amplitude', coefficient=True)
        self._add_parameter('center_x', 0., (None, None), 'XC',
                            'Point source center position X')
        self._add_parameter('center_y', 0., (None, None), 'YC',
                            'Point source center position Y')

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, amplitude,
                          center_x, center_y, **kwargs):
        """Return ``amplitude * exp(-i * (center_x * kx + center_y * ky))``."""
        # A delta function is a complex exponential in Fourier space.
        point_source_fourier = amplitude * np.exp(
            - 1j * (center_x * kx + center_y * ky)
        )
        return point_source_fourier


class GaussianPointSource(SubsampledModelComponent):
    """A point source convolved with a Gaussian.

    This can be evaluated entirely in real space, so it is useful to use this
    for simple Gaussian fits over a model where a point source is explicitly
    convolved with a Gaussian PSF.
    """

    def _setup_parameters(self):
        self._add_parameter('amplitude', None, (None, None), 'AMP',
                            'Point source amplitude', coefficient=True)
        self._add_parameter('center_x', None, (None, None), 'XC',
                            'Point source center position X')
        self._add_parameter('center_y', None, (None, None), 'YC',
                            'Point source center position Y')
        self._add_parameter('sigma_x', 1., (0.1, 20.), 'SIGX',
                            'Gaussian width in X direction')
        self._add_parameter('sigma_y', 1., (0.1, 20.), 'SIGY',
                            'Gaussian width in Y direction')
        self._add_parameter('rho', 0., (-1., 1.), 'RHO',
                            'Gaussian correlation')

    def _evaluate(self, x, y, subsampling, grid_info, amplitude, center_x,
                  center_y, sigma_x, sigma_y, rho, **kwargs):
        """Evaluate the correlated 2D Gaussian centered on the source.

        The result is normalized by ``2 * pi * sigma_x * sigma_y *
        sqrt(1 - rho**2)``, divided by ``subsampling**2`` and scaled by
        ``amplitude``.
        """
        # Every term of the quadratic form, including the correlation
        # cross-term, must use offsets from the center. Using the raw x * y
        # product for the cross-term is only correct for a source at the
        # origin (or rho == 0).
        dx = x - center_x
        dy = y - center_y

        gaussian = np.exp(-0.5 / (1 - rho**2) * (
            dx**2 / sigma_x**2 +
            dy**2 / sigma_y**2 +
            -2. * dx * dy * rho / sigma_x / sigma_y
        ))

        # Normalize
        gaussian /= 2 * np.pi * sigma_x * sigma_y * np.sqrt(1 - rho**2)
        gaussian /= subsampling**2
        gaussian *= amplitude

        return gaussian


class SimpleGaussianPointSource(GaussianPointSource):
    """A Gaussian point source with no ellipticity.

    See GaussianPointSource for details. We fix rho to 0, and force sigma_x to
    be equal to sigma_y, removing 2 parameters from the fit.
    """

    def _setup_parameters(self):
        super(SimpleGaussianPointSource, self)._setup_parameters()

        # sigma_y and sigma_x are now just one sigma parameter.
        self._add_parameter('sigma', 1., (0.1, 20.), 'SIG', 'Gaussian width')
        self._modify_parameter('sigma_x', derived=True)
        self._modify_parameter('sigma_y', derived=True)
        self.fix(rho=0)

    def _calculate_derived_parameters(self, parameters):
        """Set sigma_x and sigma_y from sigma, then defer to the superclass."""
        p = parameters

        p['sigma_x'] = p['sigma']
        p['sigma_y'] = p['sigma']

        # Update parameters from the superclass
        parent_parameters = super(SimpleGaussianPointSource, self).\
            _calculate_derived_parameters(p)

        return parent_parameters


class Background(PixelModelComponent):
    """A constant background level."""

    def _setup_parameters(self):
        self._add_parameter('background', None, (None, None), 'BKG',
                            'Background', coefficient=True)

    def _evaluate(self, x, y, grid_info, background, **kwargs):
        """Return an array shaped like ``x`` filled with ``background``."""
        return np.ones(x.shape) * background


class PolynomialBackground(PixelModelComponent):
    """A 2-dimensional polynomial background of configurable total degree."""

    def __init__(self, background_degree=0, normalization_scale=10., *args,
                 **kwargs):
        # These attributes are assigned before calling the parent constructor.
        # NOTE(review): presumably the parent constructor invokes
        # _setup_parameters, which reads background_degree -- confirm.
        self.background_degree = background_degree
        self.normalization_scale = normalization_scale

        super(PolynomialBackground, self).__init__(*args, **kwargs)

    def _setup_parameters(self):
        """Setup the polynomial background parameters.

        This is a 2-dimensional polynomial background. We label the flat
        background level parameters as background, and the higher order terms
        as background_x_y where x is the degree in the x-direction and y is the
        degree in the y direction. eg: background_1_2 is degree 1 in x and
        degree 2 in y.
        """
        self._add_parameter('background', None, (None, None), 'BKG',
                            'Background', coefficient=True)

        for x_degree in range(self.background_degree + 1):
            for y_degree in range(self.background_degree + 1 - x_degree):
                if x_degree == 0 and y_degree == 0:
                    # Already added the constant background.
                    continue

                self._add_parameter(
                    'background_%d_%d' % (x_degree, y_degree),
                    0.,
                    (None, None),
                    'BKG%d%d' % (x_degree, y_degree),
                    'Polynomial background, x-degree=%d, y-degree=%d' %
                    (x_degree, y_degree),
                    coefficient=True
                )

    def _evaluate(self, x, y, grid_info, background, **parameters):
        """Build the constant term and one array per polynomial term.

        NOTE(review): this returns the list of per-term arrays rather than
        their sum, whereas Background._evaluate returns a single array --
        confirm that callers expect a list here.
        """
        components = []

        # Zeroth order background
        components.append(background * np.ones(x.shape))

        # Normalize so that things vary on a reasonable scale
        norm_x = x / self.normalization_scale
        norm_y = y / self.normalization_scale

        # Polynomial background components
        for x_degree in range(self.background_degree + 1):
            for y_degree in range(self.background_degree + 1 - x_degree):
                if x_degree == 0 and y_degree == 0:
                    # Already added the constant background.
                    continue

                name = 'background_%d_%d' % (x_degree, y_degree)
                coefficient = parameters[name]

                component = (
                    coefficient * (norm_x**x_degree) * (norm_y**y_degree)
                )

                components.append(component)

        return components


class GaussianPsfElement(PsfElement):
    """A correlated 2D Gaussian PSF element centered on the origin.

    Provides both a real-space and a Fourier-space evaluation.
    """

    def _setup_parameters(self):
        self._add_parameter('sigma_x', 1., (0.01, 20.), 'SIGX',
                            'Gaussian width in X direction')
        self._add_parameter('sigma_y', 1., (0.01, 20.), 'SIGY',
                            'Gaussian width in Y direction')
        self._add_parameter('rho', 0., (-1., 1.), 'RHO',
                            'Gaussian correlation')

    def _evaluate(self, x, y, subsampling, grid_info, sigma_x, sigma_y, rho,
                  **kwargs):
        """Evaluate the normalized Gaussian in real space."""
        gaussian = np.exp(-0.5 / (1 - rho**2) * (
            x**2 / sigma_x**2 +
            y**2 / sigma_y**2 +
            -2. * x * y * rho / sigma_x / sigma_y
        ))

        # Normalize
        gaussian /= 2 * np.pi * sigma_x * sigma_y * np.sqrt(1 - rho**2)
        gaussian /= subsampling**2

        return gaussian

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, sigma_x,
                          sigma_y, rho, **kwargs):
        """Evaluate the Gaussian in Fourier space."""
        gaussian = np.exp(-0.5 * (
            kx**2 * sigma_x**2 +
            ky**2 * sigma_y**2 +
            2. * kx * ky * rho * sigma_x * sigma_y
        ))

        return gaussian


class GaussianMoffatPsfElement(PsfElement):
    """A PSF element that is a Moffat profile plus ``eta`` times a Gaussian."""

    def _setup_parameters(self):
        self._add_parameter('alpha', 2.5, (0.1, 15.), 'ALPHA', 'Moffat width')
        self._add_parameter('sigma', 1., (0.5, 5.), 'SIGMA', 'Gaussian width')
        self._add_parameter('beta', 2., (1.5, 50.), 'BETA', 'Moffat power')
        self._add_parameter('eta', 1., (0., None), 'ETA', 'Gaussian ratio')
        self._add_parameter('ell', 1., (0.2, 5.), 'E0', 'Ellipticity')
        self._add_parameter('xy', 0., (-0.6, 0.6), 'XY', 'XY coefficient')

    def _evaluate(self, x, y, subsampling, grid_info, alpha, sigma, beta, eta,
                  ell, xy, **kwargs):
        """Evaluate the normalized Moffat + Gaussian profile in real space."""
        # Issue: with the pipeline parametrization, the radius can sometimes be
        # negative which is obviously not physical. Restrict the xy parameter
        # so that when we evaluate it we get a radius very close to but not
        # exactly 0 when that happens.
        max_ratio = 0.99999
        max_xy = max_ratio * np.sqrt(ell)
        xy = np.clip(xy, -max_xy, max_xy)

        r2 = x**2 + ell * y**2 + 2 * xy * x * y
        gaussian = np.exp(-0.5 * r2 / sigma**2)
        moffat = (1 + r2 / alpha**2)**(-beta)
        model = moffat + eta * gaussian

        model /= subsampling**2
        model /= (np.pi / np.sqrt(ell - xy**2) * (2 * eta * sigma**2 + alpha**2
                                                  / (beta - 1)))

        return model


class ExponentialPowerPsfElement(PsfElement):
    """A Psf model element that has a profile in Fourier space of
    exp(-(w * width)**power).

    When power is 5/3, this is a Kolmogorov PSF.
    """

    def _setup_parameters(self):
        self._add_parameter('power', 1.6, (0., 2.), 'POW', 'power')
        self._add_parameter('width', 0.5, (0.001, 30.), 'WID', 'width')

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, power, width,
                          **kwargs):
        """Return ``exp(-width**power * k**power)`` with ``k = |(kx, ky)|``."""
        k = np.sqrt(kx*kx + ky*ky)

        fourier_profile = np.exp(-width**power * k**power)

        return fourier_profile


class DeltaExponentialPsfElement(PsfElement):
    """A Psf model element that is the sum of a delta function and a
    Fourier exponential profile.
    """

    def _setup_parameters(self):
        self._add_parameter('delta_fraction', 0.5, (0., 1.), 'DELT',
                            'delta function fraction')
        self._add_parameter('power', 1.6, (0., 2.), 'POW', 'power')
        self._add_parameter('width', 0.5, (0.001, 30.), 'WID', 'width')

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, delta_fraction,
                          power, width, **kwargs):
        """Mix a constant (delta function) term with the exponential profile."""
        k = np.sqrt(kx*kx + ky*ky)

        fourier_profile = (
            delta_fraction +
            (1 - delta_fraction) * np.exp(-width**power * k**power)
        )

        return fourier_profile


class FourierMoffatPsfElement(PsfElement):
    """A Psf model element that has a Fourier profile of a Moffat distribution.

    This is not theoretically motivated in any way.
    """

    def _setup_parameters(self):
        self._add_parameter('alpha', 1., (0.0001, 100.), 'ALPHA', 'alpha')
        self._add_parameter('beta', 2., (0., 5.), 'BETA', 'beta')

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, alpha, beta,
                          **kwargs):
        """Return ``(1 + k**2 / alpha**2)**(-beta)``."""
        k2 = kx*kx + ky*ky

        fourier_profile = (1 + k2 / alpha**2)**(-beta)

        return fourier_profile


class KolmogorovPsfElement(ExponentialPowerPsfElement):
    """A Kolmogorov PSF.

    This is just an ExponentialPowerPsfElement with the power set to 5/3
    """

    def _setup_parameters(self):
        super(KolmogorovPsfElement, self)._setup_parameters()

        self.fix(power=5./3.)


class VonKarmanPsfElement(PsfElement):
    """VonKarman PSF.

    In Fourier space, this has the form:

        r0^(-5/3) * (f^2 + L0^-2)^(-11/6)

    where r0 is the Fried parameter and L0 is the outer scale.
    """

    def _setup_parameters(self):
        self._add_parameter('r0', 1., (0., None), 'R0',
                            'von Karman Fried parameter')
        self._add_parameter('L0', 20., (1., None), 'L0',
                            'von Karman outer scale')

    def _evaluate_fourier(self, kx, ky, subsampling, grid_info, r0, L0,
                          **kwargs):
        """Evaluate the profile using the modified Bessel function ``kv``."""
        k = np.sqrt(kx*kx + ky*ky)

        # fourier_profile = np.exp(-r0**(5/3.) * (k**2 + L0**-2)**(11/6.))
        # fourier_profile = np.exp(
            # -(L0 / r0)**(5/3.) * (
                # + 1.87439 * (k / L0)**(5/3.)
                # - 1.50845 * (k / L0)**2.
            # )
        # )

        from scipy.special import kv, gamma
        fourier_profile = np.exp(
            -(L0 / r0)**(5/3.) * (
                gamma(5/6.) / 2**(1/6.) -
                (k / L0)**(5/6.) * kv(5/6., k / L0)
            )
        )

        # Set the (0, 0) bin to 1. The profile has that as the limit, but the
        # above calculation will give nan.
        # NOTE(review): assumes the k = 0 sample is stored at index [0, 0] --
        # confirm against the grid layout.
        fourier_profile[0, 0] = 1.

        return fourier_profile


class ChromaticExponentialPowerPsfElement(ExponentialPowerPsfElement):
    """A chromatic ExponentialPowerPsfElement to represent seeing.

    The width of the PSF takes the form:

        width = ref_width * (wave / ref_wave) ** (ref_power)

    So the full PSF profile in Fourier space is:

        profile = exp(-(w * ref_width * (wave / ref_wave) ** (ref_power))**power)
    """

    def _setup_parameters(self):
        super(ChromaticExponentialPowerPsfElement, self)._setup_parameters()

        # Width is now a derived parameter
        self._modify_parameter('width', derived=True)

        self._add_parameter('wavelength', None, (None, None), 'WAVE',
                            'wavelength [A]', fixed=True, apply_prefix=False)
        self._add_parameter('ref_width', 1., (0.01, 30.), 'RWID',
                            'width at reference wavelength')
        self._add_parameter('ref_power', -0.3, (-2., 2.), 'RPOW',
                            'powerlaw power')

    def _calculate_derived_parameters(self, parameters):
        """Calculate the seeing width parameter using a power-law in wavelength
        """
        p = parameters

        # Ensure that all required variables have been set properly.
        if p['wavelength'] is None:
            ...
modules.py
Source:modules.py  
...19        mean: Tuple[int, int, int] = (0.485, 0.456, 0.406),20        std: Tuple[int, int, int] = (0.229, 0.224, 0.225),21    ) -> None:22        super().__init__()23        self.mean = self._setup_parameters(mean)24        self.std = self._setup_parameters(std)25    @staticmethod26    def _setup_parameters(param: float) -> Parameter:27        """expand parameter dimensions for future broadcasting, return28        as `Parameter` for easy movement to/from GPU.29        """30        return Parameter(31            torch.as_tensor(param)[None, :, None, None], requires_grad=False32        )33    def forward(self, x: Tensor) -> Tensor:34        """input[channel] = (input[channel] - mean[channel]) / std[channel]"""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
