Best Python code snippet using localstack_python
upload_server_certificate.py
Source:upload_server_certificate.py  
...80            if ExceptionHandler.client_exception(e):81                return e82        finally:83            counter += 184def upload_server_certificate(params):85    '''86    upload_server_certificateï¼ä¸ä¼ æå¡å¨è¯ä¹¦87    å®ç½APIåè龿¥: https://help.aliyun.com/document_detail/34181.html88    '''89    counter = 090    while counter < TRY_TIME:91        try:92            request = UploadServerCertificateRequest.UploadServerCertificateRequest()93            # è¦ä¸ä¼ çå
¬é¥è¯ä¹¦94            request.set_ServerCertificate(params["server_certificate"])95            # éè¦ä¸ä¼ çç§é¥96            request.set_PrivateKey(params["private_key"])97            # åéè°ç¨è¯·æ±å¹¶æ¥æ¶è¿åæ°æ®98            response = ACS_CLIENT.do_action_with_exception(request)99            response_json = json.loads(response)100            # è¿åç»æ101            return response_json102        except ServerException as e:103            ExceptionHandler.server_exception(e)104        except ClientException as e:105            ExceptionHandler.client_exception(e)106        finally:107            counter += 1108def create_https_listener(params):109    '''110    create_https_listenerï¼å建HTTPSçå¬111    å®ç½APIåè龿¥: https://help.aliyun.com/document_detail/27593.html112    '''113    counter = 0114    while counter < TRY_TIME:115        try:116            request = CreateLoadBalancerHTTPSListenerRequest.CreateLoadBalancerHTTPSListenerRequest()117            # è´è½½åè¡¡å®ä¾çID118            request.set_LoadBalancerId(params["load_balancer_id"])119            # çå¬ç带宽峰å¼120            request.set_Bandwidth(params["bandwidth"])121            # è´è½½åè¡¡å®ä¾å端使ç¨ç端å£122            request.set_ListenerPort(params["listener_port"])123            # æ¯å¦å¼å¯å¥åº·æ£æ¥124            request.set_HealthCheck(params["health_check"])125            # æ¯å¦å¼å¯ä¼è¯ä¿æ126            request.set_StickySession(params["sticky_session"])127            # è´è½½åè¡¡å®ä¾å端使ç¨ç端å£128            request.set_BackendServerPort(params["backend_server_port"])129            # æå¡å¨è¯ä¹¦çID130            request.set_ServerCertificateId(params["server_certificate_id"])131            # åéè°ç¨è¯·æ±å¹¶æ¥æ¶è¿åæ°æ®132            response = ACS_CLIENT.do_action_with_exception(request)133            response_json = json.loads(response)134            # è¿åç»æ135            return response_json136        except ServerException as e:137            ExceptionHandler.server_exception(e)138        except ClientException as e:139            ExceptionHandler.client_exception(e)140        finally:141            counter += 1142def set_https_listener_attribute(params):143    '''144    set_https_listener_attributeï¼ä¿®æ¹HTTPSçå¬çé
ç½®145    å®ç½APIåè龿¥: https://help.aliyun.com/document_detail/27603.html146    '''147    counter = 0148    while counter < TRY_TIME:149        try:150            request = SetLoadBalancerHTTPSListenerAttributeRequest.SetLoadBalancerHTTPSListenerAttributeRequest()151            # è´è½½åè¡¡å®ä¾çID152            request.set_LoadBalancerId(params["load_balancer_id"])153            # çå¬ç带宽峰å¼154            request.set_Bandwidth(params["bandwidth"])155            # è´è½½åè¡¡å®ä¾å端使ç¨ç端å£156            request.set_ListenerPort(params["listener_port"])157            # æ¯å¦å¼å¯å¥åº·æ£æ¥158            request.set_HealthCheck(params["health_check"])159            # æ¯å¦å¼å¯ä¼è¯ä¿æ160            request.set_StickySession(params["sticky_session"])161            # æå¡å¨è¯ä¹¦çID162            request.set_ServerCertificateId(params["server_certificate_id"])163            # åéè°ç¨è¯·æ±å¹¶æ¥æ¶è¿åæ°æ®164            response = ACS_CLIENT.do_action_with_exception(request)165            response_json = json.loads(response)166            # è¿åç»æ167            return response_json168        except ServerException as e:169            ExceptionHandler.server_exception(e)170        except ClientException as e:171            ExceptionHandler.client_exception(e)172        finally:173            counter += 1174def delete_load_balancer(load_balancer_id):175    '''176    delete_load_balancerï¼å é¤slbå®ä¾177    å®ç½APIåè龿¥ï¼https://help.aliyun.com/document_detail/27579.html178    '''179    try:180        # 设置å é¤SLBå®ä¾çè°ç¨åæ°181        request = DeleteLoadBalancerRequest.DeleteLoadBalancerRequest()182        # è´è½½åè¡¡å®ä¾çID183        request.set_LoadBalancerId(load_balancer_id)184        # åéè°ç¨è¯·æ±å¹¶æ¥æ¶è¿åæ°æ®185        response = ACS_CLIENT.do_action_with_exception(request)186        response_json = json.loads(response)187        # è¿åç»æ188        return response_json189    except ServerException as e:190        ExceptionHandler.server_exception(e)191    except ClientException as e:192        ExceptionHandler.client_exception(e)193def main():194    params = {}195    # 设置å建SLBå®ä¾çåæ°196    # 设置æ°å»ºSLBå®ä¾ç主å¯ç¨åºä¸ºcn-zhangjiakou-a197    params["master_zone_id"] = "cn-zhangjiakou-a"198    # 设置æ°å»ºSLBå®ä¾ç主å¯ç¨åºä¸ºcn-zhangjiakou-b199    params["slave_zone_id"] = "cn-zhangjiakou-b"200    # 设置æ°å»ºSLBå®ä¾çå称为SLB1201    params["load_balancer_name"] = "SLB1"202    # 设置æ°å»ºSLBå®ä¾ç计费类å为æé计费203    params["pay_balancer_type"] = "PayOnDemand"204    # 设置添å å°é»è®¤æå¡å¨ç»çECSçå®ä¾IDåæé205    params["backend_servers"] = [{"ServerId": "i-8vbe8yixxxxxxxxxxxxxw", "Weight": "100"},206                                 {"ServerId": "i-8vbe8yxxxxxxxxxxxxxj9v", "Weight": "100"}]207    # 设置ä¸ä¼ æå¡å¨è¯ä¹¦çåæ°208    # 设置å建HTTPSçå¬çåæ°209    # å
³éå¥åº·æ£æ¥210    params["health_check"] = "off"211    # 设置çå¬ç带宽峰å¼212    params["bandwidth"] = 6213    # è´è½½åè¡¡å®ä¾å端使ç¨ç端å£214    params["listener_port"] = 80215    # 设置è´è½½åè¡¡å®ä¾å端使ç¨ç端å£216    params["backend_server_port"] = 443217    # å
³éä¼è¯ä¿æ218    params["sticky_session"] = "off"219    # å建slbå®ä¾220    # è·åcreate_load_balancer彿°è¿åå¼ï¼load_balancer_jsonä¸ºç»æçjson串221    load_balancer_json = create_load_balancer(params)222    # æå° load_balancer_jsonç»æï¼å
¶ä¸"create_load_balancer"æ¯å¯¹json串起çåå223    CommonUtil.log("create_load_balancer", load_balancer_json)224    # ä¸ä¼ æå¡å¨è¯ä¹¦225    # è¦ä¸ä¼ çå
¬é¥è¯ä¹¦226    params["server_certificate"] = "-----BEGIN CERTIFICATE-----xxxxxxx-----END CERTIFICATE-----"227    params["private_key"] = "-----BEGIN RSA PRIVATE KEY-----xxxxxxxxxxx-----END RSA PRIVATE KEY-----"228    result_json = upload_server_certificate(params)229    CommonUtil.log("upload_server_certificate", result_json)230    params["server_certificate_id"] = result_json["ServerCertificateId"]231    # slbå®ä¾id232    params["load_balancer_id"] = load_balancer_json["LoadBalancerId"]233    # å建httpsçå¬234    result_json = create_https_listener(params)235    CommonUtil.log("create_https_listener", result_json)236    # ä¸ä¼ æ°çæå¡å¨è¯ä¹¦237    # è¦ä¸ä¼ çæ°å
¬é¥è¯ä¹¦238    params["server_certificate"] = "-----BEGIN CERTIFICATE-----xxxxxxx-----END CERTIFICATE-----"239    params["private_key"] = "-----BEGIN RSA PRIVATE KEY-----xxxxxxxxxxx-----END RSA PRIVATE KEY-----"240    result_json = upload_server_certificate(params)241    CommonUtil.log("upload_server_certificate", result_json)242    params["server_certificate_id"] = result_json["ServerCertificateId"]243    # ä¿®æ¹httpsçå¬é
ç½®244    result_json = set_https_listener_attribute(params)245    CommonUtil.log("set_https_listener_attribute", result_json)246    # å é¤slbå®ä¾247    # å é¤è¿åçLoadBalancerId对åºçSLBå®ä¾248    load_balancer_json = delete_load_balancer(load_balancer_json["LoadBalancerId"])249    # æå° load_balancer_jsonç»æ250    CommonUtil.log("delete_load_balancer", load_balancer_json)251if __name__ == "__main__":...pipelines.py
Source:pipelines.py  
1import logging2from broker.tasks import alb, cloudfront, update_operations, iam, letsencrypt, route533from broker.tasks.huey import huey4logger = logging.getLogger(__name__)5def queue_all_alb_provision_tasks_for_operation(operation_id: int, correlation_id: str):6    if correlation_id is None:7        raise RuntimeError("correlation_id must be set")8    if operation_id is None:9        raise RuntimeError("operation_id must be set")10    correlation = {"correlation_id": correlation_id}11    task_pipeline = (12        letsencrypt.create_user.s(operation_id, **correlation)13        .then(letsencrypt.generate_private_key, operation_id, **correlation)14        .then(letsencrypt.initiate_challenges, operation_id, **correlation)15        .then(route53.create_TXT_records, operation_id, **correlation)16        .then(route53.wait_for_changes, operation_id, **correlation)17        .then(letsencrypt.answer_challenges, operation_id, **correlation)18        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)19        .then(iam.upload_server_certificate, operation_id, **correlation)20        .then(alb.select_alb, operation_id, **correlation)21        .then(alb.add_certificate_to_alb, operation_id, **correlation)22        .then(route53.create_ALIAS_records, operation_id, **correlation)23        .then(route53.wait_for_changes, operation_id, **correlation)24        .then(update_operations.provision, operation_id, **correlation)25    )26    huey.enqueue(task_pipeline)27def queue_all_alb_deprovision_tasks_for_operation(28    operation_id: int, correlation_id: str29):30    if correlation_id is None:31        raise RuntimeError("correlation_id must be set")32    if operation_id is None:33        raise RuntimeError("operation_id must be set")34    correlation = {"correlation_id": correlation_id}35    task_pipeline = (36        update_operations.cancel_pending_provisioning.s(operation_id, **correlation)37        .then(route53.remove_ALIAS_records, operation_id, **correlation)38        .then(route53.remove_TXT_records, operation_id, **correlation)39        .then(alb.remove_certificate_from_alb, operation_id, **correlation)40        .then(iam.delete_server_certificate, operation_id, **correlation)41        .then(update_operations.deprovision, operation_id, **correlation)42    )43    huey.enqueue(task_pipeline)44def queue_all_migration_deprovision_tasks_for_operation(45    operation_id: int, correlation_id: str46):47    if correlation_id is None:48        raise RuntimeError("correlation_id must be set")49    if operation_id is None:50        raise RuntimeError("operation_id must be set")51    correlation = {"correlation_id": correlation_id}52    task_pipeline = update_operations.deprovision.s(operation_id, **correlation)53    huey.enqueue(task_pipeline)54def queue_all_cdn_provision_tasks_for_operation(operation_id: int, correlation_id: str):55    if correlation_id is None:56        raise RuntimeError("correlation_id must be set")57    if operation_id is None:58        raise RuntimeError("operation_id must be set")59    correlation = {"correlation_id": correlation_id}60    task_pipeline = (61        letsencrypt.create_user.s(operation_id, **correlation)62        .then(letsencrypt.generate_private_key, operation_id, **correlation)63        .then(letsencrypt.initiate_challenges, operation_id, **correlation)64        .then(route53.create_TXT_records, operation_id, **correlation)65        .then(route53.wait_for_changes, operation_id, **correlation)66        .then(letsencrypt.answer_challenges, operation_id, **correlation)67        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)68        .then(iam.upload_server_certificate, operation_id, **correlation)69        .then(cloudfront.create_distribution, operation_id, **correlation)70        .then(cloudfront.wait_for_distribution, operation_id, **correlation)71        .then(route53.create_ALIAS_records, operation_id, **correlation)72        .then(route53.wait_for_changes, operation_id, **correlation)73        .then(update_operations.provision, operation_id, **correlation)74    )75    huey.enqueue(task_pipeline)76def queue_all_cdn_deprovision_tasks_for_operation(77    operation_id: int, correlation_id: str78):79    if correlation_id is None:80        raise RuntimeError("correlation_id must be set")81    if operation_id is None:82        raise RuntimeError("operation_id must be set")83    correlation = {"correlation_id": correlation_id}84    task_pipeline = (85        update_operations.cancel_pending_provisioning.s(operation_id, **correlation)86        .then(route53.remove_ALIAS_records, operation_id, **correlation)87        .then(route53.remove_TXT_records, operation_id, **correlation)88        .then(cloudfront.disable_distribution, operation_id, **correlation)89        .then(cloudfront.wait_for_distribution_disabled, operation_id, **correlation)90        .then(cloudfront.delete_distribution, operation_id=operation_id, **correlation)91        .then(iam.delete_server_certificate, operation_id, **correlation)92        .then(update_operations.deprovision, operation_id, **correlation)93    )94    huey.enqueue(task_pipeline)95def queue_all_alb_renewal_tasks_for_operation(operation_id, **kwargs):96    correlation = {"correlation_id": "Renewal"}97    task_pipeline = (98        letsencrypt.generate_private_key.s(operation_id, **correlation)99        .then(letsencrypt.initiate_challenges, operation_id, **correlation)100        .then(route53.create_TXT_records, operation_id, **correlation)101        .then(route53.wait_for_changes, operation_id, **correlation)102        .then(letsencrypt.answer_challenges, operation_id, **correlation)103        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)104        .then(iam.upload_server_certificate, operation_id, **correlation)105        .then(alb.select_alb, operation_id, **correlation)106        .then(alb.add_certificate_to_alb, operation_id, **correlation)107        .then(route53.create_ALIAS_records, operation_id, **correlation)108        .then(route53.wait_for_changes, operation_id, **correlation)109        .then(alb.remove_certificate_from_previous_alb, operation_id, **correlation)110        .then(iam.delete_previous_server_certificate, operation_id, **correlation)111        .then(update_operations.provision, operation_id, **correlation)112    )113    huey.enqueue(task_pipeline)114def queue_all_cdn_renewal_tasks_for_operation(operation_id, **kwargs):115    correlation = {"correlation_id": "Renewal"}116    task_pipeline = (117        letsencrypt.generate_private_key.s(operation_id, **correlation)118        .then(letsencrypt.initiate_challenges, operation_id, **correlation)119        .then(route53.create_TXT_records, operation_id, **correlation)120        .then(route53.wait_for_changes, operation_id, **correlation)121        .then(letsencrypt.answer_challenges, operation_id, **correlation)122        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)123        .then(iam.upload_server_certificate, operation_id, **correlation)124        .then(cloudfront.update_certificate, operation_id, **correlation)125        .then(iam.delete_previous_server_certificate, operation_id, **correlation)126        .then(update_operations.provision, operation_id, **correlation)127    )128    huey.enqueue(task_pipeline)129def queue_all_cdn_update_tasks_for_operation(operation_id, correlation_id):130    correlation = {"correlation_id": correlation_id}131    task_pipeline = (132        letsencrypt.generate_private_key.s(operation_id, **correlation)133        .then(letsencrypt.initiate_challenges, operation_id, **correlation)134        .then(route53.create_TXT_records, operation_id, **correlation)135        .then(route53.wait_for_changes, operation_id, **correlation)136        .then(letsencrypt.answer_challenges, operation_id, **correlation)137        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)138        .then(iam.upload_server_certificate, operation_id, **correlation)139        .then(cloudfront.update_distribution, operation_id, **correlation)140        .then(cloudfront.wait_for_distribution, operation_id, **correlation)141        .then(route53.create_ALIAS_records, operation_id, **correlation)142        .then(route53.wait_for_changes, operation_id, **correlation)143        .then(iam.delete_previous_server_certificate, operation_id, **correlation)144        .then(update_operations.update_complete, operation_id, **correlation)145    )146    huey.enqueue(task_pipeline)147def queue_all_alb_update_tasks_for_operation(operation_id, correlation_id):148    correlation = {"correlation_id": correlation_id}149    task_pipeline = (150        letsencrypt.generate_private_key.s(operation_id, **correlation)151        .then(letsencrypt.initiate_challenges, operation_id, **correlation)152        .then(route53.create_TXT_records, operation_id, **correlation)153        .then(route53.wait_for_changes, operation_id, **correlation)154        .then(letsencrypt.answer_challenges, operation_id, **correlation)155        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)156        .then(iam.upload_server_certificate, operation_id, **correlation)157        .then(alb.select_alb, operation_id, **correlation)158        .then(alb.add_certificate_to_alb, operation_id, **correlation)159        .then(route53.create_ALIAS_records, operation_id, **correlation)160        .then(route53.wait_for_changes, operation_id, **correlation)161        .then(alb.remove_certificate_from_previous_alb, operation_id, **correlation)162        .then(iam.delete_previous_server_certificate, operation_id, **correlation)163        .then(update_operations.provision, operation_id, **correlation)164    )165    huey.enqueue(task_pipeline)166def queue_all_cdn_broker_migration_tasks_for_operation(operation_id, correlation_id):167    correlation = {"correlation_id": correlation_id}168    task_pipeline = (169        cloudfront.remove_s3_bucket_from_cdn_broker_instance.s(170            operation_id, **correlation171        )172        .then(cloudfront.add_logging_to_bucket, operation_id, **correlation)173        .then(letsencrypt.create_user, operation_id, **correlation)174        .then(letsencrypt.generate_private_key, operation_id, **correlation)175        .then(letsencrypt.initiate_challenges, operation_id, **correlation)176        .then(route53.create_ALIAS_records, operation_id, **correlation)177        .then(route53.wait_for_changes, operation_id, **correlation)178        .then(route53.create_TXT_records, operation_id, **correlation)179        .then(route53.wait_for_changes, operation_id, **correlation)180        .then(letsencrypt.answer_challenges, operation_id, **correlation)181        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)182        .then(iam.upload_server_certificate, operation_id, **correlation)183        .then(cloudfront.update_certificate, operation_id, **correlation)184        .then(iam.delete_previous_server_certificate, operation_id, **correlation)185        .then(update_operations.provision, operation_id, **correlation)186    )187    huey.enqueue(task_pipeline)188def queue_all_domain_broker_migration_tasks_for_operation(operation_id, correlation_id):189    correlation = {"correlation_id": correlation_id}190    task_pipeline = (191        letsencrypt.create_user.s(operation_id, **correlation)192        .then(letsencrypt.generate_private_key, operation_id, **correlation)193        .then(letsencrypt.initiate_challenges, operation_id, **correlation)194        # create alias records here is probably not necessary, but belt + suspenders195        .then(route53.create_ALIAS_records, operation_id, **correlation)196        .then(route53.wait_for_changes, operation_id, **correlation)197        .then(route53.create_TXT_records, operation_id, **correlation)198        .then(route53.wait_for_changes, operation_id, **correlation)199        .then(letsencrypt.answer_challenges, operation_id, **correlation)200        .then(letsencrypt.retrieve_certificate, operation_id, **correlation)201        .then(iam.upload_server_certificate, operation_id, **correlation)202        .then(alb.select_alb, operation_id, **correlation)203        .then(alb.add_certificate_to_alb, operation_id, **correlation)204        .then(route53.create_ALIAS_records, operation_id, **correlation)205        .then(route53.wait_for_changes, operation_id, **correlation)206        .then(alb.remove_certificate_from_previous_alb, operation_id, **correlation)207        .then(iam.delete_previous_server_certificate, operation_id, **correlation)208        .then(update_operations.provision, operation_id, **correlation)209    )...test_iam_idempotent.py
Source:test_iam_idempotent.py  
...76    service_instance = CDNServiceInstance.query.get("1234")77    certificate = service_instance.new_certificate78    today = date.today().isoformat()79    assert today == simple_regex(r"^\d\d\d\d-\d\d-\d\d$")80    iam_commercial.expect_upload_server_certificate(81        name=f"{service_instance.id}-{today}-{certificate.id}",82        cert=certificate.leaf_pem,83        private_key=certificate.private_key_pem,84        chain=certificate.fullchain_pem,85        path="/cloudfront/external-domains-test/",86    )87    upload_server_certificate.call_local("4321")88    db.session.expunge_all()89    service_instance = CDNServiceInstance.query.get("1234")90    certificate = service_instance.new_certificate91    operation = Operation.query.get("4321")92    updated_at = operation.updated_at.timestamp()93    # unstubbed, so an error should be raised if we do try94    upload_server_certificate.call_local("4321")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
