How to use the get_function_concurrency method in localstack

Best Python code snippets using localstack_python

svclambda.py

Source:svclambda.py Github

copy

Full Screen

...40 retry = (state == 'Pending') or (lastUpdateStatus == 'InProgress')41 if not retry: return exFunction42 if self._utils.retry(tracker):43 tracker = self._utils.backoff(tracker)44 def get_function_concurrency(self, functionArn):45 op = 'get_function_concurrency'46 try:47 response = self._client.get_function_concurrency(48 FunctionName=functionArn49 )50 return response.get('ReservedConcurrentExecutions', _UndefinedConcurrency)51 except botocore.exceptions.ClientError as e:52 if self._utils.is_resource_not_found(e): return -153 raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn))54 # Allow lambda:CreateFunction55 def create_function(self, functionName, rq, codeZip, tags):56 op = 'create_function'57 tracker = self._utils.init_tracker(op)58 while True:59 try:60 response = self._client.create_function(61 FunctionName=functionName,62 Runtime=rq['Runtime'],63 Role=rq['Role'],64 Handler=rq['Handler'],65 Description=rq['Description'],66 Timeout=rq['Timeout'],67 MemorySize=rq['MemorySize'],68 Environment=rq['Environment'],69 Code={70 'ZipFile': codeZip71 },72 Tags=tags.toDict()73 )74 return response75 except botocore.exceptions.ClientError as e:76 if self._utils.retry_propagation_delay(e, tracker):77 tracker = self._utils.backoff(tracker)78 continue79 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'RoleArn', rq['Role'], 'Tracker', tracker))80 # Allow lambda:UpdateFunctionConfiguration81 def update_function_configuration(self, functionName, rq):82 op = 'update_function_configuration'83 tracker = self._utils.init_tracker(op)84 while True:85 try:86 response = self._client.update_function_configuration(87 FunctionName=functionName,88 Runtime=rq['Runtime'],89 Role=rq['Role'],90 Handler=rq['Handler'],91 Description=rq['Description'],92 Timeout=rq['Timeout'],93 MemorySize=rq['MemorySize'],94 Environment=rq['Environment']95 )96 return response97 except botocore.exceptions.ClientError as e:98 if self._utils.retry_propagation_delay(e, 
tracker):99 tracker = self._utils.backoff(tracker)100 continue101 if self._utils.retry_resource_conflict(e, tracker):102 tracker = self._utils.backoff(tracker)103 continue104 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'RoleArn', rq['Role'], 'Tracker', tracker))105 # Allow lambda:UpdateFunctionCode106 def update_function_code(self, functionName, codeZip):107 op = 'update_function_code'108 tracker = self._utils.init_tracker(op)109 while True:110 try:111 response = self._client.update_function_code(112 FunctionName=functionName,113 ZipFile=codeZip114 )115 return response116 except botocore.exceptions.ClientError as e:117 if self._utils.retry_resource_conflict(e, tracker):118 tracker = self._utils.backoff(tracker)119 continue120 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))121 def put_function_concurrency(self, functionArn, reservedConcurrentExecutions: int):122 op = 'put_function_concurrency'123 if reservedConcurrentExecutions == _UndefinedConcurrency: return False124 try:125 self._client.put_function_concurrency(126 FunctionName=functionArn,127 ReservedConcurrentExecutions = reservedConcurrentExecutions128 )129 return True130 except botocore.exceptions.ClientError as e:131 raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'ReservedConcurrentExecutions', reservedConcurrentExecutions))132 def get_policy(self, functionArn):133 op = 'get_policy'134 try:135 response = self._client.get_policy(136 FunctionName=functionArn137 )138 return response139 except botocore.exceptions.ClientError as e:140 if self._utils.is_resource_not_found(e): return None141 raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn))142 def add_permission(self, functionArn, sid, action, principal, sourceArn):143 op = 'add_permission'144 try:145 self._client.add_permission(146 FunctionName=functionArn,147 StatementId = sid,148 Action = action,149 Principal = principal,150 SourceArn = sourceArn151 )152 except 
botocore.exceptions.ClientError as e:153 raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'Principal', principal))154 def remove_permission(self, functionArn, sid):155 op = 'remove_permission'156 try:157 self._client.remove_permission(158 FunctionName=functionArn,159 StatementId = sid160 )161 return True162 except botocore.exceptions.ClientError as e:163 if self._utils.is_resource_not_found(e): return False164 raise RdqError(self._utils.fail(e, op, 'FunctionArn', functionArn, 'Sid', sid))165 # Allow lambda:DeleteFunction166 def delete_function(self, functionName):167 op ='delete_function'168 try:169 self._client.delete_function(170 FunctionName=functionName171 )172 return True173 except botocore.exceptions.ClientError as e:174 if self._utils.is_resource_not_found(e): return False175 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))176 def list_event_source_mappings(self, functionName, eventSourceArn):177 op = "list_event_source_mappings"178 try:179 paginator = self._client.get_paginator(op)180 page_iterator = paginator.paginate(FunctionName=functionName, EventSourceArn=eventSourceArn)181 mappings = []182 for page in page_iterator:183 items = page["EventSourceMappings"]184 for item in items:185 mappings.append(item)186 return mappings187 except botocore.exceptions.ClientError as e:188 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))189 def list_event_source_mappings_all(self, functionName):190 op = "list_event_source_mappings"191 try:192 paginator = self._client.get_paginator(op)193 page_iterator = paginator.paginate(FunctionName=functionName)194 mappings = []195 for page in page_iterator:196 items = page["EventSourceMappings"]197 for item in items:198 mappings.append(item)199 return mappings200 except botocore.exceptions.ClientError as e:201 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))202 def get_event_source_mapping(self, uuid):203 op = 
'get_event_source_mapping'204 try:205 response = self._client.get_event_source_mapping(206 UUID=uuid207 )208 return response209 except botocore.exceptions.ClientError as e:210 raise RdqError(self._utils.fail(e, op, 'UUID', uuid))211 def find_event_source_mapping(self, functionName, eventSourceArn):212 mappings = self.list_event_source_mappings(functionName, eventSourceArn)213 mappingCount = len(mappings)214 if mappingCount == 0: return None215 if mappingCount > 1:216 msg = "Multiple ({}) event source mappings".format(mappingCount)217 raise RdqError(self._utils.integrity(msg, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))218 mapping = mappings[0]219 uuid = mapping['UUID']220 return self.get_event_source_mapping(uuid)221 def create_event_source_mapping_uuid(self, functionName, eventSourceArn, cfg):222 op = 'create_event_source_mapping'223 try:224 response = self._client.create_event_source_mapping(225 FunctionName=functionName,226 EventSourceArn=eventSourceArn,227 BatchSize=cfg['BatchSize'],228 MaximumBatchingWindowInSeconds=cfg['MaximumBatchingWindowInSeconds'],229 Enabled=True230 )231 return response['UUID']232 except botocore.exceptions.ClientError as e:233 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName, 'EventSourceArn', eventSourceArn))234 def update_event_source_mapping(self, uuid, cfg):235 op = 'update_event_source_mapping'236 try:237 self._client.update_event_source_mapping(238 UUID=uuid,239 BatchSize=cfg['BatchSize'],240 MaximumBatchingWindowInSeconds=cfg['MaximumBatchingWindowInSeconds']241 )242 except botocore.exceptions.ClientError as e:243 raise RdqError(self._utils.fail(e, op, 'UUID', uuid))244 def enable_event_source_mapping_for_uuid(self, uuid, enabled):245 op = 'update_event_source_mapping'246 try:247 self._client.update_event_source_mapping(248 UUID=uuid,249 Enabled=enabled250 )251 except botocore.exceptions.ClientError as e:252 raise RdqError(self._utils.fail(e, op, 'UUID', uuid))253 def 
enable_event_source_mappings(self, mappings, enabled):254 for mapping in mappings:255 uuid = mapping['UUID']256 self.enable_event_source_mapping_for_uuid(uuid, enabled)257 def delete_event_source_mappings(self, mappings):258 for mapping in mappings:259 uuid = mapping['UUID']260 self.delete_event_source_mapping_for_uuid(uuid)261 def delete_event_source_mapping_for_uuid(self, uuid):262 op = 'delete_event_source_mapping'263 tracker = self._utils.init_tracker(op)264 while True:265 try:266 self._client.delete_event_source_mapping(267 UUID=uuid268 )269 return270 except botocore.exceptions.ClientError as e:271 if self._utils.is_resource_not_found(e): return272 if self._utils.retry_resource_in_use(e, tracker):273 tracker = self._utils.backoff(tracker)274 continue275 raise RdqError(self._utils.fail(e, op, 'UUID', uuid))276 # Allow lambda:InvokeFunction277 def invoke_function_json(self, functionName, payloadMap):278 op = 'invoke'279 inJson = self._utils.to_json(payloadMap)280 inBytes = inJson.encode("utf-8")281 try:282 response = self._client.invoke(283 FunctionName=functionName,284 InvocationType='RequestResponse',285 Payload=inBytes286 )287 outBytes = response['Payload'].read()288 outJson = outBytes.decode("utf-8")289 outMap = self._utils.from_json(outJson)290 return {291 'StatusCode': response['StatusCode'],292 'Payload': outMap293 }294 except botocore.exceptions.ClientError as e:295 raise RdqError(self._utils.fail(e, op, 'FunctionName', functionName))296 def declareFunctionArn(self, functionName, description, roleArn, cfg: dict, concurrency: dict, codeZip, tags: Tags):297 dbCfg = DeltaBuild()298 dbCfg.putRequired('Handler', 'lambda_function.lambda_handler')299 dbCfg.putRequired('Environment.Variables.LOGLEVEL', 'INFO')300 dbCfg.updateRequired(cfg)301 dbCfg.putRequired('Description', description)302 dbCfg.putRequired('Role', roleArn)303 rqCfg = dbCfg.required()304 rqReservedConcurrentExecutions = concurrency.get('ReservedConcurrentExecutions', _UndefinedConcurrency)305 
exFunction = self.get_function(functionName)306 if not exFunction:307 newFunction = self.create_function(functionName, rqCfg, codeZip, tags)308 self.get_function_nonpending(functionName)309 self.put_function_concurrency(functionName, rqReservedConcurrentExecutions)310 return newFunction['FunctionArn']311 exFunctionConfiguration = exFunction['Configuration']312 exFunctionArn = exFunctionConfiguration['FunctionArn']313 dbCfg.loadExisting(exFunctionConfiguration)314 delta = dbCfg.delta()315 if delta:316 self._utils.info('CheckConfigurationDelta', 'FunctionName', functionName, "Reconfiguring", "Delta", delta)317 self.update_function_configuration(functionName, rqCfg)318 self.get_function_nonpending(functionName) 319 exCodeSha256 = exFunctionConfiguration['CodeSha256']320 inCodeSha256 = _codeSha256(codeZip)321 if inCodeSha256 != exCodeSha256:322 self.update_function_code(functionName, codeZip)323 self.get_function_nonpending(functionName)324 else:325 self._utils.info('CheckCodeSHA256', 'FunctionName', functionName, "Code unchanged", "CodeSHA256", exCodeSha256)326 exReservedConcurrentExecutions = self.get_function_concurrency(functionName)327 if exReservedConcurrentExecutions != rqReservedConcurrentExecutions:328 self.put_function_concurrency(functionName, rqReservedConcurrentExecutions)329 exTags = Tags(exFunction['Tags'], functionName)330 self._utils.declare_tags(exFunctionArn, tags, exTags)331 return exFunctionArn332 def declareInvokePermission(self, functionArn, sid, principal, sourceArn):333 action = "lambda:InvokeFunction"334 self.remove_permission(functionArn, sid)335 self.add_permission(functionArn, sid, action, principal, sourceArn)336 def removeFunction(self, functionName):337 return self.delete_function(functionName)338 def declareEventSourceMappingUUID(self, functionName, eventSourceArn, cfg):339 exMapping = self.find_event_source_mapping(functionName, eventSourceArn)340 if not exMapping:...

Full Screen

Full Screen

helper.py

Source:helper.py Github

copy

Full Screen

...24 return resp25 except Exception as e:26 logging.debug(e)27 return None28 def get_function_concurrency(self, function_name: str) -> int:29 try:30 resp = self.lambda_client.get_function_concurrency(31 FunctionName=function_name32 )33 return resp['ReservedConcurrentExecutions']34 except Exception as e:35 logging.debug(e)36 return None37 def get_function_code_signing_config(self, function_name: str) -> int:38 try:39 resp = self.lambda_client.get_function_code_signing_config(40 FunctionName=function_name41 )42 return resp['CodeSigningConfigArn']43 except Exception as e:44 logging.debug(e)...

Full Screen

Full Screen

lambda_concurrency_limit.py

Source:lambda_concurrency_limit.py Github

copy

Full Screen

...9for items in page_iterator:10 for functions in items['Functions']:11 #print(cnt, functions['FunctionName'])12 cnt = cnt+113 response = lambda_client.get_function_concurrency(FunctionName=functions['FunctionName'])14 try:15 print(cnt, functions['FunctionName'], response['ReservedConcurrentExecutions'] )16 except Exception as e:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful