Best Python code snippet using localstack_python
test_spot_fleet.py
Source:test_spot_fleet.py  
...73    spot_fleet_res = conn.request_spot_fleet(74        SpotFleetRequestConfig=spot_config(subnet_id)75    )76    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]77    spot_fleet_requests = conn.describe_spot_fleet_requests(78        SpotFleetRequestIds=[spot_fleet_id]79    )["SpotFleetRequestConfigs"]80    len(spot_fleet_requests).should.equal(1)81    spot_fleet_request = spot_fleet_requests[0]82    spot_fleet_request["SpotFleetRequestState"].should.equal("active")83    spot_fleet_config = spot_fleet_request["SpotFleetRequestConfig"]84    spot_fleet_config["SpotPrice"].should.equal("0.12")85    spot_fleet_config["TargetCapacity"].should.equal(6)86    spot_fleet_config["IamFleetRole"].should.equal(87        "arn:aws:iam::{}:role/fleet".format(ACCOUNT_ID)88    )89    spot_fleet_config["AllocationStrategy"].should.equal("lowestPrice")90    spot_fleet_config["FulfilledCapacity"].should.equal(6.0)91    len(spot_fleet_config["LaunchSpecifications"]).should.equal(2)92    launch_spec = spot_fleet_config["LaunchSpecifications"][0]93    launch_spec["EbsOptimized"].should.equal(False)94    launch_spec["SecurityGroups"].should.equal([{"GroupId": "sg-123"}])95    launch_spec["IamInstanceProfile"].should.equal(96        {"Arn": "arn:aws:iam::{}:role/fleet".format(ACCOUNT_ID)}97    )98    launch_spec["ImageId"].should.equal("ami-123")99    launch_spec["InstanceType"].should.equal("t2.small")100    launch_spec["KeyName"].should.equal("my-key")101    launch_spec["Monitoring"].should.equal({"Enabled": True})102    launch_spec["SpotPrice"].should.equal("0.13")103    launch_spec["SubnetId"].should.equal(subnet_id)104    launch_spec["UserData"].should.equal("some user data")105    launch_spec["WeightedCapacity"].should.equal(2.0)106    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)107    instances = instance_res["ActiveInstances"]108    len(instances).should.equal(3)109@mock_ec2110def test_create_diversified_spot_fleet():111    conn = boto3.client("ec2", region_name="us-west-2")112    subnet_id = get_subnet_id(conn)113    diversified_config = spot_config(subnet_id, allocation_strategy="diversified")114    spot_fleet_res = conn.request_spot_fleet(SpotFleetRequestConfig=diversified_config)115    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]116    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)117    instances = instance_res["ActiveInstances"]118    len(instances).should.equal(2)119    instance_types = set([instance["InstanceType"] for instance in instances])120    instance_types.should.equal(set(["t2.small", "t2.large"]))121    instances[0]["InstanceId"].should.contain("i-")122@mock_ec2123def test_create_spot_fleet_request_with_tag_spec():124    conn = boto3.client("ec2", region_name="us-west-2")125    subnet_id = get_subnet_id(conn)126    tag_spec = [127        {128            "ResourceType": "instance",129            "Tags": [130                {"Key": "tag-1", "Value": "foo"},131                {"Key": "tag-2", "Value": "bar"},132            ],133        }134    ]135    config = spot_config(subnet_id)136    config["LaunchSpecifications"][0]["TagSpecifications"] = tag_spec137    spot_fleet_res = conn.request_spot_fleet(SpotFleetRequestConfig=config)138    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]139    spot_fleet_requests = conn.describe_spot_fleet_requests(140        SpotFleetRequestIds=[spot_fleet_id]141    )["SpotFleetRequestConfigs"]142    spot_fleet_config = spot_fleet_requests[0]["SpotFleetRequestConfig"]143    spot_fleet_config["LaunchSpecifications"][0]["TagSpecifications"][0][144        "ResourceType"145    ].should.equal("instance")146    for tag in tag_spec[0]["Tags"]:147        spot_fleet_config["LaunchSpecifications"][0]["TagSpecifications"][0][148            "Tags"149        ].should.contain(tag)150    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)151    instances = conn.describe_instances(152        InstanceIds=[i["InstanceId"] for i in instance_res["ActiveInstances"]]153    )154    for instance in instances["Reservations"][0]["Instances"]:155        for tag in tag_spec[0]["Tags"]:156            instance["Tags"].should.contain(tag)157@mock_ec2158def test_cancel_spot_fleet_request():159    conn = boto3.client("ec2", region_name="us-west-2")160    subnet_id = get_subnet_id(conn)161    spot_fleet_res = conn.request_spot_fleet(162        SpotFleetRequestConfig=spot_config(subnet_id)163    )164    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]165    conn.cancel_spot_fleet_requests(166        SpotFleetRequestIds=[spot_fleet_id], TerminateInstances=True167    )168    spot_fleet_requests = conn.describe_spot_fleet_requests(169        SpotFleetRequestIds=[spot_fleet_id]170    )["SpotFleetRequestConfigs"]171    len(spot_fleet_requests).should.equal(0)172@mock_ec2173def test_modify_spot_fleet_request_up():174    conn = boto3.client("ec2", region_name="us-west-2")175    subnet_id = get_subnet_id(conn)176    spot_fleet_res = conn.request_spot_fleet(177        SpotFleetRequestConfig=spot_config(subnet_id)178    )179    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]180    conn.modify_spot_fleet_request(SpotFleetRequestId=spot_fleet_id, TargetCapacity=20)181    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)182    instances = instance_res["ActiveInstances"]183    len(instances).should.equal(10)184    spot_fleet_config = conn.describe_spot_fleet_requests(185        SpotFleetRequestIds=[spot_fleet_id]186    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]187    spot_fleet_config["TargetCapacity"].should.equal(20)188    spot_fleet_config["FulfilledCapacity"].should.equal(20.0)189@mock_ec2190def test_modify_spot_fleet_request_up_diversified():191    conn = boto3.client("ec2", region_name="us-west-2")192    subnet_id = get_subnet_id(conn)193    spot_fleet_res = conn.request_spot_fleet(194        SpotFleetRequestConfig=spot_config(subnet_id, allocation_strategy="diversified")195    )196    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]197    conn.modify_spot_fleet_request(SpotFleetRequestId=spot_fleet_id, TargetCapacity=19)198    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)199    instances = instance_res["ActiveInstances"]200    len(instances).should.equal(7)201    spot_fleet_config = conn.describe_spot_fleet_requests(202        SpotFleetRequestIds=[spot_fleet_id]203    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]204    spot_fleet_config["TargetCapacity"].should.equal(19)205    spot_fleet_config["FulfilledCapacity"].should.equal(20.0)206@mock_ec2207def test_modify_spot_fleet_request_down_no_terminate():208    conn = boto3.client("ec2", region_name="us-west-2")209    subnet_id = get_subnet_id(conn)210    spot_fleet_res = conn.request_spot_fleet(211        SpotFleetRequestConfig=spot_config(subnet_id)212    )213    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]214    conn.modify_spot_fleet_request(215        SpotFleetRequestId=spot_fleet_id,216        TargetCapacity=1,217        ExcessCapacityTerminationPolicy="noTermination",218    )219    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)220    instances = instance_res["ActiveInstances"]221    len(instances).should.equal(3)222    spot_fleet_config = conn.describe_spot_fleet_requests(223        SpotFleetRequestIds=[spot_fleet_id]224    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]225    spot_fleet_config["TargetCapacity"].should.equal(1)226    spot_fleet_config["FulfilledCapacity"].should.equal(6.0)227@mock_ec2228def test_modify_spot_fleet_request_down_odd():229    conn = boto3.client("ec2", region_name="us-west-2")230    subnet_id = get_subnet_id(conn)231    spot_fleet_res = conn.request_spot_fleet(232        SpotFleetRequestConfig=spot_config(subnet_id)233    )234    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]235    conn.modify_spot_fleet_request(SpotFleetRequestId=spot_fleet_id, TargetCapacity=7)236    conn.modify_spot_fleet_request(SpotFleetRequestId=spot_fleet_id, TargetCapacity=5)237    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)238    instances = instance_res["ActiveInstances"]239    len(instances).should.equal(3)240    spot_fleet_config = conn.describe_spot_fleet_requests(241        SpotFleetRequestIds=[spot_fleet_id]242    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]243    spot_fleet_config["TargetCapacity"].should.equal(5)244    spot_fleet_config["FulfilledCapacity"].should.equal(6.0)245@mock_ec2246def test_modify_spot_fleet_request_down():247    conn = boto3.client("ec2", region_name="us-west-2")248    subnet_id = get_subnet_id(conn)249    spot_fleet_res = conn.request_spot_fleet(250        SpotFleetRequestConfig=spot_config(subnet_id)251    )252    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]253    conn.modify_spot_fleet_request(SpotFleetRequestId=spot_fleet_id, TargetCapacity=1)254    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)255    instances = instance_res["ActiveInstances"]256    len(instances).should.equal(1)257    spot_fleet_config = conn.describe_spot_fleet_requests(258        SpotFleetRequestIds=[spot_fleet_id]259    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]260    spot_fleet_config["TargetCapacity"].should.equal(1)261    spot_fleet_config["FulfilledCapacity"].should.equal(2.0)262@mock_ec2263def test_modify_spot_fleet_request_down_no_terminate_after_custom_terminate():264    conn = boto3.client("ec2", region_name="us-west-2")265    subnet_id = get_subnet_id(conn)266    spot_fleet_res = conn.request_spot_fleet(267        SpotFleetRequestConfig=spot_config(subnet_id)268    )269    spot_fleet_id = spot_fleet_res["SpotFleetRequestId"]270    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)271    instances = instance_res["ActiveInstances"]272    conn.terminate_instances(InstanceIds=[i["InstanceId"] for i in instances[1:]])273    conn.modify_spot_fleet_request(274        SpotFleetRequestId=spot_fleet_id,275        TargetCapacity=1,276        ExcessCapacityTerminationPolicy="noTermination",277    )278    instance_res = conn.describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_id)279    instances = instance_res["ActiveInstances"]280    len(instances).should.equal(1)281    spot_fleet_config = conn.describe_spot_fleet_requests(282        SpotFleetRequestIds=[spot_fleet_id]283    )["SpotFleetRequestConfigs"][0]["SpotFleetRequestConfig"]284    spot_fleet_config["TargetCapacity"].should.equal(1)285    spot_fleet_config["FulfilledCapacity"].should.equal(2.0)286@mock_ec2287def test_create_spot_fleet_without_spot_price():288    conn = boto3.client("ec2", region_name="us-west-2")289    subnet_id = get_subnet_id(conn)290    # remove prices to force a fallback to ondemand price291    spot_config_without_price = spot_config(subnet_id)292    del spot_config_without_price["SpotPrice"]293    for spec in spot_config_without_price["LaunchSpecifications"]:294        del spec["SpotPrice"]295    spot_fleet_id = conn.request_spot_fleet(296        SpotFleetRequestConfig=spot_config_without_price297    )["SpotFleetRequestId"]298    spot_fleet_requests = conn.describe_spot_fleet_requests(299        SpotFleetRequestIds=[spot_fleet_id]300    )["SpotFleetRequestConfigs"]301    len(spot_fleet_requests).should.equal(1)302    spot_fleet_request = spot_fleet_requests[0]303    spot_fleet_config = spot_fleet_request["SpotFleetRequestConfig"]304    len(spot_fleet_config["LaunchSpecifications"]).should.equal(2)305    launch_spec1 = spot_fleet_config["LaunchSpecifications"][0]306    launch_spec2 = spot_fleet_config["LaunchSpecifications"][1]307    # AWS will figure out the price308    assert "SpotPrice" not in launch_spec1...ec2.py
Source:ec2.py  
...41        message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,42                'METHOD': stack_trace, 'EXCEPTION': str(e)}43        return message44        45    def describe_spot_fleet_requests(self, **kwargs):46        method = inspect.stack()[0][3]47        self.logger.info('Executing function {}'.format(method))48        try:49            if kwargs.get('SpotFleetRequestIds') is not None:50                response = self.ec2_client.describe_spot_fleet_requests(51                    SpotFleetRequestIds=[52                        kwargs['SpotFleetRequestIds'],53                    ]54                )55                return response56            else:57                response = self.ec2_client.describe_spot_fleet_requests(58                )59                return response60        except Exception as e:61            self.logger.exception(self.error_message(method, e))62            raise63    def modify_spot_fleet_request(self, SpotFleetRequestId, TargetCapacity):64        method = inspect.stack()[0][3]65        self.logger.info('Executing function {}'.format(method))66        try:67            response = self.ec2_client.modify_spot_fleet_request(68                SpotFleetRequestId=SpotFleetRequestId,69                TargetCapacity=TargetCapacity70            )71            return response72        except Exception as e:73            self.logger.exception(self.error_message(method, e))74            raise75    def wait_sfid_ready(self, SpotFleetRequestId):76        method = inspect.stack()[0][3]77        self.logger.info('Executing function {}'.format(method))78        try:79            for i in range(100):80                response = self.ec2_client.describe_spot_fleet_requests(81                    SpotFleetRequestIds=[82                        SpotFleetRequestId83                    ]84                )85                for fleets in response['SpotFleetRequestConfigs']:86                    if fleets['SpotFleetRequestState'] == 'active':87                        return88                    else:89                        time.sleep(3)90                        self.logger.info('waiting for spotfleet request to become active')91                        i += 192            self.logger.info('waiting for spotfleet request to become active')93        except Exception as e:94            self.logger.exception(self.error_message(method, e))...lambdaEc2SpotFleetsWatcher.py
Source:lambdaEc2SpotFleetsWatcher.py  
...10    """11    """12    client = boto3.client('ec2')13    if next_token is not None:14        fleets_r = client.describe_spot_fleet_requests(NextToken=next_token)15        print ("Made a call with a next token")16    else:17        fleets_r = client.describe_spot_fleet_requests(MaxResults=1)18        print ("Made a call without next token")19    if 'NextToken' in fleets_r.keys():20        return get_all_fleets(fleets_r['SpotFleetRequestConfigs'], fleets_r['NextToken'], loop)21    else:22        fleet_list += fleets_r['SpotFleetRequestConfigs']23    if loop:24        return25    print (fleet_list[0].keys())26    return fleet_list27def get_all_spot_requests():28    """29    """30    client = boto3.client('ec2')31    requests = client.describe_spot_instance_requests()['SpotInstanceRequests']...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
