Best Python code snippet using assertpy_python
pendulum_test.py
Source:pendulum_test.py  
1from __future__ import absolute_import, division, print_function2import base643from cmath import nan4import imageio5import matplotlib6import matplotlib.pyplot as plt7import numpy as np8import PIL.Image9import reverb10import zlib11import os12import cv2 as cv13import glob14import tensorflow as tf15import tensorflow_probability as tfp16tfd = tfp.distributions17from tensorflow import keras18from tensorflow.keras import layers, Model, models19from tensorflow.keras.layers import Dense20from tf_agents.environments import suite_gym21from tf_agents.environments import tf_py_environment22# from cartpole_noise import CartPoleEnvNoise23from cartpole_high import CartPoleEnvNoise24# gym_env = CartPoleEnvNoise(1.0)25gym_env = suite_gym.wrap_env(CartPoleEnvNoise(1.0, 2))26# env = suite_gym.wrap_env(gym_env)27# env_name = 'CartPole-v0'28# env = suite_gym.load(env_name)29eval_env = tf_py_environment.TFPyEnvironment(gym_env)30class CustomLossNLL(tf.losses.Loss):31    @tf.function32    def call(self, y_true, y_pred):33        mean, log_sigma = tf.split(y_pred, 2, axis=-1)34        y_target, temp =tf.split(y_true,2,axis=-1)35        sigma = tf.nn.softplus(log_sigma)36        dist = tfp.distributions.MultivariateNormalDiag(loc=mean, scale_diag=sigma)37        loss = -tf.reduce_mean(dist.log_prob(y_target))38        return loss39def run_models(env, dyn_model, obs_model, saved_policy):40    zero_vec = np.zeros((1,5))41    time_step = env.reset()42    obs=time_step.observation.numpy()43    state = np.concatenate((obs, np.array([0]).reshape(1,1)), axis=1)44    test_state=[]; test_dyn=[]; test_obs=[]45    # video_full = cv.VideoWriter('video_full_3.avi', 0, 30, (600,400))46    # video_sliced = cv.VideoWriter('video_sliced_3.avi', 0, 30, (300,75))47    policy_state = saved_policy.get_initial_state(batch_size=1)48    for j in range(200):49        if not time_step.is_last():50            # use ground truth state51            img_full=env.render(mode='rgb_array').numpy().reshape(400,600,3)52            cut = cv.pyrDown(img_full[167:317,:,:])53            gray = cv.cvtColor(cut, cv.COLOR_BGR2GRAY)54            img = np.abs((gray/255)-1)55            if j == 0:56                prev_img = img57            obs = np.concatenate((img.reshape(1,75,300,1), prev_img.reshape(1,75,300,1)), axis=0)58            obs = obs.reshape(1,2,75,300,1)59            obs_pred = obs_model(obs, training=False)60            state_pred = dyn_model(state, training=False)61            62            policy_step = saved_policy.action(time_step, policy_state)63            policy_state = policy_step.state64            action = policy_step.action.numpy()65            time_step = env.step(action)66            obs=time_step.observation.numpy()67            state = np.concatenate((obs, np.array([action]).reshape(1,1)), axis=1)68            prev_img = img69            test_state.append(state)70            test_dyn.append(state_pred)71            test_obs.append(obs_pred.numpy())72            # video_full.write(img_full)73            # video_sliced.write(gray)74        else:75            break76    # cv.destroyAllWindows()77    # video_full.release()78    # video_sliced.release()79    test_state = np.asarray(test_state).reshape(-1,5)80    test_dyn = np.asarray(test_dyn).reshape(-1,10)81    dynamics_mean, dynamics_log_sigma = tf.split(test_dyn, 2, axis=-1)82    dynamics_sigma = np.sqrt(tf.nn.softplus(dynamics_log_sigma))83    # obs_mean, obs_log_sigma = tf.split(test_obs, 2, axis=-1)84    # obs_mean=obs_mean.numpy().reshape(-1,5)85    # obs_sigma = np.sqrt(tf.nn.softplus(obs_log_sigma)).reshape(-1,5)86    plt.figure(10)87    for i in range(5):88        plt.subplot(5,1,i+1)89        plt.plot(test_state[:,i], color='k')90        plt.plot(dynamics_mean[:,i], color='b')91        plt.fill_between(np.linspace(0,dynamics_mean.shape[0],dynamics_mean.shape[0]), dynamics_mean[:,i]+dynamics_sigma[:,i], dynamics_mean[:,i]-dynamics_sigma[:,i], facecolor='b', alpha=.2)92        plt.plot(np.asarray(test_obs)[:,:,i], color='g')93        # plt.fill_between(np.linspace(0,obs_mean.shape[0],obs_mean.shape[0]), obs_mean[:,i]+obs_sigma[:,i], obs_mean[:,i]-obs_sigma[:,i], facecolor='g', alpha=.2)94    plt.show()95def run_filter(env, dyn_model, obs_model, saved_policy):96    savename = 'Pendulum_specklenoise'97    video_full = cv.VideoWriter(savename+'.avi', 0, 30, (600,400), 0)98    time_step = env.reset()99    state = np.concatenate((time_step.observation.numpy(), np.array([0]).reshape(1,1), np.zeros([1,5])), axis=1)100    test_state=[]; test_dyn=[]; test_obs=[]; test_filter=[]101    cycle_state = state102    policy_state = saved_policy.get_initial_state(batch_size=1)103    for j in range(500):104        if not time_step.is_last():105            img_full=env.render(mode='rgb_array').numpy().reshape(400,600,3)106            # gray = cv.cvtColor(img_full, cv.COLOR_BGR2GRAY).reshape(400,600,1)107            gray = img_full108            # speckle noise109            # gauss = np.random.normal(0,.5,gray.size)110            # gauss = gauss.reshape(gray.shape[0],gray.shape[1], gray.shape[2]).astype('uint8')111            # noise = gray + gray * gauss112            # Salt noise113            # gauss = np.random.normal(0,1,gray.size)114            # gauss = gauss.reshape(gray.shape[0],gray.shape[1], gray.shape[2]).astype('uint8')115            # noise = cv.add(gray,gauss).reshape(400,600,1)116            # Gaussian Blur117            # noise = cv.GaussianBlur(gray.reshape(400,600), (7, 7), 0).reshape(400,600,1)118            # No noise119            noise = gray120            video_full.write(noise)121            # cv.imshow("full_img", noise)122            # cv.waitKey(1)123            cut = cv.pyrDown(noise[167:317,:,:])124            img = np.abs((cut/255)-1)125            # cv.imshow("full_img", cut)126            # cv.waitKey(1)127            if j == 0:128                prev_img = img129            obs = np.concatenate((img.reshape(1,75,300,-1), prev_img.reshape(1,75,300,-1)), axis=0)130            obs = obs.reshape(1,2,75,300,-1)131            obs_pred = obs_model(obs, training=False)132            obs_mean, obs_logvar = np.split(obs_pred, 2, axis=-1)133            obs_var = tf.nn.softplus(obs_logvar).numpy()134            state_pred = dyn_model(cycle_state[-1,0:5].reshape(1,-1), training=False)135            # state_pred = dyn_model(state[0,0:5].reshape(1,-1), training=False)136            dyn_mean, dyn_logvar = np.split(state_pred, 2, axis=-1)137            dyn_var = tf.nn.softplus(dyn_logvar).numpy()138            proc_var = (dyn_var + cycle_state[-1,5:10])*2139            filtered_state = (obs_var/(proc_var+obs_var))*dyn_mean + (proc_var/(proc_var+obs_var))*obs_mean140            filtered_var = proc_var*(proc_var+obs_var)*obs_var141            new_state = np.concatenate((filtered_state, filtered_var), axis=1)142            cycle_state = np.concatenate((cycle_state, new_state), axis=0)143            policy_step = saved_policy.action(time_step, policy_state)144            policy_state = policy_step.state145            action = policy_step.action.numpy()146            print(action)147            time_step = env.step(action)148            state = np.concatenate((time_step.observation.numpy(), np.array([action]).reshape(1,1), np.zeros([1,5])), axis=1)149            prev_img = img150            test_state.append(state)151            test_dyn.append(state_pred)152            test_obs.append(obs_pred)153            test_filter.append(new_state)154        else:155            break156    cv.destroyAllWindows()157    video_full.release()158    test_state = np.asarray(test_state).reshape(-1,10)159    test_dyn = np.asarray(test_dyn).reshape(-1,10)160    test_obs = np.asarray(test_obs).reshape(-1,10)161    test_filter = np.asarray(test_filter).reshape(-1,10)162    plt.figure(1, figsize=(10,10))163    for i in range(5):164        plt.subplot(5,1,i+1)165        plt.plot(test_state[:,i], c='#173f5f')166        # plt.plot(test_dyn[:,i], c='b')167        # plt.fill_between(np.linspace(0,test_dyn.shape[0]-1,test_dyn.shape[0]), test_dyn[:,i]+tf.nn.softplus(test_dyn[:,i+5]), test_dyn[:,i]-tf.nn.softplus(test_dyn[:,i+5]), facecolor='b', alpha=.2)168        plt.plot(test_obs[:,i], c=[.4705, .7921, .6470])169        plt.fill_between(np.linspace(0,test_obs.shape[0]-1,test_obs.shape[0]), test_obs[:,i]+tf.nn.softplus(test_obs[:,i+5]), test_obs[:,i]-tf.nn.softplus(test_obs[:,i+5]), facecolor='g', alpha=.2)170        plt.plot(test_filter[:,i], c='b')171        plt.fill_between(np.linspace(0,test_filter.shape[0]-1,test_filter.shape[0]), test_filter[:,i]+test_filter[:,i+5], test_filter[:,i]-test_filter[:,i+5], facecolor='b', alpha=.2)172    173    plt.subplot(5,1,1)174    plt.ylabel('Cart Position')175    plt.subplot(5,1,2)176    plt.ylabel('Cart Velocity')177    plt.subplot(5,1,3)178    plt.ylabel('Pole Position')179    plt.subplot(5,1,4)180    plt.ylabel('Pole Velocity')181    plt.subplot(5,1,5)182    i=4183    plt.plot(test_dyn[:,i], c='b')184    plt.fill_between(np.linspace(0,test_dyn.shape[0]-1,test_dyn.shape[0]), test_dyn[:,i]+tf.nn.softplus(test_dyn[:,i+5]), test_dyn[:,i]-tf.nn.softplus(test_dyn[:,i+5]), facecolor='b', alpha=.2)185    plt.ylabel('Control Force')186    # plt.ylim([-.25, 1.25])187    plt.xlabel('Time (t)')188    plt.savefig(savename+'.pdf', bbox_inches='tight')189    plt.show()190if __name__=='__main__':191    # control model192    ctrl_model = tf.compat.v2.saved_model.load('/home/geoffrey/Research/data/pendulum_high/models/ctrl0')193    # dyn_model = build_dynamics_model()194    dyn_model = models.load_model('/home/geoffrey/Research/data/pendulum_high/models/dyn_simple0')195    # obs_model = build_timedistributed_observation_model()196    obs_model = models.load_model('/home/geoffrey/Research/data/pendulum_high/models/obs0', custom_objects={'CustomLossNLL': CustomLossNLL()})197    run_models(eval_env, dyn_model, obs_model, ctrl_model)...sliding_window_max.py
Source:sliding_window_max.py  
1"""2You are given an array of integers `nums`, there is a sliding window of size `k`3which is moving from the very left of the array to the very right.4You can only see the `k` numbers in the window. Each time the sliding window moves5right by one position.6Return the max sliding window.7## Example 1:8- Input: `nums = [1,3,-1,-3,5,3,6,7]`, `k = 3`9- Output: `[3, 3, 5, 5, 6, 7]`10- Explanation:11Window position                Max12---------------               -----13[1  3  -1] -3  5  3  6  7       314 1 [3  -1  -3] 5  3  6  7       315 1  3 [-1  -3  5] 3  6  7       516 1  3  -1 [-3  5  3] 6  7       517 1  3  -1  -3 [5  3  6] 7       618 1  3  -1  -3  5 [3  6  7]      719## Example 2:20- Input: `nums = [1]`, `k = 1`21- Output: `[1]`22## Example 3:23- Input: `nums = [1, -1]`, `k = 1`24- Output: `[1, -1]`25## Example 4:26- Input: `nums = [9, 11]`, `k = 2`27- Output: `[11]`28## Example 5:29- Input: `nums = [4, -2]`, `k = 2`30- Output: `[4]`31## Constraints:32- `1 <= nums.length <= 105`33- `-104 <= nums[i] <= 104`34- `1 <= k <= nums.length`35"""36import unittest37from collections import deque38import util39class Solution(object):40    def max_sliding_window_dynamic(self, nums, k):41        """42        - time  : `O(N)`, since all we do is `3` passes along the array of length `N`.43        - space : `O(N)` to keep left and right arrays of length `N`, and output array of length `N - k + 1`44        """45        n = len(nums)46        if n * k == 0:47            return []48        if k == 1:49            return nums50        left, right = ([0 for _ in range(n)] for _ in range(2))51        left[0] = nums[0]52        right[n - 1] = nums[n - 1]53        for i in range(1, n):54            # from left to right55            if i % k == 0:56                # block start57                left[i] = nums[i]58            else:59                left[i] = max(left[i - 1], nums[i])60            # from right to left61            j = n - i - 162            if (j + 1) % k == 0:63                # block end64                right[j] = nums[j]65            else:66                right[j] = max(right[j + 1], nums[j])67        return [68            max(left[i + k - 1], right[i]) for i in range(n - k + 1)69        ]70    def max_sliding_window_deque(self, nums, k):71        """72        - time  : `O(N)`, since each element is processed exactly twice - it's index added and then removed from the deque.73        - space : `O(N)`, since `O(Nâk+1)` is used for an output array and `O(k)` for a deque.74        """75        # base cases76        n = len(nums)77        if n * k == 0:78            return []79        if k == 1:80            return nums81        def clean_deque(i):82            # remove indexes of elements not from sliding window83            if deq and deq[0] == i - k:84                deq.popleft()85            # remove from deq indexes of all elements86            # which are smaller than current element nums[i]87            while deq and nums[i] > nums[deq[-1]]:88                deq.pop()89        # init deque and output90        deq = deque()91        max_idx = 092        for i in range(k):93            clean_deque(i)94            deq.append(i)95            # compute max in nums[:k]96            if nums[i] > nums[max_idx]:97                max_idx = i98        output = [nums[max_idx]]99        # build output100        for i in range(k, n):101            clean_deque(i)102            deq.append(i)103            output.append(nums[deq[0]])104        return output105    def max_sliding_window_bad(self, nums, k):106        """107        - time : O(N ^ 2)108        - space: O(N)109        """110        if len(nums) < k:111            raise ValueError("k cannot be larger than len(nums)")112        return [113            max(nums[i:i+k]) for i in range(len(nums) - k + 1)114        ]115# class Test(unittest.TestCase):116#     def setUp(self) -> None:117#         super().setUp()118#         self.fn = Solution().max_sliding_window_dynamic119    # def test_001(self):120    #     self.assertEqual(121    #         [3, 3, 5, 5, 6, 7],122    #         self.fn([1, 3, -1, -3, 5, 3, 6, 7], 3)123    #     )124if __name__ == "__main__":125    test_deq = util.Tester(Solution().max_sliding_window_deque)126    test_many = util.TestMany(127        [128            Solution().max_sliding_window_bad,129            Solution().max_sliding_window_deque,130            Solution().max_sliding_window_dynamic131        ]132    )133    test_many.test(134        [3, 3, 5, 5, 6, 7],135        ([1, 3, -1, -3, 5, 3, 6, 7], 3)136    )137    # test_dyn = util.Tester(Solution().max_sliding_window_dynamic)138    # print(test_dyn)139    # test_dyn.test(140    #     [3, 3, 5, 5, 6, 7],141    #     ([1, 3, -1, -3, 5, 3, 6, 7], 3)142    # )143    # test_dyn.run()...play_simulation.py
Source:play_simulation.py  
1#! /usr/bin/env python2import numpy as np3import rospy, tf2_ros, tf, geometry_msgs.msg, sensor_msgs.msg4import pdb5import pat3.utils as p3_u6import pat3.algebra as p3_al7import pat3.frames as p3_fr8import pat3.vehicles.fixed_wing.legacy_6dof as p1_fw_dyn9# should be in ros_pat10import pat3.ros_utils as p3_rpu11import pat3.test.fixed_wing.test_01_dynamics as test_dyn12import pat3.test.fixed_wing.test_02_att_ctl  as test_pil13class Agent(p3_rpu.PeriodicNode):14    mode_cst, mode_sim = range(2)15    def __init__(self, mode=mode_sim, name='ros_pat_play_simulation'):16        p3_rpu.PeriodicNode.__init__(self, name)17        self.mode = mode18        pos_ned = [0, 0, -1]19        self.T_w2b = np.eye(4)20        self.T_b2w = np.linalg.inv(self.T_w2b)21        self.T_a2b = np.eye(4)22        self.T_b2a = self.T_a2b#np.linalg.inv(self.T_a2b) # FIXME23        save=False24        savefile_name = '/tmp/pat_traj.npz'25        #savefile_name = '/tmp/pat_glider_circle.npz'26        if mode == self.mode_sim:27            if save:28                #self.time, self.X = test_dyn.sim_step_thr()29                #self.time, self.X = test_dyn.sim_step_ail()30                #self.time, self.X = test_dyn.sim_step_ele()31                self.time, self.X, self.U = test_dyn.sim_step_rud()32                #self.time, self.X, self.U = test_pil.test_step_theta(dm)33                np.savez(savefile_name, time=self.time, X=self.X)34                print('saved {}'.format(savefile_name))35            else:36                print('loading {}'.format(savefile_name))37                _data =  np.load(savefile_name)38                self.time, self.X =[_data[k] for k in ['time', 'X']] 39            self.sim_dt = self.time[1] - self.time[0]40            self.sim_dur = self.time[-1] - self.time[0]41            42        self.tf_pub = p3_rpu.TransformPublisher()43        self.marker_pub = p3_rpu.PoseArrayPublisher(dae='ds_glider_full.dae')44        self.odom_pub = p3_rpu.OdomPublisher()45        self.joint_state_pub = p3_rpu.JointStatePublisher(what=name)46        47    def periodic(self):48        now = rospy.Time.now()49        self.tf_pub.send_w_enu_to_ned_transform(now)50        if self.mode == self.mode_sim:51            t_sim = np.fmod(now.to_sec(), self.sim_dur)52            idx_t = int(t_sim / self.sim_dt)53            X_eul = self.X[idx_t, p3_fr.SixDOFAeroEuler.sv_slice_eul]54            #X_eul[0] = np.sin(t_sim)55            #print t_sim, idx_t, self.X[idx_t, p3_fr.SixDOFAeroEuler.sv_slice_eul]56            p3_u.set_rot(self.T_w2b, p3_al.rmat_of_euler(X_eul))57            #_set_trans(self.T_w2b, self.X[idx_t, p3_fr.SixDOFAeroEuler.sv_slice_pos])58            self.T_b2w = np.linalg.inv(self.T_w2b)59            p3_u.set_trans(self.T_b2w, self.X[idx_t, p3_fr.SixDOFAeroEuler.sv_slice_pos])60            va, alpha, beta = self.X[idx_t, p3_fr.SixDOFAeroEuler.sv_slice_vaero]61            p3_u.set_rot(self.T_a2b, p3_fr.R_aero_to_body(alpha, beta))62            self.T_b2a = self.T_a2b#np.linalg.inv(self.T_a2b) # FIXME63            64        #joint_states = [self.dail, -self.dail, self.dele, self.dele, self.dflap, self.dflap]65        joint_states = [0, 0, 0, 0, 0, 0]66        self.joint_state_pub.publish(joint_states, now)67        68        self.tf_pub.send_w_ned_to_b_transform(now, self.T_b2w)69        self.tf_pub.send_b_to_a_transform(now, self.T_b2a)70        self.tf_pub.send_transform('w_ned', 'ds_glider/base_link', now, self.T_b2w)71        self.marker_pub.publish([self.T_b2w])72        self.odom_pub.publish(self.T_b2w, now)73        74def main():75    Agent().run(25)76if __name__ == "__main__":77    np.set_printoptions(linewidth=500)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
