Best Python code snippet using localstack_python
svclambda.py
Source:svclambda.py  
...40            retry = (state == 'Pending') or (lastUpdateStatus == 'InProgress')41            if not retry: return exFunction42            if self._utils.retry(tracker):43                tracker = self._utils.backoff(tracker)44    def get_function_concurrency(self, functionArn):45        op = 'get_function_concurrency'46        try:47            response = self._client.get_function_concurrency(48                FunctionName=functionArn49            )50            return response.get('ReservedConcurrentExecutions', _UndefinedConcurrency)51        except botocore.exceptions.ClientError as e:52            if self._utils.is_resource_not_found(e): return -153            raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn))54    # Allow lambda:CreateFunction55    def create_function(self, functionName, rq, codeZip, tags):56        op = 'create_function'57        tracker = self._utils.init_tracker(op)58        while True:59            try:60                response = self._client.create_function(61                    FunctionName=functionName,62                    Runtime=rq['Runtime'],63                    Role=rq['Role'],64                    Handler=rq['Handler'],65                    Description=rq['Description'],66                    Timeout=rq['Timeout'],67                    MemorySize=rq['MemorySize'],68                    Environment=rq['Environment'],69                    Code={70                        'ZipFile': codeZip71                    },72                    Tags=tags.toDict()73                )74                return response75            except botocore.exceptions.ClientError as e:76                if self._utils.retry_propagation_delay(e, tracker):77                    tracker = self._utils.backoff(tracker)78                    continue79                raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'RoleArn', rq['Role'], 'Tracker', tracker))80    # Allow lambda:UpdateFunctionConfiguration81    def 
update_function_configuration(self, functionName, rq):82        op = 'update_function_configuration'83        tracker = self._utils.init_tracker(op)84        while True:85            try:86                response = self._client.update_function_configuration(87                    FunctionName=functionName,88                    Runtime=rq['Runtime'],89                    Role=rq['Role'],90                    Handler=rq['Handler'],91                    Description=rq['Description'],92                    Timeout=rq['Timeout'],93                    MemorySize=rq['MemorySize'],94                    Environment=rq['Environment']95                )96                return response97            except botocore.exceptions.ClientError as e:98                if self._utils.retry_propagation_delay(e, tracker):99                    tracker = self._utils.backoff(tracker)100                    continue101                if self._utils.retry_resource_conflict(e, tracker):102                    tracker = self._utils.backoff(tracker)103                    continue104                raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'RoleArn', rq['Role'], 'Tracker', tracker))105    # Allow lambda:UpdateFunctionCode106    def update_function_code(self, functionName, codeZip):107        op = 'update_function_code'108        tracker = self._utils.init_tracker(op)109        while True:110            try:111                response = self._client.update_function_code(112                    FunctionName=functionName,113                    ZipFile=codeZip114                )115                return response116            except botocore.exceptions.ClientError as e:117                if self._utils.retry_resource_conflict(e, tracker):118                    tracker = self._utils.backoff(tracker)119                    continue120                raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))121    def put_function_concurrency(self, functionArn, 
reservedConcurrentExecutions: int):122        op = 'put_function_concurrency'123        if reservedConcurrentExecutions == _UndefinedConcurrency: return False124        try:125            self._client.put_function_concurrency(126                FunctionName=functionArn,127                ReservedConcurrentExecutions = reservedConcurrentExecutions128            )129            return True130        except botocore.exceptions.ClientError as e:131            raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'ReservedConcurrentExecutions', reservedConcurrentExecutions))132    def get_policy(self, functionArn):133        op = 'get_policy'134        try:135            response = self._client.get_policy(136                FunctionName=functionArn137            )138            return response139        except botocore.exceptions.ClientError as e:140            if self._utils.is_resource_not_found(e): return None141            raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn))142    def add_permission(self, functionArn, sid, action, principal, sourceArn):143        op = 'add_permission'144        try:145            self._client.add_permission(146                FunctionName=functionArn,147                StatementId = sid,148                Action = action,149                Principal = principal,150                SourceArn = sourceArn151            )152        except botocore.exceptions.ClientError as e:153            raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'Principal', principal))154    def remove_permission(self, functionArn, sid):155        op = 'remove_permission'156        try:157            self._client.remove_permission(158                FunctionName=functionArn,159                StatementId = sid160            )161            return True162        except botocore.exceptions.ClientError as e:163            if self._utils.is_resource_not_found(e): return False164            raise 
RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'Sid', sid))165    # Allow lambda:DeleteFunction166    def delete_function(self, functionName):167        op ='delete_function'168        try:169            self._client.delete_function(170                FunctionName=functionName171            )172            return True173        except botocore.exceptions.ClientError as e:174            if self._utils.is_resource_not_found(e): return False175            raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))176    def list_event_source_mappings(self, functionName, eventSourceArn):177        op = "list_event_source_mappings"178        try:179            paginator = self._client.get_paginator(op)180            page_iterator = paginator.paginate(FunctionName=functionName, EventSourceArn=eventSourceArn)181            mappings = []182            for page in page_iterator:183                items = page["EventSourceMappings"]184                for item in items:185                    mappings.append(item)186            return mappings187        except botocore.exceptions.ClientError as e:188            raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))189    def list_event_source_mappings_all(self, functionName):190        op = "list_event_source_mappings"191        try:192            paginator = self._client.get_paginator(op)193            page_iterator = paginator.paginate(FunctionName=functionName)194            mappings = []195            for page in page_iterator:196                items = page["EventSourceMappings"]197                for item in items:198                    mappings.append(item)199            return mappings200        except botocore.exceptions.ClientError as e:201            raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))202    def get_event_source_mapping(self, uuid):203        op = 'get_event_source_mapping'204        try:205            response 
= self._client.get_event_source_mapping(206                UUID=uuid207            )208            return response209        except botocore.exceptions.ClientError as e:210            raise RdqError(self._utils.fail(e, op, 'UUID', uuid))211    def find_event_source_mapping(self, functionName, eventSourceArn):212        mappings = self.list_event_source_mappings(functionName, eventSourceArn)213        mappingCount = len(mappings)214        if mappingCount == 0: return None215        if mappingCount > 1:216            msg = "Multiple ({}) event source mappings".format(mappingCount)217            raise RdqError(self._utils.integrity(msg, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))218        mapping = mappings[0]219        uuid = mapping['UUID']220        return self.get_event_source_mapping(uuid)221    def create_event_source_mapping_uuid(self, functionName, eventSourceArn, cfg):222        op = 'create_event_source_mapping'223        try:224            response = self._client.create_event_source_mapping(225                FunctionName=functionName,226                EventSourceArn=eventSourceArn,227                BatchSize=cfg['BatchSize'],228                MaximumBatchingWindowInSeconds=cfg['MaximumBatchingWindowInSeconds'],229                Enabled=True230            )231            return response['UUID']232        except botocore.exceptions.ClientError as e:233            raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))234    def update_event_source_mapping(self, uuid, cfg):235        op = 'update_event_source_mapping'236        try:237            self._client.update_event_source_mapping(238                UUID=uuid,239                BatchSize=cfg['BatchSize'],240                MaximumBatchingWindowInSeconds=cfg['MaximumBatchingWindowInSeconds']241            )242        except botocore.exceptions.ClientError as e:243            raise RdqError(self._utils.fail(e, op, 'UUID', uuid))244 
   def enable_event_source_mapping_for_uuid(self, uuid, enabled):245        op = 'update_event_source_mapping'246        try:247            self._client.update_event_source_mapping(248                UUID=uuid,249                Enabled=enabled250            )251        except botocore.exceptions.ClientError as e:252            raise RdqError(self._utils.fail(e, op, 'UUID', uuid))253    def enable_event_source_mappings(self, mappings, enabled):254        for mapping in mappings:255            uuid = mapping['UUID']256            self.enable_event_source_mapping_for_uuid(uuid, enabled)257    def delete_event_source_mappings(self, mappings):258        for mapping in mappings:259            uuid = mapping['UUID']260            self.delete_event_source_mapping_for_uuid(uuid)261    def delete_event_source_mapping_for_uuid(self, uuid):262        op = 'delete_event_source_mapping'263        tracker = self._utils.init_tracker(op)264        while True:265            try:266                self._client.delete_event_source_mapping(267                    UUID=uuid268                )269                return270            except botocore.exceptions.ClientError as e:271                if self._utils.is_resource_not_found(e): return272                if self._utils.retry_resource_in_use(e, tracker):273                    tracker = self._utils.backoff(tracker)274                    continue275                raise RdqError(self._utils.fail(e, op, 'UUID', uuid))276    # Allow lambda:InvokeFunction277    def invoke_function_json(self, functionName, payloadMap):278        op = 'invoke'279        inJson = self._utils.to_json(payloadMap)280        inBytes = inJson.encode("utf-8")281        try:282            response = self._client.invoke(283                FunctionName=functionName,284                InvocationType='RequestResponse',285                Payload=inBytes286            )287            outBytes = response['Payload'].read()288            outJson = 
outBytes.decode("utf-8")289            outMap = self._utils.from_json(outJson)290            return {291                'StatusCode': response['StatusCode'],292                'Payload': outMap293            }294        except botocore.exceptions.ClientError as e:295            raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))296    def declareFunctionArn(self, functionName, description, roleArn, cfg: dict, concurrency: dict, codeZip, tags: Tags):297        dbCfg = DeltaBuild()298        dbCfg.putRequired('Handler', 'lambda_function.lambda_handler')299        dbCfg.putRequired('Environment.Variables.LOGLEVEL', 'INFO')300        dbCfg.updateRequired(cfg)301        dbCfg.putRequired('Description', description)302        dbCfg.putRequired('Role', roleArn)303        rqCfg = dbCfg.required()304        rqReservedConcurrentExecutions = concurrency.get('ReservedConcurrentExecutions', _UndefinedConcurrency)305        exFunction = self.get_function(functionName)306        if not exFunction:307            newFunction = self.create_function(functionName, rqCfg, codeZip, tags)308            self.get_function_nonpending(functionName)309            self.put_function_concurrency(functionName, rqReservedConcurrentExecutions)310            return newFunction['FunctionArn']311        exFunctionConfiguration = exFunction['Configuration']312        exFunctionArn = exFunctionConfiguration['FunctionArn']313        dbCfg.loadExisting(exFunctionConfiguration)314        delta = dbCfg.delta()315        if delta:316            self._utils.info('CheckConfigurationDelta', 'FunctionName', functionName, "Reconfiguring", "Delta", delta)317            self.update_function_configuration(functionName, rqCfg)318            self.get_function_nonpending(functionName)        319        exCodeSha256 = exFunctionConfiguration['CodeSha256']320        inCodeSha256 = _codeSha256(codeZip)321        if inCodeSha256 != exCodeSha256:322            self.update_function_code(functionName, 
codeZip)323            self.get_function_nonpending(functionName)324        else:325            self._utils.info('CheckCodeSHA256', 'FunctionName', functionName, "Code unchanged", "CodeSHA256", exCodeSha256)326        exReservedConcurrentExecutions = self.get_function_concurrency(functionName)327        if exReservedConcurrentExecutions != rqReservedConcurrentExecutions:328            self.put_function_concurrency(functionName, rqReservedConcurrentExecutions)329        exTags = Tags(exFunction['Tags'], functionName)330        self._utils.declare_tags(exFunctionArn, tags, exTags)331        return exFunctionArn332    def declareInvokePermission(self, functionArn, sid, principal, sourceArn):333        action = "lambda:InvokeFunction"334        self.remove_permission(functionArn, sid)335        self.add_permission(functionArn, sid, action, principal, sourceArn)336    def removeFunction(self, functionName):337        return self.delete_function(functionName)338    def declareEventSourceMappingUUID(self, functionName, eventSourceArn, cfg):339        exMapping = self.find_event_source_mapping(functionName, eventSourceArn)340        if not exMapping:...helper.py
Source:helper.py  
...24            return resp25        except Exception as e:26            logging.debug(e)27            return None28    def get_function_concurrency(self, function_name: str) -> int:29        try:30            resp = self.lambda_client.get_function_concurrency(31                FunctionName=function_name32            )33            return resp['ReservedConcurrentExecutions']34        except Exception as e:35            logging.debug(e)36            return None37    def get_function_code_signing_config(self, function_name: str) -> int:38        try:39            resp = self.lambda_client.get_function_code_signing_config(40                FunctionName=function_name41            )42            return resp['CodeSigningConfigArn']43        except Exception as e:44            logging.debug(e)...lambda_concurrency_limit.py
Source:lambda_concurrency_limit.py  
...9for items in page_iterator:10    for functions in items['Functions']:11        #print(cnt, functions['FunctionName'])12        cnt = cnt+113        response = lambda_client.get_function_concurrency(FunctionName=functions['FunctionName'])14        try:15                    print(cnt, functions['FunctionName'], response['ReservedConcurrentExecutions'] )16        except Exception as e:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
