Best Python code snippet using localstack_python
prototype.py
Source:prototype.py  
1#csv_in ---> logger2import csv3class component(object):4    def __init__(self,*args, **argv):5        is_start=True6        self.data=[]7        self.job=argv.get('job',False)8        self.trans_out = []9        self.trans_in = []10    def event(self,name):11        return self.output(event=name)12    def start(self):13        self.event('component_start')14        return True15    def stop(self):16        self.event('component_end')17        return True18    def run(self,input_data=[]):19        self.event('component_run')20        data=self.output(input_data)21        return data22    def input(self, rows, transition=None):23        self.event('component_inputflow')24        return self.output(rows)25    def output(self, rows=None, channel=None,event=None):26        self.event('component_outputflow')27        for trans in self.trans_out:28            if (not channel) or (trans.channel_source==channel) or (not trans.channel_source):29                if trans.type=='data_transition' or (trans.type=='trigger_transition' and event==trans.listen_event):30                    yield rows, trans.destination31                else:32                    yield rows,None33class csv_in(component):34    def __init__(self, filename, *args, **argv):35        super(csv_in, self).__init__(*args, **argv)36        self.filename = filename37    def run(self,data=[]):        38        fp = csv.DictReader(file(self.filename))39        data=[]40        for row in fp:41            data.append(row)42        return super(csv_in, self).run(data)43class csv_out(component):44    def __init__(self, filename, *args, **argv):45        super(csv_out, self).__init__(*args, **argv)46        self.filename=filename47        self.fp=None48    def run(self,rows=[]):49        self.fp = file(self.filename, 'wb+')50        return self.input(rows)51    def input(self,rows=[]):52        fieldnames = rows[0].keys()53        fp = csv.DictWriter(self.fp, fieldnames)54        fp.writerow(dict(map(lambda x: (x,x), fieldnames)))55        fp.writerows(rows)        56        return super(csv_out, self).input(rows)57class sort(component):58    def __init__(self, fieldname, *args, **argv):59        self.fieldname = fieldname60        super(sort, self).__init__(*args, **argv)    61    def run(self,rows=[], transition=None):62        self.data=rows63        self.data.sort(lambda x,y: cmp(x[self.fieldname],y[self.fieldname]))64        return super(sort, self).run(self.data)     65    66class logger(component):67    def __init__(self, name, *args, **argv):68        self.name = name69        super(logger, self).__init__(*args, **argv) 70    def run(self,data=[]): 71        res=[]    72        print ' Logger : ',self.name73        for row in data:74            print row75            res.append(row)76        return super(logger, self).run(data)77class transition(object):78    def __init__(self, source, destination,type='data_transition', status='open', channel_source=None, channel_destination=None):79        self.type=type80        self.source = source81        self.source.trans_out.append(self)82        self.destination = destination83        self.destination.trans_in.append(self)84        self.status = 'open'85        self.channel_source = channel_source86        self.channel_destination = channel_destination87class job(object):88    def __init__(self,start_component,components=[]):89        self.components=components90        self.start_component=start_component91        for component in components:92            component.job=self93    def run(self):94        data=None95        start_component=self.start_component        96        def _run(data,component):            97            if not component:98                return            99            res=component.start()        100            if not res:101                raise Exception('not started component')102            try:103                res_list=component.run(data)                104                for out_data,out_component in res_list:                    105                    _run(out_data,out_component)106            except Exception,e:107                raise e108            finally:109                component.stop() 110        _run(data,start_component)    111            112csv_in1= csv_in('partner.csv')113csv_out1= csv_out('partner2.csv')114sort1=sort('name')115log1=logger(name='Read Partner File')116log2=logger(name='After Sort')117tran1=transition(csv_in1,log1)118tran2=transition(csv_in1,sort1)119tran3=transition(sort1,csv_out1)120tran3=transition(sort1,log2)121job1=job(csv_in1,[csv_in1,log1])...day24.py
Source:day24.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3import heapq4from collections import namedtuple5def solve(data):6    Component = namedtuple('Component', ['id', 'first', 'second'])7    components = []8    for i, line in enumerate(data.splitlines()):9        components.append(Component(i, *map(int, line.split('/'))))10    q = []11    heapq.heapify(q)12    for start_component in filter(lambda x: x.first == 0, components):13        heapq.heappush(q, (start_component.second, [start_component, ]))14    for start_component in filter(lambda x: x.second == 0, components):15        heapq.heappush(q, (start_component.first, [start_component, ]))16    best_bridge = 017    longest_and_strongest_bridge = (0, 0)18    while q:19        free_port, bridge = heapq.heappop(q)20        free_components = list(filter(lambda x: x not in bridge, components))21        for continuation in filter(lambda x: x.first == free_port,22                                   free_components):23            heapq.heappush(q, (continuation.second,24                               list(bridge) + [continuation, ]))25        for continuation in filter(lambda x: x.second == free_port,26                                   free_components):27            heapq.heappush(q, (continuation.first,28                               list(bridge) + [continuation, ]))29        bridge_strength = sum([c.first + c.second for c in bridge])30        if bridge_strength > best_bridge:31            best_bridge = bridge_strength32        if (len(bridge) > longest_and_strongest_bridge[0]) or \33                (len(bridge) == longest_and_strongest_bridge[0] and34                 bridge_strength > longest_and_strongest_bridge[1]):35            longest_and_strongest_bridge = (len(bridge), bridge_strength)36    return best_bridge, longest_and_strongest_bridge37def main():38    from _aocutils import ensure_data39    ensure_data(24)40    with open('input_24.txt', 'r') as f:41        data = f.read()42    sol_1, sol_2 = solve(data)43    print("Part 1: {0}".format(sol_1))44    print("Part 2: {0}".format(sol_2[1]))45if __name__ == '__main__':...main.py
Source:main.py  
1#!/usr/bin/env python2import click3from start_component import start_component4@click.group()5def cli():6	pass...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
