Best Python code snippet using sure_python
nn_transition.py
Source:nn_transition.py  
1#!/usr/bin/env python2import rospy3from std_msgs.msg import Float64MultiArray, Float32MultiArray, Int164from std_srvs.srv import SetBool, Empty, EmptyResponse5import math6import numpy as np7import matplotlib.pyplot as plt8from predict_nn import predict_nn9from svm_class import svm_failure10import sys11sys.path.insert(0, '/home/juntao/catkin_ws/src/beliefspaceplanning/gpup_gp_node/src/')12from mean_shift import mean_shift13from gpup_gp_node.srv import batch_transition, batch_transition_repeat, one_transition, setk14# np.random.seed(10)15simORreal = 'sim'16discreteORcont = 'discrete'17probability_threshold = 0.6518class Spin_nn(predict_nn, mean_shift, svm_failure):19    OBS = True20    def __init__(self):21        22        predict_nn.__init__(self)23        svm_failure.__init__(self, discrete = (True if discreteORcont=='discrete' else False))24        mean_shift.__init__(self)25        rospy.Service('/nn/transition', batch_transition, self.GetTransition)26        rospy.Service('/nn/transitionOneParticle', one_transition, self.GetTransitionOneParticle)27        rospy.Service('/nn/transitionRepeat', batch_transition_repeat, self.GetTransitionRepeat)28        rospy.Service('/nn/batchSVMcheck', batch_transition, self.batch_svm_check_service)29        rospy.init_node('nn_transition', anonymous=True)30        print('[nn_transition] Ready.')  31        self.time_svm = 0.32        self.num_checks_svm = 0     33        self.time_nn = 0.34        self.num_checks_nn = 035        self.time_bnn = 0.36        self.num_checks_bnn = 0        37        rospy.spin()38    def batch_svm_check(self, S, a):39        failed_inx = []40        for i in range(S.shape[0]):41            st = rospy.get_time()42            p = self.probability(S[i,:], a) # Probability of failure43            self.time_svm += rospy.get_time() - st44            self.num_checks_svm += 145            prob_fail = np.random.uniform(0,1)46            if prob_fail <= p:47                failed_inx.append(i)48        return failed_inx49    def batch_svm_check_service(self, req):50        S = np.array(req.states).reshape(-1, self.state_dim)51        a = np.array(req.action)52        failed_inx = []53        for i in range(S.shape[0]):54            p = self.probability(S[i,:], a) # Probability of failure55            prob_fail = np.random.uniform(0,1)56            if prob_fail <= p:57                failed_inx.append(i)58        node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])59        return {'node_probability': node_probability}60    # Predicts the next step by calling the GP class61    def GetTransition(self, req):62        S = np.array(req.states).reshape(-1, self.state_dim)63        a = np.array(req.action)64        if (len(S) == 1):65            st = rospy.get_time()66            p = self.probability(S[0,:], a)67            self.time_svm += rospy.get_time() - st68            self.num_checks_svm += 169            print("------")70            print(S[0,:], a)71            node_probability = 1.0 - p72            sa = np.concatenate((S[0,:],a), axis=0)73            st = rospy.get_time()74            s_next = self.predict(sa)75            self.time_nn += rospy.get_time() - st76            self.num_checks_nn += 177            print(s_next)78            79            collision_probability = 1.0 if (self.OBS and self.obstacle_check(s_next)) else 0.080            81            return {'next_states': s_next, 'mean_shift': s_next, 'node_probability': node_probability, 'collision_probability': collision_probability}82        else:       83            # Check which particles failed84            failed_inx = self.batch_svm_check(S, a)85            try:86                node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])87            except:88                S_next = []89                mean = [0,0]90                return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}91                92            # Remove failed particles by duplicating good ones93            bad_action = np.array([0.,0.])94            if len(failed_inx):95                good_inx = np.delete( np.array(range(S.shape[0])), failed_inx )96                if len(good_inx) == 0: # All particles failed97                    S_next = []98                    mean = [0,0]99                    return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}100                # Find main direction of fail101                S_failed_mean = np.mean(S[failed_inx, :], axis=0)102                S_mean = np.mean(S, axis=0)103                ang = np.rad2deg(np.arctan2(S_failed_mean[1]-S_mean[1], S_failed_mean[0]-S_mean[0]))104                if ang <= 45. and ang >= -45.:105                    bad_action = np.array([1.,-1.])106                elif ang >= 135. or ang <= -135.:107                    bad_action = np.array([-1.,1.])108                elif ang > 45. and ang < 135.:109                    bad_action = np.array([1.,1.])110                elif ang < -45. and ang > -135.:111                    bad_action = np.array([-1.,-1.])112                dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]113                S[failed_inx, :] = S[dup_inx,:]114            # Propagate115            stb = rospy.get_time()116            SA = np.concatenate((S, np.tile(a, (S.shape[0],1))), axis=1)117            S_next = []118            for sa in SA:119                st = rospy.get_time()120                sa_next = self.predict(sa)121                self.time_nn += rospy.get_time() - st122                self.num_checks_nn += 1123                S_next.append(sa_next)124            S_next = np.array(S_next)125            self.time_bnn += rospy.get_time() - stb126            self.num_checks_bnn += 1127            if self.OBS:128                # print "Checking obstacles..."129                failed_inx = []130                good_inx = []131                for i in range(S_next.shape[0]):132                    if self.obstacle_check(S_next[i,:]):133                        failed_inx.append(i)134                collision_probability = float(len(failed_inx))/float(S.shape[0])135                # node_probability = min(node_probability, node_probability2)136                if len(failed_inx):137                    good_inx = np.delete( np.array(range(S_next.shape[0])), failed_inx )138                    if len(good_inx) == 0: # All particles failed139                        S_next = []140                        mean = [0,0]141                        return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}142                    # Find main direction of fail143                    S_next_failed_mean = np.mean(S_next[failed_inx, :], axis=0)144                    S_next_mean = np.mean(S_next, axis=0)145                    ang = np.rad2deg(np.arctan2(S_next_failed_mean[1]-S_next_mean[1], S_next_failed_mean[0]-S_next_mean[0]))146                    if ang <= 45. and ang >= -45.:147                        bad_action = np.array([1.,-1.])148                    elif ang >= 135. or ang <= -135.:149                        bad_action = np.array([-1.,1.])150                    elif ang > 45. and ang < 135.:151                        bad_action = np.array([1.,1.])152                    elif ang < -45. and ang > -135.:153                        bad_action = np.array([-1.,-1.])154                    dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]155                    S_next[failed_inx, :] = S_next[dup_inx,:]156            else:157                collision_probability = 0.0158            # print('svm time: ' + str(self.time_svm/self.num_checks_svm) + ', prediction time: ' + str(self.time_nn/self.num_checks_nn) + ', batch prediction time: ' + str(self.time_bnn/self.num_checks_bnn))159            mean = np.mean(S_next, 0) #self.get_mean_shift(S_next)160            return {'next_states': S_next.reshape((-1,)), 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': bad_action, 'collision_probability': collision_probability}161    def obstacle_check(self, s):162        Obs = np.array([[-38, 117.1, 4.],163            [-33., 105., 4.],164            [-52.5, 105.2, 4.],165            [43., 111.5, 6.],166            [59., 80., 3.],167            [36.5, 94., 4.]168        ])169        f = 1.2 # inflate170        for obs in Obs:171            if np.linalg.norm(s[:2]-obs[:2]) <= f * obs[2]:172                return True173        return False174    # Predicts the next step by calling the GP class - repeats the same action 'n' times175    def GetTransitionRepeat(self, req):176        n = req.num_repeat177        TranReq = batch_transition()178        TranReq.states = req.states179        TranReq.action = req.action180        for _ in range(n):181            res = self.GetTransition(TranReq)182            TranReq.states = res['next_states']183            prob = res['node_probability']184            if prob < req.probability_threshold:185                break186        187        return {'next_states': res['next_states'], 'mean_shift': res['mean_shift'], 'node_probability': res['node_probability'], 'bad_action': res['bad_action'], 'collision_probability': res['collision_probability']}188    # Predicts the next step by calling the GP class189    def GetTransitionOneParticle(self, req):190        s = np.array(req.state)191        a = np.array(req.action)192        # Check which particles failed193        p = self.probability(s, a)194        node_probability = 1.0 - p195        # Propagate196        sa = np.concatenate((s, a), axis=0)197        s_next = self.predict(sa)   198        return {'next_state': s_next, 'node_probability': node_probability}199if __name__ == '__main__':200    try:201        NN = Spin_nn()202    except rospy.ROSInterruptException:...nn_run.py
Source:nn_run.py  
1#!/usr/bin/env python2import rospy3from gpup_gp_node.srv import batch_transition, one_transition4from nn_node.srv import critic_seq5#from gpup_gp_node_exp.srv import batch_transition, one_transition6import numpy as np7from svm_class import svm_failure8import pickle9from predict_nn import predict_nn10from sklearn.neighbors import KDTree11from sklearn.gaussian_process import GaussianProcessRegressor12from sklearn.gaussian_process.kernels import RBF13CRITIC = True14class Spin_predict(predict_nn, svm_failure):15    state_dim = 416    OBS = True17    def __init__(self):18        predict_nn.__init__(self)19        svm_failure.__init__(self, simORreal = 't42_cyl35', discrete = True)20        rospy.Service('/nn/transition', batch_transition, self.GetTransition)21        rospy.Service('/nn/transitionOneParticle', one_transition, self.GetTransitionOneParticle)22        if CRITIC:23            rospy.Service('/nn/critic_seq', critic_seq, self.GetCritic)24        rospy.init_node('predict', anonymous=True)25        if self.OBS:26            self.Obs = np.array([[-15, 115, 23]])27        if CRITIC:28            self.K = 329            with open('/home/pracsys/catkin_ws/src/t42_control/nn_node/gp_eval/data_P40.pkl', 'rb') as f: 30                self.O, self.E = pickle.load(f)31            if 0:32                self.kdt = KDTree(self.O, leaf_size=100, metric='euclidean')33            else:34                with open('/home/pracsys/catkin_ws/src/t42_control/nn_node/gp_eval/kdt_P40.pkl', 'rb') as f: 35                    self.kdt = pickle.load(f)36            self.kernel = RBF(length_scale=1.0, length_scale_bounds=(1e-1, 10.0))37            print('[nn_predict_node] Critic loaded.')38        print('[nn_predict_node] Ready to predict...')39        rospy.spin()40    def batch_svm_check(self, S, a):41        failed_inx = []42        for i in range(S.shape[0]):43            p = self.probability(S[i,:], a) # Probability of failure44            prob_fail = np.random.uniform(0,1)45            if prob_fail <= p:46                failed_inx.append(i)47        return failed_inx48    # Predicts the next step by calling the GP class49    def GetTransition(self, req):50        S = np.array(req.states).reshape(-1, self.state_dim)51        a = np.array(req.action)52        if (len(S) == 1):53            st = rospy.get_time()54            p = self.probability(S[0,:], a)55            node_probability = 1.0 - p56            sa = np.concatenate((S[0,:],a), axis=0)57            s_next = self.predict(sa)58            collision_probability = 1.0 if (self.OBS and self.obstacle_check(s_next)) else 0.059            return {'next_states': s_next, 'mean_shift': s_next, 'node_probability': node_probability, 'collision_probability': collision_probability}60        else:       61            # Check which particles failed62            failed_inx = self.batch_svm_check(S, a)63            try:64                node_probability = 1.0 - float(len(failed_inx))/float(S.shape[0])65            except:66                S_next = []67                mean = [0,0]68                return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}69            # Remove failed particles by duplicating good ones70            bad_action = np.array([0.,0.])71            if len(failed_inx):72                good_inx = np.delete( np.array(range(S.shape[0])), failed_inx )73                if len(good_inx) == 0: # All particles failed74                    S_next = []75                    mean = [0,0]76                    return {'next_states': S_next, 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': np.array([0.,0.]), 'collision_probability': 1.0}77                # Find main direction of fail78                S_failed_mean = np.mean(S[failed_inx, :], axis=0)79                S_mean = np.mean(S, axis=0)80                ang = np.rad2deg(np.arctan2(S_failed_mean[1]-S_mean[1], S_failed_mean[0]-S_mean[0]))81                if ang <= 45. and ang >= -45.:82                    bad_action = np.array([1.,-1.])83                elif ang >= 135. or ang <= -135.:84                    bad_action = np.array([-1.,1.])85                elif ang > 45. and ang < 135.:86                    bad_action = np.array([1.,1.])87                elif ang < -45. and ang > -135.:88                    bad_action = np.array([-1.,-1.])89                dup_inx = good_inx[np.random.choice(len(good_inx), size=len(failed_inx), replace=True)]90                S[failed_inx, :] = S[dup_inx,:]91            # Propagate92            SA = np.concatenate((S, np.tile(a, (S.shape[0],1))), axis=1)93            S_next = self.predict(SA)94            mean = np.mean(S_next, 0) #self.get_mean_shift(S_next)95            return {'next_states': S_next.reshape((-1,)), 'mean_shift': mean, 'node_probability': node_probability, 'bad_action': bad_action, 'collision_probability': collision_probability}96    def obstacle_check(self, s):97        f = 1.4 #2.0 # inflate98        for o in self.Obs:99            if np.linalg.norm(s[:2]-o[:2]) <= f * o[2]:100                return True101        return False102    def GetTransitionOneParticle(self, req):103        s = np.array(req.state)104        a = np.array(req.action)105        # Check which particles failed106        p = self.probability(s, a)107        node_probability = 1.0 - p108        # Propagate109        sa = np.concatenate((s, a), axis=0)110        s_next = self.predict(sa) 111        # print(self.time_nn / self.num_checks_nn) 112        return {'next_state': s_next, 'node_probability': node_probability}113    def GetCritic(self, req):114        s = np.array(req.state)115        a = np.array(req.future_action)116        n = req.steps117        Apr = np.array(req.seq)118        sa = np.concatenate((s, a, np.array([n]), Apr.reshape((-1))), axis=0)119        # sa = np.append(sa, n)120        idx = self.kdt.query(sa.reshape(1,-1), k = self.K, return_distance=False)121        O_nn = self.O[idx,:].reshape(self.K, -1)122        E_nn = self.E[idx].reshape(self.K, 1)123        gpr = GaussianProcessRegressor(kernel=self.kernel).fit(O_nn, E_nn)124        e, _ = gpr.predict(sa.reshape(1, -1), return_std=True)125    126        return {'err': e}127    128if __name__ == '__main__':129    130    try:131        SP = Spin_predict()132    except rospy.ROSInterruptException:...test_transactions.py
Source:test_transactions.py  
1import transfiles.transactions as transactions2from unittest.mock import Mock, create_autospec, MagicMock3def mock_action_not_failing_anywhere():4    action = create_autospec(transactions.Action)5    return action6def mock_action_failing_on_precommit():7    action = create_autospec(transactions.Action)8    action.pre_commit = Mock(side_effect=RuntimeError("fail!"))9    return action10def mock_action_failing_on_precommit_and_rollback():11    action = create_autospec(transactions.Action)12    action.pre_commit = Mock(side_effect=RuntimeError("fail precommit!"))13    action.rollback = Mock(side_effect=RuntimeError("fail rollback!"))14    return action15def mock_action_failing_on_commit():16    action = create_autospec(transactions.Action)17    action.commit = Mock(side_effect=RuntimeError("fail on commit!"))18    return action19class TestTransactions:20    def test_process_actions_atomically_happy_path(self):21        """tests all is good when all actions finish fine"""22        good_action1 = mock_action_not_failing_anywhere()23        good_action2 = mock_action_not_failing_anywhere()24        transactions.process_actions_atomically([good_action1, good_action2])25        good_action1.pre_commit.assert_called_once()26        good_action2.pre_commit.assert_called_once()27        good_action1.commit.assert_called_once()28        good_action2.commit.assert_called_once()29        good_action1.rollback.assert_not_called()30        good_action2.rollback.assert_not_called()31    def test_rollbacks_itself_if_precommit_fails(self):32        bad_action = mock_action_failing_on_precommit()33        transactions.process_actions_atomically((bad_action,))34        bad_action.pre_commit.assert_called_once()35        bad_action.commit.assert_not_called()36        bad_action.rollback.assert_called_once()37    def test_calls_rollback_in_reverse_order(self):38        manager = MagicMock()39        good_action1 = mock_action_not_failing_anywhere()40        bad_action = mock_action_failing_on_precommit()41        good_action2 = mock_action_not_failing_anywhere()42        good_action1_name = "good_action1"43        manager.attach_mock(good_action1, good_action1_name)44        bad_action_name = "bad_action"45        manager.attach_mock(bad_action, bad_action_name)46        transactions.process_actions_atomically(47            [good_action1, bad_action, good_action2])48        """it should fail on bad action, roll it back, 49        then roll back good_action1.50        good_action2 shouldn't be touched at all"""51        good_action1.pre_commit.assert_called_once()52        good_action1.commit.assert_not_called()53        good_action1.rollback.assert_called_once()54        bad_action.pre_commit.assert_called_once()55        bad_action.commit.assert_not_called()56        bad_action.rollback.assert_called_once()57        good_action2.pre_commit.assert_not_called()58        good_action2.commit.assert_not_called()59        good_action2.rollback.assert_not_called()60        """now test it calls rollbacks in correct sequence 61        (first bad_action, then good_action1)"""62        idx_good_action1_rollback = None63        idx_bad_action_rollback = None64        for idx, call in enumerate(manager.mock_calls):65            if call[0] == f"{good_action1_name}.rollback":66                idx_good_action1_rollback = idx67            elif call[0] == f"{bad_action_name}.rollback":68                idx_bad_action_rollback = idx69        assert idx_good_action1_rollback is not None70        assert idx_bad_action_rollback is not None71        assert idx_bad_action_rollback < idx_good_action1_rollback72    def test_failure_on_rollback_doesnt_prevent_other_rollbacks(self):73        good_action = mock_action_not_failing_anywhere()74        bad_action = mock_action_failing_on_precommit_and_rollback()75        transactions.process_actions_atomically([good_action, bad_action])76        good_action.pre_commit.assert_called_once()77        bad_action.pre_commit.assert_called_once()78        # will fail:79        bad_action.rollback.assert_called_once()80        # but it shouldn't preclude to rollback good action:81        good_action.rollback.assert_called_once()82        good_action.commit.assert_not_called()83        bad_action.commit.assert_not_called()84    def test_failure_on_commit_doesnt_prevent_other_commits(self):85        good_action = mock_action_not_failing_anywhere()86        bad_action = mock_action_failing_on_commit()87        transactions.process_actions_atomically([bad_action, good_action])88        bad_action.pre_commit.assert_called_once()89        good_action.pre_commit.assert_called_once()90        # will fail:91        bad_action.commit.assert_called_once()92        # but should not prevent calling commit on good action:93        good_action.commit.assert_called_once()94        """95        Q: shouldn't bad_action.rollback be called too?96        A: I think no, it would violate rule that transaction is atomic.97        In most cases target action will be done on pre_commit() already, 98        so at worst we'll have some temporary leftover for being able to revert....Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
