Best Python code snippet using localstack_python
scheduler.py
Source:scheduler.py  
...14        self.__queue3 = [self.create_container(c_tuple) for c_tuple in q2_conf]15        self.__load_level = NORMAL  # Initialize the load level to normal16        self.__logger = logger17        # Initialize client object for docker.18        self.start_or_unpause_container(self.__queue3[0])19        print("run " + self.__queue3[0].name)20        self.__running = [0, 0, 1]21    def get_load_level(self):22        return self.__load_level23    def get_running(self):24        return self.__running25    def get_core_usage(self):26        return self.__running[0] + 2 * self.__running[1] + 3 * self.__running[2]27    def can_schedule_queue1(self, num=1):28        return len(self.__queue1) >= (self.__running[0] + num)29    def can_schedule_queue2(self, num=1):30        return len(self.__queue2) >= (self.__running[1] + num)31    def can_schedule_queue3(self, num=1):32        return len(self.__queue3) >= (self.__running[2] + num)33    def REMOVE_EXITED_CONTAINERS(self):34        # Remove exited containers.35        pop_location = 036        running = self.__running[0]37        for x in range(0, running):38            removed = self.remove_if_done_container(self.__queue1[pop_location])39            if removed:40                self.__running[0] -= 141                self.__queue1.pop(pop_location)42            else:43                pop_location += 144        pop_location = 045        running = self.__running[1]46        for x in range(0, running):47            removed = self.remove_if_done_container(self.__queue2[pop_location])48            if removed:49                self.__running[1] -= 150                self.__queue2.pop(pop_location)51            else:52                pop_location += 153        pop_location = 054        running = self.__running[2]55        for x in range(0, running):56            removed = self.remove_if_done_container(self.__queue3[pop_location])57            if removed:58                self.__running[2] -= 159                
self.__queue3.pop(pop_location)60            else:61                pop_location += 162    def get_best_distribution(self, max):63        if max >= 4:64            if self.can_schedule_queue3() and self.can_schedule_queue1():65                return [3, 1]66            if self.can_schedule_queue2(2):67                return [2, 2]68            if self.can_schedule_queue2() and self.can_schedule_queue1(2):69                return [2, 1, 1]70            if self.can_schedule_queue1(4):71                return [1, 1, 1, 1]72        if max >= 3:73            if self.can_schedule_queue3():74                return [3]75            if self.can_schedule_queue2() and self.can_schedule_queue1():76                return [2, 1]77            if self.can_schedule_queue1(3):78                return [1, 1, 1]79        if max >= 2:80            if self.can_schedule_queue2():81                return [2]82            if self.can_schedule_queue1(2):83                return [1, 1]84        if max >= 1:85            if self.can_schedule_queue1():86                return [1]87            if self.can_schedule_queue2():88                return [2]89            if self.can_schedule_queue3():90                return [3]91        return []92    def add(self, n_containers):93        print("trying to run ", n_containers, " more instances.")94        distr = self.get_best_distribution(n_containers)95        print("best distr ", distr)96        for q in distr:97            if q == 3:98                self.start_or_unpause_container(self.__queue3[self.__running[2]])99                self.__running[2] += 1100            if q == 2:101                print("add(2)", len(self.__queue2), " ", self.__running)102                self.start_or_unpause_container(self.__queue2[self.__running[1]])103                self.__running[1] += 1104            if q == 1:105                self.start_or_unpause_container(self.__queue1[self.__running[0]])106                self.__running[0] += 1107    def remove(self, 
n_containers):108        if n_containers <= 0:109            return110        weight = 3111        queues = [self.__queue1, self.__queue2, self.__queue3]112        for n in reversed(self.__running):113            for i in reversed(range(0, n)):114                if n_containers > weight:115                    n_containers -= weight116                    self.__running[weight - 1] -= 1117                    self.pause_container(queues[weight - 1][i])118            weight -= 1119        print("going into the removal loop")120        while n_containers > 0 and self.get_core_usage() > 0:121            for w, n in enumerate(self.__running):122                for pos_in_queue in range(0, n):123                    n_containers -= (w + 1)124                    self.__running[w] -= 1125                    self.pause_container(queues[w][pos_in_queue])126        print("yay it returned")127    def SCHEDULE_NEXT(self):128        if self.__load_level == NORMAL:129            if self.get_core_usage() == 0 and self.can_schedule_queue3():130                self.start_or_unpause_container(self.__queue3[self.__running[2]])131                self.__running[2] += 1132            if self.get_core_usage() <= 1 and self.can_schedule_queue2():133                self.start_or_unpause_container(self.__queue2[self.__running[1]])134                self.__running[1] += 1135            while self.get_core_usage() < 3 and self.can_schedule_queue1():136                self.start_or_unpause_container(self.__queue1[self.__running[0]])137                self.__running[0] += 1138        if self.__load_level == HIGH:139            if self.get_core_usage() == 0 and self.can_schedule_queue2():140                self.start_or_unpause_container(self.__queue2[self.__running[1]])141                self.__running[1] += 1142            while self.get_core_usage() < 2 and self.can_schedule_queue1():143                self.start_or_unpause_container(self.__queue1[self.__running[0]])144                
self.__running[0] += 1145            # If we still didn't schedule anything, just schedule queue3.146            if self.get_core_usage() == 0 and self.__queue3:147                self.start_or_unpause_container(self.__queue3[0])148                self.update_container(self.__queue3[0], "2,3")149                self.__running = [0, 0, 1]150    def DONE(self):151        return self.__running == [0, 0,152                                  0] and not self.__queue1 and not self.__queue2 and not self.__queue3153    def print_queues(self):154        print("queue1", [c.name for c in self.__queue1], end="  ")155        print("queue2", [c.name for c in self.__queue2], end="  ")156        print("queue3", [c.name for c in self.__queue3], end="\n")157    # Container management helpers.158    def create_container(self, c_tuple):159        cont = self.__client.containers.create(cpuset_cpus=c_tuple[0],160                                               name=c_tuple[1],161                                               detach=True,162                                               auto_remove=False,163                                               image=c_tuple[2],164                                               command=c_tuple[3])165        cont.reload()166        return cont167    def hard_remove_container(self, cont):168        if cont is None: return169        try:170            cont.reload()171            if cont.status == "paused":172                cont.unpause()173            cont.reload()174            if cont.status == "running":175                cont.kill()176            cont.remove()177        except:178            print("You fucked up the 'hard_remove thingy'")179    def remove_container(self, cont):180        if cont is None:181            # print("Inside remove container. 
Returning none.")182            return None183        try:184            # print(f"Removed {cont.name}.")185            cont.remove()186        except:187            self.hard_remove_container(cont)188        return None189    def remove_if_done_container(self, cont):190        if cont is None:191            # print("Inside remove if done, returning none.")192            return False193        cont.reload()194        if cont.status == "exited":195            # print(f"Removing {cont.name} because it is done.")196            self.__logger.log_container_event(cont.name, 'FINISH')197            self.remove_container(cont)198            return True199        return False200        # else:201        #    raise NotImplementedError("IMPLEMENT ME???")202    def pause_container(self, cont):203        if cont is None:204            return205        try:206            cont.reload()207            if cont.status in ["running", "restarting"]:208                self.__logger.log_container_event(cont.name, 'PAUSE')209                cont.pause()210        except:211            print("something seems to have gone wrong while PAUSING the container (But dont care)")212    def unpause_container(self, cont):213        if cont is None:214            return215        cont.reload()216        if cont.status == "paused":217            self.__logger.log_container_event(cont.name, 'UNPAUSE')218            cont.unpause()219    def start_or_unpause_container(self, cont):220        if cont is None:221            return222        cont.reload()223        if cont.status == "paused":224            self.__logger.log_container_event(cont.name, 'UNPAUSE')225            cont.unpause()226            print("unpause " + cont.name)227        elif cont.status == "created":228            self.__logger.log_container_event(cont.name, 'START')229            print("start " + cont.name)230            cont.start()231        else:232            print("start_or_unpause didn't do anything.")233            return234    
def update_container(self, cont, cpu_set):235        if cont is None:236            return237        if cont.status == "exited":238            return239        cont.update(cpuset_cpus=cpu_set)240    def hard_remove_everything(self):241        try:242            self.hard_remove_container(self.__client.containers.get("dedup"))243        except:244            print("Tried to remove Dedup, but didn't exist.")245        try:246            self.hard_remove_container(self.__client.containers.get("splash2x-fft"))247        except:248            print("Tried to remove FFT, but didn't exist.")249        try:250            self.hard_remove_container(self.__client.containers.get("blackscholes"))251        except:252            print("Tried to remove Blackscholes, but didn't exist.")253        try:254            self.hard_remove_container(self.__client.containers.get("canneal"))255        except:256            print("Tried to remove Canneal, but didn't exist.")257        try:258            self.hard_remove_container(self.__client.containers.get("freqmine"))259        except:260            print("Tried to remove Freqmine, but didn't exist.")261        try:262            self.hard_remove_container(self.__client.containers.get("ferret"))263        except:264            print("Tried to remove Ferret, but didn't exist.")265    # Old version using different execution stages.266    # def NORMAL_to_HIGH(self):267    #     print("switching to HIGH")268    #     self.__load_level = HIGH269    #     if self.__running == [0, 0, 1]:270    #         # If there are only jobs in q3 left, we don't want want to stop it.271    #         if not self.__queue1 and not self.__queue2:272    #             self.update_container(self.__queue3[0], "2,3")273    #             return274    #275    #         self.pause_container(self.__queue3[0])276    #         if self.__queue2:277    #             self.start_or_unpause_container(self.__queue2[0])278    #             self.__running = [0, 1, 0]279    #       
  else:280    #             if self.__queue1:281    #                 self.start_or_unpause_container(self.__queue1[0])282    #                 self.__running = [1, 0, 0]283    #             if len(self.__queue1) >= 2:284    #                 self.start_or_unpause_container(self.__queue1[1])285    #                 self.__running = [2, 0, 0]286    #287    #     elif self.__running == [1, 1, 0]:288    #         self.pause_container(self.__queue1[0])289    #         self.__running = [0, 1, 0]290    #291    #     elif self.__running == [3, 0, 0]:292    #         self.pause_container(self.__queue1[2])293    #         self.__running = [2, 0, 0]294    #295    # def HIGH_to_NORMAL(self):296    #     print("switching to NORMAL")297    #     self.__load_level = NORMAL298    #     if self.__running == [0, 1, 0]:299    #         if self.__queue3:300    #             self.pause_container(self.__queue2[0])301    #             self.start_or_unpause_container(self.__queue3[0])302    #             self.__running = [0, 0, 1]303    #         elif self.__queue1:304    #             self.start_or_unpause_container(self.__queue1[0])305    #             self.__running = [1, 1, 0]306    #     elif self.__running == [2, 0, 0]:307    #         if self.__queue3:308    #             self.pause_container(self.__queue1[0])309    #             self.pause_container(self.__queue1[1])310    #             self.start_or_unpause_container(self.__queue3[0])311    #             self.__running = [0, 0, 1]312    #         if len(self.__queue1) >= 3:313    #             self.start_or_unpause_container(self.__queue1[2])314    #             self.__running = [3, 0, 0]315    #316    #     # This is a special case when all other jobs are done.317    #     elif self.__running == [0, 0, 1]:318    #         self.update_container(self.__queue3[0], "1,2,3")319    #320    # def HIGH_to_CRITICAL(self):321    #     print("switching to CRITICAL")322    #     self.__load_level = CRITICAL323    #     # allow memcached to 
use all cpus.324    #     if self.__running == [0, 1, 0]:325    #         self.pause_container(self.__queue2[0])326    #     elif self.__running == [2, 0, 0]:327    #         self.pause_container(self.__queue1[0])328    #         self.pause_container(self.__queue1[1])329    #     elif self.__running == [1, 0, 0]:330    #         self.pause_container(self.__queue1[0])331    #     elif self.__running == [0, 0, 1]:332    #         self.pause_container(self.__queue3[0])333    #     self.__running = [0, 0, 0]334    #335    # def CRITICAL_to_HIGH(self):336    #     print("switching to HIGH")337    #     self.__load_level = HIGH338    #     if self.__queue2:339    #         self.unpause_container(self.__queue2[0])340    #         self.__running = [0, 1, 0]341    #     elif self.__queue1:342    #         self.unpause_container(self.__queue1[0])343    #         self.__running = [1, 0, 0]344    #         if len(self.__queue1) >= 2:345    #             self.unpause_container(self.__queue1[1])346    #             self.__running = [2, 0, 0]347    #     elif self.__queue3:348    #         self.unpause_container(self.__queue3[0])...controller_final_old.py
Source:controller_final_old.py  
...90        return remove_container(cont)91    return cont92    #else:93    #    raise NotImplementedError("IMPLEMENT ME???")94def pause_container(cont):95    if cont is None: return96    try:97        cont.reload()98        if cont.status in ["running", "restarting"]:99            cont.pause()100    except:101        print("something seems to have gone wrong while PAUSING the container (But dont care)")102def unpause_container(cont):103    if cont is None: return104    cont.reload()105    if cont.status == "paused":106        cont.unpause()107def update_container(cont, cpu_set):108    if cont is None:109        return110    if cont.status == "exited":111        return112    cont.update(cpuset_cpus=cpu_set)113def hard_remove_everything():114    try:115        hard_remove_container(client.containers.get("dedup"))116    except:117        print("Dedup already didn't exist.")118    try:119        hard_remove_container(client.containers.get("splash2x-fft"))120    except:121        print("FFT already didn't exist.")122    try:123        hard_remove_container(client.containers.get("blackscholes"))124    except:125        print("Blackscholes already didn't exist.")126    try:127        hard_remove_container(client.containers.get("canneal"))128    except:129        print("Canneal already didn't exist.")130    try:131        hard_remove_container(client.containers.get("freqmine"))132    except:133        print("Freqmine already didn't exist.")134    try:135        hard_remove_container(client.containers.get("ferret"))136    except:137        print("Ferret already didn't exist.")138def main():139    hard_remove_everything()140    # Queue for CPU1 (1 cores [1])141    queue1 = Queue(maxsize=6)142    container1 = None143    # Queue for CPU2 (2 cores [2,3])144    queue2 = Queue(maxsize=6)145    container2 = None146    queue1.put(dedup)147    queue1.put(fft)148    queue1.put(blackscholes)149    queue2.put(canneal)150    queue2.put(freqmine)151    queue2.put(ferret)152    # 
Discard first measurement, since it is always wrong.153    psutil.cpu_percent(interval=None, percpu=True)154    mc_pid, mc_ncpus = init_memcached_config()155    running = True156    load_level = NORMAL  # Initialize the load level to normal157    while running:158        cpu_utilizations = psutil.cpu_percent(interval=None, percpu=True)159        cpu_util_avg = cpu_utilizations[0]  \160                if mc_ncpus == 1 \161                else (cpu_utilizations[0] + cpu_utilizations[1]) / 2.0162        if load_level == NORMAL:163            if cpu_util_avg > 90.0:164                load_level = HIGH165                # Update MemCached166                mc_pid, mc_ncpus = set_memcached_cpu(mc_pid, 2)167                # Update containers168                if queue2.empty() and container2 is None and container1 is not None:169                    print(f"Updating CPU set of {container1.name} to 2,3")170                    update_container(container1, "2,3")171                    #container1.update(cpuset_cpus="2,3")172                    # container1.reload()173                else:174                    pause_container(container1)175                if queue1.empty() and container1 is None and container2 is not None:176                    print(f"Updating CPU set of {container2.name} to 2,3")177                    update_container(container2, "2,3")178                    #container2.update(cpuset_cpus="2,3")179        elif load_level == HIGH:180            if cpu_util_avg <= 60:181                # change to 1 core182                load_level = NORMAL183                # Update MemCached184                mc_pid, mc_ncpus = set_memcached_cpu(mc_pid, 1)185                # Update containers186                if queue2.empty() and container2 is None and container1 is not None:187                    print(f"Updating CPU set of {container1.name} to 1-3")188                    update_container(container1, "1-3")189                    
#container1.update(cpuset_cpus="1-3")190                    # container1.reload()191                else:192                    unpause_container(container1)193                if queue1.empty() and container1 is None and container2 is not None:194                    print(f"Updating CPU set of {container2.name} to 1-3")195                    update_container(container2, "1-3")196                    #container2.update(cpuset_cpus="1-3")197            elif cpu_util_avg >= 95.0:198                # stop all containers199                load_level = CRITICAL200                pause_container(container1)201                pause_container(container2)202        elif load_level == CRITICAL:203            if cpu_util_avg <= 90:204                # restart containers205                load_level = HIGH206                if container2 is None and queue2.empty():207                    unpause_container(container1)208                else:209                    unpause_container(container2)210        # Remove containers if they are done.211        container1 = remove_if_done_container(container1)212        container2 = remove_if_done_container(container2)213        # Start containers.214        if load_level != CRITICAL:215            if container2 is None and not queue2.empty():216                container2 = create_container(queue2.get())217                print(f"Starting {container2.name}")218                container2.start()219        if load_level == NORMAL:220            if container1 is None and not queue1.empty():221                container1 = create_container(queue1.get())222                print(f"Starting {container1.name}")223                container1.start()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
