How to use is_done method in Playwright Python

Best Python code snippets using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

train_hand.py

Source: train_hand.py Github

copy
1#coding=utf-8
2import rospy
3from std_msgs.msg import Int16
4import time
5from geometry_msgs.msg import Twist #导入需要的ros的package 
6import numpy as np
# Module-level completion flag shared between the subscriber callback and the
# keyboard loop; it starts at 0 and is overwritten with whatever arrives on
# the "is_done" topic.
is_done = Int16(data=0)
def data_publish(u):
    """Publish a single ``Twist`` velocity command on ``/cmd_vel``.

    Exactly one message is sent per call; callers that need a stream of
    commands must call this repeatedly.

    Args:
        u: Indexable pair. ``u[0]`` is written to ``linear.x`` and ``u[1]``
            to ``angular.z``; every other Twist component is set to 0.0.
    """
    # Build the publisher once and reuse it on later calls instead of
    # constructing a new Publisher object for every message.
    # TODO: the topic name and message type still need to be confirmed with
    # the robot-side developer (note carried over from the original author).
    pub = getattr(data_publish, "_pub", None)
    if pub is None:
        pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
        data_publish._pub = pub

    twist = Twist()

    # Linear velocity: only the x component is driven.
    twist.linear.x = u[0]
    twist.linear.y = 0.0
    twist.linear.z = 0.0

    # Angular velocity: only the z component is driven.
    twist.angular.x = 0.0
    twist.angular.y = 0.0
    twist.angular.z = u[1]

    pub.publish(twist)
35
def callback_2(data):
    """Subscriber callback: copy the received value into the shared flag.

    Args:
        data: Incoming message; its ``data`` field is stored verbatim in the
            module-level ``is_done`` message.
    """
    is_done.data = data.data
48
if __name__ == '__main__':
    # Keyboard teleoperation loop: read one key per iteration and, if the
    # last value received on the "is_done" topic is 1, publish the matching
    # velocity command through data_publish().
    rospy.init_node('train_hand', anonymous=True)
    rospy.Subscriber("is_done", Int16, callback_2)
    start_flag = 1  # loop-enable flag; nothing in this script clears it

    u = np.zeros(2)  # control vector handed to data_publish()

    # raw_input only exists on Python 2; fall back to input on Python 3 so
    # the script does not die with NameError on the first prompt.
    try:
        read_key = raw_input
    except NameError:
        read_key = input

    # key -> (value for u[0], value for u[1], sleep at 10 Hz after publishing)
    # The original "w" branch never slept after publishing, so that is kept.
    # The "q" entry comes from the HEAD side of a merge conflict that was left
    # unresolved in the file; it is kept here and the markers are removed.
    commands = {
        "w": (1, 0, False),   # forward
        "a": (0, -1, True),   # turn left
        "d": (0, 1, True),    # turn right
        "s": (-1, 0, True),   # backward
        "q": (2, 0, True),    # hexapod gait
    }

    time.sleep(1)
    print(is_done.data)
    # Usage hint shown to the operator.
    print("前进:w 左拐:a 右拐:d 后退:s")
    time.sleep(0.1)

    while start_flag == 1 and not rospy.is_shutdown():
        try:
            control_key = read_key("raw_input:")
        except EOFError:
            # stdin was closed; there is nothing more to read.
            break

        if control_key in commands:
            linear, angular, paced = commands[control_key]
            print("control is " + control_key)

            # The rate object is created before the flag check, as in the
            # original, so the sleep below measures from this point.
            rate = rospy.Rate(10) if paced else None

            # Only send the command when the flag reports 1.
            if is_done.data == 1:
                u[0] = linear
                u[1] = angular
                data_publish(u)
                if rate is not None:
                    rate.sleep()

        # Reset the flag after every key press, whether or not a command
        # was sent.
        print("is_done已重置")
        is_done.data = 0
185
186
187
Full Screen

test_parallel_build.py

Source: test_parallel_build.py Github

copy
1#!/usr/bin/env python
2# Copyright (c) 2009, Willow Garage, Inc.
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are met:
7#
8#     * Redistributions of source code must retain the above copyright
9#       notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above copyright
11#       notice, this list of conditions and the following disclaimer in the
12#       documentation and/or other materials provided with the distribution.
13#     * Neither the name of the Willow Garage, Inc. nor the names of its
14#       contributors may be used to endorse or promote products derived from
15#       this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27# POSSIBILITY OF SUCH DAMAGE.
28
29import unittest
30
31from rosmake import parallel_build
32
33
class TestDependencyTracker(unittest.TestCase):
    """Check that DependencyTracker hands back the fake tables loaded into it."""

    def setUp(self):
        # Two lookup tables are loaded: one served by get_deps() and one by
        # get_deps_1(). Presumably these are full vs. first-order
        # dependencies -- confirm against rosmake.parallel_build.
        self.deps = {
            'a': ['b', 'c', 'd', 'e'],
            'b': ['c'],
            'd': ['c', 'e'],
        }
        self.deps1 = {
            'a': ['b'],
            'b': ['c'],
            'd': ['c', 'e'],
        }
        self.dt = parallel_build.DependencyTracker()
        self.dt.load_fake_deps(self.deps, self.deps1)

    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and no longer exists in Python 3.12+.
    def test_deps_1(self):
        """get_deps_1() returns the entries of the second table."""
        self.assertEqual(self.deps1['a'], self.dt.get_deps_1('a'))
        self.assertEqual(self.deps1['b'], self.dt.get_deps_1('b'))
        self.assertEqual(self.deps1['d'], self.dt.get_deps_1('d'))

    def test_deps(self):
        """get_deps() returns the entries of the first table."""
        self.assertEqual(self.deps['a'], self.dt.get_deps('a'))
        self.assertEqual(self.deps['b'], self.dt.get_deps('b'))
        self.assertEqual(self.deps['d'], self.dt.get_deps('d'))

    def test_not_package(self):
        """Unknown package names yield an empty list from both lookups."""
        self.assertEqual([], self.dt.get_deps('This is not a valid package name'))
        self.assertEqual([], self.dt.get_deps_1('This is not a valid package name'))
61
62
class TestBuildQueue(unittest.TestCase):
    """Drive BuildQueue through serial, partial, aborted and parallel builds."""

    def setUp(self):
        # Serial graph: a chain a -> b -> c -> d -> e -> f.
        serial_deps = {
            'a': ['b', 'c', 'd', 'e', 'f'],
            'b': ['c', 'd', 'e', 'f'],
            'c': ['d', 'e', 'f'],
            'd': ['e', 'f'],
            'e': ['f'],
            'f': [],
        }
        serial_deps1 = {
            'a': ['b'],
            'b': ['c'],
            'c': ['d'],
            'd': ['e'],
            'e': ['f'],
            'f': [],
        }
        self.serial_tracker = parallel_build.DependencyTracker()
        self.serial_tracker.load_fake_deps(serial_deps, serial_deps1)

        # Parallel graph: 'a' depends on five packages that have no
        # dependencies of their own.
        parallel_deps = {
            'a': ['b', 'c', 'd', 'e', 'f'],
            'b': [],
            'c': [],
            'd': [],
            'e': [],
            'f': [],
        }
        parallel_deps1 = {
            'a': ['b', 'c', 'd', 'e', 'f'],
            'b': [],
            'c': [],
            'd': [],
            'e': [],
            'f': [],
        }
        self.parallel_tracker = parallel_build.DependencyTracker()
        self.parallel_tracker.load_fake_deps(parallel_deps, parallel_deps1)

    def _assert_unfinished(self, bq):
        """Assert the queue reports neither done nor succeeded."""
        self.assertFalse(bq.is_done())
        self.assertFalse(bq.succeeded())

    def _assert_succeeded(self, bq):
        """Assert the queue reports both done and succeeded."""
        self.assertTrue(bq.is_done())
        self.assertTrue(bq.succeeded())

    def _build_step(self, bq, pkg, built_after, built_before=None):
        """Take ``pkg`` from the queue, hand it back, and check the counters.

        ``built_before``, when given, is checked between taking the package
        and returning it.
        """
        self.assertEqual(pkg, bq.get_valid_package())
        if built_before is not None:
            self.assertEqual(built_before, len(bq.built))
        bq.return_built(pkg)
        self.assertEqual(built_after, len(bq.built))
        self._assert_unfinished(bq)

    def _final_step(self, bq, pkg, built_after=None):
        """Build the last package and check the queue finishes successfully.

        ``built_after``, when given, is the expected size of ``bq.built``
        once the package has been returned.
        """
        self.assertEqual(pkg, bq.get_valid_package())
        self._assert_unfinished(bq)
        bq.return_built(pkg)
        if built_after is not None:
            self.assertEqual(built_after, len(bq.built))
        self._assert_succeeded(bq)

    # full queue
    def test_full_build(self):
        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)
        self._assert_unfinished(bq)

        self._build_step(bq, 'f', 1, built_before=0)
        for built_after, pkg in enumerate(['e', 'd', 'c', 'b'], 2):
            self._build_step(bq, pkg, built_after)

        self._final_step(bq, 'a', built_after=6)

    # partial build
    def test_partial_build(self):
        bq = parallel_build.BuildQueue(['d', 'e', 'f'], self.serial_tracker)
        self._assert_unfinished(bq)

        self._build_step(bq, 'f', 1, built_before=0)
        self._build_step(bq, 'e', 2)

        self._final_step(bq, 'd', built_after=3)

    # abort early
    def test_abort_early(self):
        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)
        self._assert_unfinished(bq)
        self.assertEqual(0, len(bq.built))

        for built_after, pkg in enumerate(['f', 'e', 'd'], 1):
            self._build_step(bq, pkg, built_after)

        # Stopping marks the queue done but not succeeded, and no further
        # packages are handed out.
        bq.stop()
        self.assertTrue(bq.is_done())
        self.assertFalse(bq.succeeded())

        self.assertEqual(None, bq.get_valid_package())

    # many parallel
    def test_parallel_build(self):
        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.parallel_tracker)
        self._assert_unfinished(bq)

        remaining = ['b', 'c', 'd', 'e', 'f']
        built_so_far = 0
        expected_total = 6
        # The five leaf packages may be handed out in any order.
        while remaining:
            pkg = bq.get_valid_package()
            built_now = len(bq.built)
            reported_total = bq._total_pkgs
            self.assertTrue(pkg in remaining)
            remaining.remove(pkg)
            self.assertEqual(built_so_far, built_now)
            self.assertEqual(expected_total, reported_total)
            self._assert_unfinished(bq)
            bq.return_built(pkg)
            built_so_far += 1
            self._assert_unfinished(bq)

        self._final_step(bq, 'a')

    # stalled(future)
238
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)