Best Python code snippet using localstack_python
kinesis_listener.py
Source:kinesis_listener.py  
...35            consumer = json_safe(consumer)36            STREAM_CONSUMERS.append(consumer)37            return {'Consumer': consumer}38        elif action == 'DeregisterStreamConsumer':39            def consumer_matches(c):40                stream_arn = data.get('StreamARN')41                cons_name = data.get('ConsumerName')42                cons_arn = data.get('ConsumerARN')43                return (c.get('ConsumerARN') == cons_arn or44                    (c.get('StreamARN') == stream_arn and c.get('ConsumerName') == cons_name))45            STREAM_CONSUMERS = [c for c in STREAM_CONSUMERS if not consumer_matches(c)]46            return {}47        elif action == 'ListStreamConsumers':48            result = {49                'Consumers': [c for c in STREAM_CONSUMERS if c.get('StreamARN') == data.get('StreamARN')]50            }51            return result52        elif action == 'DescribeStreamConsumer':53            consumer_arn = data.get('ConsumerARN') or data['ConsumerName']54            consumer_name = data.get('ConsumerName') or data['ConsumerARN']55            creation_timestamp = data.get('ConsumerCreationTimestamp')56            result = {57                'ConsumerDescription': {58                    'ConsumerARN': consumer_arn,59                    'ConsumerCreationTimestamp': creation_timestamp,...loaders.py
Source:loaders.py  
1import os, re2from collections import OrderedDict, Counter, defaultdict3from mmap import mmap4import logging5logger = logging.getLogger( __name__ )6class Archive( object ):7    def __init__( self ):8        pass9    def close( self ):10        pass11    def get_meta( self ):12        pass13    def list_files( self, path=None, recurse=True ):14        pass15    def list_paths( self, path=None, recurse=True ):16        pass17    def get_file_meta( self, path ):18        pass19    def get_file( self, path ):20        pass21    def get_path_meta( self, path ):22        pass23class FileSystem( Archive ):24    def __init__( self, base_path ):25        self.base_path = os.path.abspath( base_path )26    def _to_internal( self, path ):27        assert path.startswith( self.base_path )28        return "." + path[len( self.base_path ) :] + os.path.sep29    def _from_internal( self, path ):30        assert path.startswith( "." + os.path.sep )31        return self.base_path + path[1:]32    def list_paths( self, path=None, recurse=True ):33        results = []34        base = self.base_path if path is None else self._from_internal( path )35        if not recurse:36            _, sub_folders, _ = next( os.walk( base ) )37            results = [self._to_internal( root ) for root in sub_folders]38        else:39            for root, sub_folders, files in os.walk( base ):40                results.append( self._to_internal( root ) )41        return results42    def list_files( self, path=None, recurse=True ):43        results = []44        base = self.base_path if path is None else self._from_internal( path )45        if not recurse:46            _, _, files = next( os.walk( base ) )47            results = [self._to_internal( f ) for f in files]48        else:49            for root, sub_folders, files in os.walk( base ):50                for f in files:51                    results.append( self._to_internal( root ) + f )52        return results53    def get_file( self, path ):54        # TODO: something nicer involving mmap?55        return open( self._from_internal( path ), "r+b" )56class Loader( object ):57    _SEP = re.escape( os.path.sep )58    def __init__(59        self,60        file_class_map,61        dependency_list=None,62        case_sensitive=False,63        unique_matches=True,64    ):65        self.file_class_map = file_class_map66        self.dependency_list = dependency_list67        self.case_sensitive = case_sensitive68        self.unique_matches = unique_matches69        self.re_flags = re.IGNORECASE if not case_sensitive else 070        self.file_re_map = {71            key: re.compile( key, flags=self.re_flags )72            for key, klass in file_class_map.items()73            if klass74        }75        self._files = OrderedDict()76    def load( self, target_path ):77        # target_path = os.path.abspath( target_path )78        self.fs = FileSystem( target_path )79        for f in self.fs.list_files():80            for key, regex in self.file_re_map.items():81                match = regex.search( f )82                if match:83                    self._files[f] = {84                        "klass": self.file_class_map[key],85                        "re": key,86                        "match": match.groups(),87                    }88                    if not self.case_sensitive:89                        self._files[f]["match"] = tuple(90                            [x.upper() for x in self._files[f]["match"]]91                        )92        if self.unique_matches:93            unique_check = {94                k: v95                for k, v in Counter(96                    [x["match"] for x in self._files.values()]97                ).items()98                if v > 199            }100            if unique_check:101                extras = []102                for name, file in self._files.items():103                    if file["match"] in unique_check:104                        extras.append( name )105                self._files = {}106                raise Exception(107                    f"Multiple filename matches found for the same source: {', '.join( extras )}"108                )109        dependencies = []110        if self.dependency_list:111            for i, (consumer, dependency, format, attr) in enumerate(112                self.dependency_list113            ):114                consumer_re = re.compile( consumer, flags=self.re_flags )115                dependency_re = re.compile( dependency, flags=self.re_flags )116                consumer_matches = []117                dependency_matches = []118                if not self.case_sensitive:119                    format = tuple( [x.upper() for x in format] )120                for path in self._files:121                    consumer_match = consumer_re.search( path )122                    dependency_match = dependency_re.search( path )123                    if consumer_match and dependency_match:124                        self._files = {}125                        raise Exception(126                            f"Problem parsing dependencies: path {path} matches for both consumer ({consumer}) and dependency ({dependency})"127                        )128                    elif consumer_match:129                        groups = consumer_match.groups()130                        if not self.case_sensitive:131                            groups = tuple( [x.upper() for x in groups] )132                        consumer_matches.append( (path, groups) )133                    elif dependency_match:134                        groups = dependency_match.groups()135                        if not self.case_sensitive:136                            groups = tuple( [x.upper() for x in groups] )137                        dependency_matches.append( (path, groups) )138                for path, groups in consumer_matches:139                    target_groups = tuple( [x.format( *groups ) for x in format] )140                    if not self.case_sensitive:141                        target_groups = tuple( [x.upper() for x in target_groups] )142                    targets = [143                        x[0] for x in dependency_matches if x[1] == target_groups144                    ]145                    if len( targets ) > 1:146                        self._files = {}147                        raise Exception(148                            f"Problem parsing dependencies: path {path} has multiple matches for dependency {attr} ({', '.join( targets )})"149                        )150                    elif len( targets ) == 1:151                        dependencies.append( (i, path, targets[0]) )152        # make dependency lookup table153        dependency_map = defaultdict( list )154        for index, source, dest in dependencies:155            dependency_map[source].append( (dest, self.dependency_list[index][3]) )156        # model the dependency tree157        head_count = defaultdict( int )158        tails = defaultdict( list )159        heads = []160        for index, tail, head in dependencies:161            head_count[tail] += 1162            if head in tails:163                tails[head].append( tail )164            else:165                tails[head] = [tail]166                heads.append( head )167        load_order = [h for h in heads if h not in head_count]168        for head in load_order:169            for tail in tails[head]:170                head_count[tail] -= 1171                if not head_count[tail]:172                    load_order.append( tail )173        loop = [n for n, heads in head_count.items() if heads]174        if loop:175            self._files = {}176            raise Exception( "Problem parsing dependencies: loop detected" )177        load_order += [x for x in self._files.keys() if x not in load_order]178        # load files in based on dependency sorted list order179        logger.info( f"{self}: loading files" )180        for path in load_order:181            info = self._files[path]182            with self.fs.get_file( path ) as f:183                data = mmap( f.fileno(), 0 )184                logger.info( f'{path} => {info["klass"]}' )185                deps = {186                    attr: self._files[dest]["obj"]187                    for dest, attr in dependency_map[path]188                }189                info["obj"] = info["klass"]( data, preload_attrs=deps )190                data.close()191        self.post_load()192        return193    def post_load( self ):194        pass195    def save_file( self, target ):196        assert target in self._files197        export = self._files[target]["obj"].export_data()198        with open( target, "wb" ) as out:199            out.write( export )200        return201    def keys( self ):202        return self._files.keys()203    def __len__( self ):204        return len( self._files )205    def __getitem__( self, key ):206        return self._files[key]["obj"]207    def __contains__( self, key ):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
