How to use describe_fleet_instances method in localstack

Best Python code snippet using localstack_python

test_fleets.py

Source:test_fleets.py Github

copy

Full Screen

...72 fleet_config = fleet["LaunchTemplateConfigs"][0]73 launch_template_spec = fleet_config["LaunchTemplateSpecification"]74 launch_template_spec["LaunchTemplateId"].should.equal(launch_template_id)75 launch_template_spec["Version"].should.equal("1")76 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)77 instances = instance_res["ActiveInstances"]78 len(instances).should.equal(1)79@mock_ec280def test_create_on_demand_fleet():81 conn = boto3.client("ec2", region_name="us-west-2")82 launch_template_id, _ = get_launch_template(conn)83 fleet_res = conn.create_fleet(84 ExcessCapacityTerminationPolicy="terminate",85 LaunchTemplateConfigs=[86 {87 "LaunchTemplateSpecification": {88 "LaunchTemplateId": launch_template_id,89 "Version": "1",90 },91 },92 ],93 TargetCapacitySpecification={94 "DefaultTargetCapacityType": "on-demand",95 "OnDemandTargetCapacity": 1,96 "SpotTargetCapacity": 0,97 "TotalTargetCapacity": 1,98 },99 OnDemandOptions={100 "AllocationStrategy": "lowestPrice",101 },102 Type="maintain",103 ValidFrom="2020-01-01T00:00:00Z",104 ValidUntil="2020-12-31T00:00:00Z",105 )106 fleet_id = fleet_res["FleetId"]107 fleet_id.should.be.a(str)108 fleet_id.should.have.length_of(42)109 fleets_res = conn.describe_fleets(FleetIds=[fleet_id])110 fleets_res.should.have.key("Fleets")111 fleets = fleets_res["Fleets"]112 fleet = fleets[0]113 fleet["FleetState"].should.equal("active")114 fleet_config = fleet["LaunchTemplateConfigs"][0]115 launch_template_spec = fleet_config["LaunchTemplateSpecification"]116 launch_template_spec["LaunchTemplateId"].should.equal(launch_template_id)117 launch_template_spec["Version"].should.equal("1")118 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)119 instances = instance_res["ActiveInstances"]120 len(instances).should.equal(1)121@mock_ec2122def test_create_diversified_spot_fleet():123 conn = boto3.client("ec2", region_name="us-west-2")124 launch_template_id_1, _ = get_launch_template(conn, instance_type="t2.small")125 launch_template_id_2, _ = get_launch_template(conn, instance_type="t2.large")126 fleet_res = conn.create_fleet(127 ExcessCapacityTerminationPolicy="terminate",128 LaunchTemplateConfigs=[129 {130 "LaunchTemplateSpecification": {131 "LaunchTemplateId": launch_template_id_1,132 "Version": "1",133 },134 },135 {136 "LaunchTemplateSpecification": {137 "LaunchTemplateId": launch_template_id_2,138 "Version": "1",139 },140 },141 ],142 TargetCapacitySpecification={143 "DefaultTargetCapacityType": "spot",144 "OnDemandTargetCapacity": 0,145 "SpotTargetCapacity": 2,146 "TotalTargetCapacity": 2,147 },148 SpotOptions={149 "AllocationStrategy": "diversified",150 },151 Type="maintain",152 ValidFrom="2020-01-01T00:00:00Z",153 ValidUntil="2020-12-31T00:00:00Z",154 )155 fleet_id = fleet_res["FleetId"]156 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)157 instances = instance_res["ActiveInstances"]158 len(instances).should.equal(2)159 instance_types = set([instance["InstanceType"] for instance in instances])160 instance_types.should.equal(set(["t2.small", "t2.large"]))161 instances[0]["InstanceId"].should.contain("i-")162@mock_ec2163@pytest.mark.parametrize(164 "spot_allocation_strategy",165 [166 "diversified",167 "lowest-price",168 "capacity-optimized",169 "capacity-optimized-prioritized",170 ],171)172@pytest.mark.parametrize(173 "on_demand_allocation_strategy",174 ["lowestPrice", "prioritized"],175)176def test_request_fleet_using_launch_template_config__name(177 spot_allocation_strategy, on_demand_allocation_strategy178):179 conn = boto3.client("ec2", region_name="us-east-2")180 _, launch_template_name = get_launch_template(conn, instance_type="t2.medium")181 fleet_res = conn.create_fleet(182 LaunchTemplateConfigs=[183 {184 "LaunchTemplateSpecification": {185 "LaunchTemplateName": launch_template_name,186 "Version": "1",187 },188 },189 ],190 TargetCapacitySpecification={191 "DefaultTargetCapacityType": "spot",192 "OnDemandTargetCapacity": 1,193 "SpotTargetCapacity": 2,194 "TotalTargetCapacity": 3,195 },196 SpotOptions={197 "AllocationStrategy": spot_allocation_strategy,198 },199 OnDemandOptions={200 "AllocationStrategy": on_demand_allocation_strategy,201 },202 Type="maintain",203 ValidFrom="2020-01-01T00:00:00Z",204 ValidUntil="2020-12-31T00:00:00Z",205 )206 fleet_id = fleet_res["FleetId"]207 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)208 instances = instance_res["ActiveInstances"]209 len(instances).should.equal(3)210 instance_types = set([instance["InstanceType"] for instance in instances])211 instance_types.should.equal(set(["t2.medium"]))212 instances[0]["InstanceId"].should.contain("i-")213@mock_ec2214def test_create_fleet_request_with_tags():215 conn = boto3.client("ec2", region_name="us-west-2")216 launch_template_id, _ = get_launch_template(conn)217 tags = [218 {"Key": "Name", "Value": "test-fleet"},219 {"Key": "Another", "Value": "tag"},220 ]221 tags_instance = [222 {"Key": "test", "Value": "value"},223 {"Key": "Name", "Value": "test"},224 ]225 fleet_res = conn.create_fleet(226 DryRun=False,227 SpotOptions={228 "AllocationStrategy": "lowestPrice",229 "InstanceInterruptionBehavior": "terminate",230 },231 LaunchTemplateConfigs=[232 {233 "LaunchTemplateSpecification": {234 "LaunchTemplateId": launch_template_id,235 "Version": "1",236 },237 },238 ],239 TargetCapacitySpecification={240 "DefaultTargetCapacityType": "spot",241 "OnDemandTargetCapacity": 1,242 "SpotTargetCapacity": 2,243 "TotalTargetCapacity": 3,244 },245 Type="request",246 ValidFrom="2020-01-01T00:00:00Z",247 ValidUntil="2020-12-31T00:00:00Z",248 TagSpecifications=[249 {250 "ResourceType": "fleet",251 "Tags": tags,252 },253 ],254 )255 fleet_id = fleet_res["FleetId"]256 fleets = conn.describe_fleets(FleetIds=[fleet_id])["Fleets"]257 fleets[0]["Tags"].should.equal(tags)258 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)259 instances = conn.describe_instances(260 InstanceIds=[i["InstanceId"] for i in instance_res["ActiveInstances"]]261 )262 for instance in instances["Reservations"][0]["Instances"]:263 for tag in tags_instance:264 instance["Tags"].should.contain(tag)265@mock_ec2266def test_create_fleet_using_launch_template_config__overrides():267 conn = boto3.client("ec2", region_name="us-east-2")268 subnet_id = get_subnet_id(conn)269 template_id, _ = get_launch_template(conn, instance_type="t2.medium")270 template_config_overrides = [271 {272 "InstanceType": "t2.nano",273 "SubnetId": subnet_id,274 "AvailabilityZone": "us-west-1",275 "WeightedCapacity": 2,276 }277 ]278 fleet_res = conn.create_fleet(279 LaunchTemplateConfigs=[280 {281 "LaunchTemplateSpecification": {282 "LaunchTemplateId": template_id,283 "Version": "1",284 },285 "Overrides": template_config_overrides,286 },287 ],288 TargetCapacitySpecification={289 "DefaultTargetCapacityType": "spot",290 "OnDemandTargetCapacity": 1,291 "SpotTargetCapacity": 0,292 "TotalTargetCapacity": 1,293 },294 SpotOptions={295 "AllocationStrategy": "lowest-price",296 "InstanceInterruptionBehavior": "terminate",297 },298 Type="maintain",299 ValidFrom="2020-01-01T00:00:00Z",300 ValidUntil="2020-12-31T00:00:00Z",301 )302 fleet_id = fleet_res["FleetId"]303 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)304 instances = instance_res["ActiveInstances"]305 instances.should.have.length_of(1)306 instances[0].should.have.key("InstanceType").equals("t2.nano")307 instance = conn.describe_instances(308 InstanceIds=[i["InstanceId"] for i in instances]309 )["Reservations"][0]["Instances"][0]310 instance.should.have.key("SubnetId").equals(subnet_id)311@mock_ec2312def test_delete_fleet():313 conn = boto3.client("ec2", region_name="us-west-2")314 launch_template_id, _ = get_launch_template(conn)315 fleet_res = conn.create_fleet(316 LaunchTemplateConfigs=[317 {318 "LaunchTemplateSpecification": {319 "LaunchTemplateId": launch_template_id,320 "Version": "1",321 },322 },323 ],324 TargetCapacitySpecification={325 "DefaultTargetCapacityType": "spot",326 "OnDemandTargetCapacity": 1,327 "SpotTargetCapacity": 2,328 "TotalTargetCapacity": 3,329 },330 SpotOptions={331 "AllocationStrategy": "lowestPrice",332 "InstanceInterruptionBehavior": "terminate",333 },334 Type="maintain",335 ValidFrom="2020-01-01T00:00:00Z",336 ValidUntil="2020-12-31T00:00:00Z",337 )338 fleet_id = fleet_res["FleetId"]339 delete_fleet_out = conn.delete_fleets(FleetIds=[fleet_id], TerminateInstances=True)340 delete_fleet_out["SuccessfulFleetDeletions"].should.have.length_of(1)341 delete_fleet_out["SuccessfulFleetDeletions"][0]["FleetId"].should.equal(fleet_id)342 delete_fleet_out["SuccessfulFleetDeletions"][0]["CurrentFleetState"].should.equal(343 "deleted"344 )345 fleets = conn.describe_fleets(FleetIds=[fleet_id])["Fleets"]346 len(fleets).should.equal(1)347 target_capacity_specification = fleets[0]["TargetCapacitySpecification"]348 target_capacity_specification.should.have.key("TotalTargetCapacity").equals(0)349 fleets[0]["FleetState"].should.equal("deleted")350 # Instances should be terminated351 instance_res = conn.describe_fleet_instances(FleetId=fleet_id)352 instances = instance_res["ActiveInstances"]353 len(instances).should.equal(0)354@mock_ec2355def test_describe_fleet_instences_api():356 conn = boto3.client("ec2", region_name="us-west-1")357 launch_template_id, _ = get_launch_template(conn)358 fleet_res = conn.create_fleet(359 LaunchTemplateConfigs=[360 {361 "LaunchTemplateSpecification": {362 "LaunchTemplateId": launch_template_id,363 "Version": "1",364 },365 },366 ],367 TargetCapacitySpecification={368 "DefaultTargetCapacityType": "spot",369 "OnDemandTargetCapacity": 1,370 "SpotTargetCapacity": 2,371 "TotalTargetCapacity": 3,372 },373 SpotOptions={374 "AllocationStrategy": "lowestPrice",375 "InstanceInterruptionBehavior": "terminate",376 },377 Type="maintain",378 ValidFrom="2020-01-01T00:00:00Z",379 ValidUntil="2020-12-31T00:00:00Z",380 )381 fleet_id = fleet_res["FleetId"]382 fleet_res = conn.describe_fleet_instances(FleetId=fleet_id)383 fleet_res["FleetId"].should.equal(fleet_id)384 fleet_res["ActiveInstances"].should.have.length_of(3)385 instance_ids = [i["InstanceId"] for i in fleet_res["ActiveInstances"]]386 for instance_id in instance_ids:387 instance_id.startswith("i-").should.be.true388 instance_types = [i["InstanceType"] for i in fleet_res["ActiveInstances"]]389 instance_types.should.equal(["t2.micro", "t2.micro", "t2.micro"])390 instance_healths = [i["InstanceHealth"] for i in fleet_res["ActiveInstances"]]391 instance_healths.should.equal(["healthy", "healthy", "healthy"])392@mock_ec2393def test_create_fleet_api():394 conn = boto3.client("ec2", region_name="us-west-1")395 launch_template_id, _ = get_launch_template(conn)396 fleet_res = conn.create_fleet(...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...103 for page in instance_id_pages104 for reservation in ec2.describe_instances(InstanceIds=page)["Reservations"]105 for instance in reservation["Instances"]106 ]107def ec2_describe_fleet_instances(fleet_id: str) -> List[FleetInstanceDict]:108 next_token = ""109 instances: List[FleetInstanceDict] = []110 while True:111 page = ec2.describe_fleet_instances(FleetId=fleet_id, NextToken=next_token, MaxResults=MAX_PAGE_SIZE)112 instances.extend(page["ActiveInstances"])113 next_token = page["NextToken"]114 if not next_token:115 break...

Full Screen

Full Screen

test_get_nodes.py

Source:test_get_nodes.py Github

copy

Full Screen

1from unittest.mock import MagicMock2from unittest.mock import patch3import lobotomy4from kuber.latest import core_v15from manager import _configs6from manager import _controller7from manager import _types8def _create_node(9 name: str,10 fleet: str,11):12 """Creates a node for mocked testing."""13 node = core_v1.Node()14 with node.metadata as md:15 md.name = name16 md.labels.update(fleet=fleet)17 md.creation_timestamp = "2018-01-01T00:00:00Z"18 return node19@lobotomy.patch()20@patch("manager._controller.get_pods")21@patch("manager._controller._nodes.core_v1.Node.get_resource_api")22def test_get_nodes(23 get_resource_api: MagicMock,24 get_pods: MagicMock,25 lobotomized: lobotomy.Lobotomy,26):27 """..."""28 get_pods.return_value = []29 api = MagicMock()30 api.list_node.return_value = MagicMock(31 items=[_create_node("a", "primary-small"), _create_node("b", "primary-large")]32 )33 get_resource_api.return_value = api34 configs = _types.ManagerConfigs()35 configs.fleets.append(36 _types.FleetRequirements(37 configs=configs,38 sector="primary",39 size_spec=_types.SMALL_MEMORY_SPEC,40 )41 )42 fleet = _types.Fleet(configs.fleets[0], "fleet-identifier", 1, {})43 # Client for describing fleet instances that may not be in the44 # cluster at the moment.45 lobotomized.add_call(46 "ec2",47 "describe_fleet_instances",48 {"ActiveInstances": [{"InstanceId": "a"}, {"InstanceId": "c"}]},49 )50 lobotomized.add_call(51 "ec2",52 "describe_instances",53 {54 "Reservations": [55 {56 "Instances": [57 {"PrivateDnsName": "c", "InstanceId": "c"},58 {"PrivateDnsName": "d", "InstanceId": "d"},59 ]60 }61 ]62 },63 )64 nodes = _controller.get_nodes(configs, fleet)65 assert nodes["a"].requirements.size == "small"66 assert "b" not in nodes, '"b" is not in the small fleet'67 assert nodes["c"].state == _configs.WARMING_UP_STATE...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful