Best Python code snippet using lisa_python
test_parallel_build.py
Source:test_parallel_build.py  
...86        self.parallel_tracker.load_fake_deps(deps, deps1)87    # full queue88    def test_full_build(self):89        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)90        self.assertFalse(bq.is_done())91        self.assertFalse(bq.succeeded())92        self.assertEqual('f', bq.get_valid_package())93        self.assertEqual(0, len(bq.built))94        bq.return_built('f')95        self.assertEqual(1, len(bq.built))96        self.assertFalse(bq.is_done())97        self.assertFalse(bq.succeeded())98        self.assertEqual('e', bq.get_valid_package())99        bq.return_built('e')100        self.assertEqual(2, len(bq.built))101        self.assertFalse(bq.is_done())102        self.assertFalse(bq.succeeded())103        self.assertEqual('d', bq.get_valid_package())104        bq.return_built('d')105        self.assertEqual(3, len(bq.built))106        self.assertFalse(bq.is_done())107        self.assertFalse(bq.succeeded())108        self.assertEqual('c', bq.get_valid_package())109        bq.return_built('c')110        self.assertEqual(4, len(bq.built))111        self.assertFalse(bq.is_done())112        self.assertFalse(bq.succeeded())113        self.assertEqual('b', bq.get_valid_package())114        bq.return_built('b')115        self.assertEqual(5, len(bq.built))116        self.assertFalse(bq.is_done())117        self.assertFalse(bq.succeeded())118        self.assertEqual('a', bq.get_valid_package())119        self.assertFalse(bq.is_done())120        self.assertFalse(bq.succeeded())121        bq.return_built('a')122        self.assertEqual(6, len(bq.built))123        self.assertTrue(bq.is_done())124        self.assertTrue(bq.succeeded())125    # partial build126    def test_partial_build(self):127        bq = parallel_build.BuildQueue(['d', 'e', 'f'], self.serial_tracker)128        self.assertFalse(bq.is_done())129        self.assertFalse(bq.succeeded())130        self.assertEqual('f', bq.get_valid_package())131        self.assertEqual(0, len(bq.built))132        bq.return_built('f')133        self.assertEqual(1, len(bq.built))134        self.assertFalse(bq.is_done())135        self.assertFalse(bq.succeeded())136        self.assertEqual('e', bq.get_valid_package())137        bq.return_built('e')138        self.assertEqual(2, len(bq.built))139        self.assertFalse(bq.is_done())140        self.assertFalse(bq.succeeded())141        self.assertEqual('d', bq.get_valid_package())142        self.assertFalse(bq.is_done())143        self.assertFalse(bq.succeeded())144        bq.return_built('d')145        self.assertEqual(3, len(bq.built))146        self.assertTrue(bq.is_done())147        self.assertTrue(bq.succeeded())148    # abort early149    def test_abort_early(self):150        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.serial_tracker)151        self.assertFalse(bq.is_done())152        self.assertFalse(bq.succeeded())153        self.assertEqual(0, len(bq.built))154        self.assertEqual('f', bq.get_valid_package())155        bq.return_built('f')156        self.assertEqual(1, len(bq.built))157        self.assertFalse(bq.is_done())158        self.assertFalse(bq.succeeded())159        self.assertEqual('e', bq.get_valid_package())160        bq.return_built('e')161        self.assertEqual(2, len(bq.built))162        self.assertFalse(bq.is_done())163        self.assertFalse(bq.succeeded())164        self.assertEqual('d', bq.get_valid_package())165        bq.return_built('d')166        self.assertEqual(3, len(bq.built))167        self.assertFalse(bq.is_done())168        self.assertFalse(bq.succeeded())169        bq.stop()170        self.assertTrue(bq.is_done())171        self.assertFalse(bq.succeeded())172        self.assertEqual(None, bq.get_valid_package())173    # many parallel174    def test_parallel_build(self):175        bq = parallel_build.BuildQueue(['a', 'b', 'c', 'd', 'e', 'f'], self.parallel_tracker)176        self.assertFalse(bq.is_done())177        self.assertFalse(bq.succeeded())178        dependents = ['b', 'c', 'd', 'e', 'f']179        count = 0180        total = 6181        while len(dependents) > 0:182            result = bq.get_valid_package()183            done = len(bq.built)184            pkgs = bq._total_pkgs185            self.assertTrue(result in dependents)186            # print result, done, pkgs187            dependents.remove(result)188            self.assertEqual(count, done)189            self.assertEqual(total, pkgs)190            self.assertFalse(bq.is_done())191            self.assertFalse(bq.succeeded())192            bq.return_built(result)193            count = count + 1194            self.assertFalse(bq.is_done())195            self.assertFalse(bq.succeeded())196        self.assertEqual('a', bq.get_valid_package())197        self.assertFalse(bq.is_done())198        self.assertFalse(bq.succeeded())199        bq.return_built('a')200        self.assertTrue(bq.is_done())201        self.assertTrue(bq.succeeded())...binary_tree.py
Source:binary_tree.py  
1class Stack(object):2    def __init__(self):3        self.items = []4    def __len__(self):5        return self.size()6     7    def size(self):8        return len(self.items)9    def push(self, item):10        self.items.append(item)11    def pop(self):  12        if not self.is_empty():13            return self.items.pop()14    def peek(self):15        if not self.is_empty():16            return self.items[-1]17    def is_empty(self):18        return len(self.items) == 019    def __str__(self):20        s = ""21        for i in range(len(self.items)):22            s += str(self.items[i].value) + "-"23        return s24class Queue(object):25    def __init__(self):26        self.items = []27    def __len__(self):28        return self.size()29    def enqueue(self, item):30        self.items.insert(0, item)31    def dequeue(self):32        if not self.is_empty():33            return self.items.pop()34    def size(self):35        return len(self.items)36    def is_empty(self):37        return len(self.items) == 038    def peek(self):39        if not self.is_empty():40            return self.items[-1].value41class Node(object):42    def __init__(self, value):43        self.value = value44        self.left = None45        self.right = None46class BinaryTree(object):47    def __init__(self, root):48        self.root = Node(root)49    def search(self, find_val, traversal_type):50        if traversal_type == "preorder":51            return self.preorder_search(tree.root, find_val)52        elif traversal_type == "inorder":53            return self.inorder_search(tree.root, find_val)54        elif traversal_type == "postorder":55            return self.postorder_search(tree.root, find_val)56        else:57            print("Traversal type " + str(traversal_type) + " not recognized.")58            return False59    def print_tree(self, traversal_type):60        # Recursive traversals61        if traversal_type == "preorder":62            return self.preorder_print(tree.root, "")63        elif traversal_type == "inorder":64            return self.inorder_print(tree.root, "")65        elif traversal_type == "postorder":66            return self.postorder_print(tree.root, "")67        # Iterative traversals68        elif traversal_type == "levelorder":69            return self.levelorder_print(tree.root)70        elif traversal_type == "inorder_iterative":71            return self.inorder_iterative(tree.root)72        elif traversal_type == "preorder_iterative":73            return self.preorder_iterative(tree.root)74        elif traversal_type == "postorder_iterative":75            return self.postorder_iterative(tree.root)76        else:77            print("Traversal type " + str(traversal_type) + " not recognized.")78            return False79    def levelorder_print(self, start):80        if start is None:81            return82        queue = Queue()83        queue.enqueue(start)84        traversal = ""85        while len(queue) > 0:86            traversal += str(queue.peek()) + "-"87            node = queue.dequeue()88            if node.left:89                queue.enqueue(node.left)90            if node.right:91                queue.enqueue(node.right)92        return traversal93    def preorder_search(self, start, find_val):94        if start:95            if start.value == find_val:96                return True97            else:98                return self.preorder_search(start.left, find_val) or \99                       self.preorder_search(start.right, find_val)100        return False101    def preorder_print(self, start, traversal):102        """Root->Left-Right"""103        if start:104            traversal += (str(start.value) + "-")105            traversal = self.preorder_print(start.left, traversal)106            traversal = self.preorder_print(start.right, traversal)107        return traversal108    def inorder_print(self, start, traversal):109        """Left->Root->Right"""110        if start:111            traversal = self.inorder_print(start.left, traversal)112            traversal += (str(start.value) + "-")113            traversal = self.inorder_print(start.right, traversal)114        return traversal115    def postorder_print(self, start, traversal):116        """Left->Right->Root"""117        if start:118            traversal = self.postorder_print(start.left, traversal)119            traversal = self.postorder_print(start.right, traversal)120            traversal += (str(start.value) + "-")121        return traversal122    def preorder_iterative(self, start):123        stack = Stack()124        cur = start125        is_done = False126        traversal = ""127        while not is_done:128            if cur is not None:129                traversal += str(cur.value) + "-"130                stack.push(cur)131                cur = cur.left132            else:133                if len(stack) > 0:134                    cur = stack.pop()135                    cur = cur.right136                else:137                    is_done = True138        return traversal139    def inorder_iterative(self, start):140        s = Stack()141        cur = start142        is_done = False143        traversal = ""144        while not is_done:145            if cur is not None:146                s.push(cur)147                cur = cur.left148            else:149                if len(s) > 0:150                    cur = s.pop()151                    traversal += str(cur.value) + "-"152                    cur = cur.right153                else:154                    is_done = True155        return traversal156    def postorder_iterative(self, start):157        s = Stack()158        cur = start159        is_done = False160        traversal = ""161        while not is_done:162             163            if cur is not None:164                s.push(cur)165                cur = cur.left166            else:167                if len(s) > 0:168                    cur = s.pop()169                    traversal += str(cur.value) + "-"170                    cur = cur.right171                else:172                    is_done = True173        return traversal174    def height(self, node):175        if node is None:176            return 0177        left = self.height(node.left)178        right = self.height(node.right)179        if left > right:180            h = 1 + left181        else:182            h = 1 + right183        return h184    def lowest_common_ancestor(self, node1, node2):185        stack_path_1 = Stack()186        stack_path_2 = Stack()187        cur = self.root188        is_done = False189        while not is_done:190            if cur is not None:191                if cur.value == node1.value:192                    break193                stack_path_1.push(cur)194                cur = cur.left195            else:196                if len(stack_path_1) > 0:197                    cur = stack_path_1.pop()198                    cur = cur.right199                else:200                    is_done = True201        202        cur = self.root203        is_done = False204        while not is_done:205            if cur is not None:206                if cur.value == node2.value:207                    break208                stack_path_2.push(cur)209                cur = cur.left210            else:211                if len(stack_path_2) > 0:212                    cur = stack_path_2.pop()213                    cur = cur.right214                else:215                    is_done = True216        print(stack_path_1)217        print(stack_path_2)218        #return traversal219# Set up tree:220tree = BinaryTree(1)221tree.root.left = Node(2)222tree.root.right = Node(3)223tree.root.left.left = Node(4)224tree.root.left.right = Node(5)225tree.root.right.left = Node(6)226tree.root.right.right = Node(7)227tree.root.right.right.right = Node(8)228#tree.root.left.right.left = Node(7)229#tree.root.left.right.right = Node(8)230#tree.root.right.right = Node(6)231#tree.root.right.right.left = Node(9)232#print(tree.height(tree.root))233# Should be True234#print(tree.search(4))235# Should be False:236#print(tree.search(6))237# Test print_tree238print("Preorder Recursive:")239print(tree.print_tree("preorder"))240print("Preorder Iterative:")241print(tree.print_tree("preorder_iterative"))242print("Postorder Recursive")243print(tree.print_tree("postorder"))244#print("Postorder Iterative:")245#print(tree.print_tree("postorder_iterative"))246print("Inorder Recursive:")247print(tree.print_tree("inorder"))248print("Inorder Iterative:")249print(tree.print_tree("inorder_iterative"))250print("Levelorder Iterative:")251print(tree.print_tree("levelorder"))252print("\n")253x = tree.root.left.left254y = tree.root.left.right...fabfile.py
Source:fabfile.py  
1from fabric.api import *2from os import *3env.use_ssh_config = True4env.output_prefix = True5scriptFileName = None6repoList = {}7def get_immediate_subdirectories(a_dir='~/repos/'):8    result = run("find {} -maxdepth 1 -type d".format(a_dir))9    result = result.splitlines()10    del result[0]11    return result12@serial13def reboot():14    with hide('running', 'stdout', 'stderr'):15        is_done = True16        try:17            sudo("shutdown -r 1", shell=False)18        except Exception as e:19            print("{} - {}".format(env.host_string, e))20            is_done = False21        finally:22            return is_done23def apt_update():24    with hide('running', 'stdout', 'stderr'):25        is_done = True26        try:27            pathList = run('ls -a ~')28            if ".noApt" in pathList.stdout:29                print("{} has .noApt - skipping".format(env.host_string))30                return is_done31            print("Updating {} with apt...".format(env.host_string))32            sudo("apt update", shell=False)33            sudo("DEBIAN_FRONTEND=noninteractive apt upgrade -yq", shell=False,34                 timeout=120)35            sudo("apt autoremove -yq", shell=False, timeout=120)36        except Exception as e:37            print("{} - {}".format(env.host_string, e))38            is_done = False39        finally:40            return is_done41def yum_update():42    with hide('running', 'stdout', 'stderr'):43        is_done = True44        try:45            print("Updating {} with yum...".format(env.host_string))46            sudo("yum -y update", shell=False)47        except Exception as e:48            print("{} - {}".format(env.host_string, e))49            is_done = False50        finally:51            return is_done52def opkg_update():53    with show('running', 'stdout', 'stderr'):54        is_done = True55        try:56            print("Updating {} with opkg...".format(env.host_string))57            run("opkg update", shell=False)58            packages = run("opkg list-upgradable | awk '{print $1}' | "59                           "sed ':M;N;$!bM;s#\\n# #g'", shell=False)60            if packages:61                run("opkg upgrade %s" % packages, shell=False)62            else:63                print("No updated packages on {}".format(env.host_string))64        except Exception as e:65            print("{} - {}".format(env.host_string, e))66            is_done = False67        finally:68            return is_done69def repo_update(repoName="Home Bin", repoDirectory="~/bin/"):70    with hide('running', 'stdout', 'stderr'):71        is_done = True72        try:73            with cd(repoDirectory):74                pathList = run('ls -a')75                if ".noPull" in pathList.stdout:76                    print("{} repo {} has .noPull - skipping".format(77                        env.host_string, repoName))78                    return is_done79                r = run('git pull')80                if 'Already up-to-date' in r.stdout:81                    print("{} repo {} already up-to-date".format(82                        env.host_string, repoName))83                else:84                    print("{} repo {} updated".format(env.host_string,85                                                      repoName))86                if '.postpull.sh' in pathList.stdout:87                    print("{} repo {} - updating permissions".format(88                        env.host_string, repoName))89                    run('./.postpull.sh')90        except Fatal as e:91            print("{} - {}".format(env.host_string, e))92            is_done = False93        finally:94            return is_done95def setup():96    with hide('running', 'stdout', 'stderr'):97        is_done = True98        apt_sudoers = env.user + " * = (root) NOPASSWD:SETENV: /usr/bin/apt,"\99            "/usr/local/bin/apt"100        power_sudoers = env.user + " * = (root) NOPASSWD: /sbin/shutdown"101        try:102            sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(apt_sudoers))103            sudo("echo \"{}\" >> /etc/sudoers.d/fabric".format(power_sudoers))104            pathList = run('ls -a ~')105            if "repos" not in pathList.stdout:106                print("No repos dir on {} - creating".format(env.host_string))107                run("mkdir ~/repos/")108            if "bin" not in pathList.stdout:109                print("No bin dir on {} - cloning".format(env.host_string))110                run("git clone https://github.com/mbeland/bin.git")111        except Exception as e:112            print("{} - {}".format(env.host_string, e))113            is_done = False114        finally:115            return is_done116@parallel117def updates():118    with hide('running', 'stdout', 'stderr'):119        is_done = True120        try:121            apt_update()122            updateRepos()123        except Exception as e:124            print("{} - {}".format(env.host_string, e))125            is_done = False126        finally:127            return is_done128def updateRepos():129    with hide('running', 'stdout', 'stderr'):130        is_done = True131        reposDir = '~/repos/'132        try:133            repo_update("Home Bin", "~/bin/")134            repo_update("SSH", "~/.ssh")135            repo_update("Home Dir", "~/")136            dirsToUpdate = get_immediate_subdirectories(reposDir)137            for x in dirsToUpdate:138                y = x.split("/")139                repo_update(y[-1], x)140        except Exception as e:141            print("{} - {}".format(env.host_string, e))142            is_done = False143        finally:144            return is_done145def deploy_script():146    with hide('running', 'stdout', 'stderr'):147        is_done = True148        global scriptFileName149        try:150            if scriptFileName:151                scriptFileName = input('Script to execute: ')152            put(scriptFileName, "/tmp/", mirror_local_mode=True)153            remotePath = "/tmp/" + scriptFileName154            run(remotePath)155            run("rm " + remotePath)156        except Exception as e:157            print("{} - {}".format(env.host_string, e))158            is_done = False159        finally:...train_hand.py
Source:train_hand.py  
1#coding=utf-82import rospy3from std_msgs.msg import Int164import time5from geometry_msgs.msg import Twist #导å
¥éè¦çrosçpackage 6import numpy as np7is_done=Int16()8is_done.data=09#å®ä¹åéæ°æ®ç彿°:åªå䏿¬¡ï¼å¯ä»¥èèåå¾å¤æ¬¡ï¼æç»è¦çææå¦ä½ï¼10def data_publish(u):11    #åå§ånode12    13    #åå§åpublisher14    pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) # topicåæ¶æ¯ç±»åéè¦åè岸æåéç¡®å®ä¸ä¸ 15    16    #åå§åtwist17    twist = Twist()18    # ç»twistèµå¼19    twist.linear.x = u[0] #线é度20    twist.linear.y = 0.021    twist.linear.z = 0.022    twist.angular.x = 0.0 #è§é度23    twist.angular.y = 0.024    twist.angular.z = u[1]25    #è°è¯ä»£ç 26    # print("twist")27    # print(twist)28    #åétwist29    pub.publish(twist) #å°æ°æ®åéåºå»30#åétwistï¼ä¸ç´åæä»¤ï¼ç´å°ä¸æ¬¡è¿å¨ç»æä¹åå°±ä¸åæä»¤31def callback_2(data): 32    #æ¥åæ°æ®33    is_done.data= data.data34    #print(finish)35    #å¤æç¶æ36    # if (finish.data == 1):#å®æè½¬å¨37    #     is_done = 138    #     # print("callback_2",is_done)39    # else:#æ²¡å®æè½¬å¨40    #     is_done = 0 41    #     print("moving")42if __name__ == '__main__':43    #åå§å44    rospy.init_node('train_hand', anonymous=True)45    rospy.Subscriber("is_done", Int16, callback_2)46    start_flag = 1 #å¼å§æä»¤47    twist = Twist() #åé声æ48    u = np.zeros(2) #æ§å¶é声æ49   50    time.sleep(1)51    print(is_done.data)52    #æ§å¶è¯´æ53    print("åè¿ï¼w å·¦æï¼a 峿:d åéï¼s")54    time.sleep(0.1)55    #å¼å§æ§å¶56    while start_flag == 1: #ä¸ç´å¾ªç¯æ¥åæ°æ®57        control_key = raw_input("raw_inputï¼") #è·å¾é®çè¾å
¥58        if control_key == "w": #åè¿59            #æ¾ç¤ºæ§å¶é60            print("control is w")61            #设置é¢ç62           63            #没æ¶å°ç»æä¿¡æ¯ä¸ç´åéæ§å¶é64            if(is_done.data==1):65                #èµå¼çº¿é度66                u[0] = 167                u[1] = 068                69                #åå¸çº¿éåº¦æ¶æ¯70                data_publish(u)71               72                #以10HZé¢çåéæ°æ®73              74            75        elif control_key == "a": #左转76            #æ¾ç¤ºæ§å¶é77            print("control is a")78            #设置é¢ç79            rate = rospy.Rate(10) # 10hz80            #没æ¶å°ç»æä¿¡æ¯ä¸ç´åéæ§å¶é81            if(is_done.data==1):82                #èµå¼çº¿é度83                u[0] = 084                u[1] = -185                86                #åå¸çº¿éåº¦æ¶æ¯87                data_publish(u)88                89                #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æå¾ªç¯90               91                #以10HZé¢çåéæ°æ®92                rate.sleep()93        elif control_key == "d": #å³è½¬94            #æ¾ç¤ºæ§å¶é95            print("control is d")96            #设置é¢ç97            rate = rospy.Rate(10) # 10hz98            if(is_done.data==1):99                #èµå¼çº¿é度100                u[0] = 0101                u[1] = 1102                103                #åå¸çº¿éåº¦æ¶æ¯104                data_publish(u)105                106                #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æå¾ªç¯107                108                #以10HZé¢çåéæ°æ®109                rate.sleep()110        elif control_key == "s": #åé111            #æ¾ç¤ºæ§å¶é112            print("control is s")113            #设置é¢ç114            rate = rospy.Rate(10) # 10hz115            if(is_done.data==1):116                #èµå¼çº¿é度117                u[0] = -1118                u[1] = 0119                120                #åå¸çº¿éåº¦æ¶æ¯121                data_publish(u)122        123                #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æå¾ªç¯124          125                #以10HZé¢çåéæ°æ®126                rate.sleep()127        128<<<<<<< HEAD129        elif control_key == "q": #å
è¶³æ¥æ130            #æ¾ç¤ºæ§å¶é131            print("control is s")132            #设置é¢ç133            rate = rospy.Rate(10) # 10hz134            if(is_done.data==1):135                #èµå¼çº¿é度136                u[0] = 2137                u[1] = 0138                139                #åå¸çº¿éåº¦æ¶æ¯140                data_publish(u)141        142                #å¦ææ¥æ¶å°ç»æä¿¡æ¯ï¼is_done设为1,ç»æå¾ªç¯143          144                #以10HZé¢çåéæ°æ®145                rate.sleep()146=======147>>>>>>> 6ca41d4ff84544147f430048270163452df0609e148        #å°is_downå¤ä½149        print("is_doneå·²éç½®")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
