How to use list_aggregates method in tempest

Best Python code snippet using tempest_python

server_aggregation.py

Source:server_aggregation.py Github

copy

Full Screen

1#!/usr/bin/env python32import sys3import cv24import math5import threading6import asyncio7import websockets8import json9from camera import *10from vector2d import Vector2D11import itertools12import random13import angles14import time15import math16import numpy as np17red = (0, 0, 255)18green = (0, 255, 0)19magenta = (255, 0, 255)20cyan = (255, 255, 0)21yellow = (50, 255, 255)22black = (0, 0, 0)23white = (255, 255, 255)24class Tag:25 def __init__(self, id, raw_tag):26 self.id = id27 self.raw_tag = raw_tag28 self.corners = raw_tag.tolist()[0]29 # Individual corners (e.g. tl = top left corner in relation to the tag, not the camera)30 self.tl = Vector2D(int(self.corners[0][0]), int(self.corners[0][1])) # Top left31 self.tr = Vector2D(int(self.corners[1][0]), int(self.corners[1][1])) # Top right32 self.br = Vector2D(int(self.corners[2][0]), int(self.corners[2][1])) # Bottom right33 self.bl = Vector2D(int(self.corners[3][0]), int(self.corners[3][1])) # Bottom left34 # Calculate centre of the tag35 self.centre = Vector2D(int((self.tl.x + self.tr.x + self.br.x + self.bl.x) / 4),36 int((self.tl.y + self.tr.y + self.br.y + self.bl.y) / 4))37 # Calculate centre of top of tag38 self.front = Vector2D(int((self.tl.x + self.tr.x) / 2),39 int((self.tl.y + self.tr.y) / 2))40 # Calculate orientation of tag41 self.forward = math.atan2(self.front.y - self.centre.y, self.front.x - self.centre.x) # Forward vector42 self.angle = math.degrees(self.forward) # Angle between forward vector and x-axis43class Robot:44 def __init__(self, tag, position):45 self.tag = tag46 self.id = tag.id47 self.position = position48 self.orientation = tag.angle49 self.sensor_range = 0.15 # 15cm sensing radius # TODO please adjust this when the aggregation is too easy or too difficult50 self.neighbours = {}51 self.tasks = {}52class SensorReading:53 def __init__(self, range, bearing, orientation=0):54 self.range = range55 self.bearing = bearing56 self.orientation = orientation57class Task:58 def __init__(self, id, workers, position, radius, time_limit):59 self.id = id60 self.workers = workers61 self.position = position62 self.radius = radius63 self.time_limit = time_limit64 self.counter = time_limit65 self.completed = False66 self.failed = False67 self.start_time = time.time()68class Tracker(threading.Thread):69 def __init__(self):70 threading.Thread.__init__(self)71 self.camera = Camera()72 self.calibrated = False73 self.num_corner_tags = 0 # Needs to be defined74 self.min_x = 0 # In pixels75 self.min_y = 0 # In pixels76 self.max_x = 0 # In pixels77 self.max_y = 0 # In pixels78 self.centre = Vector2D(0, 0) # In metres79 self.corner_distance_metres = 2.06 # Euclidean distance between corner tags in metres80 self.corner_distance_pixels = 081 self.scale_factor = 0.082 self.robots = {}83 self.tasks = {}84 self.list_aggregates = []85 self.max_aggregates = 086 self.aggregation_times = {}87 88 self.max_timer = 0.089 self.max_end_time = 0.090 self.tmp_max_time = 0.091 self.total_max_time = 0.092 self.is_max = False93 94 self.task_counter = 095 self.score = 096 self.result = 0.097 self.experiment_time = 3098 def run(self):99 start_time = time.time()100 self.team_name = input(f"Please enter your team name ^_^: ")101 while True: #(time.time() - start_time < self.experiment_time):102 #print(f"time.time() - start_time: {time.time() - start_time}") 103 print(f"self.total_max_time: {self.total_max_time + self.tmp_max_time}")104 image = self.camera.get_frame()105 overlay = image.copy()106 107 aruco_dictionary = cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_100)108 aruco_parameters = cv2.aruco.DetectorParameters_create()109 (raw_tags, tag_ids, rejected) = cv2.aruco.detectMarkers(image, aruco_dictionary, parameters=aruco_parameters)110 111 self.robots = {} # Clear dictionary every frame in case robots have disappeared112 # Check whether any tags were detected in this camera frame113 if tag_ids is not None and len(tag_ids.tolist()) > 0:114 tag_ids = list(itertools.chain(*tag_ids))115 tag_ids = [int(id) for id in tag_ids] # Convert from numpy.int32 to int116 #print(f"tag_ids: {tag_ids}")117 # Process raw ArUco output118 for id, raw_tag in zip(tag_ids, raw_tags):119 tag = Tag(id, raw_tag)120 if self.calibrated:121 if tag.id != 0: # Reserved tag ID for corners122 position = Vector2D(tag.centre.x / self.scale_factor, tag.centre.y / self.scale_factor) # Convert pixel coordinates to metres123 self.robots[id] = Robot(tag, position)124 else: # Only calibrate the first time two corner tags are detected125 126 if tag.id == 0: # Reserved tag ID for corners127 if self.num_corner_tags == 0: # Record the first corner tag detected128 self.min_x = tag.centre.x129 self.max_x = tag.centre.x130 self.min_y = tag.centre.y131 self.max_y = tag.centre.y132 else: # Set min/max boundaries of arena based on second corner tag detected133 if tag.centre.x < self.min_x:134 self.min_x = tag.centre.x135 if tag.centre.x > self.max_x:136 self.max_x = tag.centre.x137 if tag.centre.y < self.min_y:138 self.min_y = tag.centre.y139 if tag.centre.y > self.max_y:140 self.max_y = tag.centre.y141 self.corner_distance_pixels = math.dist([self.min_x, self.min_y], [self.max_x, self.max_y]) # Euclidean distance between corner tags in pixels142 self.scale_factor = self.corner_distance_pixels / self.corner_distance_metres143 x = ((self.max_x - self.min_x) / 2) / self.scale_factor # Convert to metres144 y = ((self.max_y - self.min_y) / 2) / self.scale_factor # Convert to metres145 self.centre = Vector2D(x, y)146 self.calibrated = True147 self.num_corner_tags = self.num_corner_tags + 1148 if self.calibrated:149 # Draw boundary of virtual environment based on corner tag positions150 cv2.rectangle(image, (self.min_x, self.min_y), (self.max_x, self.max_y), green, 1, lineType=cv2.LINE_AA)151 152 # Process robots153 self.list_aggregates = [] # Initialise list_aggregates154 for id, robot in self.robots.items():155 for other_id, other_robot in self.robots.items():156 if id != other_id: # Don't check this robot against itself157 range = robot.position.distance_to(other_robot.position)158 if range < robot.sensor_range:159 absolute_bearing = math.degrees(math.atan2(other_robot.position.y - robot.position.y, other_robot.position.x - robot.position.x))160 relative_bearing = absolute_bearing - robot.orientation161 normalised_bearing = angles.normalize(relative_bearing, -180, 180)162 robot.neighbours[other_id] = SensorReading(range, normalised_bearing, other_robot.orientation)163 # Draw tag164 tag = robot.tag165 # Draw border of tag166 cv2.line(image, (tag.tl.x, tag.tl.y), (tag.tr.x, tag.tr.y), green, 1, lineType=cv2.LINE_AA)167 cv2.line(image, (tag.tr.x, tag.tr.y), (tag.br.x, tag.br.y), green, 1, lineType=cv2.LINE_AA)168 cv2.line(image, (tag.br.x, tag.br.y), (tag.bl.x, tag.bl.y), green, 1, lineType=cv2.LINE_AA)169 cv2.line(image, (tag.bl.x, tag.bl.y), (tag.tl.x, tag.tl.y), green, 1, lineType=cv2.LINE_AA)170 171 # Draw circle on centre point172 cv2.circle(image, (tag.centre.x, tag.centre.y), 5, red, -1, lineType=cv2.LINE_AA)173 # Draw robot's sensor range174 sensor_range_pixels = int(robot.sensor_range * self.scale_factor)175 cv2.circle(overlay, (tag.centre.x, tag.centre.y), sensor_range_pixels, magenta, -1, lineType=cv2.LINE_AA)176 # Draw lines between robots if they are within sensor range177 # Add the aggregating robot in a list178 # ------------------------ Seongin -------------------------------------------179 for neighbour_id in robot.neighbours.keys():180 neighbour = self.robots[neighbour_id]181 cv2.line(image, (tag.centre.x, tag.centre.y), (neighbour.tag.centre.x, neighbour.tag.centre.y), black, 10, lineType=cv2.LINE_AA)182 cv2.line(image, (tag.centre.x, tag.centre.y), (neighbour.tag.centre.x, neighbour.tag.centre.y), cyan, 3, lineType=cv2.LINE_AA)183 is_find_id = False184 for list_agg_id, aggregates in enumerate(self.list_aggregates): ##185 if id in aggregates:186 is_find_id = True187 if neighbour_id not in aggregates:188 self.list_aggregates[list_agg_id].append(neighbour_id)189 else:190 pass191 else:192 pass193 if is_find_id == False:194 self.list_aggregates.append([id])195 # ------------------------ Seongin -------------------------------------------196 if len(self.list_aggregates) > 1:197 for i, aggregates in enumerate(self.list_aggregates):198 self.list_aggregates[i].sort() # sort the list to compare with other lists199 #print(f"self.list_aggregates[i]: {self.list_aggregates[i]}")200 201 self.list_aggregates = [ii for n, ii in enumerate(self.list_aggregates) if ii not in self.list_aggregates[:n]] 202 is_all_one = True203 #print(f"list_aggregates: {self.list_aggregates}")204 for i, aggregates in enumerate(self.list_aggregates):205 #print(f"len(aggregates): {len(aggregates)}")206 if len(aggregates) > 1:207 is_all_one = False208 #print(f"is_all_one: {is_all_one}")209 if is_all_one == True:210 self.list_aggregates = [np.array(self.list_aggregates).squeeze().tolist()]211 212 # print(f"list_aggregates: {self.list_aggregates}")213 # print(f"self.robots: {self.robots}")214 215 216 for id, robot in self.robots.items():217 tag = robot.tag218 # Draw line from centre point to front of tag219 forward_point = ((tag.front - tag.centre) * 2) + tag.centre220 cv2.line(image, (tag.centre.x, tag.centre.y), (forward_point.x, forward_point.y), black, 10, lineType=cv2.LINE_AA)221 cv2.line(image, (tag.centre.x, tag.centre.y), (forward_point.x, forward_point.y), green, 3, lineType=cv2.LINE_AA)222 # Draw tag ID223 text = str(tag.id)224 font = cv2.FONT_HERSHEY_SIMPLEX225 font_scale = 1.5226 thickness = 4227 textsize = cv2.getTextSize(text, font, font_scale, thickness)[0]228 position = (int(tag.centre.x - textsize[0]/2), int(tag.centre.y + textsize[1]/2))229 cv2.putText(image, text, position, font, font_scale, black, thickness * 3, cv2.LINE_AA)230 cv2.putText(image, text, position, font, font_scale, white, thickness, cv2.LINE_AA)231 232 # TODO I need to fix a bug. when the list changes it does not work. 233 # ------------------------ Seongin -------------------------------------------234 for i, aggregates in enumerate(self.list_aggregates):235 x_s = []236 y_s = []237 for id in aggregates:238 x_s.append(self.robots[id].position.x)239 y_s.append(self.robots[id].position.y)240 pos_center = [sum(x_s)/len(x_s), sum(y_s)/len(y_s)]241 radius = max([math.sqrt((x-pos_center[0])**2 + (y-pos_center[1])**2) for x, y in zip(x_s, y_s)])242 243 #print(f"x_s: {x_s}")244 #print(f"pos_centre: {pos_center}")245 #print(f"radius: {radius}")246 cv2.circle(image, (int(pos_center[0] * self.scale_factor), int(pos_center[1] *self.scale_factor)), int(radius * self.scale_factor), yellow, 3, lineType=cv2.LINE_AA)247 248 # ------------------------ Seongin -------------------------------------------249 current_max_aggregates = max([len(i) for i in self.list_aggregates]) if len(self.list_aggregates) != 0 else 0250 if current_max_aggregates > 0:251 if current_max_aggregates >= self.max_aggregates and self.is_max == False: # when current size of aggregate is larger or equal than previous ones. 252 #print(f"self.max_aggregates: {self.max_aggregates}, current_max_aggregates: {current_max_aggregates}")253 #print(f"Max activated")254 if current_max_aggregates > self.max_aggregates: # When we get bigger cluster, we reset the total max aggregation time, and start counting new time255 self.total_max_time = 0.0256 self.max_aggregates = current_max_aggregates257 self.is_max = True258 self.max_timer = time.time()259 260 elif current_max_aggregates == self.max_aggregates and self.is_max == True: # Stable state of the cluster, only time will be accumulated261 #print(f"self.max_aggregates: {self.max_aggregates}, current_max_aggregates: {current_max_aggregates}")262 self.tmp_max_time = time.time() - self.max_timer # duration of the aggregation263 #print(self.tmp_max_time)264 #print(f"counting tmp_max_time") 265 elif current_max_aggregates != self.max_aggregates and self.is_max == True: # when the aggregate changes either in a decreasing or increasing manner266 #print(f"self.max_aggregates: {self.max_aggregates}, current_max_aggregates: {current_max_aggregates}")267 self.is_max = False268 self.max_end_time = time.time() 269 self.total_max_time += self.tmp_max_time270 self.tmp_max_time = 0.0271 272 #print(f"max finished")273 274 #print(f"duration of agg: {self.max_timer - self.max_end_time }") 275 276 # Create any new tasks, if necessary277 while len(self.tasks) < 3:278 id = self.task_counter279 placed = False280 while not placed:281 overlaps = False282 workers = random.randint(1, 5)283 radius = math.sqrt(workers) * 0.1284 min_x_metres = self.min_x / self.scale_factor285 max_x_metres = self.max_x / self.scale_factor286 min_y_metres = self.min_y / self.scale_factor287 max_y_metres = self.max_y / self.scale_factor288 x = random.uniform(min_x_metres + radius, max_x_metres - radius)289 y = random.uniform(min_y_metres + radius, max_y_metres - radius)290 position = Vector2D(x, y) # In metres291 for other_task in self.tasks.values():292 overlap = radius + other_task.radius293 if position.distance_to(other_task.position) < overlap:294 overlaps = True295 296 if not overlaps:297 placed = True298 time_limit = 20 * workers # 20 seconds per robot299 self.tasks[id] = Task(id, workers, position, radius, time_limit)300 self.task_counter = self.task_counter + 1301 # Iterate over tasks302 for task_id, task in self.tasks.items():303 task.robots = []304 # Check whether robot is within range305 for robot_id, robot in self.robots.items():306 distance = task.position.distance_to(robot.position)307 if distance < robot.sensor_range:308 absolute_bearing = math.degrees(math.atan2(task.position.y - robot.position.y, task.position.x - robot.position.x))309 relative_bearing = absolute_bearing - robot.orientation310 normalised_bearing = angles.normalize(relative_bearing, -180, 180)311 robot.tasks[task_id] = SensorReading(distance, normalised_bearing)312 if distance < task.radius:313 task.robots.append(robot_id)314 # print(f"Task {task_id} - workers: {task.workers}, robots: {task.robots}")315 316 if len(task.robots) >= task.workers:317 task.completed = True318 pixel_radius = int(task.radius * self.scale_factor)319 x = int(task.position.x * self.scale_factor)320 y = int(task.position.y * self.scale_factor)321 # Draw task timer322 time_now = time.time()323 task.elapsed_time = time_now - task.start_time324 if task.elapsed_time > 1:325 task.start_time = time_now326 task.counter = task.counter - 1327 if task.counter <= 1:328 task.failed = True329 #cv2.circle(overlay, (x, y), int((pixel_radius / task.time_limit) * task.counter), cyan, -1, lineType=cv2.LINE_AA)330 colour = red331 # Draw task boundary332 #cv2.circle(image, (x, y), pixel_radius, black, 10, lineType=cv2.LINE_AA)333 #cv2.circle(image, (x, y), pixel_radius, colour, 5, lineType=cv2.LINE_AA)334 # Draw task ID335 # text = str(task.workers)336 # font = cv2.FONT_HERSHEY_SIMPLEX337 # font_scale = 1.5338 # thickness = 4339 # textsize = cv2.getTextSize(text, font, font_scale, thickness)[0]340 # position = (int(x - textsize[0]/2), int(y + textsize[1]/2))341 # cv2.putText(image, text, position, font, font_scale, black, thickness * 3, cv2.LINE_AA)342 # cv2.putText(image, text, position, font, font_scale, colour, thickness, cv2.LINE_AA)343 # Delete completed tasks344 for task_id in list(self.tasks.keys()):345 task = self.tasks[task_id]346 if task.completed:347 self.score = self.score + task.workers348 del self.tasks[task_id]349 elif task.failed:350 del self.tasks[task_id]351 # ------------------------ Seongin -------------------------------------------352 total_time = "{:.2f}".format(self.total_max_time + self.tmp_max_time)353 text = f"Size of the biggest aggregate: {current_max_aggregates}, total max time: {self.total_max_time}"354 font = cv2.FONT_HERSHEY_SIMPLEX355 font_scale = 2356 thickness = 5357 textsize = cv2.getTextSize(text, font, font_scale, thickness)[0]358 position = (10, 60)359 cv2.putText(image, text, position, font, font_scale, black, thickness * 3, cv2.LINE_AA)360 cv2.putText(image, text, position, font, font_scale, green, thickness, cv2.LINE_AA)361 # For the score362 self.result = (self.max_aggregates * 0.8) + ((self.total_max_time + self.tmp_max_time) * 0.2) # Caculate the result363 self.result = "{:.2f}".format(self.result)364 text = f"The Score is: {self.result}"365 font = cv2.FONT_HERSHEY_SIMPLEX366 font_scale = 3367 thickness = 7368 textsize = cv2.getTextSize(text, font, font_scale, thickness)[0]369 position = (10, 900)370 cv2.putText(image, text, position, font, font_scale, black, thickness * 3, cv2.LINE_AA)371 cv2.putText(image, text, position, font, font_scale, yellow, thickness, cv2.LINE_AA)372 373 # Transparency for overlaid augments374 alpha = 0.3375 image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)376 window_name = 'SwarmHack'377 # screen = screeninfo.get_monitors()[0]378 # width, height = screen.width, screen.height379 # image = cv2.resize(image, (width, height))380 # cv2.namedWindow(window_name, cv2.WND_PROP_FULLSCREEN)381 # cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)382 cv2.imshow(window_name, image)383 # TODO: Fix quitting with Q (necessary for fullscreen mode)384 if cv2.waitKey(1) == ord('q'):385 sys.exit()386 self.total_max_time += self.tmp_max_time; 387 print(f"Team: {self.team_name}, Size of Aggregate: {self.max_aggregates}, Aggregate Time: {self.total_max_time}, and their score {self.result}") 388async def handler(websocket):389 print("starting handler")390 async for packet in websocket:391 print("received packet")392 print(packet)393 message = json.loads(packet)394 print(message)395 396 # Process any requests received397 reply = {}398 send_reply = False399 if "check_awake" in message:400 print("CHECK AWAKE")401 reply["awake"] = True402 send_reply = True403 if "get_robots" in message:404 for id, robot in tracker.robots.items():405 reply[id] = {}406 reply[id]["orientation"] = robot.orientation407 reply[id]["neighbours"] = {}408 reply[id]["tasks"] = {}409 for neighbour_id, neighbour in robot.neighbours.items():410 reply[id]["neighbours"][neighbour_id] = {}411 reply[id]["neighbours"][neighbour_id]["range"] = neighbour.range412 reply[id]["neighbours"][neighbour_id]["bearing"] = neighbour.bearing413 reply[id]["neighbours"][neighbour_id]["orientation"] = neighbour.orientation414 for task_id, task in robot.tasks.items():415 reply[id]["tasks"][task_id] = {}416 reply[id]["tasks"][task_id]["range"] = task.range417 reply[id]["tasks"][task_id]["bearing"] = task.bearing418 reply[id]["tasks"][task_id]["workers"] = task.workers419 send_reply = True420 # Send reply, if requested421 if send_reply:422 await websocket.send(json.dumps(reply))423# TODO: Handle Ctrl+C signals424if __name__ == "__main__":425 global tracker426 tracker = Tracker()427 tracker.start()428 ##429 # Use the following iptables rule to forward port 80 to 6000 for the server to use:430 # sudo iptables -t nat -A PREROUTING -p tcp --dport 80 -j REDIRECT --to-port 6000431 # Alternatively, change the port below to 80 and run this Python script as root.432 ##433 #start_server = websockets.serve(ws_handler=handler, host=None, port=6000)434 # start_server = websockets.serve(ws_handler=handler, host="144.32.165.233", port=6000)435 #loop = asyncio.get_event_loop()436 #loop.run_until_complete(start_server)...

Full Screen

Full Screen

aggregates.py

Source:aggregates.py Github

copy

Full Screen

1# Copyright 2018 ZTE Corporation. All rights reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may4# not use this file except in compliance with the License. You may obtain5# a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations13# under the License.14import copy15from tempest.lib.api_schema.response.compute.v2_1 import aggregates16# 'uuid' of an aggregate is returned in microversion 2.4117aggregate_for_create = copy.deepcopy(aggregates.aggregate_for_create)18aggregate_for_create['properties'].update({'uuid': {'type': 'string',19 'format': 'uuid'}})20aggregate_for_create['required'].append('uuid')21common_aggregate_info = copy.deepcopy(aggregates.common_aggregate_info)22common_aggregate_info['properties'].update({'uuid': {'type': 'string',23 'format': 'uuid'}})24common_aggregate_info['required'].append('uuid')25list_aggregates = copy.deepcopy(aggregates.list_aggregates)26list_aggregates['response_body']['properties']['aggregates'].update(27 {'items': common_aggregate_info})28get_aggregate = copy.deepcopy(aggregates.get_aggregate)29get_aggregate['response_body']['properties'].update(30 {'aggregate': common_aggregate_info})31aggregate_set_metadata = get_aggregate32update_aggregate = copy.deepcopy(aggregates.update_aggregate)33update_aggregate['response_body']['properties'].update(34 {'aggregate': common_aggregate_info})35create_aggregate = copy.deepcopy(aggregates.create_aggregate)36create_aggregate['response_body']['properties'].update(37 {'aggregate': aggregate_for_create})38aggregate_add_remove_host = get_aggregate39# NOTE(zhufl): Below are the unchanged schema in this microversion. We need40# to keep this schema in this file to have the generic way to select the41# right schema based on self.schema_versions_info mapping in service client.42# ****** Schemas unchanged since microversion 2.1 ***...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful