Best Python code snippet using lettuce-tools_python
sqs_queue.py
Source:sqs_queue.py  
1import boto32import ujson3import time4import multiprocessing5from stepist.flow.workers.worker_engine import BaseWorkerEngine6class SQSAdapter(BaseWorkerEngine):7    def __init__(self, session=boto3, visibility_timeout=None,8                 message_retention_period=None, wait_seconds=5,9                 data_pickler=ujson):10        self.data_pickler = data_pickler11        self.session = session12        self.sqs_client = session.client('sqs')13        self.sqs_resource = session.resource('sqs')14        self.message_retention_period = message_retention_period15        self.visibility_timeout = visibility_timeout16        self.wait_seconds = wait_seconds17        self._queues = dict()18        self._steps = dict()19    def add_job(self, step, data, **kwargs):20        queue_name = self.get_queue_name(step)21        queue = self._queues.get(queue_name, None)22        if not queue:23            raise RuntimeError("Queue %s not found" % queue_name)24        kwargs = {25            'MessageBody': self.data_pickler.dumps(data.get_dict()),26            'MessageAttributes': {},27            'DelaySeconds': 028        }29        queue.send_message(**kwargs)30    def add_jobs(self, step, jobs_data, **kwargs):31        for job_data in jobs_data:32            self.add_job(step, job_data, **kwargs)33    def receive_job(self, step, wait_seconds=5):34        q_name = self.get_queue_name(step)35        queue = self.session.resource('sqs').get_queue_by_name(36            QueueName=q_name)37        kwargs = {38            'WaitTimeSeconds': wait_seconds,39            'MaxNumberOfMessages': 1,40            'MessageAttributeNames': ['All'],41            'AttributeNames': ['All'],42        }43        messages = queue.receive_messages(**kwargs)44        if not messages:45            return None46        if len(messages) != 1:47            raise RuntimeError("Got more than 1 job for some reason")48        msg = messages[0]49        msg_result = {50            'Id': msg.message_id,51            'ReceiptHandle': msg.receipt_handle52        }53        queue.delete_messages(Entries=[msg_result])54        return self.data_pickler.loads(msg.body)55    def process(self, *steps, die_when_empty=False, die_on_error=True):56        queues = []57        for step in steps:58            queues.append(self.get_queue_name(step))59        if not queues:60            return61        mng = multiprocessing.Manager()62        empty_queues = mng.dict({q: False for q in queues})63        processes = []64        for queue_name in queues:65            p = multiprocessing.Process(66                target=self.process_queue,67                kwargs={68                    'queue_name': queue_name,69                    'die_on_error': die_on_error,70                    'empty_queues': empty_queues,71                    'die_when_empty': die_when_empty,72                },73            )74            p.start()75            processes.append(p)76        for p in processes:77            p.join()78            p.terminate()79    def process_queue(self, queue_name, die_on_error, empty_queues,80                      die_when_empty):81        try:82            queue = self.session.resource('sqs').get_queue_by_name(QueueName=queue_name)83        except Exception:84            empty_queues[queue_name] = True85            raise86        if not queue_name or not queue:87            empty_queues[queue_name] = True88            return89        while True:90            kwargs = {91                'WaitTimeSeconds': self.wait_seconds,92                'MaxNumberOfMessages': 10,93                'MessageAttributeNames': ['All'],94                'AttributeNames': ['All'],95            }96            messages = queue.receive_messages(**kwargs)97            if not messages:98                empty_queues[queue_name] = True99                if all(list(empty_queues.values())) and die_when_empty:100                    exit()101                time.sleep(self.wait_seconds)102                continue103            empty_queues[queue_name] = False104            msg_results = []105            for msg in messages:106                data = self.data_pickler.loads(msg.body)107                try:108                    self._steps[queue_name].receive_job(**data)109                except Exception:110                    empty_queues[queue_name] = True111                    if die_on_error:112                        raise113                msg_results.append({114                    'Id': msg.message_id,115                    'ReceiptHandle': msg.receipt_handle116                })117            if msg_results:118                queue.delete_messages(Entries=msg_results)119    def flush_queue(self, step):120        raise NotImplemented("Not implemented yet. Delete queue using "121                             "SQS dashboard")122    def jobs_count(self, *steps):123        jobs = 0124        for step in steps:125            queue_name = self.get_queue_name(step)126            sqs_q = self.sqs_client.get_queue_url(QueueName=queue_name)127            attrs = self.sqs_client.get_queue_attributes(128                sqs_q, ['ApproximateNumberOfMessages'])129            jobs += attrs.get("ApproximateNumberOfMessages", 0)130        return jobs131    def register_worker(self, step):132        queue_name = self.get_queue_name(step)133        attrs = {}134        kwargs = {135            'QueueName': queue_name,136            'Attributes': attrs,137        }138        if self.message_retention_period is not None:139            attrs['MessageRetentionPeriod'] = str(self.message_retention_period)140        if self.visibility_timeout is not None:141            attrs['VisibilityTimeout'] = str(self.visibility_timeout)142        self.sqs_client.create_queue(**kwargs)143        queue = self.sqs_resource.get_queue_by_name(QueueName=queue_name)144        self._queues[queue_name] = queue145        self._steps[queue_name] = step146    def monitor_steps(self, step_keys, monitoring_for_sec):147        pass148    def get_queue_name(self, step):149        return step.step_key().replace(":", "-")150def _move_first_to_the_end(a):151    return a[1:] + [a[0]]152class DieWhenEmpty:153    def __init__(self, active, queues):154        self.active = active155        self.queues = queues156        self.queus_no_jobs = set()157    def update_status(self, queue_name, no_job):158        if no_job:159            self.queus_no_jobs.add(queue_name)160        elif queue_name in self.queus_no_jobs:161            self.queus_no_jobs.remove(queue_name)162    def __bool__(self):...d23.py
Source:d23.py  
1import collections2import intcode3import sys4program = [int(x) for x in open("../input/23", "r").readline().split(",")]5computers = [intcode.Computer(program, network_id=i) for i in range(50)]6queues = [collections.deque([i]) for i in range(len(computers))]7output_buffers = [[] for _ in range(len(computers))]8waiting = [False for _ in range(len(computers))]9prev_nat_x, prev_nat_y = -1, -110nat_x, nat_y = 0, 011pt1 = False12while True:13    for i in range(len(computers)):14        c = computers[i]15        c.step()16        if c.SIG_INPUT:17            if len(queues[i]) > 0:18                c.input = queues[i].popleft()19                waiting[i] = False20            else:21                c.input = -122                waiting[i] = True23        if c.SIG_OUTPUT:24            output_buffers[i].append(c.output)25            c.output = None26            waiting[i] = False27            if len(output_buffers[i]) == 3:28                addr, x, y = output_buffers[i]29                output_buffers[i] = []30                if addr == 255:31                    if not pt1:32                        print("pt1", y)33                        pt1 = True34                    nat_x, nat_y = x, y35                else:36                    queues[addr].append(x)37                    queues[addr].append(y)38    empty_queues = True39    for q in queues:40        if len(q) > 0:41            empty_queues = False42            break43    all_waiting = True44    for w in waiting:45        if not w:46            all_waiting = False47            break48    if empty_queues and all_waiting:49        queues[0].append(nat_x)50        queues[0].append(nat_y)51        if nat_y == prev_nat_y:52            print("pt2", nat_y)53            sys.exit()...parallelisation.py
Source:parallelisation.py  
...20    print(f'==> Properly Terminating {len(children)} child Processes & Threads...', flush=True)21    for terminator in terminators:22        terminator.set()23    24    empty_queues(queues)2526    pbar = tqdm(total=len(children), ascii=True, ncols=100, unit_scale=True)27    while len(children) > 0:28        alive_children = []29        for child in children:30            if child.is_alive():31                child.join(timeout=1)32                alive_children.append(child)33            else:34                pbar.update(1)35        children = alive_children36    pbar.close()37    38    empty_queues(queues)394041def empty_queues(queues):42    for queue in queues:43        try:44            while not queue.empty():45                queue.get(False)46        except Exception:
...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
